2025-08-24 07:52:26 +02:00
2025-08-21 08:07:00 +02:00
2025-08-21 08:07:00 +02:00
2025-08-24 07:52:26 +02:00
2025-08-21 08:07:00 +02:00

PubSub

A lightweight, concurrent Publish/Subscribe system for Go. It provides topic-based message distribution with worker goroutines, safe subscriber management, optional blocking or dropping of messages when queues are full, and supports programming against an interface for flexibility.

Features

  • Topic-based subscriptions (topic -> subscriber ID -> callback)
  • Multiple workers for concurrent delivery
  • Safe subscription and unsubscription (per topic or all topics)
  • Panic-safe callback execution (one bad subscriber wont crash workers)
  • Configurable message queue size
  • Blocking or non-blocking publish behavior
  • Graceful shutdown with Close()
  • Supports the PubSub interface for flexible usage and testing

📦 Installation

go get gitlab.com/your-repo/pubSub

Then import in your Go project:

import "gitlab.com/your-repo/pubSub"

🚀 Usage

Example

package main

import (
    "fmt"
    "time"
    "gitlab.com/your-repo/pubSub"
)

func main() {
    ps := pubSub.NewPubsub(2, 100) // 2 workers, queue size 100

    ps.Subscribe("sub1", "greetings", func(data any) {
        fmt.Println("Subscriber 1 received:", data)
    })

    ps.Subscribe("sub2", "greetings", func(data any) {
        fmt.Println("Subscriber 2 received:", data)
    })

    ps.Publish("greetings", "Hello, world!")
    ps.Publish("greetings", "PubSub in Go is working 🚀")

    time.Sleep(1 * time.Second)

    ps.Unsubscribe("sub1", "greetings")
    ps.Publish("greetings", "Goodbye from PubSub!")

    time.Sleep(500 * time.Millisecond)
    ps.Close()
}

📖 API Reference

NewPubsub(workerCount, queueSize int) *Pubsub

Creates a new Pubsub instance with:

  • workerCount: number of goroutines delivering messages
  • queueSize: maximum buffered messages

Subscribe(id, topic string, cb func(any))

Registers a callback under a subscriber ID for a topic.

  • id must be unique per topic.
  • Callbacks are executed asynchronously by workers.

Unsubscribe(id, topic string)

Removes a subscriber ID from a specific topic.

UnsubscribeAll(id string)

Removes a subscriber ID from all topics it is subscribed to.

Publish(topic string, data any)

Publishes a message to a topic.

  • If Blocking = true, Publish will block if the queue is full.
  • If Blocking = false, the message will be dropped and logged.

Close()

Gracefully shuts down the Pubsub:

  • Stops accepting new messages
  • Closes the queue
  • Clears subscriptions
  • Waits for workers to finish

🧩 PubSub Interface

The package also defines a PubSub interface for flexibility and testing:

package pubSub

type PubSub interface {
    Subscribe(id, topic string, cb func(any))
    Unsubscribe(id, topic string)
    UnsubscribeAll(id string)
    Publish(topic string, data any)
    Close()
}

Your Pubsub struct implements this interface automatically.

Example Using the Interface

package main

import (
    "fmt"
    "time"
    "gitlab.com/your-repo/pubSub"
)

func runApp(ps pubSub.PubSub) {
    ps.Subscribe("sub1", "topic1", func(data any) {
        fmt.Println("Received:", data)
    })
}

func main() {
    ps := pubSub.NewPubsub(2, 100) // returns a Pubsub implementing PubSub interface

    runApp(ps)
    ps.Publish("topic1", "Hello from interface!")

    time.Sleep(500 * time.Millisecond)
    ps.Close()
}

⚠️ Notes

  • Subscriber callbacks must be non-blocking. Long-running tasks should be offloaded to separate goroutines.
  • IDs are required so you can unsubscribe cleanly (important for WebSocket connections, etc.).
  • This is not MQTT, but a simpler, in-memory Pub/Sub suitable for local apps or servers.
Description
Concurrent, in-memory Publish/Subscribe system for Go with topic-based subscriptions and worker-managed callbacks.
Readme 38 KiB
Languages
Go 100%