6 Commits

Author SHA1 Message Date
Adrian Zuercher
5e1e4b9daf make new model for stringSlices allowOrigin helper 2025-07-13 20:01:13 +02:00
Adrian Zuercher
0f06128ce8 new send datapoint type and one fix typo 2025-07-11 17:47:12 +02:00
Adrian Zuercher
be07dc8749 add new argument for remote port 2025-07-11 17:46:32 +02:00
Adrian Zuercher
e75d7c8b03 add automatic localhost and local ip to allow origin 2025-06-29 21:18:15 +02:00
Adrian Zürcher
59c7705ca1 fix concurrent write new sendPong channel 2025-06-22 08:58:14 +02:00
Adrian Zürcher
91ea59ed6e improvement websocket according to gorilla example 2025-06-22 07:46:24 +02:00
16 changed files with 205 additions and 99 deletions

View File

@@ -2,7 +2,6 @@ package args
import ( import (
"flag" "flag"
"strings"
"github.com/tecamino/tecamino-dbm/cert" "github.com/tecamino/tecamino-dbm/cert"
"github.com/tecamino/tecamino-dbm/models" "github.com/tecamino/tecamino-dbm/models"
@@ -19,21 +18,10 @@ type Args struct {
Debug bool Debug bool
} }
type StringSlice []string
func (s *StringSlice) String() string {
return strings.Join(*s, ",")
}
func (s *StringSlice) Set(value string) error {
*s = append(*s, value)
return nil
}
// initialte cli arguments // initialte cli arguments
func Init() *Args { func Init() *Args {
var allowOrigins StringSlice var allowOrigins models.StringSlice
flag.Var(&allowOrigins, "allowOrigin", "Allowed origin (can repeat this flag)") flag.Var(&allowOrigins, "allowOrigin", "Allowed origin (can repeat this flag)")
@@ -43,6 +31,7 @@ func Init() *Args {
keyFile := flag.String("keyFile", "./cert/key.pem", "path of keyfile") keyFile := flag.String("keyFile", "./cert/key.pem", "path of keyfile")
portHttp := flag.Uint("http-port", 8100, "json server communication for http/ws") portHttp := flag.Uint("http-port", 8100, "json server communication for http/ws")
portHttps := flag.Uint("https-port", 8101, "json server communication for http/wss") portHttps := flag.Uint("https-port", 8101, "json server communication for http/wss")
remotePort := flag.Uint("remotePort", 9500, "remote Port of gui user interface")
rootDir := flag.String("workingDir", "./", "working directory") rootDir := flag.String("workingDir", "./", "working directory")
dbmFile := flag.String("dbm", "/test/test", "dbm file name") dbmFile := flag.String("dbm", "/test/test", "dbm file name")
debug := flag.Bool("debug", false, "debug flag") debug := flag.Bool("debug", false, "debug flag")
@@ -58,6 +47,7 @@ func Init() *Args {
Port: models.Port{ Port: models.Port{
Http: *portHttp, Http: *portHttp,
Https: *portHttps, Https: *portHttps,
Remote: *remotePort,
}, },
RootDir: *rootDir, RootDir: *rootDir,
DBMFile: *dbmFile, DBMFile: *dbmFile,
@@ -65,7 +55,7 @@ func Init() *Args {
} }
if len(allowOrigins) == 0 { if len(allowOrigins) == 0 {
allowOrigins = StringSlice{"http://localhost:9500"} allowOrigins = models.StringSlice{"http://localhost:9500"}
} }
a.AllowOrigins = allowOrigins a.AllowOrigins = allowOrigins

View File

@@ -35,6 +35,7 @@ func (d *DBMHandler) Json_Data(c *gin.Context) {
Path: res.Path, Path: res.Path,
Type: res.Type, Type: res.Type,
Value: res.Value, Value: res.Value,
Drivers: &res.Drivers,
HasChild: res.Datapoints != nil, HasChild: res.Datapoints != nil,
Rights: res.ReadWrite, Rights: res.ReadWrite,
}) })

View File

@@ -20,6 +20,7 @@ func (d *DBMHandler) Set(req *json_dataModels.Request, id string) {
for _, set := range req.Set { for _, set := range req.Set {
dps := d.DBM.QueryDatapoints(1, set.Uuid, set.Path) dps := d.DBM.QueryDatapoints(1, set.Uuid, set.Path)
if len(dps) == 0 { if len(dps) == 0 {
resp.SetError() resp.SetError()
if resp.Message == "" { if resp.Message == "" {
@@ -27,8 +28,10 @@ func (d *DBMHandler) Set(req *json_dataModels.Request, id string) {
} }
continue continue
} }
for _, dp := range dps { for _, dp := range dps {
dp.UpdateValue(d.Conns, set.Value) dp.UpdateValue(d.Conns, set.Value)
resp.AddSet(json_dataModels.Set{ resp.AddSet(json_dataModels.Set{
Uuid: dp.Uuid, Uuid: dp.Uuid,
Path: dp.Path, Path: dp.Path,
@@ -36,6 +39,7 @@ func (d *DBMHandler) Set(req *json_dataModels.Request, id string) {
}) })
} }
} }
if err := d.Conns.SendResponse(id, resp); err != nil { if err := d.Conns.SendResponse(id, resp); err != nil {
d.Log.Error("get.Set", err.Error()) d.Log.Error("get.Set", err.Error())
} }

View File

@@ -54,7 +54,6 @@ func (d *DBMHandler) Subscribe(req *json_dataModels.Request, id string) {
}) })
} }
} }
if err := d.Conns.SendResponse(id, resp); err != nil { if err := d.Conns.SendResponse(id, resp); err != nil {
d.Log.Error("subscribe.Subscribe", err.Error()) d.Log.Error("subscribe.Subscribe", err.Error())
} }

View File

@@ -33,12 +33,15 @@ func (d *DBMHandler) WebSocket(c *gin.Context) {
if err != nil { if err != nil {
d.Log.Error("dbmHandler.webSocket.Websocket", "read json: "+err.Error()) d.Log.Error("dbmHandler.webSocket.Websocket", "read json: "+err.Error())
} }
// Sets // Sets
d.Get(request, id) d.Get(request, id)
// Sets // Sets
d.Set(request, id) d.Set(request, id)
// Subscribe // Subscribe
d.Subscribe(request, id) d.Subscribe(request, id)
// Unsubscribe // Unsubscribe
d.Unsubscribe(request, id) d.Unsubscribe(request, id)
} }

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/gin-gonic/gin v1.10.0 github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3 github.com/gorilla/websocket v1.5.3
github.com/tecamino/tecamino-json_data v0.0.16 github.com/tecamino/tecamino-json_data v0.0.17
github.com/tecamino/tecamino-logger v0.2.0 github.com/tecamino/tecamino-logger v0.2.0
) )

4
go.sum
View File

@@ -71,8 +71,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tecamino/tecamino-json_data v0.0.16 h1:aZFxnhm4g6WMDPoqy4HosUk7vl0DB0iIcVs8bbT4MzU= github.com/tecamino/tecamino-json_data v0.0.17 h1:M12UzKbfIgu/q3ERsEhGNDV3DYOvc0TioU288kQjDCA=
github.com/tecamino/tecamino-json_data v0.0.16/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY= github.com/tecamino/tecamino-json_data v0.0.17/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY=
github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU= github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU=
github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8= github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=

View File

@@ -23,7 +23,7 @@ func main() {
//initialize new server //initialize new server
dbmHandler.Log.Debug("main", "initialize new server instance") dbmHandler.Log.Debug("main", "initialize new server instance")
s := ws.NewServer(a.AllowOrigins) s := ws.NewServer(a.AllowOrigins, a.Port.Remote)
//set routes //set routes
dbmHandler.Log.Debug("main", "setting routes") dbmHandler.Log.Debug("main", "setting routes")

View File

@@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@@ -34,7 +33,6 @@ type Datapoint struct {
ReadWrite json_dataModels.Rights `json:"readWrite"` ReadWrite json_dataModels.Rights `json:"readWrite"`
Drivers json_dataModels.Drivers `json:"drivers,omitempty"` Drivers json_dataModels.Drivers `json:"drivers,omitempty"`
Subscriptions Subscriptions `json:"-"` Subscriptions Subscriptions `json:"-"`
sync.RWMutex `json:"-"`
} }
func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) { func (d *Datapoint) Set(path string, set json_dataModels.Set) (bool, error) {
@@ -381,25 +379,22 @@ func (d *Datapoint) RemoveSubscribtion(client *wsModels.Client) {
} }
func (d *Datapoint) Publish(eventType string) error { func (d *Datapoint) Publish(eventType string) error {
d.RLock()
defer d.RUnlock()
for client := range d.Subscriptions { for client := range d.Subscriptions {
r := json_data.NewResponse() r := json_data.NewResponse()
r.AddPublish(json_dataModels.Publish{ r.AddPublish(json_dataModels.Publish{
Event: eventType, Event: eventType,
Uuid: d.Uuid, Uuid: d.Uuid,
Path: d.Path, Path: d.Path,
Type: d.Type,
Value: d.Value, Value: d.Value,
}) })
b, err := json.Marshal(r) b, err := json.Marshal(r)
if err != nil { if err != nil {
return err return err
} }
if err := client.SendResponse(b, 5); err != nil {
return err
}
client.SendResponse(b)
} }
return nil return nil
} }

View File

@@ -3,4 +3,5 @@ package models
type Port struct { type Port struct {
Http uint Http uint
Https uint Https uint
Remote uint
} }

14
models/stringSlices.go Normal file
View File

@@ -0,0 +1,14 @@
package models
import "strings"
type StringSlice []string
func (s *StringSlice) String() string {
return strings.Join(*s, ",")
}
func (s *StringSlice) Set(value string) error {
*s = append(*s, value)
return nil
}

View File

@@ -142,7 +142,7 @@ func TestUpdateDps(t *testing.T) {
func TestServer(t *testing.T) { func TestServer(t *testing.T) {
fmt.Println("start") fmt.Println("start")
server := ws.NewServer([]string{".*"}) server := ws.NewServer([]string{".*"}, 9500)
t.Fatal(server.ServeHttp("http://localhost", 8100)) t.Fatal(server.ServeHttp("http://localhost", 8100))
} }

View File

@@ -1,5 +1,10 @@
package utils package utils
import (
"fmt"
"net"
)
func ListofA2ZZ() (list []string) { func ListofA2ZZ() (list []string) {
for i := 'A'; i <= 'Z'; i++ { for i := 'A'; i <= 'Z'; i++ {
list = append(list, string(i)) list = append(list, string(i))
@@ -9,12 +14,21 @@ func ListofA2ZZ() (list []string) {
list = append(list, string(i)+string(j)) list = append(list, string(i)+string(j))
} }
} }
// for i := 'A'; i <= 'Z'; i++ {
// for j := 'A'; j <= 'Z'; j++ {
// for k := 'A'; k <= 'Z'; k++ {
// list = append(list, string(i)+string(j)+string(k))
// }
// }
// }
return return
} }
func GetLocalIP() (string, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", err
}
for _, addr := range addrs {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
return ipNet.IP.String(), nil
}
}
}
return "", fmt.Errorf("no local IP address found")
}

View File

@@ -60,14 +60,12 @@ func (c *ClientHandler) SendResponse(id string, r *json_dataModels.Response) err
return fmt.Errorf("client not found for id %s", id) return fmt.Errorf("client not found for id %s", id)
} }
b, err := json.Marshal(r)
b, err := json.Marshal(*r)
if err != nil { if err != nil {
return err return err
} }
client.SendResponse(b)
if err := client.SendResponse(b, 5); err != nil {
return err
}
return nil return nil
} }

View File

@@ -13,17 +13,30 @@ import (
var Origins []string = []string{"*"} var Origins []string = []string{"*"}
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
type Client struct { type Client struct {
Id string Id string
Connected bool `json:"connected"` Connected bool `json:"connected"`
Conn *websocket.Conn `json:"-"` conn *websocket.Conn `json:"-"`
OnOpen func() OnOpen func()
OnMessage func(data []byte) OnMessage func(data []byte)
OnClose func(code int, reason string) OnClose func(code int, reason string)
OnError func(err error) OnError func(err error)
OnPing func() OnPing func()
OnPong func() OnPong func()
timeout time.Duration sendPong chan string
send chan []byte
unregister chan []byte
} }
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
@@ -54,75 +67,132 @@ func ConnectNewClient(id string, c *gin.Context) (*Client, error) {
client := &Client{ client := &Client{
Id: id, Id: id,
Connected: true, Connected: true,
Conn: conn, conn: conn,
timeout: 5, sendPong: make(chan string),
send: make(chan []byte),
unregister: make(chan []byte),
} }
conn.SetPingHandler(func(appData string) error { conn.SetPingHandler(func(appData string) error {
if client.OnPing != nil { if client.OnPing != nil {
client.OnPing() client.OnPing()
} }
conn.SetWriteDeadline(time.Now().Add(client.timeout)) conn.SetWriteDeadline(time.Now().Add(writeWait))
conn.SetReadDeadline(time.Now().Add(client.timeout)) conn.SetReadDeadline(time.Now().Add(writeWait))
return conn.WriteMessage(websocket.PongMessage, []byte(appData)) client.sendPong <- appData
return nil
}) })
conn.SetPongHandler(func(appData string) error { conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
if client.OnPong != nil { if client.OnPong != nil {
client.OnPong() client.OnPong()
} }
conn.SetReadDeadline(time.Now().Add(client.timeout))
return nil return nil
}) })
// Start reading messages from client // Start reading messages from client
go client.Listen(7) go client.Read()
go client.Write()
return client, nil return client, nil
} }
func (c *Client) Listen(timeout uint) { func (c *Client) Read() {
if timeout > 0 {
c.timeout = time.Duration(timeout) * time.Second
}
if c.OnOpen != nil { if c.OnOpen != nil {
c.OnOpen() c.OnOpen()
} }
c.Conn.SetReadDeadline(time.Now().Add(c.timeout)) c.conn.SetReadDeadline(time.Now().Add(writeWait))
for c.Connected { for c.Connected {
msgType, msg, err := c.Conn.ReadMessage() msgType, msg, err := c.conn.ReadMessage()
if err != nil { if err != nil {
c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err)) c.handleError(fmt.Errorf("read error (id:%s): %w", c.Id, err))
return return
} }
switch msgType { switch msgType {
case websocket.CloseMessage: case websocket.CloseMessage:
c.handleClose(1000, "Client closed") c.Close(websocket.CloseNormalClosure, "Client closed")
return return
case websocket.TextMessage: case websocket.TextMessage:
if isPing := c.handleJsonPing(msg); isPing { if isPing := c.handleJsonPing(msg); !isPing {
continue
}
if c.OnMessage != nil { if c.OnMessage != nil {
c.OnMessage(msg) c.OnMessage(msg)
} else { } else {
log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg)) log.Printf("Received message but no handler set (id:%s): %s", c.Id, string(msg))
} }
}
default: default:
log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id) log.Printf("Unhandled message type %d (id:%s)", msgType, c.Id)
} }
} }
} }
func (c *Client) Write() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
c.handleError(err)
return
}
c.handleError(fmt.Errorf("server %s closed channel", c.Id))
return
} else {
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
c.handleError(err)
return
}
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.handleError(err)
return
}
if c.OnPing != nil {
c.OnPing()
}
case message, ok := <-c.sendPong:
if ok {
c.conn.WriteMessage(websocket.PongMessage, []byte(message))
}
case message := <-c.unregister:
c.conn.WriteMessage(websocket.CloseMessage, message)
c.Connected = false
close(c.sendPong)
close(c.send)
close(c.unregister)
return
}
}
}
func (c *Client) handleJsonPing(msg []byte) (isPing bool) { func (c *Client) handleJsonPing(msg []byte) (isPing bool) {
var wsMsg WSMessage var wsMsg WSMessage
err := json.Unmarshal(msg, &wsMsg) err := json.Unmarshal(msg, &wsMsg)
if err == nil && wsMsg.IsPing() { if err == nil && wsMsg.IsPing() {
c.Conn.SetReadDeadline(time.Now().Add(c.timeout)) c.conn.SetReadDeadline(time.Now().Add(writeWait))
// Respond with pong JSON // Respond with pong JSON
c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)) select {
err = c.Conn.WriteMessage(websocket.TextMessage, GetPongByteSlice()) case c.send <- GetPongByteSlice():
default:
// optional: log or handle if send buffer is full
c.handleError(fmt.Errorf("failed to queue pong message"))
return
}
if err != nil { if err != nil {
c.handleError(fmt.Errorf("write pong error: %w", err)) c.handleError(fmt.Errorf("write pong error: %w", err))
return return
@@ -132,33 +202,39 @@ func (c *Client) handleJsonPing(msg []byte) (isPing bool) {
} }
isPing = true isPing = true
} }
return return
} }
func (c *Client) SendResponse(data []byte, timeout uint) error { func (c *Client) SendResponse(data []byte) {
c.Conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
return c.Conn.WriteMessage(websocket.TextMessage, data)
}
func (c *Client) Close(code int, reason string) {
c.handleClose(code, reason)
}
func (c *Client) handleClose(code int, text string) {
if !c.Connected { if !c.Connected {
return return
} }
c.Connected = false c.send <- data
if c.OnClose != nil {
c.OnClose(code, text)
} }
c.Conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, text))
c.Conn.Close() func (c *Client) Close(code int, reason string) error {
closeMsg := websocket.FormatCloseMessage(code, reason)
select {
case c.unregister <- closeMsg: // Attempt to send
default: // If the channel is full, this runs
return fmt.Errorf("attempt close client socket failed")
}
if c.OnClose != nil {
c.OnClose(code, reason)
}
return nil
} }
func (c *Client) handleError(err error) { func (c *Client) handleError(err error) {
if c.OnError != nil { if c.OnError != nil {
c.OnError(err) c.OnError(err)
} }
c.Close(websocket.CloseInternalServerErr, err.Error()) if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil {
if c.OnError != nil {
c.OnError(err)
} else {
fmt.Println("error: ", err)
}
}
} }

View File

@@ -2,12 +2,14 @@ package websocket
import ( import (
"fmt" "fmt"
"log"
"sync" "sync"
"time" "time"
"github.com/gin-contrib/cors" "github.com/gin-contrib/cors"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/tecamino/tecamino-dbm/cert" "github.com/tecamino/tecamino-dbm/cert"
"github.com/tecamino/tecamino-dbm/utils"
"github.com/tecamino/tecamino-logger/logging" "github.com/tecamino/tecamino-logger/logging"
) )
@@ -19,8 +21,17 @@ type Server struct {
} }
// initalizes new dbm server // initalizes new dbm server
func NewServer(allowOrigins []string) *Server { func NewServer(allowOrigins []string, port uint) *Server {
r := gin.Default() r := gin.Default()
allowOrigins = append(allowOrigins, fmt.Sprintf("http://localhost:%d", port))
localIP, err := utils.GetLocalIP()
if err != nil {
log.Printf("get local ip : %s", err.Error())
} else {
allowOrigins = append(allowOrigins, fmt.Sprintf("http://%s:%d", localIP, port))
}
r.Use(cors.New(cors.Config{ r.Use(cors.New(cors.Config{
AllowOrigins: allowOrigins, AllowOrigins: allowOrigins,
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},