commit 2446bb11a074cd9679d4c911517fed6b5f60a661 Author: Adrian Zuercher Date: Thu Aug 21 08:07:00 2025 +0200 first commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..f960dc8 --- /dev/null +++ b/README.md @@ -0,0 +1,156 @@ +# PubSub + +A lightweight, concurrent **Publish/Subscribe** system for Go. +It provides topic-based message distribution with worker goroutines, safe subscriber management, optional blocking or dropping of messages when queues are full, and supports programming against an interface for flexibility. + +## ✨ Features + +* Topic-based subscriptions (`topic -> subscriber ID -> callback`) +* Multiple workers for concurrent delivery +* Safe subscription and unsubscription (per topic or all topics) +* Panic-safe callback execution (one bad subscriber won’t crash workers) +* Configurable message queue size +* Blocking or non-blocking publish behavior +* Graceful shutdown with `Close()` +* Supports the `PubSub` interface for flexible usage and testing + +## πŸ“¦ Installation + +```bash +go get gitlab.com/your-repo/pubSub +``` + +Then import in your Go project: + +```go +import "gitlab.com/your-repo/pubSub" +``` + +## πŸš€ Usage + +### Example + +```go +package main + +import ( + "fmt" + "time" + "gitlab.com/your-repo/pubSub" +) + +func main() { + ps := pubSub.NewPubsub(2, 100) // 2 workers, queue size 100 + + ps.Subscribe("sub1", "greetings", func(data any) { + fmt.Println("Subscriber 1 received:", data) + }) + + ps.Subscribe("sub2", "greetings", func(data any) { + fmt.Println("Subscriber 2 received:", data) + }) + + ps.Publish("greetings", "Hello, world!") + ps.Publish("greetings", "PubSub in Go is working πŸš€") + + time.Sleep(1 * time.Second) + + ps.Unsubscribe("sub1", "greetings") + ps.Publish("greetings", "Goodbye from PubSub!") + + time.Sleep(500 * time.Millisecond) + ps.Close() +} +``` + +## πŸ“– API Reference + +### `NewPubsub(workerCount, queueSize int) *Pubsub` + +Creates a new Pubsub instance with: + +* `workerCount`: number of goroutines delivering messages +* `queueSize`: maximum buffered messages + +### `Subscribe(id, topic string, cb func(any))` + +Registers a callback under a subscriber ID for a topic. + +* `id` must be unique per topic. +* Callbacks are executed asynchronously by workers. + +### `Unsubscribe(id, topic string)` + +Removes a subscriber ID from a specific topic. + +### `UnsubscribeAll(id string)` + +Removes a subscriber ID from **all topics** it is subscribed to. + +### `Publish(topic string, data any)` + +Publishes a message to a topic. + +* If `Blocking = true`, `Publish` will block if the queue is full. +* If `Blocking = false`, the message will be dropped and logged. + +### `Close()` + +Gracefully shuts down the Pubsub: + +* Stops accepting new messages +* Closes the queue +* Clears subscriptions +* Waits for workers to finish + +## 🧩 PubSub Interface + +The package also defines a `PubSub` interface for flexibility and testing: + +```go +package pubSub + +type PubSub interface { + Subscribe(id, topic string, cb func(any)) + Unsubscribe(id, topic string) + UnsubscribeAll(id string) + Publish(topic string, data any) + Close() +} +``` + +Your `Pubsub` struct implements this interface automatically. + +### Example Using the Interface + +```go +package main + +import ( + "fmt" + "time" + "gitlab.com/your-repo/pubSub" +) + +func runApp(ps pubSub.PubSub) { + ps.Subscribe("sub1", "topic1", func(data any) { + fmt.Println("Received:", data) + }) +} + +func main() { + ps := pubSub.NewPubsub(2, 100) // returns a Pubsub implementing PubSub interface + + runApp(ps) + ps.Publish("topic1", "Hello from interface!") + + time.Sleep(500 * time.Millisecond) + ps.Close() +} +``` + +## ⚠️ Notes + +* Subscriber callbacks must be non-blocking. Long-running tasks should be offloaded to separate goroutines. +* IDs are required so you can **unsubscribe cleanly** (important for WebSocket connections, etc.). +* This is **not MQTT**, but a simpler, in-memory Pub/Sub suitable for local apps or servers. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cc2b269 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module gitea.tecamino.com/paadi/pubSub + +go 1.24.0 diff --git a/interface.go b/interface.go new file mode 100644 index 0000000..18339b0 --- /dev/null +++ b/interface.go @@ -0,0 +1,10 @@ +package pubSub + +// PubSub defines the public API for a publish/subscribe system. +type PubSub interface { + Subscribe(id, topic string, cb func(any)) + Unsubscribe(id, topic string) + UnsubscribeAll(id string) + Publish(topic string, data any) + Close() +} diff --git a/models/data.go b/models/data.go new file mode 100644 index 0000000..05de477 --- /dev/null +++ b/models/data.go @@ -0,0 +1,7 @@ +package models + +type Data struct { + Action string `json:"action"` + Topic string `json:"topic"` + Data any `json:"data"` +} diff --git a/pubSub.go b/pubSub.go new file mode 100644 index 0000000..1011532 --- /dev/null +++ b/pubSub.go @@ -0,0 +1,133 @@ +package pubSub + +import ( + "log" + "sync" + + "gitea.tecamino.com/paadi/pubSub/models" +) + +// Pubsub implements a simple topic-based publish/subscribe system +// with worker goroutines delivering messages to subscriber callbacks. +type Pubsub struct { + mu sync.RWMutex // protects access to subs and closed + subs map[string]map[string]func(any) // topic -> subscriberID -> callback + closed bool // signals shutdown + jobQueue chan models.Data // message delivery queue + wg sync.WaitGroup // waits for workers to finish + queueSize int // maximum buffered messages + Blocking bool // if true, Publish blocks on full queue; if false, drops messages +} + +// Ensure Pubsub implements PubSub interface +var _ PubSub = (*Pubsub)(nil) + +// NewPubsub creates a new Pubsub with a fixed number of workers and queue size. +func NewPubsub(workerCount, queueSize int) *Pubsub { + ps := &Pubsub{ + subs: make(map[string]map[string]func(any)), + jobQueue: make(chan models.Data, max(1, queueSize)), + queueSize: max(1, queueSize), + } + + // start worker pool for message delivery + for i := 0; i < max(1, workerCount); i++ { + ps.wg.Add(1) + go ps.worker() + } + return ps +} + +// worker pulls jobs from the queue and invokes subscriber callbacks safely. +func (ps *Pubsub) worker() { + defer ps.wg.Done() + for job := range ps.jobQueue { + ps.mu.RLock() + subs := make([]func(any), 0, len(ps.subs[job.Topic])) + for _, cb := range ps.subs[job.Topic] { + subs = append(subs, cb) + } + ps.mu.RUnlock() + + for _, cb := range subs { + func(c func(any), d any) { + defer func() { recover() }() + c(d) + }(cb, job.Data) + } + } +} + +// Subscribe registers a callback for a given topic under a subscriber ID. +func (ps *Pubsub) Subscribe(id, topic string, cb func(any)) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if _, ok := ps.subs[topic]; !ok { + ps.subs[topic] = make(map[string]func(any)) + } + ps.subs[topic][id] = cb +} + +// Unsubscribe removes a single subscriber ID from a given topic. +func (ps *Pubsub) Unsubscribe(id, topic string) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if _, ok := ps.subs[topic]; ok { + delete(ps.subs[topic], id) + if len(ps.subs[topic]) == 0 { + delete(ps.subs, topic) + } + } +} + +// UnsubscribeAll removes a subscriber ID from all topics it is registered to. +func (ps *Pubsub) UnsubscribeAll(id string) { + ps.mu.Lock() + defer ps.mu.Unlock() + + for topic := range ps.subs { + delete(ps.subs[topic], id) + if len(ps.subs[topic]) == 0 { + delete(ps.subs, topic) + } + } +} + +// Publish enqueues a message for a topic. +func (ps *Pubsub) Publish(topic string, data any) { + ps.mu.RLock() + if ps.closed { + ps.mu.RUnlock() + return + } + ps.mu.RUnlock() + + if ps.Blocking { + ps.jobQueue <- models.Data{Topic: topic, Data: data} + return + } + + select { + case ps.jobQueue <- models.Data{Topic: topic, Data: data}: + default: + log.Println("queue full with:", ps.queueSize) + } +} + +// Close shuts down the Pubsub: stops accepting new messages, closes the queue, +// clears subscriptions, and waits for all workers to finish. +func (ps *Pubsub) Close() { + ps.mu.Lock() + if ps.closed { + ps.mu.Unlock() + return + } + ps.closed = true + close(ps.jobQueue) + ps.subs = nil + ps.mu.Unlock() + + ps.wg.Wait() +}