diff --git a/models/datapoint.go b/models/datapoint.go new file mode 100644 index 0000000..896ea3a --- /dev/null +++ b/models/datapoint.go @@ -0,0 +1,393 @@ +package models + +import ( + "errors" + "fmt" + "regexp" + "strings" + "time" + + "github.com/google/uuid" + serverModels "github.com/tecamino/tecamino-dbm/server/models" + "github.com/tecamino/tecamino-dbm/utils" + json_data "github.com/tecamino/tecamino-json_data" + json_dataModels "github.com/tecamino/tecamino-json_data/models" +) + +const ( + OnCreate = "onCreate" + OnChange = "onChange" + OnDelete = "onDelete" +) + +type Datapoint struct { + Datapoints map[string]*Datapoint `json:"-"` + Uuid uuid.UUID `json:"uuid"` + Path string `json:"path"` + Value any `json:"value,omitempty"` + CreateDateTime int64 `json:"createDateTime,omitempty"` + UpdateDateTime int64 `json:"updateDateTime,omitempty"` + Type json_dataModels.Type `json:"type"` + ReadWrite json_dataModels.Rights `json:"readWrite"` + Drivers json_dataModels.Drivers `json:"drivers,omitempty"` + Subscriptions Subscriptions `json:"-"` +} + +func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) { + var changed bool + if path != "" { + changed = true + d.Path = path + } + + if set.Type != "" { + changed = true + d.Type = set.Type + } + + if d.Type != "" { + if d.Value == nil && set.Value == nil { + changed = true + d.Value = d.Type.DefaultValue() + } else if d.Value != d.Type.ConvertValue(set.Value) { + changed = true + d.Value = d.Type.ConvertValue(set.Value) + } + + } + if set.Rights != "" { + changed = true + d.ReadWrite = set.Rights.GetRights() + } + + if changed { + d.UpdateDateTime = time.Now().UnixMilli() + } + + if set.Driver == nil { + return changed, nil + } + if set.Driver.Type == "" { + return changed, errors.New("driver type missing") + } + if set.Driver.Bus == "" { + return changed, errors.New("driver bus name missing") + } + + if d.Drivers == nil { + d.Drivers = json_dataModels.NewDrivers() + } + d.Drivers.AddDriver(set.Driver.Type, set.Driver.Bus, set.Driver.Address) + d.UpdateDateTime = time.Now().UnixMilli() + + return changed, nil +} + +func (d *Datapoint) GetValueUint64() uint64 { + return utils.Uint64From(d.Value) +} + +func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...json_dataModels.Set) (created []json_dataModels.Set, err error) { + if len(sets) == 0 { + return + } + for _, dp := range sets { + parts := strings.Split(dp.Path, ":") + + current := d + for i, part := range parts { + if current.Datapoints == nil { + current.Datapoints = make(map[string]*Datapoint) + } + + if i == len(parts)-1 { + // Leaf node: create or update datapoint + if existing, ok := current.Datapoints[part]; ok { + publish, err := existing.Set("", dp) + if err != nil { + return nil, err + } + created = append(created, json_dataModels.Set{ + Uuid: existing.Uuid, + Path: existing.Path, + Type: existing.Type, + Value: existing.Value, + Rights: existing.ReadWrite, + Drivers: &existing.Drivers, + Updated: true, + }) + + if publish { + existing.Publish(conns, OnChange) + } + } else { + ndp := Datapoint{ + Uuid: uuid.New(), + CreateDateTime: time.Now().UnixMilli(), + Subscriptions: InitSubscribtion(), + } + // Create new + current.Datapoints[part] = &ndp + publish, err := ndp.Set(strings.Join(parts, ":"), dp) + if err != nil { + return nil, err + } + created = append(created, json_dataModels.Set{ + Uuid: ndp.Uuid, + Path: ndp.Path, + Type: ndp.Type, + Value: ndp.Value, + Rights: ndp.ReadWrite, + Driver: dp.Driver, + }) + if publish { + current.Publish(conns, OnChange) + } + } + } + + // Traverse or create intermediate datapoints + if next, ok := current.Datapoints[part]; ok { + current = next + } else { + newDp := &Datapoint{ + Uuid: uuid.New(), + Path: strings.Join(parts[:i+1], ":"), + Type: json_dataModels.NONE, + CreateDateTime: time.Now().UnixMilli(), + UpdateDateTime: time.Now().UnixMilli(), + Subscriptions: InitSubscribtion(), + } + + created = append(created, json_dataModels.Set{ + Uuid: newDp.Uuid, + Path: newDp.Path, + Type: newDp.Type, + Value: newDp.Value, + Rights: newDp.ReadWrite, + }) + + if dp.Rights != "" { + newDp.ReadWrite = dp.Rights.GetRights() + } + + current.Datapoints[part] = newDp + current = newDp + } + } + } + return +} + +func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoint, path string) error { + parts := strings.Split(dp.Path, ":") + + current := d + for i, part := range parts { + if current.Datapoints == nil { + current.Datapoints = make(map[string]*Datapoint) + } + + if i == len(parts)-1 { + // Leaf node: import the datapoint + if existing, ok := current.Datapoints[part]; ok { + existing.Type = dp.Type + existing.Value = dp.Type.ConvertValue(dp.Value) + existing.ReadWrite = dp.ReadWrite.GetRights() + existing.UpdateDateTime = time.Now().UnixMilli() + dp.Publish(conns, OnChange) + } else { + dp.Path = strings.Join(parts, ":") + dp.ReadWrite = dp.ReadWrite.GetRights() + dp.UpdateDateTime = time.Now().UnixMilli() + dp.Subscriptions = InitSubscribtion() + current.Datapoints[part] = &dp + dp.Publish(conns, OnChange) + } + return nil + } + + // Traverse or create intermediate nodes + if next, ok := current.Datapoints[part]; ok { + current = next + } else { + newDp := &Datapoint{ + Path: strings.Join(parts[:i+1], ":"), + Type: json_dataModels.NONE, + ReadWrite: dp.ReadWrite.GetRights(), + UpdateDateTime: time.Now().UnixMilli(), + } + newDp.ReadWrite = newDp.ReadWrite.GetRights() + current.Datapoints[part] = newDp + current = newDp + } + } + return nil +} + +func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value any, path string) error { + + paths := strings.Split(path, ":") + + current := d + for i, part := range paths { + dp, ok := current.Datapoints[part] + if !ok { + return fmt.Errorf("datapoint path not found: %s (at %s)", path, part) + } + if i == len(paths)-1 { + dp.Value = dp.Type.ConvertValue(value) + dp.UpdateDateTime = time.Now().UnixMilli() + dp.Publish(conns, OnChange) + return nil + } + current = dp + } + return nil +} + +func (d *Datapoint) UpdateValue(conns *serverModels.Connections, value any) error { + d.Value = d.Type.ConvertValue(value) + d.UpdateDateTime = time.Now().UnixMilli() + d.Publish(conns, OnChange) + return nil +} + +func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set json_dataModels.Set) (json_dataModels.Set, error) { + parts := strings.Split(set.Path, ":") + + if len(parts) < 1 { + return json_dataModels.Set{}, fmt.Errorf("invalid path: '%s'", set.Path) + } + + current := d + for i := range len(parts) - 1 { + next, ok := current.Datapoints[parts[i]] + if !ok { + return json_dataModels.Set{}, fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":")) + } + current = next + } + + toDelete := parts[len(parts)-1] + if dp, ok := current.Datapoints[toDelete]; ok { + dp.Publish(conns, OnDelete) + delete(current.Datapoints, toDelete) + return json_dataModels.Set{ + Uuid: set.Uuid, + Path: set.Path, + }, nil + } + return json_dataModels.Set{}, fmt.Errorf("datapoint '%s' not found", set.Path) +} + +func (d *Datapoint) GetAllDatapoints(depth uint) (dps Datapoints) { + + var dfs func(dp *Datapoint, currentDepth uint) + dfs = func(dp *Datapoint, currentDepth uint) { + if depth == 1 { + return + } else if depth == 0 { + // Return all descendants + for _, child := range dp.Datapoints { + dps = append(dps, child) + dfs(child, currentDepth+1) + } + return + } + + if currentDepth == depth-1 { + return + } + + for _, child := range dp.Datapoints { + dps = append(dps, child) + dfs(child, currentDepth+1) + } + } + dps = append(dps, d) + dfs(d, 0) + dps.SortSlice() + return +} + +func (d *Datapoint) QueryDatapoints(depth uint, path string) (dps Datapoints) { + parts := strings.Split(path, ":") + + var dfs func(current *Datapoint, index int) + dfs = func(current *Datapoint, index int) { + + if index == len(parts) { + dps = append(dps, current.GetAllDatapoints(depth)...) + return + } + + pattern := "^" + parts[index] + "$" + re, err := regexp.Compile(pattern) + if err != nil { + return + } + + for name, dp := range current.Datapoints { + if re.MatchString(name) { + dfs(dp, index+1) + } + } + } + + dfs(d, 0) + dps.SortSlice() + return +} + +func (d *Datapoint) AddSubscribtion(id string, sub json_dataModels.Subscribe) { + if d.Subscriptions == nil { + return + } + + if s, ok := d.Subscriptions[id]; ok { + s.OnCreate = sub.OnCreate + s.OnChange = sub.OnChange + s.OnDelete = sub.OnDelete + } else { + d.Subscriptions[id] = &Subscription{ + OnCreate: sub.OnCreate, + OnChange: sub.OnChange, + OnDelete: sub.OnDelete, + } + } +} + +func (d *Datapoint) RemoveSubscribtion(id string) { + if _, ok := d.Subscriptions[id]; !ok { + return + } + delete(d.Subscriptions, id) +} + +func (d *Datapoint) Publish(conns *serverModels.Connections, eventType string) error { + if conns.Clients == nil { + return nil + } + conns.RLock() + defer conns.RUnlock() + + for id := range d.Subscriptions { + if _, ok := conns.Clients[id]; !ok { + delete(d.Subscriptions, id) + } else { + r := json_data.NewResponse() + r.AddUPublish(json_dataModels.Publish{ + Event: eventType, + Uuid: d.Uuid, + Path: d.Path, + Value: d.Value, + }) + + if err := conns.SendResponse(id, r); err != nil { + return err + } + } + } + return nil +} diff --git a/models/datapoints.go b/models/datapoints.go index 4745957..84ada1e 100644 --- a/models/datapoints.go +++ b/models/datapoints.go @@ -1,392 +1,12 @@ package models -import ( - "errors" - "fmt" - "regexp" - "strings" - "time" +import "sort" - "github.com/coder/websocket/wsjson" - "github.com/google/uuid" - serverModels "github.com/tecamino/tecamino-dbm/server/models" - "github.com/tecamino/tecamino-dbm/utils" - json_data "github.com/tecamino/tecamino-json_data" - json_dataModels "github.com/tecamino/tecamino-json_data/models" -) +type Datapoints []*Datapoint -const ( - OnCreate = "onCreate" - OnChange = "onChange" - OnDelete = "onDelete" -) - -type Datapoint struct { - Datapoints map[string]*Datapoint `json:"-"` - Uuid uuid.UUID `json:"uuid"` - Path string `json:"path"` - Value any `json:"value,omitempty"` - CreateDateTime int64 `json:"createDateTime,omitempty"` - UpdateDateTime int64 `json:"updateDateTime,omitempty"` - Type json_dataModels.Type `json:"type"` - ReadWrite json_dataModels.Rights `json:"readWrite"` - Drivers json_dataModels.Drivers `json:"drivers,omitempty"` - Subscriptions Subscriptions `json:"-"` -} - -func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) { - var changed bool - if path != "" { - changed = true - d.Path = path - } - - if set.Type != "" { - changed = true - d.Type = set.Type - } - - if d.Type != "" { - if d.Value == nil && set.Value == nil { - changed = true - d.Value = d.Type.DefaultValue() - } else if d.Value != d.Type.ConvertValue(set.Value) { - changed = true - d.Value = d.Type.ConvertValue(set.Value) - } - - } - if set.Rights != "" { - changed = true - d.ReadWrite = set.Rights.GetRights() - } - - if changed { - d.UpdateDateTime = time.Now().UnixMilli() - } - - if set.Driver == nil { - return changed, nil - } - if set.Driver.Type == "" { - return changed, errors.New("driver type missing") - } - if set.Driver.Bus == "" { - return changed, errors.New("driver bus name missing") - } - - if d.Drivers == nil { - d.Drivers = json_dataModels.NewDrivers() - } - d.Drivers.AddDriver(set.Driver.Type, set.Driver.Bus, set.Driver.Address) - d.UpdateDateTime = time.Now().UnixMilli() - - return changed, nil -} - -func (d *Datapoint) GetValueUint64() uint64 { - return utils.Uint64From(d.Value) -} - -func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...json_dataModels.Set) (created []json_dataModels.Set, err error) { - if len(sets) == 0 { - return - } - for _, dp := range sets { - parts := strings.Split(dp.Path, ":") - - current := d - for i, part := range parts { - if current.Datapoints == nil { - current.Datapoints = make(map[string]*Datapoint) - } - - if i == len(parts)-1 { - // Leaf node: create or update datapoint - if existing, ok := current.Datapoints[part]; ok { - publish, err := existing.Set("", dp) - if err != nil { - return nil, err - } - created = append(created, json_dataModels.Set{ - Uuid: existing.Uuid, - Path: existing.Path, - Type: existing.Type, - Value: existing.Value, - Rights: existing.ReadWrite, - Drivers: &existing.Drivers, - Updated: true, - }) - - if publish { - existing.Publish(conns, OnChange) - } - } else { - ndp := Datapoint{ - Uuid: uuid.New(), - CreateDateTime: time.Now().UnixMilli(), - Subscriptions: InitSubscribtion(), - } - // Create new - current.Datapoints[part] = &ndp - publish, err := ndp.Set(strings.Join(parts, ":"), dp) - if err != nil { - return nil, err - } - created = append(created, json_dataModels.Set{ - Uuid: ndp.Uuid, - Path: ndp.Path, - Type: ndp.Type, - Value: ndp.Value, - Rights: ndp.ReadWrite, - Driver: dp.Driver, - }) - if publish { - current.Publish(conns, OnChange) - } - } - } - - // Traverse or create intermediate datapoints - if next, ok := current.Datapoints[part]; ok { - current = next - } else { - newDp := &Datapoint{ - Uuid: uuid.New(), - Path: strings.Join(parts[:i+1], ":"), - Type: json_dataModels.NONE, - CreateDateTime: time.Now().UnixMilli(), - UpdateDateTime: time.Now().UnixMilli(), - Subscriptions: InitSubscribtion(), - } - - created = append(created, json_dataModels.Set{ - Uuid: newDp.Uuid, - Path: newDp.Path, - Type: newDp.Type, - Value: newDp.Value, - Rights: newDp.ReadWrite, - }) - - if dp.Rights != "" { - newDp.ReadWrite = dp.Rights.GetRights() - } - - current.Datapoints[part] = newDp - current = newDp - } - } - } - return -} - -func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoint, path string) error { - parts := strings.Split(dp.Path, ":") - - current := d - for i, part := range parts { - if current.Datapoints == nil { - current.Datapoints = make(map[string]*Datapoint) - } - - if i == len(parts)-1 { - // Leaf node: import the datapoint - if existing, ok := current.Datapoints[part]; ok { - existing.Type = dp.Type - existing.Value = current.Type.ConvertValue(dp.Value) - existing.ReadWrite = dp.ReadWrite.GetRights() - existing.UpdateDateTime = time.Now().UnixMilli() - dp.Publish(conns, OnChange) - } else { - dp.Path = strings.Join(parts, ":") - dp.ReadWrite = dp.ReadWrite.GetRights() - dp.UpdateDateTime = time.Now().UnixMilli() - dp.Subscriptions = InitSubscribtion() - current.Datapoints[part] = &dp - dp.Publish(conns, OnChange) - } - return nil - } - - // Traverse or create intermediate nodes - if next, ok := current.Datapoints[part]; ok { - current = next - } else { - newDp := &Datapoint{ - Path: strings.Join(parts[:i+1], ":"), - Type: json_dataModels.NONE, - ReadWrite: dp.ReadWrite.GetRights(), - UpdateDateTime: time.Now().UnixMilli(), - } - newDp.ReadWrite = newDp.ReadWrite.GetRights() - current.Datapoints[part] = newDp - current = newDp - } - } - return nil -} - -func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value any, path string) error { - - paths := strings.Split(path, ":") - - current := d - for i, part := range paths { - dp, ok := current.Datapoints[part] - if !ok { - return fmt.Errorf("datapoint path not found: %s (at %s)", path, part) - } - if i == len(paths)-1 { - dp.Value = dp.Type.ConvertValue(value) - dp.UpdateDateTime = time.Now().UnixMilli() - dp.Publish(conns, OnChange) - return nil - } - current = dp - } - return nil -} - -func (d *Datapoint) UpdateValue(conns *serverModels.Connections, value any) error { - d.Value = d.Type.ConvertValue(value) - d.UpdateDateTime = time.Now().UnixMilli() - d.Publish(conns, OnChange) - return nil -} - -func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set json_dataModels.Set) (json_dataModels.Set, error) { - parts := strings.Split(set.Path, ":") - - if len(parts) < 1 { - return json_dataModels.Set{}, fmt.Errorf("invalid path: '%s'", set.Path) - } - - current := d - for i := range len(parts) - 1 { - next, ok := current.Datapoints[parts[i]] - if !ok { - return json_dataModels.Set{}, fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":")) - } - current = next - } - - toDelete := parts[len(parts)-1] - if dp, ok := current.Datapoints[toDelete]; ok { - dp.Publish(conns, OnDelete) - delete(current.Datapoints, toDelete) - return json_dataModels.Set{ - Uuid: set.Uuid, - Path: set.Path, - }, nil - } - return json_dataModels.Set{}, fmt.Errorf("datapoint '%s' not found", set.Path) -} - -func (d *Datapoint) GetAllDatapoints(depth uint) (dps []*Datapoint) { - - var dfs func(dp *Datapoint, currentDepth uint) - dfs = func(dp *Datapoint, currentDepth uint) { - if depth == 1 { - return - } else if depth == 0 { - // Return all descendants - for _, child := range dp.Datapoints { - dps = append(dps, child) - dfs(child, currentDepth+1) - } - return - } - - if currentDepth == depth-1 { - return - } - - for _, child := range dp.Datapoints { - dps = append(dps, child) - dfs(child, currentDepth+1) - } - } - dps = append(dps, d) - dfs(d, 0) - return -} - -func (d *Datapoint) QueryDatapoints(depth uint, path string) (dps []*Datapoint) { - parts := strings.Split(path, ":") - - var dfs func(current *Datapoint, index int) - dfs = func(current *Datapoint, index int) { - - if index == len(parts) { - dps = append(dps, current.GetAllDatapoints(depth)...) - return - } - - pattern := "^" + parts[index] + "$" - re, err := regexp.Compile(pattern) - if err != nil { - return - } - - for name, dp := range current.Datapoints { - if re.MatchString(name) { - dfs(dp, index+1) - } - } - } - - dfs(d, 0) - return -} - -func (d *Datapoint) AddSubscribtion(id string, sub json_dataModels.Subscribe) { - if d.Subscriptions == nil { - return - } - - if s, ok := d.Subscriptions[id]; ok { - s.OnCreate = sub.OnCreate - s.OnChange = sub.OnChange - s.OnDelete = sub.OnDelete - } else { - d.Subscriptions[id] = &Subscription{ - OnCreate: sub.OnCreate, - OnChange: sub.OnChange, - OnDelete: sub.OnDelete, - } - } -} - -func (d *Datapoint) RemoveSubscribtion(id string) { - if _, ok := d.Subscriptions[id]; !ok { - return - } - delete(d.Subscriptions, id) -} - -func (d *Datapoint) Publish(conns *serverModels.Connections, eventType string) error { - if conns.Clients == nil { - return nil - } - conns.RLock() - defer conns.RUnlock() - - for id := range d.Subscriptions { - if client, ok := conns.Clients[id]; !ok { - delete(d.Subscriptions, id) - } else { - r := json_data.NewResponse() - r.AddUPublish(json_dataModels.Publish{ - Event: eventType, - Uuid: d.Uuid, - Path: d.Path, - Value: d.Value, - }) - err := wsjson.Write(client.Ctx, client.Conn, r) - if err != nil { - return err - } - } - } - return nil +func (d *Datapoints) SortSlice() { + // Sort by Path before processing + sort.Slice(*d, func(i, j int) bool { + return (*d)[i].Path < (*d)[j].Path + }) }