Observer — Find the Bug¶
Each section presents an Observer that looks fine but is broken. Find the bug yourself, then check.
Table of Contents¶
- Bug 1: ConcurrentModificationException during dispatch
- Bug 2: One bad observer breaks the chain
- Bug 3: Memory leak from never-unsubscribed observer
- Bug 4: Cyclic notifications
- Bug 5: Race on the listener list
- Bug 6: Push vs pull data race
- Bug 7: Async dispatch with shared mutable event
- Bug 8:
equalsconfusion in unsubscribe - Bug 9: Order dependency between observers
- Bug 10: Subscribe inside a constructor
- Bug 11: Missing per-handler exception isolation in async
- Bug 12: SSE / WebSocket leak on disconnect
- Practice Tips
Bug 1: ConcurrentModificationException during dispatch¶
public final class Bus {
private final List<Listener> listeners = new ArrayList<>();
public void subscribe(Listener l) { listeners.add(l); }
public void unsubscribe(Listener l) { listeners.remove(l); }
public void publish(Event e) {
for (Listener l : listeners) l.on(e);
}
}
// One listener unsubscribes itself in on(...).
In production, intermittent ConcurrentModificationException.
Reveal
**Bug:** A listener modifies the `listeners` list during iteration. `ArrayList`'s iterator detects the modification and throws. **Fix:** snapshot before iterating, or use `CopyOnWriteArrayList`. Or: **Lesson:** The dispatch loop runs while observers might modify the list. Either snapshot or use a copy-on-write collection.Bug 2: One bad observer breaks the chain¶
public void publish(Event e) {
for (Listener l : listeners) {
l.on(e); // any listener throwing kills the loop
}
}
In production, sometimes only some listeners run; the rest are skipped. Logs show a NullPointerException.
Reveal
**Bug:** No per-listener try/catch. The first listener that throws aborts the loop — subsequent listeners never run. **Fix:** wrap each invocation. **Lesson:** In a broadcast chain, one observer's failure must not affect the rest. Always isolate.Bug 3: Memory leak from never-unsubscribed observer¶
public final class GlobalBus {
public static final GlobalBus INSTANCE = new GlobalBus();
private final List<Listener> listeners = new ArrayList<>();
public void subscribe(Listener l) { listeners.add(l); }
}
// In a request-scoped object:
public final class RequestHandler {
public RequestHandler() {
GlobalBus.INSTANCE.subscribe(e -> handle(e));
}
public void handle(Event e) { /* ... */ }
}
After hours, memory grows. Heap dump shows millions of RequestHandler instances.
Reveal
**Bug:** Each `RequestHandler` subscribes; the `GlobalBus` holds a strong reference to the lambda; the lambda captures `this`. The `RequestHandler` can never be GC'd. Classic Observer leak. **Fix:** unsubscribe in lifecycle / use weak references. Or: **Lesson:** Long-lived subjects + short-lived observers = leaks unless cleaned up. Manage lifecycle explicitly or use weak refs.Bug 4: Cyclic notifications¶
class Bus {
void publish(Event e) { for (Listener l : listeners) l.on(e); }
}
class A implements Listener {
public void on(Event e) {
bus.publish(new Event()); // re-publishes
}
}
Stack overflow.
Reveal
**Bug:** Listener publishes the event that caused it to be invoked. Infinite loop until the stack runs out. **Fix:** detect cycles or restructure. Or model as a state machine: listener emits a *different* event type that closes the loop. **Lesson:** Observer chains are easy to make cyclic. Detect or design out cycles.Bug 5: Race on the listener list¶
class Bus {
private final List<Listener> listeners = new ArrayList<>(); // not thread-safe
public void subscribe(Listener l) { listeners.add(l); }
public void publish(Event e) {
for (Listener l : listeners) l.on(e);
}
}
Threads A subscribe; thread B publishes. Sometimes B sees an inconsistent list (NPE during iteration, or recent subscriptions missing).
Reveal
**Bug:** `ArrayList` is not thread-safe. Concurrent reads + writes have undefined behavior in the JMM. **Fix:** use a thread-safe collection. Or synchronize externally. **Lesson:** Subscriber lists in multi-threaded code must use thread-safe collections. `CopyOnWriteArrayList` is purpose-built for this case.Bug 6: Push vs pull data race¶
class WeatherStation {
private double temp;
private List<Listener> listeners;
public void setTemp(double t) {
notifyAll(); // observers pull
this.temp = t; // BUG: order
}
private void notifyAll() {
for (Listener l : listeners) l.onChanged(this); // pulls .getTemp() — old value!
}
public double getTemp() { return temp; }
}
Observers read the old temp.
Reveal
**Bug:** State updated AFTER notifications. Observers pull the stale value. **Fix:** update first, notify after. In sync chains this is enough. In async or with multiple state updates, batching may be needed. **Lesson:** Order matters in pull model: state must be settled before notification.Bug 7: Async dispatch with shared mutable event¶
class Bus {
private final ExecutorService exec = Executors.newFixedThreadPool(4);
public void publish(MutableEvent e) {
for (Listener l : listeners) {
exec.submit(() -> l.on(e)); // all listeners share one event
}
}
}
// One listener:
public void on(MutableEvent e) { e.setProcessed(true); }
// Another listener:
public void on(MutableEvent e) { if (e.isProcessed()) skip(); }
Inconsistent results: the second observer's behavior depends on the order in which the executor schedules — which is non-deterministic.
Reveal
**Bug:** Shared mutable state across listeners. One mutates; another reads. Race condition. **Fix:** make events immutable. Or give each listener its own copy. **Lesson:** Events should be immutable values. Mutating events leak coupling between observers.Bug 8: equals confusion in unsubscribe¶
class Bus {
private final List<Listener> listeners = new ArrayList<>();
public void subscribe(Listener l) { listeners.add(l); }
public void unsubscribe(Listener l) { listeners.remove(l); }
}
class App {
void setup() {
bus.subscribe(this::handle);
// ...later...
bus.unsubscribe(this::handle); // doesn't unsubscribe!
}
void handle(Event e) { /* ... */ }
}
Memory grows; logs show observer never removed.
Reveal
**Bug:** Each `this::handle` is a *new* lambda. Two different objects, even pointing to the same method. `remove()` searches for an `equals`-equal entry; doesn't find it. **Fix:** keep a reference. Or have `subscribe` return a `Disposable`: **Lesson:** Method references and lambdas don't have `equals` based on the underlying method. Always retain the exact object you subscribed.Bug 9: Order dependency between observers¶
bus.subscribe(o -> calculateTax(o));
bus.subscribe(o -> applyDiscount(o));
bus.subscribe(o -> commitOrder(o));
Tests pass. In production, sometimes commitOrder runs before applyDiscount, missing the discount.
Reveal
**Bug:** Observer doesn't promise order. The implementation may have happened to be insertion-ordered, but switching to `CopyOnWriteArraySet` or async dispatch reorders. Discount and commit shouldn't be observers of the same event — they're a *pipeline*. **Fix:** model as a pipeline (Chain of Responsibility) or sequence the steps explicitly. Or: split events. `OrderPlaced` triggers `calculateTax`; that emits `OrderTaxed`; that triggers `applyDiscount`; etc. **Lesson:** Observer is for independent reactions. If you have ordered steps, use a different pattern (or an explicit handler).Bug 10: Subscribe inside a constructor¶
public final class Listener {
public Listener(Bus bus) {
bus.subscribe(this); // `this` published before construction completes
}
public void on(Event e) {
useField(); // BUG: field might not be initialized yet
}
}
NPEs in useField() for events fired during construction.
Reveal
**Bug:** Publishing `this` before the constructor finishes. If the bus dispatches synchronously while the constructor is running, the partially-constructed object handles events with uninitialized fields. **Fix:** subscribe AFTER construction, in a factory or `start()` method. **Lesson:** Don't leak `this` during construction. Subscriptions should happen in a separate post-construction step.Bug 11: Missing per-handler exception isolation in async¶
public void publish(Event e) {
CompletableFuture<Void> all = CompletableFuture.allOf(
listeners.stream()
.map(l -> CompletableFuture.runAsync(() -> l.on(e)))
.toArray(CompletableFuture[]::new)
);
// No try/catch
}
In production, a poison-pill event makes one handler throw; nothing in logs; the future fails silently.
Reveal
**Bug:** `CompletableFuture` exceptions are not propagated unless you `.exceptionally()`, `.handle()`, or `.join()`. Failures vanish. **Fix:** **Lesson:** Async handlers are easier to lose than sync ones. Always attach an error handler.Bug 12: SSE / WebSocket leak on disconnect¶
class LiveStream:
def __init__(self):
self._subscribers: list[asyncio.Queue] = []
def publish(self, msg):
for q in self._subscribers:
q.put_nowait(msg)
async def subscribe(self):
q = asyncio.Queue()
self._subscribers.append(q)
while True:
yield await q.get()
Clients disconnect; queues stay in self._subscribers. Memory grows.
Reveal
**Bug:** Disconnection is not handled. The async iterator never reaches its `finally` because cancellation isn't propagated to the generator. **Fix:** clean up on cancellation. (With proper async-generator cancellation, the `finally` runs when the consumer stops awaiting.) **Lesson:** Live streams (SSE, WebSocket) need explicit unsubscribe on client disconnect. `try/finally` around the consume loop is the standard idiom.Practice Tips¶
- Concurrent modification is the most common bug. Always snapshot or use COW.
- Per-handler error isolation is non-negotiable. Wrap each invocation.
- Audit observer lifecycle. Long-lived subject + short-lived observer = leak unless cleaned up.
- Beware lambda equality.
this::methodis a new lambda each time. - Mutable shared events are a recipe for races. Make events immutable.
- State first, notify after. In pull model, observers see what's settled.
- Observers shouldn't depend on each other's order. If they do, use a pipeline.
- Don't leak
thisfrom constructors. Subscribe after construction. - Async dispatch swallows exceptions silently. Attach error handlers.
- WebSocket / SSE need explicit cleanup on disconnect.