Skip to content

Channel Close Violations — Hands-On Tasks

Introduction

This file provides 15+ hands-on tasks. Each task includes a problem statement, hints, and a reference solution. Work through them in order; later tasks build on earlier ones.

Run each solution with go test -race -count=10 to verify correctness under concurrency.


Task 1: Single-Sender Pipeline

Goal. Build a producer-consumer pipeline where one goroutine generates integers 1 through 100, and another goroutine sums them. Use a channel; the producer closes the channel when done.

Hints. - Use defer close(out) in the producer. - Use for range in the consumer.

Reference solution.

package main

import "fmt"

func produce() <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= 100; i++ {
            out <- i
        }
    }()
    return out
}

func main() {
    sum := 0
    for v := range produce() {
        sum += v
    }
    fmt.Println(sum) // 5050
}

Task 2: Fan-In (Multi-Sender Coordination)

Goal. Two goroutines each generate integers from a different range. A third goroutine merges them into one stream. Sum the merged stream.

Hints. - Two senders, one consumer: use a coordinator goroutine to close the merged channel. - Use sync.WaitGroup.

Reference solution.

package main

import (
    "fmt"
    "sync"
)

func gen(start, count int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; i < count; i++ {
            out <- start + i
        }
    }()
    return out
}

func merge(srcs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(srcs))
    for _, s := range srcs {
        go func(s <-chan int) {
            defer wg.Done()
            for v := range s {
                out <- v
            }
        }(s)
    }
    go func() { wg.Wait(); close(out) }()
    return out
}

func main() {
    merged := merge(gen(1, 10), gen(100, 10))
    sum := 0
    for v := range merged {
        sum += v
    }
    fmt.Println(sum) // 55 + 1045 = 1100
}

Task 3: Fan-Out (Multi-Receiver Cancellation)

Goal. One producer feeds N workers, each processing in parallel. On context cancellation, workers must exit cleanly.

Hints. - Workers select on ctx.Done() and the work channel. - The producer closes the work channel when it finishes (or on cancellation).

Reference solution.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    work := make(chan int)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    fmt.Printf("worker %d: cancelled\n", id)
                    return
                case v, ok := <-work:
                    if !ok { return }
                    fmt.Printf("worker %d: %d\n", id, v)
                    time.Sleep(20 * time.Millisecond)
                }
            }
        }(i)
    }

    go func() {
        defer close(work)
        for i := 0; ; i++ {
            select {
            case <-ctx.Done():
                return
            case work <- i:
            }
        }
    }()

    wg.Wait()
}

Task 4: Idempotent Close

Goal. Implement a service struct with a Close() method. Multiple concurrent calls to Close() should not panic; each should return the same result.

Hints. - Use sync.Once.

Reference solution.

package main

import (
    "fmt"
    "sync"
)

type Svc struct {
    done chan struct{}
    once sync.Once
    err  error
}

func New() *Svc { return &Svc{done: make(chan struct{})} }

func (s *Svc) Close() error {
    s.once.Do(func() {
        s.err = s.cleanup()
        close(s.done)
    })
    <-s.done
    return s.err
}

func (s *Svc) cleanup() error {
    fmt.Println("cleaning up")
    return nil
}

func main() {
    s := New()
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() { defer wg.Done(); _ = s.Close() }()
    }
    wg.Wait()
}

cleanup runs exactly once; all callers wait for it.


Task 5: Safe-Close Wrapper

Goal. Implement a SafeCh[T] wrapper around a channel. Send returns false if Close has been called; Close is idempotent; the data channel is never closed (only the done channel).

Hints. - Use a done channel + sync.Once for Close. - Send uses select on done.

Reference solution.

package main

import (
    "fmt"
    "sync"
)

type SafeCh[T any] struct {
    ch   chan T
    done chan struct{}
    once sync.Once
}

func NewSafeCh[T any](buf int) *SafeCh[T] {
    return &SafeCh[T]{ch: make(chan T, buf), done: make(chan struct{})}
}

func (s *SafeCh[T]) Send(v T) bool {
    select {
    case <-s.done: return false
    case s.ch <- v: return true
    }
}

func (s *SafeCh[T]) Recv() (T, bool) {
    select {
    case v := <-s.ch: return v, true
    case <-s.done:
        var z T
        return z, false
    }
}

func (s *SafeCh[T]) Close() { s.once.Do(func() { close(s.done) }) }

func main() {
    c := NewSafeCh[int](2)
    c.Send(1)
    c.Send(2)
    c.Close()
    fmt.Println(c.Send(3)) // false
    v, ok := c.Recv()
    fmt.Println(v, ok) // depends on race; usually returns 1 or 0/false
}

Task 6: Pipeline with Cancellation

Goal. Build a three-stage pipeline (generate → filter → map). On cancellation, all stages exit cleanly within milliseconds.

Hints. - Each stage takes context; each select has a ctx.Done() arm in both receive and send. - Each stage defer close(out).

Reference solution.

package main

import (
    "context"
    "fmt"
    "time"
)

func gen(ctx context.Context, n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; i < n; i++ {
            select {
            case <-ctx.Done(): return
            case out <- i:
            }
        }
    }()
    return out
}

func filter(ctx context.Context, in <-chan int, p func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            if !p(v) { continue }
            select {
            case <-ctx.Done(): return
            case out <- v:
            }
        }
    }()
    return out
}

func mapStage(ctx context.Context, in <-chan int, f func(int) int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            mapped := f(v)
            select {
            case <-ctx.Done(): return
            case out <- mapped:
            }
        }
    }()
    return out
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    src := gen(ctx, 1000)
    even := filter(ctx, src, func(v int) bool { return v%2 == 0 })
    doubled := mapStage(ctx, even, func(v int) int { return v * 2 })

    for v := range doubled {
        fmt.Println(v)
    }
}

After 100ms, cancellation triggers; every stage exits; the consumer's range exits.


Task 7: Worker Pool with Graceful Drain

Goal. Build a worker pool. On Shutdown(ctx), the pool stops accepting new Submits, drains its queue, waits for in-flight jobs to finish (subject to ctx), and returns nil or ctx.Err().

Hints. - Use sync.Once for the drain trigger. - Workers select on a drain signal channel.

Reference solution.

package pool

import (
    "context"
    "errors"
    "sync"
    "sync/atomic"
)

var ErrClosed = errors.New("pool: closed")

type Job interface{ Run() }

type Pool struct {
    jobs chan Job
    done chan struct{}
    wg   sync.WaitGroup
    once sync.Once
    inflight atomic.Int64
}

func New(workers, queue int) *Pool {
    p := &Pool{
        jobs: make(chan Job, queue),
        done: make(chan struct{}),
    }
    p.wg.Add(workers)
    for i := 0; i < workers; i++ {
        go p.worker()
    }
    return p
}

func (p *Pool) Submit(j Job) error {
    select {
    case <-p.done: return ErrClosed
    default:
    }
    select {
    case <-p.done: return ErrClosed
    case p.jobs <- j: return nil
    }
}

func (p *Pool) worker() {
    defer p.wg.Done()
    for {
        select {
        case <-p.done:
            for {
                select {
                case j := <-p.jobs:
                    p.inflight.Add(1)
                    j.Run()
                    p.inflight.Add(-1)
                default:
                    return
                }
            }
        case j := <-p.jobs:
            p.inflight.Add(1)
            j.Run()
            p.inflight.Add(-1)
        }
    }
}

func (p *Pool) Shutdown(ctx context.Context) error {
    p.once.Do(func() { close(p.done) })
    waitCh := make(chan struct{})
    go func() { p.wg.Wait(); close(waitCh) }()
    select {
    case <-waitCh: return nil
    case <-ctx.Done(): return ctx.Err()
    }
}

Test with stress: many concurrent Submits and Shutdowns.


Task 8: Producer with Bounded Drain

Goal. Build a producer that has been receiving items, has at most 10 buffered, and on Drain(ctx) consumes the buffer with no new items entering. Drain bounded by ctx timeout.

Hints. - Two channels: input (from upstream) and output (to consumer). - Drain stops accepting input; consumer reads output until empty.

Reference solution.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Producer struct {
    in   chan int
    done chan struct{}
    once sync.Once
}

func New() *Producer {
    p := &Producer{
        in: make(chan int, 10),
        done: make(chan struct{}),
    }
    return p
}

func (p *Producer) Send(v int) bool {
    select {
    case <-p.done: return false
    case p.in <- v: return true
    }
}

func (p *Producer) Drain(ctx context.Context) []int {
    p.once.Do(func() { close(p.done) })
    var out []int
    for {
        select {
        case v := <-p.in:
            out = append(out, v)
        case <-ctx.Done():
            return out
        default:
            return out
        }
    }
}

func main() {
    p := New()
    for i := 0; i < 5; i++ { p.Send(i) }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    fmt.Println(p.Drain(ctx)) // [0 1 2 3 4]
    fmt.Println(p.Send(99))   // false
}

Task 9: Pub-Sub with Independent Subscribers

Goal. Build a pub-sub where:

  • Publishers call Publish(event).
  • Subscribers call Subscribe() to get a channel.
  • Subscribers call Unsubscribe(channel) to stop.
  • Closing one subscriber does not affect others.

Hints. - Store subscribers in a map; protect with a mutex. - Unsubscribe removes from the map and closes the subscriber's channel.

Reference solution.

package main

import (
    "fmt"
    "sync"
    "time"
)

type PubSub struct {
    mu  sync.Mutex
    subs map[uint64]chan string
    next uint64
}

func New() *PubSub {
    return &PubSub{subs: map[uint64]chan string{}}
}

func (p *PubSub) Subscribe() (uint64, <-chan string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    id := p.next
    p.next++
    ch := make(chan string, 4)
    p.subs[id] = ch
    return id, ch
}

func (p *PubSub) Unsubscribe(id uint64) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if ch, ok := p.subs[id]; ok {
        close(ch)
        delete(p.subs, id)
    }
}

func (p *PubSub) Publish(msg string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    for _, ch := range p.subs {
        select {
        case ch <- msg:
        default:
        }
    }
}

func main() {
    ps := New()
    id1, ch1 := ps.Subscribe()
    id2, ch2 := ps.Subscribe()

    go func() { for m := range ch1 { fmt.Println("sub1:", m) } }()
    go func() { for m := range ch2 { fmt.Println("sub2:", m) } }()

    ps.Publish("hello")
    ps.Publish("world")
    time.Sleep(10 * time.Millisecond)
    ps.Unsubscribe(id1)
    ps.Publish("goodbye")
    time.Sleep(10 * time.Millisecond)
    ps.Unsubscribe(id2)
}

Task 10: Errgroup-Based Pipeline

Goal. Use golang.org/x/sync/errgroup to coordinate a multi-stage pipeline. On the first error, all stages cancel and exit.

Hints. - errgroup.WithContext returns a context that cancels on first error. - Each stage takes the context.

Reference solution.

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())
    in := make(chan int, 8)
    mid := make(chan int, 8)

    g.Go(func() error {
        defer close(in)
        for i := 0; i < 100; i++ {
            select {
            case <-ctx.Done(): return ctx.Err()
            case in <- i:
            }
        }
        return nil
    })

    g.Go(func() error {
        defer close(mid)
        for v := range in {
            if rand.Intn(100) == 0 {
                return errors.New("random failure")
            }
            select {
            case <-ctx.Done(): return ctx.Err()
            case mid <- v * 2:
            }
        }
        return nil
    })

    g.Go(func() error {
        for v := range mid {
            fmt.Println(v)
        }
        return nil
    })

    if err := g.Wait(); err != nil {
        fmt.Println("pipeline error:", err)
    }
}

Task 11: Detect Closed Channel via Comma-Ok

Goal. Write a function that reads up to N values from a channel and returns them. Stop early if the channel closes.

Hints. - Use the comma-ok form in a for loop.

Reference solution.

func takeN[T any](ch <-chan T, n int) []T {
    var out []T
    for i := 0; i < n; i++ {
        v, ok := <-ch
        if !ok { return out }
        out = append(out, v)
    }
    return out
}

Task 12: Coordinator That Closes After WaitGroup

Goal. Implement a closeAfterWG helper: takes a WaitGroup and a channel; spawns a goroutine that waits for the WG and then closes the channel.

Reference solution.

func closeAfterWG(wg *sync.WaitGroup, ch chan<- int) {
    // Note: chan<- int cannot be closed directly via type system.
    // Caller must pass chan int (bidirectional).
}

func closeAfterWGBidi(wg *sync.WaitGroup, ch chan int) {
    go func() {
        wg.Wait()
        close(ch)
    }()
}

Realisation: the function needs chan int, not chan<- int, to call close. The caller is exposing the channel for close authority.


Task 13: Drainable Channel with Snapshot

Goal. Build a Snapshot() method that captures the current channel state (occupancy, closed-or-not, capacity).

Hints. - len(ch) and cap(ch) are atomic. - To check closed without panic, use non-blocking receive in a select.

Reference solution.

type Stats struct {
    Length   int
    Capacity int
    Closed   bool
}

func Snapshot[T any](ch chan T) Stats {
    s := Stats{
        Length:   len(ch),
        Capacity: cap(ch),
    }
    select {
    case v, ok := <-ch:
        if !ok {
            s.Closed = true
        } else {
            // Oops, we consumed a value. Put it back?
            // This is why there is no IsClosed in Go.
            // For safety, we can't return v back without races.
            _ = v
        }
    default:
        // Channel not closed and not ready (empty or no waiter).
    }
    return s
}

This implementation is unsafe — it may consume a value. The lesson: there is no race-free way to check "is closed" without potentially destroying state. The "right" answer is "you can't"; design so you don't need to ask.


Task 14: Race Detector Reveal

Goal. Write a broken close pattern and run it under -race. Verify the detector flags it.

Reference solution.

package main

import (
    "fmt"
    "sync"
)

func main() {
    ch := make(chan int)
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        ch <- 1
    }()
    go func() {
        defer wg.Done()
        close(ch)
    }()
    wg.Wait()
    fmt.Println("done")
}

Run with go run -race main.go. The race detector will report a write/read race on ch.


Task 15: Multi-Producer Coordinator with sync.Once

Goal. Multiple producers may shut down independently. The last one to finish should close the shared output channel.

Hints. - Use an atomic counter. The last decrement closes the channel.

Reference solution.

package main

import (
    "sync"
    "sync/atomic"
)

type Coord struct {
    out chan int
    rem atomic.Int32
}

func New(producers int) *Coord {
    c := &Coord{out: make(chan int)}
    c.rem.Store(int32(producers))
    return c
}

func (c *Coord) Out() <-chan int { return c.out }

func (c *Coord) Done() {
    if c.rem.Add(-1) == 0 {
        close(c.out)
    }
}

func main() {
    coord := New(3)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            defer coord.Done()
            for j := 0; j < 5; j++ {
                coord.out <- id*10 + j
            }
        }(i)
    }
    go func() { wg.Wait() }()
    for v := range coord.Out() {
        _ = v
    }
}

Note: coord.Done() is called from each producer's defer. The atomic counter ensures exactly one close.


Task 16: Server with Graceful HTTP Shutdown

Goal. Build an HTTP server that handles SIGTERM gracefully, completing in-flight requests within 10 seconds.

Reference solution.

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    srv := &http.Server{
        Addr: ":8080",
        Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            time.Sleep(2 * time.Second)
            fmt.Fprintln(w, "hello")
        }),
    }

    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
    <-sigCh

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    if err := srv.Shutdown(ctx); err != nil {
        log.Println("shutdown:", err)
    }
}

Task 17: Implement Latch (One-Shot Broadcast)

Goal. Implement a Latch type with Fire() and Wait() semantics. Fire is idempotent; Wait blocks until Fire is called; after Fire, Wait returns instantly.

Reference solution.

type Latch struct {
    ch   chan struct{}
    once sync.Once
}

func NewLatch() *Latch { return &Latch{ch: make(chan struct{})} }
func (l *Latch) Fire() { l.once.Do(func() { close(l.ch) }) }
func (l *Latch) Wait() { <-l.ch }
func (l *Latch) Fired() bool {
    select { case <-l.ch: return true; default: return false }
}

Task 18: Custom errgroup Implementation

Goal. Implement a stripped-down errgroup with Go(func() error) and Wait() error.

Reference solution.

type Group struct {
    wg      sync.WaitGroup
    errOnce sync.Once
    err     error
    cancel  func()
}

func NewGroup(parent context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(parent)
    return &Group{cancel: cancel}, ctx
}

func (g *Group) Go(fn func() error) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        if err := fn(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                g.cancel()
            })
        }
    }()
}

func (g *Group) Wait() error {
    g.wg.Wait()
    g.cancel()
    return g.err
}

Note: the cancel triggers context cancellation, which propagates to in-flight goroutines.


Task 19: Stress Test for Idempotent Close

Goal. Write a test that runs 1000 concurrent Close() calls and verifies no panic.

Reference solution.

package svc

import (
    "sync"
    "testing"
)

func TestStressClose(t *testing.T) {
    s := New()
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() { defer wg.Done(); _ = s.Close() }()
    }
    wg.Wait()
}

Run with go test -race -count=10. If sync.Once is missing in Close, this fails.


Task 20: Diagnose a Goroutine Leak

Goal. Read this program; explain why it leaks goroutines; fix the leak.

func main() {
    ch := make(chan int)
    for i := 0; i < 10; i++ {
        go func() {
            v := <-ch  // blocks forever
            fmt.Println(v)
        }()
    }
    time.Sleep(time.Second)
    // 10 goroutines still blocked
}

Diagnosis. 10 goroutines are blocked on <-ch. No one sends to ch. No one closes ch. The goroutines never exit. Leak.

Fix. Either send 10 values then close:

for i := 0; i < 10; i++ {
    ch <- i
}
close(ch)

Or just close (each goroutine sees ok=false):

close(ch)

Or use a done-channel for cancellation:

done := make(chan struct{})
for i := 0; i < 10; i++ {
    go func() {
        select { case v := <-ch: ... case <-done: return }
    }()
}
close(done) // wakes all

Closing Notes

Working through these tasks builds the muscle memory for correct close handling. Patterns become second nature:

  • Single sender: defer close(out).
  • Multiple senders: coordinator with WaitGroup.
  • Idempotent close: sync.Once.
  • Cancellation: done-channel or context.
  • Safe send: select on done.

Run all your code with -race. Stress test. Verify with goleak. The disciplined practice is what produces production-grade Go.