Pipeline — Specification¶
Table of Contents¶
- Introduction
- Formal Definition
- Stage Signature
- Preconditions
- Postconditions
- Closing Protocol
- Cancellation Semantics
- Memory Model Edges
- Ordering Semantics
- Resource Bounds
- Concurrency Invariants
- Composition Rules
- Edge Cases
- Reference Stage Implementation
- Compliance Checks
- Summary
Introduction¶
This file is the formal specification of a pipeline stage. A pipeline is a composition of stages connected by channels. The contract for the pipeline as a whole follows from the contract of each stage.
Formal Definition¶
A stage is a function of type func(ctx context.Context, in <-chan In) <-chan Out.
For a stage S, the contract is:
- S(ctx, in) returns a freshly created receive-only channel out with element type Out.
- S spawns one or more goroutines that read from in and write to out.
- The goroutines exit when (a) in is drained and closed, or (b) ctx.Done() is ready.
- out is closed exactly once, after every spawned goroutine has exited.
- S returns immediately; all work is asynchronous.
A pipeline is a composition of stages: Sn(ctx, ... S2(ctx, S1(ctx, source))...). Closing of the source's channel cascades through every stage; each stage closes its output when its input is drained, and so on.
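The definition and the nested composition can be sketched as follows. This is a minimal illustration, not a normative part of the spec: the Stage type name is borrowed from the Composition Rules section, and source, mapInts and runPipeline are hypothetical helpers introduced only for this example.

```go
package main

import (
	"context"
	"fmt"
)

// Stage is the assumed generic form of the stage function type.
type Stage[In, Out any] func(ctx context.Context, in <-chan In) <-chan Out

// source is a hypothetical producer: it emits vals, then closes its output.
func source(ctx context.Context, vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			select {
			case <-ctx.Done():
				return
			case out <- v:
			}
		}
	}()
	return out
}

// mapInts builds a single-goroutine stage that applies f to every value.
func mapInts(f func(int) int) Stage[int, int] {
	return func(ctx context.Context, in <-chan int) <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-in:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- f(v):
					}
				}
			}
		}()
		return out
	}
}

// runPipeline composes S2(ctx, S1(ctx, source)) and drains the result.
func runPipeline(vals ...int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	double := mapInts(func(v int) int { return v * 2 })
	addOne := mapInts(func(v int) int { return v + 1 })
	var got []int
	for v := range addOne(ctx, double(ctx, source(ctx, vals...))) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runPipeline(1, 2, 3)) // [3 5 7]
}
```

The range loop in runPipeline ends only because the close of the source's output cascades through both stages.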
Stage Signature¶
The canonical signature is func Stage(ctx context.Context, in <-chan In) <-chan Out.
Variants:
- Producer / source: no in parameter. func Source(ctx context.Context) <-chan Out.
- Consumer / sink: no output channel. func Sink(ctx context.Context, in <-chan In) or func Sink(ctx context.Context, in <-chan In) error.
- Configurable: takes additional parameters. func Filter(ctx context.Context, in <-chan T, pred func(T) bool) <-chan T.
Producers and sinks are called terminal stages.
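The three variants might look like this in practice. This is a sketch under assumptions: Filter follows the signature given above, while Count, Collect and evens are hypothetical names, and both Count and Collect take an extra parameter (n and dst) that the plain Source and Sink signatures do not have.

```go
package main

import (
	"context"
	"fmt"
)

// Count is a hypothetical producer: no input channel, only an output.
func Count(ctx context.Context, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}

// Filter is a configurable stage: pred is the additional parameter.
func Filter[T any](ctx context.Context, in <-chan T, pred func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				if !pred(v) {
					continue // dropped by filter logic
				}
				select {
				case <-ctx.Done():
					return
				case out <- v:
				}
			}
		}
	}()
	return out
}

// Collect is a hypothetical sink: it has no output channel, runs in the
// caller's goroutine, and returns ctx.Err() if cancellation stopped it.
func Collect[T any](ctx context.Context, in <-chan T, dst *[]T) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil
			}
			*dst = append(*dst, v)
		}
	}
}

// evens wires producer, filter and sink together.
func evens(n int) ([]int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	isEven := func(v int) bool { return v%2 == 0 }
	var got []int
	err := Collect(ctx, Filter(ctx, Count(ctx, n), isEven), &got)
	return got, err
}

func main() {
	got, err := evens(6)
	fmt.Println(got, err) // [0 2 4] <nil>
}
```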
Preconditions¶
- ctx is non-nil.
- in is a non-nil channel (except for producers).
- Producers eventually close their output if their for/range body terminates normally.
- Stage authors must use the two-select sandwich (ctx-aware receive AND ctx-aware send).
- A stage's spawned goroutines must call close(out) on every exit path (use defer close(out)).
Postconditions¶
- The returned channel is freshly created and not yet closed.
- The output channel is closed once the stage's input is drained (while ctx has not been cancelled) or once ctx is cancelled, whichever comes first.
- After the output channel is closed, all goroutines spawned by the stage have exited.
- For each input value v not dropped (due to filter logic) or skipped (due to cancellation), exactly one output value is produced, or N output values for stages that expand each input into N outputs.
Closing Protocol¶
The pipeline's closing protocol is a chain:
- Source closes its output when its data source is exhausted.
- Each subsequent stage's receive loop over its input (a range, or the receive half of the two-select sandwich) exits when the input is closed.
- The stage's deferred close(out) fires, signalling the next stage.
- The chain reaches the sink, which sees its input closed and exits.
When ctx is cancelled mid-stream:
- Each stage's two-select sandwich observes <-ctx.Done() and returns.
- Each stage's deferred close(out) fires.
- The chain unwinds in roughly the order the goroutines are scheduled.
The spec does not bound the stage exit time; it depends on whether work inside each stage respects ctx. The convention: pass ctx into any blocking call.
Cancellation Semantics¶
Cancellation is cooperative:
- Stages that respect ctx exit promptly.
- Stages that ignore ctx (e.g., a long sync.Mutex.Lock or a blocking syscall without ctx) delay propagation.
- The pipeline's overall shutdown time is the max stage exit time.
For deterministic shutdown, all stages must use the two-select sandwich and pass ctx into any potentially blocking sub-call.
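Passing ctx into a blocking sub-call can be sketched as below. The names sleepCtx, slowStage and closesAfterCancel are hypothetical; the timer stands in for any long-running operation that accepts a context.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx waits for d or until ctx is cancelled, whichever comes first.
// It reports whether the full duration elapsed.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-t.C:
		return true
	}
}

// slowStage simulates per-item work that takes delay but still respects ctx.
func slowStage(ctx context.Context, in <-chan int, delay time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				if !sleepCtx(ctx, delay) { // the blocking sub-call receives ctx
					return
				}
				select {
				case <-ctx.Done():
					return
				case out <- v:
				}
			}
		}
	}()
	return out
}

// closesAfterCancel reports whether the stage's output closes within limit
// once ctx is cancelled while an item would otherwise take an hour.
func closesAfterCancel(limit time.Duration) bool {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan int, 1)
	in <- 1
	out := slowStage(ctx, in, time.Hour)
	cancel()
	select {
	case _, ok := <-out:
		return !ok
	case <-time.After(limit):
		return false
	}
}

func main() {
	fmt.Println(closesAfterCancel(time.Second)) // true
}
```

Had slowStage called time.Sleep(delay) instead, the stage would not notice cancellation until the sleep finished, and the whole pipeline would wait for it.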
Memory Model Edges¶
For a value v produced by stage S_i and consumed by stage S_{i+1}:
- The send S_i.out <- v happens-before the matching receive in S_{i+1}.
- All writes S_i made before the send are visible to S_{i+1} after the receive.
By transitivity, writes in any earlier stage are visible to any later stage that receives a value derived from those writes. This is what makes pipelines safe without explicit synchronisation between stages — the channel sends are the synchronisation.
Stages MUST NOT mutate values they have already sent, including any memory reachable from them through pointers, slices, or maps. The downstream stage may read concurrently; mutation is a data race.
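One common way to honour this rule when a stage reuses a scratch buffer is to send a copy. The sketch below assumes a hypothetical chunks producer and collectChunks helper; it illustrates the ownership hand-off and is not part of the spec.

```go
package main

import (
	"context"
	"fmt"
)

// chunks splits data into slices of at most size elements (size must be
// positive). Each chunk is copied before it is sent.
func chunks(ctx context.Context, data []int, size int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		buf := make([]int, 0, size) // scratch buffer reused between sends
		for i, v := range data {
			buf = append(buf, v)
			if len(buf) < size && i != len(data)-1 {
				continue
			}
			// Send a copy: after the send the receiver owns the slice, and
			// this goroutine is about to overwrite buf.
			c := append([]int(nil), buf...)
			buf = buf[:0]
			select {
			case <-ctx.Done():
				return
			case out <- c:
			}
		}
	}()
	return out
}

// collectChunks keeps every received chunk, which is only safe because the
// producer never touches a chunk again after sending it.
func collectChunks(data []int, size int) [][]int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var got [][]int
	for c := range chunks(ctx, data, size) {
		got = append(got, c)
	}
	return got
}

func main() {
	fmt.Println(collectChunks([]int{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}
```

Sending buf itself instead of the copy c would let the producer overwrite elements that the consumer is still holding.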
Ordering Semantics¶
- Within a single-goroutine stage (one input, one output, one worker), order is preserved: output order matches input order.
- For a stage with internal fan-out (multiple workers reading the same input), order is generally NOT preserved: whichever worker finishes first sends first.
- For a stage with fan-out + downstream merge, order is NOT preserved.
- For order-preserving processing across fan-out, attach a sequence number and re-sort downstream.
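The sequence-number approach from the last bullet could be sketched as follows. The item type and the squareParallel, reorder and orderedSquares functions are hypothetical; the reorder stage here buffers out-of-order items in a map, which is one of several ways to re-sort and assumes sequence numbers start at 0 with no gaps.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// item carries a value together with its position in the input stream.
type item struct{ seq, val int }

// squareParallel is a fan-out stage: n workers share one input, so results
// may arrive in any order.
func squareParallel(ctx context.Context, in <-chan item, n int) <-chan item {
	out := make(chan item)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case it, ok := <-in:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- item{it.seq, it.val * it.val}:
					}
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out) // single close, after every worker has returned
	}()
	return out
}

// reorder holds back items that arrive early and releases values strictly
// in sequence order.
func reorder(ctx context.Context, in <-chan item) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		pending := make(map[int]int)
		next := 0
		for {
			select {
			case <-ctx.Done():
				return
			case it, ok := <-in:
				if !ok {
					return
				}
				pending[it.seq] = it.val
				for v, ready := pending[next]; ready; v, ready = pending[next] {
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
					delete(pending, next)
					next++
				}
			}
		}
	}()
	return out
}

// orderedSquares tags each input with its index, fans out, then re-sorts.
func orderedSquares(vals []int, workers int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan item, len(vals))
	for i, v := range vals {
		in <- item{i, v}
	}
	close(in)
	var got []int
	for v := range reorder(ctx, squareParallel(ctx, in, workers)) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(orderedSquares([]int{1, 2, 3, 4, 5}, 3)) // [1 4 9 16 25]
}
```

The map in reorder is unbounded, so one slow item can make the buffer grow; a production version would likely cap it.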
Resource Bounds¶
- A stage spawns k goroutines, where k depends on the stage (typically 1 for sequential stages, n for fan-out stages with n workers).
- A stage creates exactly 1 channel (its output).
- Memory is bounded by: closure captures + output channel buffer + per-worker scratch.
- Lifetime: until input is drained AND ctx not cancelled, or ctx is cancelled.
For a pipeline of K stages with average g goroutines per stage, total goroutines = K * g.
Concurrency Invariants¶
- Each stage closes its own output: not the input, never another stage's channel.
- Single close per channel: each output channel is closed exactly once.
- No value duplication: a single-worker stage produces output values one-to-one with input values (modulo filter logic).
- Cancellation cascades: ctx cancel causes every stage to exit in finite time (assuming stages respect ctx).
- No leaked goroutines: after all stages exit, no goroutine remains.
Composition Rules¶
For stages S1: A → B and S2: B → C, the composition Then(S1, S2): A → C is defined as:
func Then[A, B, C any](s1 Stage[A, B], s2 Stage[B, C]) Stage[A, C] {
	return func(ctx context.Context, in <-chan A) <-chan C {
		return s2(ctx, s1(ctx, in))
	}
}
Composition is associative: Then(Then(S1, S2), S3) ≡ Then(S1, Then(S2, S3)).
A pipeline is Sn ∘ ... ∘ S2 ∘ S1 applied to a source channel.
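Associativity can be checked on a small example. The sketch below repeats Then from above and assumes a Stage type of the form used throughout this document; Map, run and bothGroupings are hypothetical helpers added only for the demonstration.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Stage is the assumed generic stage type.
type Stage[In, Out any] func(ctx context.Context, in <-chan In) <-chan Out

// Then composes two stages, as defined in this section.
func Then[A, B, C any](s1 Stage[A, B], s2 Stage[B, C]) Stage[A, C] {
	return func(ctx context.Context, in <-chan A) <-chan C {
		return s2(ctx, s1(ctx, in))
	}
}

// Map is a hypothetical helper that lifts f into a single-goroutine stage.
func Map[In, Out any](f func(In) Out) Stage[In, Out] {
	return func(ctx context.Context, in <-chan In) <-chan Out {
		out := make(chan Out)
		go func() {
			defer close(out)
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-in:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- f(v):
					}
				}
			}
		}()
		return out
	}
}

// run feeds vals through p and collects the output.
func run(p Stage[int, string], vals ...int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan int, len(vals))
	for _, v := range vals {
		in <- v
	}
	close(in)
	var got []string
	for s := range p(ctx, in) {
		got = append(got, s)
	}
	return got
}

// bothGroupings builds the same three-stage pipeline with each bracketing.
func bothGroupings() (left, right []string) {
	inc := Map(func(v int) int { return v + 1 })
	dbl := Map(func(v int) int { return v * 2 })
	str := Map(func(v int) string { return strconv.Itoa(v) })
	left = run(Then(Then(inc, dbl), str), 1, 2, 3)
	right = run(Then(inc, Then(dbl, str)), 1, 2, 3)
	return left, right
}

func main() {
	left, right := bothGroupings()
	fmt.Println(left, right) // [4 6 8] [4 6 8]
}
```

Both bracketings wire up the same chain of goroutines and channels, which is why the results agree.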
For parallel stages, the compositional operator is Parallel(S, n):
func Parallel[In, Out any](s Stage[In, Out], n int) Stage[In, Out] {
	return func(ctx context.Context, in <-chan In) <-chan Out {
		outs := make([]<-chan Out, n)
		for i := 0; i < n; i++ {
			outs[i] = s(ctx, in) // every instance reads from the same input
		}
		return Merge(ctx, outs...)
	}
}
Parallel(S, n) does not preserve order, but it can increase throughput when S is the bottleneck.
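Parallel relies on a Merge function that this section does not define. A minimal sketch of one possible Merge, assuming the signature implied by the call Merge(ctx, outs...), is shown below; filled and mergedSorted are hypothetical helpers for the demonstration.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// Merge forwards every value from ins to one output and closes that output
// once all inputs are drained or ctx is cancelled.
func Merge[T any](ctx context.Context, ins ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-in:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out) // the only close of out, after all forwarders have stopped
	}()
	return out
}

// filled returns a closed channel that already holds vals.
func filled(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// mergedSorted merges three inputs and sorts the result, because the
// arrival order across inputs is not defined.
func mergedSorted() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var got []int
	for v := range Merge(ctx, filled(1, 4), filled(2, 5), filled(3, 6)) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(mergedSorted()) // [1 2 3 4 5 6]
}
```

This Merge spawns one forwarder per input plus one closer goroutine, so a Parallel(S, n) stage built on it runs more than n goroutines in total.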
Edge Cases¶
- Empty input: source closes its output immediately; each stage drains immediately; pipeline completes.
- Single value: pipeline processes one value, then drains.
- Source error: producer logs and closes early; downstream stages see closure and exit cleanly.
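- Stage panic: implementation-defined. The spec allows the panic to propagate (an unrecovered panic in any goroutine terminates the whole program, and the pipeline with it) or to be recovered (continuing but logging).
- Ctx cancelled at call site: every stage exits as soon as one of its selects picks the ctx.Done() case, and out closes promptly. Because select chooses at random among ready cases, a stage may still handle a value that is already available before it exits.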
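The "recover and log" option for a stage panic might be implemented as below. This is one possible policy, sketched with hypothetical names (call, safeMap, recoverDemo): a value whose processing panics is logged and skipped.

```go
package main

import (
	"context"
	"fmt"
	"log"
)

// call runs f(v) and turns a panic into an error instead of crashing.
func call(f func(int) int, v int) (r int, err error) {
	defer func() {
		if p := recover(); p != nil {
			err = fmt.Errorf("input %d: %v", v, p)
		}
	}()
	return f(v), nil
}

// safeMap recovers from panics in f, logs them, and skips the failing value.
func safeMap(ctx context.Context, in <-chan int, f func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				r, err := call(f, v)
				if err != nil {
					log.Printf("stage recovered: %v", err)
					continue
				}
				select {
				case <-ctx.Done():
					return
				case out <- r:
				}
			}
		}
	}()
	return out
}

// recoverDemo divides 10 by each input; the zero triggers a runtime panic.
func recoverDemo() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan int, 3)
	for _, v := range []int{1, 0, 2} {
		in <- v
	}
	close(in)
	var got []int
	for r := range safeMap(ctx, in, func(v int) int { return 10 / v }) {
		got = append(got, r)
	}
	return got
}

func main() {
	fmt.Println(recoverDemo()) // [10 5]
}
```

Skipping a value means the one-to-one postcondition no longer holds for it, so a stage that recovers this way should document the behaviour.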
Reference Stage Implementation¶
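import "context"

// MapStage applies f to every value received from in and sends the result on
// the returned channel, which is closed when in is drained or ctx is cancelled.
func MapStage[In, Out any](
	ctx context.Context,
	in <-chan In,
	f func(In) Out,
) <-chan Out {
	out := make(chan Out)
	go func() {
		defer close(out) // runs on every exit path
		for {
			// First half of the sandwich: ctx-aware receive.
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return // input closed and drained
				}
				r := f(v)
				// Second half of the sandwich: ctx-aware send.
				select {
				case <-ctx.Done():
					return
				case out <- r:
				}
			}
		}
	}()
	return out
}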
This satisfies every clause above. The two-select sandwich is mandatory.
Compliance Checks¶
A test suite for any stage must verify:
- Empty input → closed output.
- N input values → at most N output values; in non-fan-out stages, exactly N (modulo filter).
- Cancel ctx mid-stream → output closes in finite time; no goroutine leak.
- After test, goleak.VerifyNone(t) passes.
- go test -race reports no races.
For pipelines (multiple stages composed):
- End-to-end: known input → known output; assert multiset equality.
- Cancel: in-flight items dropped; pipeline shuts down; no leak.
- Source error: pipeline drains gracefully.
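The first three single-stage checks can be approximated with the standard library alone. The sketch below is an assumption-laden illustration: intStage, identity, ignoresCtx, drain, checkStage and compliant are hypothetical names, the 200 ms limit is arbitrary, and the leak and race checks (goleak.VerifyNone and go test -race) are left out because they need external tooling.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type intStage func(ctx context.Context, in <-chan int) <-chan int

// identity is a hypothetical compliant stage used as the test subject.
func identity(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					return
				case out <- v:
				}
			}
		}
	}()
	return out
}

// ignoresCtx is a deliberately non-compliant stage: it never checks ctx.
func ignoresCtx(_ context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v
		}
	}()
	return out
}

// drain counts values until out closes, or fails once limit has elapsed.
func drain(out <-chan int, limit time.Duration) (int, error) {
	timeout := time.After(limit)
	n := 0
	for {
		select {
		case _, ok := <-out:
			if !ok {
				return n, nil
			}
			n++
		case <-timeout:
			return n, errors.New("output not closed in time")
		}
	}
}

// checkStage runs the checks for a one-to-one stage.
func checkStage(s intStage, limit time.Duration) error {
	for _, n := range []int{0, 3} { // empty input, then N = 3 values
		in := make(chan int, n)
		for i := 0; i < n; i++ {
			in <- i
		}
		close(in)
		got, err := drain(s(context.Background(), in), limit)
		if err != nil || got != n {
			return fmt.Errorf("%d inputs: got %d outputs, err=%v", n, got, err)
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	out := s(ctx, make(chan int)) // input that never closes
	cancel()
	if _, err := drain(out, limit); err != nil {
		return fmt.Errorf("after cancel: %w", err)
	}
	return nil
}

// compliant reports whether s passes all checks within 200 ms each.
func compliant(s intStage) bool {
	return checkStage(s, 200*time.Millisecond) == nil
}

func main() {
	fmt.Println(compliant(identity), compliant(ignoresCtx)) // true false
}
```

The ignoresCtx stage passes the first two checks and fails only the cancellation check, which is why that check belongs in every suite.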
Summary¶
A pipeline stage is a function with the signature func(ctx, in) out that spawns goroutines, reads from in, writes to out, uses the two-select sandwich, and closes out on exit. Pipelines compose stages associatively. Closing cascades through the chain; ctx cancellation propagates similarly. Order is preserved in single-worker stages; lost in fan-out stages. The reference implementation is small; the compliance suite is the contract.