Files
tecamino-driver-artNet/models/bus.go

219 lines
4.4 KiB
Go

package models
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/tatsushid/go-fastping"
json_data "github.com/tecamino/tecamino-json_data"
json_dataModels "github.com/tecamino/tecamino-json_data/models"
"github.com/tecamino/tecamino-logger/logging"
)
// artPort is the UDP port used when a bus does not configure one
// (6454, the Art-Net default).
const artPort = 6454
// Bus describes one Art-Net interface (a device reached over UDP) together
// with the runtime state the driver keeps for it. Only Name, Ip, Port and
// Resubscribe take part in JSON encoding; only Name, Ip and Port are read
// from or written to YAML.
type Bus struct {
// Name identifies the bus; ParsePayload rejects payloads without it.
Name string `yaml:"name" json:"name"`
// Ip is the device address as text. It is parsed with net.ParseIP when
// the UDP connection is opened and is also the target of the
// reachability probe.
Ip string `yaml:"ip" json:"ip"`
// Port is the optional UDP port. GetPort falls back to artPort when it
// is nil.
Port *int `yaml:"port" json:"port,omitempty"`
// Data is the DMX universe transmitted to the device. NewBus fills it
// with NewDMXUniverse().
Data *DMX `yaml:"-" json:"-"`
// Resubscribe is not read anywhere in this file.
// NOTE(review): presumably the subscriptions to restore for this bus —
// confirm against the callers.
Resubscribe *[]json_dataModels.Subscription `yaml:"-" json:"resubscribe"`
// Watchdog is the cancel function installed by Start. Stop invokes it
// and Status reports whether it is non-nil.
Watchdog context.CancelFunc `yaml:"-" json:"-"`
// Reachable is updated by the watchdog goroutine launched in Start with
// the outcome of the reachability probe.
Reachable bool `yaml:"-" json:"-"`
// Send queues DMX frames for transmission. Start creates it with a
// buffer of 1024 frames and writes every received frame to the device.
Send chan *DMX `yaml:"-" json:"-"`
}
// NewBus builds a Bus for the Art-Net interface called name at address ip.
// A port of 0 selects artPort (6454, the Art-Net default). The returned bus
// owns a fresh DMX universe obtained from NewDMXUniverse.
func NewBus(name, ip string, port int) *Bus {
	// Work on a copy so the stored pointer refers to the resolved port.
	resolved := port
	if resolved == 0 {
		resolved = artPort
	}

	bus := new(Bus)
	bus.Name = name
	bus.Ip = ip
	bus.Port = &resolved
	bus.Data = NewDMXUniverse()
	return bus
}
// GetPort returns the configured UDP port of the bus, or artPort when no
// port has been set (Port is nil).
func (b *Bus) GetPort() int {
	if configured := b.Port; configured != nil {
		return *configured
	}
	return artPort
}
// Poll transmits the bus's DMX universe to the device as Art-Net packets over
// UDP in an endless loop. It blocks and only returns with an error: when the
// address is invalid, when the connection cannot be opened, when a write
// fails, or when the device stops answering the reachability probe.
//
// interval is the pause between two frames. Anything shorter than 23ms
// (including 0) is raised to 23ms, roughly 44 frames per second, which is the
// rate this function always used before.
// NOTE(review): the previous comment described interval as "milliseconds"
// while the type is time.Duration; callers passing bare numbers such as 50
// therefore keep the 23ms period — confirm the unit callers intend.
//
// Reachability is checked by a single background goroutine that probes the
// device in a loop; the device counts as lost after more than five
// consecutive failed probes. The goroutine ends when Poll returns (after its
// current probe finishes).
func (b *Bus) Poll(interval time.Duration) error {
	const (
		framePeriod = 23 * time.Millisecond
		maxMisses   = 5
	)
	if interval < framePeriod {
		interval = framePeriod
	}

	// net.DialUDP treats a nil remote IP as the local system, so an
	// unparsable address must be rejected here instead of silently sending
	// frames to localhost.
	ip := net.ParseIP(b.Ip)
	if ip == nil {
		return fmt.Errorf("device:%s invalid ip address %q", b.Name, b.Ip)
	}

	// Send packet over UDP. GetPort tolerates a bus without configured port.
	conn, err := net.DialUDP("udp", nil, &net.UDPAddr{
		IP:   ip,
		Port: b.GetPort(),
	})
	if err != nil {
		return err
	}
	defer conn.Close()

	// done stops the probe goroutine once Poll returns; unreachable carries
	// its verdict (buffered so the goroutine never blocks on a gone reader).
	done := make(chan struct{})
	defer close(done)
	unreachable := make(chan error, 1)

	host := b.Ip
	go func() {
		misses := 0
		for {
			reached, pingErr := isUDPReachable(host)
			if reached {
				misses = 0
			} else {
				misses++
				if misses > maxMisses {
					unreachable <- pingErr
					return
				}
			}
			// Pace the probes so an immediately failing probe cannot spin.
			select {
			case <-done:
				return
			case <-time.After(interval):
			}
		}
	}()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if _, err := conn.Write(NewArtNetPackage(b.Data)); err != nil {
			return err
		}
		select {
		case pingErr := <-unreachable:
			if pingErr != nil {
				return fmt.Errorf("device not reachable: %w", pingErr)
			}
			return errors.New("device not reachable")
		case <-ticker.C:
		}
	}
}
// Start opens the UDP connection to the device and launches two background
// goroutines that run until Stop is called:
//
//   - the watchdog probes the device, keeps b.Reachable up to date and queues
//     b.Data on the send channel as an Art-Net heartbeat while the device
//     answers (every 30s when reachable, retrying every 5s otherwise);
//   - the sender writes every frame received on b.Send to the device, pausing
//     23ms after each frame.
//
// Start returns an error, without starting anything and without touching
// b.Watchdog or b.Send, when b.Ip is not a valid IP address or the connection
// cannot be opened. Calling Start on a bus that is already running cancels
// the previous goroutines before the new ones are installed.
//
// A write error is logged and ends both goroutines; b.Watchdog is left set in
// that case.
func (b *Bus) Start(log *logging.Logger) error {
	// net.DialUDP treats a nil remote IP as the local system, so an
	// unparsable address must be rejected instead of sending to localhost.
	ip := net.ParseIP(b.Ip)
	if ip == nil {
		return fmt.Errorf("device:%s invalid ip address %q", b.Name, b.Ip)
	}

	// Dial before any goroutine exists so a failure leaves nothing behind.
	conn, err := net.DialUDP("udp", nil, &net.UDPAddr{
		IP:   ip,
		Port: b.GetPort(),
	})
	if err != nil {
		return err
	}

	// Cancel a previous run, if any, so its goroutines do not leak.
	if b.Watchdog != nil {
		b.Watchdog()
	}

	ctx, cancel := context.WithCancel(context.Background())
	// The goroutines use their own reference so a later restart, which
	// replaces b.Send, cannot redirect them.
	send := make(chan *DMX, 1024)
	b.Watchdog = cancel
	b.Send = send

	// watchdog
	go func() {
		log.Info("bus.Start", fmt.Sprintf("device:%s ip:%s watchdog starting", b.Name, b.Ip))
		defer func() {
			log.Info("bus.Start", fmt.Sprintf("device:%s ip:%s watchdog stopped", b.Name, b.Ip))
			b.Reachable = false
		}()

		for {
			interval := 5 * time.Second
			reached, err := isUDPReachable(b.Ip)
			// The probe blocks for a while; do not act on its result if the
			// bus was stopped in the meantime.
			if ctx.Err() != nil {
				return
			}
			switch {
			case err != nil:
				b.Reachable = false
				log.Error("bus.Start", err)
			case !reached:
				b.Reachable = false
				log.Error("bus.Start", fmt.Sprintf("device:%s ip:%s not reached", b.Name, b.Ip))
			default:
				b.Reachable = true
				// send data as a heartbeat for the Art-Net protocol; give up
				// waiting for buffer space as soon as the bus is stopped
				select {
				case send <- b.Data:
				case <-ctx.Done():
					return
				}
				log.Info("bus.Start", fmt.Sprintf("device:%s ip:%s watchdog running", b.Name, b.Ip))
				interval = 30 * time.Second
			}

			// Cancellable pause so Stop takes effect immediately.
			select {
			case <-ctx.Done():
				return
			case <-time.After(interval):
			}
		}
	}()

	// sender
	go func() {
		defer conn.Close()
		// Whenever the sender ends, the watchdog must end too; otherwise it
		// would keep queueing heartbeats nobody transmits.
		defer cancel()

		for {
			select {
			case <-ctx.Done():
				return
			case frame, ok := <-send:
				if !ok {
					// channel closed by code outside this file
					return
				}
				if _, err := conn.Write(NewArtNetPackage(frame)); err != nil {
					log.Error("bus.Start", err)
					return
				}
				time.Sleep(23 * time.Millisecond)
			}
		}
	}()
	return nil
}

// Stop cancels the goroutines launched by Start and clears b.Watchdog, so
// Status reports false afterwards. It is safe to call Stop repeatedly or on a
// bus that was never started.
//
// b.Send is deliberately left open: the channel has several senders (the
// watchdog and any code queueing frames), and closing it from here would make
// a concurrent send panic. Frames queued after Stop are not transmitted.
func (b *Bus) Stop() {
	if b.Watchdog == nil {
		return
	}
	// cancels context
	b.Watchdog()
	b.Watchdog = nil
}
// Status reports whether the bus currently holds a watchdog cancel function,
// i.e. whether b.Watchdog is non-nil.
func (b *Bus) Status() bool {
	hasWatchdog := b.Watchdog != nil
	return hasWatchdog
}
// ParsePayload decodes the JSON request body of c into b and checks that a
// bus name was supplied.
//
// On failure it answers the request with HTTP 400 and a json_data response
// carrying the error message, then returns the error: the decoding error for
// a malformed body, or "bus name missing" when Name is empty. A decoding
// failure additionally aborts the gin handler chain and records the error on
// the context as a bind error. On success nothing is written to the response
// and nil is returned.
func (b *Bus) ParsePayload(c *gin.Context) error {
	// ShouldBindJSON is used instead of BindJSON: BindJSON already aborts and
	// flushes a 400 with a plain-text header, so the JSON error body below
	// would be written after the headers were sent.
	if err := c.ShouldBindJSON(b); err != nil {
		// Keep what BindJSON did besides writing the header: register the
		// error on the context and abort the remaining handlers.
		_ = c.Error(err).SetType(gin.ErrorTypeBind)

		r := json_data.NewResponse()
		r.SetError()
		r.SetMessage("json: " + err.Error())
		c.AbortWithStatusJSON(http.StatusBadRequest, r)
		return err
	}

	if b.Name == "" {
		err := errors.New("bus name missing")

		r := json_data.NewResponse()
		r.SetError()
		r.SetMessage(err.Error())
		c.JSON(http.StatusBadRequest, r)
		return err
	}
	return nil
}
// isUDPReachable probes ip once with a fastping pinger and reports whether a
// reply was received. The error is non-nil when ip cannot be resolved or when
// the pinger's Run call fails.
//
// NOTE(review): despite the name, the address is resolved on the "ip4:icmp"
// network and handed to fastping, so this looks like an ICMP echo check
// rather than a UDP one — confirm.
func isUDPReachable(ip string) (recieved bool, err error) {
	target, resolveErr := net.ResolveIPAddr("ip4:icmp", ip)
	if resolveErr != nil {
		return false, resolveErr
	}

	pinger := fastping.NewPinger()
	pinger.AddIPAddr(target)
	// Any reply marks the target as reachable.
	pinger.OnRecv = func(*net.IPAddr, time.Duration) {
		recieved = true
	}
	pinger.OnIdle = func() {}

	// Run must complete before recieved is read, hence no combined return.
	err = pinger.Run()
	return recieved, err
}