5 Commits

Author SHA1 Message Date
Adrian Zuercher
e99fd0710b new subsrcribe topic with wildcard * 2025-08-24 07:52:26 +02:00
Adrian Zuercher
fee2381a45 Merge branch 'main' of https://gitea.tecamino.com/paadi/pubSub 2025-08-22 14:35:04 +02:00
Adrian Zuercher
a92a4d9fde . 2025-08-22 14:35:01 +02:00
3a243546e9 pubSub.go aktualisiert 2025-08-22 14:32:20 +02:00
d6d5466597 models/data.go aktualisiert 2025-08-22 14:31:53 +02:00
3 changed files with 31 additions and 5 deletions

View File

@@ -1,7 +1,14 @@
package models package models
import "encoding/json"
type Data struct { type Data struct {
Action string `json:"action"` Action string `json:"action,omitempty"`
Topic string `json:"topic"` Topic string `json:"topic"`
Data any `json:"data"` Data any `json:"data"`
} }
func (*Data) ReadData(data []byte) (request Data, err error) {
err = json.Unmarshal(data, &request)
return
}

View File

@@ -5,6 +5,7 @@ import (
"sync" "sync"
"gitea.tecamino.com/paadi/pubSub/models" "gitea.tecamino.com/paadi/pubSub/models"
"gitea.tecamino.com/paadi/pubSub/utils"
) )
// Pubsub implements a simple topic-based publish/subscribe system // Pubsub implements a simple topic-based publish/subscribe system
@@ -43,9 +44,13 @@ func (ps *Pubsub) worker() {
defer ps.wg.Done() defer ps.wg.Done()
for job := range ps.jobQueue { for job := range ps.jobQueue {
ps.mu.RLock() ps.mu.RLock()
subs := make([]func(any), 0, len(ps.subs[job.Topic])) var subs []func(any)
for _, cb := range ps.subs[job.Topic] { for pattern, callbacks := range ps.subs {
subs = append(subs, cb) if utils.Matches(pattern, job.Topic) {
for _, cb := range callbacks {
subs = append(subs, cb)
}
}
} }
ps.mu.RUnlock() ps.mu.RUnlock()
@@ -53,7 +58,7 @@ func (ps *Pubsub) worker() {
func(c func(any), d any) { func(c func(any), d any) {
defer func() { recover() }() defer func() { recover() }()
c(d) c(d)
}(cb, job.Data) }(cb, job)
} }
} }
} }

14
utils/utils.go Normal file
View File

@@ -0,0 +1,14 @@
package utils
import "strings"
func Matches(pattern, topic string) bool {
if pattern == "*" {
return true
}
if strings.HasSuffix(pattern, "/*") {
prefix := strings.TrimSuffix(pattern, "/*")
return strings.HasPrefix(topic, prefix+"/")
}
return pattern == topic
}