add broadcast to all client and modify lingpong handler

This commit is contained in:
Adrian Zuercher
2025-07-23 09:09:14 +02:00
parent 5e1e4b9daf
commit 8be5c80a22
2 changed files with 40 additions and 66 deletions

View File

@@ -17,6 +17,12 @@ type ClientHandler struct {
Clients models.Clients Clients models.Clients
} }
func SendBroadcast(msg []byte) {
for _, c := range models.Broadcast {
c.SendResponse(msg)
}
}
// initaiates new conections with client map // initaiates new conections with client map
func NewConnectionHandler() *ClientHandler { func NewConnectionHandler() *ClientHandler {
return &ClientHandler{ return &ClientHandler{
@@ -41,7 +47,6 @@ func (cH *ClientHandler) ConnectNewClient(id string, c *gin.Context) (client *mo
cH.Lock() cH.Lock()
cH.Clients[id] = client cH.Clients[id] = client
cH.Unlock() cH.Unlock()
return client, nil return client, nil
} }

View File

@@ -1,10 +1,10 @@
package models package models
import ( import (
"encoding/json"
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"slices"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -13,6 +13,8 @@ import (
var Origins []string = []string{"*"} var Origins []string = []string{"*"}
var Broadcast Clients = make(Clients)
const ( const (
// Time allowed to write a message to the peer. // Time allowed to write a message to the peer.
writeWait = 10 * time.Second writeWait = 10 * time.Second
@@ -34,7 +36,6 @@ type Client struct {
OnError func(err error) OnError func(err error)
OnPing func() OnPing func()
OnPong func() OnPong func()
sendPong chan string
send chan []byte send chan []byte
unregister chan []byte unregister chan []byte
} }
@@ -43,17 +44,10 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { CheckOrigin: func(r *http.Request) bool {
if len(Origins) == 0 { if len(Origins) == 0 {
return false return false
} } else if Origins[0] == "*" {
if Origins[0] == "*" {
return true return true
} }
origin := r.Header.Get("Origin") return slices.Contains(Origins, r.Header.Get("Origin"))
for _, o := range Origins {
if o == origin {
return true
}
}
return false
}, },
EnableCompression: false, EnableCompression: false,
} }
@@ -68,22 +62,25 @@ func ConnectNewClient(id string, c *gin.Context) (*Client, error) {
Id: id, Id: id,
Connected: true, Connected: true,
conn: conn, conn: conn,
sendPong: make(chan string), send: make(chan []byte, 512),
send: make(chan []byte), unregister: make(chan []byte, 256),
unregister: make(chan []byte),
} }
Broadcast[client.Id] = client
conn.SetPingHandler(func(appData string) error { conn.SetPingHandler(func(appData string) error {
if client.OnPing != nil { if client.OnPing != nil {
client.OnPing() client.OnPing()
} }
conn.SetWriteDeadline(time.Now().Add(writeWait)) conn.SetWriteDeadline(time.Now().Add(writeWait))
conn.SetReadDeadline(time.Now().Add(writeWait)) conn.SetReadDeadline(time.Now().Add(writeWait))
client.sendPong <- appData if err := client.conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(pongWait)); err != nil {
client.OnError(err)
}
return nil return nil
}) })
conn.SetPongHandler(func(string) error { conn.SetPongHandler(func(appData string) error {
conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetReadDeadline(time.Now().Add(pongWait))
if client.OnPong != nil { if client.OnPong != nil {
client.OnPong() client.OnPong()
@@ -94,6 +91,7 @@ func ConnectNewClient(id string, c *gin.Context) (*Client, error) {
// Start reading messages from client // Start reading messages from client
go client.Read() go client.Read()
go client.Write() go client.Write()
go client.PingInterval(pingPeriod)
return client, nil return client, nil
} }
@@ -105,6 +103,7 @@ func (c *Client) Read() {
c.conn.SetReadDeadline(time.Now().Add(writeWait)) c.conn.SetReadDeadline(time.Now().Add(writeWait))
for c.Connected { for c.Connected {
msgType, msg, err := c.conn.ReadMessage() msgType, msg, err := c.conn.ReadMessage()
if err != nil { if err != nil {
c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err)) c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err))
return return
@@ -114,13 +113,11 @@ func (c *Client) Read() {
c.Close(websocket.CloseNormalClosure, "Client closed") c.Close(websocket.CloseNormalClosure, "Client closed")
return return
case websocket.TextMessage: case websocket.TextMessage:
if isPing := c.handleJsonPing(msg); !isPing {
if c.OnMessage != nil { if c.OnMessage != nil {
c.OnMessage(msg) c.OnMessage(msg)
} else { } else {
log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg)) log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg))
} }
}
default: default:
log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id) log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id)
} }
@@ -128,19 +125,15 @@ func (c *Client) Read() {
} }
func (c *Client) Write() { func (c *Client) Write() {
ticker := time.NewTicker(pingPeriod) defer c.conn.Close()
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
for {
select { select {
case message, ok := <-c.send: case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok { if !ok {
// The hub closed the channel. // The hub closed the channel.
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil { if err := c.conn.WriteMessage(websocket.CloseMessage, []byte("ping")); err != nil {
c.handleError(err) c.handleError(err)
return return
} }
@@ -152,26 +145,11 @@ func (c *Client) Write() {
return return
} }
} }
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.handleError(err)
return
}
if c.OnPing != nil {
c.OnPing()
}
case message, ok := <-c.sendPong:
if ok {
c.conn.WriteMessage(websocket.PongMessage, []byte(message))
}
case message := <-c.unregister: case message := <-c.unregister:
c.conn.WriteMessage(websocket.CloseMessage, message) c.conn.WriteMessage(websocket.CloseMessage, message)
c.Connected = false c.Connected = false
close(c.sendPong)
close(c.send) close(c.send)
delete(Broadcast, c.Id)
close(c.unregister) close(c.unregister)
return return
} }
@@ -179,31 +157,20 @@ func (c *Client) Write() {
} }
} }
func (c *Client) handleJsonPing(msg []byte) (isPing bool) { func (c *Client) PingInterval(interval time.Duration) {
var wsMsg WSMessage ticker := time.NewTicker(interval)
err := json.Unmarshal(msg, &wsMsg) defer ticker.Stop()
if err == nil && wsMsg.IsPing() { for range ticker.C {
c.conn.SetReadDeadline(time.Now().Add(writeWait))
// Respond with pong JSON
select {
case c.send <- GetPongByteSlice():
default:
// optional: log or handle if send buffer is full
c.handleError(fmt.Errorf("failed to queue pong message"))
return
}
if err != nil {
c.handleError(fmt.Errorf("write pong error: %w", err))
return
}
if c.OnPing != nil { if c.OnPing != nil {
c.OnPing() c.OnPing()
} }
isPing = true
}
if err := c.conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(pongWait)); err != nil {
c.OnError(err)
return return
}
}
} }
func (c *Client) SendResponse(data []byte) { func (c *Client) SendResponse(data []byte) {
@@ -215,6 +182,7 @@ func (c *Client) SendResponse(data []byte) {
func (c *Client) Close(code int, reason string) error { func (c *Client) Close(code int, reason string) error {
closeMsg := websocket.FormatCloseMessage(code, reason) closeMsg := websocket.FormatCloseMessage(code, reason)
select { select {
case c.unregister <- closeMsg: // Attempt to send case c.unregister <- closeMsg: // Attempt to send
default: // If the channel is full, this runs default: // If the channel is full, this runs
@@ -230,6 +198,7 @@ func (c *Client) handleError(err error) {
if c.OnError != nil { if c.OnError != nil {
c.OnError(err) c.OnError(err)
} }
if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil { if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil {
if c.OnError != nil { if c.OnError != nil {
c.OnError(err) c.OnError(err)