Pub/Sub — Optimization
1. How to use this file
Twelve scenarios where Pub/Sub code is slower, allocates more, or delivers fewer messages than it should. Each entry has a Before (code + benchmark) and an After (optimized code + benchmark + why + trade-offs + when NOT).
Anchored at Go 1.23, amd64. The numbers show the shape of the result, not exact values: run go test -bench=. -benchmem on your hardware before quoting them. Brokered numbers depend on broker version, network, and disk; treat them as order-of-magnitude.
Pub/Sub spans two universes: in-process channels and over-the-wire brokers. Most in-process wins come from one rule — never do real work while holding the broker's lock. Most brokered wins come from another — size buffers to actual consumer throughput, not vendor defaults.
Reading order: Exercise 1 (snapshot under lock), 5 and 7 (map index, per-topic shard), 2 and 9 (buffer sizing in-process and prefetch in broker), then the rest in any order.
2. Exercise 1 — Lock held during delivery
The junior broker locks the subscriber table and sends on every channel under the write lock. One slow subscriber stalls every publisher.
Before:
```go
func (b *Broker[T]) Publish(topic string, msg T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[topic] {
		ch <- msg // blocks all publishers if subscriber is slow
	}
}
```

```
BenchmarkPublishUnderLock-8       300000    4800 ns/op   // 8 subs, 1 slow
BenchmarkPublishUnderLock_p99-8            42000 ns/op
```
After
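The following is a minimal sketch of the snapshot-then-send shape. It keeps the Before code's slice-per-topic table. One assumption goes beyond the text: the send is non-blocking, and full buffers are counted as drops, which is the behaviour Exercise 2 describes. A blocking send outside the lock would also work, and it would stall only the calling publisher. The sketch never closes subscriber channels, so a stale snapshot cannot hit a closed channel.

```go
package main

import "sync"

// Broker keeps the slice-per-topic table from the Before code.
type Broker[T any] struct {
	mu   sync.RWMutex
	subs map[string][]chan T
}

func NewBroker[T any]() *Broker[T] {
	return &Broker[T]{subs: make(map[string][]chan T)}
}

// Subscribe registers a buffered channel for topic.
func (b *Broker[T]) Subscribe(topic string, buf int) chan T {
	ch := make(chan T, buf)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish copies the topic's subscriber list under the shared lock,
// releases the lock, and only then delivers. It returns how many
// subscribers had a full buffer.
func (b *Broker[T]) Publish(topic string, msg T) (dropped int) {
	b.mu.RLock()
	cps := append([]chan T(nil), b.subs[topic]...) // snapshot: 8 B per subscriber
	b.mu.RUnlock()

	for _, ch := range cps {
		select {
		case ch <- msg: // delivered into the subscriber's buffer
		default:
			dropped++ // full buffer: count the drop, never block the publisher
		}
	}
	return dropped
}
```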
Snapshot under the lock, release, then send. Slow subscribers stall themselves, not the broker. ~20× faster average, ~60× better p99.

**Why faster:** The hot path takes RLock (shared) instead of Lock (exclusive). The actual send happens outside any lock, so a slow subscriber cannot stall the broker. Publishers run concurrently with each other, and their sends overlap with subscribe/unsubscribe calls.

**Trade-off:** The snapshot slice allocates on every Publish: 24 B header + 8 B × N. For very hot brokers, pool the snapshot slice. The snapshot can also be stale: a subscriber that unsubscribes between RUnlock and the send is still targeted. If Unsubscribe closes that channel, the send panics, because a send on a closed channel panics whether or not the channel is buffered. Either never close subscriber channels from Unsubscribe, or guard each send with a per-subscriber lock and closed flag.

**When NOT:** Single-subscriber topics, where there is no fan-out cost to amortize. `PublishSync` testing brokers that *want* backpressure to surface bugs.

3. Exercise 2 — Single-buffered channels for every subscriber
The default Subscribe(topic, 1) looks safe — one slot per subscriber. Under spikes the buffer fills instantly and the publisher drops. Buffer size belongs to the consumer's rate, not to a global default.
Before:
```go
ch := broker.Subscribe(ctx, "orders", 1)
go func() {
	for m := range ch {
		processOrder(m) // 5ms average
	}
}()
```
After
Size the buffer to the burst you must absorb without drops. For 50-message bursts at 200 msg/s, buffer ≥ 50. Sample `len(ch)` periodically. If it sits near the buffer size, you are back to drops and need a faster consumer or the work-queue pattern. If it sits near 0, the size is right. Same CPU cost; drop rate 18% → 0%.

**Why faster (where it counts):** Buffer size doesn't change steady-state throughput, but it eliminates burst-induced drops. A dropped message often costs ~100× more downstream (extra DB fetch, retry RTT) than its channel slot.

**Trade-off:** 64 slots × element size (8 bytes for a pointer) × N subscribers is real memory at scale. A large buffer also hides slow consumers; export buffer occupancy as a metric.

**When NOT:** Latency-sensitive topics where only the latest value matters (presence updates, live cursors). There, use buffer=1 with replace-on-publish (keep only the newest value).

4. Exercise 3 — Untyped any messages with type switch
A broker of `any` pays for boxing every message into an interface on publish and for a type switch on every receive: ~5 ns each, plus branch mispredictions.
Before:
```go
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan any
}

for m := range ch {
	switch v := m.(type) {
	case OrderEvent:
		handleOrder(v)
	case PaymentEvent:
		handlePayment(v)
	}
}
```
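The 16 B/op is the `any` box. Once the interface escapes (sending it on a channel makes it escape), a non-pointer value such as `OrderEvent` is copied to the heap. Pointer-shaped values like `*OrderEvent` avoid that allocation.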
After
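The following is a minimal sketch of the typed shape. Topics are left out so the types stay visible. `OrderEvent`, `PaymentEvent`, and `Bus` are placeholder names. Each bus's channels carry `T` directly, so there is nothing to box on publish and nothing to switch on when receiving.

```go
package main

import "sync"

// Placeholder event types for illustration.
type OrderEvent struct {
	ID     string
	Amount int64
}

type PaymentEvent struct {
	OrderID string
	OK      bool
}

// Bus carries exactly one message type; its zero value is ready to use.
type Bus[T any] struct {
	mu   sync.RWMutex
	subs []chan T
}

func (b *Bus[T]) Subscribe(buf int) <-chan T {
	ch := make(chan T, buf)
	b.mu.Lock()
	b.subs = append(b.subs, ch)
	b.mu.Unlock()
	return ch
}

func (b *Bus[T]) Publish(msg T) {
	b.mu.RLock()
	snap := append([]chan T(nil), b.subs...)
	b.mu.RUnlock()
	for _, ch := range snap {
		select {
		case ch <- msg: // T travels as-is: no interface box
		default: // full buffer: drop (count it in real code)
		}
	}
}

// One bus per event type replaces the single Broker[any].
var (
	orders   Bus[OrderEvent]
	payments Bus[PaymentEvent]
)
```

Each subscriber loop then becomes a plain `for ev := range ch { handleOrder(ev) }` with no type switch.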
Generic broker per concrete message type. ~3.5× faster, zero allocations.

**Why faster:** The compiler instantiates `Broker[OrderEvent]` for the struct's concrete layout, so channels carry `OrderEvent` values directly. There is no interface boxing, no escape to the heap, and no type switch. For `T = *Event`, the channel value is a single word.

**Trade-off:** You need one broker per message type, so 30 event types means 30 `Broker[T]` instances. The pre-1.18 convenience of one bus for everything is gone. For generic event audit logs, `Broker[*Event]` over a tagged-union root type still works.

**When NOT:** Truly heterogeneous payloads where one loop handles many types. There the type switch is unavoidable.

5. Exercise 4 — Per-message allocation for the envelope
A real-world broker wraps each payload in {ID, Topic, Timestamp, Payload}. Allocating that envelope on every Publish makes GC the bottleneck on a 100k msg/s topic.
Before:
```go
type Envelope[T any] struct {
	ID        uint64
	Topic     string
	Timestamp time.Time
	Payload   T
}

func (b *Broker[T]) Publish(topic string, payload T) {
	env := &Envelope[T]{ID: nextID(), Topic: topic, Timestamp: time.Now(), Payload: payload}
	b.deliver(topic, env)
}
```
After
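Pool the envelope. The subscriber hands it back with `b.Release(env)` after processing. Go has no generic package-level variables, so the pool lives on the broker. A single package-level pool built for `Envelope[OrderEvent]` would make `Get().(*Envelope[T])` panic for every other `T`.

```go
type Broker[T any] struct {
	envPool sync.Pool // holds *Envelope[T]; one pool per instantiation
	// ... subscriber table as before
}

func (b *Broker[T]) Publish(topic string, payload T) {
	env, ok := b.envPool.Get().(*Envelope[T])
	if !ok {
		env = new(Envelope[T]) // pool was empty
	}
	env.ID, env.Topic, env.Timestamp, env.Payload = nextID(), topic, time.Now(), payload
	b.deliver(topic, env)
}

// Release returns env to the pool. Call it exactly once, after the last reader
// is done; that is only safe when each envelope goes to a single subscriber.
func (b *Broker[T]) Release(env *Envelope[T]) {
	var zero T
	env.Payload = zero // help GC drop the payload's references
	b.envPool.Put(env)
}
```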
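Releasing a pooled envelope is safe only when each envelope reaches exactly one subscriber. With fan-out, the first subscriber to release would hand a still-shared envelope back to the pool. The following sketches the reference-counted variant that the summary lists under Specialty. It assumes the publisher knows how many subscribers it is delivering to. `RefEnvelope` and `EnvelopePool` are hypothetical names.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// RefEnvelope is a pooled envelope that several subscribers share.
type RefEnvelope[T any] struct {
	Payload T
	refs    atomic.Int32 // holders that have not called Release yet
	pool    *sync.Pool   // pool to return to after the last Release
}

// EnvelopePool must not be copied after first use (it holds a sync.Pool).
type EnvelopePool[T any] struct{ p sync.Pool }

// Get returns an envelope carrying payload whose count starts at holders,
// the number of subscribers it is about to be delivered to.
func (ep *EnvelopePool[T]) Get(payload T, holders int32) *RefEnvelope[T] {
	e, ok := ep.p.Get().(*RefEnvelope[T])
	if !ok {
		e = &RefEnvelope[T]{pool: &ep.p} // pool empty: allocate a fresh one
	}
	e.Payload = payload
	e.refs.Store(holders)
	return e
}

// Release drops one reference. Only the last holder resets the envelope
// and returns it to the pool; nobody may touch it after their own Release.
func (e *RefEnvelope[T]) Release() {
	if e.refs.Add(-1) == 0 {
		var zero T
		e.Payload = zero // let the GC reclaim whatever the payload references
		e.pool.Put(e)
	}
}
```

A publisher would call `Get(payload, int32(len(snapshot)))` after taking its subscriber snapshot and skip `Get` entirely when the snapshot is empty. Each subscriber calls `Release` exactly once.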
6. Exercise 5 — Linear subscriber scan on unsubscribe
The junior `map[string][]chan T` subscriber table makes unsubscribe O(N): scan the slice, then append to remove. With hundreds of subscribers churning (web clients connecting/disconnecting), unsubscribe dominates broker CPU.
Before:
```go
func (b *Broker[T]) Unsubscribe(topic string, ch chan T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	list := b.subs[topic]
	for i, c := range list {
		if c == ch {
			b.subs[topic] = append(list[:i], list[i+1:]...) // O(N)
			return
		}
	}
}
```
After
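Index subscribers by ID. Unsubscribe becomes `delete(m, id)`, which is O(1). Publish still iterates O(N), but the churn paths drop to O(1).

```go
type sub[T any] struct {
	id uint64
	ch chan T
}

type Broker[T any] struct {
	mu     sync.RWMutex
	subs   map[string]map[uint64]*sub[T] // topic → subscriber ID → subscription
	nextID atomic.Uint64                 // source of subscriber IDs
}

// Unsubscribe is O(1): one map delete, no slice scan.
func (b *Broker[T]) Unsubscribe(topic string, id uint64) {
	b.mu.Lock()
	if s, ok := b.subs[topic][id]; ok {
		delete(b.subs[topic], id)
		// Closing here is safe only if every send on s.ch also happens under b.mu;
		// a snapshot-then-send Publish needs a guarded close instead.
		close(s.ch)
	}
	b.mu.Unlock()
}
```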
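The following sketch combines the ID index with the snapshot-then-send `Publish` from Exercise 1. It assumes non-blocking delivery that counts drops. Each subscription guards its own close with a small mutex, so a publisher working from a stale snapshot can never send on a closed channel. The type names mirror the code above, but the guard, `trySend`, and the `Subscribe` signature are assumptions.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// sub closes ch at most once, under mu, so trySend never hits a closed channel.
type sub[T any] struct {
	id     uint64
	mu     sync.Mutex
	closed bool
	ch     chan T
}

// trySend delivers v without blocking; false means gone or buffer full.
func (s *sub[T]) trySend(v T) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	select {
	case s.ch <- v:
		return true
	default:
		return false
	}
}

func (s *sub[T]) close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
}

type Broker[T any] struct {
	mu     sync.RWMutex
	subs   map[string]map[uint64]*sub[T]
	nextID atomic.Uint64
}

func NewBroker[T any]() *Broker[T] {
	return &Broker[T]{subs: make(map[string]map[uint64]*sub[T])}
}

// Subscribe registers a buffered channel and returns its ID for Unsubscribe.
func (b *Broker[T]) Subscribe(topic string, buf int) (uint64, <-chan T) {
	s := &sub[T]{id: b.nextID.Add(1), ch: make(chan T, buf)}
	b.mu.Lock()
	if b.subs[topic] == nil {
		b.subs[topic] = make(map[uint64]*sub[T])
	}
	b.subs[topic][s.id] = s
	b.mu.Unlock()
	return s.id, s.ch
}

// Unsubscribe is O(1): a map delete, then a guarded close outside b.mu.
func (b *Broker[T]) Unsubscribe(topic string, id uint64) {
	b.mu.Lock()
	s, ok := b.subs[topic][id]
	if ok {
		delete(b.subs[topic], id)
	}
	b.mu.Unlock()
	if ok {
		s.close()
	}
}

// Publish snapshots under the read lock and delivers outside it.
func (b *Broker[T]) Publish(topic string, msg T) (dropped int) {
	b.mu.RLock()
	snap := make([]*sub[T], 0, len(b.subs[topic]))
	for _, s := range b.subs[topic] {
		snap = append(snap, s)
	}
	b.mu.RUnlock()
	for _, s := range snap {
		if !s.trySend(msg) {
			dropped++
		}
	}
	return dropped
}
```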
7. Exercise 6 — JSON encoding for in-process messages
A team reuses the brokered envelope (JSON []byte) for the in-process bus "for consistency". Every publish marshals; every subscriber unmarshals. Pure waste.
Before:
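```go
func publish(b *Broker[[]byte], topic string, e Event) error {
	data, err := json.Marshal(e) // encode on every publish
	if err != nil {
		return err
	}
	b.Publish(topic, data)
	return nil
}

for raw := range ch {
	var e Event
	if err := json.Unmarshal(raw, &e); err != nil { // decode in every subscriber
		log.Println("bad event:", err)
		continue
	}
	handle(e)
}
```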
After
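The following sketches "encode once at the boundary". It assumes a placeholder two-field `Event` and a hypothetical `send` function for the network hop. In-process subscribers receive the struct itself. Only the bridge that feeds the brokered topic marshals it.

```go
package main

import "encoding/json"

// Event is a placeholder two-field payload.
type Event struct {
	Kind string `json:"kind"`
	ID   string `json:"id"`
}

// publishLocal hands every in-process subscriber a copy of the struct:
// no encoding, no allocation beyond the channel slot.
func publishLocal(subs []chan Event, e Event) {
	for _, ch := range subs {
		select {
		case ch <- e:
		default: // full buffer: drop (count it in real code)
		}
	}
}

// bridge is the only place that encodes: it drains one in-process
// subscription and passes JSON bytes to the network sender.
func bridge(in <-chan Event, send func([]byte) error) error {
	for e := range in {
		data, err := json.Marshal(e)
		if err != nil {
			return err
		}
		if err := send(data); err != nil {
			return err
		}
	}
	return nil
}
```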
Pass the value directly. Reserve JSON for the network hop, and encode once at the boundary if you also feed a brokered topic. ~56× faster, zero allocations.

**Why faster:** Marshal + unmarshal of a two-field struct is ~10 reflective field accesses, two scratch buffers, and one heap allocation in each direction. Passing the value on a channel is a small struct copy; the string's backing bytes are shared.

**Trade-off:** "Consistency with the wire format" had real value for debugging. Mitigate with a logging hook (`bus.OnPublish(func(e Event) { log.Print(e) })`). If subscribers might mutate the value, send pointers to immutable structs or document immutability.

**When NOT:** When the in-process subscriber genuinely needs the bytes (writing to a file, forwarding without re-encoding). There, JSON in the bus is the right representation.

8. Exercise 7 — Single broker mutex across all topics
One sync.RWMutex guards the entire subs map. A chat-room topic with churn contends with a metrics topic doing high-volume Publish.
Before:
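The following is a minimal sketch of the single-lock shape. It assumes the ID-indexed subscriber table from Exercise 5. Go's `sync.RWMutex` blocks new `RLock` calls while a writer is waiting. A burst of Subscribe/Unsubscribe calls on one topic therefore also delays Publish on every other topic.

```go
package main

import "sync"

type sub[T any] struct {
	id uint64
	ch chan T
}

// Broker guards every topic with one RWMutex: churn on "chat" and
// high-volume Publish on "metrics" contend for the same lock.
type Broker[T any] struct {
	mu   sync.RWMutex
	subs map[string]map[uint64]*sub[T]
}

func (b *Broker[T]) Subscribe(topic string, id uint64, buf int) <-chan T {
	b.mu.Lock() // exclusive: every topic's publishers wait here
	defer b.mu.Unlock()
	if b.subs == nil {
		b.subs = make(map[string]map[uint64]*sub[T])
	}
	if b.subs[topic] == nil {
		b.subs[topic] = make(map[uint64]*sub[T])
	}
	s := &sub[T]{id: id, ch: make(chan T, buf)}
	b.subs[topic][id] = s
	return s.ch
}

func (b *Broker[T]) Publish(topic string, msg T) {
	b.mu.RLock() // shared, but queues behind any waiting writer
	snap := make([]chan T, 0, len(b.subs[topic]))
	for _, s := range b.subs[topic] {
		snap = append(snap, s.ch)
	}
	b.mu.RUnlock()
	for _, ch := range snap {
		select {
		case ch <- msg:
		default: // full buffer: drop
		}
	}
}
```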
After
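Shard by topic hash. A fixed array of buckets, each with its own lock; two topics in different shards are independent.

```go
const shards = 16 // power of two, so the hash can be masked instead of reduced modulo

type shard[T any] struct {
	mu   sync.RWMutex
	subs map[string]map[uint64]*sub[T] // sub[T] as in Exercise 5
}

type Broker[T any] struct{ s [shards]*shard[T] }

func NewBroker[T any]() *Broker[T] {
	b := &Broker[T]{}
	for i := range b.s { // allocate every shard; a zero Broker holds nil pointers
		b.s[i] = &shard[T]{subs: make(map[string]map[uint64]*sub[T])}
	}
	return b
}

// shardFor hashes the topic with FNV-1a (hash/fnv) and masks it to a shard index.
func (b *Broker[T]) shardFor(topic string) *shard[T] {
	h := fnv.New32a()
	h.Write([]byte(topic))
	return b.s[h.Sum32()&(shards-1)]
}

func (b *Broker[T]) Publish(topic string, msg T) {
	sh := b.shardFor(topic)
	sh.mu.RLock()
	// snapshot sh.subs[topic] here, as in Exercise 1
	sh.mu.RUnlock()
	// ...then send to the snapshot outside the lock
}
```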
9. Exercise 8 — Synchronous Publish in the request hot path
An HTTP handler publishes an audit event before returning. The Publish path waits for every subscriber, adding ~200 µs of subscriber processing to every request.
Before:
```go
func handleOrder(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	save(order)                          // order decoded from r (elided)
	bus.PublishSync(ctx, "audit", order) // blocks until every subscriber acks
	w.WriteHeader(http.StatusOK)
}
```
After
A small bounded buffer decouples the handler from subscriber latency.

```go
type AuditBus struct{ in chan AuditEvent }

func NewAuditBus(buf int) *AuditBus {
	a := &AuditBus{in: make(chan AuditEvent, buf)}
	go func() {
		for ev := range a.in {
			bus.Publish("audit", ev) // subscriber latency is paid here, off the request path
		}
	}()
	return a
}

// Submit never blocks the handler: a full buffer is counted, not waited on.
func (a *AuditBus) Submit(ev AuditEvent) {
	select {
	case a.in <- ev:
	default:
		metrics.AuditDropped.Inc() // visibility
	}
}
```
10. Exercise 9 — Kafka prefetch tuned to "max"
A team sets MaxPartitionFetchBytes = 10 MB and MaxPollRecords = 1000 because "more is faster". Each consumer then holds 1000 in-flight messages, about a minute of work at 60 ms each. Every rebalance sends all the uncommitted ones back for redelivery.
Before:
```go
opts := []kgo.Opt{
	kgo.FetchMaxBytes(10 << 20),          // 10 MB per fetch response
	kgo.FetchMaxPartitionBytes(10 << 20), // 10 MB per partition
}
records := client.PollFetches(ctx).Records() // ~1000 records
for _, r := range records {
	process(r) // ~60 ms each
}
client.MarkCommitRecords(records...)
```

```
Sustained:   ~16 msg/s per consumer (one record per 60 ms)
Rebalance:   ~10s pause, 1000 records redelivered → 10% duplication
p99 latency: 60s (last record waits behind first 999)
```
After
Tune prefetch to ~2× the consumer's per-second processing rate, i.e. about two seconds of work. That is enough to hide network latency but not enough to stall on rebalance. Same throughput, 30× better p99, 30× less duplication on rebalance.

**Why better:** Kafka fetch is async-pipelined. A small prefetch hides one network RTT per batch without holding hundreds of messages hostage. A rebalance redelivers only the uncommitted in-flight batch.

**Trade-off:** More frequent commits load the broker's `__consumer_offsets` topic. Grow the batch until commit overhead falls to ~5% of consumer CPU (usually 10–32 records). `FetchMaxWait` adds latency when traffic is sparse.

**When NOT:** Bulk batch jobs where you genuinely want 10k records at once (analytics, replays). There, use a dedicated reader with manual offsets, not consumer groups.

11. Exercise 10 — One goroutine per topic per subscriber
A subscriber listens to 50 topics. The naive design spawns 50 goroutines, each blocked on its own channel, each calling the same handler.
Before:
```go
for _, t := range topics { // 50 topics
	ch := bus.Subscribe(ctx, t, 16)
	go func(topic string, ch <-chan Event) {
		for ev := range ch {
			handle(topic, ev) // handler bounces between 50 stacks
		}
	}(t, ch)
}
```
After
Fan-in: every topic feeds one merged channel. Cheap forwarder goroutines park most of the time; one handler goroutine does the work with hot caches.

```go
type Tagged struct {
	Topic string
	Event Event
}

merged := make(chan Tagged, 256)
var wg sync.WaitGroup
for _, t := range topics {
	ch := bus.Subscribe(ctx, t, 16)
	wg.Add(1)
	go func(topic string, ch <-chan Event) {
		defer wg.Done()
		for ev := range ch {
			select {
			case merged <- Tagged{topic, ev}:
			case <-ctx.Done():
				return
			}
		}
	}(t, ch)
}

// Close merged once every forwarder has exited, so the handler loop can end.
go func() {
	wg.Wait()
	close(merged)
}()

go func() {
	for m := range merged {
		handle(m.Topic, m.Event)
	}
}()
```
12. Exercise 11 — Acking every message individually
A Kafka/RabbitMQ consumer calls Commit() after each record. Each commit is a network round-trip. Throughput drops ~10× because the consumer spends most of its time waiting on RTT.
Before:
```go
for _, r := range fetches.Records() {
	process(r)
	client.CommitRecords(ctx, r) // sync commit per record
}
```
After
Batch acks. Process N records, then commit the latest offset. On crash you re-process at most N, which is the at-least-once contract you already had. RabbitMQ: call `Ack(deliveryTag, true)` (multiple = true) every 64 deliveries. ~25× faster throughput.

**Why faster:** Each commit is ~0.5–2 ms intra-DC. Batching amortizes that to one RTT per 64 messages. The broker also writes offsets more efficiently when batched.

**Trade-off:** On crash you re-process up to one batch of records, so side effects must be idempotent (usually via message-ID dedup). A long-idle consumer might never flush its last partial batch; add a periodic flush.

**When NOT:** When per-message ack is the contract (some financial systems). There, focus on reducing commit RTT: move the broker closer or use a streaming-ack protocol.

13. Exercise 12 — Reconnect with no backoff
A consumer loses its broker connection. The loop retries `nats.Connect` immediately on every error. During a 30s outage the consumer makes 200k connection attempts and gets rate-limited.
Before:
```go
for {
	conn, err := nats.Connect(brokerURL)
	if err != nil {
		log.Println("connect failed:", err)
		continue // tight loop
	}
	serve(conn)
}
```
After
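Exponential backoff with jitter. Start at ~100 ms, double up to a ~30 s cap, and add ±25% jitter to break herd synchronization.

```go
import (
	"context"
	"log"
	"math/rand"
	"time"

	"github.com/nats-io/nats.go"
)

func connectLoop(ctx context.Context, url string) {
	const base, maxDelay = 100 * time.Millisecond, 30 * time.Second
	delay := base
	for ctx.Err() == nil {
		conn, err := nats.Connect(url)
		if err == nil {
			delay = base // a successful connection resets the backoff
			serve(conn)  // returns when the connection is lost
			conn.Close()
			continue
		}
		log.Println("connect failed:", err)
		// Int63n(delay/2) is in [0, delay/2); subtracting delay/4 gives ±25%.
		jitter := time.Duration(rand.Int63n(int64(delay)/2)) - delay/4
		select {
		case <-time.After(delay + jitter):
		case <-ctx.Done():
			return
		}
		if delay *= 2; delay > maxDelay {
			delay = maxDelay
		}
	}
}
```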
14. When NOT to optimize
Most Pub/Sub systems are limited by subscriber work, not by the broker. A handler doing 5 ms of database work cannot benefit from a 50 ns broker optimization.
- Audit topic at 100 events/sec — no hot path, don't pool envelopes.
- Config-change topic with 8 fixed subscribers — no churn, don't index by ID.
- Integration test broker — simplest code wins; `map[string][]chan T` is fine.
Profile first. In-process: go test -bench=. -benchmem -cpuprofile=cpu.out. Brokered: broker metrics (records-lag-max, slow_consumers) are more informative than client profiles.
Common premature optimizations:
- Sharding the broker map (Ex. 7) with one hot topic.
- Pooling envelopes (Ex. 4) below 1k msg/s.
- Async publish (Ex. 8) when the publisher isn't on a request path.
- Fan-in (Ex. 10) when each topic has heavyweight per-topic state.
- Batching acks (Ex. 11) when the bottleneck is processing, not commit.
Correctness gaps disguised as optimizations:
- Dropping silently to "improve throughput" — always count drops.
- Removing the delivery lock without snapshotting (Ex. 1) — races on unsubscribe.
- Removing acks to go faster — at-most-once is a contract, not a choice.
The broker is rarely the bottleneck. Handler latency, downstream queries, and encoding are where the time goes.
15. Summary
Always-ship wins (apply by default in any new Pub/Sub code):
- Snapshot subscribers under the lock, deliver outside (Ex. 1).
- Typed generic `Broker[T]` instead of `Broker[any]` (Ex. 3).
- Index subscribers by ID, not by linear scan (Ex. 5).
- Pass values directly in-process; JSON only at the network boundary (Ex. 6).
- Reconnect with exponential backoff + jitter (Ex. 12) — every brokered client.
Wins behind a profile (when measurements justify them):
- Size subscriber buffers from observed bursts (Ex. 2) — when drops show up.
- Pool message envelopes (Ex. 4) — at ≥10k msg/s on one broker.
- Shard the broker mutex by topic (Ex. 7) — many topics with mixed access.
- Move publish off the request path (Ex. 8) — when publish latency shows up in handler traces.
- Tune broker prefetch to ~2× consumer throughput (Ex. 9) — when rebalance redelivery or p99 is high.
- Batch acks (Ex. 11) — when commit RTT shows up in consumer CPU.
Specialty (only when the design calls for it):
- Fan-in many topics to one handler (Ex. 10) — one service consuming dozens of topics with shared state.
- Per-subscriber slow-handling policies (drop / block / disconnect) — heterogeneous tolerance.
- Reference-counted envelope pooling — broadcast topics with heavy envelopes.
Pub/Sub performance comes down to three things: don't hold locks during delivery, size buffers to the workload (not defaults), and decide explicitly what happens when consumers fall behind. The Go primitives (channels, mutexes, contexts, generics) let the right shape fit in ~150 lines. The brokered side adds discipline: prefetch, commits, backoff, observability. Measure, profile, then optimize. The broker is rarely where the time goes.