package websocket import ( "encoding/json" "fmt" "log" "time" "github.com/gorilla/websocket" json_dataModels "github.com/tecamino/tecamino-json_data/models" ) const ( // Time allowed to write a message to the peer. writeWait = 40 * time.Second // Time allowed to read the next pong message from the peer. pongWait = 40 * time.Second ) type Client struct { ip string port uint Connected bool conn *websocket.Conn OnOpen func() OnMessage func(data []byte) OnClose func(code int, reason string) OnError func(err error) OnPing func() OnPong func() send chan []byte unregister chan []byte } // Connect to websocket server // ip: ip address of server func NewClient(ip, id string, port uint) (*Client, error) { url := fmt.Sprintf("ws://%s:%d/ws?id=%s", ip, port, id) c := &Client{ ip: ip, port: port, Connected: true, send: make(chan []byte, 256), unregister: make(chan []byte, 256), } dialer := websocket.DefaultDialer conn, resp, err := dialer.Dial(url, nil) if err != nil { if c.OnError != nil { c.OnError(err) } return nil, fmt.Errorf("dial error %v (status %v)", err, resp) } c.conn = conn //Setup control handlers conn.SetPingHandler(func(appData string) error { if c.OnPing != nil { c.OnPing() } conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetWriteDeadline(time.Now().Add(pongWait)) if err := conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(2*pongWait)); err != nil { c.OnError(err) return err } return nil }) conn.SetPongHandler(func(appData string) error { conn.SetReadDeadline(time.Now().Add(pongWait)) if c.OnPong != nil { c.OnPong() } return nil }) // Start reading messages from client go c.Read() go c.Write() return c, nil } func (c *Client) Read() { if c.OnOpen != nil { c.OnOpen() } c.conn.SetReadDeadline(time.Now().Add(pongWait)) for c.Connected { msgType, msg, err := c.conn.ReadMessage() c.conn.SetReadDeadline(time.Now().Add(pongWait)) if err != nil { c.handleError(fmt.Errorf("read error (ip:%s): %w", c.ip, err)) return } switch msgType { case websocket.CloseMessage: c.Close(websocket.CloseNormalClosure, "Client closed") return case websocket.TextMessage: if c.OnMessage != nil { c.OnMessage(msg) } else { log.Printf("Received message but no handler set (ip:%s): %s", c.ip, string(msg)) } default: log.Printf("Unhandled message type %d (ip:%s)", msgType, c.ip) } } } func (c *Client) Write() { defer c.conn.Close() c.conn.SetWriteDeadline(time.Now().Add(writeWait)) for { select { case message, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { // The hub closed the channel. if err := c.conn.WriteMessage(websocket.CloseMessage, []byte("ping")); err != nil { c.handleError(err) return } c.handleError(fmt.Errorf("server %s closed channel", c.ip)) return } else { if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil { c.handleError(err) return } } case message := <-c.unregister: c.conn.WriteMessage(websocket.CloseMessage, message) c.Connected = false close(c.send) close(c.unregister) return } } } func (c *Client) SendRequest(req *json_dataModels.Request) error { if !c.Connected { return nil } data, err := json.Marshal(*req) if err != nil { return err } c.send <- data return nil } func (c *Client) ReadJsonData(data []byte) (json_dataModels.Response, error) { var resp json_dataModels.Response err := json.Unmarshal(data, &resp) return resp, err } // Close connection to websocket server func (c *Client) Close(code int, reason string) error { closeMsg := websocket.FormatCloseMessage(code, reason) select { case c.unregister <- closeMsg: // Attempt to send default: // If the channel is full, this runs return fmt.Errorf("attempt close client socket failed") } if c.OnClose != nil { c.OnClose(code, reason) } return nil } func (c *Client) handleError(err error) { if c.OnError != nil { c.OnError(err) } if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil { if c.OnError != nil { c.OnError(err) } else { log.Println("error:", err) } } }