Files
pubSub/pubSub.go
2025-08-22 14:32:20 +02:00

134 lines
3.2 KiB
Go

package pubSub
import (
"log"
"sync"
"gitea.tecamino.com/paadi/pubSub/models"
)
// Pubsub implements a simple topic-based publish/subscribe system
// with worker goroutines delivering messages to subscriber callbacks.
//
// Instances are meant to be created with NewPubsub, which allocates the
// subscription table and the job queue and starts the workers. Blocking is the
// only exported field and may be set by the caller to choose how Publish
// behaves when the queue is full.
type Pubsub struct {
mu sync.RWMutex // guards subs and closed
subs map[string]map[string]func(any) // topic -> subscriberID -> callback; set to nil by Close
closed bool // set once by Close; Publish returns early when it is true
jobQueue chan models.Data // buffered queue of published messages consumed by the workers
wg sync.WaitGroup // tracks worker goroutines so Close can wait for them to exit
queueSize int // capacity used for jobQueue (at least 1); reported in the "queue full" log line
Blocking bool // if true, Publish blocks on full queue; if false, drops messages
}
// Compile-time assertion that *Pubsub satisfies the PubSub interface
// (declared elsewhere in this package); the build fails if a method is missing.
var _ PubSub = (*Pubsub)(nil)
// NewPubsub builds a Pubsub and launches its delivery workers.
//
// workerCount is the number of worker goroutines and queueSize is the capacity
// of the buffered job queue; values below 1 are raised to 1 for both. Blocking
// is left at its zero value (false) on the returned Pubsub.
func NewPubsub(workerCount, queueSize int) *Pubsub {
	capacity := max(1, queueSize)
	workers := max(1, workerCount)

	ps := &Pubsub{
		subs:      make(map[string]map[string]func(any)),
		jobQueue:  make(chan models.Data, capacity),
		queueSize: capacity,
	}

	// Register every worker with the WaitGroup before any of them starts.
	ps.wg.Add(workers)
	for remaining := workers; remaining > 0; remaining-- {
		go ps.worker()
	}
	return ps
}
// worker is the body of one delivery goroutine. It receives jobs from jobQueue
// until the channel is closed and drained, and passes each job to every
// callback registered for the job's topic at the time the job is picked up.
//
// Each callback receives the whole job value (a models.Data), not only its
// Data field. A callback that panics is recovered and logged, so it neither
// terminates the worker nor prevents the remaining callbacks from running.
func (ps *Pubsub) worker() {
	defer ps.wg.Done()

	for job := range ps.jobQueue {
		// Snapshot the callbacks under the read lock and invoke them after
		// releasing it, so a callback may call Subscribe/Unsubscribe itself
		// without deadlocking on ps.mu.
		ps.mu.RLock()
		topicSubs := ps.subs[job.Topic]
		callbacks := make([]func(any), 0, len(topicSubs))
		for _, cb := range topicSubs {
			callbacks = append(callbacks, cb)
		}
		ps.mu.RUnlock()

		for _, cb := range callbacks {
			// The closure gives each callback its own deferred recover, so the
			// recovery fires per callback rather than at worker exit.
			func() {
				defer func() {
					if r := recover(); r != nil {
						log.Printf("pubSub: subscriber callback for topic %q panicked: %v", job.Topic, r)
					}
				}()
				cb(job)
			}()
		}
	}
}
// Subscribe registers cb as the callback for topic under subscriber id.
// Registering the same id again for the same topic replaces the earlier
// callback.
//
// Close sets the subscription table to nil, so once the Pubsub is closed
// Subscribe does nothing instead of panicking on a write to a nil map.
func (ps *Pubsub) Subscribe(id, topic string, cb func(any)) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if ps.closed {
		return
	}

	topicSubs, ok := ps.subs[topic]
	if !ok {
		// First subscriber for this topic: create its subscriber table.
		topicSubs = make(map[string]func(any))
		ps.subs[topic] = topicSubs
	}
	topicSubs[id] = cb
}
// Unsubscribe drops subscriber id from topic. When that leaves the topic with
// no subscribers, the topic entry itself is removed. Unknown topics and
// unknown ids are ignored.
func (ps *Pubsub) Unsubscribe(id, topic string) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	members, found := ps.subs[topic]
	if !found {
		return
	}

	delete(members, id)
	if len(members) == 0 {
		// Do not keep empty per-topic tables around.
		delete(ps.subs, topic)
	}
}
// UnsubscribeAll drops subscriber id from every topic and removes any topic
// that ends up without subscribers.
func (ps *Pubsub) UnsubscribeAll(id string) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	// Deleting entries from a map while ranging over it is permitted in Go.
	for topic, members := range ps.subs {
		delete(members, id)
		if len(members) > 0 {
			continue
		}
		delete(ps.subs, topic)
	}
}
// Publish enqueues data for delivery to the subscribers of topic.
//
// If the Pubsub is closed the message is discarded. Otherwise the behavior on
// a full queue depends on Blocking: when true, Publish waits for free space;
// when false, the message is dropped and a "queue full" line is logged.
//
// Publish is safe to call concurrently with Close: it never panics with
// "send on closed channel", and a message racing with Close is discarded.
func (ps *Pubsub) Publish(topic string, data any) {
	msg := models.Data{Topic: topic, Data: data}

	if ps.Blocking {
		ps.mu.RLock()
		closed := ps.closed
		ps.mu.RUnlock()
		if closed {
			return
		}

		// A blocking send must not hold ps.mu: Close would wait for the lock,
		// and workers waiting behind that pending writer could no longer drain
		// the queue. Without the lock, Close may close jobQueue while this send
		// is in flight, so recover from that panic and drop the message. The
		// send is the only statement after this defer that can panic.
		defer func() { _ = recover() }()
		ps.jobQueue <- msg
		return
	}

	// The non-blocking send cannot stall, so keep the read lock across it.
	// Close closes jobQueue under the write lock, which makes the closed check
	// and the send atomic with respect to Close.
	ps.mu.RLock()
	defer ps.mu.RUnlock()
	if ps.closed {
		return
	}
	select {
	case ps.jobQueue <- msg:
	default:
		log.Println("queue full with:", ps.queueSize)
	}
}
// Close shuts the Pubsub down. The first call marks it closed, closes the job
// queue, discards all subscriptions, and then blocks until every worker
// goroutine has exited. Any later call returns immediately.
func (ps *Pubsub) Close() {
	// markClosed performs the state transition under the write lock and
	// reports whether this call was the one that closed the Pubsub.
	markClosed := func() bool {
		ps.mu.Lock()
		defer ps.mu.Unlock()

		if ps.closed {
			return false
		}
		ps.closed = true
		close(ps.jobQueue)
		ps.subs = nil
		return true
	}

	if !markClosed() {
		return
	}
	// Wait outside the lock: workers take the read lock while delivering.
	ps.wg.Wait()
}