From e99fd0710b851e00b9c55d2cf8e39c5b0ecdbb9b Mon Sep 17 00:00:00 2001 From: Adrian Zuercher Date: Sun, 24 Aug 2025 07:52:26 +0200 Subject: [PATCH] new subsrcribe topic with wildcard * --- pubSub.go | 11 ++++++++--- utils/utils.go | 14 ++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 utils/utils.go diff --git a/pubSub.go b/pubSub.go index 1211ddd..d9b8532 100644 --- a/pubSub.go +++ b/pubSub.go @@ -5,6 +5,7 @@ import ( "sync" "gitea.tecamino.com/paadi/pubSub/models" + "gitea.tecamino.com/paadi/pubSub/utils" ) // Pubsub implements a simple topic-based publish/subscribe system @@ -43,9 +44,13 @@ func (ps *Pubsub) worker() { defer ps.wg.Done() for job := range ps.jobQueue { ps.mu.RLock() - subs := make([]func(any), 0, len(ps.subs[job.Topic])) - for _, cb := range ps.subs[job.Topic] { - subs = append(subs, cb) + var subs []func(any) + for pattern, callbacks := range ps.subs { + if utils.Matches(pattern, job.Topic) { + for _, cb := range callbacks { + subs = append(subs, cb) + } + } } ps.mu.RUnlock() diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 0000000..25699e5 --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,14 @@ +package utils + +import "strings" + +func Matches(pattern, topic string) bool { + if pattern == "*" { + return true + } + if strings.HasSuffix(pattern, "/*") { + prefix := strings.TrimSuffix(pattern, "/*") + return strings.HasPrefix(topic, prefix+"/") + } + return pattern == topic +}