package models

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"slices"
	"sync"
	"time"
	"unicode/utf8"

	pubSubModels "gitea.tecamino.com/paadi/pubSub/models"
	"gitea.tecamino.com/paadi/tecamino-logger/logging"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
)

// Origins lists the values of the Origin header accepted by the websocket
// upgrader. A "*" as the first entry accepts every origin; an empty list
// rejects every upgrade.
var Origins []string = []string{"*"}

// Broadcast holds the clients registered by ConnectNewClient, keyed by id.
//
// NOTE(review): the map is written from request handlers (ConnectNewClient)
// and from each client's Write goroutine without any locking visible in this
// file — confirm that every accessor is serialized elsewhere.
var Broadcast Clients = make(Clients)

const (
	// Time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// Time allowed to read the next pong message from the peer.
	pongWait = 30 * time.Second

	// Send pings to peer with this period. Must be less than pongWait.
	pingPeriod = (pongWait * 9) / 10

	// Largest close reason that fits into a close frame: control frame
	// payloads are limited to 125 bytes and the status code takes two.
	maxCloseReasonLen = 123
)

// Client wraps one websocket connection together with its outgoing message
// queue and a set of optional event callbacks.
//
// NOTE(review): both constructors start the Read/Write goroutines before they
// return, so callbacks can only be assigned while those goroutines are already
// running; OnOpen in particular may be evaluated before the caller has set it.
type Client struct {
	id        string
	connected bool
	conn      *websocket.Conn `json:"-"`

	OnOpen    func()
	OnMessage func(data []byte)
	OnClose   func(code int, reason string)
	OnError   func(err error)
	OnWarning func(warn string)
	OnPing    func()
	OnPong    func()

	// send queues outgoing text messages for the Write goroutine.
	send chan []byte
	// unregister carries the close-frame payload that asks Write to shut down.
	unregister chan []byte

	// closeMu guards closing, which records that a shutdown has been queued.
	closeMu sync.Mutex
	closing bool
}

// upgrader turns incoming HTTP requests into websocket connections and
// validates the request origin against Origins.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		switch {
		case len(Origins) == 0:
			return false
		case Origins[0] == "*":
			return true
		default:
			return slices.Contains(Origins, r.Header.Get("Origin"))
		}
	},
	EnableCompression: false,
}

// NewClient dials ws://ip:port/status?id=<id>, installs the ping/pong
// handlers and starts the Read and Write goroutines.
//
// It returns an error wrapping the dial failure (including the HTTP status
// when the server answered the handshake) if the connection cannot be
// established. A nil logger disables the debug output.
func NewClient(ip, id string, port uint, logger *logging.Logger) (*Client, error) {
	debug := func(msg string) {
		if logger != nil {
			logger.Debug("NewClient", msg)
		}
	}

	u := url.URL{
		Scheme: "ws",
		Host:   fmt.Sprintf("%s:%d", ip, port),
		Path:   "status",
		// Escape the id so characters such as '&' or '#' cannot corrupt the query.
		RawQuery: "id=" + url.QueryEscape(id),
	}

	conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		status := "no response"
		if resp != nil {
			status = resp.Status
		}
		return nil, fmt.Errorf("dial error %w (status %s)", err, status)
	}

	c := &Client{
		id:        id,
		connected: true,
		conn:      conn,
		send:      make(chan []byte, 512),
		// Without this channel Close could never hand its request to Write.
		unregister: make(chan []byte, 256),
	}

	// Setup control handlers
	debug("set PingHandler")
	conn.SetPingHandler(func(appData string) error {
		if c.OnPing != nil {
			c.OnPing()
		}
		conn.SetReadDeadline(time.Now().Add(pongWait))
		// WriteControl carries its own deadline; the connection-wide write
		// deadline belongs to the Write goroutine and is not touched here.
		if err := conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(2*pongWait)); err != nil {
			c.reportError(err)
			return err
		}
		return nil
	})

	debug("set PongHandler")
	conn.SetPongHandler(func(appData string) error {
		conn.SetReadDeadline(time.Now().Add(pongWait))
		if c.OnPong != nil {
			c.OnPong()
		}
		return nil
	})

	// Start reading messages from client
	debug("start read goroutine")
	go c.Read()

	debug("start write goroutine")
	go c.Write()

	return c, nil
}

// ConnectNewClient upgrades the gin request to a websocket connection,
// registers the resulting client in Broadcast under id and starts its Read,
// Write and PingInterval goroutines.
//
// It returns an error wrapping the upgrade failure if the handshake fails.
func ConnectNewClient(id string, c *gin.Context) (*Client, error) {
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		return nil, fmt.Errorf("websocket upgrade error: %w", err)
	}

	client := &Client{
		id:         id,
		connected:  true,
		conn:       conn,
		send:       make(chan []byte, 512),
		unregister: make(chan []byte, 256),
	}
	Broadcast[client.id] = client

	conn.SetReadDeadline(time.Now().Add(pongWait))

	conn.SetPingHandler(func(appData string) error {
		if client.OnPing != nil {
			client.OnPing()
		}
		conn.SetReadDeadline(time.Now().Add(pongWait))
		// A failed pong is reported but does not abort the read loop.
		if err := conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(pongWait)); err != nil {
			client.reportError(err)
		}
		return nil
	})

	conn.SetPongHandler(func(appData string) error {
		conn.SetReadDeadline(time.Now().Add(pongWait))
		if client.OnPong != nil {
			client.OnPong()
		}
		return nil
	})

	// Start reading messages from client
	go client.Read()
	go client.Write()
	go client.PingInterval(pingPeriod)

	return client, nil
}

// Read runs the receive loop: it fires OnOpen, then passes every text message
// to OnMessage until the connection fails or the client is closed.
//
// A normal or going-away close from the peer ends the loop through Close with
// CloseNormalClosure; any other read error goes through handleError. Errors
// that are merely the consequence of a shutdown already in progress are not
// reported.
func (c *Client) Read() {
	if c.OnOpen != nil {
		c.OnOpen()
	}

	// The peer gets a full pong interval before the first frame is due.
	c.conn.SetReadDeadline(time.Now().Add(pongWait))

	for c.connected {
		msgType, msg, err := c.conn.ReadMessage()
		if err != nil {
			switch {
			case c.isClosing():
				// Our own shutdown closed the connection; nothing to report.
			case websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway):
				// ReadMessage surfaces the peer's close frame as an error, never
				// as a message type.
				if cerr := c.Close(websocket.CloseNormalClosure, "Client closed"); cerr != nil {
					c.reportError(cerr)
				}
			default:
				c.handleError(fmt.Errorf("read error (id:%s): %w", c.id, err))
			}
			return
		}

		switch msgType {
		case websocket.TextMessage:
			if c.OnMessage != nil {
				c.OnMessage(msg)
			} else {
				log.Printf("Received message but no handler set (id:%s): %s", c.id, string(msg))
			}
		default:
			log.Printf("Unhandled message type %d (id:%s)", msgType, c.id)
		}
	}
}

// Write runs the send loop: it writes queued messages to the peer as text
// frames and, when a close request arrives on unregister, sends the close
// frame and stops.
//
// On every exit path the client is marked disconnected, removed from
// Broadcast and its connection is closed.
func (c *Client) Write() {
	defer c.teardown()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the channel.
				closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
				if err := c.conn.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
					c.handleError(err)
					return
				}
				c.handleError(fmt.Errorf("server %s closed channel", c.id))
				return
			}
			if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
				c.handleError(err)
				return
			}

		case message := <-c.unregister:
			// Refresh the deadline: the one left over from the last data
			// message may long have expired.
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			// Best effort: the peer may already be gone or a close frame may
			// already have been sent.
			_ = c.conn.WriteMessage(websocket.CloseMessage, message)
			return
		}
	}
}

// teardown releases everything the Write goroutine owns. The channels are
// deliberately left open so that a late Close or SendData cannot panic on a
// closed channel.
func (c *Client) teardown() {
	c.connected = false
	// Remove only our own registration; the id may have been reused by a
	// newer connection in the meantime.
	if Broadcast[c.id] == c {
		delete(Broadcast, c.id)
	}
	c.conn.Close()
}

// PingInterval sends a ping immediately and then one per interval until a
// ping cannot be written or the client is shutting down. OnPing is invoked
// before every periodic ping. A non-positive interval falls back to
// pingPeriod.
func (c *Client) PingInterval(interval time.Duration) {
	if interval <= 0 {
		// time.NewTicker panics on non-positive durations.
		interval = pingPeriod
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	if !c.sendPing() {
		return
	}

	for range ticker.C {
		if c.isClosing() {
			return
		}
		if c.OnPing != nil {
			c.OnPing()
		}
		if !c.sendPing() {
			return
		}
	}
}

// sendPing writes one ping control frame and reports whether it succeeded.
// Failures during shutdown are expected and are not forwarded to OnError.
func (c *Client) sendPing() bool {
	err := c.conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(pongWait))
	if err == nil {
		return true
	}
	if !c.isClosing() {
		c.reportError(err)
	}
	return false
}

// SendData queues data for the Write goroutine without blocking. The message
// is dropped when the client is disconnected, and dropped with an OnWarning
// notification when the queue is full.
func (c *Client) SendData(data []byte) {
	if !c.connected {
		return
	}

	select {
	case c.send <- data:
		// sent successfully
	default:
		// channel full, drop or log
		if c.OnWarning != nil {
			c.OnWarning("Dropping message: channel full")
		}
	}
}

// Close asks the Write goroutine to send a close frame with the given code
// and reason and to shut the connection down, then invokes OnClose.
//
// Only the first successful call has an effect; later calls return nil. An
// error is returned when the request cannot be queued. The reason sent to the
// peer is shortened to fit a close frame; OnClose receives it unmodified.
func (c *Client) Close(code int, reason string) error {
	closeMsg := websocket.FormatCloseMessage(code, truncateCloseReason(reason))

	c.closeMu.Lock()
	if c.closing {
		c.closeMu.Unlock()
		return nil
	}
	select {
	case c.unregister <- closeMsg:
		c.closing = true
	default:
		// Channel full or never created.
	}
	queued := c.closing
	c.closeMu.Unlock()

	if !queued {
		return fmt.Errorf("attempt close client socket failed")
	}

	// Called outside the lock so the callback may safely call Close itself.
	if c.OnClose != nil {
		c.OnClose(code, reason)
	}
	return nil
}

// isClosing reports whether a shutdown has already been queued by Close.
func (c *Client) isClosing() bool {
	c.closeMu.Lock()
	defer c.closeMu.Unlock()
	return c.closing
}

// reportError forwards err to OnError when a handler is installed.
func (c *Client) reportError(err error) {
	if c.OnError != nil {
		c.OnError(err)
	}
}

// handleError reports err and closes the client with CloseInternalServerErr.
// A failure to queue the close is reported through OnError, or printed to
// standard output when no handler is set.
func (c *Client) handleError(err error) {
	if err == nil {
		return
	}
	c.reportError(err)

	if cerr := c.Close(websocket.CloseInternalServerErr, err.Error()); cerr != nil {
		if c.OnError != nil {
			c.OnError(cerr)
		} else {
			fmt.Println("error: ", cerr)
		}
	}
}

// truncateCloseReason shortens reason to at most maxCloseReasonLen bytes,
// cutting on a rune boundary so a multi-byte character is never split.
func truncateCloseReason(reason string) string {
	if len(reason) <= maxCloseReasonLen {
		return reason
	}
	cut := maxCloseReasonLen
	for cut > 0 && !utf8.RuneStart(reason[cut]) {
		cut--
	}
	return reason[:cut]
}

// publish marshals data into a "publish" payload for the topic "<id>/<level>"
// and queues it with SendData. It returns the marshalling error, if any.
func (c *Client) publish(level string, data any) error {
	var payload pubSubModels.Data
	payload.Action = "publish"
	payload.Topic = fmt.Sprintf("%s/%s", c.id, level)
	payload.Data = data

	b, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	c.SendData(b)
	return nil
}

// SendInfo publishes data on the "<id>/info" topic.
func (c *Client) SendInfo(data any) error {
	return c.publish("info", data)
}

// SendStatus publishes data on the "<id>/status" topic.
func (c *Client) SendStatus(data any) error {
	return c.publish("status", data)
}

// SendDebug publishes data on the "<id>/debug" topic.
func (c *Client) SendDebug(data any) error {
	return c.publish("debug", data)
}

// SendWarning publishes data on the "<id>/warning" topic.
func (c *Client) SendWarning(data any) error {
	return c.publish("warning", data)
}

// SendError publishes data on the "<id>/error" topic.
func (c *Client) SendError(data any) error {
	return c.publish("error", data)
}