Stateful Stream-Processing Engine (Flink-lite, in Go)
Build a stateful stream processor that consumes Kafka and computes windowed aggregations correctly under out-of-order and late events. Learn where watermarks trade correctness for latency, and where keyed state stops fitting in RAM and has to spill to disk.
| | |
|---|---|
| Tier | Lab (event-engineering) |
| Primary domain | Stream processing / distributed systems |
| Skills exercised | Event-time windowing (tumbling/sliding/session), watermarks & allowed lateness, keyed local state + changelog, checkpoint/restart, idempotent sinks, Go (`twmb/franz-go`, `cockroachdb/pebble` or `dgraph-io/badger`, `pprof`) |
| Interview sections | 11 (messaging), 13 (distributed systems), 17 (performance) |
| Est. effort | 4–6 focused days |
1. Context
You own the real-time analytics path at a company doing ~2B events/day. Product wants per-account rollups computed on the stream instead of in a nightly batch: "sessions per user", "5-minute sliding revenue", "count per fixed minute". The catch is the data isn't tidy — mobile clients buffer offline and flush hours later, so events arrive out of order with a long tail of late arrivals, and the key space is huge (millions of accounts) so per-key window state won't sit in memory forever.
Reaching for Flink/Kafka-Streams hides exactly the parts an interviewer probes: what a watermark actually is, what "allowed lateness" costs you, and where your window state lives when there are 10M keys. So you build a Flink-lite engine yourself, in Go, and produce numbers: how much late data you drop at a given watermark lag, when state spills to disk, and how long recovery from a checkpoint takes after a crash.
2. Goals / Non-goals
Goals
- Implement tumbling, sliding, and session windows over event time, keyed by `account_id`, with correct results under reordering.
- Implement watermarks + allowed lateness and quantify the correctness-vs-latency trade-off (how many late events are dropped at each watermark lag).
- Hold keyed local state for 10M+ keys with a changelog, spilling to an embedded KV (`pebble`/`badger`) when it exceeds a memory budget.
- Checkpoint state + input offsets atomically and recover after a crash with results that match a no-crash run.
- Emit results to an idempotent sink so replay/recovery doesn't double-count.

Non-goals
- Reimplementing Flink's full operator graph, SQL layer, or cluster scheduler: single process (or a few keyed shards), no distributed task manager.
- Exactly-once across operators via two-phase commit to multiple systems: one sink, made idempotent. State the guarantee you reach.
- Schema evolution / Avro: fixed binary payload (that's the schema-registry lab).
- Re-sharding window state across nodes live (that's the staff sharding lab).
3. Functional requirements
- A source (`cmd/source`) consumes topic `events` from Kafka with `franz-go`, extracts the event timestamp from the payload (not the Kafka append time), and feeds records into the windowing operator in arrival order.
- A windowing operator (`internal/window`) supports, selectable by flag:
  - `tumbling`: fixed, non-overlapping (e.g. 1-minute) windows.
  - `sliding`: size + slide (e.g. 5-minute size, 1-minute slide).
  - `session`: gap-based (e.g. close after 30s of inactivity per key), with session merging when a late event bridges two open sessions.
- A watermark generator advances event time as `max_seen_ts - allowed_skew` (bounded out-of-orderness). Windows fire when the watermark passes `window_end + allowed_lateness`; events older than the watermark by more than the lateness budget are dropped and counted (not silently lost).
- Keyed state stores per-window accumulators keyed by `(key, window)`. State lives in memory up to a budget, then spills to `pebble`/`badger`; every state mutation is appended to a changelog for recovery.
- A checkpoint coordinator (`cmd/checkpoint` or a goroutine) periodically snapshots state + the source offsets atomically; on restart the engine loads the latest checkpoint and resumes from the committed offsets.
- A sink (`internal/sink`) writes fired-window results idempotently, keyed by `(key, window_start, window_end)`, so re-emission after recovery is a no-op.
- A chaos hook (`cmd/chaos`) can `kill -9` the engine mid-run to exercise recovery from the last checkpoint.
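
The watermark and late-drop rules above can be sketched in a few lines. This is a minimal illustration for tumbling windows only, not the lab's reference implementation: the names `Watermark`, `TumblingEnd`, and `TooLate` are hypothetical, and it assumes "passes" means `watermark >= window_end + allowed_lateness` and that timestamps are non-negative milliseconds.

```go
package main

import "fmt"

// Watermark implements bounded out-of-orderness:
// watermark = max event timestamp seen so far - allowedSkewMs.
type Watermark struct {
	allowedSkewMs int64
	maxSeenMs     int64
	seen          bool
}

// NewWatermark is a hypothetical constructor used only in this sketch.
func NewWatermark(allowedSkewMs int64) *Watermark {
	return &Watermark{allowedSkewMs: allowedSkewMs}
}

// Observe folds one event timestamp in and returns the current watermark.
// The watermark never moves backwards because maxSeenMs only grows.
func (w *Watermark) Observe(eventTsMs int64) int64 {
	if !w.seen || eventTsMs > w.maxSeenMs {
		w.maxSeenMs, w.seen = eventTsMs, true
	}
	return w.maxSeenMs - w.allowedSkewMs
}

// TumblingEnd returns the exclusive end of the fixed window containing tsMs.
// Assumes tsMs >= 0 and sizeMs > 0.
func TumblingEnd(tsMs, sizeMs int64) int64 {
	return tsMs - tsMs%sizeMs + sizeMs
}

// TooLate reports whether the event's window is already finalized, i.e. the
// watermark has reached window_end + allowed_lateness (assumed inclusive).
func TooLate(eventTsMs, sizeMs, latenessMs, watermarkMs int64) bool {
	return watermarkMs >= TumblingEnd(eventTsMs, sizeMs)+latenessMs
}

func main() {
	wm := NewWatermark(5_000) // 5s allowed skew
	dropped := 0
	for _, ts := range []int64{10_000, 70_000, 20_000} { // arrival order
		cur := wm.Observe(ts)
		if TooLate(ts, 60_000, 0, cur) { // 1-minute windows, no lateness
			dropped++ // counted, not silently lost
			continue
		}
		// A real operator would update the (key, window) accumulator here.
	}
	fmt.Println("dropped_late:", dropped)
}
```

Raising the lateness argument keeps the straggler's window open longer, which is exactly the state-versus-correctness knob the experiments sweep.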
4. Load & data profile
- Volume: generate and process ≥ 1 billion events total; a single sustained run ≥ 30 minutes at target rate.
- Message size: 256 B events (`account_id`, `value`, `event_ts`, `seq`, pad).
- Key cardinality: `account_id` over 10M+ accounts, Zipfian (s≈1.1) so a handful of hot keys dominate volume while millions of cold keys still hold open window state. This is what makes state large.
- Out-of-order model: each event arrives later than its `event_ts` ("true" time) by a Zipfian-tailed lag: ~95% within a few seconds, a long tail up to several minutes late, plus a small fraction of extreme stragglers (hours). This is the knob the watermark/lateness experiments turn.
- Generator: `cmd/gen` is deterministic given a seed; it emits the true ordering and the delayed ordering so you can compute a ground-truth result to diff against.
- Traffic model: open-model source (fixed input rate) so lag and state growth are observable rather than self-throttled by the consumer.
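
A seeded generator along these lines could look like the sketch below. It uses the standard library's `rand.NewZipf` for both the key skew and the lag tail; the lag parameters (`s=1.5`, a one-hour cap) and the names `Arrival` and `Generate` are assumptions for illustration and are not tuned to hit the "95% within a few seconds" profile.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// Event mirrors the payload fields named in the profile (pad omitted).
type Event struct {
	AccountID uint64
	Value     int64
	EventTsMs int64
	Seq       uint64
}

// Arrival pairs an event with the time it reaches the broker.
type Arrival struct {
	Event
	ArrivalMs int64
}

// Generate returns n events in true (event-time) order and the same events
// sorted by delayed arrival time. The same seed always yields the same data.
// Assumes accounts >= 1.
func Generate(seed int64, n int, accounts uint64) (trueOrder, arrivalOrder []Arrival) {
	r := rand.New(rand.NewSource(seed))
	keys := rand.NewZipf(r, 1.1, 1, accounts-1) // hot-key skew, s≈1.1
	lags := rand.NewZipf(r, 1.5, 1, 3_600_000)  // hypothetical lag tail, capped at 1h (ms)

	trueOrder = make([]Arrival, n)
	for i := range trueOrder {
		ts := int64(i) // one event per millisecond of "true" time
		trueOrder[i] = Arrival{
			Event: Event{
				AccountID: keys.Uint64(),
				Value:     r.Int63n(1000),
				EventTsMs: ts,
				Seq:       uint64(i),
			},
			ArrivalMs: ts + int64(lags.Uint64()),
		}
	}

	arrivalOrder = append([]Arrival(nil), trueOrder...)
	sort.SliceStable(arrivalOrder, func(a, b int) bool {
		return arrivalOrder[a].ArrivalMs < arrivalOrder[b].ArrivalMs
	})
	return trueOrder, arrivalOrder
}

func main() {
	trueOrder, arrivalOrder := Generate(42, 10, 1000)
	fmt.Println("first by event time:", trueOrder[0].Seq)
	fmt.Println("first by arrival:   ", arrivalOrder[0].Seq)
}
```

Feeding `trueOrder` through a plain batch aggregation gives the ground truth; feeding `arrivalOrder` through the engine gives the result to diff against it.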
5. Non-functional requirements / SLOs
| Metric | Target |
|---|---|
| Sustained ingest throughput (256 B, single process) | Find & report the ceiling; name the bound (CPU on window bookkeeping? state I/O? GC/allocs?) |
| Result correctness vs ground truth (no late events) | 100% of windows match the deterministic batch result, exactly |
| Result correctness with late tail @ chosen watermark | Report drop rate (% events past lateness budget) and the resulting per-window error |
| Per-event processing p99 (ingest → accumulator updated) | < 5 ms at 80% of ingest ceiling |
| Window-fire → sink-emit p99 | < 1 s after watermark crosses window_end + lateness |
| Resident memory under 10M open keys | Bounded (e.g. ≤ 4 GB); engine spills to disk rather than OOMing |
| Recovery time after `kill -9` (load checkpoint → caught up) | Measured & reported; results post-recovery match a clean run |
| State-on-disk size @ 10M keys | Reported, with the compaction story for the changelog |
The point isn't a magic number — it's to find where each trade-off bites (watermark lag vs dropped events, key count vs spill, window size vs memory) and explain it with measurements.
6. Architecture constraints & guidance
- 1 Kafka broker (KRaft, no ZooKeeper) via `docker-compose` is enough: the lab is about the processor, not the broker. Pin the version.
- Go Kafka client: `twmb/franz-go`. Disable client-side reordering and read the payload `event_ts` yourself: Kafka offset order ≠ event-time order.
- Embedded state store: `cockroachdb/pebble` (LSM, RocksDB-shaped, good for the changelog story) or `dgraph-io/badger`. Keep a hot in-memory map for active windows; treat the KV as the spill + checkpoint tier.
- Make the watermark, window assigner, and state backend interfaces so you can swap window types and a memory-only vs disk-backed state without rewrites.
- Single-writer per key: shard by `hash(account_id) % N` across N goroutines so each key's state is touched by exactly one worker, with no per-key locks.
- Instrument with Prometheus: ingest rate, watermark value & lag, open-window count, dropped-late counter, spill bytes, checkpoint duration, sink emit rate, p50/p99/p999 per stage. Profile allocations with `pprof`: window bookkeeping is an allocation trap.
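
The single-writer-per-key guidance might be sketched as follows: each shard goroutine owns its own map, so no per-key lock is needed. The FNV-1a hash, the channel buffer size, and the names `ShardFor` and `CountSharded` are illustrative assumptions; any stable hash would do.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
	"sync"
)

// ShardFor maps an account to one of n workers: hash(account_id) % n.
func ShardFor(accountID uint64, n int) int {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], accountID)
	h := fnv.New64a()
	h.Write(b[:]) // hash.Hash.Write never returns an error
	return int(h.Sum64() % uint64(n))
}

// CountSharded routes each id to its shard and counts occurrences there.
// Each map is written by exactly one goroutine, so no mutex is required.
func CountSharded(ids []uint64, n int) []map[uint64]uint64 {
	chans := make([]chan uint64, n)
	states := make([]map[uint64]uint64, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		chans[i] = make(chan uint64, 64)
		states[i] = make(map[uint64]uint64)
		wg.Add(1)
		go func(in <-chan uint64, st map[uint64]uint64) {
			defer wg.Done()
			for id := range in {
				st[id]++ // only this goroutine touches st
			}
		}(chans[i], states[i])
	}
	for _, id := range ids {
		chans[ShardFor(id, n)] <- id
	}
	for _, c := range chans {
		close(c)
	}
	wg.Wait() // after Wait, reading the maps from here is race-free
	return states
}

func main() {
	states := CountSharded([]uint64{1, 2, 1, 3, 1}, 4)
	for i, st := range states {
		fmt.Printf("shard %d: %v\n", i, st)
	}
}
```

With a Zipfian key distribution the hottest keys all land on fixed shards, so per-shard ingest rate is worth exporting alongside the global rate.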
7. Data model
event: { account_id uint64, value int64, event_ts int64 (ms), seq uint64, pad []byte }
keyed state (in-memory hot tier + pebble/badger cold tier):
  windowKey   = (account_id, window_start, window_end)   // tumbling/sliding
  sessionKey  = (account_id, session_id)                 // session, mergeable
  accumulator = { count uint64, sum int64, min,max int64, last_update_ts int64 }
changelog (append-only, in pebble): seq → (windowKey, accumulator-delta)
checkpoint: { checkpoint_id, source_offsets[partition]→offset,
              state_snapshot_ptr, watermark, ts }
sink (idempotent):
  results(account_id, window_start, window_end, agg..., PRIMARY KEY(account_id, window_start, window_end))
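
One possible Go rendering of the window key and accumulator is sketched below. The 24-byte big-endian encoding is an assumption (the model does not prescribe a byte layout); it is chosen so that, for non-negative timestamps, byte order in an ordered KV store matches `(account_id, window_start, window_end)` order and one account's windows can be range-scanned. `KeyLess` and `DecodeWindowKey` are hypothetical helpers.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// WindowKey identifies one tumbling/sliding window for one account.
type WindowKey struct {
	AccountID uint64
	StartMs   int64
	EndMs     int64
}

// Accumulator holds the per-window aggregate from the data model.
type Accumulator struct {
	Count        uint64
	Sum          int64
	Min, Max     int64
	LastUpdateTs int64
}

// Add folds one value into the aggregate. Min/Max are seeded by the first value.
func (a *Accumulator) Add(value, tsMs int64) {
	if a.Count == 0 || value < a.Min {
		a.Min = value
	}
	if a.Count == 0 || value > a.Max {
		a.Max = value
	}
	a.Count++
	a.Sum += value
	a.LastUpdateTs = tsMs
}

// Encode writes the key as 24 big-endian bytes (assumes timestamps >= 0).
func (k WindowKey) Encode() []byte {
	b := make([]byte, 24)
	binary.BigEndian.PutUint64(b[0:8], k.AccountID)
	binary.BigEndian.PutUint64(b[8:16], uint64(k.StartMs))
	binary.BigEndian.PutUint64(b[16:24], uint64(k.EndMs))
	return b
}

// DecodeWindowKey is the inverse of Encode.
func DecodeWindowKey(b []byte) (WindowKey, error) {
	if len(b) != 24 {
		return WindowKey{}, fmt.Errorf("window key: want 24 bytes, got %d", len(b))
	}
	return WindowKey{
		AccountID: binary.BigEndian.Uint64(b[0:8]),
		StartMs:   int64(binary.BigEndian.Uint64(b[8:16])),
		EndMs:     int64(binary.BigEndian.Uint64(b[16:24])),
	}, nil
}

// KeyLess compares keys the way a byte-ordered KV store would.
func KeyLess(a, b WindowKey) bool {
	return bytes.Compare(a.Encode(), b.Encode()) < 0
}

func main() {
	k := WindowKey{AccountID: 7, StartMs: 60_000, EndMs: 120_000}
	var acc Accumulator
	acc.Add(5, 61_000)
	acc.Add(-3, 62_000)
	fmt.Printf("%x -> %+v\n", k.Encode(), acc)
}
```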
results keyed by (account_id, window_start, window_end); re-emitting the same window after recovery is an idempotent no-op.
8. Interface contract
- `GET /windows/{account_id}?from=&to=` → fired windows for a key in a time range.
- `GET /watermark` → `{ "watermark_ms": ..., "lag_ms": ..., "dropped_late": N }`.
- `GET /metrics` → Prometheus exposition.
- Engine configured via flags/env: `-window=tumbling|sliding|session`, `-size`, `-slide`, `-gap`, `-allowed-skew`, `-allowed-lateness`, `-state=mem|disk`, `-mem-budget`, `-checkpoint-interval`, `-shards`.
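
As one way to serve the `GET /watermark` shape above, the sketch below exposes three atomically updated counters through a standard `net/http` handler. The type `WatermarkStats` and the helper `probe` are hypothetical, and what exactly `lag_ms` measures is left to the engine (the contract does not define it).

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// WatermarkStats is updated by the engine and read by the HTTP handler.
type WatermarkStats struct {
	WatermarkMs atomic.Int64
	LagMs       atomic.Int64
	DroppedLate atomic.Int64
}

// watermarkResponse matches the JSON field names in the interface contract.
type watermarkResponse struct {
	WatermarkMs int64 `json:"watermark_ms"`
	LagMs       int64 `json:"lag_ms"`
	DroppedLate int64 `json:"dropped_late"`
}

// ServeHTTP answers GET requests with the current counters as JSON.
func (s *WatermarkStats) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(watermarkResponse{
		WatermarkMs: s.WatermarkMs.Load(),
		LagMs:       s.LagMs.Load(),
		DroppedLate: s.DroppedLate.Load(),
	})
}

// probe issues an in-memory GET /watermark and decodes the reply.
func probe(s *WatermarkStats) (int, map[string]int64, error) {
	mux := http.NewServeMux()
	mux.Handle("/watermark", s)
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/watermark", nil))
	body := map[string]int64{}
	err := json.NewDecoder(rec.Body).Decode(&body)
	return rec.Code, body, err
}

func main() {
	var stats WatermarkStats
	stats.WatermarkMs.Store(65_000)
	stats.LagMs.Store(5_000)
	stats.DroppedLate.Add(1)
	code, body, err := probe(&stats)
	fmt.Println(code, body, err)
}
```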
9. Key technical challenges
- Event time vs processing time. Offsets arrive in append order; your results must be in event-time order. The watermark is the only thing that tells you "event time has advanced enough to fire this window". Get its semantics wrong and you either fire too early (wrong) or never (stalled).
- The watermark trade-off is the lab. A tight watermark (small `allowed_skew`) fires fast but drops more of the late tail; a loose one is more correct but holds windows open longer (more state, more latency). You must quantify the curve, not pick a number.
- Session merging. A late event can fall in the gap between two open sessions and must merge them, including merging their accumulators and state entries. This is the subtle correctness bug factory.
- State that doesn't fit. 10M Zipfian keys with overlapping sliding windows means millions of live `(key, window)` accumulators. Decide what stays hot, what spills to `pebble`, and when expired windows get garbage-collected so state doesn't grow without bound.
- Atomic checkpoints. State snapshot and source offsets must be consistent: if you snapshot state at offset X but commit offset Y, recovery double-counts or loses events. The changelog + offset must move together.
- Allocation pressure. Per-event window assignment, map churn, and accumulator boxing generate garbage at billions of events. `pprof` the heap; this is an `alloc/op` problem before it's a throughput problem.
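
The session-merge case is easiest to see on a single key. The sketch below treats every event as a proto-session `[ts, ts+gap]` and absorbs every open session it overlaps, so a late event that bridges two sessions collapses them into one. The names `Session` and `AddEvent` are hypothetical, and treating sessions that merely touch as mergeable is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// Session is one open session for a single key.
// EndMs is the last event timestamp plus the inactivity gap.
type Session struct {
	StartMs, EndMs int64
	Count          uint64
	Sum            int64
}

// AddEvent inserts an event and merges every existing session that overlaps
// (or touches) the event's own window [tsMs, tsMs+gapMs]. It relies on the
// invariant that existing sessions never overlap each other.
func AddEvent(sessions []Session, tsMs, value, gapMs int64) []Session {
	lo, hi := tsMs, tsMs+gapMs
	merged := Session{StartMs: lo, EndMs: hi, Count: 1, Sum: value}
	out := make([]Session, 0, len(sessions)+1)
	for _, s := range sessions {
		if s.EndMs >= lo && s.StartMs <= hi {
			// Absorb the session: widen the bounds, fold the accumulators.
			if s.StartMs < merged.StartMs {
				merged.StartMs = s.StartMs
			}
			if s.EndMs > merged.EndMs {
				merged.EndMs = s.EndMs
			}
			merged.Count += s.Count
			merged.Sum += s.Sum
			continue
		}
		out = append(out, s)
	}
	out = append(out, merged)
	sort.Slice(out, func(a, b int) bool { return out[a].StartMs < out[b].StartMs })
	return out
}

func main() {
	const gap = 30_000
	var sessions []Session
	sessions = AddEvent(sessions, 0, 1, gap)      // session A
	sessions = AddEvent(sessions, 50_000, 2, gap) // session B, separate
	sessions = AddEvent(sessions, 25_000, 3, gap) // late event bridges A and B
	fmt.Printf("%+v\n", sessions)
}
```

In the real engine each absorbed session also means deleting its state entry and appending the merge to the changelog, which is where the correctness bugs tend to hide.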
10. Experiments to run (break it / tune it)
Record before/after numbers for each:
1. Window-type cost. Same input, same key set: tumbling vs sliding (5m/1m) vs session. Measure ingest throughput, p99 per-event latency, open-window count, and resident memory. Explain why sliding costs more.
2. Watermark lag vs correctness. Sweep `allowed_skew` ∈ {1s, 5s, 30s, 2m} at a fixed late-tail profile. Plot % late events dropped and per-window error vs ground truth against fire latency. Find the knee.
3. Allowed-lateness sweep. Fix the watermark; sweep `allowed_lateness` ∈ {0, 10s, 1m, 5m}. Measure recovered-correctness vs extra state held open and extra sink re-emits.
4. State size vs cardinality vs memory. Run with 100k → 1M → 10M keys. Plot resident memory and find where `mem` mode OOMs and `disk` mode must spill. Report on-disk state size and spill throughput.
5. Throughput vs window size. Tumbling 1s → 1m → 10m. Show how larger windows change accumulator count, memory, and fire frequency.
6. Out-of-order severity. Sweep the Zipfian delay tail from "tight" (95% < 1s) to "wild" (long minutes-late tail). Measure dropped-late rate and result error at a fixed watermark; show how reorder severity degrades correctness.
7. Recovery from checkpoint. During a 30-min run, `kill -9` the engine. Measure time to load the last checkpoint and catch back up to live; prove post-recovery results match a clean run, and that the idempotent sink produced no duplicates.
8. Checkpoint-interval cost. Sweep checkpoint interval {5s, 30s, 2m}. Trade recovery time (smaller interval = faster recovery) against steady-state overhead (snapshot pause + changelog/disk I/O).
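
For the window-type cost comparison, the core reason sliding costs more is visible in the assigner itself: one event updates `size / slide` accumulators instead of one. A minimal sketch, assuming non-negative millisecond timestamps and window starts aligned to multiples of the slide (`SlidingStarts` is a hypothetical name):

```go
package main

import "fmt"

// SlidingStarts returns the start of every window [s, s+sizeMs) that
// contains tsMs, newest first. Starts are aligned to multiples of slideMs
// and may be negative for events near time zero.
func SlidingStarts(tsMs, sizeMs, slideMs int64) []int64 {
	last := tsMs - tsMs%slideMs // latest aligned start that is <= tsMs
	starts := make([]int64, 0, sizeMs/slideMs)
	for s := last; s > tsMs-sizeMs; s -= slideMs {
		starts = append(starts, s)
	}
	return starts
}

func main() {
	const minute = 60_000
	ts := int64(125_000)
	// Tumbling is the special case size == slide: exactly one window.
	fmt.Println("tumbling 1m:  ", len(SlidingStarts(ts, minute, minute)), "window(s)")
	// Sliding 5m/1m: every event lands in five overlapping windows.
	fmt.Println("sliding 5m/1m:", len(SlidingStarts(ts, 5*minute, minute)), "window(s)")
}
```

So at 5m/1m each event costs roughly five accumulator updates and each key holds up to five live windows, before any allocation overhead from building the slice of starts on the hot path.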
11. Milestones
- Compose Kafka up; `cmd/gen` deterministic Zipfian-delayed data with a ground-truth batch result; `cmd/source` feeding events.
- Tumbling windows + watermark + drop-late counter; first correctness diff vs ground truth; Prometheus board for rate/watermark-lag/open-windows.
- Sliding + session windows (incl. session merge); window-type cost run (exp 1).
- Disk-backed state + changelog; cardinality/memory sweep and spill (exp 4); `pprof` the allocation hot path.
- Checkpoint/recover; `kill -9` chaos proving post-recovery correctness and idempotent sink (exp 7); watermark/lateness trade-off curves (exp 2, 3, 6).
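
For the checkpoint/recover milestone, one simple way to make state pointer and source offsets move together is to put both in a single record and publish it with write-temp-then-rename, so a `kill -9` leaves either the old checkpoint or the new one, never a mix. This is a sketch under assumptions: the file name `LATEST`, the JSON encoding, and the function names are hypothetical, and fsyncing the parent directory (needed for full durability on some filesystems) is omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// Checkpoint mirrors the checkpoint record in the data model.
type Checkpoint struct {
	ID               uint64          `json:"checkpoint_id"`
	SourceOffsets    map[int32]int64 `json:"source_offsets"` // partition -> next offset
	StateSnapshotPtr string          `json:"state_snapshot_ptr"`
	WatermarkMs      int64           `json:"watermark"`
	TsMs             int64           `json:"ts"`
}

// WriteCheckpoint writes to a temp file, syncs it, then renames it over
// LATEST. Rename within one directory is atomic on POSIX filesystems.
func WriteCheckpoint(dir string, cp Checkpoint) error {
	data, err := json.Marshal(cp)
	if err != nil {
		return err
	}
	tmp, err := os.CreateTemp(dir, "checkpoint-*.tmp")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // harmless once the rename has succeeded
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, "LATEST"))
}

// LoadCheckpoint reads the most recently published checkpoint.
func LoadCheckpoint(dir string) (Checkpoint, error) {
	var cp Checkpoint
	data, err := os.ReadFile(filepath.Join(dir, "LATEST"))
	if err != nil {
		return cp, err
	}
	err = json.Unmarshal(data, &cp)
	return cp, err
}

// roundTrip writes and reloads a checkpoint in a throwaway directory.
func roundTrip(cp Checkpoint) (Checkpoint, error) {
	dir, err := os.MkdirTemp("", "ckpt")
	if err != nil {
		return Checkpoint{}, err
	}
	defer os.RemoveAll(dir)
	if err := WriteCheckpoint(dir, cp); err != nil {
		return Checkpoint{}, err
	}
	return LoadCheckpoint(dir)
}

func main() {
	cp := Checkpoint{ID: 1, SourceOffsets: map[int32]int64{0: 1200, 1: 980}, WatermarkMs: 65_000}
	got, err := roundTrip(cp)
	fmt.Printf("%+v %v\n", got, err)
}
```

On restart the source would seek to `SourceOffsets` rather than to any offset committed separately to Kafka, which is what keeps state and input position consistent.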
12. Acceptance criteria (definition of done)
- Sustained ≥ 30-min run at a stated rate with bounded memory and flat open-window count, dashboard screenshot attached.
- All three window types produce results that match the deterministic ground-truth when no events are dropped (show the diff).
- Watermark-lag curve plotted: dropped-late % and per-window error vs fire latency, with the chosen operating point justified.
- State spills to `pebble`/`badger` at 10M keys without OOM; resident memory stays under the stated budget; on-disk size reported.
- After `kill -9` mid-load, the engine recovers from the last checkpoint and post-recovery results match a clean run, with zero duplicate sink rows (show the SQL/diff).
- Recovery time and checkpoint overhead measured and reported.
- `pprof` heap profile attached; the per-event allocation hot path named and reduced, with `alloc/op` before/after.
- Every number is reproducible from a committed command + config + seed.
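
The "zero duplicate sink rows" criterion follows from upsert semantics on the window's primary key. The in-memory sketch below shows the same behaviour a SQL upsert on `(account_id, window_start, window_end)` would give; the types `ResultKey`, `Result`, and `Sink` are hypothetical stand-ins for a real table.

```go
package main

import (
	"fmt"
	"sync"
)

// ResultKey is the primary key of a fired window.
type ResultKey struct {
	AccountID     uint64
	WindowStartMs int64
	WindowEndMs   int64
}

// Result is the aggregate emitted for one window.
type Result struct {
	Count uint64
	Sum   int64
}

// Sink stores at most one row per ResultKey.
type Sink struct {
	mu   sync.Mutex
	rows map[ResultKey]Result
}

func NewSink() *Sink { return &Sink{rows: make(map[ResultKey]Result)} }

// Upsert writes the row and reports whether anything changed. Replaying the
// same (key, result) after recovery returns false and leaves one row.
func (s *Sink) Upsert(k ResultKey, r Result) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if old, ok := s.rows[k]; ok && old == r {
		return false
	}
	s.rows[k] = r
	return true
}

// Len returns the number of distinct windows stored.
func (s *Sink) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.rows)
}

func main() {
	sink := NewSink()
	k := ResultKey{AccountID: 7, WindowStartMs: 0, WindowEndMs: 60_000}
	fmt.Println(sink.Upsert(k, Result{Count: 3, Sum: 42})) // first emit
	fmt.Println(sink.Upsert(k, Result{Count: 3, Sum: 42})) // replay after recovery
	fmt.Println("rows:", sink.Len())
}
```

This only yields correct totals if the replayed window carries the full aggregate rather than a delta; emitting increments to an upsert sink would silently lose data instead of double-counting it.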
13. Stretch goals
- Incremental / RocksDB-style checkpoints: snapshot only the changelog delta since the last checkpoint instead of full state; measure the overhead drop.
- Watermark per partition (per Kafka partition) with a global min — handle an idle partition that would otherwise stall the global watermark.
- Late-data side output: route dropped-late events to a side topic instead of counting-and-discarding; reprocess them in a slow path.
- Two-phase commit sink to a second system (e.g. Postgres + Kafka) for true cross-system exactly-once; quantify the coordinator cost vs idempotent upsert.
- State TTL & compaction tuning: tune `pebble` compaction for the changelog and measure write amplification of the checkpoint path.
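
The per-partition watermark stretch goal could be approached as in the sketch below: track the max event time per partition, take the minimum across partitions, and skip any partition that has been silent longer than an idle timeout. The type `PartitionWatermarks`, the idle-timeout rule, and the clamp that stops the global watermark from regressing when an idle partition wakes up are all assumptions for illustration.

```go
package main

import "fmt"

// PartitionWatermarks tracks bounded-out-of-orderness per Kafka partition.
type PartitionWatermarks struct {
	skewMs        int64
	idleTimeoutMs int64
	maxSeenMs     map[int32]int64 // max event time per partition
	lastActiveMs  map[int32]int64 // processing time of the last record
	lastEmitted   int64
	emitted       bool
}

func NewPartitionWatermarks(skewMs, idleTimeoutMs int64) *PartitionWatermarks {
	return &PartitionWatermarks{
		skewMs:        skewMs,
		idleTimeoutMs: idleTimeoutMs,
		maxSeenMs:     make(map[int32]int64),
		lastActiveMs:  make(map[int32]int64),
	}
}

// Observe records one event from a partition at processing time nowMs.
func (p *PartitionWatermarks) Observe(partition int32, eventTsMs, nowMs int64) {
	if cur, ok := p.maxSeenMs[partition]; !ok || eventTsMs > cur {
		p.maxSeenMs[partition] = eventTsMs
	}
	p.lastActiveMs[partition] = nowMs
}

// Global returns the min watermark over non-idle partitions. It never moves
// backwards; ok is false until at least one watermark has been produced.
func (p *PartitionWatermarks) Global(nowMs int64) (wm int64, ok bool) {
	found := false
	var lowest int64
	for part, maxTs := range p.maxSeenMs {
		if nowMs-p.lastActiveMs[part] > p.idleTimeoutMs {
			continue // idle partition: do not let it stall the global watermark
		}
		if cand := maxTs - p.skewMs; !found || cand < lowest {
			lowest, found = cand, true
		}
	}
	if found && (!p.emitted || lowest > p.lastEmitted) {
		p.lastEmitted, p.emitted = lowest, true
	}
	return p.lastEmitted, p.emitted
}

func main() {
	p := NewPartitionWatermarks(5_000, 30_000)
	p.Observe(0, 100_000, 0)
	p.Observe(1, 50_000, 0)
	fmt.Println(p.Global(1_000)) // held back by the slower partition 1
	p.Observe(0, 200_000, 60_000)
	fmt.Println(p.Global(60_000)) // partition 1 is idle, so it is ignored
}
```

The cost of the idle rule is that records arriving later on a partition that was declared idle may already be behind the watermark and get dropped as late, so the timeout is another knob worth measuring.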
14. Evaluation rubric
| Dimension | Senior bar | Staff bar |
|---|---|---|
| Event-time model | Implements watermarks, windows fire correctly | Explains watermark semantics; handles idle partitions and per-partition watermark min |
| Late-data handling | Drops & counts late events | Quantifies the lag/lateness trade-off; picks an operating point with evidence |
| Window correctness | Tumbling/sliding match ground truth | Session merge is correct under reordering; proves it against the deterministic result |
| State at scale | State works for the key set | Spills to disk at 10M keys with bounded memory; GCs expired windows; reports on-disk cost |
| Recovery | Recovers after a crash | Atomic state+offset checkpoint; proves post-recovery == clean run with an idempotent sink |
| Performance | Reports throughput + p99 | Names the bottleneck; reduces alloc/op with pprof evidence |
| Communication | Clear findings note | Could defend every curve (watermark, spill, recovery) to a staff panel |
15. References
- Designing Data-Intensive Applications — Ch. 11 (stream processing, event-time vs processing-time, windows).
- Streaming Systems (Akidau, Chernyak, Lax) — watermarks, windowing, the "what / where / when / how" model.
- Apache Flink docs: event time, watermarks, allowed lateness, keyed state, checkpointing (read for the model, not to copy the impl).
- `twmb/franz-go` consumer docs; `cockroachdb/pebble` / `dgraph-io/badger` docs.
- Go `runtime/pprof` + `pprof -alloc_space` for the allocation hot path.
- See also: `Interview Question/11-messaging-and-event-streaming/`.