Fan-In / Fan-Out Inside a Pipeline — Specification¶
This file collects the formal contracts and invariants of fan-out / fan-in patterns. It is the reference engineers can quote in design reviews, code reviews, and architectural decisions.
Vocabulary¶
- Stage: A function that takes one or more receive-only channels and returns one or more receive-only channels, plus a context.Context for cancellation.
- Worker: A goroutine inside a stage. A stage may have one or many workers.
- Producer: The goroutine that originally creates and sends to a channel.
- Consumer: The goroutine that receives from a channel.
- Forwarder: A goroutine inside a merge stage; reads from one input channel and writes to the merged output.
- Closer: The goroutine responsible for closing a channel.
- Fan-out factor: The number of workers in a fan-out stage.
- Reorder buffer: A data structure (map or heap) that holds out-of-order results until they can be emitted in order.
Channel Ownership Invariants¶
For every channel c in a pipeline, exactly one goroutine is its owner. The owner:
- Creates c via make(chan T) or make(chan T, n).
- Is the sole writer to c (or coordinates with others via WaitGroup if there are multiple writers).
- Is the sole closer of c.
A non-owner goroutine MUST NOT:
- Close c.
- Assume c will be closed by any particular time.
- Hold a reference to c after the owner has closed it.
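The ownership rules above can be sketched as follows. The names produce, sum, and demoOwnership are hypothetical and chosen only for illustration; the point is that the owner creates, writes, and closes the channel, while the consumer only ever sees a receive-only view.

```go
package main

import "context"

// produce is a hypothetical owner stage: it creates out, is its only
// writer, and is the only goroutine that closes it.
func produce(ctx context.Context, vals ...int) <-chan int {
	out := make(chan int) // the owner creates the channel
	go func() {
		defer close(out) // the owner is the sole closer
		for _, v := range vals {
			select {
			case out <- v: // the owner is the sole writer
			case <-ctx.Done():
				return
			}
		}
	}()
	return out // callers get a receive-only view
}

// sum is a non-owner consumer: it receives until the owner closes the
// channel and never closes it itself.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// demoOwnership wires the owner and the consumer together.
func demoOwnership() int {
	return sum(produce(context.Background(), 1, 2, 3))
}
```

Returning <-chan int makes the compiler reject any attempt by the caller to send on or close the channel.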
Send / Receive Invariants¶
- A send on a closed channel panics.
- A receive on a closed channel returns the zero value and ok=false after the buffer is drained.
- A receive on a nil channel blocks forever.
- A send on a nil channel blocks forever.
- close(nil_channel) panics.
- close(closed_channel) panics.
- close is non-blocking; it sets the closed flag and wakes all waiters (receivers get zero+false, senders panic).
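A few of these invariants can be observed directly. The sketch below uses hypothetical helper names (closedReceive, sendOnClosedPanics, nilNeverReady); each helper exercises one rule and returns what it saw.

```go
package main

// closedReceive drains a closed buffered channel and reports what each
// receive returned.
func closedReceive() (first int, firstOK bool, second int, secondOK bool) {
	ch := make(chan int, 1)
	ch <- 7
	close(ch)
	first, firstOK = <-ch   // the buffered value is still delivered
	second, secondOK = <-ch // buffer drained: zero value and ok == false
	return
}

// sendOnClosedPanics reports whether a send on a closed channel panicked.
func sendOnClosedPanics() (panicked bool) {
	defer func() { panicked = recover() != nil }()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics: send on closed channel
	return false
}

// nilNeverReady shows that a receive from a nil channel is never ready,
// so the select falls through to default instead of blocking.
func nilNeverReady() string {
	var nilCh chan int
	select {
	case <-nilCh:
		return "nil channel fired"
	default:
		return "nil channel not ready"
	}
}
```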
Merge Function Contract¶
A canonical merge function merge(ctx, c1, ..., cN) -> out:
- Returns a receive-only channel out of the same element type.
- Spawns one forwarder goroutine per input channel.
- Each forwarder reads from its input and forwards to out, with a <-ctx.Done() guard on every send.
- A closer goroutine waits for all forwarders to exit, then closes out.
- out is closed exactly once, after all forwarders have exited (because their inputs closed or ctx was cancelled).
- The caller MUST NOT send to out (enforced at compile time by the <-chan T return type).
- The caller MUST NOT close out (closing a receive-only channel is also a compile-time error).
- If ctx is cancelled, all forwarders exit promptly; out then closes.
Fan-Out Worker Pool Contract¶
A canonical worker pool pool(ctx, in, n, fn) -> out:
- in is a receive-only channel of inputs.
- n is the worker count, fixed at construction.
- fn is the per-item transform.
- Returns a receive-only channel out of outputs.
- Spawns n worker goroutines, each reading from in.
- Each worker writes to out, with a <-ctx.Done() guard.
- A closer goroutine waits for all workers, then closes out.
- Order of outputs is NOT guaranteed to match order of inputs (unless tagged-and-reordered).
- The pool exits when in is closed AND all workers have finished, OR when ctx is cancelled.
Cancellation Contract¶
For a stage that accepts context.Context:
- The stage SHOULD respect ctx.Done() in every blocking operation.
- When ctx.Done() is closed, the stage SHOULD exit as quickly as possible.
- The stage SHOULD release all resources on exit (close channels it owns, release pooled buffers, close files).
- The stage SHOULD NOT leak goroutines after cancellation.
- The stage MUST NOT panic on cancellation.
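A minimal sketch of a stage that honors this contract when its consumer walks away early. The names counter and demoCancel are hypothetical. Without the ctx.Done() case the goroutine would block on its send forever once the consumer stops reading.

```go
package main

import "context"

// counter is a hypothetical unbounded source. Its only blocking operation
// is the send, and that send is guarded by ctx.Done().
func counter(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // release the owned channel on every exit path
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// demoCancel reads three values, stops, cancels, and then confirms that the
// stage closed its output.
func demoCancel() (lastSeen int, closed bool) {
	ctx, cancel := context.WithCancel(context.Background())
	nums := counter(ctx)
	for v := range nums {
		lastSeen = v
		if v == 2 {
			break // the consumer stops reading early
		}
	}
	cancel()
	// Drain: this loop ends only because the stage observed ctx.Done()
	// and closed out on its way out.
	for range nums {
	}
	_, ok := <-nums
	return lastSeen, !ok
}
```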
Backpressure Contract¶
In a pipeline using only unbuffered or modestly-buffered channels:
- The producer's rate is bounded by the slowest stage's rate.
- There is no unbounded queue between stages.
- If a consumer pauses, the producer eventually pauses too.
Violations: channels with very large buffers, or stages that use a default clause in select to drop items when the channel is full. These remove the backpressure property and require explicit drop accounting.
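If a stage does drop on a full channel, the drop accounting might look like the sketch below. The dropper type and its trySend method are hypothetical names; the counter is what makes the loss visible to metrics.

```go
package main

import "sync/atomic"

// dropper is a hypothetical lossy sender that counts what it drops.
type dropper struct {
	out     chan int
	dropped atomic.Int64
}

// trySend never blocks: if out is full, the item is dropped and counted.
func (d *dropper) trySend(v int) bool {
	select {
	case d.out <- v:
		return true
	default:
		d.dropped.Add(1)
		return false
	}
}

// demoDrops pushes five items into a buffer of two with no receiver.
func demoDrops() (delivered int, dropped int64) {
	d := &dropper{out: make(chan int, 2)}
	for i := 0; i < 5; i++ {
		if d.trySend(i) {
			delivered++
		}
	}
	return delivered, d.dropped.Load()
}
```

With no receiver, the first two sends fill the buffer and the remaining three are dropped and counted.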
Ordering Contract¶
By default, fan-out destroys order. To restore order, use tag-and-reorder:
- Tag each input with a monotonically-increasing sequence number at the source.
- Workers preserve the sequence number on the output.
- A reorder stage emits results in sequence-number order.
The reorder stage:
- Holds out-of-order results in a buffer (map or min-heap).
- Emits the next-expected sequence as soon as it is available.
- May stall if the reorder buffer exceeds a cap.
Round-robin merge preserves order ONLY if the dispatcher pre-assigns items to workers in the same round-robin order, rather than letting workers compete for a shared input channel (the runtime's automatic load balancing).
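A sketch of round-robin dispatch paired with round-robin merge, using hypothetical names (roundRobin, demoRoundRobin) and omitting context handling for brevity. Each worker has a private input and output channel; the merger reads the worker outputs in the same cyclic order the dispatcher used, so a slow item delays the merger instead of reordering results.

```go
package main

import "time"

// roundRobin sends item i to worker i mod n and reads result i from
// worker i mod n, so results come back in input order.
func roundRobin(items []int, n int, fn func(int) int) []int {
	ins := make([]chan int, n)
	outs := make([]chan int, n)
	for w := 0; w < n; w++ {
		ins[w] = make(chan int)
		outs[w] = make(chan int)
		go func(in <-chan int, out chan<- int) {
			defer close(out)
			for v := range in {
				out <- fn(v)
			}
		}(ins[w], outs[w])
	}
	// Dispatcher: pre-assigns items in round-robin order, then closes
	// every worker input.
	go func() {
		for i, v := range items {
			ins[i%n] <- v
		}
		for _, c := range ins {
			close(c)
		}
	}()
	// Merger: reads worker outputs in the same cyclic order.
	res := make([]int, 0, len(items))
	for i := range items {
		res = append(res, <-outs[i%n])
	}
	return res
}

// demoRoundRobin makes even inputs slower to show that uneven work does
// not change the output order.
func demoRoundRobin() []int {
	slowOnEven := func(v int) int {
		if v%2 == 0 {
			time.Sleep(2 * time.Millisecond)
		}
		return v * 10
	}
	return roundRobin([]int{1, 2, 3, 4, 5}, 3, slowOnEven)
}
```

The cost of this design is head-of-line blocking: one slow item holds back faster results from other workers until it is emitted.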
Error Propagation Contract¶
Three patterns:
Pattern A: Result[T] union¶
Each pipeline item is a Result[T] containing either a value or an error. The consumer inspects each result.
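One possible shape for Pattern A is sketched below. The field names Val and Err, and the helpers parseAll and demoResults, are assumptions made for illustration; context handling is left out to keep the sketch short.

```go
package main

import "strconv"

// Result carries either a value or an error for one pipeline item.
type Result[T any] struct {
	Val T
	Err error
}

// parseAll is a hypothetical stage that never fails as a whole: every
// per-item failure travels downstream inside its Result.
func parseAll(inputs []string) <-chan Result[int] {
	out := make(chan Result[int])
	go func() {
		defer close(out)
		for _, s := range inputs {
			n, err := strconv.Atoi(s)
			out <- Result[int]{Val: n, Err: err}
		}
	}()
	return out
}

// demoResults inspects each result, as the consumer is expected to do.
func demoResults() (sum int, failures int) {
	for r := range parseAll([]string{"1", "x", "3"}) {
		if r.Err != nil {
			failures++
			continue
		}
		sum += r.Val
	}
	return sum, failures
}
```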
Pattern B: errgroup¶
Workers return error. The first non-nil error cancels the group's context. All workers exit. g.Wait() returns the first error.
Pattern C: DLQ¶
Errors are sent to a separate "dead-letter queue" channel. The main pipeline continues with successes only.
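A sketch of Pattern C under stated assumptions: the deadLetter type and the parseWithDLQ helper are hypothetical, and the dead-letter channel is buffered to the input size so that a slow dead-letter reader cannot stall the main path in this small example.

```go
package main

import "strconv"

// deadLetter records a failed input together with its error.
type deadLetter struct {
	Input string
	Err   error
}

// parseWithDLQ sends successes to out and failures to dlq. The stage owns
// and closes both channels.
func parseWithDLQ(inputs []string) (<-chan int, <-chan deadLetter) {
	out := make(chan int)
	dlq := make(chan deadLetter, len(inputs)) // sized for this sketch only
	go func() {
		defer close(out)
		defer close(dlq)
		for _, s := range inputs {
			n, err := strconv.Atoi(s)
			if err != nil {
				dlq <- deadLetter{Input: s, Err: err}
				continue
			}
			out <- n
		}
	}()
	return out, dlq
}

// demoDLQ consumes the main path first, then counts the dead letters.
func demoDLQ() (sum int, dead int) {
	out, dlq := parseWithDLQ([]string{"1", "x", "3", "y"})
	for v := range out {
		sum += v
	}
	for range dlq {
		dead++
	}
	return sum, dead
}
```

In a long-running pipeline the dead-letter channel would need its own consumer and its own buffer policy, since an unread bounded queue eventually blocks the stage.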
The pattern MUST be documented per pipeline. Mixing without coordination is a bug.
Resource Lifecycle Contract¶
For any resource held by a worker (file handle, network connection, database transaction):
- Acquire at start of work.
- Release before exit (defer is canonical).
- On panic recovery, release if possible.
- On ctx cancellation, release.
A leaked resource usually indicates a leaked goroutine.
Goroutine Lifecycle Contract¶
For every goroutine spawned by the pipeline:
- It has a known entry condition (the go statement).
- It has at least one known exit condition (input channel closed, ctx cancelled, work complete).
- The exit condition is reached in all execution paths.
- On exit, all resources are released and all owned channels are closed.
goleak.VerifyNone(t) checks this at test time: it fails the test if unexpected goroutines are still running when the test ends.
Buffer Size Specification¶
Default buffer sizes:
- Channel from a producer to a fan-out: 0 (unbuffered).
- Per-worker output channel: 0 or 1.
- Merged output channel: 0 to N (worker count).
- Cross-stage channel: 0.
- Tag-and-reorder buffer: bounded by expected straggler distance (typically O(N)).
Any non-zero buffer SHOULD be documented with a comment explaining the size.
Performance Contract¶
A fan-out / fan-in stage's overhead per item is approximately:
- 1 producer send + 1 worker receive: ~200 ns
- N workers parallel work: bounded by slowest worker
- 1 worker send + 1 forwarder receive: ~200 ns
- 1 forwarder send + 1 consumer receive: ~200 ns
Total channel overhead: ~600-800 ns per item, plus goroutine scheduling overhead (~300-1500 ns per item).
The stage adds a roughly constant overhead to each item's latency, and its throughput scales with the worker count until the slowest stage becomes the bottleneck.
Testing Contract¶
A pipeline test MUST:
- Run under -race.
- Verify the output count matches the input count.
- Verify no goroutines leak (use goleak.VerifyNone(t)).
- Verify cancellation shuts down the pipeline within a bounded time.
- Verify error injection produces expected error propagation.
A pipeline test SHOULD:
- Verify ordering if order is a requirement.
- Verify backpressure if the pipeline claims it.
- Verify throughput against an established baseline.
Documentation Contract¶
A pipeline implementation MUST document:
- The shape of the pipeline (stages, fan-out factors).
- Channel ownership (who writes, who reads, who closes each).
- Failure modes (what panics, what errors, what restarts).
- Resource caps (max goroutines, max memory).
- The cancellation behavior.
- The buffer sizes and their justifications.
Deployment Contract¶
A production pipeline:
- Exposes metrics for items/sec, errors/sec, latency, in-flight, goroutine count.
- Has alerting on critical metrics (lag, errors, goroutine count).
- Has a graceful shutdown procedure with a drain timeout.
- Has a runbook for common failures.
- Has been load-tested at 2x expected peak.
Composition¶
A pipeline composition A → B → C → ... → Z:
- Each stage's output is the next stage's input.
- Each stage gets the same context.Context (or a derived one).
- Cancellation of ctx cancels all stages.
- Closing the source's output propagates downstream as each stage's input closes.
- The end of pipeline (the final consumer) reads until the final stage's output closes.
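The composition rules can be sketched with three small stages that share one context. The names source, apply, and runPipeline are hypothetical. Closing propagates from the source to the final consumer because each stage closes its own output when its input closes.

```go
package main

import "context"

// source emits 1..n and then closes its output.
func source(ctx context.Context, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// apply is a generic middle stage: it reads until in closes, then closes
// its own output, which lets the close propagate downstream.
func apply(ctx context.Context, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- fn(v):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// runPipeline composes A -> B -> C with one shared context and reads until
// the final stage's output closes.
func runPipeline() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	a := source(ctx, 4)
	b := apply(ctx, a, func(v int) int { return v * v })
	c := apply(ctx, b, func(v int) int { return v + 1 })
	total := 0
	for v := range c {
		total += v
	}
	return total
}
```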
Equivalence¶
Two pipelines are equivalent if and only if:
- They produce the same output for every input.
- They have the same termination behavior (ctx cancellation, source close).
- They have the same resource bounds.
Throughput and latency are not part of equivalence; they are performance properties.
Limits¶
The following are platform / Go-runtime limits:
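- Max goroutines: no fixed limit; bounded by memory. Each goroutine starts with a stack of about 2 KB, so in practice a few hundred thousand per GB of RAM.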
- Max channel buffer: capped by available memory.
- Max select cases (static): 65536.
- Max reflect.Select cases: hard limit 65536 (reflect.Select panics beyond it); practically efficient up to ~10 000.
- Max stack per goroutine: 1 GB by default on 64-bit systems, 250 MB on 32-bit systems (configurable via debug.SetMaxStack).
Conformance¶
A fan-out / fan-in implementation is conformant to this specification if it:
- Implements the merge or pool function contract.
- Respects channel ownership.
- Respects cancellation.
- Provides backpressure (unless explicitly disabled with documentation).
- Has tests covering the contracts.
Code review against this specification is recommended for every pipeline before merge to main.
This specification is a living document. It records the agreements we make as a team about how fan-out / fan-in should behave. When the design evolves, update this file first, then the tests, then the implementation.
Detailed Channel Operation Semantics¶
The following formal semantics describe behavior of channel operations relevant to fan-out / fan-in pipelines. They are derived from the Go specification and runtime behavior.
Send Operation¶
Given ch <- v:
- If ch is nil, the operation blocks forever (the goroutine parks).
- If ch is closed, the operation panics with "send on closed channel".
- If ch is unbuffered:
  a. If a goroutine is waiting on ch for receive, the value is transferred directly; both goroutines continue.
  b. Otherwise, the sending goroutine parks until a receiver arrives.
- If ch is buffered:
  a. If a goroutine is waiting on ch for receive (rare for buffered), direct transfer.
  b. If len(ch) < cap(ch), the value is copied into the buffer; the sender continues.
  c. If len(ch) == cap(ch), the sender parks until space is available.
Receive Operation¶
Given v, ok := <-ch or v := <-ch:
- If ch is nil, the operation blocks forever.
- If ch is closed:
  a. If len(ch) > 0, the next value from the buffer is returned with ok == true.
  b. If len(ch) == 0, the zero value is returned, with ok == false in the two-result form and with no indication of closure in the single-result form.
- If a goroutine is waiting on ch to send (a parked sender), the value is taken from it directly; both goroutines continue.
- If len(ch) > 0, the next value is returned from the buffer; the next parked sender (if any) deposits into the freed slot.
- Otherwise, the receiver parks until a sender arrives or the channel is closed.
Close Operation¶
Given close(ch):
- If ch is nil, the operation panics.
- If ch is already closed, the operation panics.
- Otherwise:
  a. The channel's closed flag is set.
  b. All goroutines parked on the channel's recvq are woken; they receive the zero value with ok == false.
  c. All goroutines parked on the channel's sendq are woken; they panic with "send on closed channel".
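The wake-all behavior of close is what makes a closed channel usable as a broadcast signal. A small sketch, with the hypothetical name broadcastClose:

```go
package main

import (
	"sync"
	"sync/atomic"
)

// broadcastClose starts n receivers on one channel and releases all of them
// with a single close. It returns how many receivers observed ok == false.
func broadcastClose(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var woken atomic.Int64
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			if _, ok := <-done; !ok {
				woken.Add(1)
			}
		}()
	}
	close(done) // one close releases every receiver, parked or not yet arrived
	wg.Wait()
	return int(woken.Load())
}
```

A single send would release only one receiver; this is why ctx.Done() is a channel that is closed rather than sent on.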
Select Operation¶
A select statement with N cases:
- If any case can proceed without blocking, one of the ready cases is chosen pseudo-randomly.
- If a default case exists and no other case is ready, the default runs.
- If no default and no case is ready, the goroutine parks; the select fires when any case becomes ready.
A nil channel in a select case is treated as "never ready" — the case is effectively disabled.
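The nil-channel rule is commonly used to retire an input inside a single select loop. A sketch, assuming the hypothetical helper drainBoth:

```go
package main

// drainBoth reads two channels to completion in one goroutine. When an
// input closes, its variable is set to nil so that its select case can
// never fire again.
func drainBoth(a, b <-chan int) int {
	total := 0
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // disable this case
				continue
			}
			total += v
		case v, ok := <-b:
			if !ok {
				b = nil // disable this case
				continue
			}
			total += v
		}
	}
	return total
}

// demoDrainBoth preloads two buffered channels and closes them.
func demoDrainBoth() int {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	b <- 10
	close(a)
	close(b)
	return drainBoth(a, b)
}
```

Without the nil assignment, a closed input would be ready on every iteration and the loop would spin on zero values.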
Forwarder Goroutine Semantics¶
Inside a merge function, each forwarder goroutine satisfies:
- It reads from one input channel c via for v := range c { ... } or an equivalent select pattern.
- It writes each value to the merged output out via a select that includes <-ctx.Done().
- It exits when:
  a. c is closed (the for-range loop ends naturally), OR
  b. ctx.Done() is closed (the select picks the cancellation case).
- It calls wg.Done() before exit (via defer wg.Done() at the top).
- It does not close out. It does not close c.
Worker Goroutine Semantics¶
Inside a fan-out worker:
- It reads from a shared input channel in.
- It performs work for each item.
- It writes the result to an output channel out (per-worker or shared).
- It honors ctx.Done() on every blocking operation.
- It closes out (if per-worker) before exit via defer close(out).
- It calls wg.Done() before exit if part of a WaitGroup group.
- It recovers panics if the work can panic on untrusted input.
Closer Goroutine Semantics¶
A closer goroutine inside a merge or pool:
- Waits on wg.Wait() until all forwarders or workers have exited.
- Closes the merged output channel exactly once.
- Returns.
The closer is independent of the merge function's return. The merge function returns the output channel immediately; the closer runs in the background until all upstream goroutines finish.
Supervisor Goroutine Semantics¶
A supervisor that monitors a worker:
- Invokes the worker function.
- On worker exit:
  a. If the error is nil or context.Canceled, the supervisor exits.
  b. If the error indicates transient failure, the supervisor waits a backoff period and re-invokes.
  c. If the error indicates fatal failure, the supervisor escalates (returns the error to its parent).
- The supervisor respects ctx.Done() during backoff.
The supervisor MUST NOT loop forever on context cancellation; it must check and return.
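A minimal supervisor sketch under stated assumptions: errTransient is a hypothetical sentinel standing in for whatever classification of transient errors a real pipeline uses, and the backoff is a fixed duration rather than an exponential schedule.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// errTransient is a hypothetical marker for retryable failures.
var errTransient = errors.New("transient failure")

// supervise re-invokes work after transient failures, escalates fatal ones,
// and stops waiting as soon as ctx is cancelled.
func supervise(ctx context.Context, backoff time.Duration, work func(context.Context) error) error {
	for {
		err := work(ctx)
		switch {
		case err == nil || errors.Is(err, context.Canceled):
			return nil // clean exit
		case errors.Is(err, errTransient):
			t := time.NewTimer(backoff)
			select {
			case <-t.C: // backoff elapsed: retry
			case <-ctx.Done():
				t.Stop()
				return ctx.Err() // never loop on a cancelled context
			}
		default:
			return err // fatal: escalate to the parent
		}
	}
}

// demoSupervise fails twice with a transient error, then succeeds.
func demoSupervise() (attempts int, err error) {
	work := func(context.Context) error {
		attempts++
		if attempts < 3 {
			return errTransient
		}
		return nil
	}
	err = supervise(context.Background(), time.Millisecond, work)
	return attempts, err
}
```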
Backpressure Propagation Theorem¶
Given a pipeline with stages S1, S2, ..., Sn, each connected by unbuffered or modestly-buffered channels:
- The maximum sustained throughput is min(throughput(Si)) over all stages.
- The producer's effective rate equals the slowest stage's rate.
- The pipeline applies backpressure: if any consumer pauses, the producer eventually pauses.
- Memory in flight is bounded: sum of all channel buffers plus per-stage in-flight items.
Buffers larger than O(1) introduce a "latency window" during which the producer can outrun the consumer. Beyond that window, backpressure resumes.
Ordering Theorem¶
Given a fan-out worker pool with N workers:
- By default, the output order is NOT a function of the input order.
- With tag-and-reorder (sequence-number tagging at input, heap-based reorder at output), the output order matches the input order, with a memory bound proportional to the maximum straggler distance.
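- With round-robin dispatch (input item i to worker i mod N) and round-robin merge (read result i from worker output i mod N), the output order matches the input order regardless of per-item cost. Non-uniform work does not break ordering; it costs throughput, because the merger waits on the slowest pending item (head-of-line blocking).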
Cancellation Latency Theorem¶
Given a pipeline with K stages and the longest blocking operation being a channel op (no syscalls or unbounded loops):
- The maximum time from cancel() to full pipeline shutdown is bounded by K * select_latency (typically a few microseconds total).
- If any stage has unbounded blocking (uncancellable syscall, infinite loop without select), cancellation latency is unbounded.
- Tests SHOULD verify cancellation latency.
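One way to check the bound is to build a chain of K idle stages, cancel, and wait for the last output to close under a deadline. The sketch below uses hypothetical names (passthrough, shutsDownWithin) and a deliberately generous limit, since wall-clock assertions on shared CI machines are noisy.

```go
package main

import (
	"context"
	"time"
)

// passthrough guards both its receive and its send with ctx.Done(), so it
// can exit even when its input is idle.
func passthrough(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// shutsDownWithin chains k stages on an idle source, cancels, and reports
// whether the final output closed before the limit.
func shutsDownWithin(k int, limit time.Duration) bool {
	ctx, cancel := context.WithCancel(context.Background())
	var c <-chan int = make(chan int) // idle source: never sends, never closes
	for i := 0; i < k; i++ {
		c = passthrough(ctx, c)
	}
	cancel()
	select {
	case _, ok := <-c:
		return !ok
	case <-time.After(limit):
		return false
	}
}

// demoShutdown checks an eight-stage chain against a one-second limit.
func demoShutdown() bool {
	return shutsDownWithin(8, time.Second)
}
```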
Equivalence Classes¶
Two channel operations are equivalent if they have the same observable behavior. For example:
- for v := range c is equivalent to for { v, ok := <-c; if !ok { break }; ... }.
- ch <- v followed by close(ch) (single-value channel) is equivalent to writing to a buffered channel of cap 1.
- A merge of N=1 channels is equivalent to the input channel (modulo an extra forwarder goroutine).
Equivalence is useful for refactoring without changing behavior.
Constraints on Implementations¶
A conformant merge[T] implementation MUST:
- Accept a variable number of input channels via ...<-chan T.
- Return a <-chan T.
- Accept a context.Context first argument.
- Spawn at most N+1 goroutines, where N is the number of inputs (N forwarders plus one closer).
- Use a sync.WaitGroup or equivalent to coordinate forwarder exits.
- Close the output channel exactly once.
- Have a closer goroutine separate from the forwarders.
- Handle the zero-input case gracefully (return an immediately-closed channel).
- Handle nil input channels by ignoring them (panicking is allowed, but ignoring is preferred).
Conformance Tests¶
A conformant implementation passes:
func TestMergeAllValues(t *testing.T) {
	// Three channels each emitting some values; merge produces all of them.
}

func TestMergeClosesWhenAllInputsClose(t *testing.T) {
	// After all inputs close, merge's output also closes.
}

func TestMergeRespectsCancellation(t *testing.T) {
	// After cancel(), merge's output closes within a bounded time.
}

func TestMergeNoLeaks(t *testing.T) {
	// goleak.VerifyNone(t) after merge completes.
}

func TestMergeZeroInputs(t *testing.T) {
	// merge() with no arguments returns an immediately-closed channel.
}

func TestMergeSingleInput(t *testing.T) {
	// merge with one input behaves like a transparent forwarder.
}
A pool implementation passes analogous tests.
Cross-Stage Contracts¶
When composing stages A and B such that A's output is B's input:
- The same context.Context is passed to A and B.
- A closes its output channel when its work is done.
- B reads from A's output until it closes.
- B closes its own output channel when it has finished forwarding A's output (or when ctx cancels).
- Cancellation of ctx cancels both A and B.
Reorder Stage Contract¶
A reorder stage reorder(ctx, in) -> out:
- Reads tagged values (seq, val) from in.
- Maintains a buffer of pending tagged values keyed by seq.
- Emits values in order of sequence number to out.
- The reorder buffer has a maximum size; if exceeded, the stage stalls (applies backpressure) or panics (configurable).
- On ctx.Done(), the stage exits; pending values are discarded.
- out is closed when in is closed AND the next-expected sequence is unreachable, OR when ctx cancels.
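A min-heap variant of the reorder stage might look like the sketch below. The names tagged, bySeq, and reorderHeap are hypothetical, sequence numbers are assumed to start at 0, and the buffer cap required by the contract is left out to keep the sketch short.

```go
package main

import (
	"container/heap"
	"context"
)

// tagged is a value paired with its sequence number.
type tagged struct {
	Seq int
	Val string
}

// bySeq is a min-heap of tagged values ordered by Seq.
type bySeq []tagged

func (h bySeq) Len() int           { return len(h) }
func (h bySeq) Less(i, j int) bool { return h[i].Seq < h[j].Seq }
func (h bySeq) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *bySeq) Push(x any)        { *h = append(*h, x.(tagged)) }
func (h *bySeq) Pop() any {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// reorderHeap emits values in Seq order. Values still pending when in
// closes or ctx is cancelled are discarded.
func reorderHeap(ctx context.Context, in <-chan tagged) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		pending := &bySeq{}
		next := 0
		for t := range in {
			heap.Push(pending, t)
			// Emit for as long as the smallest pending Seq is the next one.
			for pending.Len() > 0 && (*pending)[0].Seq == next {
				v := heap.Pop(pending).(tagged).Val
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
				next++
			}
		}
	}()
	return out
}

// demoReorder feeds four values out of order and joins the output.
func demoReorder() string {
	in := make(chan tagged, 4)
	for _, t := range []tagged{{2, "c"}, {0, "a"}, {1, "b"}, {3, "d"}} {
		in <- t
	}
	close(in)
	s := ""
	for v := range reorderHeap(context.Background(), in) {
		s += v
	}
	return s
}
```

Compared with a map keyed by seq, the heap makes the smallest pending sequence number cheap to inspect, which is useful when deciding whether a gap has grown too large.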
Hedged Request Contract¶
A hedged request fan-out:
- Sends the request to one primary replica immediately.
- After a delay, sends to a secondary replica.
- Whichever responds first is the returned result.
- The slow replica is cancelled via context.
- The returned result is propagated to the caller; the cancelled replica's response is discarded.
- If both fail, the last error is returned.
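A sketch of the hedged request contract. The replica function type and the helper names are hypothetical. One choice here is an assumption beyond the contract: if the primary fails before the delay elapses, the sketch sends to the secondary immediately instead of waiting out the delay.

```go
package main

import (
	"context"
	"time"
)

// replica is a hypothetical request function for one backend.
type replica func(ctx context.Context) (string, error)

// hedged returns the first successful response and cancels the loser.
func hedged(ctx context.Context, delay time.Duration, primary, secondary replica) (string, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancels whichever replica is still running

	type result struct {
		val string
		err error
	}
	results := make(chan result, 2) // buffered so a late replica never blocks
	launch := func(r replica) {
		go func() {
			v, err := r(ctx)
			results <- result{v, err}
		}()
	}

	launch(primary)
	timer := time.NewTimer(delay)
	defer timer.Stop()

	inFlight, hedgeSent := 1, false
	var lastErr error
	for inFlight > 0 {
		select {
		case <-timer.C:
			if !hedgeSent {
				launch(secondary)
				inFlight++
				hedgeSent = true
			}
		case r := <-results:
			inFlight--
			if r.err == nil {
				return r.val, nil
			}
			lastErr = r.err
			if !hedgeSent { // assumption: hedge at once if the primary fails early
				launch(secondary)
				inFlight++
				hedgeSent = true
			}
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	return "", lastErr // both failed: return the last error
}

// demoHedged pairs a slow primary with a fast secondary.
func demoHedged() (string, error) {
	slow := func(ctx context.Context) (string, error) {
		select {
		case <-time.After(time.Second):
			return "primary", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	fast := func(ctx context.Context) (string, error) { return "secondary", nil }
	return hedged(context.Background(), 5*time.Millisecond, slow, fast)
}
```

The results channel has capacity two so that the cancelled replica can still deliver its discarded response and exit without leaking.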
Auto-scaler Contract¶
An auto-scaler for fan-out worker count:
- Has a minimum and maximum worker count.
- Periodically measures load (e.g., channel fill, latency).
- Adjusts worker count gradually (one at a time) to avoid oscillation.
- Uses hysteresis: the load level that triggers adding a worker is higher than the level that triggers removing one, leaving a dead band between them.
- Respects min/max bounds.
- On cancellation, stops adjusting; workers exit as they finish.
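The scaling decision itself can be kept as a pure function, which makes it easy to test. In the sketch below the scaler type, its field names, and the threshold values are hypothetical; load is assumed to be a utilisation figure between 0 and 1 where higher means busier.

```go
package main

// scaler holds the bounds and the two hysteresis thresholds.
type scaler struct {
	min, max    int
	addAbove    float64 // add one worker when load exceeds this
	removeBelow float64 // remove one worker when load falls below this
}

// next returns the worker count for the following period. It changes the
// count by at most one and never leaves [min, max]. Loads between the two
// thresholds leave the count unchanged, which damps oscillation.
func (s scaler) next(workers int, load float64) int {
	switch {
	case load > s.addAbove && workers < s.max:
		return workers + 1
	case load < s.removeBelow && workers > s.min:
		return workers - 1
	default:
		return workers
	}
}
```

A periodic loop would call next with the latest measurement and start or stop one worker to match the returned count.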
Bulkhead Contract¶
A bulkheaded fan-out:
- Per-worker errors are caught locally (recovered or logged).
- Per-worker errors do NOT propagate as group errors (do not cancel siblings).
- Critical errors (out-of-memory, fatal) MAY escalate.
- Each worker has a private resource budget.
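A sketch of the local-recovery part of a bulkhead. The names safeCall, bulkheadPool, and demoBulkhead are hypothetical, and per-worker resource budgets are not shown. A panic in one item is converted to an error and counted; sibling workers keep running.

```go
package main

import (
	"fmt"
	"sync"
)

// safeCall runs fn and converts a panic into an error, so one bad item
// cannot take down the worker or its siblings.
func safeCall(fn func(int) int, v int) (res int, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	return fn(v), nil
}

// bulkheadPool processes items with n workers. Failures are counted
// locally and do not cancel the other workers.
func bulkheadPool(items []int, n int, fn func(int) int) (sum int, failed int) {
	in := make(chan int)
	var mu sync.Mutex
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				r, err := safeCall(fn, v)
				mu.Lock()
				if err != nil {
					failed++
				} else {
					sum += r
				}
				mu.Unlock()
			}
		}()
	}
	for _, v := range items {
		in <- v
	}
	close(in)
	wg.Wait()
	return sum, failed
}

// demoBulkhead includes one input that makes fn panic.
func demoBulkhead() (int, int) {
	fn := func(v int) int {
		if v == 0 {
			panic("zero input")
		}
		return 100 / v
	}
	return bulkheadPool([]int{1, 0, 4, 5}, 2, fn)
}
```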
Specification Versioning¶
This specification is versioned. Changes:
- Major version: incompatible changes (e.g., changed signatures).
- Minor version: new optional features.
- Patch: documentation clarifications.
Current version: 1.0.
When a project depends on this specification, it should pin the version.
Glossary of Specification Terms¶
- MUST: required for conformance.
- MUST NOT: prohibited for conformance.
- SHOULD: strongly recommended; departures require justification.
- MAY: optional.
- SHOULD NOT: strongly discouraged; departures require justification.
These keywords follow the definitions in RFC 2119.
End of Specification¶
This document is intended to be read together with the junior, middle, senior, and professional files. The specification is the formal layer; the other files are the educational layer.
For questions or proposed amendments, raise an issue against this file with the rationale and example.
Appendix A: Canonical Implementations¶
Canonical merge¶
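// Requires imports: context, sync.
func merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, c := range cs {
		if c == nil {
			continue // ranging over a nil channel would block a forwarder forever
		}
		wg.Add(1)
		// One forwarder per input. It exits when c closes or, while blocked
		// on a send, when ctx is cancelled. A forwarder blocked on a receive
		// relies on the upstream stage closing c when ctx is cancelled.
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}(c)
	}
	// Closer: the only goroutine that closes out, after every forwarder exits.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}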
Canonical worker pool¶
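// Requires imports: context, sync.
func pool[I, O any](ctx context.Context, in <-chan I, n int, fn func(I) O) <-chan O {
	out := make(chan O)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		// Each worker competes for items on the shared input channel.
		go func() {
			defer wg.Done()
			for v := range in {
				r := fn(v) // fn itself is not cancellable in this reference
				select {
				case out <- r:
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// Closer: closes out once every worker has exited.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}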
Canonical reorder¶
// Requires import: context.
type Tagged[T any] struct {
	Seq int
	Val T
}

func reorder[T any](ctx context.Context, in <-chan Tagged[T]) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		next := 0                  // next sequence number to emit
		pending := make(map[int]T) // out-of-order results; unbounded in this reference
		for t := range in {
			pending[t.Seq] = t.Val
			// Emit every consecutive value that is now available.
			for {
				v, ok := pending[next]
				if !ok {
					break
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
				delete(pending, next)
				next++
			}
		}
	}()
	return out
}
These implementations are the reference. Variations exist for performance, but the contract is the same.
Appendix B: Conformance Statement¶
Any pipeline component claiming conformance with this specification SHOULD include in its package documentation:
This package implements [version X.Y] of the Fan-In / Fan-Out specification. The conformance tests in conformance_test.go verify the contract.
This makes the contract auditable.
Appendix C: Specification History¶
- Version 1.0: Initial. Defines core fan-out / fan-in patterns, merge contract, pool contract, cancellation, backpressure, ordering.
- (Future versions to be added as the specification evolves.)
End of specification document.