Fan-In — Specification¶
Table of Contents¶
- Introduction
- Formal Definition
- Function Signature
- Preconditions
- Postconditions
- Closing Protocol
- Cancellation Semantics
- Memory Model Edges
- Ordering Semantics
- Resource Bounds
- Concurrency Invariants
- Edge Cases
- Reference Implementation
- Compliance Checks
- Summary
Introduction¶
This file is the formal specification of fan-in. It states the contract a Merge function must satisfy. Implementations may differ in performance and structure but must respect this contract.
Formal Definition¶
Let T be an arbitrary Go type. Let c_1, c_2, ..., c_N be receive channels of element type T. Let ctx be a context.Context.
Merge(ctx, c_1, ..., c_N) returns a receive channel out of element type T such that:
- Every value v ever sent on any c_i and received by Merge before cancellation is eventually sent on out exactly once, or ctx is cancelled and the value is silently dropped.
- After all c_i are closed (and all values drained, if not cancelled), out is closed.
- After ctx.Done() becomes ready, out is closed in finite time regardless of the state of the c_i.
- No goroutine spawned by Merge remains alive after out is closed.
Function Signature¶
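The signature is func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T, where:
- ctx: cancellation control. May be context.Background() for unconditional merging.
- cs: zero or more receive-only channels of element type T. May be a nil or empty slice, in which case out is closed shortly after the call returns without yielding a value.
- Return: a receive-only channel of element type T.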
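As an illustration of the signature, the following sketch merges two inputs and also makes a zero-input call. Merge is copied from the reference implementation given later in this specification; produce and sumMerged are hypothetical helpers introduced only for this example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Merge mirrors the reference implementation from this specification.
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-c:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(c)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

// produce (hypothetical helper) sends vals on a fresh channel, then closes it.
func produce(vals ...int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

// sumMerged (hypothetical helper) returns the sum and count of merged values.
func sumMerged() (sum, n int) {
	ctx := context.Background()
	for v := range Merge(ctx, produce(1, 2, 3), produce(10, 20)) {
		sum += v
		n++
	}
	// Zero inputs: out is closed without yielding a value, so n is unchanged.
	for range Merge[int](ctx) {
		n++
	}
	return sum, n
}

func main() {
	sum, n := sumMerged()
	fmt.Println(sum, n) // prints "36 5"
}
```

The zero-input call needs the explicit type argument Merge[int] because T cannot be inferred from an empty argument list.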
Preconditions¶
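- Each c_i must be a valid (initialised) channel. Passing a nil channel is undefined behaviour: in the simplest implementations the forwarder blocks forever on receive.
- Each producer of c_i must close c_i exactly once when it has no more values to send.
- The caller must drain out until it is closed, or cancel ctx to release the merge.
- The caller must not close out. The receive-only return type already makes close(out) a compile-time error; if an implementation exposed a bidirectional channel instead, closing it would panic on the next forwarder send.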
Postconditions¶
- out is a fresh channel created by Merge.
- out is closed by Merge exactly once.
- The number of values received from out is at most the total number of values sent on all c_i.
- If ctx is never cancelled and every producer eventually closes its channel after sending all values, the number of values received from out is exactly that total (no loss).
- After out is closed, all internal goroutines have exited.
Closing Protocol¶
The closing protocol is the critical correctness property. Stated precisely:
- Let T_close(c_i) be the time when c_i is closed.
- Let T_drain(c_i) be the time when Merge's forwarder for c_i has received all values and exited.
- Let T_cancel be the time ctx.Done() becomes ready (∞ if never).
- Let T_close_out be the time out is closed.

Then:
- If T_cancel = ∞: T_close_out = max(T_drain(c_1), ..., T_drain(c_N)) + ε, where ε ≥ 0 is the time the closer goroutine needs to be scheduled.
- If T_cancel < ∞: T_close_out ≤ T_cancel + δ, where δ is bounded by the time for all forwarders to observe the cancel and the closer goroutine to schedule.
In practice δ is a few milliseconds on a healthy system, but the spec places no formal bound on it.
Cancellation Semantics¶
When ctx.Done() becomes ready:
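- Each forwarder observes <-ctx.Done() in whichever select of the two-select sandwich it is blocked in, or on its next iteration, and returns. Because select chooses pseudo-randomly among ready cases, a forwarder may still forward a value if an input and ctx.Done() are ready at the same time.
- The closer goroutine, after every forwarder has called wg.Done, closes out.
- Any value in flight (received from c_i but not yet sent on out) is silently dropped.
- Once out is closed, subsequent receives on out return the zero value with ok=false.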
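The cancellation steps above can be observed with a small sketch. It assumes a producer that never closes its channel, so cancellation is the only way out can close. Merge is copied from the reference implementation in this specification; cancelClosesOut is a hypothetical helper, and the two-second timeout is an arbitrary safety margin, not a bound taken from the spec.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Merge mirrors the reference implementation from this specification.
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-c:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(c)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

// cancelClosesOut (hypothetical helper) reports whether out is closed
// after ctx is cancelled while the input is still open.
func cancelClosesOut() bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := make(chan int)
	go func() { // endless producer: never closes c, stops only on cancel
		for i := 0; ; i++ {
			select {
			case c <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	var in <-chan int = c
	out := Merge(ctx, in)
	for i := 0; i < 3; i++ {
		<-out // consume a few values mid-stream
	}
	cancel()

	deadline := time.After(2 * time.Second)
	for {
		select {
		case _, ok := <-out:
			if !ok {
				return true // closed: zero value with ok == false
			}
			// A value that raced with cancellation; keep draining.
		case <-deadline:
			return false
		}
	}
}

func main() {
	fmt.Println(cancelClosesOut())
}
```

The drain loop after cancel() tolerates late values because a forwarder whose select sees both a ready input and a ready ctx.Done() may pick either case.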
Memory Model Edges¶
Let s be a send c_i <- v in producer P, and r be a receive v' = <-out in consumer C. Then:
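- The send s happens-before the completion of its matching receive in the forwarder F.
- That receive is sequenced before the send out <- v in F (program order within one goroutine).
- The send out <- v in F happens-before the completion of r.
- By transitivity, s happens-before r.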
Therefore: any memory writes P performs before the send s are visible to C after the receive r. Producers may safely "transfer ownership" of memory through fan-in.
Producers must NOT mutate memory reachable from a value after sending it (for example through a pointer, slice, or map); the consumer may be reading it concurrently.
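The happens-before chain can be illustrated without Merge itself, using a single forwarder between two unbuffered channels. This is a minimal sketch: payload and handOff are hypothetical names, and the forwarder is a simplified stand-in for one of Merge's forwarders with no cancellation handling.

```go
package main

import "fmt"

// payload is a hypothetical message type carrying a reference to shared memory.
type payload struct{ data []int }

// handOff passes one payload through producer -> forwarder -> consumer,
// the same two channel hops a value takes through fan-in.
func handOff() int {
	in := make(chan *payload)
	out := make(chan *payload)

	// Producer P: all writes to p.data happen before the send s.
	go func() {
		p := &payload{data: make([]int, 3)}
		for i := range p.data {
			p.data[i] = i + 1
		}
		in <- p // send s; P must not touch p after this line
		close(in)
	}()

	// Forwarder F: receives, then sends, in program order.
	go func() {
		defer close(out)
		for p := range in {
			out <- p
		}
	}()

	// Consumer C: after the receive r, P's writes are visible without locking.
	total := 0
	for p := range out {
		for _, x := range p.data {
			total += x
		}
	}
	return total
}

func main() {
	fmt.Println(handOff()) // prints 6
}
```

Running this under the race detector reports no race precisely because P stops touching the payload once it has been sent.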
Ordering Semantics¶
- Within a single input channel c_i, the order of values received from out matches the order they were sent on c_i.
- Across inputs (c_i and c_j, i ≠ j), no order is guaranteed. The runtime scheduler is the sole arbiter.
- Two values sent at "the same time" (no happens-before between them) may appear on out in either order on different runs.
For stable cross-input ordering, use a k-way merge based on a comparison function, not Merge.
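The per-input guarantee can be checked by tagging each value with its source and a sequence number. The sketch below assumes three producers of 100 values each. Merge is copied from the reference implementation in this specification; item, produce, and perInputOrderHolds are hypothetical names used only here.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Merge mirrors the reference implementation from this specification.
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-c:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(c)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

// item (hypothetical) tags a value with its source and per-source sequence number.
type item struct{ src, seq int }

// produce (hypothetical helper) emits n items for source src, in order.
func produce(src, n int) <-chan item {
	c := make(chan item)
	go func() {
		defer close(c)
		for i := 0; i < n; i++ {
			c <- item{src: src, seq: i}
		}
	}()
	return c
}

// perInputOrderHolds reports whether every source's items arrive in send
// order and none are lost. It makes no claim about interleaving across sources.
func perInputOrderHolds() bool {
	out := Merge(context.Background(), produce(0, 100), produce(1, 100), produce(2, 100))
	next := map[int]int{} // next expected sequence number per source
	count := 0
	for it := range out {
		if it.seq != next[it.src] {
			return false
		}
		next[it.src]++
		count++
	}
	return count == 300
}

func main() {
	fmt.Println(perInputOrderHolds())
}
```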
Resource Bounds¶
- Goroutines spawned by Merge: exactly N + 1 (one forwarder per input plus one closer).
- Channels created by Merge: exactly 1 (out).
- Memory: bounded by the closure environment (small constant) plus the WaitGroup (small constant) plus the buffer of out (the implementation's choice; 0 in the reference implementation).
- Lifetime of internal goroutines: from Merge's invocation until either (a) all inputs are closed and drained, or (b) ctx is cancelled.
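The N + 1 goroutine bound can be spot-checked with runtime.NumGoroutine. This is a rough sketch: it assumes no unrelated goroutines start or stop during the measurement, and it only works for N ≥ 1, since with N = 0 the closer may already have exited. Merge is copied from the reference implementation in this specification; goroutinesSpawned is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"
)

// Merge mirrors the reference implementation from this specification.
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-c:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(c)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

// goroutinesSpawned (hypothetical helper) returns how many goroutines a
// Merge call over n idle inputs adds. Intended for n >= 1.
func goroutinesSpawned(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	cs := make([]<-chan int, n)
	for i := range cs {
		cs[i] = make(chan int) // never sent on and never closed
	}

	before := runtime.NumGoroutine()
	out := Merge(ctx, cs...)
	spawned := runtime.NumGoroutine() - before

	// Release the merge and wait (bounded) for its goroutines to finish.
	cancel()
	<-out
	for i := 0; i < 1000 && runtime.NumGoroutine() > before; i++ {
		time.Sleep(time.Millisecond)
	}
	return spawned
}

func main() {
	fmt.Println(goroutinesSpawned(3)) // 3 forwarders + 1 closer
}
```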
Concurrency Invariants¶
- Single closer: only the closer goroutine calls close(out). Forwarders never close out.
- WaitGroup matched: every unit added with wg.Add is matched by exactly one wg.Done.
- No double-Done: each forwarder calls wg.Done exactly once via defer.
- No double-close: close(out) is invoked exactly once.
- No leaked goroutine: after out is closed, every internal goroutine has exited.
A correctness proof for any implementation must establish all five.
Edge Cases¶
- N = 0: WaitGroup counter is 0, closer fires immediately, out is closed in finite time (possibly before Merge returns). The caller's range out exits at once.
- N = 1: equivalent to forwarding c_1. Allowed but wasteful (one extra forwarder, one closer).
- All c_i already closed: each forwarder's receive reports ok=false at once and the forwarder exits; closer fires; out closed.
- Mix of closed and open c_i: closed inputs drop out silently; open inputs continue feeding out.
- Same channel passed twice: two forwarders compete for receive, and each value is still delivered exactly once; behaviour is well-defined but rarely intended.
- nil channel: the forwarder's receive from a nil channel blocks forever. WaitGroup never reaches zero. out never closes (unless ctx cancels). Implementations may panic, skip, or accept; the spec allows any of these.
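The difference between a closed input and a nil input comes down to how a single receive behaves inside select. The sketch below isolates one forwarder step. forwardOne, nilChannelOutcome, and closedChannelOutcome are hypothetical helpers, and the 50 ms timeout is an arbitrary choice for the demonstration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// forwardOne (hypothetical) performs one step of a forwarder's first select:
// it waits for a value from c or for cancellation, whichever comes first.
func forwardOne(ctx context.Context, c <-chan int) string {
	select {
	case <-ctx.Done():
		return "cancelled"
	case _, ok := <-c:
		if !ok {
			return "closed"
		}
		return "value"
	}
}

// nilChannelOutcome shows that a nil input can only be escaped via ctx.
func nilChannelOutcome() string {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	var c <-chan int // nil: the receive case can never proceed
	return forwardOne(ctx, c)
}

// closedChannelOutcome shows that a closed input is detected at once.
func closedChannelOutcome() string {
	c := make(chan int)
	close(c)
	return forwardOne(context.Background(), c)
}

func main() {
	fmt.Println(nilChannelOutcome())    // prints "cancelled"
	fmt.Println(closedChannelOutcome()) // prints "closed"
}
```

An implementation that chooses to skip or reject nil inputs would check for c == nil before spawning the forwarder; the spec leaves that choice open.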
Reference Implementation¶
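```go
import (
	"context"
	"sync"
)

// Merge forwards every value from cs onto one output channel and closes it
// once all inputs are closed and drained, or ctx is cancelled.
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		// One forwarder per input.
		go func(c <-chan T) {
			defer wg.Done()
			for {
				// First select: wait for a value or for cancellation.
				select {
				case <-ctx.Done():
					return
				case v, ok := <-c:
					if !ok {
						return // input closed and drained
					}
					// Second select: deliver the value or give up on cancel.
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(c)
	}
	// The closer is the only goroutine that closes out.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
```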
This implementation satisfies every clause above. It is the canonical form.
Compliance Checks¶
A test suite for any Merge implementation should verify:
- Merge(ctx) (no inputs) returns a channel that is closed without yielding a value.
- Merge(ctx, c) (one input) yields exactly the values sent on c, in order, and closes when c does.
- Merge(ctx, c1, c2) yields the union of values sent (as a multiset) and closes after both inputs close.
- Cancelling ctx mid-stream causes out to close in finite time.
- After out is closed, goleak.VerifyNone(t) passes.
- Race detector reports no races under stress (-race -count=100).
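Two of these checks (the zero-input case and the leak check) can be sketched with the standard library alone. This sketch deliberately substitutes a runtime.NumGoroutine comparison for goleak.VerifyNone, which is a cruder approximation. In a real suite the functions would be testing.T tests. Merge is copied from the reference implementation in this specification; emptyMergeCloses and noLeakAfterCancel are hypothetical helpers, and the timeouts are arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"
)

// Merge mirrors the reference implementation from this specification.
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-c:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(c)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

// emptyMergeCloses checks that Merge with no inputs yields a closed channel.
func emptyMergeCloses() bool {
	select {
	case _, ok := <-Merge[int](context.Background()):
		return !ok
	case <-time.After(time.Second):
		return false
	}
}

// noLeakAfterCancel approximates the goleak check: once out is closed, the
// goroutine count should fall back to its baseline within a bounded wait.
func noLeakAfterCancel() bool {
	base := runtime.NumGoroutine()
	ctx, cancel := context.WithCancel(context.Background())
	var c1, c2 <-chan int = make(chan int), make(chan int) // idle inputs
	out := Merge(ctx, c1, c2)
	cancel()
	if _, ok := <-out; ok {
		return false // no value was ever sent, so only a close is valid
	}
	for i := 0; i < 200; i++ {
		if runtime.NumGoroutine() <= base {
			return true
		}
		time.Sleep(5 * time.Millisecond) // let exiting goroutines finish
	}
	return false
}

func main() {
	fmt.Println(emptyMergeCloses(), noLeakAfterCancel())
}
```

The polling loop is needed because the forwarders and the closer are still unwinding at the instant out is observed closed.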
Summary¶
Fan-in's contract is precise: every value flows once, the output closes when inputs close or ctx cancels, no goroutines leak, and only the closer closes the output. Cross-input order is not preserved. The reference implementation is small and idiomatic; alternative implementations must satisfy the same five concurrency invariants and the closing protocol's timing bounds.