Command Pattern: Hands-on Tasks
1. How to use this file
Fifteen progressive tasks, ordered from "your first Command" to "design a job queue you would actually deploy". Each task has:
- Goal: one sentence stating what you are building and why.
- Starter: an incomplete sketch. Type it out by hand; do not paste.
- Hints: bullets you can read when stuck. Avoid them on the first pass.
- Reference solution: a collapsible, complete, compilable Go program. Senior decisions are called out in comments inside the code.
The code targets Go 1.22+ (per-iteration loop variables, reflect.TypeFor, sync.OnceValue, log/slog, generics, errors.Join). Every solution is a self-contained package main unless noted, so go run file.go should work.
The arc: start with a bare func() error, grow it into a struct with Execute(ctx), hang queues and retries off it, dispatch it by type with reflect and then with generics, weave it into a saga, dedupe by idempotency key, and finally land on a small but production-shaped job queue.
2. Difficulty legend
| Tag | Level | What it means |
|---|---|---|
| B | Beginner | Reach for closures, basic structs, and for range. Roughly junior.md territory. |
| I | Intermediate | Context, channels, decorators, small concurrency. Maps to middle.md. |
| A | Advanced | Generics, reflection, sagas, the outbox pattern, observability. |
| S | Stretch | One bigger project that combines everything. |
3. Tasks
Task 1 (B): A first Command
Goal: Define type Command func() error, write three commands as function values, and run them in a loop. This is the smallest possible Command pattern in Go.
Starter:
package main
import "fmt"
type Command func() error
func main() {
cmds := []Command{
// TODO: three function values
}
for _, c := range cmds {
// TODO: call c, print error if any
}
_ = fmt.Sprintf // remove once you use fmt
}
Hints:
- A Command is just a function value; a func() error { ... } literal works.
- The invoker (main) does not know what each command does. That is the whole point.
- Make one of the commands return a real error so you see the failure branch.
Reference solution
package main
import (
"errors"
"fmt"
)
// Command is the smallest Command pattern in Go: just a function value.
// No interface, no struct, no Execute method. When nothing needs to name,
// log, or serialize the command, this is the shape Go code usually takes.
type Command func() error
func main() {
cmds := []Command{
func() error { fmt.Println("save file"); return nil },
func() error { fmt.Println("send email"); return nil },
func() error { return errors.New("payment declined") },
}
// The invoker has zero knowledge of what each command does.
// It only knows the signature.
for i, c := range cmds {
if err := c(); err != nil {
fmt.Printf("cmd %d failed: %v\n", i, err)
}
}
}
Task 2 (B): Command with parameters
Goal: Use a closure to capture parameters. Write func makeWrite(path, data string) Command that returns a command which, when called, writes data to path.
Starter:
package main
import "os"
type Command func() error
func makeWrite(path, data string) Command {
// TODO: return a closure that writes data to path
return nil
}
func main() {
cmd := makeWrite("/tmp/cmd-task2.txt", "hello")
_ = cmd()
_ = os.Remove
}
Hints:
- The closure captures the variables path and data, not snapshots of their values. Because they are makeWrite's own parameters and nothing else can reach them, the captured values never change.
- os.WriteFile(path, []byte(data), 0o644) is the one-liner.
- Building commands this way is how most retry and queue helpers in Go end up looking.
Reference solution
package main
import (
"fmt"
"os"
)
type Command func() error
// makeWrite is a Command factory. The returned closure carries its
// arguments in a private environment — callers cannot mutate them.
// This is the idiomatic Go form of "parameterised request".
func makeWrite(path, data string) Command {
return func() error {
return os.WriteFile(path, []byte(data), 0o644)
}
}
func main() {
cmds := []Command{
makeWrite("/tmp/cmd-task2-a.txt", "alpha"),
makeWrite("/tmp/cmd-task2-b.txt", "beta"),
}
for i, c := range cmds {
if err := c(); err != nil {
fmt.Printf("cmd %d: %v\n", i, err)
}
}
// cleanup so reruns are idempotent
for _, p := range []string{"/tmp/cmd-task2-a.txt", "/tmp/cmd-task2-b.txt"} {
_ = os.Remove(p)
}
}
Task 3 (B): Struct-based Command
Goal: Same behaviour as task 2 but as a struct with Execute() error. You now have two equivalent shapes: a function value and a struct method.
Starter:
package main
type Command interface {
Execute() error
}
type WriteFile struct {
Path, Data string
}
// TODO: implement Execute on *WriteFile or WriteFile (which one, and why?)
func main() {
var c Command = &WriteFile{Path: "/tmp/cmd-task3.txt", Data: "hi"}
_ = c.Execute()
}
Hints:
- Pointer receiver vs value receiver: prefer a value receiver here, because WriteFile is small and Execute never mutates it. A pointer receiver would also work. The difference is in the method set: with a pointer receiver only *WriteFile satisfies Command, while with a value receiver both WriteFile{} and &WriteFile{} do.
- Compile-time guarantee: var _ Command = WriteFile{} (or (*WriteFile)(nil) for a pointer receiver) makes the compiler enforce the interface.
Reference solution
package main
import (
"fmt"
"os"
)
type Command interface {
Execute() error
}
// WriteFile carries the request as data. No closures, no captured
// scope — just a struct you can JSON-marshal, log, or store on a queue.
type WriteFile struct {
Path string
Data string
}
// Value receiver: WriteFile is small and never mutated, and with a value
// receiver both WriteFile and *WriteFile satisfy Command.
func (w WriteFile) Execute() error {
return os.WriteFile(w.Path, []byte(w.Data), 0o644)
}
// Compile-time check that the interface is satisfied. Saves a debugging trip
// when someone later changes the method signature.
var _ Command = WriteFile{}
func main() {
cmds := []Command{
WriteFile{Path: "/tmp/cmd-task3-a.txt", Data: "alpha"},
WriteFile{Path: "/tmp/cmd-task3-b.txt", Data: "beta"},
}
for _, c := range cmds {
if err := c.Execute(); err != nil {
fmt.Println("execute:", err)
}
}
for _, p := range []string{"/tmp/cmd-task3-a.txt", "/tmp/cmd-task3-b.txt"} {
_ = os.Remove(p)
}
}
Task 4 (B): Compare both forms
Goal: Write the same SendEmail command in two shapes — once as a closure-producing factory, once as a struct with Execute(). In comments, note which is shorter and what is lost in each.
Starter:
package main
// closure form
type CommandFunc func() error
// TODO: makeSendEmail(to, subject, body string) CommandFunc
// struct form
type Command interface{ Execute() error }
// TODO: type SendEmail struct{ ... }; func (s SendEmail) Execute() error
Hints:
- Closure form: shortest, but the closure is opaque. You cannot print it, serialize it to JSON, or ask "what command is this?".
- Struct form: more types, but you get fmt.Printf("%+v", cmd) for free, you can store it on a Redis queue as JSON, and you can switch on its type.
- Neither is "better". Choose by whether the command crosses a boundary.
Reference solution
package main
import (
"encoding/json"
"fmt"
)
// ---------------- closure form ----------------
type CommandFunc func() error
// Shortest possible. Cannot inspect, name, or serialize.
func makeSendEmail(to, subject, body string) CommandFunc {
return func() error {
fmt.Printf("[closure] sending to=%s subject=%q\n", to, subject)
_ = body
return nil
}
}
// ---------------- struct form ----------------
type Command interface {
Execute() error
}
// More boilerplate; in exchange you get reflection, JSON, and pretty-print.
type SendEmail struct {
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}
func (s SendEmail) Execute() error {
fmt.Printf("[struct] sending to=%s subject=%q\n", s.To, s.Subject)
return nil
}
func main() {
cf := makeSendEmail("a@b.c", "hi", "body")
_ = cf()
// The closure is opaque: the best we can do is print its type.
fmt.Printf("closure value: %T\n", cf)
se := SendEmail{To: "a@b.c", Subject: "hi", Body: "body"}
_ = se.Execute()
fmt.Printf("struct value: %+v\n", se)
// And here is the killer feature of the struct form:
payload, _ := json.Marshal(se)
fmt.Printf("serialized: %s\n", payload)
// A closure cannot do this: json.Marshal returns a *json.UnsupportedTypeError for func values.
// Rule of thumb:
// - in-process, ephemeral, defined-where-used -> closure
// - crosses a queue / disk / network, needs name+log -> struct
}
Task 5 (I): Command with context
Goal: Convert task 3 to take Execute(ctx context.Context) error. Make a SleepyJob that respects cancellation: if the context is cancelled while sleeping, return ctx.Err().
Starter:
package main
import (
"context"
"time"
)
type Command interface {
Execute(ctx context.Context) error
}
type SleepyJob struct {
Dur time.Duration
}
// TODO: implement Execute that returns on either time.After(Dur) OR ctx.Done()
Hints:
- select { case <-time.After(d): return nil; case <-ctx.Done(): return ctx.Err() }.
- A command that does not take a ctx cannot be cancelled. For anything that does I/O or sleeps, that is a liability.
- Demo with context.WithTimeout(ctx, 100*time.Millisecond) against a SleepyJob{Dur: time.Second} and watch the timeout fire.
Reference solution
package main
import (
"context"
"fmt"
"time"
)
type Command interface {
Execute(ctx context.Context) error
}
// SleepyJob simulates a long-running command that respects cancellation.
// Real commands wrap http.Get, db.Query, etc — all of which take ctx.
type SleepyJob struct {
Name string
Dur time.Duration
}
func (j SleepyJob) Execute(ctx context.Context) error {
select {
case <-time.After(j.Dur):
fmt.Printf("%s: done\n", j.Name)
return nil
case <-ctx.Done():
// Senior decision: wrap ctx.Err() with %w so callers can still use
// errors.Is(err, context.Canceled) / context.DeadlineExceeded.
return fmt.Errorf("%s: %w", j.Name, ctx.Err())
}
}
func main() {
short, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
cmds := []Command{
SleepyJob{Name: "fast", Dur: 10 * time.Millisecond},
SleepyJob{Name: "slow", Dur: 1 * time.Second}, // will be cancelled
}
for _, c := range cmds {
if err := c.Execute(short); err != nil {
fmt.Println("error:", err)
}
}
}
Task 6 (I): A small worker pool
Goal: Spawn N workers that read commands from chan Command and execute them. Graceful shutdown via context. Closing the channel must drain remaining work.
Starter:
package main
import (
"context"
"sync"
)
type Command func(ctx context.Context) error
type Pool struct {
jobs chan Command
wg sync.WaitGroup
}
// TODO: NewPool(n int) *Pool; spawn n workers
// TODO: (p *Pool) Submit(c Command)
// TODO: (p *Pool) Stop() — close jobs, wait for workers
Hints:
- Each worker is a goroutine running for c := range p.jobs { c(ctx) }.
- Stop() closes p.jobs and calls wg.Wait(). Closing the channel makes each range loop exit once the buffered jobs are drained.
- For cancellation mid-job, the worker's loop should also select on ctx.Done(). Whether to drop in-flight work or wait it out is your call.
Reference solution
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Command func(ctx context.Context) error
// Pool is a fixed-size worker pool reading Commands from a channel.
// Real production pools add: per-job timeout, metrics, panic recovery,
// priority queue, dead-letter — see task 15 for the full shape.
type Pool struct {
jobs chan Command
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewPool(parent context.Context, workers, queueSize int) *Pool {
ctx, cancel := context.WithCancel(parent)
p := &Pool{
jobs: make(chan Command, queueSize), // bounded — see senior checklist
ctx: ctx,
cancel: cancel,
}
for i := 0; i < workers; i++ {
p.wg.Add(1)
go p.worker(i)
}
return p
}
func (p *Pool) worker(id int) {
defer p.wg.Done()
for c := range p.jobs {
if err := c(p.ctx); err != nil {
fmt.Printf("worker %d: %v\n", id, err)
}
}
}
func (p *Pool) Submit(c Command) {
// No select on ctx here — caller might want backpressure.
p.jobs <- c
}
// Stop closes the queue and waits for in-flight work. To abort in-flight
// work too, call p.cancel() first. Calling Submit after Stop panics,
// because it sends on a closed channel.
func (p *Pool) Stop() {
close(p.jobs)
p.wg.Wait()
p.cancel()
}
func main() {
p := NewPool(context.Background(), 3, 8)
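for i := 0; i < 6; i++ {
// Go 1.22+: each iteration has its own i, so the closure below captures
// this iteration's value without the old i := i copy.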
p.Submit(func(ctx context.Context) error {
time.Sleep(20 * time.Millisecond)
fmt.Printf("job %d done\n", i)
return nil
})
}
p.Stop()
fmt.Println("pool stopped")
}
Task 7 (I): Reversible commands
Goal: Define type Reversible struct { Do, Undo func() error }. Build a History with Push(r) (runs Do then records), UndoLast() error, and Len() int. Think of it as a tiny editor history.
Starter:
package main
type Reversible struct {
Do, Undo func() error
}
type History struct {
done []Reversible
}
// TODO: Push, UndoLast, Len
Hints:
- Push runs Do; only on success does it append to done.
- UndoLast pops the tail and runs its Undo.
- This is the foundation for editors, schema migrations, and saga compensations.
Reference solution
package main
import (
"errors"
"fmt"
)
// Reversible is a command that can be undone. Production examples:
// migration up/down, editor insert/delete, configuration commit/rollback.
type Reversible struct {
Name string
Do func() error
Undo func() error
}
type History struct {
done []Reversible
}
func (h *History) Push(r Reversible) error {
if err := r.Do(); err != nil {
// Senior decision: do not record half-applied commands.
return fmt.Errorf("Do %s: %w", r.Name, err)
}
h.done = append(h.done, r)
return nil
}
func (h *History) UndoLast() error {
if len(h.done) == 0 {
return errors.New("history empty")
}
last := h.done[len(h.done)-1]
h.done = h.done[:len(h.done)-1]
if err := last.Undo(); err != nil {
// Best practice: do NOT re-push a failed undo back on the stack.
// The state is unknown; surface to the caller.
return fmt.Errorf("Undo %s: %w", last.Name, err)
}
return nil
}
func (h *History) Len() int { return len(h.done) }
func main() {
var x int
h := &History{}
_ = h.Push(Reversible{
Name: "+5",
Do: func() error { x += 5; return nil },
Undo: func() error { x -= 5; return nil },
})
_ = h.Push(Reversible{
Name: "*2",
Do: func() error { x *= 2; return nil },
Undo: func() error { x /= 2; return nil },
})
fmt.Println("after pushes:", x) // 10
_ = h.UndoLast()
fmt.Println("after one undo:", x) // 5
_ = h.UndoLast()
fmt.Println("after two undos:", x) // 0
fmt.Println("history len:", h.Len())
}
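An editor history usually needs redo as well. One way to add it, offered as an extension rather than part of the task, is a second stack. UndoLast moves a command onto that stack, Redo runs Do again and moves the command back, and any fresh Push clears it. The Redo method and the undone field are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

type Reversible struct {
	Name     string
	Do, Undo func() error
}

// History keeps two stacks: done (undoable) and undone (redoable).
type History struct {
	done, undone []Reversible
}

func (h *History) Push(r Reversible) error {
	if err := r.Do(); err != nil {
		return fmt.Errorf("Do %s: %w", r.Name, err)
	}
	h.done = append(h.done, r)
	h.undone = nil // a fresh edit invalidates the redo branch
	return nil
}

func (h *History) UndoLast() error {
	if len(h.done) == 0 {
		return errors.New("nothing to undo")
	}
	last := h.done[len(h.done)-1]
	h.done = h.done[:len(h.done)-1]
	if err := last.Undo(); err != nil {
		return fmt.Errorf("Undo %s: %w", last.Name, err)
	}
	h.undone = append(h.undone, last)
	return nil
}

func (h *History) Redo() error {
	if len(h.undone) == 0 {
		return errors.New("nothing to redo")
	}
	next := h.undone[len(h.undone)-1]
	h.undone = h.undone[:len(h.undone)-1]
	if err := next.Do(); err != nil {
		return fmt.Errorf("Redo %s: %w", next.Name, err)
	}
	h.done = append(h.done, next)
	return nil
}

func main() {
	x := 1
	h := &History{}
	_ = h.Push(Reversible{
		Name: "+5",
		Do:   func() error { x += 5; return nil },
		Undo: func() error { x -= 5; return nil },
	})
	_ = h.UndoLast()
	fmt.Println(x) // 1
	_ = h.Redo()
	fmt.Println(x) // 6
}
```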
Task 8 (I): Command queue with retries
Goal: Build a queue where every failed command is re-enqueued up to N times. After N failures, it is dropped with a structured log line.
Starter:
package main
import "context"
type Command func(ctx context.Context) error
type job struct {
cmd Command
attempts int
}
type RetryQueue struct {
jobs chan job
max int
}
// TODO: NewRetryQueue, Submit(cmd), run loop that re-enqueues on failure
Hints:
- Use a buffered channel so re-enqueue does not deadlock with a single worker.
- Increment attempts on every failure; drop the job when attempts >= max.
- Add a backoff, for example time.Sleep(time.Duration(j.attempts) * 50 * time.Millisecond), before re-submitting; otherwise a permanently failing command burns CPU.
Reference solution
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"sync"
"time"
)
type Command func(ctx context.Context) error
type job struct {
name string
cmd Command
attempts int
}
type RetryQueue struct {
jobs chan job
max int
log *slog.Logger
wg sync.WaitGroup
}
func NewRetryQueue(buf, maxAttempts int) *RetryQueue {
return &RetryQueue{
jobs: make(chan job, buf),
max: maxAttempts,
log: slog.New(slog.NewTextHandler(os.Stdout, nil)),
}
}
func (q *RetryQueue) Submit(name string, c Command) {
q.jobs <- job{name: name, cmd: c}
}
func (q *RetryQueue) Run(ctx context.Context, workers int) {
for i := 0; i < workers; i++ {
q.wg.Add(1)
go q.worker(ctx, i)
}
}
func (q *RetryQueue) worker(ctx context.Context, id int) {
defer q.wg.Done()
for {
select {
case <-ctx.Done():
return
case j, ok := <-q.jobs:
if !ok {
return
}
err := j.cmd(ctx)
if err == nil {
q.log.Info("ok", "name", j.name, "attempts", j.attempts+1, "worker", id)
continue
}
j.attempts++
if j.attempts >= q.max {
// Senior decision: structured log so this is queryable in
// production. The "dropped" event is the signal ops needs.
q.log.Error("dropped", "name", j.name, "attempts", j.attempts, "err", err.Error())
continue
}
// Linear backoff. In production, exponential + jitter.
time.Sleep(time.Duration(j.attempts) * 50 * time.Millisecond)
// Non-blocking re-enqueue — if the channel is full, we drop.
select {
case q.jobs <- j:
default:
q.log.Error("requeue-full", "name", j.name, "attempts", j.attempts)
}
}
}
}
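// Stop closes the queue and waits for the workers. Call it only once no
// retries are pending: a worker re-enqueueing after close would panic with
// "send on closed channel". That is why main sleeps before calling Stop.
func (q *RetryQueue) Stop() {
close(q.jobs)
q.wg.Wait()
}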
func main() {
q := NewRetryQueue(16, 3)
q.Run(context.Background(), 2)
var tries int
q.Submit("flaky", func(ctx context.Context) error {
tries++
if tries < 3 {
return errors.New("transient")
}
return nil
})
q.Submit("always-fails", func(ctx context.Context) error {
return errors.New("permanent")
})
time.Sleep(500 * time.Millisecond) // wait for retries to settle
q.Stop()
fmt.Println("done")
}
Task 9 (I): Command logging decorator
Goal: Write a decorator that wraps any command and logs its name, duration, and error. This is where Command meets Decorator.
Starter:
package main
import (
"context"
"time"
)
type Command func(ctx context.Context) error
// TODO: func Logged(name string, c Command) Command
// must: log name, run c, measure duration, log err (if any), return err
Hints:
- The decorator returns a new Command that wraps the inner one.
- start := time.Now(); err := c(ctx); dur := time.Since(start).
- Use log/slog so the output is structured; that becomes important when 10k jobs per minute are running.
- Returning the original error untouched is mandatory; the decorator must not swallow it.
Reference solution
package main
import (
"context"
"errors"
"log/slog"
"os"
"time"
)
type Command func(ctx context.Context) error
var log = slog.New(slog.NewTextHandler(os.Stdout, nil))
// Logged wraps a Command, recording name + duration + error. Pure
// decorator: it must never alter the error returned by the inner command.
func Logged(name string, c Command) Command {
return func(ctx context.Context) error {
start := time.Now()
err := c(ctx)
attrs := []any{
"cmd", name,
"dur_ms", time.Since(start).Milliseconds(),
}
if err != nil {
attrs = append(attrs, "err", err.Error())
log.Error("cmd done", attrs...)
} else {
log.Info("cmd done", attrs...)
}
return err // do NOT swallow
}
}
// Timed enforces a per-command deadline. Compose decorators by nesting.
func Timed(d time.Duration, c Command) Command {
return func(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, d)
defer cancel()
return c(ctx)
}
}
func main() {
okCmd := Logged("fast", Timed(50*time.Millisecond, func(ctx context.Context) error {
time.Sleep(5 * time.Millisecond)
return nil
}))
failCmd := Logged("flaky", func(ctx context.Context) error {
return errors.New("kaboom")
})
_ = okCmd(context.Background())
_ = failCmd(context.Background())
}
Task 10 (A): Command Bus with dispatch by type
Goal: Build a Bus that routes commands to handlers based on their concrete type, using reflect.TypeOf. Register handlers for Ping and Echo; the Bus picks the right one.
Starter:
package main
import "reflect"
type Bus struct {
handlers map[reflect.Type]func(any) error
}
type Ping struct{}
type Echo struct{ Msg string }
// TODO: NewBus, Register(cmd any, h func(any) error), Send(cmd any) error
Hints:
- Register with b.handlers[reflect.TypeOf(cmd)] = h.
- Send with b.handlers[reflect.TypeOf(cmd)](cmd).
- An unknown command type must produce a clean error, not a nil-handler panic.
- This is the foundation of CQRS in many Go codebases, and the reason for task 11, which removes the any.
Reference solution
package main
import (
"fmt"
"reflect"
)
// Bus dispatches commands to handlers keyed by concrete type. This is
// the classic CQRS dispatcher. The `any` in the handler signature is
// the cost of working at runtime; task 11 fixes that with generics.
type Bus struct {
handlers map[reflect.Type]func(any) error
}
func NewBus() *Bus {
return &Bus{handlers: map[reflect.Type]func(any) error{}}
}
func (b *Bus) Register(sample any, h func(any) error) {
b.handlers[reflect.TypeOf(sample)] = h
}
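// NoHandlerError is a typed error, so callers can tell "wrong command"
// apart from "handler failed" with errors.As.
type NoHandlerError struct{ Type reflect.Type }
func (e *NoHandlerError) Error() string { return fmt.Sprintf("no handler for %v", e.Type) }
func (b *Bus) Send(cmd any) error {
t := reflect.TypeOf(cmd)
h, ok := b.handlers[t]
if !ok {
return &NoHandlerError{Type: t}
}
return h(cmd)
}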
type Ping struct{}
type Echo struct{ Msg string }
func main() {
bus := NewBus()
bus.Register(Ping{}, func(_ any) error {
fmt.Println("pong")
return nil
})
bus.Register(Echo{}, func(c any) error {
e := c.(Echo) // safe: dispatcher only calls us for Echo
fmt.Println("echo:", e.Msg)
return nil
})
_ = bus.Send(Ping{})
_ = bus.Send(Echo{Msg: "hi"})
if err := bus.Send(42); err != nil {
fmt.Println("expected error:", err)
}
}
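Keying on reflect.TypeOf has one consequence worth knowing: Echo and *Echo are different types. A command registered by value is therefore not found when it is sent by pointer, and the reverse holds too. The sketch below reduces the Bus to its map to show the lookup. The names registered and hasHandler are illustrative.

```go
package main

import (
	"fmt"
	"reflect"
)

type Echo struct{ Msg string }

// registered mimics the Bus's map: handlers keyed by the sample's type.
var registered = map[reflect.Type]bool{
	reflect.TypeOf(Echo{}): true,
}

// hasHandler reports whether a Bus-style lookup would find a handler.
func hasHandler(cmd any) bool { return registered[reflect.TypeOf(cmd)] }

func main() {
	fmt.Println(hasHandler(Echo{Msg: "hi"}))  // true
	fmt.Println(hasHandler(&Echo{Msg: "hi"})) // false: *Echo is a different type
}
```

The simplest fix is to pick one convention, values or pointers, and use it for every command.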
Task 11 (A): Generic Command Bus (Go 1.22+)
Goal: Same Bus as task 10, but type-safe. Register[T any](bus *Bus, h func(ctx, T) error) and Send[T any](bus *Bus, ctx, cmd T) error. No any at the call site, no type assertions.
Starter:
package main
import (
"context"
"reflect"
)
type Bus struct {
handlers map[reflect.Type]any // still untyped inside the bus
}
// TODO: NewBus
// TODO: func Register[T any](b *Bus, h func(context.Context, T) error)
// TODO: func Send[T any](b *Bus, ctx context.Context, cmd T) error
Hints:
- Methods cannot declare their own type parameters in Go, so Register and Send are top-level functions that take the bus as an argument. This is the standard workaround.
- Inside the bus, you still store any (the type-erased handler). The generics only protect the boundary.
- reflect.TypeFor[T]() is a Go 1.22+ helper that avoids constructing a zero T.
Reference solution
package main
import (
"context"
"fmt"
"reflect"
)
// Bus is type-erased internally, but the Register/Send helpers below give
// callers a type-safe surface. This is a common shape for a generic CQRS bus.
type Bus struct {
handlers map[reflect.Type]any
}
func NewBus() *Bus { return &Bus{handlers: map[reflect.Type]any{}} }
// Register binds a typed handler for command type T.
// Note: methods can't be generic, so Register is a free function.
func Register[T any](b *Bus, h func(context.Context, T) error) {
b.handlers[reflect.TypeFor[T]()] = h
}
// Send dispatches a typed command. The type parameter is usually
// inferred from cmd, so callers write `Send(bus, ctx, MyCommand{...})`.
func Send[T any](b *Bus, ctx context.Context, cmd T) error {
h, ok := b.handlers[reflect.TypeFor[T]()]
if !ok {
return fmt.Errorf("no handler for %T", cmd)
}
// Safe: we only stored handler funcs of the matching type.
return h.(func(context.Context, T) error)(ctx, cmd)
}
// --- commands ---
type CreateUser struct {
Email string
}
type DeleteUser struct {
ID string
}
func main() {
bus := NewBus()
Register(bus, func(ctx context.Context, c CreateUser) error {
fmt.Println("creating user:", c.Email)
return nil
})
Register(bus, func(ctx context.Context, c DeleteUser) error {
fmt.Println("deleting user:", c.ID)
return nil
})
ctx := context.Background()
_ = Send(bus, ctx, CreateUser{Email: "a@b.c"})
_ = Send(bus, ctx, DeleteUser{ID: "u1"})
// Limitation: generics check the handler signature, not registration.
// A command with no registered handler still compiles and fails at runtime.
type Unknown struct{}
if err := Send(bus, ctx, Unknown{}); err != nil {
fmt.Println("expected:", err)
}
}
Task 12 (A): Saga executor
Goal: Run a slice of Step{Name, Forward, Compensate func(ctx) error}. If a step's Forward fails, run the Compensate of every previously-successful step in reverse. This is the saga pattern, used for cross-service transactions.
Starter:
package main
import "context"
type Step struct {
Name string
Forward func(ctx context.Context) error
Compensate func(ctx context.Context) error
}
// TODO: func RunSaga(ctx context.Context, steps []Step) error
Hints:
- Keep a done slice of the indices (or steps) you have applied.
- On failure at step i, walk done in reverse and call each Compensate.
- A compensation that itself fails is a big deal. Log it, but keep walking, since you cannot stop midway through a rollback. Return both the original error and a summary of the compensation failures.
Reference solution
package main
import (
"context"
"errors"
"fmt"
)
type Step struct {
Name string
Forward func(ctx context.Context) error
Compensate func(ctx context.Context) error
}
// RunSaga executes steps in order. On the first Forward failure, it
// rolls back by running Compensate on every previously-successful step
// in reverse. A failing Compensate is logged but does NOT stop the
// rollback — you cannot half-rollback a distributed transaction.
func RunSaga(ctx context.Context, steps []Step) error {
var done []Step
for _, s := range steps {
if err := s.Forward(ctx); err != nil {
rbErr := compensate(ctx, done)
if rbErr != nil {
return fmt.Errorf("saga: %s: %w; rollback errors: %w", s.Name, err, rbErr) // two %w (Go 1.20+) keep both errors visible to errors.Is
}
return fmt.Errorf("saga: %s: %w", s.Name, err)
}
done = append(done, s)
}
return nil
}
func compensate(ctx context.Context, done []Step) error {
var errs []error
for i := len(done) - 1; i >= 0; i-- {
if err := done[i].Compensate(ctx); err != nil {
errs = append(errs, fmt.Errorf("compensate %s: %w", done[i].Name, err))
}
}
return errors.Join(errs...) // errors.Join(nil) returns nil
}
func main() {
ctx := context.Background()
booked := map[string]bool{}
steps := []Step{
{
Name: "book-flight",
Forward: func(_ context.Context) error { booked["flight"] = true; return nil },
Compensate: func(_ context.Context) error { delete(booked, "flight"); return nil },
},
{
Name: "book-hotel",
Forward: func(_ context.Context) error { booked["hotel"] = true; return nil },
Compensate: func(_ context.Context) error { delete(booked, "hotel"); return nil },
},
{
Name: "book-car",
Forward: func(_ context.Context) error { return errors.New("no cars left") },
Compensate: func(_ context.Context) error { return nil },
},
}
if err := RunSaga(ctx, steps); err != nil {
fmt.Println("saga failed:", err)
}
fmt.Println("final state:", booked) // empty: flight and hotel rolled back
}
Task 13 (A): Outbox pattern (in-memory)
Goal: Implement the transactional outbox pattern. A "transactional" function writes both the state change and an outbox record under the same lock. A background pump reads the outbox and dispatches the commands. This is how reliable event publishing is built when you do not have distributed transactions.
Starter:
package main
import (
"context"
"sync"
)
type OutboxEntry struct {
ID int
Cmd func(ctx context.Context) error
}
type Store struct {
mu sync.Mutex
state map[string]string
outbox []OutboxEntry
nextID int
}
// TODO: (s *Store) TransactWrite(key, val string, cmd) — atomic state + outbox
// TODO: (s *Store) Pump(ctx) — drain outbox, dispatch commands, remove on success
Hints:
- The whole point: the state write and the outbox append happen under the same mutex (or, in real life, the same SQL transaction). This makes them effectively atomic.
- The pump runs in its own goroutine. On success, remove the entry; on failure, leave it for retry.
- Do not hold the mutex while running a command — only while reading or writing the outbox.
Reference solution
package main
import (
"context"
"fmt"
"sync"
"time"
)
type OutboxEntry struct {
ID int
Cmd func(ctx context.Context) error
// In a real outbox: serialized payload, type tag, created_at, attempts.
}
type Store struct {
mu sync.Mutex
state map[string]string
outbox []OutboxEntry
next int
}
func NewStore() *Store {
return &Store{state: map[string]string{}}
}
// TransactWrite is the "atomic" boundary. In production this is a single
// SQL transaction that INSERTs both the state and the outbox row.
func (s *Store) TransactWrite(key, val string, cmd func(ctx context.Context) error) {
s.mu.Lock()
defer s.mu.Unlock()
s.state[key] = val
s.next++
s.outbox = append(s.outbox, OutboxEntry{ID: s.next, Cmd: cmd})
}
// dequeue returns a snapshot of pending entries.
func (s *Store) dequeue() []OutboxEntry {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]OutboxEntry, len(s.outbox))
copy(out, s.outbox)
return out
}
// removeIDs deletes successfully-dispatched entries.
func (s *Store) removeIDs(ids map[int]struct{}) {
s.mu.Lock()
defer s.mu.Unlock()
kept := s.outbox[:0]
for _, e := range s.outbox {
if _, done := ids[e.ID]; !done {
kept = append(kept, e)
}
}
s.outbox = kept
}
// Pump loops until ctx is done, dispatching outbox entries.
func (s *Store) Pump(ctx context.Context) {
tick := time.NewTicker(20 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
entries := s.dequeue()
ok := map[int]struct{}{}
for _, e := range entries {
if err := e.Cmd(ctx); err == nil {
ok[e.ID] = struct{}{}
} else {
fmt.Printf("outbox %d failed (will retry): %v\n", e.ID, err)
}
}
if len(ok) > 0 {
s.removeIDs(ok)
}
}
}
}
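// Pending reports how many outbox entries are still waiting. It takes the
// lock because the pump goroutine mutates s.outbox concurrently.
func (s *Store) Pending() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.outbox)
}
func main() {
s := NewStore()
ctx, cancel := context.WithCancel(context.Background())
pumpDone := make(chan struct{})
go func() {
defer close(pumpDone)
s.Pump(ctx)
}()
s.TransactWrite("user:1", "Alice", func(ctx context.Context) error {
fmt.Println("publish UserCreated(1)")
return nil
})
s.TransactWrite("user:2", "Bob", func(ctx context.Context) error {
fmt.Println("publish UserCreated(2)")
return nil
})
time.Sleep(80 * time.Millisecond)
cancel()
<-pumpDone // wait for the pump to exit before inspecting the store
fmt.Println("remaining outbox:", s.Pending())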
}
Task 14 (A): Idempotent commands
Goal: Each command exposes an IdempotencyKey() string. The dispatcher dedupes: if a command with the same key arrives within a TTL window, it is dropped instead of being executed again (a full implementation would also return the cached result of the first call). This is the idea behind Stripe's Idempotency-Key request header.
Starter:
package main
import (
"context"
"sync"
"time"
)
type Idempotent interface {
IdempotencyKey() string
Execute(ctx context.Context) error
}
type Dispatcher struct {
mu sync.Mutex
seen map[string]time.Time
ttl time.Duration
}
// TODO: NewDispatcher(ttl)
// TODO: (d *Dispatcher) Dispatch(ctx, cmd) — dedupe by key, gc expired entries
Hints:
- On dispatch, look up the key. If present and not expired, return early.
- Caching the result (not just the key) is a real-world refinement. For this task, returning nil on the cached path is fine.
- GC: when checking the map, evict any entry whose TTL has passed. Do it inline so the map does not grow forever.
Reference solution
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Idempotent interface {
IdempotencyKey() string
Execute(ctx context.Context) error
}
type Dispatcher struct {
mu sync.Mutex
seen map[string]time.Time
ttl time.Duration
}
func NewDispatcher(ttl time.Duration) *Dispatcher {
return &Dispatcher{
seen: map[string]time.Time{},
ttl: ttl,
}
}
// Dispatch returns nil and skips execution if the same key was seen
// within ttl. A production version would also cache the original result
// so retries return the same error/payload, not just nil.
func (d *Dispatcher) Dispatch(ctx context.Context, cmd Idempotent) error {
key := cmd.IdempotencyKey()
now := time.Now()
d.mu.Lock()
// Inline GC. For very large maps, run a goroutine that sweeps
// periodically instead of doing work on every dispatch.
for k, t := range d.seen {
if now.Sub(t) > d.ttl {
delete(d.seen, k)
}
}
if _, dup := d.seen[key]; dup {
d.mu.Unlock()
fmt.Println("skipped duplicate:", key)
return nil
}
d.seen[key] = now
d.mu.Unlock()
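if err := cmd.Execute(ctx); err != nil {
// Forget the key on failure. Otherwise a retry of a failed command
// within the TTL would be skipped and reported as success (nil).
d.mu.Lock()
delete(d.seen, key)
d.mu.Unlock()
return err
}
return nil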
}
// --- a concrete command ---
type ChargeCard struct {
OrderID string
Amount int
}
func (c ChargeCard) IdempotencyKey() string {
return "charge:" + c.OrderID
}
func (c ChargeCard) Execute(_ context.Context) error {
fmt.Printf("charging order=%s amount=%d\n", c.OrderID, c.Amount)
return nil
}
func main() {
d := NewDispatcher(200 * time.Millisecond)
ctx := context.Background()
_ = d.Dispatch(ctx, ChargeCard{OrderID: "o-1", Amount: 1000})
_ = d.Dispatch(ctx, ChargeCard{OrderID: "o-1", Amount: 1000}) // dropped
_ = d.Dispatch(ctx, ChargeCard{OrderID: "o-2", Amount: 500})
time.Sleep(250 * time.Millisecond)
// TTL has expired -> the same key is accepted again.
_ = d.Dispatch(ctx, ChargeCard{OrderID: "o-1", Amount: 1000})
}
Task 15 (S): Mini job queue with everything
Goal: Combine the previous pieces into a small but production-shaped job queue. Workers, bounded queue, per-job timeout, panic recovery, retries with linear backoff, dead-letter for jobs that exceed the retry budget, structured logging with durations, graceful shutdown via context.
Starter:
package main
// type Job interface { Name() string; Run(ctx context.Context) error }
// type Queue struct{ /* jobs, dlq, workers, log */ }
// (q *Queue) Submit(j Job) error
// (q *Queue) Shutdown(ctx context.Context) error
// internals: per-job timeout, recover(), retries, dead-letter
Hints:
- Define `Job` as an interface (`Name()` + `Run(ctx)`) implemented by named structs, not as a closure: you want to log the name and route by type.
- Each worker wraps the `Run` call in a `defer recover()` so a panicking handler does not bring the pool down.
- A separate `dlq chan Job` (or a slice protected by a mutex) holds dead-letter jobs.
- The submit path must respect cancellation (a `closed` channel, the caller's `ctx`, or both) so the queue refuses new jobs once shutdown is in progress.
Reference solution
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"runtime/debug"
"sync"
"time"
)
// Job is the unit of work. Struct form (not a bare func) because we want
// to log Name() and could later serialize Job to a real queue.
type Job interface {
Name() string
Run(ctx context.Context) error
}
type wrapped struct {
job Job
attempts int
}
type Queue struct {
jobs chan wrapped
dlq chan Job
log *slog.Logger
workerCnt int
maxTries int
jobTimeout time.Duration
wg sync.WaitGroup
closed chan struct{}
once sync.Once
}
type Config struct {
Workers int
QueueSize int
DLQSize int
MaxTries int
JobTimeout time.Duration
}
func NewQueue(cfg Config) *Queue {
return &Queue{
jobs: make(chan wrapped, cfg.QueueSize),
dlq: make(chan Job, cfg.DLQSize),
log: slog.New(slog.NewTextHandler(os.Stdout, nil)),
workerCnt: cfg.Workers,
maxTries: cfg.MaxTries,
jobTimeout: cfg.JobTimeout,
closed: make(chan struct{}),
}
}
func (q *Queue) Start(ctx context.Context) {
for i := 0; i < q.workerCnt; i++ {
q.wg.Add(1)
go q.worker(ctx, i)
}
}
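func (q *Queue) Submit(j Job) error {
// Check closed first: select picks randomly among ready cases, so
// without this a Submit after Shutdown could still be accepted. This
// narrows, but does not close, the window for a Submit racing Shutdown.
select {
case <-q.closed:
return errors.New("queue: shutting down")
default:
}
select {
case <-q.closed:
return errors.New("queue: shutting down")
case q.jobs <- wrapped{job: j}:
return nil
}
}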
// DLQ returns dead-letter jobs for inspection.
func (q *Queue) DLQ() <-chan Job { return q.dlq }
// runOne executes a Job with per-job timeout and panic recovery.
// Critical: a panicking handler must not kill the worker.
func (q *Queue) runOne(ctx context.Context, j Job) (err error) {
ctx, cancel := context.WithTimeout(ctx, q.jobTimeout)
defer cancel()
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v\n%s", r, debug.Stack())
}
}()
return j.Run(ctx)
}
func (q *Queue) worker(ctx context.Context, id int) {
defer q.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-q.closed:
// Shutdown: finish what is already buffered, then exit. q.jobs is
// never closed, so a late Submit or retry timer cannot panic with
// "send on closed channel".
for {
select {
case w := <-q.jobs:
q.handle(ctx, id, w)
default:
return
}
}
case w := <-q.jobs:
q.handle(ctx, id, w)
}
}
}
// handle runs one attempt of a job, then logs success, schedules a
// retry, or dead-letters it.
func (q *Queue) handle(ctx context.Context, id int, w wrapped) {
start := time.Now()
err := q.runOne(ctx, w.job)
dur := time.Since(start)
if err == nil {
q.log.Info("job ok",
"name", w.job.Name(),
"attempts", w.attempts+1,
"dur_ms", dur.Milliseconds(),
"worker", id)
return
}
w.attempts++
q.log.Warn("job failed",
"name", w.job.Name(),
"attempts", w.attempts,
"dur_ms", dur.Milliseconds(),
"err", err.Error())
if w.attempts >= q.maxTries {
select {
case q.dlq <- w.job:
q.log.Error("job dead-lettered", "name", w.job.Name())
default:
q.log.Error("dlq full, job dropped", "name", w.job.Name())
}
return
}
// Linear backoff. Production: exponential + jitter.
backoff := time.Duration(w.attempts) * 50 * time.Millisecond
time.AfterFunc(backoff, func() {
select {
case q.jobs <- w:
case <-q.closed:
// dropping during shutdown is acceptable
}
})
}
// Shutdown stops accepting new jobs, lets workers drain the buffer, and
// waits for in-flight jobs to finish (bounded by ctx). After it returns
// nil, every worker has exited.
func (q *Queue) Shutdown(ctx context.Context) error {
q.once.Do(func() { close(q.closed) })
done := make(chan struct{})
go func() {
q.wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// --- demo jobs ---
type SendEmail struct{ To, Subject string }
func (s SendEmail) Name() string { return "send-email" }
func (s SendEmail) Run(_ context.Context) error {
fmt.Printf("[email] %s -> %s\n", s.Subject, s.To)
return nil
}
type FlakyJob struct {
ID int
tries *int
}
func (f FlakyJob) Name() string { return fmt.Sprintf("flaky-%d", f.ID) }
func (f FlakyJob) Run(_ context.Context) error {
*f.tries++
if *f.tries < 3 {
return errors.New("transient")
}
return nil
}
type PanicJob struct{}
func (PanicJob) Name() string { return "panic-job" }
func (PanicJob) Run(_ context.Context) error { panic("nope") }
type AlwaysFails struct{}
func (AlwaysFails) Name() string { return "always-fails" }
func (AlwaysFails) Run(_ context.Context) error { return errors.New("permanent") }
func main() {
q := NewQueue(Config{
Workers: 3,
QueueSize: 32,
DLQSize: 8,
MaxTries: 3,
JobTimeout: 500 * time.Millisecond,
})
q.Start(context.Background())
var tries int
_ = q.Submit(SendEmail{To: "a@b.c", Subject: "hi"})
_ = q.Submit(FlakyJob{ID: 1, tries: &tries})
_ = q.Submit(PanicJob{}) // must NOT kill the pool
_ = q.Submit(AlwaysFails{}) // ends up in DLQ
// Drain DLQ in a goroutine so the demo can show it.
go func() {
for j := range q.DLQ() {
fmt.Println("DLQ inspector got:", j.Name())
}
}()
time.Sleep(700 * time.Millisecond)
shutCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := q.Shutdown(shutCtx); err != nil {
fmt.Println("shutdown:", err)
}
fmt.Println("queue stopped cleanly")
}
4. How to grade yourself
Walk through your code and tick each box. If you cannot tick a box, the senior-level version of the task is not done yet.
- Every command that touches I/O takes `context.Context` and respects cancellation.
- Worker handlers wrap user code in `defer recover()` so a panicking job does not take down the pool.
- Every queue is bounded: channels get a deliberate capacity (`make(chan T, n)`), and nothing stands in for an unbounded queue, such as a slice that grows forever or a goroutine parked per pending job.
- Retries use bounded attempts plus backoff; permanent failures go to a dead-letter sink, not `log.Fatal`.
- Logs are structured (`slog`) and include the command name, duration, attempt count, and error.
- Naming reflects role: `Command`, `Job`, `Bus`, `Dispatcher`, `Queue`, not `Manager` or `Handler` everywhere.
- Decorators (logging, timeout, retry) are separate functions that wrap commands, not extra fields on every command struct.
- Idempotency keys exist for any command that may be retried across a network boundary.
- Generics are used where they remove type assertions at the call site, not as decoration.
- Tests use hand-rolled fakes against interfaces; no `*sql.DB` in unit tests.
- Reversible commands keep `Undo` as a pure inverse: no extra side effects, no logging, no metrics.
- Shutdown is explicit (`ctx` or `Shutdown(ctx)`); there is no `os.Exit` from inside a worker.
5. Stretch challenges
Pick one and ship it end-to-end.
- Redis-backed queue. Replace the in-memory `chan Job` with a Redis list (`LPUSH`/`BRPOP`). Jobs are JSON-encoded `{Type, Payload}` envelopes; a registry maps `Type` to a typed handler. Add a separate Redis list for the DLQ. Compare throughput against the in-memory version with `go test -bench=.`.
- Tracing through ctx. Instrument every command with OpenTelemetry spans propagated through `ctx`. Each decorator (timeout, retry, log) becomes its own span. Verify that a single submitted command produces one connected span tree: each decorator's span is the parent of the next layer's, and the user code's span is the leaf.
- CLI replay tool. Write a CLI that reads a JSON file of `[{Type, Payload}, ...]` and replays each command through the dispatcher from task 11. Add a `--dry-run` flag that calls a no-op handler and prints what would run. Bonus: a `--from-offset N` flag so a crashed batch can resume.
The common thread: each stretch turns the in-memory toy into something a colleague could actually deploy. Stop when you can point at one of them on your laptop and say "this looks like the production queue at $JOB".