diff --git a/dbm/get.go b/dbm/get.go new file mode 100644 index 0000000..7ae1f66 --- /dev/null +++ b/dbm/get.go @@ -0,0 +1,37 @@ +package dbm + +import ( + json_data "github.com/tecamino/tecamino-json_data" + json_dataModels "github.com/tecamino/tecamino-json_data/models" +) + +func (d *DBMHandler) Get(gets []json_dataModels.Get, id, id2 string) { + if gets == nil { + return + } + d.RLock() + defer d.RUnlock() + + r := json_data.NewResponse() + r.Id = id2 + for _, get := range gets { + var depth uint = 1 + if get.Query != nil { + depth = get.Query.Depth + } + + for _, dp := range d.DB.QueryDatapoints(depth, get.Path) { + r.AddGet(json_dataModels.Get{ + Uuid: dp.Uuid, + Path: dp.Path, + Type: dp.Type, + Value: dp.Value, + Rights: dp.ReadWrite, + }) + } + } + + if err := d.Conns.SendResponse(id, r); err != nil { + d.Log.Error("get.Get", err.Error()) + } +} diff --git a/dbm/subscribe.go b/dbm/subscribe.go index 3a5305b..6695b72 100644 --- a/dbm/subscribe.go +++ b/dbm/subscribe.go @@ -1,24 +1,18 @@ package dbm import ( - "github.com/coder/websocket/wsjson" json_dataModels "github.com/tecamino/tecamino-json_data/models" ) -func (d *DBMHandler) Subscribe(subs []json_dataModels.Subscribe, id string) { +func (d *DBMHandler) Subscribe(subs []json_dataModels.Subscribe, id, id2 string) { if subs == nil { return } d.RLock() defer d.RUnlock() - client, ok := d.Conns.Clients[id] - if !ok { - d.Log.Error("subscribe.Subscribe", "client not found for id "+id) - return - } - - response := json_dataModels.NewResponse() + r := json_dataModels.NewResponse() + r.Id = id2 for _, sub := range subs { for _, dp := range d.DB.QueryDatapoints(sub.Depth, sub.Path) { @@ -28,7 +22,7 @@ func (d *DBMHandler) Subscribe(subs []json_dataModels.Subscribe, id string) { } } dp.AddSubscribtion(id, sub) - response.AddSubscription(json_dataModels.Subscribe{ + r.AddSubscription(json_dataModels.Subscribe{ Uuid: dp.Uuid, Path: dp.Path, Value: dp.Value, @@ -37,7 +31,8 @@ func (d *DBMHandler) Subscribe(subs []json_dataModels.Subscribe, id string) { }) } } - if err := wsjson.Write(client.Ctx, client.Conn, response); err != nil { + + if err := d.Conns.SendResponse(id, r); err != nil { d.Log.Error("subscribe.Subscribe", err.Error()) } } @@ -49,13 +44,7 @@ func (d *DBMHandler) Unsubscribe(subs []json_dataModels.Subscribe, id string) { d.RLock() defer d.RUnlock() - client, ok := d.Conns.Clients[id] - if !ok { - d.Log.Error("subscribe.Subscribe", "client not found for id "+id) - return - } - - response := json_dataModels.NewResponse() + r := json_dataModels.NewResponse() for _, sub := range subs { for _, dp := range d.DB.QueryDatapoints(sub.Depth, sub.Path) { @@ -63,13 +52,14 @@ func (d *DBMHandler) Unsubscribe(subs []json_dataModels.Subscribe, id string) { continue } dp.RemoveSubscribtion(id) - response.AddUnsubscription(json_dataModels.Subscribe{ + r.AddUnsubscription(json_dataModels.Subscribe{ Uuid: dp.Uuid, Path: dp.Path, }) } } - if err := wsjson.Write(client.Ctx, client.Conn, response); err != nil { - d.Log.Error("subscribe.Subscribe", err.Error()) + + if err := d.Conns.SendResponse(id, r); err != nil { + d.Log.Error("subscribe.Unsubscribe", err.Error()) } } diff --git a/dbm/webSocket.go b/dbm/webSocket.go index 05d6834..5420841 100644 --- a/dbm/webSocket.go +++ b/dbm/webSocket.go @@ -1,11 +1,12 @@ package dbm import ( + "encoding/json" "errors" "fmt" + "io" "github.com/coder/websocket" - "github.com/coder/websocket/wsjson" "github.com/gin-gonic/gin" "github.com/tecamino/tecamino-dbm/auth" json_dataModels "github.com/tecamino/tecamino-json_data/models" @@ -35,31 +36,46 @@ func (d *DBMHandler) WebSocket(c *gin.Context) { //Read loop for { + request, err := d.readJsonData(id) if err != nil { + d.Log.Error("websocket.WebSocket", err.Error()) break } // Sets - go d.Set(request.Set) + + d.Get(request.Get, id, request.Id) + // Sets + d.Set(request.Set) // Subscribe - go d.Subscribe(request.Subscribe, id) + d.Subscribe(request.Subscribe, id, request.Id) // Unsubscribe - go d.Unsubscribe(request.Unsubscribe, id) + d.Unsubscribe(request.Unsubscribe, id) + request.Get = make([]json_dataModels.Get, 0) + request.Set = make([]json_dataModels.Set, 0) + request.Subscribe = make([]json_dataModels.Subscribe, 0) + request.Unsubscribe = make([]json_dataModels.Subscribe, 0) + request = nil } } -func (d *DBMHandler) readJsonData(id string) (request json_dataModels.Request, err error) { +func (d *DBMHandler) readJsonData(id string) (request *json_dataModels.Request, err error) { client, ok := d.Conns.Clients[id] if !ok { return request, errors.New("client id not found") } - err = wsjson.Read(client.Ctx, client.Conn, &request) + _, reader, err := client.Conn.Reader(client.Ctx) + if err != nil { + return request, err + } + + b, err := io.ReadAll(reader) if err != nil { code := websocket.CloseStatus(err) @@ -74,5 +90,10 @@ func (d *DBMHandler) readJsonData(id string) (request json_dataModels.Request, e return } } + + if err := json.Unmarshal(b, &request); err != nil { + return request, err + } + return } diff --git a/server/models/connections.go b/server/models/connections.go index 049948e..db38f1c 100644 --- a/server/models/connections.go +++ b/server/models/connections.go @@ -1,10 +1,15 @@ package models import ( + "context" + "encoding/json" + "fmt" "sync" + "time" "github.com/coder/websocket" "github.com/gin-gonic/gin" + json_dataModels "github.com/tecamino/tecamino-json_data/models" ) type Connections struct { @@ -30,3 +35,30 @@ func (c *Connections) RemoveClient(id string) { func (c *Connections) DisconnectWsConnection(id string, code websocket.StatusCode, reason string) { c.Clients.DisconnectWsConnection(id, code, reason) } + +func (c *Connections) SendResponse(id string, r *json_dataModels.Response) error { + client, ok := c.Clients[id] + if !ok { + return fmt.Errorf("client not found for id " + id) + + } + b, err := json.Marshal(r) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + w, err := client.Conn.Writer(ctx, websocket.MessageText) + if err != nil { + return err + } + defer w.Close() + + _, err = w.Write(b) + if err != nil { + return err + } + return nil +}