major change of websocket dbmHandler structure
This commit is contained in:
@@ -5,6 +5,8 @@ import (
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/zuadi/tecamino-dbm/utils"
|
||||
)
|
||||
|
||||
type Datapoint struct {
|
||||
@@ -15,169 +17,261 @@ type Datapoint struct {
|
||||
UpdateDateTime int64 `json:"updateDateTime,omitempty"`
|
||||
Type Type `json:"type"`
|
||||
ReadWrite Rights `json:"readWrite"`
|
||||
Drivers map[string]*Driver `json:"-"`
|
||||
Subscribtions Subscribtions `json:"-"`
|
||||
}
|
||||
|
||||
var depth1 int
|
||||
func (d *Datapoint) CreateDatapoint(typ Type, value any, rights Rights, path string) error {
|
||||
parts := regexp.MustCompile(`[:]+`).Split(path, -1)
|
||||
|
||||
func (d *Datapoint) CreateDatapoint(typ Type, value any, rights Rights, paths ...string) error {
|
||||
l := len(paths) - 1
|
||||
if l == 0 {
|
||||
paths = regexp.MustCompile(`[:]+`).Split(paths[0], -1)
|
||||
l = len(paths) - 1
|
||||
}
|
||||
current := d
|
||||
for i, part := range parts {
|
||||
if current.Datapoints == nil {
|
||||
current.Datapoints = make(map[string]*Datapoint)
|
||||
}
|
||||
|
||||
if d.Datapoints == nil {
|
||||
d.Datapoints = make(map[string]*Datapoint)
|
||||
}
|
||||
if i == len(parts)-1 {
|
||||
// Leaf node: create or update datapoint
|
||||
if existing, ok := current.Datapoints[part]; ok {
|
||||
// Update existing
|
||||
existing.Type = typ
|
||||
existing.ReadWrite = rights.GetRights()
|
||||
existing.Value = typ.ConvertValue(value)
|
||||
existing.UpdateDateTime = time.Now().UnixMilli()
|
||||
} else {
|
||||
// Create new
|
||||
current.Datapoints[part] = &Datapoint{
|
||||
Path: strings.Join(parts, ":"),
|
||||
Type: typ,
|
||||
Value: typ.ConvertValue(value),
|
||||
ReadWrite: rights.GetRights(),
|
||||
CreateDateTime: time.Now().UnixMilli(),
|
||||
UpdateDateTime: time.Now().UnixMilli(),
|
||||
Subscribtions: InitSubscribtion(),
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if l == depth1 {
|
||||
if od, ok := d.Datapoints[paths[depth1]]; !ok {
|
||||
d.Datapoints[paths[depth1]] = &Datapoint{
|
||||
Path: strings.Join(paths, ":"),
|
||||
Type: typ,
|
||||
Value: typ.ConvertValue(value),
|
||||
// Traverse or create intermediate datapoints
|
||||
if next, ok := current.Datapoints[part]; ok {
|
||||
current = next
|
||||
} else {
|
||||
newDp := &Datapoint{
|
||||
Path: strings.Join(parts[:i+1], ":"),
|
||||
Type: NONE,
|
||||
ReadWrite: rights.GetRights(),
|
||||
CreateDateTime: time.Now().UnixMilli(),
|
||||
UpdateDateTime: time.Now().UnixMilli(),
|
||||
Subscribtions: InitSubscribtion(),
|
||||
}
|
||||
} else {
|
||||
od.Type = typ
|
||||
od.ReadWrite = rights.GetRights()
|
||||
od.Value = typ.ConvertValue(value)
|
||||
od.UpdateDateTime = time.Now().UnixMilli()
|
||||
current.Datapoints[part] = newDp
|
||||
current = newDp
|
||||
}
|
||||
depth1 = 0
|
||||
} else if datapoint, ok := d.Datapoints[paths[depth1]]; ok {
|
||||
depth1 += 1
|
||||
datapoint.CreateDatapoint(typ, value, rights, paths...)
|
||||
} else {
|
||||
da := Datapoint{
|
||||
Path: strings.Join(paths[:depth1+1], ":"),
|
||||
Type: NONE,
|
||||
ReadWrite: rights.GetRights(),
|
||||
CreateDateTime: time.Now().UnixMilli(),
|
||||
UpdateDateTime: time.Now().UnixMilli(),
|
||||
}
|
||||
|
||||
d.Datapoints[paths[depth1]] = &da
|
||||
depth1 += 1
|
||||
da.CreateDatapoint(typ, value, rights, paths...)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var depth2 int
|
||||
func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) error {
|
||||
parts := regexp.MustCompile(`[:]+`).Split(path, -1)
|
||||
|
||||
func (d *Datapoint) ImportDatapoint(dp *Datapoint, paths ...string) error {
|
||||
l := len(paths) - 1
|
||||
if l == 0 {
|
||||
paths = regexp.MustCompile(`[:]+`).Split(paths[0], -1)
|
||||
l = len(paths) - 1
|
||||
}
|
||||
|
||||
if d.Datapoints == nil {
|
||||
d.Datapoints = make(map[string]*Datapoint)
|
||||
}
|
||||
if l == depth2 {
|
||||
if od, ok := d.Datapoints[paths[depth2]]; !ok {
|
||||
d.Datapoints[paths[depth2]] = dp
|
||||
dp.ReadWrite = dp.ReadWrite.GetRights()
|
||||
dp.UpdateDateTime = time.Now().UnixMilli()
|
||||
} else {
|
||||
od.Type = dp.Type
|
||||
od.Value = d.Type.ConvertValue(dp.Value)
|
||||
od.ReadWrite = dp.ReadWrite.GetRights()
|
||||
od.UpdateDateTime = time.Now().UnixMilli()
|
||||
current := d
|
||||
for i, part := range parts {
|
||||
if current.Datapoints == nil {
|
||||
current.Datapoints = make(map[string]*Datapoint)
|
||||
}
|
||||
depth2 = 0
|
||||
} else if datapoint, ok := d.Datapoints[paths[depth2]]; ok {
|
||||
depth2 += 1
|
||||
datapoint.ImportDatapoint(dp, paths...)
|
||||
} else {
|
||||
da := Datapoint{
|
||||
Path: strings.Join(paths[:depth2+1], ":"),
|
||||
Type: NONE,
|
||||
UpdateDateTime: time.Now().UnixMilli(),
|
||||
}
|
||||
da.ReadWrite = da.ReadWrite.GetRights()
|
||||
d.Datapoints[paths[depth2]] = &da
|
||||
depth2 += 1
|
||||
da.ImportDatapoint(dp, paths...)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var depth3 int
|
||||
|
||||
func (d *Datapoint) UpdateDatapointValue(value any, paths ...string) error {
|
||||
l := len(paths) - 1
|
||||
if l == 0 {
|
||||
paths = regexp.MustCompile(`[:]+`).Split(paths[0], -1)
|
||||
l = len(paths) - 1
|
||||
}
|
||||
|
||||
if l == depth3 {
|
||||
if dp, ok := d.Datapoints[paths[depth3]]; ok {
|
||||
fmt.Print("update dp:", dp.Path, " old value:", dp.Value)
|
||||
dp.Value = dp.Type.ConvertValue(value)
|
||||
fmt.Println(" new value:", dp.Value)
|
||||
|
||||
dp.UpdateDateTime = time.Now().UnixMilli()
|
||||
depth3 = 0
|
||||
if i == len(parts)-1 {
|
||||
// Leaf node: import the datapoint
|
||||
if existing, ok := current.Datapoints[part]; ok {
|
||||
existing.Type = dp.Type
|
||||
existing.Value = current.Type.ConvertValue(dp.Value)
|
||||
existing.ReadWrite = dp.ReadWrite.GetRights()
|
||||
existing.UpdateDateTime = time.Now().UnixMilli()
|
||||
} else {
|
||||
dp.Path = strings.Join(parts, ":")
|
||||
dp.ReadWrite = dp.ReadWrite.GetRights()
|
||||
dp.UpdateDateTime = time.Now().UnixMilli()
|
||||
dp.Subscribtions = InitSubscribtion()
|
||||
current.Datapoints[part] = dp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
depth3 = 0
|
||||
return fmt.Errorf("datapoint '%s' not found", strings.Join(paths, ":"))
|
||||
} else if datapoint, ok := d.Datapoints[paths[depth3]]; ok {
|
||||
depth3 += 1
|
||||
if err := datapoint.UpdateDatapointValue(value, paths...); err != nil {
|
||||
fmt.Println(100, err)
|
||||
return err
|
||||
|
||||
// Traverse or create intermediate nodes
|
||||
if next, ok := current.Datapoints[part]; ok {
|
||||
current = next
|
||||
} else {
|
||||
newDp := &Datapoint{
|
||||
Path: strings.Join(parts[:i+1], ":"),
|
||||
Type: NONE,
|
||||
ReadWrite: dp.ReadWrite.GetRights(),
|
||||
UpdateDateTime: time.Now().UnixMilli(),
|
||||
}
|
||||
newDp.ReadWrite = newDp.ReadWrite.GetRights()
|
||||
current.Datapoints[part] = newDp
|
||||
current = newDp
|
||||
}
|
||||
}
|
||||
depth3 = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
var depth4 int
|
||||
func (d *Datapoint) UpdateDatapointValue(value any, path string) error {
|
||||
|
||||
func (d *Datapoint) RemoveDatapoint(paths ...string) error {
|
||||
l := len(paths) - 1
|
||||
if l == 0 {
|
||||
paths = regexp.MustCompile(`[:]+`).Split(paths[0], -1)
|
||||
l = len(paths) - 1
|
||||
}
|
||||
paths := regexp.MustCompile(`[:]+`).Split(path, -1)
|
||||
|
||||
if l == depth4 {
|
||||
if _, ok := d.Datapoints[paths[depth4]]; ok {
|
||||
delete(d.Datapoints, paths[depth4])
|
||||
fmt.Println("removed dp:", strings.Join(paths, ":"))
|
||||
current := d
|
||||
for i, part := range paths {
|
||||
dp, ok := current.Datapoints[part]
|
||||
if !ok {
|
||||
return fmt.Errorf("datapoint path not found: %s (at %s)", path, part)
|
||||
}
|
||||
depth4 = 0
|
||||
} else if datapoint, ok := d.Datapoints[paths[depth4]]; ok {
|
||||
depth4 += 1
|
||||
datapoint.RemoveDatapoint(paths...)
|
||||
if i == len(paths)-1 {
|
||||
dp.Value = dp.Type.ConvertValue(value)
|
||||
dp.UpdateDateTime = time.Now().UnixMilli()
|
||||
return nil
|
||||
}
|
||||
current = dp
|
||||
}
|
||||
return fmt.Errorf("datapoint '%s' not found", strings.Join(paths, ":"))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Datapoint) GetAllDatapoints() (dps []*Datapoint) {
|
||||
for _, dp := range d.Datapoints {
|
||||
dps = append(dps, dp.GetAllDatapoints()...)
|
||||
dps = append(dps, dp)
|
||||
func (d *Datapoint) RemoveDatapoint(path string) error {
|
||||
parts := regexp.MustCompile(`[:]+`).Split(path, -1)
|
||||
|
||||
if len(parts) < 1 {
|
||||
return fmt.Errorf("invalid path: '%s'", path)
|
||||
}
|
||||
return
|
||||
|
||||
current := d
|
||||
for i := range len(parts) - 1 {
|
||||
next, ok := current.Datapoints[parts[i]]
|
||||
if !ok {
|
||||
return fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":"))
|
||||
}
|
||||
current = next
|
||||
}
|
||||
|
||||
toDelete := parts[len(parts)-1]
|
||||
if _, ok := current.Datapoints[toDelete]; ok {
|
||||
delete(current.Datapoints, toDelete)
|
||||
fmt.Println("Removed datapoint:", path)
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("datapoint '%s' not found", path)
|
||||
}
|
||||
|
||||
func (d *Datapoint) QueryDatapoints(key string) (dps []*Datapoint) {
|
||||
reg := regexp.MustCompile(key)
|
||||
for _, dp := range d.Datapoints {
|
||||
if reg.MatchString(dp.Path) {
|
||||
func (d *Datapoint) GetAllDatapoints(depth int) (dps []*Datapoint) {
|
||||
|
||||
var dfs func(dp *Datapoint, currentDepth int)
|
||||
dfs = func(dp *Datapoint, currentDepth int) {
|
||||
if currentDepth == 0 {
|
||||
dps = append(dps, dp)
|
||||
}
|
||||
dps = append(dps, dp.QueryDatapoints(key)...)
|
||||
|
||||
if depth == 1 {
|
||||
return
|
||||
} else if depth == 0 {
|
||||
// Return all descendants
|
||||
for _, child := range dp.Datapoints {
|
||||
dps = append(dps, child)
|
||||
dfs(child, currentDepth+1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if currentDepth == depth-1 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, child := range dp.Datapoints {
|
||||
dfs(child, currentDepth+1)
|
||||
}
|
||||
}
|
||||
|
||||
dfs(d, 0)
|
||||
return
|
||||
}
|
||||
|
||||
func (d *Datapoint) QueryDatapoints(depth int, path string) (dps []*Datapoint) {
|
||||
parts := strings.Split(path, ":")
|
||||
|
||||
var dfs func(current *Datapoint, index int)
|
||||
dfs = func(current *Datapoint, index int) {
|
||||
|
||||
if index == len(parts) {
|
||||
dps = append(dps, current.GetAllDatapoints(depth)...)
|
||||
return
|
||||
}
|
||||
|
||||
pattern := "^" + parts[index] + "$"
|
||||
re, err := regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for name, dp := range current.Datapoints {
|
||||
if re.MatchString(name) {
|
||||
dfs(dp, index+1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dfs(d, 0)
|
||||
return
|
||||
}
|
||||
|
||||
func (d *Datapoint) AddSubscribtion(id string, sub *Subscribe) {
|
||||
if d.Subscribtions == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s, ok := d.Subscribtions[id]; ok {
|
||||
s.OnCreate = sub.OnCreate
|
||||
s.OnChange = sub.OnChange
|
||||
s.OnDelete = sub.OnDelete
|
||||
} else {
|
||||
d.Subscribtions[id] = &Subscribtion{
|
||||
OnCreate: sub.OnCreate,
|
||||
OnChange: sub.OnChange,
|
||||
OnDelete: sub.OnDelete,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Datapoint) RemoveSubscribtion(id string) {
|
||||
if _, ok := d.Subscribtions[id]; !ok {
|
||||
return
|
||||
}
|
||||
delete(d.Subscribtions, id)
|
||||
}
|
||||
|
||||
func (d *Datapoint) AddDriver(driver, bus string, adr int) {
|
||||
if d.Drivers == nil {
|
||||
d.Drivers = make(map[string]*Driver)
|
||||
}
|
||||
|
||||
d.Drivers[driver] = &Driver{
|
||||
Bus: bus,
|
||||
Address: adr,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Datapoint) AddDriverSubscribtion(id string, sub *Subscribe) {
|
||||
if s, ok := d.Subscribtions[id]; ok {
|
||||
s.OnCreate = sub.OnCreate
|
||||
s.OnChange = sub.OnChange
|
||||
s.OnDelete = sub.OnDelete
|
||||
} else {
|
||||
d.Subscribtions[id] = &Subscribtion{
|
||||
OnCreate: sub.OnCreate,
|
||||
OnChange: sub.OnChange,
|
||||
OnDelete: sub.OnDelete,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Datapoint) GetValueUint64() uint64 {
|
||||
return utils.Uint64From(d.Value)
|
||||
}
|
||||
|
Reference in New Issue
Block a user