157 lines
3.6 KiB
Markdown
157 lines
3.6 KiB
Markdown
# PubSub
|
||
|
||
A lightweight, concurrent **Publish/Subscribe** system for Go.
|
||
It provides topic-based message distribution with worker goroutines, safe subscriber management, optional blocking or dropping of messages when queues are full, and supports programming against an interface for flexibility.
|
||
|
||
## ✨ Features
|
||
|
||
* Topic-based subscriptions (`topic -> subscriber ID -> callback`)
|
||
* Multiple workers for concurrent delivery
|
||
* Safe subscription and unsubscription (per topic or all topics)
|
||
* Panic-safe callback execution (one bad subscriber won’t crash workers)
|
||
* Configurable message queue size
|
||
* Blocking or non-blocking publish behavior
|
||
* Graceful shutdown with `Close()`
|
||
* Supports the `PubSub` interface for flexible usage and testing
|
||
|
||
## 📦 Installation
|
||
|
||
```bash
|
||
go get gitlab.com/your-repo/pubSub
|
||
```
|
||
|
||
Then import in your Go project:
|
||
|
||
```go
|
||
import "gitlab.com/your-repo/pubSub"
|
||
```
|
||
|
||
## 🚀 Usage
|
||
|
||
### Example
|
||
|
||
```go
|
||
package main
|
||
|
||
import (
|
||
"fmt"
|
||
"time"
|
||
"gitlab.com/your-repo/pubSub"
|
||
)
|
||
|
||
func main() {
|
||
ps := pubSub.NewPubsub(2, 100) // 2 workers, queue size 100
|
||
|
||
ps.Subscribe("sub1", "greetings", func(data any) {
|
||
fmt.Println("Subscriber 1 received:", data)
|
||
})
|
||
|
||
ps.Subscribe("sub2", "greetings", func(data any) {
|
||
fmt.Println("Subscriber 2 received:", data)
|
||
})
|
||
|
||
ps.Publish("greetings", "Hello, world!")
|
||
ps.Publish("greetings", "PubSub in Go is working 🚀")
|
||
|
||
time.Sleep(1 * time.Second)
|
||
|
||
ps.Unsubscribe("sub1", "greetings")
|
||
ps.Publish("greetings", "Goodbye from PubSub!")
|
||
|
||
time.Sleep(500 * time.Millisecond)
|
||
ps.Close()
|
||
}
|
||
```
|
||
|
||
## 📖 API Reference
|
||
|
||
### `NewPubsub(workerCount, queueSize int) *Pubsub`
|
||
|
||
Creates a new Pubsub instance with:
|
||
|
||
* `workerCount`: number of goroutines delivering messages
|
||
* `queueSize`: maximum buffered messages
|
||
|
||
### `Subscribe(id, topic string, cb func(any))`
|
||
|
||
Registers a callback under a subscriber ID for a topic.
|
||
|
||
* `id` must be unique per topic.
|
||
* Callbacks are executed asynchronously by workers.
|
||
|
||
### `Unsubscribe(id, topic string)`
|
||
|
||
Removes a subscriber ID from a specific topic.
|
||
|
||
### `UnsubscribeAll(id string)`
|
||
|
||
Removes a subscriber ID from **all topics** it is subscribed to.
|
||
|
||
### `Publish(topic string, data any)`
|
||
|
||
Publishes a message to a topic.
|
||
|
||
* If `Blocking = true`, `Publish` will block if the queue is full.
|
||
* If `Blocking = false`, the message will be dropped and logged.
|
||
|
||
### `Close()`
|
||
|
||
Gracefully shuts down the Pubsub:
|
||
|
||
* Stops accepting new messages
|
||
* Closes the queue
|
||
* Clears subscriptions
|
||
* Waits for workers to finish
|
||
|
||
## 🧩 PubSub Interface
|
||
|
||
The package also defines a `PubSub` interface for flexibility and testing:
|
||
|
||
```go
|
||
package pubSub
|
||
|
||
type PubSub interface {
|
||
Subscribe(id, topic string, cb func(any))
|
||
Unsubscribe(id, topic string)
|
||
UnsubscribeAll(id string)
|
||
Publish(topic string, data any)
|
||
Close()
|
||
}
|
||
```
|
||
|
||
Your `Pubsub` struct implements this interface automatically.
|
||
|
||
### Example Using the Interface
|
||
|
||
```go
|
||
package main
|
||
|
||
import (
|
||
"fmt"
|
||
"time"
|
||
"gitlab.com/your-repo/pubSub"
|
||
)
|
||
|
||
func runApp(ps pubSub.PubSub) {
|
||
ps.Subscribe("sub1", "topic1", func(data any) {
|
||
fmt.Println("Received:", data)
|
||
})
|
||
}
|
||
|
||
func main() {
|
||
ps := pubSub.NewPubsub(2, 100) // returns a Pubsub implementing PubSub interface
|
||
|
||
runApp(ps)
|
||
ps.Publish("topic1", "Hello from interface!")
|
||
|
||
time.Sleep(500 * time.Millisecond)
|
||
ps.Close()
|
||
}
|
||
```
|
||
|
||
## ⚠️ Notes
|
||
|
||
* Subscriber callbacks must be non-blocking. Long-running tasks should be offloaded to separate goroutines.
|
||
* IDs are required so you can **unsubscribe cleanly** (important for WebSocket connections, etc.).
|
||
* This is **not MQTT**, but a simpler, in-memory Pub/Sub suitable for local apps or servers.
|