Pub/Sub — Find the Bug
1. How to use this file
Fifteen buggy Pub/Sub snippets. Read each one, spot the defect in 30-60 seconds, then read the Answer that follows it. Every bug here has shown up in real Go production code. Pub/Sub looks like a `map[string][]chan T` from a blog post, which is exactly why it regresses so quietly.
Pub/Sub bugs rarely crash on the happy path. They drop one event in ten thousand, deadlock when a single subscriber gets slow, leak a goroutine per HTTP request, or replay a million events on restart. The skill is asking: what does the slow subscriber do to the fast one, what does the next message see, and what happens at shutdown?
Bug 1 — Locking during delivery
```go
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan Event
}

func (b *Broker) Publish(topic string, e Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[topic] {
		ch <- e // blocks while holding the lock
	}
}
```
Answer
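Here is a minimal sketch of the snapshot-then-deliver fix. It assumes an `Event` struct, a `NewBroker` constructor and a buffer-size argument to `Subscribe`, none of which the puzzle snippet shows. The lock now guards only the map read. A full subscriber can still stall this particular `Publish` call (that is Bug 2's problem), but `Subscribe` and unrelated topics keep working.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is an assumed payload type for this sketch.
type Event struct {
	Topic string
	Body  string
}

type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan Event
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan Event)}
}

func (b *Broker) Subscribe(topic string, buf int) <-chan Event {
	ch := make(chan Event, buf)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish copies the subscriber slice while holding the lock, releases the
// lock, and only then sends. A full subscriber can block this call, but it
// can no longer keep b.mu locked for every other caller.
func (b *Broker) Publish(topic string, e Event) {
	b.mu.Lock()
	cps := append([]chan Event(nil), b.subs[topic]...)
	b.mu.Unlock()

	for _, ch := range cps {
		ch <- e
	}
}

func main() {
	b := NewBroker()
	b.Subscribe("slow", 0) // unbuffered and never read: blocks its publisher
	go b.Publish("slow", Event{Topic: "slow"})

	// Unrelated topics keep working because the lock is not held during sends.
	fast := b.Subscribe("fast", 1)
	b.Publish("fast", Event{Topic: "fast", Body: "hi"})
	fmt.Println((<-fast).Body)
}
```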
**Bug:** `Publish` holds `b.mu` for the entire delivery loop. One subscriber with a full buffer blocks the send; the mutex stays locked; every other publisher, subscriber and unsubscribe piles up behind it. One slow subscriber freezes the entire broker.
**Why subtle:** Tests pass because the harness drains every channel promptly. In production, a single handler that does a slow HTTP call (or gets GC-paused) is enough to stall every topic, even unrelated ones.
**Spot:** Any `Publish` doing blocking work, including channel sends, while holding the broker mutex.
**Fix:** Snapshot the subscriber slice under the lock, release the lock, then deliver.
**Why common:** "Lock, do the work, unlock" is the safe default for shared maps. For a broker the work *is* the slow part, so the safe default becomes the bottleneck.
Bug 2 — Blocking publish with no buffer
```go
func (b *Broker) Subscribe(topic string) <-chan Event {
	ch := make(chan Event) // UNBUFFERED
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

func (b *Broker) Publish(topic string, e Event) {
	// snapshot subscribers as cps...
	for _, ch := range cps {
		ch <- e // blocks until receiver ready
	}
}
```
Answer
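This sketch shows one way to make the slow-subscriber policy explicit. It uses a hypothetical `Subscription` type that pairs a buffered channel with a drop counter. The policy chosen here is "drop and count". Blocking with a timeout or disconnecting the subscriber are the alternatives; either would replace the `default` branch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Event struct{ Body string }

// Subscription pairs a buffered channel with a drop counter so the
// slow-subscriber policy ("drop and count") is explicit and observable.
type Subscription struct {
	ch      chan Event
	dropped atomic.Int64
}

type Broker struct {
	mu   sync.Mutex
	subs map[string][]*Subscription
}

func NewBroker() *Broker { return &Broker{subs: make(map[string][]*Subscription)} }

func (b *Broker) Subscribe(topic string, buf int) *Subscription {
	s := &Subscription{ch: make(chan Event, buf)}
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], s)
	b.mu.Unlock()
	return s
}

// Publish never blocks: a subscriber whose buffer is full loses this event
// and its drop counter goes up, instead of stalling everyone else.
func (b *Broker) Publish(topic string, e Event) {
	b.mu.Lock()
	cps := append([]*Subscription(nil), b.subs[topic]...)
	b.mu.Unlock()
	for _, s := range cps {
		select {
		case s.ch <- e:
		default:
			s.dropped.Add(1)
		}
	}
}

func main() {
	b := NewBroker()
	slow := b.Subscribe("t", 1) // never drained in this demo
	fast := b.Subscribe("t", 8)
	for i := 0; i < 3; i++ {
		b.Publish("t", Event{Body: fmt.Sprint(i)})
	}
	fmt.Println(len(fast.ch), len(slow.ch), slow.dropped.Load()) // 3 1 2
}
```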
**Bug:** Unbuffered channel + blocking send. The publisher waits for *every* subscriber, one after another, to be parked on a receive before it can move on. One subscriber doing any synchronous work between receives (a DB call, a `json.Marshal`) stalls the publisher and every later subscriber on the same call.
**Why subtle:** Single-subscriber tests work. Two prompt readers work. The pathology shows when one handler is briefly busy.
**Spot:** `make(chan Event)` with no buffer in a fan-out path, paired with a plain `ch <- e` send.
**Fix:** Give each subscriber a buffered channel and make the slow-subscriber policy explicit and per-subscriber: drop and count, block with a timeout, or disconnect. If "every subscriber gets every message" is mandatory, use a brokered system that handles back-pressure for you.
**Why common:** Unbuffered channels are the Go idiom for handoff synchronization. In a 1-to-1 pipeline they're fine; in a 1-to-N fan-out they're a serialization point.
Bug 3 — Forgotten unsubscribe
```go
func handler(w http.ResponseWriter, r *http.Request) {
	ch := broker.Subscribe("notifications")
	for ev := range ch {
		fmt.Fprintf(w, "data: %s\n\n", ev.JSON)
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
	}
}
```
Answer
**Bug:** No matching `Unsubscribe`. When the client disconnects, nothing removes the channel from `b.subs[topic]` or closes it. As written, the `range` never ends, so the handler goroutine keeps pulling events and writing them to a dead connection. If the handler instead returns on a write error, the channel has no reader, its buffer fills, and further sends block or drop silently. Either way, every reconnect adds another phantom subscriber. After an hour of an SSE endpoint with occasional drops, the broker has 50,000 ghost subscribers, each holding a buffer.
**Why subtle:** Memory grows slowly, and each phantom is small: a channel and, in this version, one parked goroutine. The symptom is a publisher that gets slower over weeks until somebody restarts the service.
**Spot:** Any `Subscribe` without a paired `Unsubscribe` or context-bound cleanup.
**Fix:** Tie subscription lifetime to a `context.Context`:
```go
func (b *Broker) Subscribe(ctx context.Context, topic string) <-chan Event {
	ch := make(chan Event, 32)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()

	// When the caller's context ends, unhook the channel and close it so the
	// subscriber's range loop exits. removeChan returns the slice without ch.
	// Publish must not send on ch without holding b.mu, or this close can race a send.
	go func() {
		<-ctx.Done()
		b.mu.Lock()
		b.subs[topic] = removeChan(b.subs[topic], ch)
		close(ch)
		b.mu.Unlock()
	}()
	return ch
}

// caller: ch := broker.Subscribe(r.Context(), "notifications")
```
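The fix above relies on a `removeChan` helper that the snippet leaves out. One possible version is a generic function (Go 1.18+). It builds a new slice rather than splicing in place, so a subscriber snapshot that merely re-sliced the old list is never rewritten underneath its reader.

```go
package main

import "fmt"

// removeChan returns a new slice containing every element of list except
// target. It never modifies list's backing array, so slices that alias it
// (for example a Publish snapshot) keep their contents.
func removeChan[T comparable](list []T, target T) []T {
	out := make([]T, 0, len(list))
	for _, x := range list {
		if x != target {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	a, b, c := make(chan int), make(chan int), make(chan int)
	list := []chan int{a, b, c}
	rest := removeChan(list, b)
	fmt.Println(len(rest), rest[0] == a, rest[1] == c, list[1] == b) // 2 true true true
}
```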
Bug 4 — Closing broker without closing subscriber channels¶
```go
func (b *Broker) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	b.subs = nil // drop refs; channels never closed
}

// subscriber elsewhere: for ev := range ch { process(ev) }
```
Answer
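Here is a sketch of a `Close` that actually closes, assuming a `sync.RWMutex` and a non-blocking `Publish`. Holding the read lock across the sends is safe only because no send can block. In exchange, `Close` can never close a channel between `Publish`'s closed check and its send, which would panic.

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct{ Body string }

type Broker struct {
	mu     sync.RWMutex
	closed bool
	subs   map[string][]chan Event
}

func NewBroker() *Broker { return &Broker{subs: make(map[string][]chan Event)} }

func (b *Broker) Subscribe(topic string) <-chan Event {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan Event, 16)
	if b.closed {
		close(ch) // late subscribers see an already-finished stream
		return ch
	}
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// Publish holds the read lock across its sends. Every send is non-blocking,
// so this is cheap, and Close cannot close a channel mid-delivery.
func (b *Broker) Publish(topic string, e Event) bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.closed {
		return false
	}
	for _, ch := range b.subs[topic] {
		select {
		case ch <- e:
		default: // full buffer: drop
		}
	}
	return true
}

// Close marks the broker closed and closes every subscriber channel, so each
// `for ev := range ch` loop drains what is buffered and then exits.
func (b *Broker) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	b.closed = true
	for _, list := range b.subs {
		for _, ch := range list {
			close(ch)
		}
	}
	b.subs = nil
}

func main() {
	b := NewBroker()
	ch := b.Subscribe("orders")
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for ev := range ch {
			fmt.Println("got", ev.Body)
		}
	}()
	b.Publish("orders", Event{Body: "o-1"})
	b.Close()
	wg.Wait() // returns because Close closed the channel
	fmt.Println("published after close:", b.Publish("orders", Event{}))
}
```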
**Bug:** `Close` clears the map but never calls `close()` on the channels. Every subscriber goroutine is blocked on `for ev := range ch`, waiting for a value that will never arrive. Shutdown deadlocks any `wg.Wait()` on those subscribers; if nobody waits, the goroutines leak until the process exits.
**Why subtle:** `b.subs = nil` *looks* like cleanup: the map is gone and can be garbage-collected. But Go puts the close responsibility on the *sender*. The broker stops sending, yet the channel is never marked done.
**Spot:** Any broker `Close` / shutdown that doesn't iterate every subscriber channel and call `close(ch)`.
**Fix:** Under the lock, iterate every subscriber channel and `close(ch)` it, then mark the broker closed so later `Publish` calls return instead of sending. Pair this with the rule that **only the broker sends**; otherwise close is a panic source.
**Why common:** Channel close semantics aren't symmetric with "close a file". Developers reach for `Close` as teardown without remembering it's also a synchronization signal for receivers.
Bug 5 — Publishing pointer to mutable struct
```go
func placeOrder(o *Order) {
	ev := &OrderEvent{OrderID: o.ID, Status: "placed", Items: o.Items}
	broker.Publish("orders", ev)
	if o.NeedsFraudCheck() {
		ev.Status = "pending_review" // mutate after publish
	}
}

// audit subscriber:   log.Printf("order %s status=%s", ev.OrderID, ev.Status)
// webhook subscriber: sendWebhook(ev)
```
Answer
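This sketch copies the event at the broker boundary. It assumes `Items` is a `[]string`, since the snippet doesn't show its type. A plain struct copy is not enough, because the copy would still share the slice's backing array. `Clone` gives the published value its own.

```go
package main

import "fmt"

type OrderEvent struct {
	OrderID string
	Status  string
	Items   []string // assumed element type
}

// Clone returns a copy that shares no mutable state with e: the struct is
// copied by value and the Items slice gets its own backing array.
func (e OrderEvent) Clone() OrderEvent {
	c := e
	c.Items = append([]string(nil), e.Items...)
	return c
}

func main() {
	ev := OrderEvent{OrderID: "o-42", Status: "placed", Items: []string{"sku-1"}}

	published := ev.Clone() // what the broker would hand to subscribers

	// The publisher keeps mutating its own copy; subscribers are unaffected.
	ev.Status = "pending_review"
	ev.Items[0] = "sku-2"

	fmt.Println(published.Status, published.Items[0]) // placed sku-1
}
```

If the fraud-check status matters to subscribers, it becomes a second `Publish` of a new value rather than a mutation.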
**Bug:** `Publish` hands the broker `*OrderEvent`, a pointer to a struct the publisher still owns. After `Publish` returns, the publisher mutates `ev.Status` while the audit and webhook subscribers read the same struct concurrently. The audit log shows `placed` or `pending_review` depending on timing, and the webhook payload includes whichever status was current when the subscriber serialized it. `Items` is a slice, so later writes to its elements are visible to both subscribers too. `go test -race` catches it when a test exercises both paths. Production catches it as "the audit log doesn't match the webhook".
**Why subtle:** The publisher's intent was *two* events. The fact that the second is a mutation rather than another `Publish` is the bug, and the compiler can't tell.
**Spot:** `Publish(topic, ptr)` followed by any mutation of `*ptr` or its slice/map fields.
**Fix:** Send by value (copying slice and map fields), treat messages as immutable, or copy at the broker boundary. Brokered systems do this for free, because serializing a message copies it. In-process brokers don't.
**Why common:** Pointers are cheap and the publisher *just built* the struct, so reusing it feels efficient. The boundary between "my struct" and "the broker's struct" is invisible to the compiler.
Bug 6 — Race on subscriber list
```go
type Broker struct {
	subs map[string][]chan Event // no mutex
}

func (b *Broker) Subscribe(topic string) <-chan Event {
	ch := make(chan Event, 32)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

func (b *Broker) Publish(topic string, e Event) {
	for _, ch := range b.subs[topic] {
		select {
		case ch <- e:
		default:
		}
	}
}
```
Answer
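This sketch pairs the mutex fix with the kind of concurrent test that would have exposed the bug. The `stress` and `Count` helpers are hypothetical. Run under `go test -race` or `go run -race`, the same stress loop pointed at the mutex-free broker is very likely to report a race or crash with a concurrent map error. Against this version it completes cleanly.

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct{ Body string }

type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan Event
}

func NewBroker() *Broker { return &Broker{subs: make(map[string][]chan Event)} }

func (b *Broker) Subscribe(topic string) <-chan Event {
	ch := make(chan Event, 32)
	b.mu.Lock() // writers take the exclusive lock
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

func (b *Broker) Publish(topic string, e Event) {
	b.mu.RLock() // concurrent publishers can share the read lock
	cps := append([]chan Event(nil), b.subs[topic]...)
	b.mu.RUnlock()
	for _, ch := range cps {
		select {
		case ch <- e:
		default:
		}
	}
}

func (b *Broker) Count(topic string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.subs[topic])
}

// stress calls Subscribe and Publish from many goroutines at once, which is
// exactly the access pattern single-threaded tests never produce.
func stress(b *Broker, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); b.Subscribe("t") }()
		go func() { defer wg.Done(); b.Publish("t", Event{Body: "x"}) }()
	}
	wg.Wait()
}

func main() {
	b := NewBroker()
	stress(b, 100)
	fmt.Println(b.Count("t")) // 100
}
```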
**Bug:** `b.subs` is a plain Go map. `Subscribe` writes, `Publish` reads, and both run from arbitrary goroutines with no mutex. That's an unsynchronized concurrent map access, a data race. The runtime usually detects it and aborts with `fatal error: concurrent map read and map write` (or `concurrent map writes` when two `Subscribe` calls collide), a crash that `recover` cannot catch. When it goes undetected, the map or the slice can be silently corrupted and events delivered to the wrong channel or lost.
**Why subtle:** Maps and slices feel "lightweight" and the broker reads sequentially. Without explicit concurrent calls in tests, you never see the crash. The first sign is a 2 AM crash with a `runtime.mapaccess1_faststr` frame.
**Spot:** Any shared map or slice used by methods documented as "safe from multiple goroutines" with no mutex anywhere.
**Fix:** Wrap with a `sync.RWMutex`; snapshot under read-lock, deliver outside (see Bug 1). `sync.Map` is *not* the right answer, because its API doesn't fit "append to a slice value".
**Why common:** A broker that "just works" in single-threaded tests passes review. Concurrency only enters the picture at deployment, by which time the missing mutex is no longer load-bearing in the reviewer's memory.
Bug 7 — Subscriber range never returning
```go
func (b *Broker) Unsubscribe(topic string, target <-chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	list := b.subs[topic]
	for i, ch := range list {
		if ch == target {
			b.subs[topic] = append(list[:i], list[i+1:]...)
			return // never closes the channel
		}
	}
}

// worker: for ev := range ch { process(ev) } // never exits
```
Answer
**Bug:** `Unsubscribe` removes the channel from the broker but never `close()`s it. The worker is still blocked on `for ev := range ch`, waiting for a value that will never come. The goroutine leaks, and the channel and its buffer leak with it. This is the dual of Bug 4: Bug 4 closed the broker but not the channels; here, one channel is unhooked but left open.
**Why subtle:** `Unsubscribe` *looks* complete. The broker stops sending, so isn't the subscriber done? But the worker has no other signal: the channel is its only input, and `range` is its only loop control.
**Spot:** Any `Unsubscribe` / `Remove` / `Deregister` that mutates the data structure but never calls `close(ch)`. Also any subscriber loop with no select-on-context branch.
**Fix:** Have `Unsubscribe` call `close(ch)` after removing the channel (the broker must be the channel's only sender). Or make the subscriber resilient with a `select` that also watches a context.
**Why common:** "Remove from map" and "close channel" are two unrelated operations. The author of `Unsubscribe` was thinking about the data structure, not the receiver loop.
Bug 8 — Wildcard match returning ALL topics
```go
// "orders.*" matches "orders.placed"; "*" matches everything
func match(pattern, topic string) bool {
	if pattern == "*" {
		return true
	}
	if strings.HasSuffix(pattern, ".*") {
		prefix := pattern[:len(pattern)-2] // strips ".*"
		return strings.HasPrefix(topic, prefix)
	}
	return pattern == topic
}
```
Answer
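Here is the repaired matcher, checked against the adversarial cases listed under **Spot** in a table-driven test. Like the original, `orders.*` here still matches deeper topics such as `orders.placed.eu`. A NATS-style single-token `*` would additionally reject any `.` in the remainder.

```go
package main

import (
	"fmt"
	"strings"
)

// match keeps the two rules of the buggy version: "*" matches everything,
// and "prefix.*" matches topics under "prefix.". The fix keeps the dot in
// the prefix and requires at least one character after it.
func match(pattern, topic string) bool {
	if pattern == "*" {
		return true
	}
	if strings.HasSuffix(pattern, ".*") {
		prefix := pattern[:len(pattern)-1] // strips only "*", keeps "orders."
		return len(topic) > len(prefix) && strings.HasPrefix(topic, prefix)
	}
	return pattern == topic
}

func main() {
	cases := []struct {
		pattern, topic string
		want           bool
	}{
		{"orders.*", "orders.placed", true},
		{"orders.*", "orders_admin", false}, // the original bug
		{"orders.*", "orders", false},
		{"orders.*", "orders.", false},
		{"orders.*", "order.s", false},
		{"*", "anything", true},
		{"orders.placed", "orders.placed", true},
	}
	for _, c := range cases {
		if got := match(c.pattern, c.topic); got != c.want {
			fmt.Printf("match(%q, %q) = %v, want %v\n", c.pattern, c.topic, got, c.want)
		}
	}
	fmt.Println("all cases checked")
}
```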
**Bug:** `match("orders.*", "orders_admin")` returns `true`. The author meant the prefix to be `"orders."`, but `pattern[:len(pattern)-2]` strips both the `.` and the `*`, leaving `"orders"`. Anything starting with `orders` (`orders`, `orders_admin`, `orders42`) matches. A subscriber to `orders.*` receives events from unrelated topics. Worse, audit/security consumers that subscribed narrowly silently see data they shouldn't.
**Why subtle:** The author thought of `.*` as a unit, but the `.` has to stay part of the prefix.
**Spot:** Any matching/glob logic written by hand, with no table-driven test that includes adversarial cases (`"orders_admin"`, `"orders"`, `"order.s"`).
**Fix:** Strip only the `*` (`pattern[:len(pattern)-1]` yields `"orders."`) and require at least one character after the prefix. Better: use a real matcher (NATS `>` and `*`, or MQTT `#`/`+`) with an off-the-shelf library and property-based tests.
**Why common:** Wildcard matchers are written in five minutes. The off-by-one in the slice is exactly the kind of bug a unit test "passes" on the cases the author thought of.
Bug 9 — Acking message before processing
```go
func consume(sub *nats.Subscription) {
	for {
		msg, err := sub.NextMsg(time.Second)
		if err != nil {
			continue
		}
		if err := msg.Ack(); err != nil {
			log.Print(err)
			continue
		}
		if err := process(msg.Data); err != nil {
			log.Printf("process: %v", err)
		}
	}
}
```
Answer
**Bug:** `msg.Ack()` is called *before* `process`. If `process` fails (panic, network error, DB deadlock, OOM kill), the message is already acknowledged. The broker considers it delivered: no retry, no replay, no dead-letter. The event is lost. The same shape appears in Kafka (`MarkMessage` / commit) and RabbitMQ (`Ack` with `autoAck=false`). Every at-least-once broker has this footgun: ack means "I'm done", not "I received it".
**Why subtle:** Acking immediately makes the consumer faster (no holding the message slot open during processing) and feels safer ("at least the broker knows we got it"). In stress tests with no errors, it's strictly better. The failure mode only appears when processing fails *and* you'd hoped for redelivery.
**Spot:** Any `Ack` / commit / `MarkMessage` that runs before the result of `process(msg)` is committed to durable state.
**Fix:** Ack only after `process` succeeds and its side effects are durable. On failure, don't ack (or negative-ack where the client supports it) so the broker redelivers. If `process` is non-idempotent, you also need an idempotency key in your own store, because at-least-once + non-idempotent processing is a duplicate-event bug factory.
**Why common:** "Ack early to free the slot" is folklore from fragile-consumer days. Modern brokers expect you to hold the message until you're truly done; their flow control assumes it.
Bug 10 — Loop variable in goroutine (pre-Go-1.22)
```go
// go.mod: go 1.21
for _, s := range cps {
	go func() {
		select {
		case s.ch <- e: // s captured by reference
		case <-time.After(s.deadline):
		}
	}()
}
```
Answer
**Bug:** Before Go 1.22, the loop variable `s` is **one** variable reused across iterations, and every goroutine's closure captures that same variable. Goroutines usually start after the loop has moved on, so most of them see the last subscriber. That subscriber receives several copies, the others get nothing, and the goroutines race with the loop's writes to `s`. Go 1.22 changed the spec so each iteration gets its own variable, which fixes this for modules whose `go.mod` declares `go 1.22` or later. Legacy modules still target older versions, and cross-version refactors have to deal with both worlds.
**Why subtle:** The loop "works" if subscribers are identical (the bug becomes "we delivered N copies to subscriber N-1, which is the same shape as N copies to N subscribers"). With heterogeneous deadlines or channels, it fails silently.
**Spot:** Pre-1.22 code with `for _, x := range xs { go func() { use(x) } }`. `go vet -loopclosure` catches it.
**Fix:** Pass `s` into the closure as a parameter, shadow it with `s := s` at the top of the loop body, or move the module to `go 1.22` or later.
**Why common:** It was *the* Go gotcha for a decade. Even 1.22+ codebases inherit it through vendored code, older `go` directives, and copy-paste from old Stack Overflow answers.
Bug 11 — Reconnect handler holding mutex during dial
```go
func (c *NATSClient) onDisconnect() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for {
		conn, err := nats.Connect(c.url, nats.Timeout(30*time.Second))
		if err == nil {
			c.conn = conn
			return
		}
		time.Sleep(time.Second)
	}
}

func (c *NATSClient) Publish(topic string, data []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.conn.Publish(topic, data)
}
```
Answer
**Bug:** `onDisconnect` takes `c.mu` and then *dials in a loop, possibly for minutes*. Every `Publish` goes through the same mutex, so every publisher is blocked for the entire reconnect. The application thinks it's "still publishing"; the queue of publishes grows on goroutine stacks; memory climbs; HTTP handlers time out. This pattern surfaces in any pub/sub client wrapper that needs to swap a connection: NATS, Kafka, gRPC, AMQP.
**Why subtle:** It works *during* normal operation, because the mutex is held briefly per publish. The pathology only appears on the reconnect path, which you don't load-test.
**Spot:** Any mutex-protected method that performs network I/O while holding the lock. Split "decide what to do" (under lock) from "do it" (lock released).
**Fix:**
```go
func (c *NATSClient) onDisconnect() {
	// Dial without holding c.mu, so publishers are never parked behind this loop.
	var conn *nats.Conn
	for {
		var err error
		conn, err = nats.Connect(c.url, nats.Timeout(30*time.Second))
		if err == nil {
			break
		}
		time.Sleep(time.Second)
	}

	// Hold the lock only long enough to swap the pointer.
	c.mu.Lock()
	old := c.conn
	c.conn = conn
	c.mu.Unlock()
	if old != nil {
		old.Close()
	}
}

func (c *NATSClient) Publish(topic string, data []byte) error {
	// Copy the pointer under the lock, then do the network I/O without it.
	c.mu.Lock()
	conn := c.conn
	c.mu.Unlock()
	if conn == nil {
		return errors.New("not connected")
	}
	return conn.Publish(topic, data)
}
```
Bug 12 — Prefetch too high, slow consumer holds messages¶
```go
return kafka.NewReader(kafka.ReaderConfig{
	Brokers:       brokers,
	GroupID:       group,
	Topic:         topic,
	QueueCapacity: 1000, // prefetch a lot
	MinBytes:      10 << 10,
	MaxBytes:      10 << 20,
})
// handler averages 200ms; occasionally 5s under load
```
Answer
**Bug:** `QueueCapacity: 1000` tells the client to prefetch up to 1,000 messages into an in-memory buffer. When one consumer is slow (5-second downstream call), it sits on 1,000 messages that *belong to its partition assignment*. Other consumers in the group can't help — those messages are assigned. Partition lag grows: 1,000 messages × 5 seconds = 83 minutes of backlog in one consumer's buffer. Worse, on session timeout the broker rebalances; those 1,000 messages get reassigned and *redelivered*. Throughput drops; duplicates rise. "Tune prefetch up for speed" makes everything worse.
**Why subtle:** Higher prefetch *does* speed up the steady-state case where every consumer is healthy. The failure is asymmetric — one slow consumer dominates.
**Spot:** Any consumer config with prefetch / queue capacity / in-flight limit set "high" without a per-consumer SLO that supports it. Rule of thumb: prefetch ≈ 2× one consumer's per-second throughput.
**Fix:** `QueueCapacity: 10` (or whatever 2× steady-state throughput is). For RabbitMQ / NATS the equivalent is `prefetch_count` / `MaxAckPending`. Tune from a metric (`messages_in_flight`), not from intuition.
**Why common:** Prefetch is a "throughput knob" in every tutorial. The trade-off ("buffer is per-consumer; slow consumer hoards") is in the docs but not in the headline.
Bug 13 — Manual offset commit forgotten
```go
func (c *Consumer) Run(ctx context.Context) error {
	for {
		msg, err := c.reader.FetchMessage(ctx) // manual commit mode
		if err != nil {
			return err
		}
		if err := c.processInTx(msg); err != nil {
			log.Printf("process: %v", err)
			continue
		}
		// BUG: c.reader.CommitMessages(ctx, msg) never called
	}
}
```
Answer
**Bug:** The author switched to manual commits (`FetchMessage` instead of `ReadMessage`) but never called `CommitMessages`. The broker still has the *last* committed offset from before the change, possibly from days ago. On every restart, the consumer re-reads everything since that offset, so each restart triggers a replay storm. `processInTx` inserts duplicates (or, with a unique constraint, the log fills with violations).
**Why subtle:** In steady state the consumer works. The bug shows only at restart, which the team carefully avoids in production. Then there's a deploy, and the new pod replays a week of events.
**Spot:** Switching `ReadMessage` → `FetchMessage` (Kafka), `autoAck=true` → `autoAck=false` (RabbitMQ), or `Subscribe` → `PullSubscribe` (NATS) means every message now needs an explicit ack/commit. Grep for the matching call.
**Fix:** Call `c.reader.CommitMessages(ctx, msg)` after `processInTx` succeeds. For at-least-once + idempotent processing, this is enough. For exactly-once you need a transactional outbox or a single Kafka transaction wrapping read-process-write.
**Why common:** "Manual commit" sounds like "now I control commits", not "now I'm responsible for every commit". Tutorials show the fetch loop; the commit line is on the next page.
Bug 14 — Subscriber panic kills the broker goroutine
```go
func (b *Broker) Publish(topic string, e Event) {
	// snapshot subscribers as cps...
	for _, fn := range cps {
		fn(e) // synchronous, in the publisher's goroutine
	}
}

func startBus() {
	go func() {
		for e := range incoming {
			broker.Publish(e.Topic, e)
		}
	}()
}
```
Answer
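This sketch wraps each subscriber call in its own recover. It uses a hypothetical `safeCall` helper and a package-level counter standing in for a real metric. `recover` only stops a panic when it is called inside a deferred function. That is why the wrapper defers a closure rather than writing `defer recover()`.

```go
package main

import (
	"fmt"
	"log"
	"runtime/debug"
	"sync/atomic"
)

type Event struct{ Topic string }

// subscriberPanics counts recovered handler panics; a healthy bus keeps it at zero.
var subscriberPanics atomic.Int64

// safeCall runs one handler and turns a panic into a log line (with stack)
// and a metric, so the publishing goroutine and the process both survive.
func safeCall(fn func(Event), e Event) {
	defer func() {
		if r := recover(); r != nil {
			subscriberPanics.Add(1)
			log.Printf("subscriber panic on %q: %v\n%s", e.Topic, r, debug.Stack())
		}
	}()
	fn(e)
}

func publish(handlers []func(Event), e Event) {
	for _, fn := range handlers {
		safeCall(fn, e)
	}
}

func main() {
	delivered := 0
	handlers := []func(Event){
		func(Event) { var m map[string]int; m["x"] = 1 }, // panics: write to nil map
		func(Event) { delivered++ },
	}
	publish(handlers, Event{Topic: "orders"})
	fmt.Println(delivered, subscriberPanics.Load()) // 1 1
}
```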
**Bug:** Subscribers run synchronously inside the publisher's goroutine. If one subscriber panics (nil deref, index out of range, division by zero), the panic unwinds through `Publish` and out of the `for e := range incoming` loop. With nothing recovering it, Go terminates the whole process, taking every topic and every HTTP request down with it. If the bus goroutine was launched through a recover-and-log wrapper (a common "safe go" helper), the process survives but the bus goroutine is gone. No more events get delivered to *any* topic, by *any* subscriber. The app looks healthy (HTTP serves, metrics scrape), but its event flow is silently dead.
**Why subtle:** One bad subscriber takes everyone down. A recovered goroutine simply vanishes, and whoever started it isn't watching for respawn. If the wrapper logs only the recovered value, there is no stack trace to follow.
**Spot:** Any broker that invokes subscriber callbacks synchronously without a per-call deferred `recover()`.
**Fix:** Wrap each subscriber call in a function whose deferred closure calls `recover()`, logs the panic with its stack, increments a counter, and returns normally. Better: run each handler in its own goroutine with the same wrapper, so a slow handler can't stall the others either. Alert on the panic counter, since a healthy bus has zero subscriber panics.
**Why common:** Synchronous fan-out is the simplest implementation; the recover wrapper is "we'll add that when we need it". The first production panic is when you need it.
Bug 15 — CloudEvents header injection
```go
type CloudEvent struct {
	ID, Source, Type string
	Data             map[string]any
}

func publishUserAction(action, userID string, payload map[string]any) error {
	ev := CloudEvent{
		ID:     uuid.NewString(),
		Source: "/users",
		Type:   "com.acme.user." + action, // action from HTTP query
		Data:   payload,
	}
	data, err := json.Marshal(ev)
	if err != nil {
		return err
	}
	return nc.Publish("audit", data) // nc: a connected *nats.Conn
}

func handleAction(w http.ResponseWriter, r *http.Request) {
	action := r.URL.Query().Get("action") // attacker-controlled
	// userID and body come from auth and request parsing (not shown)
	publishUserAction(action, userID, body)
}
```
Answer
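This sketch builds the type from a closed set, using a hypothetical allowlist. The real entries depend on which events downstream consumers are meant to receive. Unknown actions are rejected before any envelope exists, so there is nothing to publish.

```go
package main

import "fmt"

// allowedActions is a hypothetical closed set of actions that may appear in
// the CloudEvents type.
var allowedActions = map[string]bool{
	"login":           true,
	"logout":          true,
	"profile.updated": true,
}

// eventType builds the CloudEvents type only from an allowlisted action.
// Anything else, including "admin.deleted", is rejected before publishing.
func eventType(action string) (string, error) {
	if !allowedActions[action] {
		return "", fmt.Errorf("unknown action %q", action)
	}
	return "com.acme.user." + action, nil
}

func main() {
	for _, a := range []string{"login", "admin.deleted"} {
		t, err := eventType(a)
		fmt.Println(t, err)
	}
}
```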
**Bug:** The CloudEvents `type` field is built from `action`, straight from the URL query. An attacker hits `/api/act?action=admin.deleted` and the broker fan-out delivers an event whose type *looks identical* to a legitimate `com.acme.user.admin.deleted` event. Downstream subscribers that filter on `type == "com.acme.user.admin.deleted"` to trigger compensating actions (revoke sessions, alert security) now fire on attacker-chosen input. An audit pipeline that uses `type` as a primary key sees a forged event indistinguishable from the real one. The same shape appears with Kafka headers, NATS subject suffixes and AMQP routing keys: anywhere the *envelope* is built from untrusted input.
**Why subtle:** The `payload` field is treated as untrusted (the handler validates it). The `type` field looks like a constant prefix plus a string, so the reviewer doesn't see "this string is attacker-controlled". Schema validators check that `type` is a string, not that the value is in a known set.
**Spot:** Any envelope field (subject, routing key, partition key, type, source) built by `prefix + userInput` or `fmt.Sprintf(template, userInput)`. Envelope fields must come from a closed set; payload is where untrusted data goes.
**Fix:** Map `action` through a closed allowlist and reject anything outside it before the envelope is built. Subscribers should *also* validate that they only handle types they own. This is not because the publisher is untrusted, but because envelopes cross trust boundaries.
**Why common:** Threat modeling for pub/sub focuses on payload validation and message integrity. The envelope feels like infrastructure, immune to attacker influence, until somebody builds it from a URL parameter.
Summary
These bugs cluster into five families.
Broker concurrency hygiene (1, 2, 6, 11, 14): locking during delivery, unbuffered fan-out, unprotected subscriber map, mutex held across network I/O, no panic isolation. The broker is a concurrency point; every shared-state and slow-operation path deserves the same scrutiny as a critical section.
Lifecycle and cleanup (3, 4, 7): forgotten unsubscribe, broker close without channel close, unsubscribe without channel close. Subscriptions are resources. Tie them to a context.Context and close every channel on the path that hands ownership back.
Message ownership and routing (5, 8, 15): mutating a published struct, broken wildcard matcher, attacker-controlled envelope fields. Once an event leaves the publisher, the publisher does not own it any more — and any field a subscriber filters on must be drawn from a closed set.
Brokered ack/commit discipline (9, 13): acking before processing, manual commit forgotten. Brokered pub/sub trades latency for durability; the contract is "ack means I'm done", and any shortcut breaks at-least-once delivery.
Tuning that backfires (10, 12): loop-variable capture, prefetch too high. Defaults are usually right; deliberate tuning is usually wrong without a measurement to back it.
Review checklist for any Pub/Sub PR:
- Is the broker mutex never held during channel sends, network I/O, or subscriber callbacks?
- Are subscriber channels buffered, and is the slow-subscriber policy (drop / block / disconnect) explicit and per-subscriber?
- Does every `Subscribe` have a matching cleanup — ideally tied to a `context.Context` rather than a manual `Unsubscribe`?
- On broker shutdown (and on `Unsubscribe`), is every subscriber channel explicitly `close()`d so receivers exit their `range`?
- Are published messages either value types, deep-copied, or treated as immutable by convention enforced in code review?
- Is every subscriber callback wrapped in a deferred `recover()` with a metric and a log, so one bad handler can't kill the broker?
- For wildcard topics, is the matcher backed by a property-based test that includes adversarial near-matches?
- For at-least-once brokers, does `ack`/`commit` run after the side effects are durably committed — never before?
- Is consumer prefetch / queue capacity tuned to ~2× steady-state throughput, not "as high as possible"?
- Are envelope fields (topic, subject, routing key, type) drawn from a closed allowlist, never built by concatenating untrusted input?