Pipeline — Optimization¶
Honest framing first: most pipeline performance is dictated by the slowest stage. Before any micro-optimization, profile (go test -bench, pprof) and find the bottleneck. The optimizations below are real and measurable, and each has caveats that matter; apply them only when you can show the gain on your workload. Each entry states the problem, shows a "before" and an "after," and reports the realistic gain.
Optimization 1 — Replace a slow stage with fan-out workers¶
Problem: One stage's per-item cost dominates throughput. The pipeline runs at the slow stage's rate while the rest of it sits mostly idle.
Before:
func enrich(ctx context.Context, in <-chan Event) <-chan Event {
    out := make(chan Event)
    go func() {
        defer close(out)
        for ev := range in {
            ev.Tag = lookup(ctx, ev.UserID) // 5 ms remote call
            out <- ev
        }
    }()
    return out
}
// Throughput: 1 / 5ms = 200 events/sec
After:
func parallelEnrich(ctx context.Context, in <-chan Event, n int) <-chan Event {
    workers := make([]<-chan Event, n)
    for i := 0; i < n; i++ {
        workers[i] = enrich(ctx, in) // each worker reads from same in
    }
    return Merge(ctx, workers...)
}
// Use:
out := parallelEnrich(ctx, src, 16)
// Throughput: ~16 * 200 = 3200 events/sec (if RPC server keeps up)
Gain: Linear with worker count up to the saturation point of the underlying resource. A 16x or 32x speedup is typical for IO-bound stages.
Caveat: Fan-out destroys order. If downstream needs ordered output, tag each item with a sequence number and reorder at the sink. Also, the upstream and downstream must keep up — adding fan-out to the middle of an already-bottlenecked pipeline just moves the bottleneck.
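The After code calls a Merge fan-in that this section doesn't define. A minimal sketch of the shape it assumes (Go 1.18+ generics; output order is arbitrary, which is exactly the caveat's point):
func Merge[T any](ctx context.Context, ins ...<-chan T) <-chan T {
    out := make(chan T)
    var wg sync.WaitGroup
    wg.Add(len(ins))
    for _, in := range ins {
        go func(in <-chan T) {
            defer wg.Done()
            for v := range in {
                select {
                case <-ctx.Done():
                    return
                case out <- v:
                }
            }
        }(in)
    }
    go func() {
        wg.Wait() // close out only after every worker is done
        close(out)
    }()
    return out
}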
Optimization 2 — Tune buffer sizes to absorb jitter¶
Problem: Strict unbuffered backpressure causes "stop-and-go" jitter when stages have variable per-item latency. Average throughput drops because the slowest item per stage stalls the chain.
Before:
parsed := parse(ctx, src) // unbuffered: P50=200µs, P99=4ms
enriched := enrich(ctx, parsed) // unbuffered
written := write(ctx, enriched) // unbuffered
After:
parsed := parse(ctx, src)
enriched := enrichBuf(ctx, parsed, 16) // buffer 16 to absorb 4ms tails
written := write(ctx, enriched)
func enrichBuf(ctx context.Context, in <-chan Event, buf int) <-chan Event {
    out := make(chan Event, buf) // absorbs P99/median ≈ 20x ratio
    // ... two-select sandwich body ...
}
Gain: 10-30% throughput improvement on jittery workloads. Variance in end-to-end latency drops sharply; tail latency (P99) improves by amortising the slow items across the buffer.
Caveat: Don't oversize. A buffer of 16 with 1 KB events holds 16 KB; a buffer of 100,000 holds 100 MB and hides backpressure problems. Size by 2 * (P99 / median) of the upstream stage and document it in code.
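Spelled out, the elided body above is the standard two-select sandwich, only with a buffered out channel; a sketch, reusing lookup from Optimization 1:
func enrichBuf(ctx context.Context, in <-chan Event, buf int) <-chan Event {
    out := make(chan Event, buf) // buffered: absorbs upstream latency spikes
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case ev, ok := <-in:
                if !ok {
                    return
                }
                ev.Tag = lookup(ctx, ev.UserID)
                select {
                case <-ctx.Done():
                    return
                case out <- ev:
                }
            }
        }
    }()
    return out
}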
Optimization 3 — Fuse trivial stages¶
Problem: Long pipelines of tiny stages spend most of their time in channel operations, not real work. Channel send/recv is ~50-150 ns; for transforms costing <100 ns the channel overhead exceeds the work.
Before:
out := addOne(ctx, multiplyByTwo(ctx, square(ctx, src)))
// 3 stages, 3 channels, ~450 ns of channel overhead per item.
// Per-item work: ~5 ns. Channel overhead: 90x the work.
After:
func transform(v int) int {
    return (v*v)*2 + 1
}
out := Map(transform)(ctx, src)
// 1 stage, 1 channel, ~150 ns overhead per item.
// 3x speedup on tiny transforms.
Gain: 2-5x for trivial transforms. The exact gain scales with how many stages you fuse.
Caveat: Don't fuse stages whose responsibilities differ. If square and addOne belong to different concerns (validation vs scoring), keep them separate for clarity. Fuse only when the per-item work is truly trivial and the boundary is artificial.
Optimization 4 — Reduce GC pressure with object reuse¶
Problem: Each pipeline pass allocates new structs; under high throughput, GC pauses become a measurable fraction of CPU time.
Before:
type Record struct {
    Buf []byte
    // ...other fields
}
func decode(ctx context.Context, in <-chan []byte) <-chan Record {
    out := make(chan Record)
    go func() {
        defer close(out)
        for raw := range in {
            r := Record{Buf: make([]byte, len(raw))}
            copy(r.Buf, raw)
            out <- r // each item allocates a new []byte
        }
    }()
    return out
}
// pprof: 30% time in GC under load
After:
var bufPool = sync.Pool{
    New: func() any { return make([]byte, 0, 4096) },
}
type Record struct {
    Buf []byte
}
func decode(ctx context.Context, in <-chan []byte) <-chan Record {
    out := make(chan Record)
    go func() {
        defer close(out)
        for raw := range in {
            buf := bufPool.Get().([]byte)
            buf = append(buf[:0], raw...)
            out <- Record{Buf: buf}
        }
    }()
    return out
}
// At the sink, return buffers:
for r := range out {
    process(r)
    bufPool.Put(r.Buf[:0])
}
// pprof: GC drops to 5%
Gain: 20-40% throughput improvement in allocation-heavy pipelines. Latency tail also improves because GC pauses shorten.
Caveat: Object reuse requires explicit ownership transfer. The sink must put back; the stages must not retain references. Mistakes cause use-after-return bugs. Use sync.Pool only when the allocation profile shows it's worth the complexity.
Optimization 5 — Speed up pipeline shutdown¶
Problem: A pipeline with deep buffers takes a long time to shut down because each stage has to drain its buffer before noticing ctx cancellation.
Before: a stage built as a plain for v := range in loop (like enrich in Optimization 1) never selects on ctx, so on shutdown it keeps draining whatever is buffered. A minimal sketch of that shape, with the same transform helper as the After:
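func stage(ctx context.Context, in <-chan In) <-chan Out {
    out := make(chan Out, 32)
    go func() {
        defer close(out)
        for v := range in { // no ctx check: drains the full buffer before exiting
            out <- transform(v)
        }
    }()
    return out
}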
After: drop-on-cancel
func stage(ctx context.Context, in <-chan In) <-chan Out {
    out := make(chan Out, 32)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return // drop in-flight items immediately
            case v, ok := <-in:
                if !ok {
                    return
                }
                select {
                case <-ctx.Done():
                    return
                case out <- transform(v):
                }
            }
        }
    }()
    return out
}
The two-select sandwich is what makes this work: even with buffer=1000, a stage blocked sending into a full downstream when cancel fires is unblocked immediately by the select on ctx, and items still sitting in channel buffers are simply never received.
Gain: Shutdown drops from O(buffer) to O(in-flight item) latency. For a 1000-buffer pipeline doing 5 ms per item, that's 5 seconds → 5 milliseconds.
Caveat: Dropped items are lost. For at-least-once semantics you need either a bounded grace period (drain with a deadline) or explicit acknowledgement at the sink. Don't drop without thinking about the contract.
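For the bounded-grace-period option, one sketch; drainTimeout and the handle callback are assumptions, not part of the pipeline above:
// drainWithDeadline keeps consuming for up to drainTimeout after cancel,
// then gives up and lets the rest be dropped.
func drainWithDeadline(out <-chan Out, drainTimeout time.Duration, handle func(Out)) {
    deadline := time.NewTimer(drainTimeout)
    defer deadline.Stop()
    for {
        select {
        case v, ok := <-out:
            if !ok {
                return // pipeline finished cleanly
            }
            handle(v)
        case <-deadline.C:
            return // grace period over; remaining items are dropped
        }
    }
}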
Optimization 6 — Replace generics with concrete types in the hot path¶
Problem: Generic stages compile to gcshape stencils and may not inline as well as concrete code. For trivial transforms in a hot inner loop, this can cost 20-40% per item.
Before:
func Map[In, Out any](f func(In) Out) Stage[In, Out] {
    return func(ctx context.Context, in <-chan In) <-chan Out {
        // ... two-select sandwich body that calls f ...
    }
}
out := Map(square)(ctx, src) // generic
// Benchmark: 220 ns/item
After:
func squareStage(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-in:
                if !ok {
                    return
                }
                select {
                case <-ctx.Done():
                    return
                case out <- v * v: // inlined; no function indirection
                }
            }
        }
    }()
    return out
}
out := squareStage(ctx, src)
// Benchmark: 130 ns/item — 40% faster
Gain: 20-40% on trivial transforms in tight loops. Negligible on transforms that do real work (>1 µs).
Caveat: You lose composability. Keep generics for cold-path or boilerplate stages; specialize only the measured hot stage. Run a benchmark before and after to confirm the gain — sometimes the inliner already does it for you.
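The before/after benchmark can be sketched like this; square is the same helper used above, and the harness simply drains the stage so b.N drives the item count:
func benchStage(b *testing.B, stage func(context.Context, <-chan int) <-chan int) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    src := make(chan int)
    go func() {
        defer close(src)
        for i := 0; i < b.N; i++ {
            src <- i
        }
    }()
    for range stage(ctx, src) {
        // drain: the per-item pipeline cost is what we're measuring
    }
}

func BenchmarkGeneric(b *testing.B)  { benchStage(b, Map(square)) }
func BenchmarkConcrete(b *testing.B) { benchStage(b, squareStage) }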
Optimization 7 — Batch within stages¶
Problem: Channel overhead is paid per send, and a send costs the same whether it carries one item or a batch. So if items are tiny, batching N of them together cuts the overhead by N×.
Before:
func encode(ctx context.Context, in <-chan Record) <-chan []byte {
    out := make(chan []byte)
    go func() {
        defer close(out)
        for r := range in {
            data, err := proto.Marshal(&r) // assumes *Record implements proto.Message
            if err != nil {
                continue // error handling elided
            }
            out <- data // 1 send per record
        }
    }()
    return out
}
// 1M records/sec * 150ns chan overhead = 150ms/sec wasted
After:
func encodeBatched(ctx context.Context, in <-chan Record, batchSize int) <-chan [][]byte {
    out := make(chan [][]byte)
    go func() {
        defer close(out)
        batch := make([][]byte, 0, batchSize)
        for {
            select {
            case <-ctx.Done():
                return
            case r, ok := <-in:
                if !ok {
                    if len(batch) > 0 {
                        select {
                        case <-ctx.Done():
                        case out <- batch: // flush the final partial batch
                        }
                    }
                    return
                }
                data, err := proto.Marshal(&r) // assumes *Record implements proto.Message
                if err != nil {
                    continue // error handling elided
                }
                batch = append(batch, data)
                if len(batch) >= batchSize {
                    select {
                    case <-ctx.Done():
                        return
                    case out <- batch:
                    }
                    batch = make([][]byte, 0, batchSize)
                }
            }
        }
    }()
    return out
}
// 1M records/sec / 100 batch = 10k sends/sec * 150ns = 1.5ms/sec wasted
Gain: 10-100x reduction in channel overhead for small items. Latency increases by up to one batch interval — usually worth it.
Caveat: Batches add latency. If your latency budget is tight, add a time-based flush so a batch doesn't sit half-full forever; see the sketch below. Choose batch size by measuring the per-item vs per-batch cost ratio.
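A sketch of the timed flush, assuming a flushEvery parameter; a time.Ticker avoids the timer-per-iteration churn of calling time.After inside the loop:
func encodeBatchedTimed(ctx context.Context, in <-chan Record, batchSize int, flushEvery time.Duration) <-chan [][]byte {
    out := make(chan [][]byte)
    go func() {
        defer close(out)
        tick := time.NewTicker(flushEvery)
        defer tick.Stop()
        batch := make([][]byte, 0, batchSize)
        flush := func() bool { // sends the current batch; false means ctx fired
            if len(batch) == 0 {
                return true
            }
            select {
            case <-ctx.Done():
                return false
            case out <- batch:
                batch = make([][]byte, 0, batchSize)
                return true
            }
        }
        for {
            select {
            case <-ctx.Done():
                return
            case <-tick.C: // age-based flush: a partial batch never waits forever
                if !flush() {
                    return
                }
            case r, ok := <-in:
                if !ok {
                    flush() // final partial batch
                    return
                }
                data, err := proto.Marshal(&r) // assumes *Record implements proto.Message
                if err != nil {
                    continue
                }
                batch = append(batch, data)
                if len(batch) >= batchSize && !flush() {
                    return
                }
            }
        }
    }()
    return out
}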
Optimization 8 — Drop redundant stages¶
Problem: Some stages were added "just in case" but are no-ops or near-no-ops on the actual workload. They cost channel overhead and goroutine memory for nothing.
Before:
src := gen(ctx, items)
v := validate(ctx, src) // 99.99% pass-through
n := normalize(ctx, v) // 99% no-op (already normalized upstream)
e := enrich(ctx, n) // real work
out := write(ctx, e)
After:
If validate is needed only for malformed input that almost never appears, move it to a sampling check or a one-time test rather than per-item.
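The trimmed pipeline might then look like this, with gen, enrich, and write unchanged from the Before:
src := gen(ctx, items)
e := enrich(ctx, src) // validate + normalize removed; input checked offline instead
out := write(ctx, e)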
Gain: Per-stage overhead × 2 stages × event rate. For 1M events/sec at 200 ns/stage, that's 400 ms/sec of CPU saved (40% of one core).
Caveat: Don't remove stages that are defensive against rare bad input — those are worth the overhead. Drop only stages that the data's actual distribution makes redundant.
Optimization 9 — Use sync.Pool for per-item allocations¶
Problem: Each item passing through a pipeline allocates fresh buffers, scratch maps, or temporary slices. Under high throughput, GC pauses dominate latency.
Before:
func tokenize(ctx context.Context, in <-chan string) <-chan []string {
    out := make(chan []string)
    go func() {
        defer close(out)
        for s := range in {
            tokens := strings.Split(s, " ") // allocates a fresh []string per item (the substrings alias s)
            out <- tokens
        }
    }()
    return out
}
// pprof: 600MB/s allocation rate, 25% time in GC
After:
var tokenPool = sync.Pool{
    New: func() any { return make([]string, 0, 16) },
}
func tokenize(ctx context.Context, in <-chan string) <-chan []string {
    out := make(chan []string)
    go func() {
        defer close(out)
        for s := range in {
            tokens := tokenPool.Get().([]string)[:0]
            // Split by hand: calling strings.Split here would allocate the
            // very []string the pool exists to avoid. Substrings alias s,
            // so the appends below copy no bytes.
            rest := s
            for {
                i := strings.IndexByte(rest, ' ')
                if i < 0 {
                    tokens = append(tokens, rest)
                    break
                }
                tokens = append(tokens, rest[:i])
                rest = rest[i+1:]
            }
            out <- tokens
        }
    }()
    return out
}
// At sink:
for tokens := range out {
    process(tokens)
    tokenPool.Put(tokens[:0])
}
// pprof: 80MB/s allocation rate, 5% time in GC
Gain: 30-50% reduction in GC time. Latency tail flattens because GC pauses shorten.
Caveat: sync.Pool is a cache, not a guaranteed reuse. The runtime may evict pool entries; expect cache misses. Also, the sink must put items back or the pool is useless. And every consumer that receives the item is responsible — if the pipeline branches (fan-out at the sink), pool ownership becomes confusing.
Optimization 10 — Prefetch downstream work¶
Problem: A stage waits on a slow downstream (e.g. a DB write). The CPU sits idle while the network call completes.
Before:
func writer(ctx context.Context, in <-chan Record) {
    for r := range in {
        db.Insert(ctx, r) // blocks 5 ms each
    }
}
// 200 records/sec
After:
func writer(ctx context.Context, in <-chan Record, parallelism int) {
    sem := make(chan struct{}, parallelism) // bounds in-flight inserts
    var wg sync.WaitGroup
    for r := range in {
        select {
        case <-ctx.Done():
            wg.Wait()
            return
        case sem <- struct{}{}:
        }
        wg.Add(1)
        r := r // capture loop variable (needed before Go 1.22)
        go func() {
            defer wg.Done()
            defer func() { <-sem }()
            db.Insert(ctx, r)
        }()
    }
    wg.Wait()
}
// With parallelism=8: ~1600 records/sec
Gain: Up to N× parallelism × baseline throughput, capped by the downstream service's capacity.
Caveat: Now ordering is lost (the DB sees inserts in non-deterministic order). If ordering matters (e.g. log replay), parallelism breaks correctness. Also, more inflight requests means more memory and more pressure on the downstream service — pick parallelism by measurement, not "just N=100".
Optimization 11 — Avoid struct copies through channels¶
Problem: Sending large structs by value through channels copies them on every hop. A 5-stage pipeline with 1 KB structs copies 5 KB per item.
Before:
type Record struct {
    Header [256]byte
    Body   [1024]byte
}
func stage(ctx context.Context, in <-chan Record) <-chan Record {
    // copies Record on receive AND on send
}
After:
type Record struct {
    Header [256]byte
    Body   [1024]byte
}
func stage(ctx context.Context, in <-chan *Record) <-chan *Record {
    // copies pointer (8 bytes) instead of full struct (1280 bytes)
}
Gain: For large structs (>128 bytes), 2-5x throughput improvement. Most of the gain comes from cache locality, not just the copy itself.
Caveat: Pointers introduce shared-mutable-state risk. If two stages both retain the pointer, they may race. Document ownership: "after sending, the sender must not touch the value." For complex shared types, prefer immutable copies; for hot-path performance, accept the risk and discipline.
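A sketch of the pointer-passing shape with ownership called out in comments; the transform helper mutating *Record in place is an assumption:
func stage(ctx context.Context, in <-chan *Record) <-chan *Record {
    out := make(chan *Record)
    go func() {
        defer close(out)
        for r := range in { // receives an 8-byte pointer, not a 1280-byte struct
            transform(r) // this stage owns *r until the send below
            select {
            case <-ctx.Done():
                return
            case out <- r: // ownership transfers downstream; do not touch r after this
            }
        }
    }()
    return out
}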
Optimization 12 — Pin goroutines with GOMAXPROCS-aware fan-out¶
Problem: Fan-out with N=100 workers on a 4-core machine schedules them all on the same 4 OS threads, with massive context-switch overhead.
Before: a worker count picked by guesswork, far above the core count. A minimal sketch, reusing parallelEnrich from Optimization 1:
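parallelEnrich(ctx, in, 100) // 100 workers contending for 4 cores: constant context switching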
After:
n := runtime.GOMAXPROCS(0) * 2 // 8 workers on 4 cores
parallelEnrich(ctx, in, n)
// pprof: 5% time in sched/futex
Gain: 10-30% throughput improvement when the stage is CPU-bound. Less for IO-bound stages (where idle workers don't compete for CPU).
Caveat: For IO-bound stages, more workers can be useful — they spend most time blocked on the network. The right number depends on the stage's CPU/IO ratio. Measure per stage.
When NOT to optimize¶
A pipeline that does what it needs at the throughput required is done. Optimization has costs:
- Code complexity. Object pools and batching add invariants that future maintainers must respect.
- Correctness risk. Buffer tuning and parallelism can introduce ordering bugs and races.
- Maintenance. Tuned buffers tied to specific P99 measurements break when load patterns change.
Profile first. Establish a target. Optimize the measured bottleneck. Stop when you've hit the target. Most of the optimizations above are tools you should reach for in an emergency, not by default.
Cheat Sheet¶
| Optimization | Typical gain | When to use |
|---|---|---|
| Fan-out a slow stage | Linear in worker count | Bottleneck is one stage |
| Tune buffer sizes | 10-30% throughput | High-jitter stages |
| Fuse trivial stages | 2-5x | Per-item work < 100 ns |
| sync.Pool reuse | 20-40% throughput | Allocation-heavy hot path |
| Faster shutdown | O(buf) → O(item) | Big buffers + slow drain |
| Concrete vs generic | 20-40% | Hot trivial transforms |
| Batching | 10-100x channel cost | Small per-item work |
| Drop redundant stages | Per-stage overhead | No-op stages |
| sync.Pool buffers | 30-50% GC | Per-item buffer alloc |
| Prefetch downstream | N× parallelism | IO-bound sink |
| Pointers vs values | 2-5x for big structs | Items > 128 bytes |
| GOMAXPROCS-aware fan-out | 10-30% | CPU-bound stage |
Each of these requires measurement to confirm the gain on your workload. The numbers above are typical, not guaranteed.