2 Commits

Author SHA1 Message Date
Adrian Zürcher
59c7705ca1 fix concurrent write new sendPong channel 2025-06-22 08:58:14 +02:00
Adrian Zürcher
91ea59ed6e improvement websocket according to gorilla example 2025-06-22 07:46:24 +02:00
6 changed files with 142 additions and 68 deletions

View File

@@ -20,6 +20,7 @@ func (d *DBMHandler) Set(req *json_dataModels.Request, id string) {
for _, set := range req.Set { for _, set := range req.Set {
dps := d.DBM.QueryDatapoints(1, set.Uuid, set.Path) dps := d.DBM.QueryDatapoints(1, set.Uuid, set.Path)
if len(dps) == 0 { if len(dps) == 0 {
resp.SetError() resp.SetError()
if resp.Message == "" { if resp.Message == "" {
@@ -27,8 +28,10 @@ func (d *DBMHandler) Set(req *json_dataModels.Request, id string) {
} }
continue continue
} }
for _, dp := range dps { for _, dp := range dps {
dp.UpdateValue(d.Conns, set.Value) dp.UpdateValue(d.Conns, set.Value)
resp.AddSet(json_dataModels.Set{ resp.AddSet(json_dataModels.Set{
Uuid: dp.Uuid, Uuid: dp.Uuid,
Path: dp.Path, Path: dp.Path,
@@ -36,6 +39,7 @@ func (d *DBMHandler) Set(req *json_dataModels.Request, id string) {
}) })
} }
} }
if err := d.Conns.SendResponse(id, resp); err != nil { if err := d.Conns.SendResponse(id, resp); err != nil {
d.Log.Error("get.Set", err.Error()) d.Log.Error("get.Set", err.Error())
} }

View File

@@ -54,7 +54,6 @@ func (d *DBMHandler) Subscribe(req *json_dataModels.Request, id string) {
}) })
} }
} }
if err := d.Conns.SendResponse(id, resp); err != nil { if err := d.Conns.SendResponse(id, resp); err != nil {
d.Log.Error("subscribe.Subscribe", err.Error()) d.Log.Error("subscribe.Subscribe", err.Error())
} }

View File

@@ -33,12 +33,15 @@ func (d *DBMHandler) WebSocket(c *gin.Context) {
if err != nil { if err != nil {
d.Log.Error("dbmHandler.webSocket.Websocket", "read json: "+err.Error()) d.Log.Error("dbmHandler.webSocket.Websocket", "read json: "+err.Error())
} }
// Sets // Sets
d.Get(request, id) d.Get(request, id)
// Sets // Sets
d.Set(request, id) d.Set(request, id)
// Subscribe // Subscribe
d.Subscribe(request, id) d.Subscribe(request, id)
// Unsubscribe // Unsubscribe
d.Unsubscribe(request, id) d.Unsubscribe(request, id)
} }

View File

@@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@@ -34,7 +33,6 @@ type Datapoint struct {
ReadWrite json_dataModels.Rights `json:"readWrite"` ReadWrite json_dataModels.Rights `json:"readWrite"`
Drivers json_dataModels.Drivers `json:"drivers,omitempty"` Drivers json_dataModels.Drivers `json:"drivers,omitempty"`
Subscriptions Subscriptions `json:"-"` Subscriptions Subscriptions `json:"-"`
sync.RWMutex `json:"-"`
} }
func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) { func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) {
@@ -381,9 +379,6 @@ func (d *Datapoint) RemoveSubscribtion(client *wsModels.Client) {
} }
func (d *Datapoint) Publish(eventType string) error { func (d *Datapoint) Publish(eventType string) error {
d.RLock()
defer d.RUnlock()
for client := range d.Subscriptions { for client := range d.Subscriptions {
r := json_data.NewResponse() r := json_data.NewResponse()
r.AddPublish(json_dataModels.Publish{ r.AddPublish(json_dataModels.Publish{
@@ -392,14 +387,13 @@ func (d *Datapoint) Publish(eventType string) error {
Path: d.Path, Path: d.Path,
Value: d.Value, Value: d.Value,
}) })
b, err := json.Marshal(r) b, err := json.Marshal(r)
if err != nil { if err != nil {
return err return err
} }
if err := client.SendResponse(b, 5); err != nil {
return err
}
client.SendResponse(b)
} }
return nil return nil
} }

View File

@@ -60,14 +60,12 @@ func (c *ClientHandler) SendResponse(id string, r *json_dataModels.Response) err
return fmt.Errorf("client not found for id %s", id) return fmt.Errorf("client not found for id %s", id)
} }
b, err := json.Marshal(r)
b, err := json.Marshal(*r)
if err != nil { if err != nil {
return err return err
} }
client.SendResponse(b)
if err := client.SendResponse(b, 5); err != nil {
return err
}
return nil return nil
} }

View File

@@ -13,17 +13,30 @@ import (
var Origins []string = []string{"*"} var Origins []string = []string{"*"}
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
type Client struct { type Client struct {
Id string Id string
Connected bool `json:"connected"` Connected bool `json:"connected"`
Conn *websocket.Conn `json:"-"` conn *websocket.Conn `json:"-"`
OnOpen func() OnOpen func()
OnMessage func(data []byte) OnMessage func(data []byte)
OnClose func(code int, reason string) OnClose func(code int, reason string)
OnError func(err error) OnError func(err error)
OnPing func() OnPing func()
OnPong func() OnPong func()
timeout time.Duration sendPong chan string
send chan []byte
unregister chan []byte
} }
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
@@ -54,75 +67,132 @@ func ConnectNewClient(id string, c *gin.Context) (*Client, error) {
client := &Client{ client := &Client{
Id: id, Id: id,
Connected: true, Connected: true,
Conn: conn, conn: conn,
timeout: 5, sendPong: make(chan string),
send: make(chan []byte),
unregister: make(chan []byte),
} }
conn.SetPingHandler(func(appData string) error { conn.SetPingHandler(func(appData string) error {
if client.OnPing != nil { if client.OnPing != nil {
client.OnPing() client.OnPing()
} }
conn.SetWriteDeadline(time.Now().Add(client.timeout)) conn.SetWriteDeadline(time.Now().Add(writeWait))
conn.SetReadDeadline(time.Now().Add(client.timeout)) conn.SetReadDeadline(time.Now().Add(writeWait))
return conn.WriteMessage(websocket.PongMessage, []byte(appData)) client.sendPong <- appData
return nil
}) })
conn.SetPongHandler(func(appData string) error { conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
if client.OnPong != nil { if client.OnPong != nil {
client.OnPong() client.OnPong()
} }
conn.SetReadDeadline(time.Now().Add(client.timeout))
return nil return nil
}) })
// Start reading messages from client // Start reading messages from client
go client.Listen(7) go client.Read()
go client.Write()
return client, nil return client, nil
} }
func (c *Client) Listen(timeout uint) { func (c *Client) Read() {
if timeout > 0 {
c.timeout = time.Duration(timeout) * time.Second
}
if c.OnOpen != nil { if c.OnOpen != nil {
c.OnOpen() c.OnOpen()
} }
c.Conn.SetReadDeadline(time.Now().Add(c.timeout)) c.conn.SetReadDeadline(time.Now().Add(writeWait))
for c.Connected { for c.Connected {
msgType, msg, err := c.Conn.ReadMessage() msgType, msg, err := c.conn.ReadMessage()
if err != nil { if err != nil {
c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err)) c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err))
return return
} }
switch msgType { switch msgType {
case websocket.CloseMessage: case websocket.CloseMessage:
c.handleClose(1000, "Client closed") c.Close(websocket.CloseNormalClosure, "Client closed")
return return
case websocket.TextMessage: case websocket.TextMessage:
if isPing := c.handleJsonPing(msg); isPing { if isPing := c.handleJsonPing(msg); !isPing {
continue
}
if c.OnMessage != nil { if c.OnMessage != nil {
c.OnMessage(msg) c.OnMessage(msg)
} else { } else {
log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg)) log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg))
} }
}
default: default:
log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id) log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id)
} }
} }
} }
func (c *Client) Write() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
c.handleError(err)
return
}
c.handleError(fmt.Errorf("server %s closed channel", c.Id))
return
} else {
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
c.handleError(err)
return
}
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.handleError(err)
return
}
if c.OnPing != nil {
c.OnPing()
}
case message, ok := <-c.sendPong:
if ok {
c.conn.WriteMessage(websocket.PongMessage, []byte(message))
}
case message := <-c.unregister:
c.conn.WriteMessage(websocket.CloseMessage, message)
c.Connected = false
close(c.sendPong)
close(c.send)
close(c.unregister)
return
}
}
}
func (c *Client) handleJsonPing(msg []byte) (isPing bool) { func (c *Client) handleJsonPing(msg []byte) (isPing bool) {
var wsMsg WSMessage var wsMsg WSMessage
err := json.Unmarshal(msg, &wsMsg) err := json.Unmarshal(msg, &wsMsg)
if err == nil && wsMsg.IsPing() { if err == nil && wsMsg.IsPing() {
c.Conn.SetReadDeadline(time.Now().Add(c.timeout)) c.conn.SetReadDeadline(time.Now().Add(writeWait))
// Respond with pong JSON // Respond with pong JSON
c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)) select {
err = c.Conn.WriteMessage(websocket.TextMessage, GetPongByteSlice()) case c.send <- GetPongByteSlice():
default:
// optional: log or handle if send buffer is full
c.handleError(fmt.Errorf("failed to queue pong message"))
return
}
if err != nil { if err != nil {
c.handleError(fmt.Errorf("write pong error: %w", err)) c.handleError(fmt.Errorf("write pong error: %w", err))
return return
@@ -132,33 +202,39 @@ func (c *Client) handleJsonPing(msg []byte) (isPing bool) {
} }
isPing = true isPing = true
} }
return return
} }
func (c *Client) SendResponse(data []byte, timeout uint) error { func (c *Client) SendResponse(data []byte) {
c.Conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
return c.Conn.WriteMessage(websocket.TextMessage, data)
}
func (c *Client) Close(code int, reason string) {
c.handleClose(code, reason)
}
func (c *Client) handleClose(code int, text string) {
if !c.Connected { if !c.Connected {
return return
} }
c.Connected = false c.send <- data
if c.OnClose != nil { }
c.OnClose(code, text)
func (c *Client) Close(code int, reason string) error {
closeMsg := websocket.FormatCloseMessage(code, reason)
select {
case c.unregister <- closeMsg: // Attempt to send
default: // If the channel is full, this runs
return fmt.Errorf("attempt close client socket failed")
} }
c.Conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, text)) if c.OnClose != nil {
c.Conn.Close() c.OnClose(code, reason)
}
return nil
} }
func (c *Client) handleError(err error) { func (c *Client) handleError(err error) {
if c.OnError != nil { if c.OnError != nil {
c.OnError(err) c.OnError(err)
} }
c.Close(websocket.CloseInternalServerErr, err.Error()) if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil {
if c.OnError != nil {
c.OnError(err)
} else {
fmt.Println("error: ", err)
}
}
} }