modify websocketserver to broker with callback functions

This commit is contained in:
Adrian Zürcher
2025-06-19 19:22:23 +02:00
parent 9605b50198
commit 659cbe4072
19 changed files with 367 additions and 266 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
.DS_Store .DS_Store
*.dbm *.dbm
*.log *.log
# local executables
dbm-arm64

View File

@@ -11,13 +11,13 @@ import (
"github.com/tecamino/tecamino-dbm/args" "github.com/tecamino/tecamino-dbm/args"
"github.com/tecamino/tecamino-dbm/models" "github.com/tecamino/tecamino-dbm/models"
serverModels "github.com/tecamino/tecamino-dbm/server/models" ws "github.com/tecamino/tecamino-dbm/websocket"
"github.com/tecamino/tecamino-logger/logging" "github.com/tecamino/tecamino-logger/logging"
) )
type DBMHandler struct { type DBMHandler struct {
DBM *models.DBM DBM *models.DBM
Conns *serverModels.Connections Conns *ws.ClientHandler
sync.RWMutex sync.RWMutex
Log *logging.Logger Log *logging.Logger
arg *args.Args arg *args.Args
@@ -43,7 +43,7 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
logger.Info("main", "start dma handler") logger.Info("main", "start dma handler")
//initialize connection map //initialize connection map
conns := serverModels.NewConnections() conns := ws.NewConnectionHandler()
// Initialize dtabase manager handler // Initialize dtabase manager handler
dmaHandler := DBMHandler{ dmaHandler := DBMHandler{
@@ -74,8 +74,8 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
var line int var line int
for scanner.Scan() { for scanner.Scan() {
line++ line++
dp := models.Datapoint{} dp := &models.Datapoint{}
if err = json.Unmarshal(scanner.Bytes(), &dp); err != nil { if err = json.Unmarshal(scanner.Bytes(), dp); err != nil {
dmaHandler.Log.Error("dmbHandler.NewDmbHandler", "error in line "+fmt.Sprint(line)+" "+scanner.Text()) dmaHandler.Log.Error("dmbHandler.NewDmbHandler", "error in line "+fmt.Sprint(line)+" "+scanner.Text())
dmaHandler.Log.Error("dmbHandler.NewDmbHandler", err.Error()) dmaHandler.Log.Error("dmbHandler.NewDmbHandler", err.Error())

View File

@@ -51,7 +51,6 @@ func (d *DBMHandler) Json_Data(c *gin.Context) {
return return
} }
} }
c.JSON(200, resp) c.JSON(200, resp)
} }

View File

@@ -36,7 +36,14 @@ func (d *DBMHandler) Subscribe(req *json_dataModels.Request, id string) {
continue continue
} }
} }
dp.AddSubscribtion(id, sub)
client := d.DBM.Conns.GetClient(id)
if client == nil {
d.Log.Warning("subscribe", "id "+id+" not found")
continue
}
dp.AddSubscribtion(client, sub)
resp.AddSubscription(json_dataModels.Subscription{ resp.AddSubscription(json_dataModels.Subscription{
Uuid: dp.Uuid, Uuid: dp.Uuid,
Path: dp.Path, Path: dp.Path,
@@ -68,10 +75,17 @@ func (d *DBMHandler) Unsubscribe(req *json_dataModels.Request, id string) {
for _, sub := range req.Unsubscribe { for _, sub := range req.Unsubscribe {
for _, dp := range d.DBM.QueryDatapoints(sub.Depth, sub.Uuid, sub.Path) { for _, dp := range d.DBM.QueryDatapoints(sub.Depth, sub.Uuid, sub.Path) {
if _, ok := dp.Subscriptions[id]; !ok {
client := d.DBM.Conns.GetClient(id)
if client == nil {
d.Log.Warning("subscribe", "id "+id+" not found")
continue continue
} }
dp.RemoveSubscribtion(id)
if _, ok := dp.Subscriptions[client]; !ok {
continue
}
dp.RemoveSubscribtion(client)
resp.AddUnsubscription(json_dataModels.Subscription{ resp.AddUnsubscription(json_dataModels.Subscription{
Uuid: dp.Uuid, Uuid: dp.Uuid,
Path: dp.Path, Path: dp.Path,

View File

@@ -2,11 +2,7 @@ package dbm
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt"
"io"
"github.com/coder/websocket"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/tecamino/tecamino-dbm/auth" "github.com/tecamino/tecamino-dbm/auth"
json_dataModels "github.com/tecamino/tecamino-json_data/models" json_dataModels "github.com/tecamino/tecamino-json_data/models"
@@ -26,76 +22,33 @@ func (d *DBMHandler) WebSocket(c *gin.Context) {
} }
d.Log.Debug("dbmHandler.webSocket.Websocket", "authorization id token: "+id) d.Log.Debug("dbmHandler.webSocket.Websocket", "authorization id token: "+id)
err = d.Conns.ConnectRecievingWsConnection(id, c) client, err := d.Conns.ConnectNewClient(id, c)
defer d.Conns.RemoveClient(id)
if err != nil { if err != nil {
d.Log.Error("dbmHandler.webSocket.Websocket", "error connecting recieving websocket conection: "+err.Error()) d.Log.Error("dbmHandler.webSocket.Websocket", err)
return return
} }
defer d.Conns.DisconnectWsConnection(id, websocket.StatusInternalError, "Internal error")
//Read loop client.OnMessage = func(data []byte) {
for { request, err := d.readJsonData(data)
request, err := d.readJsonData(id)
if err != nil { if err != nil {
d.Log.Error("websocket.WebSocket", err.Error()) d.Log.Error("dbmHandler.webSocket.Websocket", "read json: "+err.Error())
break
} }
// Sets // Sets
d.Get(request, id) d.Get(request, id)
// Sets // Sets
d.Set(request, id) d.Set(request, id)
// Subscribe // Subscribe
d.Subscribe(request, id) d.Subscribe(request, id)
// Unsubscribe // Unsubscribe
d.Unsubscribe(request, id) d.Unsubscribe(request, id)
}
request.Get = make([]json_dataModels.Get, 0) client.OnError = func(err error) {
request.Set = make([]json_dataModels.Set, 0) d.Log.Error("dbmHandler.webSocket.Websocket", "error on websocket connection: "+err.Error())
request.Subscribe = make([]json_dataModels.Subscription, 0)
request.Unsubscribe = make([]json_dataModels.Subscription, 0)
request = nil
} }
} }
func (d *DBMHandler) readJsonData(id string) (request *json_dataModels.Request, err error) { func (d *DBMHandler) readJsonData(data []byte) (request *json_dataModels.Request, err error) {
err = json.Unmarshal(data, &request)
client, ok := d.Conns.Clients[id]
if !ok {
return request, errors.New("client id not found")
}
_, reader, err := client.Conn.Reader(client.Ctx)
if err != nil {
d.Log.Info("webSocket.readJsonData", fmt.Sprintf("WebSocket reader: %v\n", err))
return request, nil
}
b, err := io.ReadAll(reader)
if err != nil {
code := websocket.CloseStatus(err)
switch code {
case websocket.StatusNormalClosure,
websocket.StatusGoingAway,
websocket.StatusNoStatusRcvd:
d.Log.Info("webSocket.readJsonData", fmt.Sprintf("WebSocket closed: %v (code: %v)\n", err, code))
return
default:
d.Log.Error("webSocket.readJsonData", fmt.Sprintf("WebSocket read error: %v (code: %v)\n", err, code))
return
}
}
if err := json.Unmarshal(b, &request); err != nil {
return request, err
}
return return
} }

4
go.mod
View File

@@ -3,11 +3,11 @@ module github.com/tecamino/tecamino-dbm
go 1.24.0 go 1.24.0
require ( require (
github.com/coder/websocket v1.8.13
github.com/gin-contrib/cors v1.7.5 github.com/gin-contrib/cors v1.7.5
github.com/gin-gonic/gin v1.10.0 github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/tecamino/tecamino-json_data v0.0.15 github.com/gorilla/websocket v1.5.3
github.com/tecamino/tecamino-json_data v0.0.16
github.com/tecamino/tecamino-logger v0.2.0 github.com/tecamino/tecamino-logger v0.2.0
) )

8
go.sum
View File

@@ -6,8 +6,6 @@ github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFos
github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4=
github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -34,6 +32,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -71,8 +71,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tecamino/tecamino-json_data v0.0.15 h1:r8S6Ls/MMKTdsqUWd3iVDW6Zm5I5H9UCqNAW9wid02E= github.com/tecamino/tecamino-json_data v0.0.16 h1:aZFxnhm4g6WMDPoqy4HosUk7vl0DB0iIcVs8bbT4MzU=
github.com/tecamino/tecamino-json_data v0.0.15/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY= github.com/tecamino/tecamino-json_data v0.0.16/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY=
github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU= github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU=
github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8= github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=

View File

@@ -6,7 +6,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/tecamino/tecamino-dbm/args" "github.com/tecamino/tecamino-dbm/args"
"github.com/tecamino/tecamino-dbm/dbm" "github.com/tecamino/tecamino-dbm/dbm"
"github.com/tecamino/tecamino-dbm/server" ws "github.com/tecamino/tecamino-dbm/websocket"
) )
func main() { func main() {
@@ -23,7 +23,7 @@ func main() {
//initialize new server //initialize new server
dbmHandler.Log.Debug("main", "initialize new server instance") dbmHandler.Log.Debug("main", "initialize new server instance")
s := server.NewServer(a.AllowOrigins) s := ws.NewServer(a.AllowOrigins)
//set routes //set routes
dbmHandler.Log.Debug("main", "setting routes") dbmHandler.Log.Debug("main", "setting routes")

View File

@@ -1,15 +1,18 @@
package models package models
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
serverModels "github.com/tecamino/tecamino-dbm/server/models"
"github.com/tecamino/tecamino-dbm/utils" "github.com/tecamino/tecamino-dbm/utils"
ws "github.com/tecamino/tecamino-dbm/websocket"
wsModels "github.com/tecamino/tecamino-dbm/websocket/models"
json_data "github.com/tecamino/tecamino-json_data" json_data "github.com/tecamino/tecamino-json_data"
json_dataModels "github.com/tecamino/tecamino-json_data/models" json_dataModels "github.com/tecamino/tecamino-json_data/models"
) )
@@ -31,6 +34,7 @@ type Datapoint struct {
ReadWrite json_dataModels.Rights `json:"readWrite"` ReadWrite json_dataModels.Rights `json:"readWrite"`
Drivers json_dataModels.Drivers `json:"drivers,omitempty"` Drivers json_dataModels.Drivers `json:"drivers,omitempty"`
Subscriptions Subscriptions `json:"-"` Subscriptions Subscriptions `json:"-"`
sync.RWMutex `json:"-"`
} }
func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) { func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) {
@@ -87,7 +91,7 @@ func (d *Datapoint) GetValueUint64() uint64 {
return utils.Uint64From(d.Value) return utils.Uint64From(d.Value)
} }
func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...json_dataModels.Set) (created []json_dataModels.Set, uuids Uuids, err error) { func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []json_dataModels.Set, uuids Uuids, err error) {
if len(sets) == 0 { if len(sets) == 0 {
return return
} }
@@ -121,7 +125,7 @@ func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...js
}) })
if publish { if publish {
existing.Publish(conns, OnChange) existing.Publish(OnChange)
} }
} else { } else {
ndp := Datapoint{ ndp := Datapoint{
@@ -144,7 +148,7 @@ func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...js
Driver: dp.Driver, Driver: dp.Driver,
}) })
if publish { if publish {
current.Publish(conns, OnChange) current.Publish(OnChange)
} }
//add uuid to flat map for faster lookuo //add uuid to flat map for faster lookuo
uuids[ndp.Uuid] = &ndp uuids[ndp.Uuid] = &ndp
@@ -187,7 +191,7 @@ func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...js
return return
} }
func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoint, path string) (uuids Uuids, err error) { func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) (uuids Uuids, err error) {
parts := strings.Split(dp.Path, ":") parts := strings.Split(dp.Path, ":")
uuids = make(Uuids, 1) uuids = make(Uuids, 1)
@@ -205,16 +209,16 @@ func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoin
existing.Value = dp.Type.ConvertValue(dp.Value) existing.Value = dp.Type.ConvertValue(dp.Value)
existing.ReadWrite = dp.ReadWrite.GetRights() existing.ReadWrite = dp.ReadWrite.GetRights()
existing.UpdateDateTime = time.Now().UnixMilli() existing.UpdateDateTime = time.Now().UnixMilli()
dp.Publish(conns, OnChange) dp.Publish(OnChange)
} else { } else {
dp.Path = strings.Join(parts, ":") dp.Path = strings.Join(parts, ":")
dp.ReadWrite = dp.ReadWrite.GetRights() dp.ReadWrite = dp.ReadWrite.GetRights()
dp.UpdateDateTime = time.Now().UnixMilli() dp.UpdateDateTime = time.Now().UnixMilli()
dp.Subscriptions = InitSubscribtion() dp.Subscriptions = InitSubscribtion()
current.Datapoints[part] = &dp current.Datapoints[part] = dp
//add uuid to flat map for faster lookuo //add uuid to flat map for faster lookuo
uuids[dp.Uuid] = &dp uuids[dp.Uuid] = dp
dp.Publish(conns, OnChange) dp.Publish(OnChange)
} }
return uuids, nil return uuids, nil
@@ -233,14 +237,14 @@ func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoin
newDp.ReadWrite = newDp.ReadWrite.GetRights() newDp.ReadWrite = newDp.ReadWrite.GetRights()
current.Datapoints[part] = newDp current.Datapoints[part] = newDp
current = newDp current = newDp
//add uuid to flat map for faster lookuo //add uuid to flat map for faster lookup
uuids[newDp.Uuid] = newDp uuids[newDp.Uuid] = newDp
} }
} }
return uuids, nil return uuids, nil
} }
func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value any, path string) error { func (d *Datapoint) UpdateDatapointValue(value any, path string) error {
paths := strings.Split(path, ":") paths := strings.Split(path, ":")
@@ -253,7 +257,7 @@ func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value
if i == len(paths)-1 { if i == len(paths)-1 {
dp.Value = dp.Type.ConvertValue(value) dp.Value = dp.Type.ConvertValue(value)
dp.UpdateDateTime = time.Now().UnixMilli() dp.UpdateDateTime = time.Now().UnixMilli()
dp.Publish(conns, OnChange) dp.Publish(OnChange)
return nil return nil
} }
current = dp current = dp
@@ -261,14 +265,14 @@ func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value
return nil return nil
} }
func (d *Datapoint) UpdateValue(conns *serverModels.Connections, value any) error { func (d *Datapoint) UpdateValue(conns *ws.ClientHandler, value any) error {
d.Value = d.Type.ConvertValue(value) d.Value = d.Type.ConvertValue(value)
d.UpdateDateTime = time.Now().UnixMilli() d.UpdateDateTime = time.Now().UnixMilli()
d.Publish(conns, OnChange) d.Publish(OnChange)
return nil return nil
} }
func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set json_dataModels.Set) (json_dataModels.Set, error) { func (d *Datapoint) RemoveDatapoint(conns *ws.ClientHandler, set json_dataModels.Set) (json_dataModels.Set, error) {
parts := strings.Split(set.Path, ":") parts := strings.Split(set.Path, ":")
if len(parts) < 1 { if len(parts) < 1 {
@@ -276,7 +280,7 @@ func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set json_da
} }
current := d current := d
for i := range len(parts) - 1 { for i := 0; i < len(parts)-1; i++ {
next, ok := current.Datapoints[parts[i]] next, ok := current.Datapoints[parts[i]]
if !ok { if !ok {
return json_dataModels.Set{}, fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":")) return json_dataModels.Set{}, fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":"))
@@ -286,7 +290,7 @@ func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set json_da
toDelete := parts[len(parts)-1] toDelete := parts[len(parts)-1]
if dp, ok := current.Datapoints[toDelete]; ok { if dp, ok := current.Datapoints[toDelete]; ok {
dp.Publish(conns, OnDelete) dp.Publish(OnDelete)
delete(current.Datapoints, toDelete) delete(current.Datapoints, toDelete)
return json_dataModels.Set{ return json_dataModels.Set{
Uuid: set.Uuid, Uuid: set.Uuid,
@@ -354,17 +358,17 @@ func (d *Datapoint) QueryDatapoints(depth uint, path string) (dps Datapoints) {
return return
} }
func (d *Datapoint) AddSubscribtion(id string, sub json_dataModels.Subscription) { func (d *Datapoint) AddSubscribtion(conn *wsModels.Client, sub json_dataModels.Subscription) {
if d.Subscriptions == nil { if d.Subscriptions == nil {
return return
} }
if s, ok := d.Subscriptions[id]; ok { if s, ok := d.Subscriptions[conn]; ok {
s.OnCreate = sub.OnCreate s.OnCreate = sub.OnCreate
s.OnChange = sub.OnChange s.OnChange = sub.OnChange
s.OnDelete = sub.OnDelete s.OnDelete = sub.OnDelete
} else { } else {
d.Subscriptions[id] = &Subscription{ d.Subscriptions[conn] = &Subscription{
OnCreate: sub.OnCreate, OnCreate: sub.OnCreate,
OnChange: sub.OnChange, OnChange: sub.OnChange,
OnDelete: sub.OnDelete, OnDelete: sub.OnDelete,
@@ -372,36 +376,30 @@ func (d *Datapoint) AddSubscribtion(id string, sub json_dataModels.Subscription)
} }
} }
func (d *Datapoint) RemoveSubscribtion(id string) { func (d *Datapoint) RemoveSubscribtion(client *wsModels.Client) {
if _, ok := d.Subscriptions[id]; !ok { delete(d.Subscriptions, client)
return
}
delete(d.Subscriptions, id)
} }
func (d *Datapoint) Publish(conns *serverModels.Connections, eventType string) error { func (d *Datapoint) Publish(eventType string) error {
if conns.Clients == nil { d.RLock()
return nil defer d.RUnlock()
}
conns.RLock()
defer conns.RUnlock()
for id := range d.Subscriptions { for client := range d.Subscriptions {
if _, ok := conns.Clients[id]; !ok {
delete(d.Subscriptions, id)
} else {
r := json_data.NewResponse() r := json_data.NewResponse()
r.AddUPublish(json_dataModels.Publish{ r.AddPublish(json_dataModels.Publish{
Event: eventType, Event: eventType,
Uuid: d.Uuid, Uuid: d.Uuid,
Path: d.Path, Path: d.Path,
Value: d.Value, Value: d.Value,
}) })
b, err := json.Marshal(r)
if err := conns.SendResponse(id, r); err != nil { if err != nil {
return err return err
} }
if err := client.SendResponse(b, 5); err != nil {
return err
} }
} }
return nil return nil
} }

View File

@@ -8,8 +8,8 @@ import (
"maps" "maps"
"github.com/google/uuid" "github.com/google/uuid"
serverModels "github.com/tecamino/tecamino-dbm/server/models"
"github.com/tecamino/tecamino-dbm/utils" "github.com/tecamino/tecamino-dbm/utils"
ws "github.com/tecamino/tecamino-dbm/websocket"
json_dataModels "github.com/tecamino/tecamino-json_data/models" json_dataModels "github.com/tecamino/tecamino-json_data/models"
"github.com/tecamino/tecamino-logger/logging" "github.com/tecamino/tecamino-logger/logging"
) )
@@ -17,13 +17,13 @@ import (
type DBM struct { type DBM struct {
Datapoints Datapoint Datapoints Datapoint
Uuids Uuids Uuids Uuids
Conns *serverModels.Connections Conns *ws.ClientHandler
Log *logging.Logger Log *logging.Logger
} }
var SystemDatapoints uuid.UUID var SystemDatapoints uuid.UUID
func NewDBM(conns *serverModels.Connections, log *logging.Logger) *DBM { func NewDBM(conns *ws.ClientHandler, log *logging.Logger) *DBM {
return &DBM{ return &DBM{
Uuids: make(Uuids), Uuids: make(Uuids),
Conns: conns, Conns: conns,
@@ -36,7 +36,7 @@ func (d *DBM) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.S
return nil, nil return nil, nil
} }
dps, uuids, err := d.Datapoints.CreateDatapoints(d.Conns, sets...) dps, uuids, err := d.Datapoints.CreateDatapoints(sets...)
//save uuid in seperate map for fast look up //save uuid in seperate map for fast look up
maps.Copy(d.Uuids, uuids) maps.Copy(d.Uuids, uuids)
@@ -56,9 +56,9 @@ func (d *DBM) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.S
return dps, nil return dps, nil
} }
func (d *DBM) ImportDatapoints(dps ...Datapoint) error { func (d *DBM) ImportDatapoints(dps ...*Datapoint) error {
for _, dp := range dps { for _, dp := range dps {
uuids, err := d.Datapoints.ImportDatapoint(d.Conns, dp, dp.Path) uuids, err := d.Datapoints.ImportDatapoint(dp, dp.Path)
if err != nil { if err != nil {
return err return err
} }
@@ -77,14 +77,14 @@ func (d *DBM) UpdateDatapointValue(value any, uid uuid.UUID, path ...string) err
dp := d.Uuids[uid] dp := d.Uuids[uid]
dp.Value = dp.Type.ConvertValue(value) dp.Value = dp.Type.ConvertValue(value)
dp.UpdateDateTime = time.Now().UnixMilli() dp.UpdateDateTime = time.Now().UnixMilli()
dp.Publish(d.Conns, OnChange) dp.Publish(OnChange)
} }
if len(path) > 1 { if len(path) > 1 {
return fmt.Errorf("only one path allowed") return fmt.Errorf("only one path allowed")
} }
return d.Datapoints.UpdateDatapointValue(d.Conns, value, path[0]) return d.Datapoints.UpdateDatapointValue(value, path[0])
} }
func (d *DBM) RemoveDatapoint(sets ...json_dataModels.Set) ([]json_dataModels.Set, error) { func (d *DBM) RemoveDatapoint(sets ...json_dataModels.Set) ([]json_dataModels.Set, error) {
@@ -136,7 +136,6 @@ func (d *DBM) ModifyCountedDatapoints(count uint64, countDown bool) {
func (d *DBM) GoSystemTime() error { func (d *DBM) GoSystemTime() error {
path := "System:Time" path := "System:Time"
var tOld int64
typ := json_dataModels.STR typ := json_dataModels.STR
rights := json_dataModels.Read rights := json_dataModels.Read
@@ -147,15 +146,12 @@ func (d *DBM) GoSystemTime() error {
} }
go func() { go func() {
for { ticker := time.NewTicker(time.Second)
t := time.Now().UnixMilli() for range ticker.C {
if tOld != t { if er := d.UpdateDatapointValue(time.Now().Format("2006-01-02 15:04:05"), uuid.Nil, path); er != nil {
if er := d.UpdateDatapointValue(time.UnixMilli(t).Format("2006-01-02 15:04:05"), uuid.Nil, path); er != nil {
d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error()) d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error())
} }
tOld = t
}
time.Sleep(time.Second)
} }
}() }()
return nil return nil

View File

@@ -1,6 +1,8 @@
package models package models
type Subscriptions map[string]*Subscription import wsModels "github.com/tecamino/tecamino-dbm/websocket/models"
type Subscriptions map[*wsModels.Client]*Subscription
type Subscription struct { type Subscription struct {
OnCreate bool OnCreate bool

View File

@@ -1,58 +0,0 @@
package models
import (
"context"
"fmt"
"github.com/coder/websocket"
"github.com/gin-gonic/gin"
)
var Origins []string = []string{"*"}
type Clients map[string]*Client
type Client struct {
Ctx context.Context `json:"-"`
Cancel context.CancelFunc `json:"-"`
Connected bool `json:"connected"`
Conn *websocket.Conn `json:"-"`
}
func NewClients() Clients {
return make(Clients)
}
// Connect a recieving websocket connection
func (cl *Clients) ConnectRecievingWsConnection(id string, c *gin.Context) error {
if _, exists := (*cl)[id]; exists {
return nil
}
conn, err := websocket.Accept(c.Writer, c.Request, &websocket.AcceptOptions{
OriginPatterns: Origins,
})
if err != nil {
return fmt.Errorf("error accept websocket client: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
(*cl)[id] = &Client{
Connected: true,
Ctx: ctx,
Cancel: cancel,
Conn: conn,
}
return nil
}
func (c *Clients) RemoveClient(id string) {
delete(*c, id)
}
func (c *Clients) DisconnectWsConnection(id string, code websocket.StatusCode, reason string) {
(*c)[id].Connected = false
(*c)[id].Conn.Close(code, reason)
(*c)[id].Cancel()
}

View File

@@ -1,67 +0,0 @@
package models
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/coder/websocket"
"github.com/gin-gonic/gin"
json_dataModels "github.com/tecamino/tecamino-json_data/models"
)
// serves as connection handler of websocket
type Connections struct {
sync.RWMutex
Clients Clients
}
// initaiates new conections with client map
func NewConnections() *Connections {
return &Connections{
Clients: NewClients(),
}
}
// Connect a recieving websocket connection
func (c *Connections) ConnectRecievingWsConnection(id string, ctx *gin.Context) error {
return c.Clients.ConnectRecievingWsConnection(id, ctx)
}
func (c *Connections) RemoveClient(id string) {
c.Clients.RemoveClient(id)
}
func (c *Connections) DisconnectWsConnection(id string, code websocket.StatusCode, reason string) {
c.Clients.DisconnectWsConnection(id, code, reason)
}
// sends json response to client
func (c *Connections) SendResponse(id string, r *json_dataModels.Response) error {
client, ok := c.Clients[id]
if !ok {
return fmt.Errorf("client not found for id %s", id)
}
b, err := json.Marshal(r)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
w, err := client.Conn.Writer(ctx, websocket.MessageText)
if err != nil {
return err
}
defer w.Close()
_, err = w.Write(b)
if err != nil {
return err
}
return nil
}

View File

@@ -12,8 +12,8 @@ import (
"github.com/tecamino/tecamino-dbm/cert" "github.com/tecamino/tecamino-dbm/cert"
"github.com/tecamino/tecamino-dbm/dbm" "github.com/tecamino/tecamino-dbm/dbm"
"github.com/tecamino/tecamino-dbm/models" "github.com/tecamino/tecamino-dbm/models"
"github.com/tecamino/tecamino-dbm/server"
"github.com/tecamino/tecamino-dbm/utils" "github.com/tecamino/tecamino-dbm/utils"
ws "github.com/tecamino/tecamino-dbm/websocket"
) )
func TestCreateDps(t *testing.T) { func TestCreateDps(t *testing.T) {
@@ -142,7 +142,7 @@ func TestUpdateDps(t *testing.T) {
func TestServer(t *testing.T) { func TestServer(t *testing.T) {
fmt.Println("start") fmt.Println("start")
server := server.NewServer([]string{".*"}) server := ws.NewServer([]string{".*"})
t.Fatal(server.ServeHttp("http://localhost", 8100)) t.Fatal(server.ServeHttp("http://localhost", 8100))
} }

View File

@@ -0,0 +1,73 @@
package websocket
import (
"encoding/json"
"fmt"
"sync"
"github.com/gin-gonic/gin"
"github.com/tecamino/tecamino-dbm/websocket/models"
wsModels "github.com/tecamino/tecamino-dbm/websocket/models"
json_dataModels "github.com/tecamino/tecamino-json_data/models"
)
// serves as connection handler of websocket
type ClientHandler struct {
sync.RWMutex
Clients models.Clients
}
// initaiates new conections with client map
func NewConnectionHandler() *ClientHandler {
return &ClientHandler{
Clients: make(models.Clients),
}
}
// Connect a recieving websocket connection
func (cH *ClientHandler) ConnectNewClient(id string, c *gin.Context) (client *models.Client, err error) {
if _, exists := cH.Clients[id]; exists {
return cH.Clients[id], nil
}
client, err = models.ConnectNewClient(id, c)
client.OnClose = func(code int, reason string) {
delete(cH.Clients, id)
}
if err != nil {
return nil, err
}
cH.Lock()
cH.Clients[id] = client
cH.Unlock()
return client, nil
}
// get client client
func (c *ClientHandler) GetClient(id string) *wsModels.Client {
if client, ok := c.Clients[id]; ok {
return client
}
return nil
}
// sends json response to client
func (c *ClientHandler) SendResponse(id string, r *json_dataModels.Response) error {
client, ok := c.Clients[id]
if !ok {
return fmt.Errorf("client not found for id %s", id)
}
b, err := json.Marshal(r)
if err != nil {
return err
}
if err := client.SendResponse(b, 5); err != nil {
return err
}
return nil
}

164
websocket/models/client.go Normal file
View File

@@ -0,0 +1,164 @@
package models
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
var Origins []string = []string{"*"}
type Client struct {
Id string
Connected bool `json:"connected"`
Conn *websocket.Conn `json:"-"`
OnOpen func()
OnMessage func(data []byte)
OnClose func(code int, reason string)
OnError func(err error)
OnPing func()
OnPong func()
timeout time.Duration
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
if len(Origins) == 0 {
return false
}
if Origins[0] == "*" {
return true
}
origin := r.Header.Get("Origin")
for _, o := range Origins {
if o == origin {
return true
}
}
return false
},
EnableCompression: false,
}
func ConnectNewClient(id string, c *gin.Context) (*Client, error) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return nil, fmt.Errorf("websocket upgrade error: %w", err)
}
client := &Client{
Id: id,
Connected: true,
Conn: conn,
timeout: 5,
}
conn.SetPingHandler(func(appData string) error {
if client.OnPing != nil {
client.OnPing()
}
conn.SetWriteDeadline(time.Now().Add(client.timeout))
conn.SetReadDeadline(time.Now().Add(client.timeout))
return conn.WriteMessage(websocket.PongMessage, []byte(appData))
})
conn.SetPongHandler(func(appData string) error {
if client.OnPong != nil {
client.OnPong()
}
conn.SetReadDeadline(time.Now().Add(client.timeout))
return nil
})
// Start reading messages from client
go client.Listen(7)
return client, nil
}
func (c *Client) Listen(timeout uint) {
if timeout > 0 {
c.timeout = time.Duration(timeout) * time.Second
}
if c.OnOpen != nil {
c.OnOpen()
}
c.Conn.SetReadDeadline(time.Now().Add(c.timeout))
for c.Connected {
msgType, msg, err := c.Conn.ReadMessage()
if err != nil {
c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err))
return
}
switch msgType {
case websocket.CloseMessage:
c.handleClose(1000, "Client closed")
return
case websocket.TextMessage:
if isPing := c.handleJsonPing(msg); isPing {
continue
}
if c.OnMessage != nil {
c.OnMessage(msg)
} else {
log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg))
}
default:
log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id)
}
}
}
func (c *Client) handleJsonPing(msg []byte) (isPing bool) {
var wsMsg WSMessage
err := json.Unmarshal(msg, &wsMsg)
if err == nil && wsMsg.IsPing() {
c.Conn.SetReadDeadline(time.Now().Add(c.timeout))
// Respond with pong JSON
c.Conn.SetWriteDeadline(time.Now().Add(c.timeout))
err = c.Conn.WriteMessage(websocket.TextMessage, GetPongByteSlice())
if err != nil {
c.handleError(fmt.Errorf("write pong error: %w", err))
return
}
if c.OnPing != nil {
c.OnPing()
}
isPing = true
}
return
}
func (c *Client) SendResponse(data []byte, timeout uint) error {
c.Conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
return c.Conn.WriteMessage(websocket.TextMessage, data)
}
func (c *Client) Close(code int, reason string) {
c.handleClose(code, reason)
}
func (c *Client) handleClose(code int, text string) {
if !c.Connected {
return
}
c.Connected = false
if c.OnClose != nil {
c.OnClose(code, text)
}
c.Conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, text))
c.Conn.Close()
}
func (c *Client) handleError(err error) {
if c.OnError != nil {
c.OnError(err)
}
c.Close(websocket.CloseInternalServerErr, err.Error())
}

View File

@@ -0,0 +1,3 @@
package models
type Clients map[string]*Client

View File

@@ -0,0 +1,21 @@
package models
import "encoding/json"
type WSMessage struct {
Type string `json:"type"`
}
func GetPongByteSlice() []byte {
b, err := json.Marshal(WSMessage{
Type: "pong",
})
if err != nil {
return []byte{}
}
return b
}
func (w WSMessage) IsPing() bool {
return w.Type == "ping"
}

View File

@@ -1,4 +1,4 @@
package server package websocket
import ( import (
"fmt" "fmt"