diff --git a/dbm/json_data.go b/dbm/json_data.go index 9a18c08..12a64de 100644 --- a/dbm/json_data.go +++ b/dbm/json_data.go @@ -1,6 +1,7 @@ package dbm import ( + "fmt" "net/http" "github.com/gin-gonic/gin" @@ -29,6 +30,7 @@ func (d *DBMHandler) Json_Data(c *gin.Context) { if get.Query != nil { depth = get.Query.Depth } + for _, res := range d.DBM.QueryDatapoints(depth, get.Uuid, get.Path) { resp.AddGet(json_dataModels.Get{ Uuid: res.Uuid, @@ -44,6 +46,10 @@ func (d *DBMHandler) Json_Data(c *gin.Context) { } if len(payload.Set) > 0 { resp.Set, err = d.DBM.CreateDatapoints(payload.Set...) + for i, o := range resp.Set { + fmt.Println(1000, i, o) + fmt.Println(1001, o.HasChild) + } if err != nil { r := json_data.NewResponse() r.SetError() @@ -52,6 +58,7 @@ func (d *DBMHandler) Json_Data(c *gin.Context) { return } } + c.JSON(200, resp) } diff --git a/dbm/set.go b/dbm/set.go index 6d903f8..529d16c 100644 --- a/dbm/set.go +++ b/dbm/set.go @@ -30,7 +30,7 @@ func (d *DBMHandler) Set(req *json_dataModels.Request, id string) { } for _, dp := range dps { - dp.UpdateValue(d.Conns, set.Value) + dp.UpdateValue(set.Value) resp.AddSet(json_dataModels.Set{ Uuid: dp.Uuid, diff --git a/dbm/webSocket.go b/dbm/webSocket.go index 7eb34f3..57d0179 100644 --- a/dbm/webSocket.go +++ b/dbm/webSocket.go @@ -46,6 +46,10 @@ func (d *DBMHandler) WebSocket(c *gin.Context) { d.Unsubscribe(request, id) } + client.OnWarning = func(s string) { + d.Log.Warning("dbmHandler.webSocket.Websocket", "warning on websocket connection: "+s) + } + client.OnError = func(err error) { d.Log.Error("dbmHandler.webSocket.Websocket", "error on websocket connection: "+err.Error()) } diff --git a/go.mod b/go.mod index a9bd3bb..fb7af9c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 - github.com/tecamino/tecamino-json_data v0.0.19 + github.com/tecamino/tecamino-json_data v0.0.21 github.com/tecamino/tecamino-logger v0.2.0 ) diff --git a/go.sum b/go.sum index 781185a..dba797a 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tecamino/tecamino-json_data v0.0.19 h1:jaEiY38Rdur1ZKJ5/5H/7ZL5vn1NdUnM/0bQj2vByKc= -github.com/tecamino/tecamino-json_data v0.0.19/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY= +github.com/tecamino/tecamino-json_data v0.0.21 h1:ZRN9wyn+p6J1T4b0e9xf1HStVaAy4wb+3yZ9xn5LFc0= +github.com/tecamino/tecamino-json_data v0.0.21/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY= github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU= github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= diff --git a/main.go b/main.go index 5fc76e8..292b2d9 100644 --- a/main.go +++ b/main.go @@ -18,7 +18,7 @@ func main() { if err != nil { panic(err) } - //save database after exeutabe ends + //save database after executabe ends defer dbmHandler.SaveDb() //initialize new server diff --git a/models/datapoint.go b/models/datapoint.go index 1f65b40..be3b7be 100644 --- a/models/datapoint.go +++ b/models/datapoint.go @@ -23,16 +23,17 @@ const ( ) type Datapoint struct { - Datapoints map[string]*Datapoint `json:"-"` Uuid uuid.UUID `json:"uuid"` - Path string `json:"path"` Value any `json:"value,omitempty"` + Path string `json:"path"` CreateDateTime int64 `json:"createDateTime,omitempty"` UpdateDateTime int64 `json:"updateDateTime,omitempty"` - Type json_dataModels.Type `json:"type"` - ReadWrite json_dataModels.Rights `json:"readWrite"` + Datapoints map[string]*Datapoint `json:"-"` Drivers json_dataModels.Drivers `json:"drivers,omitempty"` Subscriptions Subscriptions `json:"-"` + Type json_dataModels.Type `json:"type"` + ReadWrite json_dataModels.Rights `json:"readWrite"` + HasChild bool `json:"-"` } func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) { @@ -89,14 +90,15 @@ func (d *Datapoint) GetValueUint64() uint64 { return utils.Uint64From(d.Value) } -func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []json_dataModels.Set, uuids Uuids, err error) { +func (d *Datapoint) CreateDatapoints(uuids *Uuids, sets ...json_dataModels.Set) (created []json_dataModels.Set, err error) { if len(sets) == 0 { return } - uuids = make(Uuids, 1) + publishes := []json_dataModels.Publish{} for _, dp := range sets { + parts := strings.Split(dp.Path, ":") current := d @@ -104,52 +106,66 @@ func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []jso if current.Datapoints == nil { current.Datapoints = make(map[string]*Datapoint) } - if i == len(parts)-1 { // Leaf node: create or update datapoint if existing, ok := current.Datapoints[part]; ok { _, err := existing.Set("", dp) if err != nil { - return nil, nil, err + return nil, err } created = append(created, json_dataModels.Set{ - Uuid: existing.Uuid, - Path: existing.Path, - Type: existing.Type, - Value: existing.Value, - Rights: existing.ReadWrite, - Drivers: &existing.Drivers, - Updated: true, + Uuid: existing.Uuid, + Path: existing.Path, + Type: existing.Type, + Value: existing.Value, + Rights: existing.ReadWrite, + Drivers: &existing.Drivers, + Updated: true, + HasChild: existing.HasChild, }) existing.Publish(OnChange) } else { - ndp := Datapoint{ - Uuid: uuid.New(), + var uid uuid.UUID = uuid.New() + if dp.Uuid != uuid.Nil { + uid = dp.Uuid + } + ndp := &Datapoint{ + Uuid: uid, CreateDateTime: time.Now().UnixMilli(), Subscriptions: InitSubscribtion(), } // Create new - current.Datapoints[part] = &ndp + current.Datapoints[part] = ndp + _, err := ndp.Set(strings.Join(parts, ":"), dp) if err != nil { - return nil, nil, err + return nil, err } - created = append(created, json_dataModels.Set{ - Uuid: ndp.Uuid, - Path: ndp.Path, - Type: ndp.Type, - Value: ndp.Value, - Rights: ndp.ReadWrite, - Driver: dp.Driver, - }) - ndp.Publish(OnCreate) //add uuid to flat map for faster lookup - uuids[ndp.Uuid] = &ndp + uuids.AddDatapoint(current, ndp) + + created = append(created, json_dataModels.Set{ + Uuid: ndp.Uuid, + Path: ndp.Path, + Type: ndp.Type, + Value: ndp.Value, + Rights: ndp.ReadWrite, + Driver: dp.Driver, + HasChild: ndp.HasChild, + }) + + publishes = append(publishes, json_dataModels.Publish{ + Event: OnCreate, + Uuid: ndp.Uuid, + Path: ndp.Path, + Type: ndp.Type, + Value: ndp.Value, + }) } } @@ -166,36 +182,51 @@ func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []jso Subscriptions: InitSubscribtion(), } + //add uuid to flat map for faster lookup + uuids.AddDatapoint(current, newDp) + created = append(created, json_dataModels.Set{ - Uuid: newDp.Uuid, - Path: newDp.Path, - Type: newDp.Type, - Value: newDp.Value, - Rights: newDp.ReadWrite, + Uuid: newDp.Uuid, + Path: newDp.Path, + Type: newDp.Type, + Value: newDp.Value, + Rights: newDp.ReadWrite, + HasChild: newDp.HasChild, }) if dp.Rights != "" { newDp.ReadWrite = dp.Rights.GetRights() } - newDp.Publish(OnCreate) + publishes = append(publishes, json_dataModels.Publish{ + Event: OnCreate, + Uuid: newDp.Uuid, + Path: newDp.Path, + Type: newDp.Type, + Value: newDp.Value, + }) current.Datapoints[part] = newDp current = newDp - - //add uuid to flat map for faster lookuo - uuids[newDp.Uuid] = newDp } } } + + r := json_data.NewResponse() + r.Publish = append(r.Publish, publishes...) + + b, err := json.Marshal(r) + if err != nil { + return created, err + } + ws.SendBroadcast(b) + return } -func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) (uuids Uuids, err error) { +func (d *Datapoint) ImportDatapoint(uuids *Uuids, dp *Datapoint, path string) (err error) { parts := strings.Split(dp.Path, ":") - uuids = make(Uuids, 1) - current := d for i, part := range parts { if current.Datapoints == nil { @@ -217,11 +248,12 @@ func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) (uuids Uuids, er dp.Subscriptions = InitSubscribtion() current.Datapoints[part] = dp //add uuid to flat map for faster lookup - uuids[dp.Uuid] = dp + uuids.AddDatapoint(current, dp) + dp.Publish(OnChange) } - return uuids, nil + return nil } // Traverse or create intermediate nodes @@ -238,10 +270,10 @@ func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) (uuids Uuids, er current.Datapoints[part] = newDp current = newDp //add uuid to flat map for faster lookup - uuids[newDp.Uuid] = newDp + uuids.AddDatapoint(current, newDp) } } - return uuids, nil + return nil } func (d *Datapoint) UpdateDatapointValue(value any, path string) error { @@ -265,20 +297,22 @@ func (d *Datapoint) UpdateDatapointValue(value any, path string) error { return nil } -func (d *Datapoint) UpdateValue(conns *ws.ClientHandler, value any) error { +func (d *Datapoint) UpdateValue(value any) error { d.Value = d.Type.ConvertValue(value) d.UpdateDateTime = time.Now().UnixMilli() d.Publish(OnChange) return nil } -func (d *Datapoint) RemoveDatapoint(conns *ws.ClientHandler, set json_dataModels.Set) (sets []json_dataModels.Set, err error) { +func (d *Datapoint) RemoveDatapoint(set json_dataModels.Set) (sets []json_dataModels.Set, err error) { parts := strings.Split(set.Path, ":") if len(parts) < 1 { return sets, fmt.Errorf("invalid path: '%s'", set.Path) } + publishes := []json_dataModels.Publish{} + current := d for i := 0; i < len(parts)-1; i++ { next, ok := current.Datapoints[parts[i]] @@ -290,49 +324,64 @@ func (d *Datapoint) RemoveDatapoint(conns *ws.ClientHandler, set json_dataModels toDelete := parts[len(parts)-1] if dp, ok := current.Datapoints[toDelete]; ok { - sets = append(sets, removeChildren(dp)...) - dp.Publish(OnDelete) + s, p := removeChildren(dp) + sets = append(sets, s...) + publishes = append(publishes, p...) + publishes = append(publishes, json_dataModels.Publish{ + Event: OnDelete, + Uuid: dp.Uuid, + Path: dp.Path, + }) sets = append(sets, json_dataModels.Set{ Uuid: dp.Uuid, Path: dp.Path, }) delete(current.Datapoints, toDelete) + + r := json_data.NewResponse() + r.Publish = append(r.Publish, publishes...) + + b, err := json.Marshal(r) + if err != nil { + return sets, err + } + ws.SendBroadcast(b) + return sets, nil } return sets, fmt.Errorf("datapoint '%s' not found", set.Path) } // removes all children and grandchlidren of datapoint -func removeChildren(dp *Datapoint) (sets []json_dataModels.Set) { +func removeChildren(dp *Datapoint) (sets []json_dataModels.Set, pubs []json_dataModels.Publish) { for name, d := range dp.Datapoints { - sets = append(sets, removeChildren(d)...) - d.Publish(OnDelete) + s, p := removeChildren(d) + sets = append(sets, s...) + pubs = append(pubs, p...) + sets = append(sets, json_dataModels.Set{ Uuid: d.Uuid, Path: d.Path, }) delete(d.Datapoints, name) } - return sets + return sets, pubs } func (d *Datapoint) GetAllDatapoints(depth uint) (dps Datapoints) { + dps = append(dps, d) + if depth == 1 { + return + } else if depth > 0 { + depth-- + } var dfs func(dp *Datapoint, currentDepth uint) dfs = func(dp *Datapoint, currentDepth uint) { switch depth { case 0: // Return all descendants - for _, child := range dp.Datapoints { - dps = append(dps, child) - dfs(child, currentDepth+1) - } - return - case 1: - return - } - - if currentDepth == depth-1 { + case currentDepth: return } @@ -341,7 +390,7 @@ func (d *Datapoint) GetAllDatapoints(depth uint) (dps Datapoints) { dfs(child, currentDepth+1) } } - dps = append(dps, d) + dfs(d, 0) dps.SortSlice() return @@ -393,6 +442,35 @@ func (d *Datapoint) AddSubscribtion(conn *wsModels.Client, sub json_dataModels.S } } +func (d *Datapoint) RenamePaths(oldPath string) { + visited := make(map[*Datapoint]bool) + + if len(d.Datapoints) == 0 { + return + } + + for _, dp := range d.Datapoints { + dp.Path = strings.Replace(dp.Path, oldPath, d.Path, 1) + dp.renameSubPaths(oldPath, d.Path, visited) + } +} + +func (d *Datapoint) renameSubPaths(oldPath, newPath string, visited map[*Datapoint]bool) { + if visited[d] { + return + } + visited[d] = true + + if len(d.Datapoints) == 0 { + return + } + + for _, dp := range d.Datapoints { + dp.Path = strings.Replace(dp.Path, oldPath, newPath, 1) + dp.renameSubPaths(oldPath, newPath, visited) + } +} + func (d *Datapoint) RemoveSubscribtion(client *wsModels.Client) { delete(d.Subscriptions, client) } diff --git a/models/dbm.go b/models/dbm.go index 2bc4a9d..ac9cdab 100644 --- a/models/dbm.go +++ b/models/dbm.go @@ -5,8 +5,6 @@ import ( "runtime" "time" - "maps" - "github.com/google/uuid" "github.com/tecamino/tecamino-dbm/utils" ws "github.com/tecamino/tecamino-dbm/websocket" @@ -16,7 +14,7 @@ import ( type DBM struct { Datapoints Datapoint - Uuids Uuids + Uuids *Uuids Conns *ws.ClientHandler Log *logging.Logger } @@ -25,7 +23,7 @@ var SystemDatapoints uuid.UUID func NewDBM(conns *ws.ClientHandler, log *logging.Logger) *DBM { return &DBM{ - Uuids: make(Uuids), + Uuids: &Uuids{}, Conns: conns, Log: log, } @@ -36,11 +34,7 @@ func (d *DBM) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.S return nil, nil } - dps, uuids, err := d.Datapoints.CreateDatapoints(sets...) - - //save uuid in seperate map for fast look up - maps.Copy(d.Uuids, uuids) - + dps, err := d.Datapoints.CreateDatapoints(d.Uuids, sets...) if err != nil { return nil, err } @@ -58,11 +52,10 @@ func (d *DBM) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.S func (d *DBM) ImportDatapoints(dps ...*Datapoint) error { for _, dp := range dps { - uuids, err := d.Datapoints.ImportDatapoint(dp, dp.Path) + err := d.Datapoints.ImportDatapoint(d.Uuids, dp, dp.Path) if err != nil { return err } - maps.Copy(d.Uuids, uuids) d.ModifyCountedDatapoints(1, false) } @@ -71,10 +64,10 @@ func (d *DBM) ImportDatapoints(dps ...*Datapoint) error { func (d *DBM) UpdateDatapointValue(value any, uid uuid.UUID, path ...string) error { if uid != uuid.Nil { - if _, ok := d.Uuids[uid]; !ok { + dp := d.Uuids.GetDatapoint(uid) + if dp == nil { return fmt.Errorf("uuid %s not found", uid.String()) } - dp := d.Uuids[uid] dp.Value = dp.Type.ConvertValue(value) dp.UpdateDateTime = time.Now().UnixMilli() dp.Publish(OnChange) @@ -90,11 +83,12 @@ func (d *DBM) UpdateDatapointValue(value any, uid uuid.UUID, path ...string) err func (d *DBM) RemoveDatapoint(sets ...json_dataModels.Set) (lsRemoved []json_dataModels.Set, err error) { for _, set := range sets { if set.Path == "" { - if dp, ok := d.Uuids[set.Uuid]; ok { + if dp := d.Uuids.GetDatapoint(set.Uuid); dp != nil { set.Path = dp.Path } } - lsRemoved, err = d.Datapoints.RemoveDatapoint(d.Conns, set) + + lsRemoved, err = d.Datapoints.RemoveDatapoint(set) if err != nil { return } @@ -105,11 +99,10 @@ func (d *DBM) RemoveDatapoint(sets ...json_dataModels.Set) (lsRemoved []json_dat func (d *DBM) QueryDatapoints(depth uint, uid uuid.UUID, key ...string) []*Datapoint { if uid != uuid.Nil { - if _, ok := d.Uuids[uid]; !ok { + dp := d.Uuids.GetDatapoint(uid) + if dp == nil { return nil - } - dp := d.Uuids[uid] - if depth == 1 { + } else if depth == 1 { return []*Datapoint{dp} } return append([]*Datapoint{}, dp.QueryDatapoints(depth, key[0])...) diff --git a/models/uuids.go b/models/uuids.go index 734de20..73070c0 100644 --- a/models/uuids.go +++ b/models/uuids.go @@ -1,5 +1,52 @@ package models -import "github.com/google/uuid" +import ( + "fmt" + + "github.com/google/uuid" + json_dataModels "github.com/tecamino/tecamino-json_data/models" +) type Uuids map[uuid.UUID]*Datapoint + +func NewUuids() *Uuids { + return &Uuids{} +} + +func (u *Uuids) AddDatapoint(parentDp, newDp *Datapoint) { + if odp, ok := (*u)[newDp.Uuid]; ok { + if odp.Path == newDp.Path { + return + } + newDp.Datapoints = odp.Datapoints + newDp.HasChild = len(odp.Datapoints) > 0 + odp.Datapoints = map[string]*Datapoint{} + newDp.RenamePaths(odp.Path) + rmDps, _ := parentDp.RemoveDatapoint(json_dataModels.Set{Path: odp.Path}) + datapoints := u.GetDatapointByPath("System:Datapoints") + datapoints.UpdateValue(datapoints.Value.(uint64) - uint64(len(rmDps))) + fmt.Println(11, newDp.HasChild) + } + + (*u)[newDp.Uuid] = newDp +} + +func (u *Uuids) GetDatapoint(uuid uuid.UUID) *Datapoint { + if dp, ok := (*u)[uuid]; ok { + return dp + } + return nil +} + +func (u *Uuids) GetDatapointByPath(path string) *Datapoint { + for _, dp := range *u { + if dp.Path == path { + return dp + } + } + return nil +} + +func (u *Uuids) RemoveDatapoint(uuid uuid.UUID) { + delete(*u, uuid) +} diff --git a/websocket/clientHandler.go b/websocket/clientHandler.go index 7379970..82eb4a8 100644 --- a/websocket/clientHandler.go +++ b/websocket/clientHandler.go @@ -38,6 +38,7 @@ func (cH *ClientHandler) ConnectNewClient(id string, c *gin.Context) (client *mo client, err = models.ConnectNewClient(id, c) client.OnClose = func(code int, reason string) { + fmt.Println(23, "closing", id) delete(cH.Clients, id) } diff --git a/websocket/models/client.go b/websocket/models/client.go index 031432f..7c0667f 100644 --- a/websocket/models/client.go +++ b/websocket/models/client.go @@ -17,10 +17,10 @@ var Broadcast Clients = make(Clients) const ( // Time allowed to write a message to the peer. - writeWait = 10 * time.Second + writeWait = 30 * time.Second // Time allowed to read the next pong message from the peer. - pongWait = 10 * time.Second + pongWait = 30 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 @@ -34,6 +34,7 @@ type Client struct { OnMessage func(data []byte) OnClose func(code int, reason string) OnError func(err error) + OnWarning func(warn string) OnPing func() OnPong func() send chan []byte @@ -177,7 +178,15 @@ func (c *Client) SendResponse(data []byte) { if !c.Connected { return } - c.send <- data + select { + case c.send <- data: + // sent successfully + default: + // channel full, drop or log + if c.OnWarning != nil { + c.OnWarning("Dropping message: channel full") + } + } } func (c *Client) Close(code int, reason string) error {