7 Commits

Author SHA1 Message Date
Adrian Zuercher
da78a00446 add heartbeat send data every 30 Seconds 2025-07-16 21:53:47 +02:00
Adrian Zuercher
ba3c55dc34 add mac DS_Store 2025-07-16 21:52:31 +02:00
Adrian Zuercher
76a036707f fix faulty data send to artNet with new chan queue 2025-07-13 19:56:00 +02:00
Adrian Zuercher
258323f5b7 add new executable name 2025-07-13 19:54:27 +02:00
Adrian Zuercher
473eb22b97 fix missing subscribe and publish on artNet Protocol 2025-06-29 15:36:14 +02:00
Adrian Zürcher
64ad4e8b3e improve websocket connection according to gorrila example 2025-06-22 09:03:18 +02:00
Adrian Zürcher
b29e7a97b8 lift version of json_data 2025-06-19 19:19:16 +02:00
11 changed files with 246 additions and 212 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
.DS_Store
*.cfg *.cfg
*.log *.log
artNetDriver-arm64 artNetDriver-arm64
tecamino-driver-artNet-linux-arm64

View File

@@ -4,9 +4,9 @@ import (
"artNet/cfg" "artNet/cfg"
"artNet/models" "artNet/models"
ws "artNet/websocket" ws "artNet/websocket"
"encoding/json"
"fmt" "fmt"
"path" "path"
"time"
json_data "github.com/tecamino/tecamino-json_data" json_data "github.com/tecamino/tecamino-json_data"
"github.com/tecamino/tecamino-logger/logging" "github.com/tecamino/tecamino-logger/logging"
@@ -82,7 +82,7 @@ func (d *ArtNetDriver) SetValue(bus string, address uint, value uint8) error {
return fmt.Errorf("no dmx data on bus '%s' found", bus) return fmt.Errorf("no dmx data on bus '%s' found", bus)
} }
d.Buses[bus].Data.SetValue(address, value) d.Buses[bus].Data.SetValue(address, value)
return d.Buses[bus].SendData() return nil
} }
// connect to websocket server and listen to subscriptions // connect to websocket server and listen to subscriptions
@@ -90,67 +90,54 @@ func (d *ArtNetDriver) SetValue(bus string, address uint, value uint8) error {
// id: id of driver // id: id of driver
// port: port of server // port: port of server
func (d *ArtNetDriver) Connect(ip, id string, port uint) error { func (d *ArtNetDriver) Connect(ip, id string, port uint) error {
var err error errChan := make(chan error)
client, err := ws.NewClient(ip, id, port) client, err := ws.NewClient(ip, id, port)
if err != nil { if err != nil {
return err return err
} }
client.OnError = func(err error) { client.OnError = func(err error) {
d.Log.Error("websocket connection", err) d.Log.Error("websocket connection", err)
} errChan <- err
client.OnMessage = func(data []byte) {
//fmt.Println(100, string(data))
fmt.Println(100, string(data))
} }
client.Connect(5) client.OnMessage = func(data []byte) {
request := json_data.NewResponse()
err = json.Unmarshal(data, &request)
if err != nil {
return
}
if request.Subscribe != nil {
d.Subscribe(request.Subscribe...)
}
if request.Publish != nil {
d.Publish(request.Publish...)
}
}
if err != nil {
d.Log.Error("artNet.Connect", err)
return err
}
req := json_data.NewRequest() req := json_data.NewRequest()
req.AddDriverSubscription(".*", id, 0, true, false, false) req.AddDriverSubscription(".*", id, 0, true, false, false)
if err := client.SendData(req); err != nil { if err := client.SendRequest(req); err != nil {
errChan <- err
d.Log.Error("websocket send data", err) d.Log.Error("websocket send data", err)
} }
for { for err := range errChan {
time.Sleep(1) return err
} }
return nil return nil
}
// d.Conn = websocket.NewClient()
// if err := d.Conn.Connect(ip, id, port); err != nil { // send data to all buses that the send flage is true
// return err func (d *ArtNetDriver) SendData() {
// } for _, bus := range d.Buses {
// defer d.Conn.Disconnect() bus.Send <- bus.Data
}
// if err := d.Conn.Subscribe(id); err != nil {
// return err
// }
// Subscribe to websocket server
// func (c *Client) Subscribe(id string) error {
// req := json_data.NewRequest()
// req.AddDriverSubscription(".*", id, 0, true, false, false)
// if err := wsjson.Write(c.ctx, c.conn, req); err != nil {
// return err
// }
// return nil
// }
// for {
// respond, err := d.Conn.ReadJsonData()
// if err != nil {
// return err
// }
// d.Subscribe(respond.Subscribe...)
// for _, pub := range respond.Publish {
// if sub, ok := d.Subscriptions[pub.Uuid]; ok {
// if err := d.SetValue(sub.Bus, sub.Address, uint8(pub.Value.(float64))); err != nil {
// d.Log.Info("artNet.Connect", err.Error())
// }
// }
// }
// }
} }

View File

@@ -105,7 +105,7 @@ func (d *ArtNetDriver) Start(c *gin.Context) {
} }
busPayload := models.Bus{} busPayload := models.Bus{}
if busPayload.ParsePayload(c); err != nil { if err := busPayload.ParsePayload(c); err != nil {
return return
} }

19
driver/publish.go Normal file
View File

@@ -0,0 +1,19 @@
package driver
import (
json_dataModels "github.com/tecamino/tecamino-json_data/models"
)
func (d *ArtNetDriver) Publish(pubs ...json_dataModels.Publish) error {
if d.Subscriptions == nil {
return nil
}
for _, pub := range pubs {
if drv, ok := (d.Subscriptions)[pub.Uuid]; ok {
d.SetValue(drv.Bus, drv.Address, uint8(pub.Value.(float64)))
}
}
d.SendData()
return nil
}

View File

@@ -17,4 +17,5 @@ func (d *ArtNetDriver) Subscribe(subs ...json_dataModels.Subscription) {
d.SetValue(drv.Bus, drv.Address, uint8(sub.Value.(float64))) d.SetValue(drv.Bus, drv.Address, uint8(sub.Value.(float64)))
} }
} }
d.SendData()
} }

2
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3 github.com/gorilla/websocket v1.5.3
github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e
github.com/tecamino/tecamino-json_data v0.0.13 github.com/tecamino/tecamino-json_data v0.0.16
github.com/tecamino/tecamino-logger v0.2.0 github.com/tecamino/tecamino-logger v0.2.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )

4
go.sum
View File

@@ -65,8 +65,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e h1:nt2877sKfojlHCTOBXbpWjBkuWKritFaGIfgQwbQUls= github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e h1:nt2877sKfojlHCTOBXbpWjBkuWKritFaGIfgQwbQUls=
github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e/go.mod h1:B4+Kq1u5FlULTjFSM707Q6e/cOHFv0z/6QRoxubDIQ8= github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e/go.mod h1:B4+Kq1u5FlULTjFSM707Q6e/cOHFv0z/6QRoxubDIQ8=
github.com/tecamino/tecamino-json_data v0.0.13 h1:hugbmCgXXh0F7YQAEbbJYHkSdq1caejD7SajDiMs42I= github.com/tecamino/tecamino-json_data v0.0.16 h1:aZFxnhm4g6WMDPoqy4HosUk7vl0DB0iIcVs8bbT4MzU=
github.com/tecamino/tecamino-json_data v0.0.13/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY= github.com/tecamino/tecamino-json_data v0.0.16/go.mod h1:LLlyD7Wwqplb2BP4PeO86EokEcTRidlW5MwgPd1T2JY=
github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU= github.com/tecamino/tecamino-logger v0.2.0 h1:NPH/Gg9qRhmVoW8b39i1eXu/LEftHc74nyISpcRG+XU=
github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8= github.com/tecamino/tecamino-logger v0.2.0/go.mod h1:0M1E9Uei/qw3e3WA1x3lBo1eP3H5oeYE7GjYrMahnj8=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=

View File

@@ -75,7 +75,7 @@ func main() {
if err := artNetDriver.Connect(*serverIp, DriverName, *serverPort); err != nil { if err := artNetDriver.Connect(*serverIp, DriverName, *serverPort); err != nil {
artNetDriver.Log.Error("main", err) artNetDriver.Log.Error("main", err)
} }
fmt.Println(555) artNetDriver.Log.Info("main", "next reconnecting attempt in 10 seconds")
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
} }

View File

@@ -29,6 +29,7 @@ type Bus struct {
Resubscribe *[]json_dataModels.Subscription `yaml:"-" json:"resubscribe"` Resubscribe *[]json_dataModels.Subscription `yaml:"-" json:"resubscribe"`
Watchdog context.CancelFunc `yaml:"-" json:"-"` Watchdog context.CancelFunc `yaml:"-" json:"-"`
Reachable bool `yaml:"-" json:"-"` Reachable bool `yaml:"-" json:"-"`
Send chan *DMX `yaml:"-" json:"-"`
} }
// adds new Art-Net interface to driver port 0 = 6454 (default art-net) // adds new Art-Net interface to driver port 0 = 6454 (default art-net)
@@ -102,7 +103,7 @@ func (b *Bus) Poll(interval time.Duration) error {
} }
// start bus // start bus
func (b *Bus) Start(log *logging.Logger) { func (b *Bus) Start(log *logging.Logger) error {
var ctx context.Context var ctx context.Context
ctx, b.Watchdog = context.WithCancel(context.Background()) ctx, b.Watchdog = context.WithCancel(context.Background())
@@ -125,6 +126,8 @@ func (b *Bus) Start(log *logging.Logger) {
interval = 5 * time.Second interval = 5 * time.Second
} else { } else {
b.Reachable = true b.Reachable = true
// send data as a heartbeat for the ArtNet Protocol
b.Send <- b.Data
log.Info("bus.Start", fmt.Sprintf("device:%s ip:%s watchdog running", b.Name, b.Ip)) log.Info("bus.Start", fmt.Sprintf("device:%s ip:%s watchdog running", b.Name, b.Ip))
interval = 30 * time.Second interval = 30 * time.Second
} }
@@ -132,26 +135,7 @@ func (b *Bus) Start(log *logging.Logger) {
} }
} }
}() }()
}
// stop bus
func (b *Bus) Stop() {
if b.Watchdog != nil {
b.Watchdog()
}
}
// status bus
func (b *Bus) Status() bool {
return b.Watchdog != nil
}
// send dmx data
func (b *Bus) SendData() error {
if !b.Reachable {
return nil
}
// Send packet over UDP
conn, err := net.DialUDP("udp", nil, &net.UDPAddr{ conn, err := net.DialUDP("udp", nil, &net.UDPAddr{
IP: net.ParseIP(b.Ip), IP: net.ParseIP(b.Ip),
Port: *b.Port, Port: *b.Port,
@@ -160,11 +144,38 @@ func (b *Bus) SendData() error {
if err != nil { if err != nil {
return err return err
} }
b.Send = make(chan *DMX, 1024)
go func() {
defer conn.Close() defer conn.Close()
//close send channel
close(b.Send)
for send := range b.Send {
_, err = conn.Write(NewArtNetPackage(send))
if err != nil {
log.Error("bus.Start", err)
return
}
time.Sleep(23 * time.Millisecond)
}
}()
return nil
}
_, err = conn.Write(NewArtNetPackage(b.Data)) // stop bus
func (b *Bus) Stop() {
if b.Watchdog != nil {
//cancels context
b.Watchdog()
//close send channel
close(b.Send)
}
}
return err // status bus
func (b *Bus) Status() bool {
return b.Watchdog != nil
} }
func (b *Bus) ParsePayload(c *gin.Context) error { func (b *Bus) ParsePayload(c *gin.Context) error {

View File

@@ -1,44 +0,0 @@
package models
// type Clients map[string]Client
// type Client struct {
// Connected *bool `json:"connected"`
// SndConn *websocket.Conn `json:"-"` //sending connection
// RvcConn *websocket.Conn `json:"-"` // revieving connection
// }
// func NewClients() Clients {
// return make(Clients)
// }
// // Connect a recieving websocket connection
// func (c *Clients) ConnectRecievingWsConnection(id string, ctx *gin.Context) (*websocket.Conn, error) {
// conn, err := websocket.Accept(ctx.Writer, ctx.Request, &websocket.AcceptOptions{
// OriginPatterns: []string{"*"},
// })
// if err != nil {
// return nil, fmt.Errorf("error accept websocket client: %s", err)
// }
// b := true
// (*c)[id] = Client{
// Connected: &b,
// RvcConn: conn,
// }
// return conn, nil
// }
// func (c *Clients) RemoveClient(id string) {
// delete(*c, id)
// }
// func (c *Clients) GetClientPointer(id string) *bool {
// return (*c)[id].Connected
// }
// func (c *Clients) DisconnectRecievingWsConnection(id string, code websocket.StatusCode, reason string) {
// *(*c)[id].Connected = false
// (*c)[id].RvcConn.Close(code, reason)
// }

View File

@@ -4,30 +4,51 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"sync"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
json_dataModels "github.com/tecamino/tecamino-json_data/models" json_dataModels "github.com/tecamino/tecamino-json_data/models"
) )
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
type Client struct { type Client struct {
ip string
port uint
Connected bool
conn *websocket.Conn conn *websocket.Conn
writeMu sync.Mutex
OnMessage func(data []byte)
OnOpen func() OnOpen func()
OnMessage func(data []byte)
OnClose func(code int, reason string) OnClose func(code int, reason string)
OnError func(err error) OnError func(err error)
OnPing func() OnPing func()
OnPong func() OnPong func()
timeout time.Duration sendPong chan string
send chan []byte
unregister chan []byte
} }
// Connect to websocket server // Connect to websocket server
// ip: ip address of server // ip: ip address of server
func NewClient(ip, id string, port uint) (*Client, error) { func NewClient(ip, id string, port uint) (*Client, error) {
url := fmt.Sprintf("ws://%s:%d/ws?id=%s", ip, port, id) url := fmt.Sprintf("ws://%s:%d/ws?id=%s", ip, port, id)
c := &Client{} c := &Client{
ip: ip,
port: port,
Connected: true,
sendPong: make(chan string),
send: make(chan []byte),
unregister: make(chan []byte),
}
dialer := websocket.DefaultDialer dialer := websocket.DefaultDialer
conn, resp, err := dialer.Dial(url, nil) conn, resp, err := dialer.Dial(url, nil)
@@ -40,20 +61,25 @@ func NewClient(ip, id string, port uint) (*Client, error) {
c.conn = conn c.conn = conn
// Setup control handlers // Setup control handlers
c.conn.SetPingHandler(func(appData string) error { conn.SetPingHandler(func(appData string) error {
if c.OnPing != nil { if c.OnPing != nil {
c.OnPing() c.OnPing()
} }
return c.conn.WriteMessage(websocket.PongMessage, nil) conn.SetWriteDeadline(time.Now().Add(writeWait))
conn.SetReadDeadline(time.Now().Add(writeWait))
c.sendPong <- appData
return nil
}) })
c.conn.SetPongHandler(func(appData string) error {
c.conn.SetReadDeadline(time.Now().Add(c.timeout)) conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
if c.OnPong != nil { if c.OnPong != nil {
c.OnPong() c.OnPong()
} }
return nil return nil
}) })
c.conn.SetCloseHandler(func(code int, text string) error {
conn.SetCloseHandler(func(code int, text string) error {
if c.OnClose != nil { if c.OnClose != nil {
c.OnClose(code, text) c.OnClose(code, text)
} }
@@ -63,98 +89,103 @@ func NewClient(ip, id string, port uint) (*Client, error) {
if c.OnOpen != nil { if c.OnOpen != nil {
c.OnOpen() c.OnOpen()
} }
// Start reading messages from client
go c.Read()
go c.Write()
return c, nil return c, nil
} }
func (c *Client) Connect(timeout uint) { func (c *Client) Read() {
if timeout > 0 { if c.OnOpen != nil {
fmt.Println(1234, timeout) c.OnOpen()
c.timeout = time.Duration(timeout) * time.Second
} }
go c.pingLoop() c.conn.SetReadDeadline(time.Now().Add(writeWait))
for c.Connected {
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
go func() {
for {
msgType, msg, err := c.conn.ReadMessage() msgType, msg, err := c.conn.ReadMessage()
if err != nil { if err != nil {
if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { c.handleError(fmt.Errorf("read error (id:%s:%d): %w", c.ip, c.port, err))
log.Println("WebSocket closed:", err)
}
if c.OnError != nil {
c.OnError(fmt.Errorf("read error: %w", err))
}
return return
} }
switch msgType { switch msgType {
case websocket.TextMessage, websocket.BinaryMessage: case websocket.CloseMessage:
c.Close(websocket.CloseNormalClosure, "Client closed")
return
case websocket.TextMessage:
if c.OnMessage != nil { if c.OnMessage != nil {
c.OnMessage(msg) c.OnMessage(msg)
} else {
log.Printf("Received message but no handler set (id:%s:%d): %s", c.ip, c.port, string(msg))
} }
default: default:
log.Printf("Unhandled message type: %d", msgType) log.Printf("Unhandled message type %d (id:%s:%d)", msgType, c.ip, c.port)
} }
} }
}
func (c *Client) Write() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}() }()
} for {
func (c *Client) pingLoop() { select {
interval := c.timeout / 2 case message, ok := <-c.send:
if interval <= 0 { c.conn.SetWriteDeadline(time.Now().Add(writeWait))
interval = 5 * time.Second if !ok {
} // The hub closed the channel.
ticker := time.NewTicker(interval) if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
defer ticker.Stop() c.handleError(err)
for range ticker.C {
if err := c.Write(websocket.PingMessage, nil); err != nil {
if c.OnError != nil {
c.OnError(fmt.Errorf("ping error: %w", err))
}
return return
} }
c.handleError(fmt.Errorf("server %s:%d closed channel", c.ip, c.port))
return
} else {
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
c.handleError(err)
return
}
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.handleError(err)
return
}
if c.OnPing != nil { if c.OnPing != nil {
c.OnPing() c.OnPing()
} }
case message, ok := <-c.sendPong:
if ok {
c.conn.WriteMessage(websocket.PongMessage, []byte(message))
}
case message := <-c.unregister:
c.conn.WriteMessage(websocket.CloseMessage, message)
c.Connected = false
close(c.send)
close(c.sendPong)
close(c.unregister)
return
}
} }
} }
func (c *Client) Write(msgType int, data []byte) error { func (c *Client) SendRequest(req *json_dataModels.Request) error {
c.writeMu.Lock() if !c.Connected {
defer c.writeMu.Unlock()
c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
if err := c.conn.WriteMessage(msgType, data); err != nil {
if c.OnError != nil {
c.OnError(err)
}
return err
}
return nil return nil
} }
// Close connection to websocket server data, err := json.Marshal(*req)
func (c *Client) Close(code int, reason string) { if err != nil {
if c.conn != nil {
if c.OnClose != nil {
c.OnClose(code, reason)
}
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(code, reason))
c.conn.Close()
}
}
func (c *Client) SendData(data any) error {
c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
if err := c.conn.WriteJSON(data); err != nil {
if c.OnError != nil {
c.OnError(err)
}
return err return err
} }
c.send <- data
return nil return nil
} }
@@ -163,3 +194,30 @@ func (c *Client) ReadJsonData(data []byte) (json_dataModels.Response, error) {
err := json.Unmarshal(data, &resp) err := json.Unmarshal(data, &resp)
return resp, err return resp, err
} }
// Close connection to websocket server
func (c *Client) Close(code int, reason string) error {
closeMsg := websocket.FormatCloseMessage(code, reason)
select {
case c.unregister <- closeMsg: // Attempt to send
default: // If the channel is full, this runs
return fmt.Errorf("attempt close client socket failed")
}
if c.OnClose != nil {
c.OnClose(code, reason)
}
return nil
}
func (c *Client) handleError(err error) {
if c.OnError != nil {
c.OnError(err)
}
if err := c.Close(websocket.CloseInternalServerErr, err.Error()); err != nil {
if c.OnError != nil {
c.OnError(err)
} else {
fmt.Println("error: ", err)
}
}
}