Fan-Out — Specification¶
Table of Contents¶
- Introduction
- Formal Definition
- Function Signature
- Preconditions
- Postconditions
- Distribution Semantics
- Closing Protocol
- Cancellation Semantics
- Memory Model Edges
- Resource Bounds
- Concurrency Invariants
- Edge Cases
- Reference Implementation
- Compliance Checks
- Summary
Introduction¶
This file is the formal specification of fan-out. It states the contract a fan-out function must satisfy, distinct from a worker pool's broader contract. Implementations may differ in performance and structure but must respect this contract.
Formal Definition¶
Let T be the input element type and R be the result type. Let ctx be a context.Context, in be a receive channel of T, n be a positive integer, and work be a function of signature func(context.Context, T) R.
Process(ctx, in, n, work) returns a receive channel out of R such that:
- For every value
vreceived fromin, exactly one worker callswork(ctx, v)and the result is sent onout, orctxis cancelled and the value is silently dropped. - After
inis closed and drained (and not cancelled),outis closed. - After
ctx.Done()becomes ready,outis closed in finite time. - No goroutine spawned by Process remains alive after
outis closed. - Each value from
inis processed by exactly one worker (no duplication).
Function Signature¶
func Process[T, R any](
ctx context.Context,
in <-chan T,
n int,
work func(context.Context, T) R,
) <-chan R
ctx: cancellation control.in: receive-only input channel.n: positive integer; number of worker goroutines to spawn.work: per-item processing function. May block; should respect ctx.- Return: receive-only output channel.
Preconditions¶
n > 0. Behaviour forn ≤ 0is undefined; implementations may panic.inmust be a valid (non-nil) channel.- The producer of
inmust closeinexactly once when there are no more values, orctxmust be cancelled. - The caller must drain
outto completion or cancelctx. workmust not retain ownership ofTbeyond its return.
Postconditions¶
outis a fresh channel created by Process.outis closed exactly once.- The number of values received from
outis at most the number of values sent oninbefore any closure. - If neither
ctxcancels norworkpanics, the number of values received fromoutequals the number of values sent onin. - After
outis closed, all internal goroutines have exited.
Distribution Semantics¶
- Each value from
inis delivered to exactly one worker. No value is delivered to two workers. - The runtime selects which worker receives each value. The selection is "fair" in expectation but not in any particular pattern.
- The order of values received from
outis generally NOT the order they were sent onin. A fasterworkinvocation finishes earlier; a slower one finishes later. - For order-preserving processing, attach a sequence number to each
Tand re-sort downstream.
Closing Protocol¶
Define: - T_close(in): time in is closed. - T_drain(W_i): time worker W_i has consumed all values it will and exited. - T_cancel: time ctx.Done() becomes ready (∞ if never). - T_close_out: time out is closed.
Then: - If T_cancel = ∞: T_close_out = max(T_drain(W_1), ..., T_drain(W_n)) + ε. - If T_cancel < ∞: T_close_out ≤ T_cancel + δ, where δ includes worker reaction time and work termination time.
If work blocks ignoring ctx, δ may be unbounded. The spec requires work to terminate in bounded time after ctx cancel; pass ctx to work or use deadlines.
Cancellation Semantics¶
When ctx.Done() becomes ready:
- Each worker observes
<-ctx.Done()at its next select (the two-select sandwich). - Workers in the middle of
workcomplete that call (or terminate early ifworkrespects ctx). - After all workers exit, the closer goroutine closes
out. - Subsequent receives on
outreturn the zero value withok=false.
Values held by a worker but not yet processed (received from in but ctx fires before work runs) are dropped. Values received from in and processed but not yet sent to out are dropped.
Memory Model Edges¶
For a value v sent on in at time s_in, processed by a worker into r, and received from out at time r_out:
- Send on
inhappens-before worker's receive ofv. - Worker's send
out <- rhappens-before consumer'sr_out. - Therefore, all writes preceding the producer's send are visible to the consumer after
r_out.
Workers MUST NOT share mutable state without synchronisation; doing so is a race per the Go memory model.
Resource Bounds¶
- Goroutines spawned: exactly
n + 1(n workers + 1 closer). - Channels created by Process: exactly 1 (
out). - Memory: O(1) constant overhead plus output channel buffer (caller's choice; default 0).
- Lifetime of internal goroutines: from invocation until
outis closed.
Concurrency Invariants¶
- Worker count is exactly n: spawned at start, no dynamic adjustment within Process.
- No value duplication: every input value processed at most once.
- No value loss in absence of cancel: if ctx never fires and
worknever panics, every input value produces exactly one output value. - Single closer: only the closer goroutine calls
close(out). - WaitGroup matched: each worker's
wg.Donematches itswg.Add.
Edge Cases¶
- n = 0: undefined; implementations may panic, return a closed channel, or block.
- n > number of input values: some workers idle. They consume the close signal and exit. No issue.
inalready closed: workers' selects observe close immediately and exit.workpanics: behaviour is implementation-defined. The spec allows the panic to propagate (terminating the worker; reducing pool capacity by 1) or to be recovered. For production code, recovery is recommended.- Result channel buffer: caller may receive a buffered output if Process variant supports it. Default unbuffered.
ctxalready cancelled at call: every worker exits at first select; closer closesoutimmediately.
Reference Implementation¶
func Process[T, R any](
ctx context.Context,
in <-chan T,
n int,
work func(context.Context, T) R,
) <-chan R {
out := make(chan R)
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case v, ok := <-in:
if !ok {
return
}
r := work(ctx, v)
select {
case <-ctx.Done():
return
case out <- r:
}
}
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
This satisfies every clause above. Variants may add panic recovery, batched dispatch, or worker identifiers.
Compliance Checks¶
A compliance test suite must verify:
- With
n=1, in=closed-channel:outis closed; no values emitted. - With
n=4, 100 distinct values onin: exactly 100 values emitted onout; the multiset matches. - Cancel
ctxafter 5 values:outcloses; ≤ 5+n+1 values emitted (small slop for in-flight); no goroutine leak. workalways returns a constant:outcarries that constant for every input.- Concurrent
workinvocations under-race: no race report. goleak.VerifyNoneafter every test.
Summary¶
Fan-out's contract: distribute every input value to exactly one of n workers, transform it, emit on a single output channel. Closing follows the same two-select-sandwich pattern as fan-in. Cancellation cascades through workers; bounded internal resources; deterministic compliance against the listed invariants. Implementations may add features but must satisfy the spec.