first commit
This commit is contained in:
133
pubSub.go
Normal file
133
pubSub.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package pubSub
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"gitea.tecamino.com/paadi/pubSub/models"
|
||||
)
|
||||
|
||||
// Pubsub implements a simple topic-based publish/subscribe system
|
||||
// with worker goroutines delivering messages to subscriber callbacks.
|
||||
type Pubsub struct {
|
||||
mu sync.RWMutex // protects access to subs and closed
|
||||
subs map[string]map[string]func(any) // topic -> subscriberID -> callback
|
||||
closed bool // signals shutdown
|
||||
jobQueue chan models.Data // message delivery queue
|
||||
wg sync.WaitGroup // waits for workers to finish
|
||||
queueSize int // maximum buffered messages
|
||||
Blocking bool // if true, Publish blocks on full queue; if false, drops messages
|
||||
}
|
||||
|
||||
// Ensure Pubsub implements PubSub interface
|
||||
var _ PubSub = (*Pubsub)(nil)
|
||||
|
||||
// NewPubsub creates a new Pubsub with a fixed number of workers and queue size.
|
||||
func NewPubsub(workerCount, queueSize int) *Pubsub {
|
||||
ps := &Pubsub{
|
||||
subs: make(map[string]map[string]func(any)),
|
||||
jobQueue: make(chan models.Data, max(1, queueSize)),
|
||||
queueSize: max(1, queueSize),
|
||||
}
|
||||
|
||||
// start worker pool for message delivery
|
||||
for i := 0; i < max(1, workerCount); i++ {
|
||||
ps.wg.Add(1)
|
||||
go ps.worker()
|
||||
}
|
||||
return ps
|
||||
}
|
||||
|
||||
// worker pulls jobs from the queue and invokes subscriber callbacks safely.
|
||||
func (ps *Pubsub) worker() {
|
||||
defer ps.wg.Done()
|
||||
for job := range ps.jobQueue {
|
||||
ps.mu.RLock()
|
||||
subs := make([]func(any), 0, len(ps.subs[job.Topic]))
|
||||
for _, cb := range ps.subs[job.Topic] {
|
||||
subs = append(subs, cb)
|
||||
}
|
||||
ps.mu.RUnlock()
|
||||
|
||||
for _, cb := range subs {
|
||||
func(c func(any), d any) {
|
||||
defer func() { recover() }()
|
||||
c(d)
|
||||
}(cb, job.Data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe registers a callback for a given topic under a subscriber ID.
|
||||
func (ps *Pubsub) Subscribe(id, topic string, cb func(any)) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
|
||||
if _, ok := ps.subs[topic]; !ok {
|
||||
ps.subs[topic] = make(map[string]func(any))
|
||||
}
|
||||
ps.subs[topic][id] = cb
|
||||
}
|
||||
|
||||
// Unsubscribe removes a single subscriber ID from a given topic.
|
||||
func (ps *Pubsub) Unsubscribe(id, topic string) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
|
||||
if _, ok := ps.subs[topic]; ok {
|
||||
delete(ps.subs[topic], id)
|
||||
if len(ps.subs[topic]) == 0 {
|
||||
delete(ps.subs, topic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// UnsubscribeAll removes a subscriber ID from all topics it is registered to.
|
||||
func (ps *Pubsub) UnsubscribeAll(id string) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
|
||||
for topic := range ps.subs {
|
||||
delete(ps.subs[topic], id)
|
||||
if len(ps.subs[topic]) == 0 {
|
||||
delete(ps.subs, topic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Publish enqueues a message for a topic.
|
||||
func (ps *Pubsub) Publish(topic string, data any) {
|
||||
ps.mu.RLock()
|
||||
if ps.closed {
|
||||
ps.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
ps.mu.RUnlock()
|
||||
|
||||
if ps.Blocking {
|
||||
ps.jobQueue <- models.Data{Topic: topic, Data: data}
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case ps.jobQueue <- models.Data{Topic: topic, Data: data}:
|
||||
default:
|
||||
log.Println("queue full with:", ps.queueSize)
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts down the Pubsub: stops accepting new messages, closes the queue,
|
||||
// clears subscriptions, and waits for all workers to finish.
|
||||
func (ps *Pubsub) Close() {
|
||||
ps.mu.Lock()
|
||||
if ps.closed {
|
||||
ps.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ps.closed = true
|
||||
close(ps.jobQueue)
|
||||
ps.subs = nil
|
||||
ps.mu.Unlock()
|
||||
|
||||
ps.wg.Wait()
|
||||
}
|
Reference in New Issue
Block a user