package dbm import ( "fmt" json_dataModels "github.com/tecamino/tecamino-json_data/models" ) func (d *DBMHandler) Subscribe(req *json_dataModels.Request, id string) { if req == nil { return } if len(req.Subscribe) == 0 { return } d.RLock() defer d.RUnlock() resp := json_dataModels.NewResponse() resp.Id = req.Id for _, sub := range req.Subscribe { dps := d.DBM.QueryDatapoints(sub.Depth, sub.Uuid, sub.Path) if len(dps) == 0 { resp.SetError() if resp.Message == "" { resp.SetMessage(fmt.Sprintf("datapoint %s not found", sub.Path)) } continue } for _, dp := range dps { if sub.Driver != "" { if dp.Drivers == nil || dp.Drivers[sub.Driver] == nil { continue } } dp.AddSubscribtion(id, sub) resp.AddSubscription(json_dataModels.Subscription{ Uuid: dp.Uuid, Path: dp.Path, Value: dp.Value, HasChild: dp.Datapoints != nil, Driver: sub.Driver, Drivers: &dp.Drivers, }) } } if err := d.Conns.SendResponse(id, resp); err != nil { d.Log.Error("subscribe.Subscribe", err.Error()) } } func (d *DBMHandler) Unsubscribe(req *json_dataModels.Request, id string) { if req == nil { return } if len(req.Unsubscribe) == 0 { return } d.RLock() defer d.RUnlock() resp := json_dataModels.NewResponse() resp.Id = req.Id for _, sub := range req.Unsubscribe { for _, dp := range d.DBM.QueryDatapoints(sub.Depth, sub.Uuid, sub.Path) { if _, ok := dp.Subscriptions[id]; !ok { continue } dp.RemoveSubscribtion(id) resp.AddUnsubscription(json_dataModels.Subscription{ Uuid: dp.Uuid, Path: dp.Path, }) } } if err := d.Conns.SendResponse(id, resp); err != nil { d.Log.Error("subscribe.Unsubscribe", err.Error()) } }