From 64ad4e8b3eaf3725a4b8f30cae14ca1fc05444e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20Z=C3=BCrcher?= Date: Sun, 22 Jun 2025 09:03:18 +0200 Subject: [PATCH] improve websocket connection according to gorrila example --- driver/artNet.go | 59 ++-------- driver/bus.go | 2 +- main.go | 2 +- server/models/clients.go | 44 ------- websocket/client.go | 240 ++++++++++++++++++++++++--------------- 5 files changed, 163 insertions(+), 184 deletions(-) delete mode 100644 server/models/clients.go diff --git a/driver/artNet.go b/driver/artNet.go index 4ef3407..34ab8ef 100644 --- a/driver/artNet.go +++ b/driver/artNet.go @@ -6,7 +6,6 @@ import ( ws "artNet/websocket" "fmt" "path" - "time" json_data "github.com/tecamino/tecamino-json_data" "github.com/tecamino/tecamino-logger/logging" @@ -90,67 +89,33 @@ func (d *ArtNetDriver) SetValue(bus string, address uint, value uint8) error { // id: id of driver // port: port of server func (d *ArtNetDriver) Connect(ip, id string, port uint) error { - var err error + errChan := make(chan error) client, err := ws.NewClient(ip, id, port) if err != nil { return err } + client.OnError = func(err error) { d.Log.Error("websocket connection", err) - } - client.OnMessage = func(data []byte) { - //fmt.Println(100, string(data)) - fmt.Println(100, string(data)) + errChan <- err } - client.Connect(5) + client.OnMessage = func(data []byte) { + fmt.Println(100, string(data)) + } req := json_data.NewRequest() req.AddDriverSubscription(".*", id, 0, true, false, false) - if err := client.SendData(req); err != nil { + if err := client.SendRequest(req); err != nil { + errChan <- err d.Log.Error("websocket send data", err) } for { - time.Sleep(1) + select { + case err := <-errChan: + return err + } } - return nil - - // d.Conn = websocket.NewClient() - // if err := d.Conn.Connect(ip, id, port); err != nil { - // return err - // } - // defer d.Conn.Disconnect() - - // if err := d.Conn.Subscribe(id); err != nil { - // return err - // } - - // Subscribe to websocket server - // func (c *Client) Subscribe(id string) error { - // req := json_data.NewRequest() - // req.AddDriverSubscription(".*", id, 0, true, false, false) - // if err := wsjson.Write(c.ctx, c.conn, req); err != nil { - // return err - // } - // return nil - // } - - // for { - // respond, err := d.Conn.ReadJsonData() - // if err != nil { - // return err - // } - - // d.Subscribe(respond.Subscribe...) - - // for _, pub := range respond.Publish { - // if sub, ok := d.Subscriptions[pub.Uuid]; ok { - // if err := d.SetValue(sub.Bus, sub.Address, uint8(pub.Value.(float64))); err != nil { - // d.Log.Info("artNet.Connect", err.Error()) - // } - // } - // } - // } } diff --git a/driver/bus.go b/driver/bus.go index f132c01..36ef7a1 100644 --- a/driver/bus.go +++ b/driver/bus.go @@ -105,7 +105,7 @@ func (d *ArtNetDriver) Start(c *gin.Context) { } busPayload := models.Bus{} - if busPayload.ParsePayload(c); err != nil { + if err := busPayload.ParsePayload(c); err != nil { return } diff --git a/main.go b/main.go index 2464e28..5209f88 100644 --- a/main.go +++ b/main.go @@ -75,7 +75,7 @@ func main() { if err := artNetDriver.Connect(*serverIp, DriverName, *serverPort); err != nil { artNetDriver.Log.Error("main", err) } - fmt.Println(555) + artNetDriver.Log.Info("main", "next reconnecting attempt in 10 seconds") time.Sleep(10 * time.Second) } diff --git a/server/models/clients.go b/server/models/clients.go deleted file mode 100644 index 7fb1588..0000000 --- a/server/models/clients.go +++ /dev/null @@ -1,44 +0,0 @@ -package models - -// type Clients map[string]Client - -// type Client struct { -// Connected *bool `json:"connected"` -// SndConn *websocket.Conn `json:"-"` //sending connection -// RvcConn *websocket.Conn `json:"-"` // revieving connection -// } - -// func NewClients() Clients { -// return make(Clients) -// } - -// // Connect a recieving websocket connection -// func (c *Clients) ConnectRecievingWsConnection(id string, ctx *gin.Context) (*websocket.Conn, error) { -// conn, err := websocket.Accept(ctx.Writer, ctx.Request, &websocket.AcceptOptions{ -// OriginPatterns: []string{"*"}, -// }) - -// if err != nil { -// return nil, fmt.Errorf("error accept websocket client: %s", err) -// } - -// b := true -// (*c)[id] = Client{ -// Connected: &b, -// RvcConn: conn, -// } -// return conn, nil -// } - -// func (c *Clients) RemoveClient(id string) { -// delete(*c, id) -// } - -// func (c *Clients) GetClientPointer(id string) *bool { -// return (*c)[id].Connected -// } - -// func (c *Clients) DisconnectRecievingWsConnection(id string, code websocket.StatusCode, reason string) { -// *(*c)[id].Connected = false -// (*c)[id].RvcConn.Close(code, reason) -// } diff --git a/websocket/client.go b/websocket/client.go index 30eb0c5..bd86f2c 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -4,30 +4,51 @@ import ( "encoding/json" "fmt" "log" - "sync" "time" "github.com/gorilla/websocket" json_dataModels "github.com/tecamino/tecamino-json_data/models" ) +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 10 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 +) + type Client struct { - conn *websocket.Conn - writeMu sync.Mutex - OnMessage func(data []byte) - OnOpen func() - OnClose func(code int, reason string) - OnError func(err error) - OnPing func() - OnPong func() - timeout time.Duration + ip string + port uint + Connected bool + conn *websocket.Conn + OnOpen func() + OnMessage func(data []byte) + OnClose func(code int, reason string) + OnError func(err error) + OnPing func() + OnPong func() + sendPong chan string + send chan []byte + unregister chan []byte } // Connect to websocket server // ip: ip address of server func NewClient(ip, id string, port uint) (*Client, error) { url := fmt.Sprintf("ws://%s:%d/ws?id=%s", ip, port, id) - c := &Client{} + c := &Client{ + ip: ip, + port: port, + Connected: true, + sendPong: make(chan string), + send: make(chan []byte), + unregister: make(chan []byte), + } dialer := websocket.DefaultDialer conn, resp, err := dialer.Dial(url, nil) @@ -40,20 +61,25 @@ func NewClient(ip, id string, port uint) (*Client, error) { c.conn = conn // Setup control handlers - c.conn.SetPingHandler(func(appData string) error { + conn.SetPingHandler(func(appData string) error { if c.OnPing != nil { c.OnPing() } - return c.conn.WriteMessage(websocket.PongMessage, nil) + conn.SetWriteDeadline(time.Now().Add(writeWait)) + conn.SetReadDeadline(time.Now().Add(writeWait)) + c.sendPong <- appData + return nil }) - c.conn.SetPongHandler(func(appData string) error { - c.conn.SetReadDeadline(time.Now().Add(c.timeout)) + + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(pongWait)) if c.OnPong != nil { c.OnPong() } return nil }) - c.conn.SetCloseHandler(func(code int, text string) error { + + conn.SetCloseHandler(func(code int, text string) error { if c.OnClose != nil { c.OnClose(code, text) } @@ -63,98 +89,103 @@ func NewClient(ip, id string, port uint) (*Client, error) { if c.OnOpen != nil { c.OnOpen() } + + // Start reading messages from client + go c.Read() + go c.Write() return c, nil } -func (c *Client) Connect(timeout uint) { - if timeout > 0 { - fmt.Println(1234, timeout) - c.timeout = time.Duration(timeout) * time.Second +func (c *Client) Read() { + if c.OnOpen != nil { + c.OnOpen() } - go c.pingLoop() - - c.conn.SetReadDeadline(time.Now().Add(c.timeout)) - go func() { - - for { - msgType, msg, err := c.conn.ReadMessage() - if err != nil { - if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Println("WebSocket closed:", err) - } - if c.OnError != nil { - c.OnError(fmt.Errorf("read error: %w", err)) - } - return - } - switch msgType { - case websocket.TextMessage, websocket.BinaryMessage: - if c.OnMessage != nil { - c.OnMessage(msg) - } - default: - log.Printf("Unhandled message type: %d", msgType) - } - } - }() -} - -func (c *Client) pingLoop() { - interval := c.timeout / 2 - if interval <= 0 { - interval = 5 * time.Second - } - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for range ticker.C { - if err := c.Write(websocket.PingMessage, nil); err != nil { - if c.OnError != nil { - c.OnError(fmt.Errorf("ping error: %w", err)) - } + c.conn.SetReadDeadline(time.Now().Add(writeWait)) + for c.Connected { + msgType, msg, err := c.conn.ReadMessage() + if err != nil { + c.handleError(fmt.Errorf("read error (id:%s:%d): %w", c.ip, c.port, err)) return } - if c.OnPing != nil { - c.OnPing() + switch msgType { + case websocket.CloseMessage: + c.Close(websocket.CloseNormalClosure, "Client closed") + return + case websocket.TextMessage: + if c.OnMessage != nil { + c.OnMessage(msg) + } else { + log.Printf("Received message but no handler set (id:%s:%d): %s", c.ip, c.port, string(msg)) + } + default: + log.Printf("Unhandled message type %d (id:%s:%d)", msgType, c.ip, c.port) } } } -func (c *Client) Write(msgType int, data []byte) error { - c.writeMu.Lock() - defer c.writeMu.Unlock() - - c.conn.SetWriteDeadline(time.Now().Add(c.timeout)) - - if err := c.conn.WriteMessage(msgType, data); err != nil { - if c.OnError != nil { - c.OnError(err) - } - return err - } - return nil -} - -// Close connection to websocket server -func (c *Client) Close(code int, reason string) { - if c.conn != nil { - if c.OnClose != nil { - c.OnClose(code, reason) - } - c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, reason)) +func (c *Client) Write() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() c.conn.Close() + }() + for { + + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil { + c.handleError(err) + return + } + c.handleError(fmt.Errorf("server %s:%d closed channel", c.ip, c.port)) + return + } else { + if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil { + c.handleError(err) + return + } + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + c.handleError(err) + return + } + + if c.OnPing != nil { + c.OnPing() + } + + case message, ok := <-c.sendPong: + if ok { + c.conn.WriteMessage(websocket.PongMessage, []byte(message)) + } + case message := <-c.unregister: + c.conn.WriteMessage(websocket.CloseMessage, message) + c.Connected = false + close(c.send) + close(c.sendPong) + close(c.unregister) + return + } + } } -func (c *Client) SendData(data any) error { - c.conn.SetWriteDeadline(time.Now().Add(c.timeout)) - if err := c.conn.WriteJSON(data); err != nil { - if c.OnError != nil { - c.OnError(err) - } +func (c *Client) SendRequest(req *json_dataModels.Request) error { + if !c.Connected { + return nil + } + + data, err := json.Marshal(*req) + if err != nil { return err } + c.send <- data return nil } @@ -163,3 +194,30 @@ func (c *Client) ReadJsonData(data []byte) (json_dataModels.Response, error) { err := json.Unmarshal(data, &resp) return resp, err } + +// Close connection to websocket server +func (c *Client) Close(code int, reason string) error { + closeMsg := websocket.FormatCloseMessage(code, reason) + select { + case c.unregister <- closeMsg: // Attempt to send + default: // If the channel is full, this runs + return fmt.Errorf("attempt close client socket failed") + } + if c.OnClose != nil { + c.OnClose(code, reason) + } + return nil +} + +func (c *Client) handleError(err error) { + if c.OnError != nil { + c.OnError(err) + } + if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil { + if c.OnError != nil { + c.OnError(err) + } else { + fmt.Println("error: ", err) + } + } +}