Files
statusServer/statusWebsocket.go
2025-08-24 08:12:25 +02:00

134 lines
4.1 KiB
Go

package statusServer
import (
"fmt"
"gitea.tecamino.com/paadi/statusServer/handlers"
logging "gitea.tecamino.com/paadi/tecamino-logger/logging"
"github.com/gin-gonic/gin"
"encoding/json"
"gitea.tecamino.com/paadi/pubSub"
pubSubModels "gitea.tecamino.com/paadi/pubSub/models"
"gitea.tecamino.com/paadi/tecamino-dbm/auth"
)
// StatusWebsocket combines a publish/subscribe engine, a websocket client
// handler and a logger into one server type.
//
// Pubsub and Logger are embedded, so their methods are promoted and can be
// called directly on a *StatusWebsocket (this file calls Subscribe,
// Unsubscribe, UnsubscribeAll and Publish as well as Debug, Warning and Error
// that way). Instances are built by NewStatusWebsocket.
type StatusWebsocket struct {
*pubSub.Pubsub // embedded publish/subscribe engine, created with pubSub.NewPubsub
clientHandler *handlers.ClientHandler // connects and removes websocket clients, keyed by client ID
*logging.Logger // embedded application logger
}
// NewStatusWebsocket builds a ready-to-use StatusWebsocket server.
//
// Parameters:
//   - worker: forwarded to pubSub.NewPubsub as the number of workers.
//   - maxMessage: forwarded to pubSub.NewPubsub as the message queue size.
//   - logger: logger to use; when nil, a default logger writing to
//     "PubSubWebsocket.log" with logging.DefaultConfig() is created.
//
// It returns an error only when the default logger cannot be created.
func NewStatusWebsocket(worker, maxMessage int, logger *logging.Logger) (ws *StatusWebsocket, err error) {
	// Fall back to a default logger when the caller did not supply one.
	if logger == nil {
		if logger, err = logging.NewLogger("PubSubWebsocket.log", logging.DefaultConfig()); err != nil {
			return nil, err
		}
	}

	server := &StatusWebsocket{
		Pubsub:        pubSub.NewPubsub(worker, maxMessage),
		clientHandler: handlers.NewConnectionHandler(),
		Logger:        logger,
	}
	return server, nil
}
// NewConection upgrades an HTTP request to a websocket connection and
// registers the client with this StatusWebsocket server.
//
// The client ID is obtained from the request with auth.GetIDFromQuery and is
// used as the subscriber key for every pub/sub call made for this connection.
//
// Every incoming message is decoded with readJsonData and routed by its
// Action field:
//   - "subscribe": messages published on the topic are JSON-encoded and sent
//     to this client,
//   - "unsubscribe": the subscription for the topic is removed,
//   - "publish": the request data is published on the topic.
//
// A message that cannot be decoded, or that carries any other action, is
// answered with a JSON object {"error":"<message>"}. The reply is produced by
// json.Marshal so that quotes, backslashes and control characters coming from
// the decoder error or from the client-supplied action cannot break or alter
// the JSON document.
//
// When the connection closes, all subscriptions of the client are dropped and
// the client is removed from the client handler.
func (ws *StatusWebsocket) NewConection(c *gin.Context) {
	// Authenticate and extract the client ID from the query parameters.
	// NOTE(review): nothing is written to the HTTP response on the two failure
	// paths below — confirm that GetIDFromQuery / ConnectNewClient already
	// answer the request themselves.
	id, err := auth.GetIDFromQuery(c)
	if err != nil {
		ws.Error("Websocket", "error GetIDFromQuery: "+err.Error())
		return
	}
	ws.Debug("Websocket", "authorization id token: "+id)

	// Obtain the websocket client for this ID from the client handler.
	client, err := ws.clientHandler.ConnectNewClient(id, c)
	if err != nil {
		ws.Error("Websocket", err.Error())
		return
	}

	// sendError reports msg to the client as {"error": msg}. Encoding through
	// json.Marshal keeps the reply valid JSON whatever msg contains.
	sendError := func(msg string) {
		reply, err := json.Marshal(struct {
			Error string `json:"error"`
		}{Error: msg})
		if err != nil {
			ws.Error("Websocket", "marshal error response: "+err.Error())
			return
		}
		client.SendData(reply)
	}

	// ---------------- Event Handlers ----------------

	// Handle incoming messages from the client.
	client.OnMessage = func(data []byte) {
		// Decode the request into the PubSub message model.
		request, err := readJsonData(data)
		if err != nil {
			sendError("read json: " + err.Error())
			ws.Error("Websocket", "read json: "+err.Error())
			return
		}

		// Route by action type.
		switch request.Action {
		case "subscribe":
			ws.Debug("Websocket", "subscribe id:"+id+" topic:"+request.Topic)
			// Every message published on the topic is forwarded to this
			// client as JSON.
			ws.Subscribe(id, request.Topic, func(payload any) {
				b, err := json.Marshal(payload)
				if err != nil {
					// Log instead of dropping the message silently so that
					// unencodable payloads can be diagnosed.
					ws.Error("Websocket", "marshal message id:"+id+" topic:"+request.Topic+": "+err.Error())
					return
				}
				client.SendData(b)
			})
		case "unsubscribe":
			ws.Debug("Websocket", "unsubscribe id:"+id+" topic:"+request.Topic)
			ws.Unsubscribe(id, request.Topic)
		case "publish":
			ws.Publish(request.Topic, request.Data)
		default:
			// Invalid or unsupported action.
			msg := "action type '" + request.Action + "' not supported"
			ws.Error("Websocket", msg)
			sendError(msg)
		}
	}

	// Log warnings raised by the client connection.
	client.OnWarning = func(msg string) {
		ws.Warning("Websocket", "warning on websocket connection: "+msg)
	}

	// Log errors raised by the client connection.
	client.OnError = func(err error) {
		ws.Error("Websocket", "error on websocket connection: "+err.Error())
	}

	// Clean up when the client disconnects.
	client.OnClose = func(code int, reason string) {
		ws.Debug("Websocket", fmt.Sprintf("onClose id:%s code:%d reason:%s", id, code, reason))
		ws.UnsubscribeAll(id) // free all subscriptions held by this client
		ws.clientHandler.RemoveClient(id)
	}
}
// readJsonData parses the raw bytes of a websocket message as JSON into a
// pubSubModels.Data value.
//
// On failure the json.Unmarshal error is returned unchanged, together with
// whatever part of the request had been decoded up to that point.
func readJsonData(data []byte) (request pubSubModels.Data, err error) {
	if decodeErr := json.Unmarshal(data, &request); decodeErr != nil {
		return request, decodeErr
	}
	return request, nil
}