package statusServer import ( "fmt" "statusServer/handlers" logging "gitea.tecamino.com/paadi/tecamino-logger/logging" "github.com/gin-gonic/gin" "encoding/json" "gitea.tecamino.com/paadi/pubSub" pubSubModels "gitea.tecamino.com/paadi/pubSub/models" "gitea.tecamino.com/paadi/tecamino-dbm/auth" ) // StatusWebsocket wraps a PubSub instance with websocket support. // It manages connected clients, handles their subscriptions, and // relays messages via a PubSub publish/subscribe mechanism. type StatusWebsocket struct { *pubSub.Pubsub // core publish/subscribe engine clientHandler *handlers.ClientHandler // manages websocket clients *logging.Logger // application logger } // NewStatusWebsocket initializes a new PubSubWebsocket server. // - worker: number of worker goroutines processing messages. // - maxMessage: size of the message queue. // - logger: optional custom logger (creates default if nil). func NewStatusWebsocket(worker, maxMessage int, logger *logging.Logger) (ws *StatusWebsocket, err error) { if logger == nil { // Create default logger if none provided. logger, err = logging.NewLogger("PubSubWebsocket.log", logging.DefaultConfig()) if err != nil { return nil, err } } ws = &StatusWebsocket{ Pubsub: pubSub.NewPubsub(worker, maxMessage), clientHandler: handlers.NewConnectionHandler(), Logger: logger, } return } // NewConection upgrades an HTTP request to a websocket connection // and registers the client with the PubSubWebsocket server. // // Each connected client can: // - subscribe to topics (receives messages published on them), // - unsubscribe from topics, // - publish messages to topics (delivered to all subscribers). func (ws *StatusWebsocket) NewConection(c *gin.Context) { // Authenticate and extract client ID from query parameters. id, err := auth.GetIDFromQuery(c) if err != nil { ws.Error("Websocket", "error GetIDFromQuery: "+err.Error()) return } ws.Debug("Websocket", "authorization id token: "+id) // Create or reuse a websocket client. client, err := ws.clientHandler.ConnectNewClient(id, c) if err != nil { ws.Error("Websocket", err.Error()) return } // ---------------- Event Handlers ---------------- // Handle incoming messages from the client. client.OnMessage = func(data []byte) { // Decode request into PubSub message model. request, err := readJsonData(data) if err != nil { client.SendData([]byte(`{"error":"read json: ` + err.Error() + `"}`)) ws.Error("Websocket", "read json: "+err.Error()) return } // Route the action type. switch request.Action { case "subscribe": ws.Debug("Websocket", "subscribe id:"+id+" topic:"+request.Topic) // Register subscription callback. ws.Subscribe(id, request.Topic, func(data any) { b, err := json.Marshal(data) if err != nil { return } client.SendData(b) }) case "unsubscribe": ws.Debug("Websocket", "unsubscribe id:"+id+" topic:"+request.Topic) ws.Unsubscribe(id, request.Topic) case "publish": ws.Publish(request.Topic, request.Data) default: // Invalid or unsupported action. ws.Error("Websocket", "action type '"+request.Action+"' not supported") client.SendData([]byte(`{"error":"action type '` + request.Action + `' not supported"}`)) } } // Handle warnings raised by the client connection. client.OnWarning = func(msg string) { ws.Warning("Websocket", "warning on websocket connection: "+msg) } // Handle errors raised by the client connection. client.OnError = func(err error) { ws.Error("Websocket", "error on websocket connection: "+err.Error()) } // Cleanup when the client disconnects. client.OnClose = func(code int, reason string) { ws.Debug("Websocket", fmt.Sprintf("onClose id:%s code:%d reason:%s", id, code, reason)) ws.UnsubscribeAll(id) // free all subscriptions for this client ws.clientHandler.RemoveClient(id) } } // readJsonData decodes raw websocket bytes into a PubSub Data struct. func readJsonData(data []byte) (request pubSubModels.Data, err error) { err = json.Unmarshal(data, &request) return }