improve websocket connection according to gorrila example

This commit is contained in:
Adrian Zürcher
2025-06-22 09:03:18 +02:00
parent b29e7a97b8
commit 64ad4e8b3e
5 changed files with 163 additions and 184 deletions

View File

@@ -6,7 +6,6 @@ import (
ws "artNet/websocket" ws "artNet/websocket"
"fmt" "fmt"
"path" "path"
"time"
json_data "github.com/tecamino/tecamino-json_data" json_data "github.com/tecamino/tecamino-json_data"
"github.com/tecamino/tecamino-logger/logging" "github.com/tecamino/tecamino-logger/logging"
@@ -90,67 +89,33 @@ func (d *ArtNetDriver) SetValue(bus string, address uint, value uint8) error {
// id: id of driver // id: id of driver
// port: port of server // port: port of server
func (d *ArtNetDriver) Connect(ip, id string, port uint) error { func (d *ArtNetDriver) Connect(ip, id string, port uint) error {
var err error errChan := make(chan error)
client, err := ws.NewClient(ip, id, port) client, err := ws.NewClient(ip, id, port)
if err != nil { if err != nil {
return err return err
} }
client.OnError = func(err error) { client.OnError = func(err error) {
d.Log.Error("websocket connection", err) d.Log.Error("websocket connection", err)
} errChan <- err
client.OnMessage = func(data []byte) {
//fmt.Println(100, string(data))
fmt.Println(100, string(data))
} }
client.Connect(5) client.OnMessage = func(data []byte) {
fmt.Println(100, string(data))
}
req := json_data.NewRequest() req := json_data.NewRequest()
req.AddDriverSubscription(".*", id, 0, true, false, false) req.AddDriverSubscription(".*", id, 0, true, false, false)
if err := client.SendData(req); err != nil { if err := client.SendRequest(req); err != nil {
errChan <- err
d.Log.Error("websocket send data", err) d.Log.Error("websocket send data", err)
} }
for { for {
time.Sleep(1) select {
case err := <-errChan:
return err
}
} }
return nil
// d.Conn = websocket.NewClient()
// if err := d.Conn.Connect(ip, id, port); err != nil {
// return err
// }
// defer d.Conn.Disconnect()
// if err := d.Conn.Subscribe(id); err != nil {
// return err
// }
// Subscribe to websocket server
// func (c *Client) Subscribe(id string) error {
// req := json_data.NewRequest()
// req.AddDriverSubscription(".*", id, 0, true, false, false)
// if err := wsjson.Write(c.ctx, c.conn, req); err != nil {
// return err
// }
// return nil
// }
// for {
// respond, err := d.Conn.ReadJsonData()
// if err != nil {
// return err
// }
// d.Subscribe(respond.Subscribe...)
// for _, pub := range respond.Publish {
// if sub, ok := d.Subscriptions[pub.Uuid]; ok {
// if err := d.SetValue(sub.Bus, sub.Address, uint8(pub.Value.(float64))); err != nil {
// d.Log.Info("artNet.Connect", err.Error())
// }
// }
// }
// }
} }

View File

@@ -105,7 +105,7 @@ func (d *ArtNetDriver) Start(c *gin.Context) {
} }
busPayload := models.Bus{} busPayload := models.Bus{}
if busPayload.ParsePayload(c); err != nil { if err := busPayload.ParsePayload(c); err != nil {
return return
} }

View File

@@ -75,7 +75,7 @@ func main() {
if err := artNetDriver.Connect(*serverIp, DriverName, *serverPort); err != nil { if err := artNetDriver.Connect(*serverIp, DriverName, *serverPort); err != nil {
artNetDriver.Log.Error("main", err) artNetDriver.Log.Error("main", err)
} }
fmt.Println(555) artNetDriver.Log.Info("main", "next reconnecting attempt in 10 seconds")
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
} }

View File

@@ -1,44 +0,0 @@
package models
// type Clients map[string]Client
// type Client struct {
// Connected *bool `json:"connected"`
// SndConn *websocket.Conn `json:"-"` //sending connection
// RvcConn *websocket.Conn `json:"-"` // revieving connection
// }
// func NewClients() Clients {
// return make(Clients)
// }
// // Connect a recieving websocket connection
// func (c *Clients) ConnectRecievingWsConnection(id string, ctx *gin.Context) (*websocket.Conn, error) {
// conn, err := websocket.Accept(ctx.Writer, ctx.Request, &websocket.AcceptOptions{
// OriginPatterns: []string{"*"},
// })
// if err != nil {
// return nil, fmt.Errorf("error accept websocket client: %s", err)
// }
// b := true
// (*c)[id] = Client{
// Connected: &b,
// RvcConn: conn,
// }
// return conn, nil
// }
// func (c *Clients) RemoveClient(id string) {
// delete(*c, id)
// }
// func (c *Clients) GetClientPointer(id string) *bool {
// return (*c)[id].Connected
// }
// func (c *Clients) DisconnectRecievingWsConnection(id string, code websocket.StatusCode, reason string) {
// *(*c)[id].Connected = false
// (*c)[id].RvcConn.Close(code, reason)
// }

View File

@@ -4,30 +4,51 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"sync"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
json_dataModels "github.com/tecamino/tecamino-json_data/models" json_dataModels "github.com/tecamino/tecamino-json_data/models"
) )
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
type Client struct { type Client struct {
ip string
port uint
Connected bool
conn *websocket.Conn conn *websocket.Conn
writeMu sync.Mutex
OnMessage func(data []byte)
OnOpen func() OnOpen func()
OnMessage func(data []byte)
OnClose func(code int, reason string) OnClose func(code int, reason string)
OnError func(err error) OnError func(err error)
OnPing func() OnPing func()
OnPong func() OnPong func()
timeout time.Duration sendPong chan string
send chan []byte
unregister chan []byte
} }
// Connect to websocket server // Connect to websocket server
// ip: ip address of server // ip: ip address of server
func NewClient(ip, id string, port uint) (*Client, error) { func NewClient(ip, id string, port uint) (*Client, error) {
url := fmt.Sprintf("ws://%s:%d/ws?id=%s", ip, port, id) url := fmt.Sprintf("ws://%s:%d/ws?id=%s", ip, port, id)
c := &Client{} c := &Client{
ip: ip,
port: port,
Connected: true,
sendPong: make(chan string),
send: make(chan []byte),
unregister: make(chan []byte),
}
dialer := websocket.DefaultDialer dialer := websocket.DefaultDialer
conn, resp, err := dialer.Dial(url, nil) conn, resp, err := dialer.Dial(url, nil)
@@ -40,20 +61,25 @@ func NewClient(ip, id string, port uint) (*Client, error) {
c.conn = conn c.conn = conn
// Setup control handlers // Setup control handlers
c.conn.SetPingHandler(func(appData string) error { conn.SetPingHandler(func(appData string) error {
if c.OnPing != nil { if c.OnPing != nil {
c.OnPing() c.OnPing()
} }
return c.conn.WriteMessage(websocket.PongMessage, nil) conn.SetWriteDeadline(time.Now().Add(writeWait))
conn.SetReadDeadline(time.Now().Add(writeWait))
c.sendPong <- appData
return nil
}) })
c.conn.SetPongHandler(func(appData string) error {
c.conn.SetReadDeadline(time.Now().Add(c.timeout)) conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
if c.OnPong != nil { if c.OnPong != nil {
c.OnPong() c.OnPong()
} }
return nil return nil
}) })
c.conn.SetCloseHandler(func(code int, text string) error {
conn.SetCloseHandler(func(code int, text string) error {
if c.OnClose != nil { if c.OnClose != nil {
c.OnClose(code, text) c.OnClose(code, text)
} }
@@ -63,98 +89,103 @@ func NewClient(ip, id string, port uint) (*Client, error) {
if c.OnOpen != nil { if c.OnOpen != nil {
c.OnOpen() c.OnOpen()
} }
// Start reading messages from client
go c.Read()
go c.Write()
return c, nil return c, nil
} }
func (c *Client) Connect(timeout uint) { func (c *Client) Read() {
if timeout > 0 { if c.OnOpen != nil {
fmt.Println(1234, timeout) c.OnOpen()
c.timeout = time.Duration(timeout) * time.Second
} }
go c.pingLoop() c.conn.SetReadDeadline(time.Now().Add(writeWait))
for c.Connected {
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
go func() {
for {
msgType, msg, err := c.conn.ReadMessage() msgType, msg, err := c.conn.ReadMessage()
if err != nil { if err != nil {
if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { c.handleError(fmt.Errorf("read error (id:%s:%d): %w", c.ip, c.port, err))
log.Println("WebSocket closed:", err)
}
if c.OnError != nil {
c.OnError(fmt.Errorf("read error: %w", err))
}
return return
} }
switch msgType { switch msgType {
case websocket.TextMessage, websocket.BinaryMessage: case websocket.CloseMessage:
c.Close(websocket.CloseNormalClosure, "Client closed")
return
case websocket.TextMessage:
if c.OnMessage != nil { if c.OnMessage != nil {
c.OnMessage(msg) c.OnMessage(msg)
} else {
log.Printf("Received message but no handler set (id:%s:%d): %s", c.ip, c.port, string(msg))
} }
default: default:
log.Printf("Unhandled message type: %d", msgType) log.Printf("Unhandled message type %d (id:%s:%d)", msgType, c.ip, c.port)
} }
} }
}()
} }
func (c *Client) pingLoop() { func (c *Client) Write() {
interval := c.timeout / 2 ticker := time.NewTicker(pingPeriod)
if interval <= 0 { defer func() {
interval = 5 * time.Second ticker.Stop()
} c.conn.Close()
ticker := time.NewTicker(interval) }()
defer ticker.Stop() for {
for range ticker.C { select {
if err := c.Write(websocket.PingMessage, nil); err != nil { case message, ok := <-c.send:
if c.OnError != nil { c.conn.SetWriteDeadline(time.Now().Add(writeWait))
c.OnError(fmt.Errorf("ping error: %w", err)) if !ok {
} // The hub closed the channel.
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
c.handleError(err)
return return
} }
c.handleError(fmt.Errorf("server %s:%d closed channel", c.ip, c.port))
return
} else {
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
c.handleError(err)
return
}
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.handleError(err)
return
}
if c.OnPing != nil { if c.OnPing != nil {
c.OnPing() c.OnPing()
} }
case message, ok := <-c.sendPong:
if ok {
c.conn.WriteMessage(websocket.PongMessage, []byte(message))
}
case message := <-c.unregister:
c.conn.WriteMessage(websocket.CloseMessage, message)
c.Connected = false
close(c.send)
close(c.sendPong)
close(c.unregister)
return
}
} }
} }
func (c *Client) Write(msgType int, data []byte) error { func (c *Client) SendRequest(req *json_dataModels.Request) error {
c.writeMu.Lock() if !c.Connected {
defer c.writeMu.Unlock()
c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
if err := c.conn.WriteMessage(msgType, data); err != nil {
if c.OnError != nil {
c.OnError(err)
}
return err
}
return nil return nil
} }
// Close connection to websocket server data, err := json.Marshal(*req)
func (c *Client) Close(code int, reason string) { if err != nil {
if c.conn != nil {
if c.OnClose != nil {
c.OnClose(code, reason)
}
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, reason))
c.conn.Close()
}
}
func (c *Client) SendData(data any) error {
c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
if err := c.conn.WriteJSON(data); err != nil {
if c.OnError != nil {
c.OnError(err)
}
return err return err
} }
c.send <- data
return nil return nil
} }
@@ -163,3 +194,30 @@ func (c *Client) ReadJsonData(data []byte) (json_dataModels.Response, error) {
err := json.Unmarshal(data, &resp) err := json.Unmarshal(data, &resp)
return resp, err return resp, err
} }
// Close connection to websocket server
func (c *Client) Close(code int, reason string) error {
closeMsg := websocket.FormatCloseMessage(code, reason)
select {
case c.unregister <- closeMsg: // Attempt to send
default: // If the channel is full, this runs
return fmt.Errorf("attempt close client socket failed")
}
if c.OnClose != nil {
c.OnClose(code, reason)
}
return nil
}
func (c *Client) handleError(err error) {
if c.OnError != nil {
c.OnError(err)
}
if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil {
if c.OnError != nil {
c.OnError(err)
} else {
fmt.Println("error: ", err)
}
}
}