Error Propagation in Pipelines — Hands-on Tasks¶
Exercises from easy to hard. Each task specifies what to build, success criteria, and a hint. Solution sketches at the end.
Easy¶
Task 1 — Basic errgroup pipeline¶
Write a function Sum(ctx context.Context, nums []int) (int, error) that:
- Spawns a producer goroutine that sends each number into a channel.
- Spawns a consumer goroutine that reads numbers and accumulates a total.
- Uses
errgroupfor coordination. - Returns the total or an error.
Verify with nums = []int{1, 2, 3, 4, 5} returns 15.
Hint. Use defer close(out) in the producer. Use for range in the consumer.
Task 2 — Return early on negative¶
Modify Task 1 so that if any number is negative, the producer returns an error wrapped with %w and a sentinel ErrNegative. The consumer should stop. The function should return 0 and the wrapped error.
Verify with nums = []int{1, 2, -1, 4} returns an error matching errors.Is(err, ErrNegative).
Hint. Return the error from the producer's g.Go function. errgroup cancels the consumer via the context.
Task 3 — Parallel HTTP fetch¶
Write FetchAll(ctx context.Context, urls []string) ([]string, error) that fetches each URL in parallel and returns the bodies in order.
- Use
errgroupwithSetLimit(8). - On first failure, all in-flight fetches are cancelled.
- Wrap errors with the URL so the caller knows which failed.
Hint. Use http.NewRequestWithContext. Results in a pre-allocated slice with goroutine writing to its index.
Task 4 — Drain on cancellation¶
Write a pipeline where stage 1 produces 1000 items but stage 2 fails on item 500. Verify:
- The pipeline exits cleanly (no leaks).
- The error is propagated to the caller.
- Stage 1's goroutine actually exits (use
runtime.NumGoroutinebefore and after).
Hint. select on ctx.Done() in the producer's send.
Medium¶
Task 5 — Per-item retry¶
Write a stage that processes items, each of which may transiently fail. Retry up to 3 times with exponential backoff. If all retries fail, return the error.
Use a fake process function that fails the first 2 times for each item, then succeeds.
Hint. Retry inside the goroutine; don't return from g.Go until retries are exhausted.
Task 6 — Sentinel for skip¶
Write a pipeline where some items should be silently skipped (e.g., empty strings). Use a sentinel error ErrSkip. The middle stage returns ErrSkip for empty items; the next stage uses errors.Is(err, ErrSkip) to skip them, not fail the pipeline.
Hint. Don't return ErrSkip from g.Go; handle it inside the consuming stage.
Task 7 — Fan-out with internal errgroup¶
Write a pipeline stage that consumes items from an input channel and produces results to an output channel. Internally, fan out to 4 workers. If any worker fails, the stage returns the error.
Use a nested errgroup inside the stage's goroutine.
Hint. defer close(out) runs after inner.Wait() returns.
Task 8 — Aggregate errors¶
Write ProcessAll(ctx context.Context, items []Item) error that processes each item in parallel. Continue processing even if some items fail. Return all errors via errors.Join.
Verify with a mix of succeeding and failing items.
Hint. Return nil from each g.Go. Collect errors via a mutex.
Task 9 — Stage attribution with typed error¶
Define a *StageError{Stage string, Err error}. Use it to wrap errors in each stage. The caller can errors.As(err, &se) to find which stage failed.
Write a pipeline with three stages: parse, transform, store. Each wraps errors in *StageError with its name.
Hint. Implement Error() and Unwrap() on the type.
Task 10 — Context deadline¶
Add a 5 * time.Second deadline to a pipeline that processes 100 items, each taking ~100ms. With workers=10, this should complete. With workers=1, it should hit the deadline.
Verify: the second case returns errors.Is(err, context.DeadlineExceeded).
Hint. context.WithTimeout. defer cancel().
Hard¶
Task 11 — Saga with compensating actions¶
Implement a saga with three steps and three compensators. If step 3 fails, run compensators 2 and 1 in reverse. Each compensator may also fail; collect those errors.
Use a slice of Step{Forward, Compensate func(ctx) error}.
Hint. Track which steps completed. On failure, iterate the completed list in reverse calling compensators.
Task 12 — Saga with persistent state¶
Extend Task 11. Persist saga state to a database (or use an in-memory map for simplicity) after each step. If the process restarts mid-saga, resume from the saved state.
Verify: kill the process between steps, restart, observe completion.
Hint. Save state with each step. On startup, load incomplete sagas. Resume.
Task 13 — Panic recovery¶
Write a pipeline where one stage may panic. Use defer recover() to convert the panic to an error returned by g.Go.
Verify: the panic doesn't crash the program; g.Wait() returns the panic-as-error; other stages exit cleanly.
Hint. Named return value lets recover set the error.
Task 14 — Bulkhead per tenant¶
Write a pipeline processing items from N tenants. Each tenant has its own errgroup with SetLimit(8). A failure in tenant A's pipeline does not affect tenant B's.
The top-level coordinator runs all tenants concurrently with another errgroup. Each tenant's failure is recorded but doesn't fail others.
Hint. Outer errgroup returns nil from g.Go to avoid cancellation.
Task 15 — Circuit breaker¶
Implement a simple circuit breaker:
- Closed: requests pass.
- After 5 failures, open. Requests immediately return
ErrCircuitOpen. - After 30 seconds, half-open. One request passes; if succeeds, close; if fails, re-open.
Wire into a pipeline stage. Verify the breaker opens, fails fast, then recovers.
Hint. Atomic counters and a mutex.
Task 16 — Dead-letter queue¶
Implement a DLQ for poison messages. After 3 retries, move the item to DLQ (an in-memory slice). The pipeline continues processing other items.
Verify: items in DLQ are recoverable; pipeline doesn't fail on poison messages.
Hint. DLQ is just a slice with a mutex. Caller can drain it after g.Wait.
Task 17 — Streaming with cursor¶
Implement a stream consumer with cursor tracking. Process events; after each batch, persist the cursor. On restart, resume from the saved cursor.
Use channels for the stream; a fake source that produces events.
Hint. Persist cursor after every N events to limit data loss on crash.
Task 18 — Hedged requests¶
Implement HedgedFetch(ctx context.Context, urls []string) (string, error) that fetches from multiple URLs concurrently. The first to succeed wins; others are cancelled.
If all fail, return errors.Join of all errors.
Hint. context.WithCancel per attempt; cancel siblings on first success.
Task 19 — Streaming aggregation¶
Implement an Aggregator that:
- Reads events from an input channel.
- Buffers up to 100 events or 1 second.
- Flushes a batch to a downstream output channel.
- Errors during flush retry; if persistent, the batch goes to DLQ.
Verify with simulated input including bursts and pauses.
Hint. time.NewTicker for the 1-second flush.
Task 20 — Full integration¶
Combine Tasks 7, 8, 11, 13, and 14 into one pipeline:
- Fan-out parsing.
- Aggregated per-item errors.
- Saga for multi-step business logic.
- Panic recovery in each stage.
- Per-tenant bulkheads.
A small but realistic production pipeline.
Hint. Build incrementally. Test each layer.
Solution Sketches¶
Task 1 sketch¶
func Sum(ctx context.Context, nums []int) (int, error) {
g, ctx := errgroup.WithContext(ctx)
out := make(chan int)
g.Go(func() error {
defer close(out)
for _, n := range nums {
select {
case <-ctx.Done(): return ctx.Err()
case out <- n:
}
}
return nil
})
var total int
g.Go(func() error {
for v := range out {
total += v
}
return nil
})
if err := g.Wait(); err != nil { return 0, err }
return total, nil
}
Task 5 sketch¶
func processWithRetry(ctx context.Context, it Item, maxAttempts int) error {
var lastErr error
for attempt := 0; attempt < maxAttempts; attempt++ {
err := process(ctx, it)
if err == nil { return nil }
if !isTransient(err) { return err }
lastErr = err
wait := time.Duration(1<<attempt) * 100 * time.Millisecond
select {
case <-ctx.Done(): return ctx.Err()
case <-time.After(wait):
}
}
return fmt.Errorf("after %d attempts: %w", maxAttempts, lastErr)
}
Task 11 sketch¶
type Step struct {
Forward func(ctx context.Context) error
Compensate func(ctx context.Context) error
}
func RunSaga(ctx context.Context, steps []Step) error {
var completed []int
for i, s := range steps {
if err := s.Forward(ctx); err != nil {
// rollback
var compErrs []error
for j := len(completed) - 1; j >= 0; j-- {
if cerr := steps[completed[j]].Compensate(ctx); cerr != nil {
compErrs = append(compErrs, cerr)
}
}
if len(compErrs) > 0 {
return fmt.Errorf("saga failed: %w; rollback errors: %w",
err, errors.Join(compErrs...))
}
return fmt.Errorf("saga rolled back: %w", err)
}
completed = append(completed, i)
}
return nil
}
Task 13 sketch¶
g.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()
return riskyStage(ctx)
})
Task 15 sketch¶
type Breaker struct {
mu sync.Mutex
failures int
state int // 0=closed, 1=open, 2=half-open
openAt time.Time
}
const (
StateClosed = iota
StateOpen
StateHalfOpen
)
var ErrCircuitOpen = errors.New("circuit open")
func (b *Breaker) Call(ctx context.Context, fn func() error) error {
b.mu.Lock()
if b.state == StateOpen {
if time.Since(b.openAt) > 30*time.Second {
b.state = StateHalfOpen
} else {
b.mu.Unlock()
return ErrCircuitOpen
}
}
b.mu.Unlock()
err := fn()
b.mu.Lock()
defer b.mu.Unlock()
if err != nil {
b.failures++
if b.failures >= 5 {
b.state = StateOpen
b.openAt = time.Now()
}
} else {
b.failures = 0
b.state = StateClosed
}
return err
}
Verification Tips¶
- Run with
-race:go test -race ./.... Pipelines with hidden races fail. - Run with deadline: cancel after 1 second; verify exit. Pipelines without cancellation honour fail.
- Run with
runtime.NumGoroutine()checks: leak detection. - Stress test: run 1000 times in a tight loop. Flaky behavior surfaces.
Final Note¶
Pipeline correctness is hard. Each of these tasks isolates one concept. Master each before combining. The integration task (Task 20) tests whether you can compose without losing correctness.
These exercises mirror real production needs. The skills you build here transfer directly.
Good luck.
Bonus Tasks¶
Bonus 1 — Convert string-matched to typed errors¶
Given the following code:
Refactor to use errors.Is(err, ErrNotFound) with a proper sentinel. Identify where the wrap chain needs to be preserved.
Bonus 2 — Find the goroutine leak¶
Given a pipeline that "works" but runtime.NumGoroutine() grows on each run, find the leak. Common causes:
- Missing
defer close(out). - Producer ignores
ctx.Done(). g.Waitnever called.- Goroutine spawned but not tracked.
Bonus 3 — Stress test with random failures¶
Take any pipeline you've written. Add a FaultInjector that randomly fails 1% of operations. Run 1000 iterations. Verify:
- The pipeline never hangs.
- Errors are reported, never swallowed.
- No race detector warnings.
Bonus 4 — Build a tiny errgroup from scratch¶
Implement MiniGroup with Go, Wait, and WithContext semantics. Compare to the real errgroup. Note differences.
Bonus 5 — Test cancellation timing¶
Build a pipeline and verify that Wait returns within 100 ms of cancel() being called. If it takes longer, you have a non-cooperative stage.
Discussion Topics¶
After completing the tasks, discuss with a peer:
- Why do all production pipelines use
errgroup.WithContextinstead of bareerrgroup.Group{}? - When would you NOT use first-error-wins?
- How do you decide between
sync.Mutexandatomicfor aggregation? - What's the difference between a goroutine leak and a deadlock?
- When does panic recovery improve robustness vs hide bugs?
- How do you communicate that a compensator is idempotent in code?
- What does it mean for an error to "cross a stage boundary"?
- Why does
g.SetLimitblockGocalls instead of returning an error? - When should you nest errgroups vs flatten?
- How do you test that a pipeline drains cleanly on cancellation?
These are open-ended. Answers vary by context.
Common Mistakes to Avoid¶
When working through the tasks, watch for:
- Forgetting
defer close(out)in producers. - Forgetting
selectonctx.Done()in sends. - Capturing loop variables by reference (pre-Go 1.22).
- Returning before
g.Wait(). - Sharing state across goroutines without sync.
- Using
==to compare wrapped errors. - Recovery without logging.
SetLimitafterGo.
Each of these is a common bug. Practice catches them.
Self-Evaluation Rubric¶
For each task you complete, ask:
- Does it pass
go test -race? - Does it exit promptly on cancellation (< 100 ms)?
- Are errors wrapped with
%wat boundaries? - Are sentinels defined at package level and documented?
- Does each channel have exactly one closer?
- Are all blocking ops cancellable?
- Are tests checking both happy path and failure?
If you can answer yes to all, the task is complete to senior standard.
When Stuck¶
If a task isn't working:
- Re-read the relevant level file. The patterns are explained there.
- Print state. Log when goroutines start, finish, and what error they return.
- Use
runtime.NumGoroutine(). Before and after; see if leaks. - Run with
-race. Hidden races may be the cause. - Simplify. Reduce to the smallest case that exhibits the bug.
- Ask. A peer's fresh eyes often spot what you can't.
Concurrency bugs are notoriously hard. Don't be discouraged. Every senior engineer has spent hours on a bug that turned out to be a missing <-ctx.Done() case.
Closing¶
These exercises mirror real production needs. After completing them, you should be able to:
- Write an error-propagating pipeline from blank file in 10 minutes.
- Spot common bugs in code review.
- Reason about edge cases (cancellation timing, partial failure, retries).
- Architect saga-based flows.
- Test failure paths comprehensively.
That is the working knowledge of pipeline error propagation in Go.
Keep practising. The next pipeline you write at work will be better than this one.