From c85590e5a5e098314a3eae83b743e79c0d629da2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20Z=C3=BCrcher?= Date: Thu, 19 Jun 2025 18:32:47 +0200 Subject: [PATCH] updated websocket as client broker --- .github/workflows/build.yml | 48 +++++++++++ .gitignore | 3 +- client/client.go | 71 ---------------- driver/artNet.go | 80 ++++++++++++----- go.mod | 2 +- go.sum | 4 +- main.go | 7 +- websocket/client.go | 165 ++++++++++++++++++++++++++++++++++++ 8 files changed, 282 insertions(+), 98 deletions(-) create mode 100644 .github/workflows/build.yml delete mode 100644 client/client.go create mode 100644 websocket/client.go diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..9022d68 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,48 @@ +name: Build Go Binaries + +on: + push: + branches: [ main ] + pull_request: + +jobs: + build: + runs-on: ubuntu-latest + + strategy: + matrix: + goos: [linux, windows] + goarch: [amd64, arm, arm64] + exclude: + - goos: windows + goarch: arm + - goos: windows + goarch: arm64 + + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.24.0' + + - name: Set up Git credentials for private modules + run: | + git config --global url."https://${{ secrets.GH_PAT }}@github.com/".insteadOf "https://github.com/" + echo "GOPRIVATE=github.com/tecamino/*" >> $GITHUB_ENV + + - name: Build binary + run: | + mkdir -p build + if [ "${{ matrix.goos }}" == "windows" ]; then + GOOS=${{ matrix.goos }} GOARCH=${{ matrix.goarch }} go build -o build/tecamino-driver-artNet-${{ matrix.goos }}-${{ matrix.goarch }}.exe main.go + else + GOOS=${{ matrix.goos }} GOARCH=${{ matrix.goarch }} go build -o build/tecamino-driver-artNet-${{ matrix.goos }}-${{ matrix.goarch }} main.go + fi + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: binaries-${{ matrix.goos }}-${{ matrix.goarch }} + path: build/ diff --git a/.gitignore b/.gitignore index 77b913f..e1a9838 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.cfg -*.log \ No newline at end of file +*.log +artNetDriver-arm64 \ No newline at end of file diff --git a/client/client.go b/client/client.go deleted file mode 100644 index 7baeaae..0000000 --- a/client/client.go +++ /dev/null @@ -1,71 +0,0 @@ -package client - -import ( - "context" - "fmt" - - "github.com/coder/websocket" - "github.com/coder/websocket/wsjson" - json_data "github.com/tecamino/tecamino-json_data" - json_dataModels "github.com/tecamino/tecamino-json_data/models" - "github.com/tecamino/tecamino-logger/logging" -) - -type Client struct { - conn *websocket.Conn - ctx context.Context - cancel context.CancelFunc - log *logging.Logger -} - -// Initialize new websocket server -func NewClient(log *logging.Logger) *Client { - c := Client{ - log: log, - } - c.ctx, c.cancel = context.WithCancel(context.Background()) - return &c -} - -// Connect to websocket server -// ip: ip address of server -func (c *Client) Connect(ip, id string, port uint) (err error) { - c.conn, _, err = websocket.Dial(c.ctx, fmt.Sprintf("ws://%s:%d/ws?id=%s", ip, port, id), nil) - return -} - -// Close connection to websocket server -func (c *Client) Disconnect() { - c.cancel() -} - -// Subscribe to websocket server -func (c *Client) Subscribe(id string) error { - - req := json_data.NewRequest() - req.AddDriverSubscription(".*", id, 0, true, false, false) - if err := wsjson.Write(c.ctx, c.conn, req); err != nil { - return err - } - return nil -} - -func (c *Client) ReadJsonData() (response json_dataModels.Response, err error) { - - err = wsjson.Read(c.ctx, c.conn, &response) - if err != nil { - code := websocket.CloseStatus(err) - - switch code { - case websocket.StatusNormalClosure, - websocket.StatusGoingAway, - websocket.StatusNoStatusRcvd: - c.log.Info("webSocket.readJsonData", fmt.Sprintf("WebSocket closed: %v (code: %v)\n", err, code)) - return - default: - c.log.Error("webSocket.readJsonData", fmt.Sprintf("WebSocket read error: %v (code: %v)\n", err, code)) - return - } - } - return -} diff --git a/driver/artNet.go b/driver/artNet.go index 16b6eb7..4ef3407 100644 --- a/driver/artNet.go +++ b/driver/artNet.go @@ -2,11 +2,13 @@ package driver import ( "artNet/cfg" - "artNet/client" "artNet/models" + ws "artNet/websocket" "fmt" "path" + "time" + json_data "github.com/tecamino/tecamino-json_data" "github.com/tecamino/tecamino-logger/logging" ) @@ -14,7 +16,6 @@ type ArtNetDriver struct { Name string `yaml:"driver" json:"driver"` Buses map[string]*models.Bus `yaml:"buses,omitempty" json:"buses,omitempty"` cfgHandler *cfg.Cfg `yaml:"-" json:"-"` - Conn *client.Client `yaml:"-" json:"-"` Subscriptions models.Subscriptions `yaml:"-" json:"-"` Log *logging.Logger `yaml:"-" json:"-"` } @@ -89,30 +90,67 @@ func (d *ArtNetDriver) SetValue(bus string, address uint, value uint8) error { // id: id of driver // port: port of server func (d *ArtNetDriver) Connect(ip, id string, port uint) error { - d.Conn = client.NewClient(d.Log) - if err := d.Conn.Connect(ip, id, port); err != nil { + var err error + client, err := ws.NewClient(ip, id, port) + if err != nil { return err } - defer d.Conn.Disconnect() + client.OnError = func(err error) { + d.Log.Error("websocket connection", err) + } + client.OnMessage = func(data []byte) { + //fmt.Println(100, string(data)) + fmt.Println(100, string(data)) + } - if err := d.Conn.Subscribe(id); err != nil { - return err + client.Connect(5) + + req := json_data.NewRequest() + req.AddDriverSubscription(".*", id, 0, true, false, false) + + if err := client.SendData(req); err != nil { + d.Log.Error("websocket send data", err) } for { - respond, err := d.Conn.ReadJsonData() - if err != nil { - return err - } - - d.Subscribe(respond.Subscribe...) - - for _, pub := range respond.Publish { - if sub, ok := d.Subscriptions[pub.Uuid]; ok { - if err := d.SetValue(sub.Bus, sub.Address, uint8(pub.Value.(float64))); err != nil { - d.Log.Info("artNet.Connect", err.Error()) - } - } - } + time.Sleep(1) } + return nil + + // d.Conn = websocket.NewClient() + // if err := d.Conn.Connect(ip, id, port); err != nil { + // return err + // } + // defer d.Conn.Disconnect() + + // if err := d.Conn.Subscribe(id); err != nil { + // return err + // } + + // Subscribe to websocket server + // func (c *Client) Subscribe(id string) error { + // req := json_data.NewRequest() + // req.AddDriverSubscription(".*", id, 0, true, false, false) + // if err := wsjson.Write(c.ctx, c.conn, req); err != nil { + // return err + // } + // return nil + // } + + // for { + // respond, err := d.Conn.ReadJsonData() + // if err != nil { + // return err + // } + + // d.Subscribe(respond.Subscribe...) + + // for _, pub := range respond.Publish { + // if sub, ok := d.Subscriptions[pub.Uuid]; ok { + // if err := d.SetValue(sub.Bus, sub.Address, uint8(pub.Value.(float64))); err != nil { + // d.Log.Info("artNet.Connect", err.Error()) + // } + // } + // } + // } } diff --git a/go.mod b/go.mod index 057f637..b5af2c6 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,9 @@ go 1.23.0 toolchain go1.23.8 require ( - github.com/coder/websocket v1.8.13 github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e github.com/tecamino/tecamino-json_data v0.0.13 github.com/tecamino/tecamino-logger v0.2.0 diff --git a/go.sum b/go.sum index 0b31ec8..0d941d0 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/ github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= -github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE= -github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -32,6 +30,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= diff --git a/main.go b/main.go index 2ba7f60..2464e28 100644 --- a/main.go +++ b/main.go @@ -56,7 +56,7 @@ func main() { go func() { if err := s.ServeHttp(*wsPort); err != nil { - artNetDriver.Log.Error("main", err.Error()) + artNetDriver.Log.Error("main", err) } }() @@ -72,7 +72,10 @@ func main() { // connect to server for { - artNetDriver.Log.Error("main", artNetDriver.Connect(*serverIp, DriverName, *serverPort)) + if err := artNetDriver.Connect(*serverIp, DriverName, *serverPort); err != nil { + artNetDriver.Log.Error("main", err) + } + fmt.Println(555) time.Sleep(10 * time.Second) } diff --git a/websocket/client.go b/websocket/client.go new file mode 100644 index 0000000..30eb0c5 --- /dev/null +++ b/websocket/client.go @@ -0,0 +1,165 @@ +package websocket + +import ( + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/gorilla/websocket" + json_dataModels "github.com/tecamino/tecamino-json_data/models" +) + +type Client struct { + conn *websocket.Conn + writeMu sync.Mutex + OnMessage func(data []byte) + OnOpen func() + OnClose func(code int, reason string) + OnError func(err error) + OnPing func() + OnPong func() + timeout time.Duration +} + +// Connect to websocket server +// ip: ip address of server +func NewClient(ip, id string, port uint) (*Client, error) { + url := fmt.Sprintf("ws://%s:%d/ws?id=%s", ip, port, id) + c := &Client{} + + dialer := websocket.DefaultDialer + conn, resp, err := dialer.Dial(url, nil) + if err != nil { + if c.OnError != nil { + c.OnError(err) + } + return nil, fmt.Errorf("dial error %v (status %v)", err, resp) + } + c.conn = conn + + // Setup control handlers + c.conn.SetPingHandler(func(appData string) error { + if c.OnPing != nil { + c.OnPing() + } + return c.conn.WriteMessage(websocket.PongMessage, nil) + }) + c.conn.SetPongHandler(func(appData string) error { + c.conn.SetReadDeadline(time.Now().Add(c.timeout)) + if c.OnPong != nil { + c.OnPong() + } + return nil + }) + c.conn.SetCloseHandler(func(code int, text string) error { + if c.OnClose != nil { + c.OnClose(code, text) + } + return nil + }) + + if c.OnOpen != nil { + c.OnOpen() + } + return c, nil +} + +func (c *Client) Connect(timeout uint) { + if timeout > 0 { + fmt.Println(1234, timeout) + c.timeout = time.Duration(timeout) * time.Second + } + + go c.pingLoop() + + c.conn.SetReadDeadline(time.Now().Add(c.timeout)) + go func() { + + for { + msgType, msg, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Println("WebSocket closed:", err) + } + if c.OnError != nil { + c.OnError(fmt.Errorf("read error: %w", err)) + } + return + } + switch msgType { + case websocket.TextMessage, websocket.BinaryMessage: + if c.OnMessage != nil { + c.OnMessage(msg) + } + default: + log.Printf("Unhandled message type: %d", msgType) + } + } + }() +} + +func (c *Client) pingLoop() { + interval := c.timeout / 2 + if interval <= 0 { + interval = 5 * time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for range ticker.C { + if err := c.Write(websocket.PingMessage, nil); err != nil { + if c.OnError != nil { + c.OnError(fmt.Errorf("ping error: %w", err)) + } + return + } + if c.OnPing != nil { + c.OnPing() + } + } +} + +func (c *Client) Write(msgType int, data []byte) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() + + c.conn.SetWriteDeadline(time.Now().Add(c.timeout)) + + if err := c.conn.WriteMessage(msgType, data); err != nil { + if c.OnError != nil { + c.OnError(err) + } + return err + } + return nil +} + +// Close connection to websocket server +func (c *Client) Close(code int, reason string) { + if c.conn != nil { + if c.OnClose != nil { + c.OnClose(code, reason) + } + c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, reason)) + c.conn.Close() + } +} + +func (c *Client) SendData(data any) error { + c.conn.SetWriteDeadline(time.Now().Add(c.timeout)) + if err := c.conn.WriteJSON(data); err != nil { + if c.OnError != nil { + c.OnError(err) + } + return err + } + return nil +} + +func (c *Client) ReadJsonData(data []byte) (json_dataModels.Response, error) { + var resp json_dataModels.Response + err := json.Unmarshal(data, &resp) + return resp, err +}