Error Propagation in Pipelines — Senior Level¶
Table of Contents¶
- Introduction
- Prerequisites
- Error Aggregation Strategies
- errors.Join Deep Dive
- Multi-Stage Failure Rollback
- Compensating Actions
- Saga Patterns in Go Pipelines
- Panic Recovery in Stages
- Memory Model Details
- Lock-Free Aggregation
- Structured Concurrency in Depth
- Error Routing Architectures
- First Error vs All Errors Trade-Off
- Dead-Letter Queues
- Bulkheads
- Circuit Breakers in Pipelines
- Observability and Tracing
- Senior-Level Code Examples
- Architecture Patterns
- Clean Code at Scale
- Product Considerations
- Best Practices
- Edge Cases
- Common Mistakes
- Tricky Points
- Test Strategies
- Tricky Questions
- Cheat Sheet
- Self-Assessment
- Summary
- Further Reading
Introduction¶
Focus: "I need to design error handling for a system, not a function. Multiple stages with side effects. Partial failure recovery. Aggregated errors. Tracing across stages."
At junior level you learned mechanics. At middle level, design. At senior level, architecture. A pipeline embedded in a production service has additional concerns: partial-failure recovery (some stages succeeded, the next must not orphan the work), aggregated reporting (every failed item, not just one), bulkheads (failure in one part shouldn't cascade), and observability (you must be able to debug it in production at 3 AM).
This file covers:
- Aggregating errors with errors.Join and beyond
- Multi-stage rollback when downstream fails after upstream succeeded
- Compensating actions and the saga pattern
- Panic recovery as a deliberate part of pipeline design
- The Go memory model as it applies to errgroup
- Lock-free aggregation patterns
- Structured concurrency as a design philosophy
- Bulkheads, circuit breakers, dead-letter queues
- Distributed tracing across pipeline stages
This is the territory where you stop using libraries and start designing libraries.
Prerequisites¶
- Junior and middle files in this series.
- Comfort with sync/atomic, the Go memory model, and sync.Mutex/sync.RWMutex.
- Understanding of context.Context plumbing and cancellation semantics.
- Familiarity with database transactions and ACID properties.
- Exposure to distributed system concepts: idempotency, sagas, dead-letter queues.
- Experience reading production goroutine dumps.
Error Aggregation Strategies¶
First-error-wins is the default for errgroup. But many scenarios demand aggregation.
When to aggregate¶
- Batch processing: report every failed item.
- Validation: tell the user every field that's invalid, not just one.
- Multi-source fetch: report every source that failed.
- Schema migrations: list every check that failed.
- Audit logs: every failure recorded for postmortem.
Three aggregation patterns¶
Pattern A: errors.Join (Go 1.20+)¶
func processAll(ctx context.Context, items []Item) error {
var (
errs []error
mu sync.Mutex
)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(16)
for _, it := range items {
it := it
g.Go(func() error {
if err := process(ctx, it); err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("%s: %w", it.ID, err))
mu.Unlock()
}
return nil // do NOT propagate; we collect locally
})
}
_ = g.Wait()
return errors.Join(errs...)
}
The goroutine returns nil so errgroup doesn't cancel siblings. Errors are collected under a mutex. errors.Join produces a single error that wraps all of them. Callers can check membership via errors.Is or errors.As; to enumerate the children, assert to interface{ Unwrap() []error } — the errors.Unwrap function only follows single-error Unwrap and returns nil for a joined error.
Pattern B: Result slice with per-item error¶
type Result struct {
Item Item
Err error
}
func processAll(ctx context.Context, items []Item) []Result {
results := make([]Result, len(items))
var g errgroup.Group
g.SetLimit(16)
for i, it := range items {
i, it := i, it
g.Go(func() error {
err := process(ctx, it)
results[i] = Result{Item: it, Err: err}
return nil
})
}
_ = g.Wait()
return results
}
No combined error. Caller iterates results, decides per-item action. Useful when callers want fine-grained control.
Pattern C: Multi-error struct¶
type MultiError struct {
PerItem map[string]error
}
func (m *MultiError) Error() string {
var parts []string
for k, v := range m.PerItem {
parts = append(parts, fmt.Sprintf("%s: %v", k, v))
}
return strings.Join(parts, "; ")
}
func (m *MultiError) Unwrap() []error {
out := make([]error, 0, len(m.PerItem))
for _, v := range m.PerItem {
out = append(out, v)
}
return out
}
Custom typed multi-error. Callers can errors.As to a *MultiError, then inspect PerItem map. Most expressive but most boilerplate.
Choosing between them¶
- errors.Join: simplest, idiomatic in 1.20+. Default for new code.
- Result slice: when item identity matters and callers process per-item.
- Custom multi-error: when callers need structured access and aggregation logic.
Don't mix in one pipeline. Pick one, document it.
errors.Join Deep Dive¶
errors.Join was added in Go 1.20. It creates an error that wraps multiple errors.
Signature¶
func Join(errs ...error) error
- Returns nil if every element of errs is nil.
- Otherwise returns an error whose Error() is the newline-separated concatenation of each non-nil error's message, and whose Unwrap() returns the non-nil errors as []error.
Example¶
e1 := errors.New("one")
e2 := errors.New("two")
err := errors.Join(e1, e2)
fmt.Println(err)
// Output:
// one
// two
errors.Is(err, e1) // true
errors.Is(err, e2) // true
Implementation sketch¶
type joinError struct{ errs []error }
func (e *joinError) Error() string {
var b strings.Builder
for i, err := range e.errs {
if i > 0 { b.WriteByte('\n') }
b.WriteString(err.Error())
}
return b.String()
}
func (e *joinError) Unwrap() []error { return e.errs }
func Join(errs ...error) error {
n := 0
for _, e := range errs { if e != nil { n++ } }
if n == 0 { return nil }
je := &joinError{errs: make([]error, 0, n)}
for _, e := range errs { if e != nil { je.errs = append(je.errs, e) } }
return je
}
About 20 lines. Worth understanding so you know what you're getting.
Custom format vs Join's format¶
errors.Join's message is just newline-separated. Often you want richer formatting:
type Aggregate struct {
Stage string
Errs []error
}
func (a *Aggregate) Error() string {
return fmt.Sprintf("%s: %d errors: %s",
a.Stage, len(a.Errs), formatList(a.Errs))
}
func (a *Aggregate) Unwrap() []error { return a.Errs }
This gives a structured prefix and a count. errors.Is/errors.As still walk the children.
errors.Is with multi-unwrap¶
errors.Is (in Go 1.20+) handles multi-unwrap correctly. For an error returned by errors.Join(e1, e2):
- errors.Is(joined, e1) → walks to e1, returns true.
- errors.Is(joined, e2) → walks to e2, returns true.
errors.As similarly checks each branch.
For nested multi-unwraps, the walk is a depth-first traversal. Cycles are not handled — if you construct a cycle, errors.Is loops forever. Don't construct cycles.
Performance¶
Joining N errors allocates one wrapper plus the slice. Error() walks all N. Is/As walks until match or exhaustion. All O(N).
If N is large (1000+), the formatted message is huge. Truncate at the boundary:
const maxErrs = 20
if len(errs) > maxErrs {
errs = append(errs[:maxErrs], fmt.Errorf("(%d more)", len(errs)-maxErrs))
}
return errors.Join(errs...)
Multi-Stage Failure Rollback¶
Pipelines often have side effects: writing to a DB, calling an API, sending a message. If a downstream stage fails after upstream stages succeeded, those side effects must be undone.
The problem¶
Consider three stages with side effects: A inserts a user row into the DB, B provisions an account via an external API, C sends a message. If C fails, A's row should be deleted and B's account deprovisioned. Otherwise you have a half-created user.
Strategy 1: Two-phase¶
Phase 1 stages all work without committing. Phase 2 commits. If any item fails in phase 1, nothing is committed.
type Tx interface {
Stage(ctx context.Context, item Item) error
Commit(ctx context.Context) error
Rollback() error
}
func process(ctx context.Context, items []Item, tx Tx) error {
defer tx.Rollback() // no-op if commit succeeded
g, ctx := errgroup.WithContext(ctx)
for _, it := range items {
it := it
g.Go(func() error {
return tx.Stage(ctx, it)
})
}
if err := g.Wait(); err != nil { return err }
return tx.Commit(ctx)
}
Works for DB transactions, with one caveat: database/sql's *sql.Tx is not safe for concurrent use, so the Tx implementation must either be concurrency-safe or serialise Stage calls. It does not work for external APIs that lack a prepare/commit protocol.
Strategy 2: Compensating actions¶
Each forward step has a corresponding rollback step. On failure, run rollbacks in reverse order:
type Step struct {
Forward func(ctx context.Context) error
Compensate func(ctx context.Context) error
}
func runSteps(ctx context.Context, steps []Step) error {
var completed []Step
for _, s := range steps {
if err := s.Forward(ctx); err != nil {
// rollback in reverse
rollback(ctx, completed)
return fmt.Errorf("step failed: %w", err)
}
completed = append(completed, s)
}
return nil
}
func rollback(ctx context.Context, steps []Step) {
for i := len(steps) - 1; i >= 0; i-- {
if err := steps[i].Compensate(ctx); err != nil {
log.Error("rollback step failed", "err", err)
// continue trying others
}
}
}
Strategy 3: Saga with persistent state¶
For long-running multi-step processes, persist state. If the process crashes mid-rollback, restart it from saved state.
We expand on sagas next.
Compensating Actions¶
A compensating action undoes a previously successful step. Designing them well is half the battle.
Properties of a good compensator¶
- Idempotent: running it twice has the same effect as once.
- Robust against partial state: if forward step partially succeeded (e.g., row inserted but trigger not run), compensator handles both cases.
- Independent of context: doesn't require state from the forward call that might be lost on crash.
Example: account provisioning¶
Forward: provisionAccount(userID) calls API, returns accountID. Compensator: deprovisionAccount(accountID).
If forward succeeded, accountID is known. Compensator just needs accountID — no other state.
If forward partially succeeded (API call timed out, account may or may not exist), compensator must handle both "account exists" and "account does not exist." Idempotent: if exists, delete; else, no-op.
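A minimal sketch of such an idempotent compensator. The in-memory map standing in for the provisioning API, and the function name, are assumptions for illustration; the point is that "already gone" is treated as success, so the compensator is safe after a partial forward step and safe to retry.

```go
package main

import "fmt"

// deprovisionAccount is idempotent: deleting an account that does
// not exist is a no-op success, never an error. That makes it safe
// to run after a timed-out forward call, and safe to run twice.
func deprovisionAccount(api map[string]bool, accountID string) error {
	if !api[accountID] {
		return nil // never created, or already deleted: no-op
	}
	delete(api, accountID)
	return nil
}

func main() {
	api := map[string]bool{"acct-1": true}
	fmt.Println(deprovisionAccount(api, "acct-1")) // <nil>: deleted
	fmt.Println(deprovisionAccount(api, "acct-1")) // <nil>: idempotent repeat
	fmt.Println(len(api))                          // 0
}
```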
Example: email send¶
Forward: sendEmail(to, body). Compensator: ... ?
You can't unsend an email. Some compensators are impossible. Options:
- Send a follow-up "ignore the previous message" email.
- Mark the email as "test" in your own DB, so the user knows.
- Accept that this step is non-compensatable and design accordingly (e.g., make it the last step so nothing can fail after it).
Compensation ordering¶
Reverse order: last forward step undone first.
Why reverse? Because dependencies are forward. D depends on C's output; D's compensator must run before C's, or C's resources are gone.
Compensation under concurrent failure¶
If two steps fail simultaneously, do you run both compensators? Or only the failing step's? Best practice: only run compensators for succeeded steps. The failing step didn't produce side effects (assuming it returned without success), so it has nothing to compensate.
If the failing step partially succeeded (network timeout after API call), you do need its compensator. Detect this via "best-effort cleanup":
func (s *Step) ForwardWithCleanup(ctx context.Context) (didStart bool, err error) {
didStart = false
// before doing anything that has side effects
err = s.precheck(ctx)
if err != nil { return false, err }
didStart = true // we're about to do work that may partially succeed
err = s.do(ctx)
return didStart, err
}
Caller:
didStart, err := step.ForwardWithCleanup(ctx)
if err != nil {
if didStart {
// run compensator
}
return err
}
Saga Patterns in Go Pipelines¶
A saga is a long-running transaction implemented as a sequence of forward steps with corresponding compensating actions.
Two flavors¶
Orchestration¶
A central coordinator drives each step:
type Orchestrator struct {
steps []Step
}
func (o *Orchestrator) Run(ctx context.Context) error {
var completed []Step
defer func() {
for i := len(completed) - 1; i >= 0; i-- {
completed[i].Compensate(ctx)
}
}()
for _, s := range o.steps {
if err := s.Forward(ctx); err != nil {
return err
}
completed = append(completed, s)
}
completed = nil // success; defer is a no-op
return nil
}
Simple. Easy to debug. Single point of control. Doesn't scale to long-running (days, weeks) sagas without persistence.
Choreography¶
Each step emits an event; the next step listens and runs. Compensations are also event-driven. More flexible but harder to reason about.
In a Go pipeline, orchestration is the norm. Choreography is for cross-service workflows (often using a message broker).
Persistent saga state¶
If your saga must survive a crash, persist state at each step:
type SagaState struct {
	ID           string
	NextStep     int      // next forward step to run
	RollbackStep int      // current step during compensation
	Completed    []string
	Error        string   // last failure, if any
}
func (s *SagaState) Save(ctx context.Context, db *sql.DB) error {
_, err := db.ExecContext(ctx, "INSERT INTO sagas ... ON CONFLICT (id) DO UPDATE ...", s)
return err
}
On restart, load all incomplete sagas and resume:
The resume logic: if we're rolling back, continue rollback. If forward, continue forward. Each step is idempotent so re-running is safe.
This is the architecture for systems like AWS Step Functions, Cadence, Temporal. You can implement a lightweight version in your own service.
Panic Recovery in Stages¶
A panic in a g.Go function crashes the program. In production this is usually catastrophic. Recovery is a deliberate design choice.
When to recover¶
- Stages that process untrusted input (user data, external API responses).
- Stages that use third-party libraries with unknown panic behaviour.
- Long-running workers where uptime matters.
When NOT to recover¶
- Genuine programmer errors (nil dereference, index out of range). Recovery hides bugs.
- Critical invariant violations. Better to crash and restart than continue in a corrupt state.
Recovery pattern¶
func safeStage(ctx context.Context, fn func(context.Context) error) (err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
n := runtime.Stack(buf, false)
err = fmt.Errorf("panic in stage: %v\n%s", r, buf[:n])
}
}()
return fn(ctx)
}
g.Go(func() error { return safeStage(ctx, parseStage) })
Notes:
- Named return so deferred recover can set it.
- Capture stack with runtime.Stack for debugging.
- Wrap in an error so the pipeline's error flow continues normally.
Distinguishing panics from errors¶
Sometimes useful at the caller:
type PanicError struct {
Value any
Stack []byte
}
func (e *PanicError) Error() string { return fmt.Sprintf("panic: %v", e.Value) }
func safeStage(ctx context.Context, fn func(context.Context) error) (err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
n := runtime.Stack(buf, false)
err = &PanicError{Value: r, Stack: buf[:n]}
}
}()
return fn(ctx)
}
// caller
var pe *PanicError
if errors.As(err, &pe) {
log.Error("panic recovered", "stack", string(pe.Stack))
}
The *PanicError lets callers distinguish recovered panics from regular errors and act differently (alert, page on-call, etc.).
Recovery and partial state¶
If a panic happened mid-write, your data may be inconsistent. Recovery converts the panic to an error but doesn't undo the partial write. Use compensating actions (sagas) for that.
Don't recover at every level¶
Recover at the goroutine boundary, not inside every helper. Otherwise:
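A sketch of what goes wrong (the function names are illustrative):

```go
package main

import "fmt"

// Anti-pattern: a deeply nested helper recovers and swallows the
// panic. Callers can't tell "" (a valid value) from "" (a hidden bug).
func parseField(s string) (out string) {
	defer func() { recover() }() // swallows any panic; out stays ""
	return s[:8]                 // panics for inputs shorter than 8 bytes
}

func parseRecord(s string) string {
	return parseField(s) // looks fine even when parseField panicked
}

func main() {
	fmt.Printf("%q\n", parseRecord("abc")) // "" — the bug is invisible
}
```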
The deeply nested recover swallows the panic. The function above thinks everything is fine. The bug is hidden. Recover only at well-defined boundaries.
Memory Model Details¶
The Go memory model defines when one goroutine's reads see another's writes. Errgroup interacts with it precisely.
Happens-before relationships¶
The memory model says: in g.Go(f), the call to Go happens before f starts. So state set up before Go is visible inside f.
After g.Wait() returns, every operation inside every spawned f has completed and happened-before the return. So state written inside any f is visible to the caller.
This is why:
results := make([]Result, n)
for i := 0; i < n; i++ {
i := i
g.Go(func() error {
results[i] = compute(i)
return nil
})
}
g.Wait()
// results is fully populated; reads are safe.
Works without explicit synchronisation. The Wait provides the necessary happens-before.
What about reads inside g.Go?¶
shared := 0
g.Go(func() error { shared = 1; return nil })
g.Go(func() error { fmt.Println(shared); return nil })
g.Wait()
The two goroutines race. No happens-before between them. Race detector flags this. Use atomics or a mutex.
sync.Once internals¶
errgroup uses sync.Once to capture the first error. sync.Once.Do(f) is implemented with an atomic done flag plus a mutex for the slow path. Its guarantee: writes inside f happen-before any subsequent return from Do, for any caller.
So in errgroup:
After this returns to any caller, g.err and g.cancel are observably set. Wait reads g.err after wg.Wait(), which synchronises with all Done calls, which include the errOnce.Do execution by the failing goroutine.
The chain: failing goroutine writes err -> Once synchronises -> Done -> wg.Wait -> read err. All correct.
Why mutex around aggregation works¶
var mu sync.Mutex
var errs []error
g.Go(func() error {
if err := work(); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
return nil
})
g.Wait()
// Safe to read errs here: each goroutine's mutex-guarded writes
// happen before its wg.Done, and g.Wait observes every Done.
The mutex ensures writes don't race. g.Wait() ensures the final state is visible.
When you DON'T need a mutex¶
If each goroutine writes to a unique slot (slice index), no synchronisation between goroutines is needed. They write to disjoint memory. g.Wait provides the final visibility.
results := make([]int, n)
for i := 0; i < n; i++ {
i := i
g.Go(func() error { results[i] = compute(i); return nil })
}
g.Wait()
// safe to read all of results
This is the "result slot" pattern. Fast and lock-free.
Lock-Free Aggregation¶
When you want to aggregate without mutex contention.
Atomic counters¶
For numerical aggregation:
var total atomic.Int64
g.Go(func() error {
n, err := work()
if err != nil { return err }
total.Add(n)
return nil
})
atomic.Int64 is faster than mutex for simple updates. Use for counters, sums, etc.
Per-goroutine buffers¶
Each goroutine writes to its own buffer; aggregate after.
buffers := make([][]Result, n)
for i := 0; i < n; i++ {
i := i
g.Go(func() error {
buf := buffers[i]
        for _, it := range items[i] { // items is pre-partitioned, one slice per goroutine
buf = append(buf, compute(it))
}
buffers[i] = buf
return nil
})
}
g.Wait()
// Merge after
var all []Result
for _, buf := range buffers {
all = append(all, buf...)
}
No locking during the hot path. Merge is sequential but fast.
Channels for streaming aggregation¶
g, ctx := errgroup.WithContext(ctx)
results := make(chan Result, runtime.NumCPU())
for _, item := range items {
	item := item
	g.Go(func() error {
		r, err := compute(item)
		if err != nil { return err }
		results <- r
		return nil
	})
}
// Start the closer only after every Go call: calling Wait before
// all goroutines are registered can return early.
var gerr error
go func() {
	gerr = g.Wait()
	close(results)
}()
var all []Result
for r := range results {
	all = append(all, r)
}
// gerr is safely readable now: the range ended only after close,
// which happens after g.Wait returned.
Channels serialize through the channel's internal lock; for very high throughput, prefer slot patterns.
sync.Map for keyed aggregation¶
For per-key aggregation:
var counts sync.Map
g.Go(func() error {
for v := range in {
key := compute(v)
val, _ := counts.LoadOrStore(key, &atomic.Int64{})
val.(*atomic.Int64).Add(1)
}
return nil
})
sync.Map is optimised for "write once, read many" workloads. For "write many, read once," a plain map with a mutex is often faster.
Structured Concurrency in Depth¶
The principle: every goroutine has a defined lifetime bounded by its parent's lifetime. The idea has roots in Hoare's CSP, but "structured concurrency" as a named discipline is recent (Martin Sústrik's libdill, Nathaniel J. Smith's Trio); modern languages (Kotlin, Swift) implement it natively. Go doesn't, but you can enforce it with discipline.
Three rules¶
- Every goroutine is spawned via a tracked mechanism (errgroup, WaitGroup, channel-based).
- The parent does not return until all spawned children have returned.
- Cancellation propagates downward.
errgroup.WithContext implements all three.
Anti-pattern: orphan goroutines¶
updateMetrics runs forever. The function that started it has returned. Who cancels it? Often nobody. This is unstructured.
Refactor to structured¶
func service(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return updateMetrics(ctx) })
g.Go(func() error { return handleRequests(ctx) })
return g.Wait()
}
Both goroutines are now bounded by service's lifetime. When ctx is cancelled, both stop. service returns when both stop.
Nested groups¶
g.Go(func() error {
inner, ctx := errgroup.WithContext(ctx)
inner.Go(...)
inner.Go(...)
return inner.Wait()
})
Inner's children are bounded by the inner goroutine's lifetime, which is bounded by outer. Trees of bounded lifetimes.
Benefits¶
- No leaks.
- Clear ownership.
- Cancellation is well-defined.
- Easy to reason about: lifetime trees are straightforward.
Cost¶
- More boilerplate.
- "Just go func()" is sometimes more concise.
- Worth it for any non-trivial service.
Error Routing Architectures¶
In a complex service, errors flow through multiple layers. Designing this routing is its own art.
Layer 1: stage-internal handling¶
Sentinel errors that the stage knows how to handle:
for v := range in {
if err := tryProcess(v); err != nil {
switch {
case errors.Is(err, ErrSkip):
metrics.SkipCount.Inc()
continue
case errors.Is(err, ErrPoison):
deadLetter(v)
continue
default:
return fmt.Errorf("process: %w", err)
}
}
}
Local errors don't bubble. The stage decides.
Layer 2: stage-boundary wrapping¶
Errors that escape the stage are wrapped:
Or attributed via a typed error:
Layer 3: pipeline-level routing¶
The pipeline runner classifies and routes:
err := pipeline.Run(ctx)
switch {
case err == nil:
return Success()
case errors.Is(err, context.Canceled):
return Cancelled()
case errors.Is(err, context.DeadlineExceeded):
return Timeout()
case errors.Is(err, ErrTransient):
return Retry()
default:
return Failure(err)
}
Layer 4: service-level reporting¶
The service surfaces errors to callers:
- HTTP handler: map to status code.
- gRPC: map to status code with details.
- CLI: print user-friendly message; log full chain.
- Background worker: increment metrics, log, requeue.
Each layer has its job. Errors flow up; metadata accumulates; the final form is appropriate for the audience.
Don't skip layers¶
Tempting to handle errors "at the top" and skip stage-internal handling. Don't:
- Loses context.
- Forces top-level to know about every stage.
- Couples layers.
Each layer should handle what it knows; pass up what it doesn't.
First Error vs All Errors Trade-Off¶
When designing a pipeline's error policy, decide explicitly:
Use first-error-wins when¶
- The caller is a human user expecting one clear message.
- Subsequent failures are likely consequences of the first.
- Latency matters: fail fast saves work.
- Atomicity matters: any failure invalidates the whole.
Use aggregation when¶
- Each item is independent.
- Callers want a full report.
- Latency is less important than completeness.
- Partial results have value.
Hybrid¶
A pipeline might:
- Fail fast on structural errors (DB unavailable, config wrong).
- Aggregate on per-item errors (this row invalid, that one valid).
In code:
func run(ctx context.Context, items []Item) error {
if err := preflight(); err != nil {
return fmt.Errorf("preflight: %w", err)
}
var perItem []error
var mu sync.Mutex
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(16)
for _, it := range items {
it := it
g.Go(func() error {
err := process(ctx, it)
if err != nil {
if errors.Is(err, ErrCritical) {
return err // first-error path
}
mu.Lock()
perItem = append(perItem, err)
mu.Unlock()
}
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
if len(perItem) > 0 {
return errors.Join(perItem...)
}
return nil
}
Critical errors bubble up immediately; per-item errors aggregate. Best of both.
Dead-Letter Queues¶
A dead-letter queue (DLQ) holds items that repeatedly fail. They're not lost but not blocking the pipeline.
Pattern¶
type DLQ interface {
Put(ctx context.Context, item Item, reason error) error
}
func processWithDLQ(ctx context.Context, item Item, dlq DLQ, maxAttempts int) error {
var lastErr error
for attempt := 0; attempt < maxAttempts; attempt++ {
err := process(ctx, item)
if err == nil { return nil }
if errors.Is(err, ErrPoison) {
return dlq.Put(ctx, item, err)
}
lastErr = err
time.Sleep(backoff(attempt))
}
return dlq.Put(ctx, item, fmt.Errorf("max attempts: %w", lastErr))
}
After max attempts, item moves to DLQ. Pipeline continues. Operators inspect DLQ later.
Implementation¶
- Database table: dead_letter_queue (id, payload, error, attempts, last_attempt).
- Message queue (Kafka, RabbitMQ) with a separate DLQ topic.
- Cloud-native (AWS SQS, GCP Pub/Sub) — built-in DLQ support.
When to use¶
- Async pipelines where items can be retried later.
- Async workers processing user-submitted content.
- Background ETL where some items are bad.
Not appropriate for synchronous request-response.
Bulkheads¶
A bulkhead isolates failures to a portion of the system. Like watertight compartments on a ship.
Pattern: per-tenant isolation¶
type TenantPipeline struct {
tenantID string
limiter *rate.Limiter
workers *errgroup.Group
}
func (tp *TenantPipeline) Submit(ctx context.Context, item Item) error {
if !tp.limiter.Allow() {
return ErrTenantThrottled
}
tp.workers.Go(func() error { return tp.process(ctx, item) })
return nil
}
Each tenant gets its own limiter and worker pool. If tenant A's pipeline backs up, tenant B is unaffected.
Pattern: per-resource semaphore¶
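A sketch using a buffered channel as a counting semaphore (the worker counts and the sleep standing in for the slow API call are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runWithBulkhead caps concurrent access to one resource: at most
// `limit` goroutines are inside the simulated API call at once, no
// matter how many workers the pipeline spawns. Returns the observed
// peak concurrency.
func runWithBulkhead(workers, limit int) int32 {
	slots := make(chan struct{}, limit)
	var inFlight, peak atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			slots <- struct{}{}        // acquire (blocks when limit reached)
			defer func() { <-slots }() // release

			n := inFlight.Add(1)
			for { // record the high-water mark
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(2 * time.Millisecond) // pretend: slow API call
			inFlight.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	peak := runWithBulkhead(50, 5)
	fmt.Println(peak <= 5) // true: the semaphore caps concurrency
}
```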
Resources have bounded contention. If the API is slow, only 5 goroutines are blocked on it, not all.
Pattern: separate goroutine pools¶
fastPool, _ := errgroup.WithContext(ctx)
fastPool.SetLimit(100)
slowPool, _ := errgroup.WithContext(ctx)
slowPool.SetLimit(10)
if isFast(item) {
fastPool.Go(...)
} else {
slowPool.Go(...)
}
Slow items don't starve fast ones.
Circuit Breakers in Pipelines¶
A circuit breaker stops requests to a failing dependency, letting it recover.
States¶
- Closed: normal operation; requests flow.
- Open: failures exceeded threshold; requests fail fast.
- Half-open: after cooldown, try one request to see if recovered.
Implementation¶
Use github.com/sony/gobreaker or roll your own:
const (
	StateClosed = iota
	StateOpen
)

var ErrCircuitOpen = errors.New("circuit open")

type Breaker struct {
	mu        sync.Mutex
	failures  int
	threshold int
	state     int
	openUntil time.Time
}
func (b *Breaker) Call(ctx context.Context, fn func() error) error {
b.mu.Lock()
if b.state == StateOpen && time.Now().Before(b.openUntil) {
b.mu.Unlock()
return ErrCircuitOpen
}
b.mu.Unlock()
err := fn()
b.mu.Lock()
defer b.mu.Unlock()
if err != nil {
b.failures++
if b.failures >= b.threshold {
b.state = StateOpen
b.openUntil = time.Now().Add(30 * time.Second)
}
} else {
b.failures = 0
b.state = StateClosed
}
return err
}
In a pipeline:
When the API is down, the breaker opens; subsequent goroutines fail fast without hammering the API. After 30 seconds, one is admitted to test recovery.
Observability and Tracing¶
Production pipelines need to be debuggable.
Structured logging¶
Every error log includes the stage, item ID, and full error. Searchable, filterable.
Metrics¶
- pipeline.duration (histogram, labelled by status).
- pipeline.errors.count (counter, labelled by stage, error type).
- stage.duration (histogram, labelled by stage).
- pipeline.in_flight (gauge).
- pipeline.retries.count (counter).
Tracing¶
Distributed tracing (OpenTelemetry) propagates a trace ID through the context. Each stage opens a span per item:
func parseStage(ctx context.Context, in <-chan []byte, out chan<- Record) error {
	for b := range in {
		// Don't reassign the loop's ctx: each item gets a sibling
		// span under the stage's context, not ever-deeper nesting.
		_, span := tracer.Start(ctx, "parse")
		r, err := parse(b)
		if err != nil { span.RecordError(err) }
		span.End()
		if err != nil { return err }
		out <- r
	}
	return nil
}
In a tracing UI you see the pipeline run as a tree: total time, per-stage time, per-item span. Indispensable for diagnosing slow stages or contention.
Don't log inside hot loops¶
log.Info per item, at 100k items/sec, generates 100k log lines. Sample:
Or log only on error:
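Both ideas in one runnable sketch (the sampler type and the 1-in-1000 rate are assumptions for illustration):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Sampling sketch: every error is logged, but only 1 in 1000
// successes, so 100k items/sec becomes ~100 log lines/sec.
type sampler struct{ n atomic.Int64 }

// log reports whether a line was emitted.
func (s *sampler) log(itemID string, err error) bool {
	if err != nil {
		fmt.Println("item failed:", itemID, err) // errors: always logged
		return true
	}
	if n := s.n.Add(1); n%1000 == 0 { // successes: sampled
		fmt.Println("progress:", n, "items ok")
		return true
	}
	return false
}

func main() {
	var s sampler
	lines := 0
	for i := 0; i < 5000; i++ {
		if s.log(fmt.Sprintf("it-%d", i), nil) {
			lines++
		}
	}
	fmt.Println(lines) // 5 lines for 5000 successful items
}
```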
Senior-Level Code Examples¶
Example: ETL with aggregation and rollback¶
type Importer struct {
db *sql.DB
workers int
dlq DLQ
}
func (im *Importer) Import(ctx context.Context, items []Item) error {
var (
succeeded []string
succMu sync.Mutex
errs []error
errMu sync.Mutex
)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(im.workers)
for _, it := range items {
it := it
g.Go(func() error {
err := im.importOne(ctx, it)
if err == nil {
succMu.Lock()
succeeded = append(succeeded, it.ID)
succMu.Unlock()
return nil
}
if errors.Is(err, ErrCritical) {
return err
}
if errors.Is(err, ErrPoison) {
_ = im.dlq.Put(ctx, it, err)
return nil
}
errMu.Lock()
errs = append(errs, fmt.Errorf("%s: %w", it.ID, err))
errMu.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
// Critical failure: rollback succeeded items.
im.rollback(context.Background(), succeeded)
return err
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}
func (im *Importer) rollback(ctx context.Context, ids []string) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(im.workers)
for _, id := range ids {
id := id
g.Go(func() error {
_, err := im.db.ExecContext(ctx, "DELETE FROM imports WHERE id = $1", id)
if err != nil {
log.Error("rollback failed", "id", id, "err", err)
}
return nil
})
}
_ = g.Wait()
}
Three error categories:
- Critical (DB down, etc.): fail-fast, rollback succeeded items.
- Poison (bad data): DLQ, continue.
- Per-item (transient, retryable later): aggregate, return at end.
This is the shape of a production importer.
Example: Saga with persistence¶
type Saga struct {
ID string
Steps []Step
State *SagaState
DB *sql.DB
}
func (s *Saga) Run(ctx context.Context) error {
for i := s.State.NextStep; i < len(s.Steps); i++ {
s.State.NextStep = i
s.persist(ctx)
if err := s.Steps[i].Forward(ctx); err != nil {
s.State.Error = err.Error()
s.persist(ctx)
return s.rollback(ctx, i)
}
}
s.State.NextStep = len(s.Steps)
s.persist(ctx)
return nil
}
func (s *Saga) rollback(ctx context.Context, failedStep int) error {
for i := failedStep - 1; i >= 0; i-- {
s.State.RollbackStep = i
s.persist(ctx)
if err := s.Steps[i].Compensate(ctx); err != nil {
log.Error("compensation failed", "step", i, "err", err)
// continue; can't help it
}
}
return errors.New("saga rolled back: " + s.State.Error)
}
func (s *Saga) persist(ctx context.Context) {
	state, _ := json.Marshal(s.State) // database/sql can't bind a struct directly
	_, _ = s.DB.ExecContext(ctx,
		"INSERT INTO sagas (id, state) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET state = $2",
		s.ID, state)
}
State persisted between steps. On crash, reload state, resume.
Example: panic-safe pipeline¶
func runSafely(ctx context.Context, fn func(context.Context) error) (err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
n := runtime.Stack(buf, false)
err = &PanicError{Value: r, Stack: buf[:n]}
log.Error("panic", "value", r, "stack", string(buf[:n]))
}
}()
return fn(ctx)
}
func runPipeline(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return runSafely(ctx, stage1) })
g.Go(func() error { return runSafely(ctx, stage2) })
g.Go(func() error { return runSafely(ctx, stage3) })
return g.Wait()
}
Every stage is wrapped. A panic becomes an error. The pipeline behaves predictably.
Architecture Patterns¶
Microservice pipeline¶
Each stage is a separate service connected by a message broker. Errors propagate via DLQ topics. Sagas via event sourcing.
Pros: scalability, isolation. Cons: complexity, latency.
Modular monolith pipeline¶
Stages are Go packages in one binary. Errgroup-based wiring. Errors via the standard pipeline patterns.
Pros: simplicity, low latency. Cons: scaling per-stage requires whole-binary scaling.
Stream processing¶
Apache Kafka + a Go consumer. Each consumer processes a stream of events. Errors via DLQ topic. Compensations via event sourcing.
Pros: durability, replay. Cons: operational overhead.
Choosing¶
For most internal pipelines, the modular monolith with errgroup is sufficient. Reach for distributed architectures when scale or durability requires it.
Clean Code at Scale¶
At scale, "clean" means:
- One package per pipeline. All stages, errors, types, tests in one package.
- Public surface is minimal. Most types unexported. Just Run(ctx, ...), errors, and config.
- Configuration through structs, not function arguments.
- Tests exhaustively cover error paths. Not just happy.
- Metrics and logs in every stage, via cross-cutting wrappers.
- No mutable global state. Even loggers are injected.
- Documentation per stage explains its contract.
A senior-quality pipeline package looks like a library: stable API, comprehensive tests, clear docs.
Product Considerations¶
- Cost of failure: a failed import affects N customers; design accordingly.
- Recovery time: if cancelled, how long to resume? Persistent sagas help.
- Observability budget: tracing every item is expensive; sample 1%.
- Tenancy: shared infrastructure needs bulkheads.
- Compliance: error logs may contain PII; redact before logging.
Best Practices¶
- Default to first-error; consider aggregation when partial results matter.
- Use errors.Join for aggregation (Go 1.20+).
- Design compensating actions for every step with side effects.
- Persist saga state for long-running workflows.
- Recover panics at goroutine boundaries.
- Lock-free aggregation where possible (slot pattern).
- Bulkheads per tenant/resource.
- Circuit breakers for external dependencies.
- DLQ for poisoned items.
- Structured logs + metrics + tracing for every pipeline.
Edge Cases¶
Cancellation during rollback¶
If the parent context is cancelled mid-rollback, rollback may not complete. Workaround: use a separate, longer-lived context for rollback:
func (s *Saga) rollback(parentCtx context.Context, ...) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// use ctx for rollback steps
}
Rollback persists; parent's cancellation doesn't abort cleanup.
Compensator failure during rollback¶
What if a compensator fails? Best-effort: log and continue. Pipeline ends with rollback partially complete. Operators must finish manually.
For critical systems, persist rollback state. On restart, resume rollback.
errors.Join with 1000 items¶
Memory: 1000 errors + a slice. Manageable. But Error() produces a 1000-line string. Truncate before serialising.
Multi-error errors.Is performance¶
errors.Is walks both single and multi unwraps. For deeply nested or wide multi-errors, it's O(N) total. In practice, fast.
Common Mistakes¶
- Aggregating with errors.Join and then also returning g.Wait()'s error: a double report.
- Recovering panics without logging the stack trace.
- Compensating in forward order instead of reverse.
- Forgetting to persist saga state after each step.
- Using a shared errgroup across requests: groups are single-use.
- Circuit breaker without a timeout: stays open forever.
- DLQ without a consumer: items accumulate forever.
Tricky Points¶
- errors.Join(nil) returns nil. Useful: harmless to call with all-nil inputs.
- errors.Is against a joined error walks all branches.
- recover only catches panics in the same goroutine.
- runtime.Stack is expensive; avoid it in tight loops.
- sync.Once cannot be reset; for re-runs, use a new one.
- errgroup is single-use; for re-runs, create a new group.
Test Strategies¶
Property-based: cancellation always terminates¶
func TestPipelineAlwaysTerminates(t *testing.T) {
f := func(seed int64) bool {
ctx, cancel := context.WithCancel(context.Background())
rng := rand.New(rand.NewSource(seed))
go func() {
time.Sleep(time.Duration(rng.Intn(100)) * time.Millisecond)
cancel()
}()
done := make(chan struct{})
go func() {
_ = Run(ctx, randomItems(rng))
close(done)
}()
select {
case <-done:
return true
case <-time.After(time.Second):
return false
}
}
if err := quick.Check(f, nil); err != nil { t.Fatal(err) }
}
Fault injection¶
type FaultyDB struct {
real *sql.DB
failOn string
}
func (f *FaultyDB) Insert(ctx context.Context, r Record) error {
if r.ID == f.failOn {
return fmt.Errorf("simulated: %w", ErrTransient)
}
return f.real.Insert(ctx, r)
}
Inject failures at specific items to test retry and aggregation paths.
Race detection¶
Always. Any race in production pipeline is a bug.
Tricky Questions¶
Q. Two stages fail with different *StageErrors. Does errors.As work?
errgroup.Wait returns whichever error was captured first; the other is dropped. errors.As works on the returned error, but only on that one. If you want both, aggregate.
Q. What's the cost of recover in every goroutine?
About 100 ns per call. Negligible unless you have a tight inner loop spawning goroutines.
Q. Can I implement a saga without persistence for short pipelines?
Yes. In-memory rollback list works fine for sub-second pipelines. Persistence is needed only for pipelines that span process restarts.
Q. When does errgroup.WithContext cancel its derived context?
When (a) the parent is cancelled, or (b) any g.Go function returns a non-nil error (via errOnce). Both cases trigger cancel() on the derived context.
Q. How do I propagate trace context through a Go pipeline?
Pass context.Context through every stage. The tracer reads the trace ID from context. Each stage starts a span as a child.
Cheat Sheet¶
// Aggregation
var (
	mu   sync.Mutex
	errs []error
)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(N)
for _, x := range xs {
	x := x // pre-1.22 loop-variable capture
	g.Go(func() error {
		if err := do(ctx, x); err != nil {
			mu.Lock(); errs = append(errs, err); mu.Unlock()
		}
		return nil
	})
}
_ = g.Wait() // workers always return nil; the errors live in errs
return errors.Join(errs...)
// Compensating action
defer func() {
if err != nil { compensate() }
}()
// Panic recovery
defer func() {
if r := recover(); r != nil { err = fmt.Errorf("panic: %v", r) }
}()
// Saga step
type Step struct {
	Forward    func(context.Context) error
	Compensate func(context.Context) error
}
Self-Assessment¶
- I can implement error aggregation with errors.Join and reason about its memory model.
- I can design compensating actions for a pipeline with side effects.
- I can implement a saga with persistent state.
- I can recover panics at goroutine boundaries.
- I can choose between first-error and aggregation based on requirements.
- I can integrate bulkheads, circuit breakers, and DLQs into a pipeline.
- I can read a goroutine dump from production and identify the leak.
Summary¶
At senior level, error propagation is a system-design topic. Aggregation, rollback, sagas, panic recovery, observability — all are deliberate choices that shape your service's resilience and debuggability. errgroup remains the foundation, but production pipelines layer on bulkheads, circuit breakers, DLQs, and persistent state.
The architectural test of a pipeline: when something fails at 3 AM, can the on-call engineer diagnose and respond in under an hour? If yes, your error design is good. If no, revisit.
Next: professional.md covers distributed pipelines across services, cost models of structured concurrency, and design tradeoffs at the staff level.
Further Reading¶
- "Saga Pattern" — microservices.io.
- "Patterns of Distributed Systems" — Unmesh Joshi.
- "Site Reliability Engineering" — Google.
- "Designing Data-Intensive Applications" — Martin Kleppmann (chapters on fault tolerance).
- Temporal/Cadence documentation on durable execution.
- OpenTelemetry Go SDK docs for tracing.
- github.com/sony/gobreaker for circuit breakers.
- github.com/cenkalti/backoff for exponential backoff with jitter.
- Go Memory Model spec: go.dev/ref/mem.
Bonus: Reading List¶
For week-by-week deep study:
- Week 1: Re-read Go memory model spec. Implement errgroup from scratch.
- Week 2: Implement a saga library. Test with simulated failures.
- Week 3: Implement bulkhead + circuit breaker. Profile under load.
- Week 4: Read "Designing Data-Intensive Applications" chapters 7-9.
- Week 5: Read Temporal's design docs. Implement a tiny temporal-like state machine.
- Week 6: Audit a production pipeline at your company. Note every error path.
By the end you'll be designing pipelines, not just writing them.
Extended Section: Building a Production Saga Coordinator¶
We've sketched sagas. Let's build a complete one — a coordinator that can run multi-step workflows with persistent state, idempotent steps, and proper compensation. This is the kind of code that powers order-processing systems, multi-step provisioning, and any workflow that crosses transactional boundaries.
Requirements¶
- Each step is a forward operation paired with a compensator.
- State is persisted after each step so we can resume after crashes.
- Steps must be idempotent — retrying a partially completed step is safe.
- On any step failure, compensators run in reverse for all completed steps.
- A failed compensator doesn't stop other compensators from running.
- The coordinator emits structured events for observability.
Data model¶
type SagaStatus string
const (
StatusPending SagaStatus = "pending"
StatusRunning SagaStatus = "running"
StatusCompleted SagaStatus = "completed"
StatusFailed SagaStatus = "failed"
StatusRolledBack SagaStatus = "rolled_back"
StatusRollingBack SagaStatus = "rolling_back"
)
type SagaRecord struct {
ID string
DefinitionID string
Status SagaStatus
CurrentStep int
LastError string
StartedAt time.Time
UpdatedAt time.Time
CompletedSteps []string
}
Step interface¶
type Step interface {
Name() string
Forward(ctx context.Context, payload []byte) error
Compensate(ctx context.Context, payload []byte) error
}
The payload is serialised state. Each step receives the saga's input data and can read/write it for inter-step communication (via the persistent record).
Coordinator¶
type Coordinator struct {
	store        SagaStore
	steps        []Step
	definitionID string // written into new SagaRecords in Run
	metrics      Metrics
	logger       Logger
}
func (c *Coordinator) Run(ctx context.Context, sagaID string, payload []byte) error {
rec, err := c.store.Load(ctx, sagaID)
if err != nil { return fmt.Errorf("load saga: %w", err) }
if rec == nil {
rec = &SagaRecord{
ID: sagaID,
DefinitionID: c.definitionID,
Status: StatusPending,
StartedAt: time.Now(),
}
if err := c.store.Save(ctx, rec); err != nil { return fmt.Errorf("init: %w", err) }
}
switch rec.Status {
case StatusCompleted:
return nil // already done; idempotent
case StatusFailed, StatusRolledBack:
return fmt.Errorf("saga %s already failed: %s", sagaID, rec.LastError)
case StatusRollingBack:
return c.continueRollback(ctx, rec, payload)
}
return c.forward(ctx, rec, payload)
}
func (c *Coordinator) forward(ctx context.Context, rec *SagaRecord, payload []byte) error {
rec.Status = StatusRunning
for i := rec.CurrentStep; i < len(c.steps); i++ {
step := c.steps[i]
rec.CurrentStep = i
if err := c.store.Save(ctx, rec); err != nil {
return fmt.Errorf("persist before step %d: %w", i, err)
}
c.logger.Info("saga forward step start", "saga", rec.ID, "step", step.Name())
start := time.Now()
err := step.Forward(ctx, payload)
c.metrics.RecordStep(step.Name(), time.Since(start), err)
if err == nil {
rec.CompletedSteps = append(rec.CompletedSteps, step.Name())
continue
}
c.logger.Error("saga step failed", "saga", rec.ID, "step", step.Name(), "err", err)
rec.LastError = err.Error()
rec.Status = StatusRollingBack
if err := c.store.Save(ctx, rec); err != nil {
return fmt.Errorf("persist before rollback: %w", err)
}
return c.rollback(ctx, rec, payload)
}
rec.Status = StatusCompleted
return c.store.Save(ctx, rec)
}
func (c *Coordinator) continueRollback(ctx context.Context, rec *SagaRecord, payload []byte) error {
return c.rollback(ctx, rec, payload)
}
func (c *Coordinator) rollback(ctx context.Context, rec *SagaRecord, payload []byte) error {
// Rollback uses a fresh context with longer deadline.
rollbackCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
// Compensate in reverse order.
var compensationErrs []error
for i := len(rec.CompletedSteps) - 1; i >= 0; i-- {
name := rec.CompletedSteps[i]
step := c.findStep(name)
if step == nil {
compensationErrs = append(compensationErrs, fmt.Errorf("unknown step: %s", name))
continue
}
c.logger.Info("saga compensate start", "saga", rec.ID, "step", name)
start := time.Now()
err := step.Compensate(rollbackCtx, payload)
c.metrics.RecordCompensation(name, time.Since(start), err)
if err != nil {
c.logger.Error("compensation failed", "saga", rec.ID, "step", name, "err", err)
compensationErrs = append(compensationErrs, fmt.Errorf("compensate %s: %w", name, err))
// continue; try to compensate the rest
}
rec.CompletedSteps = rec.CompletedSteps[:i]
rec.CurrentStep = i
if err := c.store.Save(rollbackCtx, rec); err != nil {
	// Best-effort: record the persistence failure, keep compensating.
	compensationErrs = append(compensationErrs, fmt.Errorf("persist rollback progress: %w", err))
}
}
rec.Status = StatusRolledBack
rec.UpdatedAt = time.Now()
if err := c.store.Save(rollbackCtx, rec); err != nil {
compensationErrs = append(compensationErrs, fmt.Errorf("final save: %w", err))
}
if len(compensationErrs) > 0 {
return fmt.Errorf("saga failed and rollback had %d errors: %w",
len(compensationErrs), errors.Join(compensationErrs...))
}
return fmt.Errorf("saga rolled back: %s", rec.LastError)
}
func (c *Coordinator) findStep(name string) Step {
for _, s := range c.steps {
if s.Name() == name { return s }
}
return nil
}
What this coordinator gives you¶
- Crash recovery: state in DB lets us resume after process restart.
- Idempotency: each step is identified by name; running twice is safe if step implementation is idempotent.
- Compensation correctness: reverse order, all errors collected, all run.
- Observability: structured logs and metrics at every step.
- Separate rollback context: cancellation of forward doesn't kill compensation.
Caveats¶
- Concurrent execution of same saga: if two replicas try to run the same saga, we need a lock (DB advisory lock, Redis lock). Otherwise compensations race with forward steps.
- Long-running steps: if a step takes hours, save partial progress.
- Non-idempotent compensators: if compensator can't be retried, mark the saga "needs manual intervention" instead of looping forever.
When to use this vs Temporal¶
If your saga is simple (3-5 steps, single service), a coordinator like this is fine. For complex workflows across many services, with retries, signals, and child workflows, use Temporal or Cadence. They solve real problems but add operational complexity.
Extended Section: Error Aggregation Patterns at Scale¶
When processing millions of items and aggregating errors, naive patterns degrade.
Pattern: Capped error buffer¶
type ErrorBuffer struct {
mu sync.Mutex
errs []error
cap int
overflowCount atomic.Int64
}
func (b *ErrorBuffer) Add(err error) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.errs) < b.cap {
b.errs = append(b.errs, err)
} else {
b.overflowCount.Add(1)
}
}
func (b *ErrorBuffer) Result() error {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.errs) == 0 && b.overflowCount.Load() == 0 {
return nil
}
errs := append([]error{}, b.errs...)
if n := b.overflowCount.Load(); n > 0 {
errs = append(errs, fmt.Errorf("(%d more errors elided)", n))
}
return errors.Join(errs...)
}
Bounded memory regardless of error count. Useful when error volume can spike.
Pattern: Categorised aggregation¶
Group errors by category:
type Categorized struct {
mu sync.Mutex
byKind map[string][]error
}
func (c *Categorized) Add(kind string, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.byKind == nil {
		c.byKind = make(map[string][]error) // lazy init: assigning into a nil map panics
	}
	c.byKind[kind] = append(c.byKind[kind], err)
}
func (c *Categorized) Summary() string {
c.mu.Lock()
defer c.mu.Unlock()
var parts []string
for kind, errs := range c.byKind {
parts = append(parts, fmt.Sprintf("%s: %d", kind, len(errs)))
}
return strings.Join(parts, ", ")
}
The caller learns "transient: 12, validation: 3, db: 1" at a glance.
Pattern: Sample errors¶
For very high volume, sample:
type Sample struct {
mu sync.Mutex
seen int64
sampled []error
cap int
}
func (s *Sample) Add(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seen++ // guarded by mu; no need to mix atomics with the mutex
	if len(s.sampled) < s.cap {
		s.sampled = append(s.sampled, err)
		return
	}
	// Reservoir sampling (Algorithm R): replace a random slot with
	// probability cap/seen, so every error seen so far is equally
	// likely to be in the final sample.
	if j := rand.Int63n(s.seen); j < int64(s.cap) {
		s.sampled[j] = err
	}
}
Reservoir sampling: every error has equal probability of being in the final sample. Memory bounded; representative.
Extended Section: Distributed Pipeline Failure Modes¶
Pipelines that cross service boundaries have additional failure modes.
Network partition¶
A stage calls a remote service; the network drops mid-call. The remote may or may not have processed the request. From the caller's view, it's a timeout.
Mitigation: idempotency keys + retry with backoff. The remote service deduplicates based on the key, so retries are safe.
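A sketch of the interaction, with an in-memory stand-in for the remote (all names ours):

```go
package main

import (
	"errors"
	"fmt"
)

// Remote simulates a service that deduplicates by idempotency key: the
// first delivery does the work, retries of the same key are no-ops.
type Remote struct {
	processed map[string]bool
	failures  int // fail the first N calls to simulate a flaky network
}

func (r *Remote) Do(key string) error {
	if r.failures > 0 {
		r.failures--
		// The remote may or may not have done the work before the
		// "timeout"; with dedup by key, the caller doesn't need to know.
		r.processed[key] = true
		return errors.New("timeout")
	}
	if r.processed[key] {
		return nil // duplicate delivery: already done, safely ignored
	}
	r.processed[key] = true
	return nil
}

// callWithRetry retries with the SAME idempotency key, so duplicates
// are harmless even when an earlier attempt actually succeeded.
func callWithRetry(r *Remote, key string, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = r.Do(key); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	r := &Remote{processed: map[string]bool{}, failures: 2}
	fmt.Println(callWithRetry(r, "order-42", 5)) // <nil>
}
```

In production the retry loop would also back off with jitter; that is omitted here for brevity.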
Asymmetric failure¶
The remote thinks the request succeeded; the caller thinks it failed (due to a slow response that arrived after timeout). Compensator must handle this — usually by querying the remote for the operation's status using the idempotency key.
Cascading failure¶
Service A fails; service B retries A; B's retries overload A more; A fails harder. Circuit breakers prevent this.
Stale state¶
A long-running pipeline's first stage read data at time T0. By T10, the data has changed. The pipeline operates on stale data.
Mitigation: include a version/etag in pipeline state; verify at write time.
Cross-service compensation¶
Compensating a remote service requires it to support the compensation API. If it doesn't, you can't reliably roll back. Design APIs for compensability from day one.
Extended Section: Memory Model Edge Cases¶
The Go memory model is precise. Some pipeline-relevant nuances:
Channel send/receive¶
A send on a channel happens-before the corresponding receive completes. So:
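A minimal demonstration of the guarantee (variable names ours):

```go
package main

import "fmt"

func main() {
	var x int
	done := make(chan struct{})

	go func() {
		x = 42             // (1) write
		done <- struct{}{} // (2) send: happens-before the receive completes
	}()

	<-done         // (3) receive
	fmt.Println(x) // (4) guaranteed to observe 42
}
```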
The send synchronises the write to x with the read after the receive. This is why channel-based aggregation is correct.
Atomic operations¶
atomic.Store happens-before atomic.Load of the same address. So:
var (
	val   int
	ready int32
)

go func() { val = compute(); atomic.StoreInt32(&ready, 1) }()
for atomic.LoadInt32(&ready) == 0 { /* spin */ }
fmt.Println(val) // sees compute's result
Atomics give cross-goroutine happens-before. The spin-loop is a busy-wait; in real code, use a channel or sync.WaitGroup.
sync.WaitGroup¶
wg.Done happens-before wg.Wait returns. So any writes before Done are visible after Wait. This is the mechanism by which g.Wait lets you read state safely.
sync.Mutex¶
mu.Unlock() happens-before the next mu.Lock(). So writes inside a critical section are visible to the next holder of the lock.
Why this matters for errgroup¶
Every g.Wait ends with wg.Wait, which synchronises with every wg.Done. Each wg.Done happens after the goroutine's f() returns. So writes inside f happen-before Wait returns. No mutex needed for the final read — except for writes that race between goroutines.
Common bug¶
total := 0
for _, n := range nums {
n := n
g.Go(func() error { total += n; return nil }) // RACE
}
g.Wait()
total += n reads then writes total — two goroutines doing this race. The race detector catches it. Fix with atomic or mutex:
var total atomic.Int64
for _, n := range nums {
n := n
g.Go(func() error { total.Add(int64(n)); return nil })
}
g.Wait()
fmt.Println(total.Load())
Or per-goroutine slot:
sums := make([]int, len(nums))
for i, n := range nums {
i, n := i, n
g.Go(func() error { sums[i] = n; return nil })
}
g.Wait()
total := 0
for _, s := range sums { total += s }
The slot approach is often fastest.
Extended Section: Designing Error Hierarchies¶
Production systems benefit from a thoughtful error hierarchy.
Three-level hierarchy¶
// Top-level kinds
var (
ErrClient = errors.New("client error")
ErrServer = errors.New("server error")
ErrTransient = errors.New("transient")
)
// Middle: specific conditions
var (
ErrValidation = fmt.Errorf("%w: validation", ErrClient)
ErrAuth = fmt.Errorf("%w: auth", ErrClient)
ErrInternal = fmt.Errorf("%w: internal", ErrServer)
)
// Leaf: detailed errors
return fmt.Errorf("%w: missing required field: name", ErrValidation)
Callers can match at any level:
errors.Is(err, ErrValidation) // specific
errors.Is(err, ErrClient) // broad
errors.Is(err, ErrTransient) // for retry logic
When NOT to do this¶
If your service has a small, fixed error vocabulary, a flat list of sentinels is simpler. Hierarchy adds value only when you have 20+ errors that group naturally.
Mapping to status codes¶
func statusOf(err error) int {
switch {
case errors.Is(err, ErrAuth):
return 401
case errors.Is(err, ErrValidation):
return 400
case errors.Is(err, ErrNotFound):
return 404
case errors.Is(err, ErrTransient):
return 503
case errors.Is(err, ErrServer):
return 500
default:
return 500
}
}
The hierarchy makes this clean. Each errors.Is matches the highest-priority category.
Extended Section: Pipeline Health Monitoring¶
A production pipeline needs operational signals.
Liveness probe¶
Does the pipeline accept input?
If not, restart the service.
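One cheap probe is a timed send on the intake channel. A sketch with hypothetical type and field names:

```go
package main

import (
	"fmt"
	"time"
)

type probeItem struct{}

type Pipeline struct {
	intake chan probeItem
}

// Liveness reports whether the pipeline can still accept input: if the
// intake stays full for the probe window, workers are likely stuck.
func (p *Pipeline) Liveness() bool {
	select {
	case p.intake <- probeItem{}:
		return true
	case <-time.After(50 * time.Millisecond):
		return false
	}
}

func main() {
	healthy := &Pipeline{intake: make(chan probeItem, 1)} // room to accept
	stuck := &Pipeline{intake: make(chan probeItem)}      // nobody receiving
	fmt.Println(healthy.Liveness(), stuck.Liveness())     // true false
}
```

A real probe would use a dedicated health item (or a separate signal) so probes don't pollute the work stream.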
Readiness probe¶
Can the pipeline process new requests?
func (p *Pipeline) Readiness() bool {
if !p.Liveness() { return false }
if p.queueDepth() > p.maxQueue { return false }
if p.circuitBreaker.State() == StateOpen { return false }
return true
}
If not ready, load balancer routes elsewhere.
Metrics dashboard¶
- Throughput: items/second per stage.
- Error rate: errors/total per stage.
- Latency: p50, p95, p99 per stage.
- Queue depth between stages.
- Active goroutines: detect leaks.
- DLQ size: detect runaway poison.
- Saga in-flight: detect hung sagas.
- Compensation rate: detect failure clusters.
Alerts¶
- Error rate spike: >5x baseline for 5 minutes.
- Queue depth growth: increasing for 10 minutes.
- Pipeline stall: no completions for 5 minutes.
- DLQ growth: >100/min.
Calibrate to your service's normal behaviour. Over-alerting causes alert fatigue.
Closing the Loop: From Junior to Senior¶
To recap the progression:
- Junior: errgroup mechanics, channels, ctx, %w wrapping.
- Middle: error design, fan-out, retry, bounded parallelism, nested groups.
- Senior: aggregation, sagas, panics, memory model, distributed concerns.
Each level adds layers, but the fundamentals don't change. Junior-level discipline (defer close, select on ctx, %w wrap) is still required at senior. Senior-level patterns (sagas, aggregation) sit on top of junior-level scaffolding.
If you skip junior because you "already know goroutines," you'll write fragile senior-level code that compiles and tests pass but breaks in production. Master the basics. Then everything else is composition.
Final Note¶
Error propagation in pipelines is a deep topic because it interacts with concurrency, distributed systems, transactions, observability, and product design simultaneously. There is no "one true way" — only patterns that fit certain situations better than others. Your job as a senior engineer is to choose the right one for each situation and to explain why in code review.
The exam: a code reviewer points at a g.Go(func() error { ... }) block and asks "why this design?" Your answer should reference the requirements (atomicity, throughput, observability), the failure modes (transient vs permanent, cascading vs isolated), and the trade-offs (latency vs completeness, simplicity vs flexibility). If you can explain those, you're operating at senior level.
Case Studies: Real-World Pipeline Failures¶
Three case studies of pipeline failures in production. Names changed.
Case 1: The 3 AM page that never came¶
A nightly batch import. 10 million rows. Always finishes by 6 AM. One night, no completion alert by 9 AM. The on-call wakes up, finds the pipeline stuck.
Goroutine dump: every worker blocked on chan recv from a channel that nobody was writing to. The producer had returned 4 hours earlier, but didn't close the channel — a missing defer close. Workers waited forever.
The fix: add defer close(out). The bug had existed for months but only triggered when the producer returned early due to a corner case in the input. Most nights the producer ran to completion, and its return coincided with the workers having drained the channel, so nothing detectable went wrong.
Lesson: defer close runs on every exit path, including the corner case you didn't think of. Always.
Case 2: The retries that took down the service¶
A service had a pipeline that retried API calls on failure. The remote API was rate-limited at 100 RPS. Normal traffic: 50 RPS. One day, the remote API briefly returned 503s for 30 seconds.
Every in-flight request retried. New requests piled up. After the brief outage, the service had 5000 queued requests, each retrying every 100 ms. The remote was back up but immediately overwhelmed by 50,000 RPS of retries. It went back down. The cycle repeated.
The fix: add a circuit breaker. When the remote returns 503s, open the breaker, fail fast for 30 seconds. Don't retry through the breaker.
Lesson: retries amplify failures. Combine them with bulkheads and breakers.
Case 3: The compensator that wasn't idempotent¶
A saga in a payment system: charge card, allocate inventory, create order, send email. The "create order" step persisted to two tables in different schemas (legacy). If the second insert failed, the saga rolled back. The compensator deleted from the first table.
But the compensator wasn't idempotent. If it ran twice (because the saga was retried after a crash mid-rollback), it tried to delete a row that was already gone — returned an error — and the saga was marked "rollback failed: row not found."
Operators had to manually mark sagas resolved. Hundreds accumulated.
The fix: the compensator now uses DELETE WHERE id = $1 which is idempotent (deletes 0 rows if already gone, no error). The saga succeeded on retry.
Lesson: every compensator must be idempotent. Test by running it twice.
Extended Section: Coordinating With Message Queues¶
When your pipeline is fed by a message queue (Kafka, RabbitMQ, SQS), error propagation interacts with queue semantics.
At-least-once delivery¶
Most queues guarantee at-least-once: a message may be delivered more than once. Your pipeline must be idempotent for repeated messages.
func consume(ctx context.Context, ch <-chan Message) error {
for msg := range ch {
if err := process(ctx, msg); err != nil {
// Don't ack; queue will redeliver.
msg.Nack()
continue
}
msg.Ack()
}
return nil
}
If process fails, nack tells the queue to redeliver. If process succeeded but the program crashed before ack, redelivery happens — and process runs again. Idempotency is essential.
Exactly-once via deduplication¶
To approximate exactly-once: maintain a "processed_messages" table with message IDs. Before processing, check; after processing, insert.
func process(ctx context.Context, msg Message) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil { return err }
defer tx.Rollback()
var exists bool
if err := tx.QueryRowContext(ctx,
"SELECT EXISTS(SELECT 1 FROM processed WHERE id = $1)", msg.ID).Scan(&exists); err != nil {
return err
}
if exists { return nil } // already processed
if err := doWork(ctx, tx, msg); err != nil { return err }
_, err = tx.ExecContext(ctx, "INSERT INTO processed (id) VALUES ($1)", msg.ID)
if err != nil { return err }
return tx.Commit()
}
The transaction ensures the dedup record and the work commit atomically. If anything fails, transaction rolls back; redelivery is safe.
DLQ from the queue¶
Most queues support a DLQ topic. After N failed deliveries, the queue moves the message to the DLQ:
Operators inspect the DLQ. Often, fixes involve patching the consumer code and replaying the DLQ messages.
Manual nack with delay¶
Some queues let you nack with a delay:
For transient failures, redelivering immediately just retries the same failure. A 30-second delay gives the dependency time to recover.
Backpressure¶
If your pipeline is slow, messages accumulate in the queue. The queue may have its own limits (memory, retention). Monitor queue depth as a leading indicator.
To apply backpressure: prefetch limit. Tell the queue "give me at most N unacked messages":
This caps in-flight work and lets the queue absorb the rest.
Extended Section: Error Propagation in Streaming Pipelines¶
Streaming pipelines have no "end." They run forever, processing events as they arrive. Error semantics differ.
Per-event error handling¶
In a stream, a single bad event shouldn't stop the pipeline:
for ev := range stream {
if err := process(ev); err != nil {
log.Error("event failed", "id", ev.ID, "err", err)
metrics.Counter("stream.errors").Inc()
// continue
}
}
Errors are observed, logged, and the next event is processed. The pipeline is durable.
Catastrophic failure¶
Some failures should stop the pipeline: DB completely down, config corrupt, panics. Distinguish:
for ev := range stream {
err := process(ev)
if err == nil { continue }
if errors.Is(err, ErrFatal) {
return fmt.Errorf("fatal: %w", err) // pipeline shuts down
}
log.Error("event failed", "err", err)
}
Define ErrFatal carefully. Most failures aren't fatal.
Position tracking¶
In a stream, you must remember "where am I?" so on restart you don't re-process from the beginning:
type Cursor struct {
Offset int64
UpdatedAt time.Time
}
func process(ev Event) error {
if err := doWork(ev); err != nil { return err }
return cursor.Save(ev.Offset)
}
If processing succeeds but cursor save fails, on restart we re-process. Idempotency needed.
Backpressure in streams¶
If the stream produces faster than you process, options:
- Buffer with bounded capacity; drop excess.
- Sample (process every Nth event).
- Scale out: more consumer instances.
- Apply backpressure to producer (rarely possible for external streams).
Pipeline design decides which.
Lambdas vs goroutines for stream processing¶
In serverless (AWS Lambda, Cloud Functions), each event is a separate invocation. No long-lived goroutines. Error semantics are simpler but state management is harder.
In Go with goroutines, you have a long-lived process. State is in memory. Failures lose state unless persisted.
Trade-off: simplicity vs efficiency.
Extended Section: Cross-Stage Tracing¶
Distributed tracing makes pipelines debuggable.
OpenTelemetry primer¶
import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("pipeline")

func stage(ctx context.Context, in <-chan Job) error {
	for job := range in {
		// Per-item span: use a fresh context variable so one item's span
		// does not become the parent of the next item's span.
		jobCtx, span := tracer.Start(ctx, "stage.process",
			trace.WithAttributes(attribute.String("job.id", job.ID)))
		err := process(jobCtx, job)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
		}
		span.End()
		if err != nil {
			return err
		}
	}
	return nil
}
Each item gets its own span. The span tree shows the item's journey through stages.
Propagating across channels¶
context.Context flows through stages. But what if the channel only carries the item, not the context?
type Wrapped[T any] struct {
Ctx context.Context
Item T
}
func parseStage(in <-chan Wrapped[[]byte], out chan<- Wrapped[Record]) error {
for w := range in {
ctx, span := tracer.Start(w.Ctx, "parse")
r, err := parse(w.Item)
span.End()
if err != nil { return err }
out <- Wrapped[Record]{Ctx: ctx, Item: r}
}
return nil
}
The context (including trace ID) travels with the item. Each stage adds a child span. The trace shows the item's full path.
Caveat: contexts shouldn't be stored in structs (Go style says so). But for tracing in pipelines, it's a pragmatic exception. Document it.
Alternative: extract baggage¶
Instead of carrying full context, carry trace ID as baggage:
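A sketch of the carrier (type and field names ours; with OpenTelemetry you would read the IDs via trace.SpanContextFromContext and rebuild a remote parent with trace.ContextWithRemoteSpanContext):

```go
package main

import "fmt"

// Traced carries only the trace/span IDs alongside the item, instead of
// smuggling a full context through the channel.
type Traced[T any] struct {
	TraceID string
	SpanID  string
	Item    T
}

// nextStage would start its own span as a child of the carried IDs (the
// tracer call is elided here) and forwards the same trace ID.
func nextStage(in Traced[string]) Traced[int] {
	return Traced[int]{TraceID: in.TraceID, SpanID: "span-parse-1", Item: len(in.Item)}
}

func main() {
	out := nextStage(Traced[string]{TraceID: "abc123", SpanID: "span-fetch-0", Item: "hello"})
	fmt.Println(out.TraceID, out.Item) // abc123 5
}
```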
Each stage starts a span from the trace/span IDs. No context smuggling.
Sampling¶
Tracing every item is expensive. Sample:
1% of items get traced; the rest don't. Storage cost manageable; representative sample available for debugging.
Reading a pipeline trace¶
In Jaeger or Tempo, a pipeline trace looks like a Gantt chart:
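Schematically, with illustrative stage and span names:

```
pipeline.run   ├──────────────────────────────────────┤
  fetch        ├─────────┤
  parse                  ├───┤
  store                      ├──────────┤
    retry-1                        ├────┤
  notify                                ├───┤
```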
You see exactly where time is spent. Where retries happened. Which stage is the bottleneck. Indispensable.
Extended Section: Failure Injection¶
Don't just hope error paths work — test them.
Manual injection¶
In tests:
type FaultyStorer struct {
real Storer
failOn map[string]error
}
func (f *FaultyStorer) Store(ctx context.Context, r Record) error {
if err, ok := f.failOn[r.ID]; ok {
return err
}
return f.real.Store(ctx, r)
}
Wire FaultyStorer into the pipeline. Test that failures propagate, compensations run, etc.
Chaos engineering¶
In production-like environments, inject failures randomly:
- Network: drop X% of packets between services.
- CPU: pin a worker at 100% CPU.
- Latency: add 500ms to one in 1000 requests.
- Errors: make a random 1% of API calls fail.
Tools: Chaos Mesh, Litmus, Gremlin. Or roll your own:
type ChaosWrapper struct {
real APIClient
rng *rand.Rand
errRate float64
}
func (c *ChaosWrapper) Call(ctx context.Context, req Request) (Response, error) {
if c.rng.Float64() < c.errRate {
return Response{}, errors.New("chaos: simulated failure")
}
return c.real.Call(ctx, req)
}
Enabled in staging. Find the bugs before users do.
Game days¶
Once a quarter, run a planned outage: take down a dependency for an hour. Watch how the pipeline reacts. Did the circuit breaker open? Did retries back off? Did DLQ fill up cleanly?
These exercises surface gaps in your design that you can't discover by reading code.
Extended Section: Performance Engineering¶
Some heuristics for tuning pipeline performance.
Profile first¶
For batch code, profile benchmarks with go test -bench . -cpuprofile cpu.out. In a running service, expose the net/http/pprof endpoints instead. Then run go tool pprof http://localhost:6060/debug/pprof/profile and see where CPU is spent.
Common bottlenecks¶
- Mutex contention: pprof shows runtime.lock/semrelease. Mitigate with sharded locks or atomic operations.
- Channel contention: high time in chansend/chanrecv. Increase buffer sizes or use multiple channels.
- Allocation: high runtime.mallocgc. Pre-allocate slices, use sync.Pool for temporary objects.
- GC pressure: tune GOGC, reduce allocations.
- System calls: too many small read/write calls. Batch them.
Goroutine count¶
Poll runtime.NumGoroutine() periodically. If it grows without bound, you have a leak. The goroutine profile shows where each goroutine was started:
Latency tuning¶
If p99 latency is bad but average is fine, you have outliers. Look for:
- Tail latency from one slow dependency.
- GC pauses (rare in Go but possible with large heaps).
- Cache misses (CPU profile shows high stalled cycles).
- Contention (pprof block profile).
Throughput tuning¶
If throughput is bottlenecked, look for:
- Underutilised CPUs (need more goroutines).
- Saturated CPUs (need fewer, or faster work).
- I/O-bound stages (more parallelism helps).
- Lock contention (sharding helps).
Avoid premature optimisation¶
Most pipelines are correct-but-slow before they're slow-and-wrong. Get correctness first.
Extended Section: API Design for Pipelines¶
When publishing a pipeline as a library or service interface:
Stable public API¶
package pipeline
type Config struct {
Workers int
RateLimit int
}
func New(cfg Config) *Pipeline
func (p *Pipeline) Run(ctx context.Context, input []Item) (*Result, error)
type Result struct {
Successful []Item
Failed []FailedItem
Duration time.Duration
}
type FailedItem struct {
Item Item
Err error
}
Minimal surface. Run is the only verb. Result is a value type. Errors are exported sentinels and types.
Versioning¶
Sentinels and types are part of the API. Renaming breaks callers. Adding new ones is non-breaking, unless a caller's switch was exhaustive with no default case.
Use semver. Bumps mean: patch for bug fixes, minor for new errors/options, major for renames.
Options pattern¶
type Option func(*Config)
func WithWorkers(n int) Option {
return func(c *Config) { c.Workers = n }
}
func New(opts ...Option) *Pipeline {
cfg := defaultConfig()
for _, o := range opts { o(&cfg) }
return &Pipeline{cfg: cfg}
}
Lets callers customise without growing your signature.
Documentation¶
Every exported symbol has a doc comment. Errors documented in package doc. Examples in example_test.go.
// Run executes the pipeline.
//
// Returns a non-nil error if any item fails. Use errors.Is to identify:
// - ErrInvalid: an item failed validation
// - ErrTransient: an item failed transiently; retry may succeed
// - context.Canceled: caller cancelled
//
// The Result.Successful list contains items that processed successfully,
// even if other items failed.
func (p *Pipeline) Run(ctx context.Context, items []Item) (*Result, error)
Callers should be able to use the package without reading the source.
Extended Section: Cross-Language Concerns¶
If your pipeline interacts with non-Go services, errors interact with their conventions.
gRPC¶
gRPC has its own error model: status codes (OK, INVALID_ARGUMENT, NOT_FOUND, etc.) plus details. Map:
func toGRPC(err error) error {
switch {
case errors.Is(err, ErrNotFound):
return status.Error(codes.NotFound, err.Error())
case errors.Is(err, ErrValidation):
return status.Error(codes.InvalidArgument, err.Error())
case errors.Is(err, context.DeadlineExceeded):
return status.Error(codes.DeadlineExceeded, err.Error())
default:
return status.Error(codes.Internal, err.Error())
}
}
Don't leak internal error chains to gRPC — security risk. Sanitize.
HTTP¶
func toHTTP(err error, w http.ResponseWriter) {
switch {
case errors.Is(err, ErrNotFound):
http.Error(w, "Not Found", http.StatusNotFound)
case errors.Is(err, ErrValidation):
http.Error(w, "Bad Request", http.StatusBadRequest)
default:
log.Error("internal", "err", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
User sees a sanitised message; logs have the full chain.
Cross-service Go-to-Go¶
Even between two Go services, errors don't travel as Go errors over the wire. You serialise them as strings or structured data and reconstruct at the other end. Sentinels and types must be redefined per-service.
Standard approach: send an error code plus a message.
Receiver maps code back to sentinel:
func fromAPI(ae *APIError) error {
switch ae.Code {
case "NOT_FOUND":
return fmt.Errorf("%s: %w", ae.Message, ErrNotFound)
default:
return errors.New(ae.Message)
}
}
Tedious but necessary. The error contract crosses the wire as data, not as Go types.
Extended Section: Performance Tuning Stories¶
A few illustrative numbers from real systems.
errgroup overhead¶
errgroup.WithContext allocates ~200 bytes. g.Go adds ~50 bytes. g.Wait is essentially free.
For a pipeline with three stages, total overhead is sub-microsecond. Don't worry about it.
Goroutine creation cost¶
A fresh goroutine costs ~1.5 µs of CPU. Spawning 100k goroutines is 150 ms. With SetLimit(N), you spawn N goroutines and reuse — overhead is amortised.
Channel send/receive¶
Unbuffered channel send/receive: ~50 ns each. Buffered with available capacity: ~20 ns. Buffered when blocked: ~100 ns due to scheduling.
For pipelines processing 1M items/sec, channels are about 20% of CPU. Use them; don't agonise.
Mutex vs atomic vs channel¶
- Atomic.Add: ~5 ns.
- Mutex Lock/Unlock (uncontended): ~30 ns.
- Channel send/recv: ~50 ns.
- Mutex Lock/Unlock (contended, microbenchmark): ~500 ns to 50 µs.
Use atomic for simple counters. Mutex for short critical sections. Channels for handoff or signalling.
Memory model fence cost¶
Atomic operations include implicit memory fences. On x86, this is essentially free; on ARM, more expensive. Not usually pipeline-relevant.
Extended Section: Maintaining a Pipeline Over Time¶
Pipelines evolve. Stages get added, changed, removed. How to do this safely.
Adding a stage¶
Append it after existing stages. Test in isolation. Roll out with feature flag to enable for a subset of traffic. Monitor.
Removing a stage¶
Mark it as a no-op first. Roll out. Wait. Once you're confident no traffic depends on it, remove the code.
Changing a stage¶
Hard. Best approach: add a new stage with new behaviour; have both run in parallel briefly; switch traffic; remove the old.
Changing the error API¶
Adding a new sentinel: non-breaking (unless callers have exhaustive switches). Removing a sentinel: breaking. Deprecate first, give callers time. Adding fields to a typed error: non-breaking (new field is zero-valued for old callers). Renaming: always breaking.
Document the error API like any public API.
Refactoring¶
When stages get too long:
- Extract helpers (private functions called by the stage).
- Split into two stages with a channel between.
- Move logic into a separate package.
Refactor incrementally. Every commit should leave the pipeline working.
Extended Section: Anti-Patterns at Senior Level¶
Senior-level anti-patterns are subtler than junior ones.
Anti-pattern: Premature sagas¶
Not every pipeline needs a saga. If side effects are within a single DB, use a transaction. Sagas add complexity; reserve for cross-service or non-transactional side effects.
Anti-pattern: Aggregation when first-error suffices¶
"Let's aggregate all errors" sounds thorough, but if callers only act on the first one, the aggregation is wasted code. Match design to use case.
Anti-pattern: Manual coordination instead of errgroup¶
errCh := make(chan error, 5)
for i := 0; i < 5; i++ {
go func() { errCh <- work() }()
}
var firstErr error
for i := 0; i < 5; i++ {
if err := <-errCh; firstErr == nil && err != nil {
firstErr = err
}
}
return firstErr
Reimplements errgroup, badly (no cancellation, no SetLimit). Use the library.
Anti-pattern: Recovery that hides bugs¶
The function continues as if nothing happened. The bug is silenced. Recovery should convert panic to error and return it.
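A side-by-side sketch of the anti-pattern and the fix; both functions are hypothetical stand-ins for a stage body:

```go
package main

import "fmt"

// badStage swallows the panic: no log, no error, the bug vanishes.
func badStage() {
	defer func() { recover() }()
	panic("bug: nil config")
}

// goodStage converts the panic into an error the caller can act on.
func goodStage() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("stage panicked: %v", r)
		}
	}()
	panic("bug: nil config")
}

func main() {
	badStage()               // silent: nothing observable happens
	fmt.Println(goodStage()) // the failure is surfaced to the caller
}
```

The only difference is that goodStage assigns the recovered value to a named return, so the panic becomes a normal error in the pipeline's error flow.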
Anti-pattern: Cross-cutting via globals¶
Now everyone shares one budget. Tests can't isolate. Two pipelines interfere. Inject dependencies; avoid globals.
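A minimal sketch of the contrast; the budget field and constructor are illustrative:

```go
package main

import "fmt"

// Anti-pattern: a package-level budget every pipeline silently shares.
var globalRetryBudget = 10

// Better: each pipeline owns its budget, injected at construction,
// so tests and concurrent pipelines are isolated.
type Pipeline struct {
	retryBudget int
}

func NewPipeline(budget int) *Pipeline { return &Pipeline{retryBudget: budget} }

func (p *Pipeline) consume() { p.retryBudget-- }

func main() {
	a, b := NewPipeline(5), NewPipeline(5)
	a.consume() // a's spending does not touch b
	fmt.Println(a.retryBudget, b.retryBudget, globalRetryBudget)
}
```

The same argument applies to rate limiters, circuit breakers, and metrics registries: pass them in, don't reach out.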
Extended Section: Code Review Checklist¶
When reviewing pipeline code:
- errgroup.WithContext used (not bare errgroup.Group{}).
- Every output channel is closed by exactly one goroutine.
- Every send is select'd against ctx.Done().
- db.Query/http.Do/etc. receive ctx.
- Errors wrapped with %w at every boundary.
- errors.Is/errors.As used, not ==.
- Sentinels and typed errors documented.
- defer for resource cleanup.
- No bare go in g.Go functions.
- SetLimit if fan-out can be large.
- Retry with backoff and jitter.
- Compensating actions for stages with side effects.
- Panic recovery at goroutine boundaries.
- Tests cover happy path, cancellation, retry, partial failure.
- Metrics and logs for every error.
This is a senior-level review. Each item is a known bug class.
Extended Section: When to Build vs Buy¶
Many pipeline patterns are available as libraries or services:
- Workflow engines: Temporal, Cadence, AWS Step Functions. Use when sagas are complex or cross-service.
- Message queues: Kafka, RabbitMQ, SQS. Use for async durable processing.
- Stream processors: Flink, Kafka Streams. Use for high-volume event processing.
- Task queues: Asynq, Sidekiq, Celery. Use for background job processing.
For most internal pipelines, plain Go + errgroup is fine. Reach for these tools when:
- Pipelines exceed a few hours.
- State must survive crashes.
- Workflows cross services.
- Operational burden of custom code outweighs library complexity.
A common arc: start with errgroup. Outgrow it. Move to a workflow engine. The transition is the hard part — design for it.
Final Senior-Level Wisdom¶
A few principles after years of building pipelines:
- Make failure a first-class citizen of the design, not an afterthought.
- Idempotency is non-negotiable for anything with side effects.
- Observe everything: metrics, logs, traces. You can't debug what you can't see.
- Test failure paths exhaustively. Happy-path tests are necessary but insufficient.
- Bound everything: parallelism, retries, queue depth. Unbounded systems blow up.
- Simple beats clever. A 10-line errgroup pipeline you can debug at 3 AM beats a 100-line bespoke coordinator.
- Document the error API. Future you will thank present you.
The day you stop adding features and start adding tests is the day your pipeline becomes production-ready.
Worked Example: Full Pipeline With Every Senior Concept¶
A unified example tying everything together. The task: a payment processing pipeline that validates, charges, allocates inventory, ships, and notifies. Each step has a compensator. The whole flow is wrapped in a saga. Errors are aggregated. Panics are recovered. Everything is observable.
package payment
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
// Errors as a package-level vocabulary.
var (
ErrInvalidOrder = errors.New("invalid order")
ErrPaymentDeclined = errors.New("payment declined")
ErrInventoryShort = errors.New("inventory short")
ErrShippingDown = errors.New("shipping system down")
ErrTransient = errors.New("transient")
ErrPermanent = errors.New("permanent")
)
// Typed error for stage attribution.
type StageError struct {
Stage string
Err error
}
func (e *StageError) Error() string { return e.Stage + ": " + e.Err.Error() }
func (e *StageError) Unwrap() error { return e.Err }
// Order is the input.
type Order struct {
ID string
UserID string
Items []LineItem
Total int64
}
type LineItem struct {
SKU string
Quantity int
}
// Result is the output.
type Result struct {
OrderID string
Status string
PaymentID string
ShipmentID string
}
// Dependencies are injected.
type Deps struct {
Validator Validator
Payments PaymentClient
Inventory InventoryClient
Shipping ShippingClient
Notifier NotifierClient
Store SagaStore
Logger Logger
Metrics Metrics
}
// Pipeline processes an order through the pipeline with full saga semantics.
type Pipeline struct {
deps Deps
}
func New(deps Deps) *Pipeline {
return &Pipeline{deps: deps}
}
// Process runs the pipeline for one order.
func (p *Pipeline) Process(ctx context.Context, order Order) (*Result, error) {
// Wrap entire process in panic recovery.
var result *Result
var processErr error
func() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
n := runtime.Stack(buf, false)
processErr = fmt.Errorf("panic: %v\n%s", r, buf[:n])
p.deps.Logger.Error("pipeline panic", "order", order.ID, "panic", r)
}
}()
result, processErr = p.processInternal(ctx, order)
}()
return result, processErr
}
func (p *Pipeline) processInternal(ctx context.Context, order Order) (*Result, error) {
p.deps.Metrics.OrderStarted(order.ID)
defer p.deps.Metrics.OrderFinished(order.ID)
saga := &Saga{
ID: order.ID,
Store: p.deps.Store,
Logger: p.deps.Logger,
Steps: []Step{
{
Name: "validate",
Forward: func(ctx context.Context) error {
return p.validate(ctx, order)
},
Compensate: func(ctx context.Context) error { return nil }, // no-op
},
{
Name: "charge",
Forward: func(ctx context.Context) error {
paymentID, err := p.chargeWithRetry(ctx, order, 3)
if err != nil { return err }
saga.Set("paymentID", paymentID)
return nil
},
Compensate: func(ctx context.Context) error {
paymentID, ok := saga.Get("paymentID").(string)
if !ok || paymentID == "" { return nil }
return p.deps.Payments.Refund(ctx, paymentID)
},
},
{
Name: "allocate",
Forward: func(ctx context.Context) error {
return p.allocateAllItems(ctx, order)
},
Compensate: func(ctx context.Context) error {
return p.releaseAllItems(ctx, order)
},
},
{
Name: "ship",
Forward: func(ctx context.Context) error {
shipmentID, err := p.shipWithRetry(ctx, order, 3)
if err != nil { return err }
saga.Set("shipmentID", shipmentID)
return nil
},
Compensate: func(ctx context.Context) error {
shipmentID, ok := saga.Get("shipmentID").(string)
if !ok || shipmentID == "" { return nil }
return p.deps.Shipping.Cancel(ctx, shipmentID)
},
},
{
Name: "notify",
Forward: func(ctx context.Context) error {
return p.deps.Notifier.Send(ctx, order.UserID, "Order shipped: "+order.ID)
},
Compensate: func(ctx context.Context) error { return nil }, // cannot unsend email
},
},
}
if err := saga.Run(ctx); err != nil {
return nil, fmt.Errorf("saga: %w", err)
}
return &Result{
OrderID: order.ID,
Status: "shipped",
PaymentID: saga.Get("paymentID").(string),
ShipmentID: saga.Get("shipmentID").(string),
}, nil
}
func (p *Pipeline) validate(ctx context.Context, order Order) error {
if err := p.deps.Validator.Validate(ctx, order); err != nil {
return fmt.Errorf("%w: %v", ErrInvalidOrder, err)
}
return nil
}
func (p *Pipeline) chargeWithRetry(ctx context.Context, order Order, maxAttempts int) (string, error) {
var lastErr error
for attempt := 0; attempt < maxAttempts; attempt++ {
pid, err := p.deps.Payments.Charge(ctx, order.UserID, order.Total, "order-"+order.ID)
if err == nil { return pid, nil }
if errors.Is(err, ErrPaymentDeclined) {
return "", err // permanent; don't retry
}
lastErr = err
wait := time.Duration(1<<attempt) * 200 * time.Millisecond
select {
case <-ctx.Done(): return "", ctx.Err()
case <-time.After(wait):
}
}
return "", fmt.Errorf("charge after %d attempts: %w", maxAttempts, lastErr)
}
func (p *Pipeline) allocateAllItems(ctx context.Context, order Order) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(runtime.NumCPU())
var failures []error
var mu sync.Mutex
for _, item := range order.Items {
item := item
g.Go(func() error {
err := p.deps.Inventory.Allocate(ctx, item.SKU, item.Quantity)
if err != nil {
mu.Lock()
failures = append(failures, fmt.Errorf("allocate %s: %w", item.SKU, err))
mu.Unlock()
return nil // collect, don't fail-fast
}
return nil
})
}
_ = g.Wait()
if len(failures) > 0 {
return fmt.Errorf("%w: %w", ErrInventoryShort, errors.Join(failures...))
}
return nil
}
func (p *Pipeline) releaseAllItems(ctx context.Context, order Order) error {
var errs []error
for _, item := range order.Items {
if err := p.deps.Inventory.Release(ctx, item.SKU, item.Quantity); err != nil {
errs = append(errs, fmt.Errorf("release %s: %w", item.SKU, err))
}
}
return errors.Join(errs...)
}
func (p *Pipeline) shipWithRetry(ctx context.Context, order Order, maxAttempts int) (string, error) {
var lastErr error
for attempt := 0; attempt < maxAttempts; attempt++ {
sid, err := p.deps.Shipping.Create(ctx, order)
if err == nil { return sid, nil }
if !errors.Is(err, ErrTransient) {
return "", fmt.Errorf("ship: %w", err)
}
lastErr = err
wait := time.Duration(1<<attempt) * 500 * time.Millisecond
select {
case <-ctx.Done(): return "", ctx.Err()
case <-time.After(wait):
}
}
return "", fmt.Errorf("ship after %d attempts: %w", maxAttempts, lastErr)
}
The saga implementation (sketched here; the complete version appears in the earlier saga section):
type Step struct {
Name string
Forward func(ctx context.Context) error
Compensate func(ctx context.Context) error
}
type Saga struct {
ID string
Store SagaStore
Logger Logger
Steps []Step
state map[string]any
mu sync.Mutex
}
func (s *Saga) Set(k string, v any) {
s.mu.Lock()
defer s.mu.Unlock()
if s.state == nil { s.state = map[string]any{} }
s.state[k] = v
}
func (s *Saga) Get(k string) any {
s.mu.Lock()
defer s.mu.Unlock()
return s.state[k]
}
func (s *Saga) Run(ctx context.Context) error {
var completed []int
for i, step := range s.Steps {
s.Logger.Info("saga step", "id", s.ID, "step", step.Name)
if err := step.Forward(ctx); err != nil {
s.Logger.Error("saga step failed", "id", s.ID, "step", step.Name, "err", err)
// rollback
rollbackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
var compErrs []error
for j := len(completed) - 1; j >= 0; j-- {
cstep := s.Steps[completed[j]]
if cerr := cstep.Compensate(rollbackCtx); cerr != nil {
compErrs = append(compErrs, fmt.Errorf("compensate %s: %w", cstep.Name, cerr))
}
}
if len(compErrs) > 0 {
return fmt.Errorf("saga failed (%w) and rollback errors: %w",
err, errors.Join(compErrs...))
}
return fmt.Errorf("saga rolled back: %w", err)
}
completed = append(completed, i)
}
return nil
}
This is a senior-quality pipeline. Every concept covered earlier is in here: errgroup, sentinels, typed errors, retry, aggregation, sagas, compensating actions, panic recovery, observability hooks. About 250 lines of production code; the rest is dependencies and types.
Read it three times. Understand each piece. Then try to write it from memory. That exercise will solidify everything.
Recap of Patterns by Phase¶
| Phase | Key patterns | Tools |
|---|---|---|
| Setup | Dependency injection, options | Struct + functional options |
| Validation | Sentinel errors | errors.New, errors.Is |
| Parallel work | Errgroup + SetLimit | golang.org/x/sync/errgroup |
| Retry | Backoff + jitter + budget | time.After, rand |
| Aggregation | errors.Join + mutex | sync.Mutex |
| Side effects | Sagas + compensators | Custom saga lib or Temporal |
| Safety | Panic recovery | defer recover |
| Observability | Logs + metrics + tracing | structured logging, OpenTelemetry |
A pipeline using all these is dense but readable, and recovers gracefully from a wide range of failures.
Looking Ahead¶
At professional level we'll cover:
- Pipelines in distributed systems (cross-service error propagation).
- Cost models of structured concurrency (when is the overhead worth it?).
- Idempotency budgets and exactly-once approximations.
- Designing error contracts for cross-team services.
- Backpressure and flow control beyond pipelines.
For now, internalise what's here. Senior-level pipeline design is the foundation for distributed-system design. Get it right at this scale before scaling further.
Deep Dive: Diagnosing a Stuck Pipeline in Production¶
A scenario: at 2:14 AM, alerts fire. The pipeline has stopped producing output. The queue depth is growing. You SSH in.
Step 1: Verify the process is alive¶
If the process is dead, restart and investigate logs.
Step 2: Check goroutine count¶
A normal pipeline has dozens of goroutines. If you see thousands, it's leaking. If you see 5 (when it should be 30), workers are exiting.
Step 3: Read the goroutine dump¶
Search for groups of goroutines stuck in the same state:
grep -A 5 "chan send" goroutines.txt | head -100
grep -A 5 "chan receive" goroutines.txt | head -100
Look for:
- Many goroutines blocked on chan send: a consumer stopped reading.
- Many on chan receive: a producer stopped writing or didn't close.
- A goroutine blocked on sync.Cond.Wait: a notification never fired.
- A goroutine blocked on runtime.gopark with no clear reason: deadlock.
Step 4: Check metrics¶
Look for:
- pipeline_active_stages not zero but pipeline_completions flat.
- Error rate spiking.
- A specific stage taking far longer than usual.
Step 5: Inspect external dependencies¶
The pipeline may be waiting on a downed dependency. Check each backend's connectivity and health endpoints. If a dependency is unreachable, the pipeline is doing the right thing by waiting (assuming ctx has a deadline).
Step 6: Force a goroutine dump for postmortem¶
(The Go runtime dumps all goroutine stacks on SIGQUIT by default, but that terminates the process. For a non-destructive dump, fetch the pprof goroutine profile instead.)
Save for later analysis.
Step 7: Decide: rollback or wait¶
- If a dependency is recovering, give it time.
- If the pipeline is leaking and unrecoverable, restart.
- If you can't tell, capture diagnostics, wait, escalate.
This procedure scales. Practice it in staging so you're not learning at 2 AM.
Deep Dive: Backfill Pipelines¶
A common production task: backfill data when a schema changes or a bug requires reprocessing.
Characteristics¶
- Single-run (not continuous).
- Large dataset (millions of items).
- May take hours.
- Must be resumable on crash.
- Must not impact live traffic.
Pattern: Checkpointed backfill¶
type Backfill struct {
ID string
Cursor int64
Total int64
Started time.Time
Last time.Time
Status string // running, paused, completed, failed
LastError string
}
func (b *Backfill) Run(ctx context.Context, batchSize int, processor func([]Row) error) error {
for {
if err := ctx.Err(); err != nil { return err }
rows, err := loadBatch(ctx, b.Cursor, batchSize)
if err != nil { return fmt.Errorf("load: %w", err) }
if len(rows) == 0 { break }
if err := processor(rows); err != nil {
b.LastError = err.Error()
saveBackfill(ctx, b)
return err
}
b.Cursor = rows[len(rows)-1].ID
b.Last = time.Now()
if err := saveBackfill(ctx, b); err != nil {
return fmt.Errorf("save cursor: %w", err)
}
// throttle to not impact live traffic
time.Sleep(10 * time.Millisecond)
}
b.Status = "completed"
return saveBackfill(ctx, b)
}
Each batch processed, cursor saved. On crash, reload b and resume.
Pattern: Parallel backfill¶
func runParallel(ctx context.Context, b *Backfill, workers int, processor func([]Row) error) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(workers)
cursorCh := make(chan int64, workers*2)
g.Go(func() error {
defer close(cursorCh)
for {
cursor, done := nextCursor() // done reports that all batch ranges have been dispatched
if done { return nil }
select {
case <-ctx.Done(): return ctx.Err()
case cursorCh <- cursor:
}
}
})
for i := 0; i < workers; i++ {
g.Go(func() error {
for cursor := range cursorCh {
rows, err := loadBatchAt(ctx, cursor)
if err != nil { return err }
if err := processor(rows); err != nil { return err }
}
return nil
})
}
return g.Wait()
}
Workers pull cursors independently. Each cursor is a batch range. Coordination via the cursor channel.
For checkpointing, you need to track which ranges have been processed. A bitmap or "completed_ranges" table works.
Pattern: Throttled backfill¶
limiter := rate.NewLimiter(rate.Limit(100), 100) // 100 RPS to backend
for _, row := range rows {
if err := limiter.Wait(ctx); err != nil { return err }
if err := processOne(row); err != nil { return err }
}
Throttling ensures the backfill doesn't impact live traffic. Tune based on capacity.
Deep Dive: Pipeline Templates¶
Once you've written 3-4 pipelines, you'll notice patterns. Codify them.
Template: ETL¶
The standard ETL. Use errgroup, fan-out in transform if CPU-bound, bounded parallelism in load.
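A channel-only skeleton of that shape; the stage bodies are placeholders, and a production version would use errgroup with ctx-aware sends as shown earlier:

```go
package main

import (
	"fmt"
	"strings"
)

// extract emits raw items; in practice this reads a source system.
func extract(items []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// transform is the CPU-bound middle stage; fan it out if it dominates.
func transform(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for it := range in {
			out <- strings.ToUpper(it)
		}
	}()
	return out
}

func main() {
	// load: the final stage drains the pipeline into a sink.
	var loaded []string
	for it := range transform(extract([]string{"a", "b"})) {
		loaded = append(loaded, it)
	}
	fmt.Println(loaded)
}
```

Each stage owns and closes its output channel, which is what makes the shape composable.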
Template: Fetch-and-store¶
Common for crawlers and integrations. Fetch is the heavy stage; bounded parallelism is essential.
Template: Stream processor¶
Stateful or stateless. For stateful, persist state per consumer group.
Template: Saga¶
For multi-step processes with side effects.
Template: Fan-out delivery¶
Webhook fan-out, notification systems. Per-subscriber retry, DLQ on permanent failure.
Each template has a known shape and known error patterns. Recognising the template is the first step in design.
Deep Dive: When Pipelines Don't Fit¶
Not every problem is a pipeline. Some signs you're forcing the wrong shape:
- The data has no natural flow direction. If items can be processed in any order with no dependencies, you don't need a pipeline — you need a worker pool.
- Stages are too uneven. If stage 2 takes 100x longer than stage 1, you have a bottleneck and the pipeline shape doesn't help — you need to scale stage 2 differently.
- Stages share state. Pipelines work best when each stage is stateless and data flows through. Shared mutable state breaks the model.
- Error handling needs cross-stage coordination. "On step 3 failure, rerun step 2" is not pipeline-shaped; it's a state machine.
Use the right tool. A pipeline isn't always it.
Deep Dive: Cross-Cutting Concerns¶
In senior-level pipeline design, cross-cutting concerns recur:
Authentication and authorization¶
A pipeline acting on behalf of a user must propagate that identity. Context value:
type userKey struct{}
func WithUser(ctx context.Context, user User) context.Context {
return context.WithValue(ctx, userKey{}, user)
}
func UserFrom(ctx context.Context) (User, bool) {
u, ok := ctx.Value(userKey{}).(User)
return u, ok
}
Stages that perform sensitive operations check authorization before acting.
Tenancy¶
Multi-tenant pipelines must isolate. Pass tenant ID via context, use it for routing, rate limiting, bulkheads.
Quotas¶
Enforce per-tenant or per-pipeline quotas so one tenant cannot starve the rest.
Auditing¶
Every action is logged with structured context.
Auditing is not the same as logging. Audit logs go to an append-only store. Used for compliance and incident response.
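A sketch of an append-only audit record; the field names and AuditLog type are illustrative, and a real store would be a database or WORM bucket rather than a slice:

```go
package main

import (
	"fmt"
	"time"
)

// AuditEvent is one immutable record of who did what to what.
type AuditEvent struct {
	At       time.Time
	Actor    string
	TenantID string
	Action   string
	Target   string
	Outcome  string
}

// AuditLog only appends; there is deliberately no update or delete.
type AuditLog struct {
	events []AuditEvent
}

func (a *AuditLog) Append(e AuditEvent) { a.events = append(a.events, e) }

func main() {
	var log AuditLog
	log.Append(AuditEvent{
		At: time.Now(), Actor: "user-7", TenantID: "acme",
		Action: "order.refund", Target: "order-42", Outcome: "ok",
	})
	fmt.Println(len(log.events), log.events[0].Action)
}
```

The interface shape is the point: if the type cannot express mutation, code review cannot accidentally approve it.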
PII handling¶
Avoid PII in logs. If it must appear, redact it.
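A sketch of redaction for one PII class; the pattern is deliberately loose and illustrative, since real redaction rules depend on the data you handle:

```go
package main

import (
	"fmt"
	"regexp"
)

// emailRe is a loose email matcher for demonstration only.
var emailRe = regexp.MustCompile(`[\w.+-]+@[\w.-]+`)

// redact replaces anything email-shaped before the string reaches a log.
func redact(msg string) string {
	return emailRe.ReplaceAllString(msg, "[REDACTED]")
}

func main() {
	fmt.Println(redact("notify failed for alice@example.com: timeout"))
}
```

Run redaction at the logging boundary, not in each stage, so no code path can forget it.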
Encryption at rest for stored errors. TLS in transit.
Deep Dive: The Pipeline as a Service¶
Some pipelines become services with public APIs. Considerations:
Rate limiting at ingress¶
Don't accept more work than you can handle. Reject excess at the door; the rejection itself communicates backpressure to clients.
Request validation¶
Validate inputs before queuing so the queue never fills with garbage.
Idempotency keys¶
Public APIs benefit from explicit idempotency keys: the client supplies a key, the server dedupes, and retries become safe.
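The dedup side can be sketched like this; in production the map would be a persistent store with TTLs, and the Dedup type is hypothetical:

```go
package main

import (
	"fmt"
	"sync"
)

// Dedup remembers results by client-supplied idempotency key, so a
// retried request returns the original result instead of re-executing.
type Dedup struct {
	mu   sync.Mutex
	seen map[string]string
}

func NewDedup() *Dedup { return &Dedup{seen: map[string]string{}} }

// Do runs fn once per key; replays return the cached result and skip
// the side effect entirely.
func (d *Dedup) Do(key string, fn func() string) string {
	d.mu.Lock()
	defer d.mu.Unlock()
	if res, ok := d.seen[key]; ok {
		return res
	}
	res := fn()
	d.seen[key] = res
	return res
}

func main() {
	d := NewDedup()
	calls := 0
	charge := func() string { calls++; return "payment-1" }
	fmt.Println(d.Do("idem-abc", charge), d.Do("idem-abc", charge), calls)
}
```

Holding the lock across fn keeps concurrent retries of the same key from double-executing; a persistent implementation would use an atomic insert instead.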
Webhooks for completion¶
For long pipelines, don't make clients poll. Webhook back:
func (p *Pipeline) ProcessAsync(ctx context.Context, req Request, webhook string) error {
go func() {
result, err := p.Process(ctx, req)
notify(webhook, result, err)
}()
return nil
}
Standard cloud service pattern, with two gotchas: the request-scoped ctx is cancelled when the handler returns, so derive a detached context for the background work (context.WithoutCancel in Go 1.21+); and avoid goroutine leaks by using a tracked worker pool or job queue, not bare go.
Versioning¶
Public APIs evolve. Version them:
Keep v1 working while v2 rolls out. Deprecate eventually.
Deep Dive: Lessons From Production Outages¶
Five outages worth remembering.
Outage 1: The cascading queue¶
A queue consumer pipeline had a bug: a poison message caused the consumer to crash. Kubernetes restarted it. It picked up the same poison message. Crashed. Restarted. Crashed.
Fix: catch the panic, DLQ the message, continue.
Lesson: panics in stream consumers are normally rare but catastrophic. Always recover.
Outage 2: The retry storm¶
A pipeline retried every transient failure 10 times with no jitter. A brief upstream blip caused all in-flight items to enter retry. The next attempt window saw 10,000 requests. Upstream went down harder.
Fix: add jitter. Add circuit breaker.
Lesson: synchronised retries amplify failures.
Outage 3: The cancellation that never came¶
A pipeline's stages didn't honor ctx.Done(). On shutdown, they kept running. New deploys had stale state from old runs.
Fix: audit every blocking call. Use linters.
Lesson: cancellation is everyone's job.
Outage 4: The unbounded queue¶
A pipeline accepted work into an unbounded queue. Faster ingress, slower processing. Queue grew. Memory grew. OOM killed the process.
Fix: bound the queue. Reject when full.
Lesson: every queue has bounds, even when implicit.
Outage 5: The lost error¶
A pipeline aggregated errors into a slice but returned only the first one to the metrics system. A new failure mode emerged but was masked by the dominant existing failure mode.
Fix: emit metrics per error category, not just per overall outcome.
Lesson: observability granularity matters.
Closing Note¶
This file is long because the topic is. Production pipelines are concurrent systems with side effects, error propagation, observability, and a finite operational budget. Every detail matters. Skipping any of them is a hidden bug.
If you've read this far, you have the foundation. The rest is practice. Build pipelines. Audit them. Watch them fail. Fix the failures. Iterate.
Senior engineers are made by the bugs they've debugged at 3 AM.
Appendix: A Long Quiz¶
Test yourself on these. No looking.
1. A pipeline uses errgroup.WithContext. Stage A returns an error wrapping ErrInvalid. Stage B is mid-select on out <- v and <-ctx.Done(). What happens?
2. You call g.Wait() and get context.Canceled. The parent context was not cancelled. What probably happened inside the pipeline?
3. A typed error *StageError implements Unwrap. The error has been wrapped three times. Does errors.As(err, &se) find it?
4. Two goroutines panic simultaneously. The second's panic message is logged by your recover wrapper. Is the first's panic also logged?
5. g.SetLimit(2) is called after g.Go. What happens?
6. You return nil from every g.Go despite per-item failures. How do you signal failure to the caller?
7. A compensator's first attempt fails. The saga retries. The compensator succeeds. Is the data in a consistent state?
8. A stage writes to result[i] where each i is unique. After g.Wait, you read result. Is this safe? Why?
9. A pipeline's "fast" stage outpaces the "slow" stage. The channel between them is unbuffered. What happens to the fast stage?
10. You use errors.Join to combine 1000 errors. The caller calls errors.Is(joined, ErrSpecific). How many comparisons does errors.Is perform?
11. A panic occurs in a defer inside a goroutine. The outer recover catches it. What's the state of the original return value?
12. The pipeline's context has a 10-second deadline. A stage's db.Query takes 15 seconds. What error does the pipeline return?
13. You set up tracing via context. A stage starts a span. The stage returns; the span is End()ed. The pipeline continues. Is the span recorded?
14. A circuit breaker is open. A goroutine calls breaker.Call. What's the goroutine's experience?
15. Two pipelines share a *errgroup.Group. One calls Wait. What happens?
Answers:
1. errgroup captures stage A's error and cancels the derived ctx; stage B's select fires the ctx.Done() branch, B returns ctx.Err(), which is ignored. g.Wait returns A's wrapped ErrInvalid.
2. A stage's select on <-ctx.Done() fired faster than the "real" error was captured by errOnce (a rare race), or a deeper helper caught and returned ctx.Err() instead of propagating the real failure.
3. Yes. errors.As walks the chain through every wrap.
4. The first's panic crashed the program; recover only runs in the panicking goroutine. The second's recover runs because the second goroutine survived (barely), but in practice the program is terminating; depending on timing, you may see neither, one, or both.
5. Panics. SetLimit must be called before any Go.
6. Return an aggregated error from the orchestrator function, or return a Result struct with per-item errors plus a separate top-level error indicating "some items failed."
7. Yes, if the compensator is idempotent. The second invocation should be safe regardless of the first's partial success.
8. Yes. Each goroutine writes to a unique slot, so there are no concurrent writes to the same memory, and g.Wait provides happens-before for the read.
9. It blocks on each send. The slow stage's pace becomes overall throughput. Backpressure.
10. Up to 1000. errors.Is walks every branch of the multi-unwrap.
11. If the function had defer recover as a named-return setter, the panic in the defer is hidden but the variable was already set. Depends on order.
12. context.DeadlineExceeded, wrapped through the chain.
13. Yes. The span ends with its accumulated data. The tracing SDK exports it asynchronously.
14. It receives ErrCircuitOpen immediately; no actual call is made.
15. Allowed but odd. One pipeline's Wait waits for all goroutines from both. Then the group is consumed; the other pipeline can't Wait again. Don't share groups.
Appendix: Glossary at Senior Level¶
A few terms used in this file:
- Saga: a long-running transaction composed of forward steps and compensating actions.
- Compensator: an operation that undoes a previously completed step.
- Idempotency: the property that an operation has the same effect on repeat as on first call.
- DLQ: dead-letter queue, where unprocessable items go.
- Bulkhead: an isolation mechanism so failure in one part doesn't affect others.
- Circuit breaker: a mechanism to fail fast when a dependency is failing.
- Backpressure: a mechanism to slow producers to match consumers.
- At-least-once: a delivery guarantee where each message may arrive one or more times.
- Exactly-once: an idealised guarantee where messages arrive exactly once. Approximated by at-least-once + dedup.
- Saga orchestration vs choreography: central coordinator vs event-driven.
- Reservoir sampling: a sampling technique with bounded memory regardless of stream size.
Knowing these terms lets you communicate with other senior engineers and read the literature.
Appendix: Required Reading¶
For the senior-level Go engineer working on pipelines:
- "Patterns of Distributed Systems" — Unmesh Joshi (sagas, leader election, replication).
- "Designing Data-Intensive Applications" — Martin Kleppmann (consistency, fault tolerance).
- "Release It!" — Michael Nygard (stability patterns, circuit breakers, bulkheads).
- "Site Reliability Engineering" — Google (SLOs, error budgets, incident response).
- Go Memory Model spec: go.dev/ref/mem.
- golang.org/x/sync/errgroup source.
- Temporal documentation (for workflow concepts, even if you don't use Temporal).
These are the books and docs that senior Go engineers reference. Get familiar with them.
Appendix: A Final Code Snippet¶
The simplest senior-quality pipeline that demonstrates all the patterns:
```go
func Run(ctx context.Context, items []Item, deps Deps) (res *Result, err error) {
	// Named returns let the recover report the panic as an error
	// instead of silently returning (nil, nil) to the caller.
	defer func() {
		if r := recover(); r != nil {
			deps.Logger.Error("panic", "value", r)
			err = fmt.Errorf("pipeline panic: %v", r)
		}
	}()
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(deps.Workers) // bounded parallelism; must be set before any Go
	var (
		results = make([]ItemResult, len(items))
		errs    []error
		mu      sync.Mutex
	)
	for i, it := range items {
		i, it := i, it // per-iteration copies (needed before Go 1.22)
		g.Go(func() error {
			r, err := processWithRetry(ctx, it, deps, 3)
			results[i] = ItemResult{Item: it, Result: r, Err: err} // unique slot, no lock needed
			if err != nil && !errors.Is(err, ErrSkip) {
				mu.Lock()
				errs = append(errs, fmt.Errorf("%s: %w", it.ID, err))
				mu.Unlock()
			}
			return nil // never cancel siblings; aggregate instead
		})
	}
	if err := g.Wait(); err != nil {
		return &Result{Items: results}, err
	}
	if len(errs) > 0 {
		return &Result{Items: results}, errors.Join(errs...)
	}
	return &Result{Items: results}, nil
}
```
Thirty-odd lines. Recovers panics. Bounded parallelism. Per-item retry. Aggregation. Sentinel for skip. The caller gets both successful and failed items.
This is the shape to aim for. Everything else is decoration.
That's senior level. Read professional.md next for distributed-system perspectives.