133 lines
4.1 KiB
Go
133 lines
4.1 KiB
Go
package statusServer
|
|
|
|
import (
|
|
"fmt"
|
|
"statusServer/handlers"
|
|
|
|
logging "gitea.tecamino.com/paadi/tecamino-logger/logging"
|
|
"github.com/gin-gonic/gin"
|
|
|
|
"encoding/json"
|
|
|
|
"gitea.tecamino.com/paadi/pubSub"
|
|
pubSubModels "gitea.tecamino.com/paadi/pubSub/models"
|
|
"gitea.tecamino.com/paadi/tecamino-dbm/auth"
|
|
)
|
|
|
|
// StatusWebsocket wraps a PubSub instance with websocket support.
|
|
// It manages connected clients, handles their subscriptions, and
|
|
// relays messages via a PubSub publish/subscribe mechanism.
|
|
type StatusWebsocket struct {
|
|
*pubSub.Pubsub // core publish/subscribe engine
|
|
clientHandler *handlers.ClientHandler // manages websocket clients
|
|
*logging.Logger // application logger
|
|
}
|
|
|
|
// NewStatusWebsocket initializes a new PubSubWebsocket server.
|
|
// - worker: number of worker goroutines processing messages.
|
|
// - maxMessage: size of the message queue.
|
|
// - logger: optional custom logger (creates default if nil).
|
|
func NewStatusWebsocket(worker, maxMessage int, logger *logging.Logger) (ws *StatusWebsocket, err error) {
|
|
if logger == nil {
|
|
// Create default logger if none provided.
|
|
logger, err = logging.NewLogger("PubSubWebsocket.log", logging.DefaultConfig())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
ws = &StatusWebsocket{
|
|
Pubsub: pubSub.NewPubsub(worker, maxMessage),
|
|
clientHandler: handlers.NewConnectionHandler(),
|
|
Logger: logger,
|
|
}
|
|
return
|
|
}
|
|
|
|
// NewConection upgrades an HTTP request to a websocket connection
|
|
// and registers the client with the PubSubWebsocket server.
|
|
//
|
|
// Each connected client can:
|
|
// - subscribe to topics (receives messages published on them),
|
|
// - unsubscribe from topics,
|
|
// - publish messages to topics (delivered to all subscribers).
|
|
func (ws *StatusWebsocket) NewConection(c *gin.Context) {
|
|
// Authenticate and extract client ID from query parameters.
|
|
id, err := auth.GetIDFromQuery(c)
|
|
if err != nil {
|
|
ws.Error("Websocket", "error GetIDFromQuery: "+err.Error())
|
|
return
|
|
}
|
|
ws.Debug("Websocket", "authorization id token: "+id)
|
|
|
|
// Create or reuse a websocket client.
|
|
client, err := ws.clientHandler.ConnectNewClient(id, c)
|
|
if err != nil {
|
|
ws.Error("Websocket", err.Error())
|
|
return
|
|
}
|
|
|
|
// ---------------- Event Handlers ----------------
|
|
|
|
// Handle incoming messages from the client.
|
|
client.OnMessage = func(data []byte) {
|
|
// Decode request into PubSub message model.
|
|
request, err := readJsonData(data)
|
|
if err != nil {
|
|
client.SendData([]byte(`{"error":"read json: ` + err.Error() + `"}`))
|
|
ws.Error("Websocket", "read json: "+err.Error())
|
|
return
|
|
}
|
|
|
|
// Route the action type.
|
|
switch request.Action {
|
|
case "subscribe":
|
|
ws.Debug("Websocket", "subscribe id:"+id+" topic:"+request.Topic)
|
|
|
|
// Register subscription callback.
|
|
ws.Subscribe(id, request.Topic, func(data any) {
|
|
b, err := json.Marshal(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
client.SendData(b)
|
|
})
|
|
|
|
case "unsubscribe":
|
|
ws.Debug("Websocket", "unsubscribe id:"+id+" topic:"+request.Topic)
|
|
ws.Unsubscribe(id, request.Topic)
|
|
|
|
case "publish":
|
|
ws.Publish(request.Topic, request.Data)
|
|
|
|
default:
|
|
// Invalid or unsupported action.
|
|
ws.Error("Websocket", "action type '"+request.Action+"' not supported")
|
|
client.SendData([]byte(`{"error":"action type '` + request.Action + `' not supported"}`))
|
|
}
|
|
}
|
|
|
|
// Handle warnings raised by the client connection.
|
|
client.OnWarning = func(msg string) {
|
|
ws.Warning("Websocket", "warning on websocket connection: "+msg)
|
|
}
|
|
|
|
// Handle errors raised by the client connection.
|
|
client.OnError = func(err error) {
|
|
ws.Error("Websocket", "error on websocket connection: "+err.Error())
|
|
}
|
|
|
|
// Cleanup when the client disconnects.
|
|
client.OnClose = func(code int, reason string) {
|
|
ws.Debug("Websocket", fmt.Sprintf("onClose id:%s code:%d reason:%s", id, code, reason))
|
|
ws.UnsubscribeAll(id) // free all subscriptions for this client
|
|
ws.clientHandler.RemoveClient(id)
|
|
}
|
|
}
|
|
|
|
// readJsonData decodes raw websocket bytes into a PubSub Data struct.
|
|
func readJsonData(data []byte) (request pubSubModels.Data, err error) {
|
|
err = json.Unmarshal(data, &request)
|
|
return
|
|
}
|