package models import ( "errors" "fmt" "regexp" "strings" "time" "github.com/coder/websocket/wsjson" "github.com/google/uuid" serverModels "github.com/tecamino/tecamino-dbm/server/models" "github.com/tecamino/tecamino-dbm/utils" ) const ( OnCreate = "onCreate" OnChange = "onChange" OnDelete = "onDelete" ) type Datapoint struct { Datapoints map[string]*Datapoint `json:"-"` Uuid uuid.UUID `json:"uuid"` Path string `json:"path"` Value any `json:"value,omitempty"` CreateDateTime int64 `json:"createDateTime,omitempty"` UpdateDateTime int64 `json:"updateDateTime,omitempty"` Type Type `json:"type"` ReadWrite Rights `json:"readWrite"` Drivers Drivers `json:"drivers,omitempty"` Subscriptions Subscriptions `json:"-"` } func (d *Datapoint) Set(path string, set Set) (bool, error) { var changed bool if path != "" { changed = true d.Path = path } if set.Type != nil { changed = true d.Type = *set.Type } if d.Type != "" { if d.Value == nil && set.Value == nil { changed = true d.Value = d.Type.DefaultValue() } else if d.Value != d.Type.ConvertValue(set.Value) { changed = true d.Value = d.Type.ConvertValue(set.Value) } } if set.Rights != nil { changed = true d.ReadWrite = set.Rights.GetRights() } if changed { d.UpdateDateTime = time.Now().UnixMilli() } if set.Driver == nil { return changed, nil } if set.Driver.Type == "" { return changed, errors.New("driver type missing") } if set.Driver.Bus == "" { return changed, errors.New("driver bus name missing") } if d.Drivers == nil { d.Drivers = NewDrivers() } d.Drivers.AddDriver(set.Driver.Type, set.Driver.Bus, set.Driver.Address) d.UpdateDateTime = time.Now().UnixMilli() return changed, nil } func (d *Datapoint) GetValueUint64() uint64 { return utils.Uint64From(d.Value) } func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...Set) (created []Set, err error) { if len(sets) == 0 { return } for _, dp := range sets { parts := strings.Split(dp.Path, ":") current := d for i, part := range parts { if current.Datapoints == nil { current.Datapoints = make(map[string]*Datapoint) } if i == len(parts)-1 { // Leaf node: create or update datapoint if existing, ok := current.Datapoints[part]; ok { publish, err := existing.Set("", dp) if err != nil { return nil, err } created = append(created, Set{ Path: existing.Path, Type: &existing.Type, Value: existing.Value, Rights: &existing.ReadWrite, Drivers: &existing.Drivers, Updated: true, }) if publish { existing.Publish(conns, OnChange) } } else { // Create new current.Datapoints[part] = &Datapoint{ Uuid: uuid.New(), CreateDateTime: time.Now().UnixMilli(), Subscriptions: InitSubscribtion(), } publish, err := current.Datapoints[part].Set(strings.Join(parts, ":"), dp) if err != nil { return nil, err } created = append(created, Set{ Path: current.Path, Type: ¤t.Type, Value: current.Value, Rights: ¤t.ReadWrite, Driver: dp.Driver, }) if publish { current.Publish(conns, OnChange) } } return } // Traverse or create intermediate datapoints if next, ok := current.Datapoints[part]; ok { current = next } else { newDp := &Datapoint{ Uuid: uuid.New(), Path: strings.Join(parts[:i+1], ":"), Type: NONE, CreateDateTime: time.Now().UnixMilli(), UpdateDateTime: time.Now().UnixMilli(), Subscriptions: InitSubscribtion(), } if dp.Rights != nil { newDp.ReadWrite = dp.Rights.GetRights() } current.Datapoints[part] = newDp current = newDp } } } return } func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoint, path string) error { parts := strings.Split(dp.Path, ":") current := d for i, part := range parts { if current.Datapoints == nil { current.Datapoints = make(map[string]*Datapoint) } if i == len(parts)-1 { // Leaf node: import the datapoint if existing, ok := current.Datapoints[part]; ok { existing.Type = dp.Type existing.Value = current.Type.ConvertValue(dp.Value) existing.ReadWrite = dp.ReadWrite.GetRights() existing.UpdateDateTime = time.Now().UnixMilli() dp.Publish(conns, OnChange) } else { dp.Path = strings.Join(parts, ":") dp.ReadWrite = dp.ReadWrite.GetRights() dp.UpdateDateTime = time.Now().UnixMilli() dp.Subscriptions = InitSubscribtion() current.Datapoints[part] = &dp dp.Publish(conns, OnChange) } return nil } // Traverse or create intermediate nodes if next, ok := current.Datapoints[part]; ok { current = next } else { newDp := &Datapoint{ Path: strings.Join(parts[:i+1], ":"), Type: NONE, ReadWrite: dp.ReadWrite.GetRights(), UpdateDateTime: time.Now().UnixMilli(), } newDp.ReadWrite = newDp.ReadWrite.GetRights() current.Datapoints[part] = newDp current = newDp } } return nil } func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value any, path string) error { paths := strings.Split(path, ":") current := d for i, part := range paths { dp, ok := current.Datapoints[part] if !ok { return fmt.Errorf("datapoint path not found: %s (at %s)", path, part) } if i == len(paths)-1 { dp.Value = dp.Type.ConvertValue(value) dp.UpdateDateTime = time.Now().UnixMilli() dp.Publish(conns, OnChange) return nil } current = dp } return nil } func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set Set) (Set, error) { parts := strings.Split(set.Path, ":") if len(parts) < 1 { return Set{}, fmt.Errorf("invalid path: '%s'", set.Path) } current := d for i := range len(parts) - 1 { next, ok := current.Datapoints[parts[i]] if !ok { return Set{}, fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":")) } current = next } toDelete := parts[len(parts)-1] if dp, ok := current.Datapoints[toDelete]; ok { dp.Publish(conns, OnDelete) delete(current.Datapoints, toDelete) return Set{ Path: set.Path, }, nil } return Set{}, fmt.Errorf("datapoint '%s' not found", set.Path) } func (d *Datapoint) GetAllDatapoints(depth int) (dps []*Datapoint) { var dfs func(dp *Datapoint, currentDepth int) dfs = func(dp *Datapoint, currentDepth int) { if depth == 1 { return } else if depth == 0 { // Return all descendants for _, child := range dp.Datapoints { dps = append(dps, child) dfs(child, currentDepth+1) } return } if currentDepth == depth-1 { return } for _, child := range dp.Datapoints { dps = append(dps, child) dfs(child, currentDepth+1) } } dps = append(dps, d) dfs(d, 0) return } func (d *Datapoint) QueryDatapoints(depth int, path string) (dps []*Datapoint) { parts := strings.Split(path, ":") var dfs func(current *Datapoint, index int) dfs = func(current *Datapoint, index int) { if index == len(parts) { dps = append(dps, current.GetAllDatapoints(depth)...) return } pattern := "^" + parts[index] + "$" re, err := regexp.Compile(pattern) if err != nil { return } for name, dp := range current.Datapoints { if re.MatchString(name) { dfs(dp, index+1) } } } dfs(d, 0) return } func (d *Datapoint) AddSubscribtion(id string, sub *Subscribe) { if d.Subscriptions == nil { return } if s, ok := d.Subscriptions[id]; ok { s.OnCreate = sub.GetOnCreate() s.OnChange = sub.GetOnChange() s.OnDelete = sub.GetOnDelete() } else { d.Subscriptions[id] = &Subscription{ OnCreate: sub.GetOnCreate(), OnChange: sub.GetOnChange(), OnDelete: sub.GetOnDelete(), } } } func (d *Datapoint) RemoveSubscribtion(id string) { if _, ok := d.Subscriptions[id]; !ok { return } delete(d.Subscriptions, id) } func (d *Datapoint) Publish(conns *serverModels.Connections, eventType string) error { if conns.Clients == nil { return nil } conns.RLock() defer conns.RUnlock() for id := range d.Subscriptions { if client, ok := conns.Clients[id]; !ok { delete(d.Subscriptions, id) } else { err := wsjson.Write(client.Ctx, client.Conn, Publish{ Event: eventType, Path: d.Path, Value: d.Value, }) if err != nil { return err } } } return nil }