5 Commits

Author SHA1 Message Date
Adrian Zuercher
c3a3060129 improve websocket ping and remodel for rename datapoint 2025-07-25 18:26:36 +02:00
Adrian Zuercher
a23f82e9fe Merge branch 'main' of https://github.com/tecamino/tecamino-dbm
This is a neccesary pull merge
2025-07-23 09:15:01 +02:00
Adrian Zuercher
c37dd87a37 new respond to send all create change delete different bug fixes 2025-07-23 09:10:56 +02:00
Adrian Zuercher
8be5c80a22 add broadcast to all client and modify lingpong handler 2025-07-23 09:09:14 +02:00
zuadi
47a065aaf9 Update build.yml add flags to make smaller executable 2025-07-13 20:10:05 +02:00
13 changed files with 334 additions and 201 deletions

View File

@@ -36,9 +36,9 @@ jobs:
run: | run: |
mkdir -p build mkdir -p build
if [ "${{ matrix.goos }}" == "windows" ]; then if [ "${{ matrix.goos }}" == "windows" ]; then
GOOS=${{ matrix.goos }} GOARCH=${{ matrix.goarch }} go build -o build/tecamino-dbm-${{ matrix.goos }}-${{ matrix.goarch }}.exe main.go GOOS=${{ matrix.goos }} GOARCH=${{ matrix.goarch }} go build -ldflags="-s -w" -trimpath -o build/tecamino-dbm-${{ matrix.goos }}-${{ matrix.goarch }}.exe main.go
else else
GOOS=${{ matrix.goos }} GOARCH=${{ matrix.goarch }} go build -o build/tecamino-dbm-${{ matrix.goos }}-${{ matrix.goarch }} main.go GOOS=${{ matrix.goos }} GOARCH=${{ matrix.goarch }} go build -ldflags="-s -w" -trimpath -o build/tecamino-dbm-${{ matrix.goos }}-${{ matrix.goarch }} main.go
fi fi
- name: Upload artifacts - name: Upload artifacts

View File

@@ -1,6 +1,7 @@
package dbm package dbm
import ( import (
"fmt"
"net/http" "net/http"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -29,6 +30,7 @@ func (d *DBMHandler) Json_Data(c *gin.Context) {
if get.Query != nil { if get.Query != nil {
depth = get.Query.Depth depth = get.Query.Depth
} }
for _, res := range d.DBM.QueryDatapoints(depth, get.Uuid, get.Path) { for _, res := range d.DBM.QueryDatapoints(depth, get.Uuid, get.Path) {
resp.AddGet(json_dataModels.Get{ resp.AddGet(json_dataModels.Get{
Uuid: res.Uuid, Uuid: res.Uuid,
@@ -44,6 +46,10 @@ func (d *DBMHandler) Json_Data(c *gin.Context) {
} }
if len(payload.Set) > 0 { if len(payload.Set) > 0 {
resp.Set, err = d.DBM.CreateDatapoints(payload.Set...) resp.Set, err = d.DBM.CreateDatapoints(payload.Set...)
for i, o := range resp.Set {
fmt.Println(1000, i, o)
fmt.Println(1001, o.HasChild)
}
if err != nil { if err != nil {
r := json_data.NewResponse() r := json_data.NewResponse()
r.SetError() r.SetError()
@@ -52,6 +58,7 @@ func (d *DBMHandler) Json_Data(c *gin.Context) {
return return
} }
} }
c.JSON(200, resp) c.JSON(200, resp)
} }
@@ -67,9 +74,7 @@ func (d *DBMHandler) Delete(c *gin.Context) {
} }
response := json_data.NewResponse() response := json_data.NewResponse()
if payload.Set != nil { if payload.Set != nil {
response.Set, err = d.DBM.RemoveDatapoint(payload.Set...) response.Set, err = d.DBM.RemoveDatapoint(payload.Set...)
if err != nil { if err != nil {
r := json_data.NewResponse() r := json_data.NewResponse()

View File

@@ -30,7 +30,7 @@ func (d *DBMHandler) Set(req *json_dataModels.Request, id string) {
} }
for _, dp := range dps { for _, dp := range dps {
dp.UpdateValue(d.Conns, set.Value) dp.UpdateValue(set.Value)
resp.AddSet(json_dataModels.Set{ resp.AddSet(json_dataModels.Set{
Uuid: dp.Uuid, Uuid: dp.Uuid,

View File

@@ -49,6 +49,7 @@ func (d *DBMHandler) Subscribe(req *json_dataModels.Request, id string) {
Path: dp.Path, Path: dp.Path,
Value: dp.Value, Value: dp.Value,
HasChild: dp.Datapoints != nil, HasChild: dp.Datapoints != nil,
Rights: dp.ReadWrite.GetRights(),
Driver: sub.Driver, Driver: sub.Driver,
Drivers: &dp.Drivers, Drivers: &dp.Drivers,
}) })

View File

@@ -46,6 +46,10 @@ func (d *DBMHandler) WebSocket(c *gin.Context) {
d.Unsubscribe(request, id) d.Unsubscribe(request, id)
} }
client.OnWarning = func(s string) {
d.Log.Warning("dbmHandler.webSocket.Websocket", "warning on websocket connection: "+s)
}
client.OnError = func(err error) { client.OnError = func(err error) {
d.Log.Error("dbmHandler.webSocket.Websocket", "error on websocket connection: "+err.Error()) d.Log.Error("dbmHandler.webSocket.Websocket", "error on websocket connection: "+err.Error())
} }

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/gin-gonic/gin v1.10.0 github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3 github.com/gorilla/websocket v1.5.3
github.com/tecamino/tecamino-json_data v0.0.17 github.com/tecamino/tecamino-json_data v0.0.21
github.com/tecamino/tecamino-logger v0.2.0 github.com/tecamino/tecamino-logger v0.2.0
) )

4
go.sum
View File

@@ -71,8 +71,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tecamino/tecamino-json_data v0.0.17 h1:M12UzKbfIgu/q3ERsEhGNDV3DYOvc0TioU288kQjDCA= github.com/tecamino/tecamino-json_data v0.0.21 h1:ZRN9wyn+p6J1T4b0e9xf1HStVaAy4wb+3yZ9xn5LFc0=
github.com/tecamino/tecamino-json_data v0.0.17/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY= github.com/tecamino/tecamino-json_data v0.0.21/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY=
github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU= github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU=
github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8= github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=

View File

@@ -18,7 +18,7 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
//save database after exeutabe ends //save database after executabe ends
defer dbmHandler.SaveDb() defer dbmHandler.SaveDb()
//initialize new server //initialize new server

View File

@@ -23,16 +23,17 @@ const (
) )
type Datapoint struct { type Datapoint struct {
Datapoints map[string]*Datapoint `json:"-"`
Uuid uuid.UUID `json:"uuid"` Uuid uuid.UUID `json:"uuid"`
Path string `json:"path"`
Value any `json:"value,omitempty"` Value any `json:"value,omitempty"`
Path string `json:"path"`
CreateDateTime int64 `json:"createDateTime,omitempty"` CreateDateTime int64 `json:"createDateTime,omitempty"`
UpdateDateTime int64 `json:"updateDateTime,omitempty"` UpdateDateTime int64 `json:"updateDateTime,omitempty"`
Type json_dataModels.Type `json:"type"` Datapoints map[string]*Datapoint `json:"-"`
ReadWrite json_dataModels.Rights `json:"readWrite"`
Drivers json_dataModels.Drivers `json:"drivers,omitempty"` Drivers json_dataModels.Drivers `json:"drivers,omitempty"`
Subscriptions Subscriptions `json:"-"` Subscriptions Subscriptions `json:"-"`
Type json_dataModels.Type `json:"type"`
ReadWrite json_dataModels.Rights `json:"readWrite"`
HasChild bool `json:"-"`
} }
func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) { func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) {
@@ -55,8 +56,8 @@ func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) {
changed = true changed = true
d.Value = d.Type.ConvertValue(set.Value) d.Value = d.Type.ConvertValue(set.Value)
} }
} }
if set.Rights != "" { if set.Rights != "" {
changed = true changed = true
d.ReadWrite = set.Rights.GetRights() d.ReadWrite = set.Rights.GetRights()
@@ -89,14 +90,15 @@ func (d *Datapoint) GetValueUint64() uint64 {
return utils.Uint64From(d.Value) return utils.Uint64From(d.Value)
} }
func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []json_dataModels.Set, uuids Uuids, err error) { func (d *Datapoint) CreateDatapoints(uuids *Uuids, sets ...json_dataModels.Set) (created []json_dataModels.Set, err error) {
if len(sets) == 0 { if len(sets) == 0 {
return return
} }
uuids = make(Uuids, 1) publishes := []json_dataModels.Publish{}
for _, dp := range sets { for _, dp := range sets {
parts := strings.Split(dp.Path, ":") parts := strings.Split(dp.Path, ":")
current := d current := d
@@ -104,13 +106,13 @@ func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []jso
if current.Datapoints == nil { if current.Datapoints == nil {
current.Datapoints = make(map[string]*Datapoint) current.Datapoints = make(map[string]*Datapoint)
} }
if i == len(parts)-1 { if i == len(parts)-1 {
// Leaf node: create or update datapoint // Leaf node: create or update datapoint
if existing, ok := current.Datapoints[part]; ok { if existing, ok := current.Datapoints[part]; ok {
publish, err := existing.Set("", dp)
_, err := existing.Set("", dp)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
created = append(created, json_dataModels.Set{ created = append(created, json_dataModels.Set{
Uuid: existing.Uuid, Uuid: existing.Uuid,
@@ -120,23 +122,33 @@ func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []jso
Rights: existing.ReadWrite, Rights: existing.ReadWrite,
Drivers: &existing.Drivers, Drivers: &existing.Drivers,
Updated: true, Updated: true,
HasChild: existing.HasChild,
}) })
if publish {
existing.Publish(OnChange) existing.Publish(OnChange)
}
} else { } else {
ndp := Datapoint{ var uid uuid.UUID = uuid.New()
Uuid: uuid.New(), if dp.Uuid != uuid.Nil {
uid = dp.Uuid
}
ndp := &Datapoint{
Uuid: uid,
CreateDateTime: time.Now().UnixMilli(), CreateDateTime: time.Now().UnixMilli(),
Subscriptions: InitSubscribtion(), Subscriptions: InitSubscribtion(),
} }
// Create new // Create new
current.Datapoints[part] = &ndp current.Datapoints[part] = ndp
publish, err := ndp.Set(strings.Join(parts, ":"), dp)
_, err := ndp.Set(strings.Join(parts, ":"), dp)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
//add uuid to flat map for faster lookup
uuids.AddDatapoint(current, ndp)
created = append(created, json_dataModels.Set{ created = append(created, json_dataModels.Set{
Uuid: ndp.Uuid, Uuid: ndp.Uuid,
Path: ndp.Path, Path: ndp.Path,
@@ -144,12 +156,16 @@ func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []jso
Value: ndp.Value, Value: ndp.Value,
Rights: ndp.ReadWrite, Rights: ndp.ReadWrite,
Driver: dp.Driver, Driver: dp.Driver,
HasChild: ndp.HasChild,
})
publishes = append(publishes, json_dataModels.Publish{
Event: OnCreate,
Uuid: ndp.Uuid,
Path: ndp.Path,
Type: ndp.Type,
Value: ndp.Value,
}) })
if publish {
current.Publish(OnChange)
}
//add uuid to flat map for faster lookuo
uuids[ndp.Uuid] = &ndp
} }
} }
@@ -166,34 +182,51 @@ func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []jso
Subscriptions: InitSubscribtion(), Subscriptions: InitSubscribtion(),
} }
//add uuid to flat map for faster lookup
uuids.AddDatapoint(current, newDp)
created = append(created, json_dataModels.Set{ created = append(created, json_dataModels.Set{
Uuid: newDp.Uuid, Uuid: newDp.Uuid,
Path: newDp.Path, Path: newDp.Path,
Type: newDp.Type, Type: newDp.Type,
Value: newDp.Value, Value: newDp.Value,
Rights: newDp.ReadWrite, Rights: newDp.ReadWrite,
HasChild: newDp.HasChild,
}) })
if dp.Rights != "" { if dp.Rights != "" {
newDp.ReadWrite = dp.Rights.GetRights() newDp.ReadWrite = dp.Rights.GetRights()
} }
publishes = append(publishes, json_dataModels.Publish{
Event: OnCreate,
Uuid: newDp.Uuid,
Path: newDp.Path,
Type: newDp.Type,
Value: newDp.Value,
})
current.Datapoints[part] = newDp current.Datapoints[part] = newDp
current = newDp current = newDp
}
}
}
//add uuid to flat map for faster lookuo r := json_data.NewResponse()
uuids[newDp.Uuid] = newDp r.Publish = append(r.Publish, publishes...)
}
} b, err := json.Marshal(r)
if err != nil {
return created, err
} }
ws.SendBroadcast(b)
return return
} }
func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) (uuids Uuids, err error) { func (d *Datapoint) ImportDatapoint(uuids *Uuids, dp *Datapoint, path string) (err error) {
parts := strings.Split(dp.Path, ":") parts := strings.Split(dp.Path, ":")
uuids = make(Uuids, 1)
current := d current := d
for i, part := range parts { for i, part := range parts {
if current.Datapoints == nil { if current.Datapoints == nil {
@@ -214,12 +247,13 @@ func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) (uuids Uuids, er
dp.UpdateDateTime = time.Now().UnixMilli() dp.UpdateDateTime = time.Now().UnixMilli()
dp.Subscriptions = InitSubscribtion() dp.Subscriptions = InitSubscribtion()
current.Datapoints[part] = dp current.Datapoints[part] = dp
//add uuid to flat map for faster lookuo //add uuid to flat map for faster lookup
uuids[dp.Uuid] = dp uuids.AddDatapoint(current, dp)
dp.Publish(OnChange) dp.Publish(OnChange)
} }
return uuids, nil return nil
} }
// Traverse or create intermediate nodes // Traverse or create intermediate nodes
@@ -236,10 +270,10 @@ func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) (uuids Uuids, er
current.Datapoints[part] = newDp current.Datapoints[part] = newDp
current = newDp current = newDp
//add uuid to flat map for faster lookup //add uuid to flat map for faster lookup
uuids[newDp.Uuid] = newDp uuids.AddDatapoint(current, newDp)
} }
} }
return uuids, nil return nil
} }
func (d *Datapoint) UpdateDatapointValue(value any, path string) error { func (d *Datapoint) UpdateDatapointValue(value any, path string) error {
@@ -263,57 +297,91 @@ func (d *Datapoint) UpdateDatapointValue(value any, path string) error {
return nil return nil
} }
func (d *Datapoint) UpdateValue(conns *ws.ClientHandler, value any) error { func (d *Datapoint) UpdateValue(value any) error {
d.Value = d.Type.ConvertValue(value) d.Value = d.Type.ConvertValue(value)
d.UpdateDateTime = time.Now().UnixMilli() d.UpdateDateTime = time.Now().UnixMilli()
d.Publish(OnChange) d.Publish(OnChange)
return nil return nil
} }
func (d *Datapoint) RemoveDatapoint(conns *ws.ClientHandler, set json_dataModels.Set) (json_dataModels.Set, error) { func (d *Datapoint) RemoveDatapoint(set json_dataModels.Set) (sets []json_dataModels.Set, err error) {
parts := strings.Split(set.Path, ":") parts := strings.Split(set.Path, ":")
if len(parts) < 1 { if len(parts) < 1 {
return json_dataModels.Set{}, fmt.Errorf("invalid path: '%s'", set.Path) return sets, fmt.Errorf("invalid path: '%s'", set.Path)
} }
publishes := []json_dataModels.Publish{}
current := d current := d
for i := 0; i < len(parts)-1; i++ { for i := 0; i < len(parts)-1; i++ {
next, ok := current.Datapoints[parts[i]] next, ok := current.Datapoints[parts[i]]
if !ok { if !ok {
return json_dataModels.Set{}, fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":")) return sets, fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":"))
} }
current = next current = next
} }
toDelete := parts[len(parts)-1] toDelete := parts[len(parts)-1]
if dp, ok := current.Datapoints[toDelete]; ok { if dp, ok := current.Datapoints[toDelete]; ok {
dp.Publish(OnDelete) s, p := removeChildren(dp)
sets = append(sets, s...)
publishes = append(publishes, p...)
publishes = append(publishes, json_dataModels.Publish{
Event: OnDelete,
Uuid: dp.Uuid,
Path: dp.Path,
})
sets = append(sets, json_dataModels.Set{
Uuid: dp.Uuid,
Path: dp.Path,
})
delete(current.Datapoints, toDelete) delete(current.Datapoints, toDelete)
return json_dataModels.Set{
Uuid: set.Uuid, r := json_data.NewResponse()
Path: set.Path, r.Publish = append(r.Publish, publishes...)
}, nil
b, err := json.Marshal(r)
if err != nil {
return sets, err
} }
return json_dataModels.Set{}, fmt.Errorf("datapoint '%s' not found", set.Path) ws.SendBroadcast(b)
return sets, nil
}
return sets, fmt.Errorf("datapoint '%s' not found", set.Path)
}
// removes all children and grandchlidren of datapoint
func removeChildren(dp *Datapoint) (sets []json_dataModels.Set, pubs []json_dataModels.Publish) {
for name, d := range dp.Datapoints {
s, p := removeChildren(d)
sets = append(sets, s...)
pubs = append(pubs, p...)
sets = append(sets, json_dataModels.Set{
Uuid: d.Uuid,
Path: d.Path,
})
delete(d.Datapoints, name)
}
return sets, pubs
} }
func (d *Datapoint) GetAllDatapoints(depth uint) (dps Datapoints) { func (d *Datapoint) GetAllDatapoints(depth uint) (dps Datapoints) {
var dfs func(dp *Datapoint, currentDepth uint) dps = append(dps, d)
dfs = func(dp *Datapoint, currentDepth uint) {
if depth == 1 { if depth == 1 {
return return
} else if depth == 0 { } else if depth > 0 {
depth--
}
var dfs func(dp *Datapoint, currentDepth uint)
dfs = func(dp *Datapoint, currentDepth uint) {
switch depth {
case 0:
// Return all descendants // Return all descendants
for _, child := range dp.Datapoints { case currentDepth:
dps = append(dps, child)
dfs(child, currentDepth+1)
}
return
}
if currentDepth == depth-1 {
return return
} }
@@ -322,7 +390,7 @@ func (d *Datapoint) GetAllDatapoints(depth uint) (dps Datapoints) {
dfs(child, currentDepth+1) dfs(child, currentDepth+1)
} }
} }
dps = append(dps, d)
dfs(d, 0) dfs(d, 0)
dps.SortSlice() dps.SortSlice()
return return
@@ -374,12 +442,40 @@ func (d *Datapoint) AddSubscribtion(conn *wsModels.Client, sub json_dataModels.S
} }
} }
func (d *Datapoint) RenamePaths(oldPath string) {
visited := make(map[*Datapoint]bool)
if len(d.Datapoints) == 0 {
return
}
for _, dp := range d.Datapoints {
dp.Path = strings.Replace(dp.Path, oldPath, d.Path, 1)
dp.renameSubPaths(oldPath, d.Path, visited)
}
}
func (d *Datapoint) renameSubPaths(oldPath, newPath string, visited map[*Datapoint]bool) {
if visited[d] {
return
}
visited[d] = true
if len(d.Datapoints) == 0 {
return
}
for _, dp := range d.Datapoints {
dp.Path = strings.Replace(dp.Path, oldPath, newPath, 1)
dp.renameSubPaths(oldPath, newPath, visited)
}
}
func (d *Datapoint) RemoveSubscribtion(client *wsModels.Client) { func (d *Datapoint) RemoveSubscribtion(client *wsModels.Client) {
delete(d.Subscriptions, client) delete(d.Subscriptions, client)
} }
func (d *Datapoint) Publish(eventType string) error { func (d *Datapoint) Publish(eventType string) error {
for client := range d.Subscriptions {
r := json_data.NewResponse() r := json_data.NewResponse()
r.AddPublish(json_dataModels.Publish{ r.AddPublish(json_dataModels.Publish{
Event: eventType, Event: eventType,
@@ -393,8 +489,13 @@ func (d *Datapoint) Publish(eventType string) error {
if err != nil { if err != nil {
return err return err
} }
switch eventType {
case OnCreate, OnDelete:
ws.SendBroadcast(b)
default:
for client := range d.Subscriptions {
client.SendResponse(b) client.SendResponse(b)
} }
}
return nil return nil
} }

View File

@@ -5,8 +5,6 @@ import (
"runtime" "runtime"
"time" "time"
"maps"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/tecamino/tecamino-dbm/utils" "github.com/tecamino/tecamino-dbm/utils"
ws "github.com/tecamino/tecamino-dbm/websocket" ws "github.com/tecamino/tecamino-dbm/websocket"
@@ -16,7 +14,7 @@ import (
type DBM struct { type DBM struct {
Datapoints Datapoint Datapoints Datapoint
Uuids Uuids Uuids *Uuids
Conns *ws.ClientHandler Conns *ws.ClientHandler
Log *logging.Logger Log *logging.Logger
} }
@@ -25,7 +23,7 @@ var SystemDatapoints uuid.UUID
func NewDBM(conns *ws.ClientHandler, log *logging.Logger) *DBM { func NewDBM(conns *ws.ClientHandler, log *logging.Logger) *DBM {
return &DBM{ return &DBM{
Uuids: make(Uuids), Uuids: &Uuids{},
Conns: conns, Conns: conns,
Log: log, Log: log,
} }
@@ -36,11 +34,7 @@ func (d *DBM) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.S
return nil, nil return nil, nil
} }
dps, uuids, err := d.Datapoints.CreateDatapoints(sets...) dps, err := d.Datapoints.CreateDatapoints(d.Uuids, sets...)
//save uuid in seperate map for fast look up
maps.Copy(d.Uuids, uuids)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -58,11 +52,10 @@ func (d *DBM) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.S
func (d *DBM) ImportDatapoints(dps ...*Datapoint) error { func (d *DBM) ImportDatapoints(dps ...*Datapoint) error {
for _, dp := range dps { for _, dp := range dps {
uuids, err := d.Datapoints.ImportDatapoint(dp, dp.Path) err := d.Datapoints.ImportDatapoint(d.Uuids, dp, dp.Path)
if err != nil { if err != nil {
return err return err
} }
maps.Copy(d.Uuids, uuids)
d.ModifyCountedDatapoints(1, false) d.ModifyCountedDatapoints(1, false)
} }
@@ -71,10 +64,10 @@ func (d *DBM) ImportDatapoints(dps ...*Datapoint) error {
func (d *DBM) UpdateDatapointValue(value any, uid uuid.UUID, path ...string) error { func (d *DBM) UpdateDatapointValue(value any, uid uuid.UUID, path ...string) error {
if uid != uuid.Nil { if uid != uuid.Nil {
if _, ok := d.Uuids[uid]; !ok { dp := d.Uuids.GetDatapoint(uid)
if dp == nil {
return fmt.Errorf("uuid %s not found", uid.String()) return fmt.Errorf("uuid %s not found", uid.String())
} }
dp := d.Uuids[uid]
dp.Value = dp.Type.ConvertValue(value) dp.Value = dp.Type.ConvertValue(value)
dp.UpdateDateTime = time.Now().UnixMilli() dp.UpdateDateTime = time.Now().UnixMilli()
dp.Publish(OnChange) dp.Publish(OnChange)
@@ -87,27 +80,29 @@ func (d *DBM) UpdateDatapointValue(value any, uid uuid.UUID, path ...string) err
return d.Datapoints.UpdateDatapointValue(value, path[0]) return d.Datapoints.UpdateDatapointValue(value, path[0])
} }
func (d *DBM) RemoveDatapoint(sets ...json_dataModels.Set) ([]json_dataModels.Set, error) { func (d *DBM) RemoveDatapoint(sets ...json_dataModels.Set) (lsRemoved []json_dataModels.Set, err error) {
var lsRemoved []json_dataModels.Set
for _, set := range sets { for _, set := range sets {
removed, err := d.Datapoints.RemoveDatapoint(d.Conns, set) if set.Path == "" {
if err != nil { if dp := d.Uuids.GetDatapoint(set.Uuid); dp != nil {
return lsRemoved, err set.Path = dp.Path
} }
lsRemoved = append(lsRemoved, removed)
d.ModifyCountedDatapoints(1, true)
} }
return lsRemoved, nil lsRemoved, err = d.Datapoints.RemoveDatapoint(set)
if err != nil {
return
}
d.ModifyCountedDatapoints(uint64(len(lsRemoved)), true)
}
return
} }
func (d *DBM) QueryDatapoints(depth uint, uid uuid.UUID, key ...string) []*Datapoint { func (d *DBM) QueryDatapoints(depth uint, uid uuid.UUID, key ...string) []*Datapoint {
if uid != uuid.Nil { if uid != uuid.Nil {
if _, ok := d.Uuids[uid]; !ok { dp := d.Uuids.GetDatapoint(uid)
if dp == nil {
return nil return nil
} } else if depth == 1 {
dp := d.Uuids[uid]
if depth == 1 {
return []*Datapoint{dp} return []*Datapoint{dp}
} }
return append([]*Datapoint{}, dp.QueryDatapoints(depth, key[0])...) return append([]*Datapoint{}, dp.QueryDatapoints(depth, key[0])...)
@@ -159,8 +154,6 @@ func (d *DBM) GoSystemTime() error {
func (d *DBM) GoSystemMemory() error { func (d *DBM) GoSystemMemory() error {
path := "System:UsedMemory" path := "System:UsedMemory"
var m runtime.MemStats
var mOld uint64
typ := json_dataModels.STR typ := json_dataModels.STR
rights := json_dataModels.Read rights := json_dataModels.Read
@@ -172,14 +165,12 @@ func (d *DBM) GoSystemMemory() error {
go func() { go func() {
for { for {
var m runtime.MemStats
runtime.ReadMemStats(&m) runtime.ReadMemStats(&m)
if m.Sys != mOld { mem := fmt.Sprintf("%.2f MB", float64(m.Alloc)/1024/1024)
mem := fmt.Sprintf("%.2f MB", float64(m.Sys)/1024/1024)
if er := d.UpdateDatapointValue(mem, uuid.Nil, path); er != nil { if er := d.UpdateDatapointValue(mem, uuid.Nil, path); er != nil {
d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error()) d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error())
} }
mOld = m.Sys
}
time.Sleep(time.Second) time.Sleep(time.Second)
} }
}() }()

View File

@@ -1,5 +1,52 @@
package models package models
import "github.com/google/uuid" import (
"fmt"
"github.com/google/uuid"
json_dataModels "github.com/tecamino/tecamino-json_data/models"
)
type Uuids map[uuid.UUID]*Datapoint type Uuids map[uuid.UUID]*Datapoint
func NewUuids() *Uuids {
return &Uuids{}
}
func (u *Uuids) AddDatapoint(parentDp, newDp *Datapoint) {
if odp, ok := (*u)[newDp.Uuid]; ok {
if odp.Path == newDp.Path {
return
}
newDp.Datapoints = odp.Datapoints
newDp.HasChild = len(odp.Datapoints) > 0
odp.Datapoints = map[string]*Datapoint{}
newDp.RenamePaths(odp.Path)
rmDps, _ := parentDp.RemoveDatapoint(json_dataModels.Set{Path: odp.Path})
datapoints := u.GetDatapointByPath("System:Datapoints")
datapoints.UpdateValue(datapoints.Value.(uint64) - uint64(len(rmDps)))
fmt.Println(11, newDp.HasChild)
}
(*u)[newDp.Uuid] = newDp
}
func (u *Uuids) GetDatapoint(uuid uuid.UUID) *Datapoint {
if dp, ok := (*u)[uuid]; ok {
return dp
}
return nil
}
func (u *Uuids) GetDatapointByPath(path string) *Datapoint {
for _, dp := range *u {
if dp.Path == path {
return dp
}
}
return nil
}
func (u *Uuids) RemoveDatapoint(uuid uuid.UUID) {
delete(*u, uuid)
}

View File

@@ -17,6 +17,12 @@ type ClientHandler struct {
Clients models.Clients Clients models.Clients
} }
func SendBroadcast(msg []byte) {
for _, c := range models.Broadcast {
c.SendResponse(msg)
}
}
// initaiates new conections with client map // initaiates new conections with client map
func NewConnectionHandler() *ClientHandler { func NewConnectionHandler() *ClientHandler {
return &ClientHandler{ return &ClientHandler{
@@ -32,6 +38,7 @@ func (cH *ClientHandler) ConnectNewClient(id string, c *gin.Context) (client *mo
client, err = models.ConnectNewClient(id, c) client, err = models.ConnectNewClient(id, c)
client.OnClose = func(code int, reason string) { client.OnClose = func(code int, reason string) {
fmt.Println(23, "closing", id)
delete(cH.Clients, id) delete(cH.Clients, id)
} }
@@ -41,7 +48,6 @@ func (cH *ClientHandler) ConnectNewClient(id string, c *gin.Context) (client *mo
cH.Lock() cH.Lock()
cH.Clients[id] = client cH.Clients[id] = client
cH.Unlock() cH.Unlock()
return client, nil return client, nil
} }

View File

@@ -1,10 +1,10 @@
package models package models
import ( import (
"encoding/json"
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"slices"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -13,12 +13,14 @@ import (
var Origins []string = []string{"*"} var Origins []string = []string{"*"}
var Broadcast Clients = make(Clients)
const ( const (
// Time allowed to write a message to the peer. // Time allowed to write a message to the peer.
writeWait = 10 * time.Second writeWait = 30 * time.Second
// Time allowed to read the next pong message from the peer. // Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second pongWait = 30 * time.Second
// Send pings to peer with this period. Must be less than pongWait. // Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10 pingPeriod = (pongWait * 9) / 10
@@ -32,9 +34,9 @@ type Client struct {
OnMessage func(data []byte) OnMessage func(data []byte)
OnClose func(code int, reason string) OnClose func(code int, reason string)
OnError func(err error) OnError func(err error)
OnWarning func(warn string)
OnPing func() OnPing func()
OnPong func() OnPong func()
sendPong chan string
send chan []byte send chan []byte
unregister chan []byte unregister chan []byte
} }
@@ -43,17 +45,10 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { CheckOrigin: func(r *http.Request) bool {
if len(Origins) == 0 { if len(Origins) == 0 {
return false return false
} } else if Origins[0] == "*" {
if Origins[0] == "*" {
return true return true
} }
origin := r.Header.Get("Origin") return slices.Contains(Origins, r.Header.Get("Origin"))
for _, o := range Origins {
if o == origin {
return true
}
}
return false
}, },
EnableCompression: false, EnableCompression: false,
} }
@@ -68,22 +63,25 @@ func ConnectNewClient(id string, c *gin.Context) (*Client, error) {
Id: id, Id: id,
Connected: true, Connected: true,
conn: conn, conn: conn,
sendPong: make(chan string), send: make(chan []byte, 512),
send: make(chan []byte), unregister: make(chan []byte, 256),
unregister: make(chan []byte),
} }
Broadcast[client.Id] = client
conn.SetPingHandler(func(appData string) error { conn.SetPingHandler(func(appData string) error {
if client.OnPing != nil { if client.OnPing != nil {
client.OnPing() client.OnPing()
} }
conn.SetWriteDeadline(time.Now().Add(writeWait)) conn.SetWriteDeadline(time.Now().Add(writeWait))
conn.SetReadDeadline(time.Now().Add(writeWait)) conn.SetReadDeadline(time.Now().Add(writeWait))
client.sendPong <- appData if err := client.conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(pongWait)); err != nil {
client.OnError(err)
}
return nil return nil
}) })
conn.SetPongHandler(func(string) error { conn.SetPongHandler(func(appData string) error {
conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetReadDeadline(time.Now().Add(pongWait))
if client.OnPong != nil { if client.OnPong != nil {
client.OnPong() client.OnPong()
@@ -94,6 +92,7 @@ func ConnectNewClient(id string, c *gin.Context) (*Client, error) {
// Start reading messages from client // Start reading messages from client
go client.Read() go client.Read()
go client.Write() go client.Write()
go client.PingInterval(pingPeriod)
return client, nil return client, nil
} }
@@ -105,6 +104,7 @@ func (c *Client) Read() {
c.conn.SetReadDeadline(time.Now().Add(writeWait)) c.conn.SetReadDeadline(time.Now().Add(writeWait))
for c.Connected { for c.Connected {
msgType, msg, err := c.conn.ReadMessage() msgType, msg, err := c.conn.ReadMessage()
if err != nil { if err != nil {
c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err)) c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err))
return return
@@ -114,13 +114,11 @@ func (c *Client) Read() {
c.Close(websocket.CloseNormalClosure, "Client closed") c.Close(websocket.CloseNormalClosure, "Client closed")
return return
case websocket.TextMessage: case websocket.TextMessage:
if isPing := c.handleJsonPing(msg); !isPing {
if c.OnMessage != nil { if c.OnMessage != nil {
c.OnMessage(msg) c.OnMessage(msg)
} else { } else {
log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg)) log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg))
} }
}
default: default:
log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id) log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id)
} }
@@ -128,19 +126,15 @@ func (c *Client) Read() {
} }
func (c *Client) Write() { func (c *Client) Write() {
ticker := time.NewTicker(pingPeriod) defer c.conn.Close()
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
for {
select { select {
case message, ok := <-c.send: case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok { if !ok {
// The hub closed the channel. // The hub closed the channel.
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil { if err := c.conn.WriteMessage(websocket.CloseMessage, []byte("ping")); err != nil {
c.handleError(err) c.handleError(err)
return return
} }
@@ -152,26 +146,11 @@ func (c *Client) Write() {
return return
} }
} }
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.handleError(err)
return
}
if c.OnPing != nil {
c.OnPing()
}
case message, ok := <-c.sendPong:
if ok {
c.conn.WriteMessage(websocket.PongMessage, []byte(message))
}
case message := <-c.unregister: case message := <-c.unregister:
c.conn.WriteMessage(websocket.CloseMessage, message) c.conn.WriteMessage(websocket.CloseMessage, message)
c.Connected = false c.Connected = false
close(c.sendPong)
close(c.send) close(c.send)
delete(Broadcast, c.Id)
close(c.unregister) close(c.unregister)
return return
} }
@@ -179,42 +158,40 @@ func (c *Client) Write() {
} }
} }
func (c *Client) handleJsonPing(msg []byte) (isPing bool) { func (c *Client) PingInterval(interval time.Duration) {
var wsMsg WSMessage ticker := time.NewTicker(interval)
err := json.Unmarshal(msg, &wsMsg) defer ticker.Stop()
if err == nil && wsMsg.IsPing() { for range ticker.C {
c.conn.SetReadDeadline(time.Now().Add(writeWait))
// Respond with pong JSON
select {
case c.send <- GetPongByteSlice():
default:
// optional: log or handle if send buffer is full
c.handleError(fmt.Errorf("failed to queue pong message"))
return
}
if err != nil {
c.handleError(fmt.Errorf("write pong error: %w", err))
return
}
if c.OnPing != nil { if c.OnPing != nil {
c.OnPing() c.OnPing()
} }
isPing = true
}
if err := c.conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(pongWait)); err != nil {
c.OnError(err)
return return
}
}
} }
func (c *Client) SendResponse(data []byte) { func (c *Client) SendResponse(data []byte) {
if !c.Connected { if !c.Connected {
return return
} }
c.send <- data select {
case c.send <- data:
// sent successfully
default:
// channel full, drop or log
if c.OnWarning != nil {
c.OnWarning("Dropping message: channel full")
}
}
} }
func (c *Client) Close(code int, reason string) error { func (c *Client) Close(code int, reason string) error {
closeMsg := websocket.FormatCloseMessage(code, reason) closeMsg := websocket.FormatCloseMessage(code, reason)
select { select {
case c.unregister <- closeMsg: // Attempt to send case c.unregister <- closeMsg: // Attempt to send
default: // If the channel is full, this runs default: // If the channel is full, this runs
@@ -230,6 +207,7 @@ func (c *Client) handleError(err error) {
if c.OnError != nil { if c.OnError != nil {
c.OnError(err) c.OnError(err)
} }
if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil { if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil {
if c.OnError != nil { if c.OnError != nil {
c.OnError(err) c.OnError(err)