diff --git a/.gitignore b/.gitignore index 2218b6c..d2e9a72 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ .DS_Store *.dbm *.log + +# local executables +dbm-arm64 diff --git a/dbm/dbmHandler.go b/dbm/dbmHandler.go index 8c6e0b7..6535635 100644 --- a/dbm/dbmHandler.go +++ b/dbm/dbmHandler.go @@ -11,13 +11,13 @@ import ( "github.com/tecamino/tecamino-dbm/args" "github.com/tecamino/tecamino-dbm/models" - serverModels "github.com/tecamino/tecamino-dbm/server/models" + ws "github.com/tecamino/tecamino-dbm/websocket" "github.com/tecamino/tecamino-logger/logging" ) type DBMHandler struct { DBM *models.DBM - Conns *serverModels.Connections + Conns *ws.ClientHandler sync.RWMutex Log *logging.Logger arg *args.Args @@ -43,7 +43,7 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) { logger.Info("main", "start dma handler") //initialize connection map - conns := serverModels.NewConnections() + conns := ws.NewConnectionHandler() // Initialize dtabase manager handler dmaHandler := DBMHandler{ @@ -74,8 +74,8 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) { var line int for scanner.Scan() { line++ - dp := models.Datapoint{} - if err = json.Unmarshal(scanner.Bytes(), &dp); err != nil { + dp := &models.Datapoint{} + if err = json.Unmarshal(scanner.Bytes(), dp); err != nil { dmaHandler.Log.Error("dmbHandler.NewDmbHandler", "error in line "+fmt.Sprint(line)+" "+scanner.Text()) dmaHandler.Log.Error("dmbHandler.NewDmbHandler", err.Error()) diff --git a/dbm/json_data.go b/dbm/json_data.go index 0d3c56f..e24194e 100644 --- a/dbm/json_data.go +++ b/dbm/json_data.go @@ -51,7 +51,6 @@ func (d *DBMHandler) Json_Data(c *gin.Context) { return } } - c.JSON(200, resp) } diff --git a/dbm/subscribe.go b/dbm/subscribe.go index b5f849c..07543e6 100644 --- a/dbm/subscribe.go +++ b/dbm/subscribe.go @@ -36,7 +36,14 @@ func (d *DBMHandler) Subscribe(req *json_dataModels.Request, id string) { continue } } - dp.AddSubscribtion(id, sub) + + client := d.DBM.Conns.GetClient(id) + if client == nil { + d.Log.Warning("subscribe", "id "+id+" not found") + continue + } + + dp.AddSubscribtion(client, sub) resp.AddSubscription(json_dataModels.Subscription{ Uuid: dp.Uuid, Path: dp.Path, @@ -68,10 +75,17 @@ func (d *DBMHandler) Unsubscribe(req *json_dataModels.Request, id string) { for _, sub := range req.Unsubscribe { for _, dp := range d.DBM.QueryDatapoints(sub.Depth, sub.Uuid, sub.Path) { - if _, ok := dp.Subscriptions[id]; !ok { + + client := d.DBM.Conns.GetClient(id) + if client == nil { + d.Log.Warning("subscribe", "id "+id+" not found") continue } - dp.RemoveSubscribtion(id) + + if _, ok := dp.Subscriptions[client]; !ok { + continue + } + dp.RemoveSubscribtion(client) resp.AddUnsubscription(json_dataModels.Subscription{ Uuid: dp.Uuid, Path: dp.Path, diff --git a/dbm/webSocket.go b/dbm/webSocket.go index 5917ccd..2af18f0 100644 --- a/dbm/webSocket.go +++ b/dbm/webSocket.go @@ -2,11 +2,7 @@ package dbm import ( "encoding/json" - "errors" - "fmt" - "io" - "github.com/coder/websocket" "github.com/gin-gonic/gin" "github.com/tecamino/tecamino-dbm/auth" json_dataModels "github.com/tecamino/tecamino-json_data/models" @@ -26,76 +22,33 @@ func (d *DBMHandler) WebSocket(c *gin.Context) { } d.Log.Debug("dbmHandler.webSocket.Websocket", "authorization id token: "+id) - err = d.Conns.ConnectRecievingWsConnection(id, c) - defer d.Conns.RemoveClient(id) + client, err := d.Conns.ConnectNewClient(id, c) if err != nil { - d.Log.Error("dbmHandler.webSocket.Websocket", "error connecting recieving websocket conection: "+err.Error()) + d.Log.Error("dbmHandler.webSocket.Websocket", err) return } - defer d.Conns.DisconnectWsConnection(id, websocket.StatusInternalError, "Internal error") - //Read loop - for { - - request, err := d.readJsonData(id) + client.OnMessage = func(data []byte) { + request, err := d.readJsonData(data) if err != nil { - d.Log.Error("websocket.WebSocket", err.Error()) - break + d.Log.Error("dbmHandler.webSocket.Websocket", "read json: "+err.Error()) } - // Sets - d.Get(request, id) // Sets d.Set(request, id) - // Subscribe d.Subscribe(request, id) - // Unsubscribe d.Unsubscribe(request, id) + } - request.Get = make([]json_dataModels.Get, 0) - request.Set = make([]json_dataModels.Set, 0) - request.Subscribe = make([]json_dataModels.Subscription, 0) - request.Unsubscribe = make([]json_dataModels.Subscription, 0) - request = nil + client.OnError = func(err error) { + d.Log.Error("dbmHandler.webSocket.Websocket", "error on websocket connection: "+err.Error()) } } -func (d *DBMHandler) readJsonData(id string) (request *json_dataModels.Request, err error) { - - client, ok := d.Conns.Clients[id] - if !ok { - return request, errors.New("client id not found") - } - - _, reader, err := client.Conn.Reader(client.Ctx) - if err != nil { - d.Log.Info("webSocket.readJsonData", fmt.Sprintf("WebSocket reader: %v\n", err)) - return request, nil - } - - b, err := io.ReadAll(reader) - - if err != nil { - code := websocket.CloseStatus(err) - - switch code { - case websocket.StatusNormalClosure, - websocket.StatusGoingAway, - websocket.StatusNoStatusRcvd: - d.Log.Info("webSocket.readJsonData", fmt.Sprintf("WebSocket closed: %v (code: %v)\n", err, code)) - return - default: - d.Log.Error("webSocket.readJsonData", fmt.Sprintf("WebSocket read error: %v (code: %v)\n", err, code)) - return - } - } - - if err := json.Unmarshal(b, &request); err != nil { - return request, err - } - +func (d *DBMHandler) readJsonData(data []byte) (request *json_dataModels.Request, err error) { + err = json.Unmarshal(data, &request) return } diff --git a/go.mod b/go.mod index 4cecee2..3c5a7de 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,11 @@ module github.com/tecamino/tecamino-dbm go 1.24.0 require ( - github.com/coder/websocket v1.8.13 github.com/gin-contrib/cors v1.7.5 github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.6.0 - github.com/tecamino/tecamino-json_data v0.0.15 + github.com/gorilla/websocket v1.5.3 + github.com/tecamino/tecamino-json_data v0.0.16 github.com/tecamino/tecamino-logger v0.2.0 ) diff --git a/go.sum b/go.sum index 32679e3..ccd6687 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFos github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= -github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE= -github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -34,6 +32,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -71,8 +71,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tecamino/tecamino-json_data v0.0.15 h1:r8S6Ls/MMKTdsqUWd3iVDW6Zm5I5H9UCqNAW9wid02E= -github.com/tecamino/tecamino-json_data v0.0.15/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY= +github.com/tecamino/tecamino-json_data v0.0.16 h1:aZFxnhm4g6WMDPoqy4HosUk7vl0DB0iIcVs8bbT4MzU= +github.com/tecamino/tecamino-json_data v0.0.16/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY= github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU= github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= diff --git a/main.go b/main.go index aebd177..14cea4d 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "github.com/gin-gonic/gin" "github.com/tecamino/tecamino-dbm/args" "github.com/tecamino/tecamino-dbm/dbm" - "github.com/tecamino/tecamino-dbm/server" + ws "github.com/tecamino/tecamino-dbm/websocket" ) func main() { @@ -23,7 +23,7 @@ func main() { //initialize new server dbmHandler.Log.Debug("main", "initialize new server instance") - s := server.NewServer(a.AllowOrigins) + s := ws.NewServer(a.AllowOrigins) //set routes dbmHandler.Log.Debug("main", "setting routes") diff --git a/models/datapoint.go b/models/datapoint.go index 6a7fff4..25e15e2 100644 --- a/models/datapoint.go +++ b/models/datapoint.go @@ -1,15 +1,18 @@ package models import ( + "encoding/json" "errors" "fmt" "regexp" "strings" + "sync" "time" "github.com/google/uuid" - serverModels "github.com/tecamino/tecamino-dbm/server/models" "github.com/tecamino/tecamino-dbm/utils" + ws "github.com/tecamino/tecamino-dbm/websocket" + wsModels "github.com/tecamino/tecamino-dbm/websocket/models" json_data "github.com/tecamino/tecamino-json_data" json_dataModels "github.com/tecamino/tecamino-json_data/models" ) @@ -31,6 +34,7 @@ type Datapoint struct { ReadWrite json_dataModels.Rights `json:"readWrite"` Drivers json_dataModels.Drivers `json:"drivers,omitempty"` Subscriptions Subscriptions `json:"-"` + sync.RWMutex `json:"-"` } func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) { @@ -87,7 +91,7 @@ func (d *Datapoint) GetValueUint64() uint64 { return utils.Uint64From(d.Value) } -func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...json_dataModels.Set) (created []json_dataModels.Set, uuids Uuids, err error) { +func (d *Datapoint) CreateDatapoints(sets ...json_dataModels.Set) (created []json_dataModels.Set, uuids Uuids, err error) { if len(sets) == 0 { return } @@ -121,7 +125,7 @@ func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...js }) if publish { - existing.Publish(conns, OnChange) + existing.Publish(OnChange) } } else { ndp := Datapoint{ @@ -144,7 +148,7 @@ func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...js Driver: dp.Driver, }) if publish { - current.Publish(conns, OnChange) + current.Publish(OnChange) } //add uuid to flat map for faster lookuo uuids[ndp.Uuid] = &ndp @@ -187,7 +191,7 @@ func (d *Datapoint) CreateDatapoints(conns *serverModels.Connections, sets ...js return } -func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoint, path string) (uuids Uuids, err error) { +func (d *Datapoint) ImportDatapoint(dp *Datapoint, path string) (uuids Uuids, err error) { parts := strings.Split(dp.Path, ":") uuids = make(Uuids, 1) @@ -205,16 +209,16 @@ func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoin existing.Value = dp.Type.ConvertValue(dp.Value) existing.ReadWrite = dp.ReadWrite.GetRights() existing.UpdateDateTime = time.Now().UnixMilli() - dp.Publish(conns, OnChange) + dp.Publish(OnChange) } else { dp.Path = strings.Join(parts, ":") dp.ReadWrite = dp.ReadWrite.GetRights() dp.UpdateDateTime = time.Now().UnixMilli() dp.Subscriptions = InitSubscribtion() - current.Datapoints[part] = &dp + current.Datapoints[part] = dp //add uuid to flat map for faster lookuo - uuids[dp.Uuid] = &dp - dp.Publish(conns, OnChange) + uuids[dp.Uuid] = dp + dp.Publish(OnChange) } return uuids, nil @@ -233,14 +237,14 @@ func (d *Datapoint) ImportDatapoint(conns *serverModels.Connections, dp Datapoin newDp.ReadWrite = newDp.ReadWrite.GetRights() current.Datapoints[part] = newDp current = newDp - //add uuid to flat map for faster lookuo + //add uuid to flat map for faster lookup uuids[newDp.Uuid] = newDp } } return uuids, nil } -func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value any, path string) error { +func (d *Datapoint) UpdateDatapointValue(value any, path string) error { paths := strings.Split(path, ":") @@ -253,7 +257,7 @@ func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value if i == len(paths)-1 { dp.Value = dp.Type.ConvertValue(value) dp.UpdateDateTime = time.Now().UnixMilli() - dp.Publish(conns, OnChange) + dp.Publish(OnChange) return nil } current = dp @@ -261,14 +265,14 @@ func (d *Datapoint) UpdateDatapointValue(conns *serverModels.Connections, value return nil } -func (d *Datapoint) UpdateValue(conns *serverModels.Connections, value any) error { +func (d *Datapoint) UpdateValue(conns *ws.ClientHandler, value any) error { d.Value = d.Type.ConvertValue(value) d.UpdateDateTime = time.Now().UnixMilli() - d.Publish(conns, OnChange) + d.Publish(OnChange) return nil } -func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set json_dataModels.Set) (json_dataModels.Set, error) { +func (d *Datapoint) RemoveDatapoint(conns *ws.ClientHandler, set json_dataModels.Set) (json_dataModels.Set, error) { parts := strings.Split(set.Path, ":") if len(parts) < 1 { @@ -276,7 +280,7 @@ func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set json_da } current := d - for i := range len(parts) - 1 { + for i := 0; i < len(parts)-1; i++ { next, ok := current.Datapoints[parts[i]] if !ok { return json_dataModels.Set{}, fmt.Errorf("path not found: '%s'", strings.Join(parts[:i+1], ":")) @@ -286,7 +290,7 @@ func (d *Datapoint) RemoveDatapoint(conns *serverModels.Connections, set json_da toDelete := parts[len(parts)-1] if dp, ok := current.Datapoints[toDelete]; ok { - dp.Publish(conns, OnDelete) + dp.Publish(OnDelete) delete(current.Datapoints, toDelete) return json_dataModels.Set{ Uuid: set.Uuid, @@ -354,17 +358,17 @@ func (d *Datapoint) QueryDatapoints(depth uint, path string) (dps Datapoints) { return } -func (d *Datapoint) AddSubscribtion(id string, sub json_dataModels.Subscription) { +func (d *Datapoint) AddSubscribtion(conn *wsModels.Client, sub json_dataModels.Subscription) { if d.Subscriptions == nil { return } - if s, ok := d.Subscriptions[id]; ok { + if s, ok := d.Subscriptions[conn]; ok { s.OnCreate = sub.OnCreate s.OnChange = sub.OnChange s.OnDelete = sub.OnDelete } else { - d.Subscriptions[id] = &Subscription{ + d.Subscriptions[conn] = &Subscription{ OnCreate: sub.OnCreate, OnChange: sub.OnChange, OnDelete: sub.OnDelete, @@ -372,36 +376,30 @@ func (d *Datapoint) AddSubscribtion(id string, sub json_dataModels.Subscription) } } -func (d *Datapoint) RemoveSubscribtion(id string) { - if _, ok := d.Subscriptions[id]; !ok { - return - } - delete(d.Subscriptions, id) +func (d *Datapoint) RemoveSubscribtion(client *wsModels.Client) { + delete(d.Subscriptions, client) } -func (d *Datapoint) Publish(conns *serverModels.Connections, eventType string) error { - if conns.Clients == nil { - return nil - } - conns.RLock() - defer conns.RUnlock() +func (d *Datapoint) Publish(eventType string) error { + d.RLock() + defer d.RUnlock() - for id := range d.Subscriptions { - if _, ok := conns.Clients[id]; !ok { - delete(d.Subscriptions, id) - } else { - r := json_data.NewResponse() - r.AddUPublish(json_dataModels.Publish{ - Event: eventType, - Uuid: d.Uuid, - Path: d.Path, - Value: d.Value, - }) - - if err := conns.SendResponse(id, r); err != nil { - return err - } + for client := range d.Subscriptions { + r := json_data.NewResponse() + r.AddPublish(json_dataModels.Publish{ + Event: eventType, + Uuid: d.Uuid, + Path: d.Path, + Value: d.Value, + }) + b, err := json.Marshal(r) + if err != nil { + return err } + if err := client.SendResponse(b, 5); err != nil { + return err + } + } return nil } diff --git a/models/dbm.go b/models/dbm.go index 8363550..716f90f 100644 --- a/models/dbm.go +++ b/models/dbm.go @@ -8,8 +8,8 @@ import ( "maps" "github.com/google/uuid" - serverModels "github.com/tecamino/tecamino-dbm/server/models" "github.com/tecamino/tecamino-dbm/utils" + ws "github.com/tecamino/tecamino-dbm/websocket" json_dataModels "github.com/tecamino/tecamino-json_data/models" "github.com/tecamino/tecamino-logger/logging" ) @@ -17,13 +17,13 @@ import ( type DBM struct { Datapoints Datapoint Uuids Uuids - Conns *serverModels.Connections + Conns *ws.ClientHandler Log *logging.Logger } var SystemDatapoints uuid.UUID -func NewDBM(conns *serverModels.Connections, log *logging.Logger) *DBM { +func NewDBM(conns *ws.ClientHandler, log *logging.Logger) *DBM { return &DBM{ Uuids: make(Uuids), Conns: conns, @@ -36,7 +36,7 @@ func (d *DBM) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.S return nil, nil } - dps, uuids, err := d.Datapoints.CreateDatapoints(d.Conns, sets...) + dps, uuids, err := d.Datapoints.CreateDatapoints(sets...) //save uuid in seperate map for fast look up maps.Copy(d.Uuids, uuids) @@ -56,9 +56,9 @@ func (d *DBM) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.S return dps, nil } -func (d *DBM) ImportDatapoints(dps ...Datapoint) error { +func (d *DBM) ImportDatapoints(dps ...*Datapoint) error { for _, dp := range dps { - uuids, err := d.Datapoints.ImportDatapoint(d.Conns, dp, dp.Path) + uuids, err := d.Datapoints.ImportDatapoint(dp, dp.Path) if err != nil { return err } @@ -77,14 +77,14 @@ func (d *DBM) UpdateDatapointValue(value any, uid uuid.UUID, path ...string) err dp := d.Uuids[uid] dp.Value = dp.Type.ConvertValue(value) dp.UpdateDateTime = time.Now().UnixMilli() - dp.Publish(d.Conns, OnChange) + dp.Publish(OnChange) } if len(path) > 1 { return fmt.Errorf("only one path allowed") } - return d.Datapoints.UpdateDatapointValue(d.Conns, value, path[0]) + return d.Datapoints.UpdateDatapointValue(value, path[0]) } func (d *DBM) RemoveDatapoint(sets ...json_dataModels.Set) ([]json_dataModels.Set, error) { @@ -136,7 +136,6 @@ func (d *DBM) ModifyCountedDatapoints(count uint64, countDown bool) { func (d *DBM) GoSystemTime() error { path := "System:Time" - var tOld int64 typ := json_dataModels.STR rights := json_dataModels.Read @@ -147,15 +146,12 @@ func (d *DBM) GoSystemTime() error { } go func() { - for { - t := time.Now().UnixMilli() - if tOld != t { - if er := d.UpdateDatapointValue(time.UnixMilli(t).Format("2006-01-02 15:04:05"), uuid.Nil, path); er != nil { - d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error()) - } - tOld = t + ticker := time.NewTicker(time.Second) + for range ticker.C { + if er := d.UpdateDatapointValue(time.Now().Format("2006-01-02 15:04:05"), uuid.Nil, path); er != nil { + d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error()) } - time.Sleep(time.Second) + } }() return nil diff --git a/models/subscribtion.go b/models/subscribtion.go index f8542e5..7cba081 100644 --- a/models/subscribtion.go +++ b/models/subscribtion.go @@ -1,6 +1,8 @@ package models -type Subscriptions map[string]*Subscription +import wsModels "github.com/tecamino/tecamino-dbm/websocket/models" + +type Subscriptions map[*wsModels.Client]*Subscription type Subscription struct { OnCreate bool diff --git a/server/models/clients.go b/server/models/clients.go deleted file mode 100644 index 1737064..0000000 --- a/server/models/clients.go +++ /dev/null @@ -1,58 +0,0 @@ -package models - -import ( - "context" - "fmt" - - "github.com/coder/websocket" - "github.com/gin-gonic/gin" -) - -var Origins []string = []string{"*"} - -type Clients map[string]*Client - -type Client struct { - Ctx context.Context `json:"-"` - Cancel context.CancelFunc `json:"-"` - Connected bool `json:"connected"` - Conn *websocket.Conn `json:"-"` -} - -func NewClients() Clients { - return make(Clients) -} - -// Connect a recieving websocket connection -func (cl *Clients) ConnectRecievingWsConnection(id string, c *gin.Context) error { - if _, exists := (*cl)[id]; exists { - return nil - } - - conn, err := websocket.Accept(c.Writer, c.Request, &websocket.AcceptOptions{ - OriginPatterns: Origins, - }) - - if err != nil { - return fmt.Errorf("error accept websocket client: %s", err) - } - ctx, cancel := context.WithCancel(context.Background()) - - (*cl)[id] = &Client{ - Connected: true, - Ctx: ctx, - Cancel: cancel, - Conn: conn, - } - return nil -} - -func (c *Clients) RemoveClient(id string) { - delete(*c, id) -} - -func (c *Clients) DisconnectWsConnection(id string, code websocket.StatusCode, reason string) { - (*c)[id].Connected = false - (*c)[id].Conn.Close(code, reason) - (*c)[id].Cancel() -} diff --git a/server/models/connections.go b/server/models/connections.go deleted file mode 100644 index 5a9eb51..0000000 --- a/server/models/connections.go +++ /dev/null @@ -1,67 +0,0 @@ -package models - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "time" - - "github.com/coder/websocket" - "github.com/gin-gonic/gin" - json_dataModels "github.com/tecamino/tecamino-json_data/models" -) - -// serves as connection handler of websocket -type Connections struct { - sync.RWMutex - Clients Clients -} - -// initaiates new conections with client map -func NewConnections() *Connections { - return &Connections{ - Clients: NewClients(), - } -} - -// Connect a recieving websocket connection -func (c *Connections) ConnectRecievingWsConnection(id string, ctx *gin.Context) error { - return c.Clients.ConnectRecievingWsConnection(id, ctx) -} - -func (c *Connections) RemoveClient(id string) { - c.Clients.RemoveClient(id) -} - -func (c *Connections) DisconnectWsConnection(id string, code websocket.StatusCode, reason string) { - c.Clients.DisconnectWsConnection(id, code, reason) -} - -// sends json response to client -func (c *Connections) SendResponse(id string, r *json_dataModels.Response) error { - client, ok := c.Clients[id] - if !ok { - return fmt.Errorf("client not found for id %s", id) - - } - b, err := json.Marshal(r) - if err != nil { - return err - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - w, err := client.Conn.Writer(ctx, websocket.MessageText) - if err != nil { - return err - } - defer w.Close() - - _, err = w.Write(b) - if err != nil { - return err - } - return nil -} diff --git a/test/dbm_test.go b/test/dbm_test.go index 5949c13..1bbfcde 100644 --- a/test/dbm_test.go +++ b/test/dbm_test.go @@ -12,8 +12,8 @@ import ( "github.com/tecamino/tecamino-dbm/cert" "github.com/tecamino/tecamino-dbm/dbm" "github.com/tecamino/tecamino-dbm/models" - "github.com/tecamino/tecamino-dbm/server" "github.com/tecamino/tecamino-dbm/utils" + ws "github.com/tecamino/tecamino-dbm/websocket" ) func TestCreateDps(t *testing.T) { @@ -142,7 +142,7 @@ func TestUpdateDps(t *testing.T) { func TestServer(t *testing.T) { fmt.Println("start") - server := server.NewServer([]string{".*"}) + server := ws.NewServer([]string{".*"}) t.Fatal(server.ServeHttp("http://localhost", 8100)) } diff --git a/websocket/clientHandler.go b/websocket/clientHandler.go new file mode 100644 index 0000000..e9c4a70 --- /dev/null +++ b/websocket/clientHandler.go @@ -0,0 +1,73 @@ +package websocket + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/gin-gonic/gin" + "github.com/tecamino/tecamino-dbm/websocket/models" + wsModels "github.com/tecamino/tecamino-dbm/websocket/models" + json_dataModels "github.com/tecamino/tecamino-json_data/models" +) + +// serves as connection handler of websocket +type ClientHandler struct { + sync.RWMutex + Clients models.Clients +} + +// initaiates new conections with client map +func NewConnectionHandler() *ClientHandler { + return &ClientHandler{ + Clients: make(models.Clients), + } +} + +// Connect a recieving websocket connection +func (cH *ClientHandler) ConnectNewClient(id string, c *gin.Context) (client *models.Client, err error) { + if _, exists := cH.Clients[id]; exists { + return cH.Clients[id], nil + } + + client, err = models.ConnectNewClient(id, c) + client.OnClose = func(code int, reason string) { + delete(cH.Clients, id) + } + + if err != nil { + return nil, err + } + cH.Lock() + cH.Clients[id] = client + cH.Unlock() + + return client, nil +} + +// get client client +func (c *ClientHandler) GetClient(id string) *wsModels.Client { + if client, ok := c.Clients[id]; ok { + return client + } + return nil +} + +// sends json response to client +func (c *ClientHandler) SendResponse(id string, r *json_dataModels.Response) error { + client, ok := c.Clients[id] + if !ok { + return fmt.Errorf("client not found for id %s", id) + + } + b, err := json.Marshal(r) + if err != nil { + return err + } + + if err := client.SendResponse(b, 5); err != nil { + return err + } + + return nil +} diff --git a/websocket/models/client.go b/websocket/models/client.go new file mode 100644 index 0000000..a440f78 --- /dev/null +++ b/websocket/models/client.go @@ -0,0 +1,164 @@ +package models + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" +) + +var Origins []string = []string{"*"} + +type Client struct { + Id string + Connected bool `json:"connected"` + Conn *websocket.Conn `json:"-"` + OnOpen func() + OnMessage func(data []byte) + OnClose func(code int, reason string) + OnError func(err error) + OnPing func() + OnPong func() + timeout time.Duration +} + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + if len(Origins) == 0 { + return false + } + if Origins[0] == "*" { + return true + } + origin := r.Header.Get("Origin") + for _, o := range Origins { + if o == origin { + return true + } + } + return false + }, + EnableCompression: false, +} + +func ConnectNewClient(id string, c *gin.Context) (*Client, error) { + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + return nil, fmt.Errorf("websocket upgrade error: %w", err) + } + + client := &Client{ + Id: id, + Connected: true, + Conn: conn, + timeout: 5, + } + + conn.SetPingHandler(func(appData string) error { + if client.OnPing != nil { + client.OnPing() + } + conn.SetWriteDeadline(time.Now().Add(client.timeout)) + conn.SetReadDeadline(time.Now().Add(client.timeout)) + return conn.WriteMessage(websocket.PongMessage, []byte(appData)) + }) + + conn.SetPongHandler(func(appData string) error { + if client.OnPong != nil { + client.OnPong() + } + conn.SetReadDeadline(time.Now().Add(client.timeout)) + return nil + }) + + // Start reading messages from client + go client.Listen(7) + return client, nil +} + +func (c *Client) Listen(timeout uint) { + if timeout > 0 { + c.timeout = time.Duration(timeout) * time.Second + } + + if c.OnOpen != nil { + c.OnOpen() + } + + c.Conn.SetReadDeadline(time.Now().Add(c.timeout)) + for c.Connected { + msgType, msg, err := c.Conn.ReadMessage() + if err != nil { + c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err)) + return + } + switch msgType { + case websocket.CloseMessage: + c.handleClose(1000, "Client closed") + return + case websocket.TextMessage: + if isPing := c.handleJsonPing(msg); isPing { + continue + } + if c.OnMessage != nil { + c.OnMessage(msg) + } else { + log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg)) + } + default: + log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id) + } + } +} + +func (c *Client) handleJsonPing(msg []byte) (isPing bool) { + var wsMsg WSMessage + err := json.Unmarshal(msg, &wsMsg) + if err == nil && wsMsg.IsPing() { + c.Conn.SetReadDeadline(time.Now().Add(c.timeout)) + // Respond with pong JSON + c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)) + err = c.Conn.WriteMessage(websocket.TextMessage, GetPongByteSlice()) + if err != nil { + c.handleError(fmt.Errorf("write pong error: %w", err)) + return + } + if c.OnPing != nil { + c.OnPing() + } + isPing = true + } + return +} + +func (c *Client) SendResponse(data []byte, timeout uint) error { + c.Conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second)) + return c.Conn.WriteMessage(websocket.TextMessage, data) +} + +func (c *Client) Close(code int, reason string) { + c.handleClose(code, reason) +} + +func (c *Client) handleClose(code int, text string) { + if !c.Connected { + return + } + c.Connected = false + if c.OnClose != nil { + c.OnClose(code, text) + } + c.Conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, text)) + c.Conn.Close() +} + +func (c *Client) handleError(err error) { + if c.OnError != nil { + c.OnError(err) + } + c.Close(websocket.CloseInternalServerErr, err.Error()) +} diff --git a/websocket/models/clients.go b/websocket/models/clients.go new file mode 100644 index 0000000..4f051d5 --- /dev/null +++ b/websocket/models/clients.go @@ -0,0 +1,3 @@ +package models + +type Clients map[string]*Client diff --git a/websocket/models/wsMessage.go b/websocket/models/wsMessage.go new file mode 100644 index 0000000..538327f --- /dev/null +++ b/websocket/models/wsMessage.go @@ -0,0 +1,21 @@ +package models + +import "encoding/json" + +type WSMessage struct { + Type string `json:"type"` +} + +func GetPongByteSlice() []byte { + b, err := json.Marshal(WSMessage{ + Type: "pong", + }) + if err != nil { + return []byte{} + } + return b +} + +func (w WSMessage) IsPing() bool { + return w.Type == "ping" +} diff --git a/server/server.go b/websocket/server.go similarity index 98% rename from server/server.go rename to websocket/server.go index 9451b9a..dfb4478 100644 --- a/server/server.go +++ b/websocket/server.go @@ -1,4 +1,4 @@ -package server +package websocket import ( "fmt"