PubSub
A lightweight, concurrent Publish/Subscribe system for Go. It provides topic-based message distribution through worker goroutines, safe subscriber management, a choice between blocking and dropping messages when the queue is full, and a PubSub interface you can program against.
✨ Features
- Topic-based subscriptions (topic -> subscriber ID -> callback)
- Multiple workers for concurrent delivery
- Safe subscription and unsubscription (per topic or all topics)
- Panic-safe callback execution (one bad subscriber won’t crash workers)
- Configurable message queue size
- Blocking or non-blocking publish behavior
- Graceful shutdown with Close()
- Supports the PubSub interface for flexible usage and testing
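The panic-safety feature listed above can be illustrated with a small sketch. This is not the package's actual code: safeCall is a hypothetical helper, and the error message format is an assumption. It shows one common way to keep a worker alive when a subscriber callback panics.

```go
package main

import "fmt"

// safeCall is a hypothetical helper: it runs cb and converts a panic into an
// error so the calling worker goroutine keeps running.
func safeCall(cb func(any), data any) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("subscriber panicked: %v", r)
		}
	}()
	cb(data)
	return nil
}

func main() {
	err := safeCall(func(any) { panic("boom") }, "msg")
	fmt.Println(err)                           // subscriber panicked: boom
	fmt.Println(safeCall(func(any) {}, "msg")) // <nil>
}
```

A worker built this way could log the returned error and move on to the next subscriber.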
📦 Installation
go get gitlab.com/your-repo/pubSub
Then import in your Go project:
import "gitlab.com/your-repo/pubSub"
🚀 Usage
Example
package main

import (
	"fmt"
	"time"

	"gitlab.com/your-repo/pubSub"
)

func main() {
	ps := pubSub.NewPubsub(2, 100) // 2 workers, queue size 100

	// Register two subscribers on the same topic.
	ps.Subscribe("sub1", "greetings", func(data any) {
		fmt.Println("Subscriber 1 received:", data)
	})
	ps.Subscribe("sub2", "greetings", func(data any) {
		fmt.Println("Subscriber 2 received:", data)
	})

	ps.Publish("greetings", "Hello, world!")
	ps.Publish("greetings", "PubSub in Go is working 🚀")
	time.Sleep(1 * time.Second) // give the workers time to deliver

	// After this call only sub2 receives messages.
	ps.Unsubscribe("sub1", "greetings")
	ps.Publish("greetings", "Goodbye from PubSub!")
	time.Sleep(500 * time.Millisecond)

	ps.Close()
}
📖 API Reference
NewPubsub(workerCount, queueSize int) *Pubsub
Creates a new Pubsub instance with:
- workerCount: number of goroutines delivering messages
- queueSize: maximum number of buffered messages
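To make the two parameters concrete, here is a minimal sketch of a worker pool draining a buffered channel. It is an illustration under assumptions, not the package's implementation: the message type and the startWorkers function are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical queue item: a topic plus its payload.
type message struct {
	topic string
	data  any
}

// startWorkers launches workerCount goroutines that drain a queue buffered to
// queueSize, calling handle once per message. The returned WaitGroup is done
// once the queue has been closed and fully drained.
func startWorkers(workerCount, queueSize int, handle func(message)) (chan<- message, *sync.WaitGroup) {
	queue := make(chan message, queueSize)
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range queue {
				handle(m)
			}
		}()
	}
	return queue, &wg
}

func main() {
	results := make(chan any, 5)
	queue, wg := startWorkers(2, 100, func(m message) { results <- m.data })
	for i := 0; i < 5; i++ {
		queue <- message{topic: "greetings", data: i}
	}
	close(queue) // lets the workers' range loops end
	wg.Wait()
	fmt.Println("delivered:", len(results)) // delivered: 5
}
```

With more than one worker, messages may be handled out of order, so subscribers should not rely on delivery order in a design like this.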
Subscribe(id, topic string, cb func(any))
Registers a callback under a subscriber ID for a topic.
- id must be unique per topic.
- Callbacks are executed asynchronously by workers.
Unsubscribe(id, topic string)
Removes a subscriber ID from a specific topic.
UnsubscribeAll(id string)
Removes a subscriber ID from all topics it is subscribed to.
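The three subscription calls above fit the topic -> subscriber ID -> callback layout mentioned in the feature list. The sketch below shows one way such a store could look. The registry type and its methods are hypothetical, and replacing an existing callback when an ID is reused is an assumption about behavior the README does not specify.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical store mapping topic -> subscriber ID -> callback.
type registry struct {
	mu   sync.RWMutex
	subs map[string]map[string]func(any)
}

func newRegistry() *registry {
	return &registry{subs: make(map[string]map[string]func(any))}
}

// subscribe registers cb under id for topic. Reusing an ID replaces the old
// callback (an assumption; the real package may behave differently).
func (r *registry) subscribe(id, topic string, cb func(any)) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.subs[topic] == nil {
		r.subs[topic] = make(map[string]func(any))
	}
	r.subs[topic][id] = cb
}

// unsubscribe removes id from one topic and drops the topic once it is empty.
func (r *registry) unsubscribe(id, topic string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.subs[topic], id)
	if len(r.subs[topic]) == 0 {
		delete(r.subs, topic)
	}
}

// unsubscribeAll removes id from every topic.
func (r *registry) unsubscribeAll(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for topic, byID := range r.subs {
		delete(byID, id)
		if len(byID) == 0 {
			delete(r.subs, topic)
		}
	}
}

// count reports how many subscribers a topic currently has.
func (r *registry) count(topic string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs[topic])
}

func main() {
	r := newRegistry()
	noop := func(any) {}
	r.subscribe("sub1", "greetings", noop)
	r.subscribe("sub2", "greetings", noop)
	r.subscribe("sub1", "alerts", noop)
	r.unsubscribe("sub2", "greetings")
	fmt.Println(r.count("greetings"), r.count("alerts")) // 1 1
	r.unsubscribeAll("sub1")
	fmt.Println(r.count("greetings"), r.count("alerts")) // 0 0
}
```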
Publish(topic string, data any)
Publishes a message to a topic.
- If Blocking = true, Publish will block while the queue is full.
- If Blocking = false, a message published to a full queue is dropped and logged.
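The difference between the two modes comes down to how the send on the queue channel is written. The following sketch shows the usual Go idiom. The publish function and its blocking parameter are hypothetical stand-ins. How the real package exposes the Blocking setting is not shown in this README.

```go
package main

import "fmt"

// publish is a hypothetical helper. With blocking set it waits for queue
// space; otherwise it drops the message when the queue is full and reports
// false so the caller can log the drop.
func publish(queue chan<- any, data any, blocking bool) bool {
	if blocking {
		queue <- data
		return true
	}
	select {
	case queue <- data:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan any, 1)
	fmt.Println(publish(queue, "first", false))  // true: the queue had room
	fmt.Println(publish(queue, "second", false)) // false: the queue is full, message dropped
}
```

Blocking mode applies backpressure to publishers, while non-blocking mode keeps publishers fast at the cost of losing messages under load.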
Close()
Gracefully shuts down the Pubsub:
- Stops accepting new messages
- Closes the queue
- Clears subscriptions
- Waits for workers to finish
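The shutdown steps listed above can be sketched as follows. This is a stripped-down illustration with a single worker and no subscription map, so the step that clears subscriptions is omitted. The bus type, newBus, publish, and shutdown are hypothetical names, not the package's API.

```go
package main

import (
	"fmt"
	"sync"
)

// bus is a hypothetical, stripped-down stand-in for the real type.
type bus struct {
	mu     sync.Mutex
	closed bool
	queue  chan any
	wg     sync.WaitGroup
}

// newBus starts one worker that passes every queued message to handle.
func newBus(queueSize int, handle func(any)) *bus {
	b := &bus{queue: make(chan any, queueSize)}
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		for data := range b.queue { // ends once the queue is closed and drained
			handle(data)
		}
	}()
	return b
}

// publish reports false once the bus is closed. Holding mu while sending
// guarantees that shutdown never closes the channel in the middle of a send.
func (b *bus) publish(data any) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return false
	}
	b.queue <- data
	return true
}

// shutdown stops accepting messages, closes the queue, and waits for the
// worker to drain it. Calling it twice is safe.
func (b *bus) shutdown() {
	b.mu.Lock()
	if b.closed {
		b.mu.Unlock()
		return
	}
	b.closed = true
	close(b.queue)
	b.mu.Unlock()
	b.wg.Wait()
}

func main() {
	processed := 0
	b := newBus(10, func(any) { processed++ }) // single worker, so no lock is needed
	b.publish("a")
	b.publish("b")
	b.shutdown()
	fmt.Println(processed)         // 2: queued messages are drained before shutdown returns
	fmt.Println(b.publish("late")) // false: rejected after shutdown
}
```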
🧩 PubSub Interface
The package also defines a PubSub interface for flexibility and testing:
package pubSub

type PubSub interface {
	Subscribe(id, topic string, cb func(any))
	Unsubscribe(id, topic string)
	UnsubscribeAll(id string)
	Publish(topic string, data any)
	Close()
}
The Pubsub struct satisfies this interface implicitly, since Go interfaces need no explicit declaration.
Example Using the Interface
package main

import (
	"fmt"
	"time"

	"gitlab.com/your-repo/pubSub"
)

// runApp depends only on the PubSub interface, not on the concrete type.
func runApp(ps pubSub.PubSub) {
	ps.Subscribe("sub1", "topic1", func(data any) {
		fmt.Println("Received:", data)
	})
}

func main() {
	ps := pubSub.NewPubsub(2, 100) // *Pubsub satisfies the PubSub interface
	runApp(ps)
	ps.Publish("topic1", "Hello from interface!")
	time.Sleep(500 * time.Millisecond) // give the workers time to deliver
	ps.Close()
}
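Because application code such as runApp accepts the interface, tests can pass in a synchronous stand-in instead of the real worker-backed type. The sketch below is one possible test double. It redeclares the interface locally so that it compiles on its own, and fakePubSub, newFake, and greet are hypothetical names.

```go
package main

import "fmt"

// PubSub mirrors the interface shown above so this sketch compiles on its own.
type PubSub interface {
	Subscribe(id, topic string, cb func(any))
	Unsubscribe(id, topic string)
	UnsubscribeAll(id string)
	Publish(topic string, data any)
	Close()
}

// fakePubSub is a hypothetical test double that delivers synchronously.
// It is not safe for concurrent use.
type fakePubSub struct {
	subs map[string]map[string]func(any)
}

// Compile-time check that *fakePubSub satisfies PubSub.
var _ PubSub = (*fakePubSub)(nil)

func newFake() *fakePubSub {
	return &fakePubSub{subs: make(map[string]map[string]func(any))}
}

func (f *fakePubSub) Subscribe(id, topic string, cb func(any)) {
	if f.subs[topic] == nil {
		f.subs[topic] = make(map[string]func(any))
	}
	f.subs[topic][id] = cb
}

func (f *fakePubSub) Unsubscribe(id, topic string) { delete(f.subs[topic], id) }

func (f *fakePubSub) UnsubscribeAll(id string) {
	for _, byID := range f.subs {
		delete(byID, id)
	}
}

// Publish calls every callback inline, which keeps tests deterministic.
func (f *fakePubSub) Publish(topic string, data any) {
	for _, cb := range f.subs[topic] {
		cb(data)
	}
}

func (f *fakePubSub) Close() { f.subs = make(map[string]map[string]func(any)) }

// greet is example application code that depends only on the interface.
func greet(ps PubSub) { ps.Publish("greetings", "hello") }

func main() {
	f := newFake()
	var got []any
	f.Subscribe("sub1", "greetings", func(d any) { got = append(got, d) })
	greet(f)
	fmt.Println(got) // [hello]
}
```

Synchronous delivery removes the need for time.Sleep calls in tests, at the cost of not exercising the concurrent code path.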
⚠️ Notes
- Subscriber callbacks must be non-blocking. Long-running tasks should be offloaded to separate goroutines.
- IDs are required so you can unsubscribe cleanly (important for WebSocket connections, etc.).
- This is not MQTT, but a simpler, in-memory Pub/Sub suitable for local apps or servers.
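The first note above, about keeping callbacks non-blocking, can be followed by handing slow work to a separate goroutine through a buffered channel. The sketch below is one possible pattern, not something the package provides: offload is a hypothetical helper, and dropping jobs when the buffer is full is an assumption you may want to replace with your own policy.

```go
package main

import (
	"fmt"
	"time"
)

// offload returns a callback that hands data to a buffered jobs channel and
// returns immediately. When the buffer is full the job is dropped rather than
// stalling the delivery worker (an assumed policy).
func offload(jobs chan<- any) func(any) {
	return func(data any) {
		select {
		case jobs <- data:
		default: // buffer full: drop instead of blocking the worker
		}
	}
}

func main() {
	jobs := make(chan any, 16)
	done := make(chan struct{})
	go func() { // dedicated goroutine for the slow work
		defer close(done)
		for j := range jobs {
			time.Sleep(10 * time.Millisecond) // stand-in for slow processing
			fmt.Println("processed:", j)
		}
	}()

	cb := offload(jobs) // cb is what would be passed to Subscribe
	cb("report-1")
	cb("report-2")
	close(jobs)
	<-done
}
```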