Async & Functional — Practice Tasks
12 hands-on exercises on writing correct, composable asynchronous code. Each task gives a scenario, problematic code (JS/TS, Python
asyncio, or Go — varied), an instruction, and a collapsible full solution with the reasoning that makes it correct. Ordered easy → hard.
Table of Contents
- Flatten callback hell into async/await (JS)
- Parallelize independent sequential awaits (JS)
- Gather independent coroutines (Python)
- Stop swallowing/dropping a future (Python)
- Fix the unhandled rejection (JS)
- Add timeout + cancellation with AbortController (JS)
- Add cancellation with context (Go)
- Bound concurrency with a semaphore / p-limit (JS)
- Back-pressure on a producer/consumer pipe (Go)
- Move CPU-bound work off the event loop (Python)
- Aggregate errors with allSettled-style results (JS)
- Build an async map/filter pipeline over a stream (JS)
How to Use
- Read the scenario and the broken code. Predict what goes wrong at runtime before reading the instruction — latency, a crash, a leak, or a silently-swallowed error.
- Write your fix. Keep behaviour identical except for the defect being repaired.
- Open the solution, compare, and read the reasoning. The reasoning is the point: the diff is easy, but knowing why the original was a bug is the skill.
- The decision tree below routes each smell to the technique that removes it.
Task 1 — Flatten Callback Hell (Easy)
Scenario: A Node.js handler reads a user, then their cart, then prices each item, then writes an order, each step depending on the previous one. It was written with nested callbacks, which makes it hard to read and forces every level to repeat its own error check.
function checkout(userId, callback) {
  getUser(userId, (err, user) => {
    if (err) return callback(err);
    getCart(user.id, (err, cart) => {
      if (err) return callback(err);
      priceItems(cart.items, (err, priced) => {
        if (err) return callback(err);
        createOrder(user, priced, (err, order) => {
          if (err) return callback(err);
          callback(null, order);
        });
      });
    });
  });
}
Instruction: Promisify the leaf functions and rewrite checkout as a flat async function. Errors should propagate via a single try/catch (or just by being thrown), not four manual if (err) branches.
Solution
import { promisify } from "node:util";

// promisify expects Node-style functions whose last argument is a callback(err, value).
const getUserAsync = promisify(getUser);
const getCartAsync = promisify(getCart);
const priceItemsAsync = promisify(priceItems);
const createOrderAsync = promisify(createOrder);

// A rejection at any step propagates to the caller; no per-step error branch is needed.
async function checkout(userId) {
  const user = await getUserAsync(userId);
  const cart = await getCartAsync(user.id);
  const priced = await priceItemsAsync(cart.items);
  return createOrderAsync(user, priced);
}
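The mechanism behind promisify is not specific to Node. As a rough illustration in Python asyncio, the sketch below wraps a callback-style function in a future so it can be awaited. Every name here (promisify, get_user, get_cart, checkout) is a hypothetical stand-in, and the wrapper assumes the callback is invoked on the event-loop thread.

```python
import asyncio


def promisify(fn):
    """Wrap a callback-style fn(*args, callback) so it can be awaited."""

    async def wrapper(*args):
        future = asyncio.get_running_loop().create_future()

        def callback(err, value=None):
            # Node-style convention: the first argument is the error, if any.
            if err is not None:
                future.set_exception(err)
            else:
                future.set_result(value)

        fn(*args, callback)
        return await future

    return wrapper


# Hypothetical callback-style leaf functions, for illustration only.
def get_user(user_id, callback):
    callback(None, {"id": user_id})


def get_cart(user_id, callback):
    if user_id < 0:
        callback(LookupError("no such user"))
    else:
        callback(None, {"items": ["apple", "pear"]})


async def checkout(user_id):
    user = await promisify(get_user)(user_id)
    cart = await promisify(get_cart)(user["id"])  # a failure here simply raises
    return user, cart


ok = asyncio.run(checkout(7))
try:
    asyncio.run(checkout(-1))
    error = None
except LookupError as exc:
    error = str(exc)
print(ok, error)
```

The error path needs no explicit branch in checkout: the callback's error becomes the future's exception, and awaiting the future raises it.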
Task 2 — Parallelize Independent Awaits (Easy)
Scenario: A dashboard endpoint fetches three things that do not depend on each other: the user profile, their notifications, and the current feature flags. It awaits them one after another, so the request takes the sum of three round-trips instead of the max.
async function loadDashboard(userId) {
  const profile = await fetchProfile(userId); // 120 ms
  const notifications = await fetchNotifications(userId); // 90 ms
  const flags = await fetchFeatureFlags(); // 60 ms
  return { profile, notifications, flags }; // ~270 ms total
}
Instruction: Make the three independent requests run concurrently so total latency is ~max(120, 90, 60) ≈ 120 ms. Keep the return shape identical.
Solution
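async function loadDashboard(userId) {
  // All three requests are started before the single await, so they overlap.
  // Promise.all rejects as soon as any one of them rejects.
  const [profile, notifications, flags] = await Promise.all([
    fetchProfile(userId),
    fetchNotifications(userId),
    fetchFeatureFlags(),
  ]);
  return { profile, notifications, flags };
}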
Task 3 — Gather Independent Coroutines (Python)
Scenario: A Python service enriches an order by calling three downstream services. The current code awaits them one after another, even though none of the calls depends on another's result.
import asyncio


async def enrich_order(order_id: int) -> dict:
    customer = await fetch_customer(order_id)
    inventory = await fetch_inventory(order_id)
    shipping = await fetch_shipping_quote(order_id)
    return {"customer": customer, "inventory": inventory, "shipping": shipping}
Instruction: Run the three coroutines concurrently with asyncio.gather. Then add a variant that does not abort the others if one fails.
Solution
import asyncio


# Fail-fast: the first exception propagates to the caller
# (the other awaitables are not cancelled and keep running).
async def enrich_order(order_id: int) -> dict:
    customer, inventory, shipping = await asyncio.gather(
        fetch_customer(order_id),
        fetch_inventory(order_id),
        fetch_shipping_quote(order_id),
    )
    return {"customer": customer, "inventory": inventory, "shipping": shipping}


# Lenient variant: one failure does not abort the others.
async def enrich_order_lenient(order_id: int) -> dict:
    results = await asyncio.gather(
        fetch_customer(order_id),
        fetch_inventory(order_id),
        fetch_shipping_quote(order_id),
        return_exceptions=True,  # exceptions become return values, not raises
    )
    keys = ("customer", "inventory", "shipping")
    out = {}
    for key, result in zip(keys, results):
        # BaseException also covers asyncio.CancelledError, which is not an Exception.
        if isinstance(result, BaseException):
            out[key] = None  # or log / record the error per field
        else:
            out[key] = result
    return out


# Structured-concurrency variant (Python 3.11+): TaskGroup cancels the
# remaining tasks as soon as one of them fails.
async def enrich_order_tg(order_id: int) -> dict:
    async with asyncio.TaskGroup() as tg:
        c = tg.create_task(fetch_customer(order_id))
        i = tg.create_task(fetch_inventory(order_id))
        s = tg.create_task(fetch_shipping_quote(order_id))
    # Results are read only after the block exits, when all tasks are done.
    return {"customer": c.result(), "inventory": i.result(), "shipping": s.result()}
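To see the latency difference without real services, here is a small timing sketch. The fake_call coroutine and its delays are assumptions standing in for the fetch_* functions; each call just sleeps and returns its own name.

```python
import asyncio
import time


async def fake_call(name: str, delay: float) -> str:
    """Hypothetical stand-in for a downstream call: sleeps, then returns its name."""
    await asyncio.sleep(delay)
    return name


DELAYS = {"customer": 0.05, "inventory": 0.03, "shipping": 0.02}


async def sequential() -> dict:
    # One await after another: total time is roughly the sum of the delays.
    return {name: await fake_call(name, d) for name, d in DELAYS.items()}


async def concurrent() -> dict:
    # gather schedules all three at once: total time is roughly the longest delay.
    values = await asyncio.gather(*(fake_call(n, d) for n, d in DELAYS.items()))
    return dict(zip(DELAYS, values))


def timed(coro_fn):
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start


seq_result, seq_elapsed = timed(sequential)
con_result, con_elapsed = timed(concurrent)
print(f"sequential {seq_elapsed:.3f}s, concurrent {con_elapsed:.3f}s")
```

Both versions return the same dictionary; only the elapsed time differs, which is the whole point of the refactor.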
Task 4 — Stop Dropping a Future (Medium)
Scenario: A worker fires off an audit-log write but never awaits it. Under asyncio, calling a coroutine function without awaiting the result only creates a coroutine object; it is never scheduled, so the audit write silently never happens, and Python emits RuntimeWarning: coroutine 'log_audit_event' was never awaited when the object is garbage-collected.
async def handle_request(req):
    result = await process(req)
    log_audit_event(req.user, "processed", result)  # BUG: coroutine created, never awaited
    return result
Instruction: The audit write is fire-and-forget (we don't want to block the response on it), but it must actually run and its failures must not vanish. Fix it so the coroutine is scheduled and any exception is observed.
Solution
import asyncio
import logging

logger = logging.getLogger(__name__)

# Hold strong references so the GC can't collect a running task mid-flight.
_background_tasks: set[asyncio.Task] = set()


def _spawn(coro) -> None:
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    task.add_done_callback(_log_if_failed)


def _log_if_failed(task: asyncio.Task) -> None:
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("background task failed", exc_info=exc)


async def handle_request(req):
    result = await process(req)
    _spawn(log_audit_event(req.user, "processed", result))  # scheduled, fire-and-forget
    return result
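The pattern above can be exercised end to end with a deliberately failing background job. This is a minimal sketch under stated assumptions: spawn, flaky_audit_write and handler are hypothetical names, and failures are collected in a list rather than logged so the outcome is easy to inspect.

```python
import asyncio

background: set = set()
failures: list = []


def spawn(coro) -> asyncio.Task:
    """Schedule coro now, keep a strong reference, and record any failure."""
    task = asyncio.create_task(coro)
    background.add(task)

    def on_done(t: asyncio.Task) -> None:
        background.discard(t)
        if not t.cancelled() and t.exception() is not None:
            failures.append(t.exception())

    task.add_done_callback(on_done)
    return task


async def flaky_audit_write() -> None:
    """Hypothetical audit write that always fails, to exercise the error path."""
    await asyncio.sleep(0.01)
    raise RuntimeError("audit store unavailable")


async def handler() -> str:
    spawn(flaky_audit_write())  # fire-and-forget: the response does not wait on it
    return "response sent"


async def main() -> str:
    response = await handler()
    # Give background work a chance to finish before the loop shuts down.
    await asyncio.gather(*background, return_exceptions=True)
    return response


response = asyncio.run(main())
print(response, failures)
```

The handler returns before the audit write finishes, yet the failure is still observed by the done-callback instead of vanishing.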
Task 5 — Fix the Unhandled Rejection (Medium)
Scenario: A route handler kicks off a non-critical cache warm in the background. If warmCache rejects, nothing handles the rejection, so Node reports an unhandled promise rejection and, on Node.js 15 and later (where unhandled rejections throw by default), crashes the process.
app.post("/products/:id", async (req, res) => {
  const product = await saveProduct(req.params.id, req.body);
  warmCache(product); // BUG: returns a promise; if it rejects, nothing catches it
  res.json(product);
});
Instruction: warmCache is best-effort — a failure must not crash the process or fail the request — but it must be logged. Fix the unhandled rejection. Also explain why res.json running before the cache warm is intentional here.
Solution
app.post("/products/:id", async (req, res, next) => {
  try {
    const product = await saveProduct(req.params.id, req.body);
    // Fire-and-forget, but the rejection is handled so it can never go unhandled.
    warmCache(product).catch((err) => {
      logger.warn("cache warm failed", { productId: product.id, err });
    });
    // Respond without waiting: the cache warm is best-effort and must not add latency.
    res.json(product);
  } catch (err) {
    next(err); // saveProduct failing IS request-critical, so surface it
  }
});
Task 6 — Timeout + Cancellation (Medium)
Scenario: A function fetches from a slow upstream that occasionally hangs forever. There is no timeout, so requests pile up and exhaust the connection pool. There is also no way for the caller to cancel an in-flight fetch.
async function getQuote(symbol) {
  const res = await fetch(`https://quotes.example.com/${symbol}`);
  return res.json();
}
Instruction: Add a 2-second timeout using AbortController, and let the caller pass in their own signal so the fetch is cancellable from outside. On timeout, throw a clear error; make sure the timer is always cleared.
Solution
async function getQuote(symbol, { signal: callerSignal, timeoutMs = 2000 } = {}) {
  // Fail immediately if the caller's signal is already aborted; an "abort"
  // listener added now would never fire.
  callerSignal?.throwIfAborted();

  const controller = new AbortController();
  // Cancel if the caller cancels...
  const onAbort = () => controller.abort(callerSignal.reason);
  callerSignal?.addEventListener("abort", onAbort, { once: true });
  // ...or if we time out.
  const timer = setTimeout(() => {
    controller.abort(new DOMException(`getQuote(${symbol}) timed out`, "TimeoutError"));
  }, timeoutMs);
  try {
    const res = await fetch(`https://quotes.example.com/${symbol}`, {
      signal: controller.signal,
    });
    if (!res.ok) throw new Error(`upstream ${res.status}`);
    return await res.json();
  } finally {
    clearTimeout(timer); // always runs: no leaked timer on success, error, or abort
    callerSignal?.removeEventListener("abort", onAbort); // no leaked listener either
  }
}
Alternative, for runtimes that provide AbortSignal.timeout and AbortSignal.any:
async function getQuote(symbol, { signal, timeoutMs = 2000 } = {}) {
  const signals = [AbortSignal.timeout(timeoutMs)];
  if (signal) signals.push(signal);
  const res = await fetch(`https://quotes.example.com/${symbol}`, {
    signal: AbortSignal.any(signals),
  });
  if (!res.ok) throw new Error(`upstream ${res.status}`);
  return res.json();
}
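For readers working in Python, the same two requirements (a deadline, plus cancellation from outside) map onto asyncio.wait_for and Task.cancel. The sketch below is an illustration under assumptions: slow_upstream is a hypothetical call that hangs, and the state dictionary exists only to show that the in-flight work was really cancelled.

```python
import asyncio


async def slow_upstream(state: dict) -> str:
    """Hypothetical upstream call that hangs far longer than any sane deadline."""
    try:
        await asyncio.sleep(60)
        return "quote"
    except asyncio.CancelledError:
        state["cancelled"] = True  # cleanup hook: the in-flight work really stopped
        raise


async def get_quote(state: dict, timeout_s: float = 0.05) -> str:
    try:
        # wait_for cancels the inner coroutine when the deadline passes.
        return await asyncio.wait_for(slow_upstream(state), timeout=timeout_s)
    except asyncio.TimeoutError:
        raise TimeoutError(f"get_quote timed out after {timeout_s}s") from None


async def demo() -> dict:
    outcome = {}

    # 1) The deadline expires.
    state = {}
    try:
        await get_quote(state)
    except TimeoutError as exc:
        outcome["timeout"] = (str(exc), state.get("cancelled", False))

    # 2) The caller cancels from outside, well before the deadline.
    state = {}
    task = asyncio.create_task(get_quote(state, timeout_s=10))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        outcome["external"] = state.get("cancelled", False)
    return outcome


outcome = asyncio.run(demo())
print(outcome)
```

In both cases the upstream coroutine sees CancelledError, which is where a real implementation would release its connection.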
Task 7 — Cancellation with context (Go)
Scenario: A Go function calls a downstream service in a loop with retries. It ignores context.Context, so a cancelled or timed-out request keeps retrying, and the time.Sleep between retries can't be interrupted, so goroutines pile up.
func fetchWithRetry(url string) ([]byte, error) {
	var lastErr error
	for i := 0; i < 5; i++ {
		body, err := doGet(url) // ignores cancellation
		if err == nil {
			return body, nil
		}
		lastErr = err
		time.Sleep(time.Second) // can't be cancelled
	}
	return nil, lastErr
}
Instruction: Thread a context.Context through. Honour cancellation both during the request and during the backoff sleep. Return the context's error when it is cancelled.
Solution
import (
	"context"
	"io"
	"net/http"
	"time"
)

func fetchWithRetry(ctx context.Context, url string) ([]byte, error) {
	var lastErr error
	backoff := 200 * time.Millisecond
	for attempt := 0; attempt < 5; attempt++ {
		// Bail out immediately if the caller already cancelled.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		body, err := doGet(ctx, url) // request is cancellable via ctx
		if err == nil {
			return body, nil
		}
		lastErr = err
		// Cancellable backoff: wake on timer OR on cancellation, whichever first.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		case <-timer.C:
		}
		backoff *= 2 // exponential
	}
	return nil, lastErr
}

func doGet(ctx context.Context, url string) ([]byte, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}
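The same retry shape in Python asyncio needs no explicit context argument, because cancellation arrives as CancelledError at whichever await is pending. The following is a sketch under assumptions: fetch_with_retry, flaky_get and always_failing are hypothetical names, and the backoff values are shortened so the example runs quickly.

```python
import asyncio


async def fetch_with_retry(do_get, attempts: int = 5, backoff: float = 0.01):
    """Retry do_get() with exponential backoff; cancellation interrupts any await."""
    last_exc = None
    for attempt in range(attempts):
        try:
            return await do_get()
        except asyncio.CancelledError:
            raise  # never swallow cancellation
        except Exception as exc:
            last_exc = exc
        if attempt < attempts - 1:
            await asyncio.sleep(backoff)  # cancellable, unlike time.sleep
            backoff *= 2
    raise last_exc


calls = {"n": 0}


async def flaky_get() -> str:
    """Hypothetical request that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "body"


async def always_failing() -> str:
    raise ConnectionError("down")


async def demo():
    body = await fetch_with_retry(flaky_get)
    # Cancel a retry loop while it is sleeping between attempts.
    task = asyncio.create_task(fetch_with_retry(always_failing, backoff=30))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True
    return body, cancelled


body, cancelled = asyncio.run(demo())
print(body, calls["n"], cancelled)
```

The second half of the demo is the case the Go select statement handles: a 30-second backoff is interrupted almost immediately once the caller cancels.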
Task 8 — Bound Concurrency (Medium)
Scenario: A migration script processes 50,000 records by calling an external API for each. The author "parallelized" it with Promise.all over the whole array — opening 50,000 sockets at once, which the API rate-limits and the local machine can't sustain (file-descriptor exhaustion, OOM).
async function migrateAll(records) {
  return Promise.all(records.map((r) => migrateOne(r))); // 50,000 concurrent calls
}
Instruction: Cap concurrency at, say, 10 in-flight requests. Implement it two ways: with the p-limit library, and from scratch with a simple worker-pool so you understand the mechanism.
Solution
With `p-limit`:
import pLimit from "p-limit";

async function migrateAll(records, concurrency = 10) {
  const limit = pLimit(concurrency);
  return Promise.all(records.map((r) => limit(() => migrateOne(r))));
}
From scratch, with a small worker pool:
async function migrateAll(records, concurrency = 10) {
  const results = new Array(records.length);
  let cursor = 0;

  async function worker() {
    while (cursor < records.length) {
      const i = cursor++; // claim an index (single-threaded, so no race)
      results[i] = await migrateOne(records[i]);
    }
  }

  // Array.from calls worker once per slot, starting `concurrency` workers.
  const workers = Array.from({ length: Math.min(concurrency, records.length) }, worker);
  await Promise.all(workers);
  return results;
}
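The semaphore named in this task's table-of-contents entry is easiest to show in Python, where asyncio.Semaphore is in the standard library. This is a minimal sketch: fake_migrate is a hypothetical per-record call that also tracks how many invocations overlap, so the bound can be observed.

```python
import asyncio


async def migrate_all(records, migrate_one, concurrency: int = 10):
    """Run migrate_one over records with at most `concurrency` calls in flight."""
    gate = asyncio.Semaphore(concurrency)

    async def guarded(record):
        async with gate:  # waits here while `concurrency` calls are already running
            return await migrate_one(record)

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(guarded(r) for r in records))


in_flight = 0
peak = 0


async def fake_migrate(record: int) -> int:
    """Hypothetical per-record call; tracks how many run at the same time."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.001)
    in_flight -= 1
    return record * 2


results = asyncio.run(migrate_all(range(50), fake_migrate, concurrency=5))
print(peak, results[:3])
```

Like the p-limit version, this still creates one pending coroutine per record up front; only the calls to migrate_one are bounded. The worker-pool version avoids even that, at the cost of a little more code.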
Task 9 — Back-pressure with a Bounded Channel (Hard)
Scenario: A Go pipeline reads lines from a fast source and ships each to a slow uploader via a goroutine-per-line. With an unbounded buffer (or a goroutine per item), a fast producer outruns the slow consumer, memory balloons, and the program can OOM. There is no back-pressure.
func pipeline(lines []string) {
	for _, line := range lines {
		go upload(line) // unbounded: spawns one goroutine per line, no limit
	}
}
Instruction: Replace the unbounded fan-out with a bounded worker pool fed by a buffered channel, so the producer blocks when consumers are saturated. Propagate the first error and stop. Use a small, fixed number of workers.
Solution
import (
	"context"

	"golang.org/x/sync/errgroup"
)

func pipeline(ctx context.Context, lines []string, workers int) error {
	jobs := make(chan string, workers) // small buffer == back-pressure point
	g, ctx := errgroup.WithContext(ctx)

	// Workers: bounded consumers.
	for i := 0; i < workers; i++ {
		g.Go(func() error {
			for line := range jobs {
				if err := upload(ctx, line); err != nil {
					return err // errgroup cancels ctx, siblings unwind
				}
			}
			return nil
		})
	}

	// Producer: blocks on a full channel, and that blocking IS the back-pressure.
	g.Go(func() error {
		defer close(jobs)
		for _, line := range lines {
			select {
			case jobs <- line:
			case <-ctx.Done():
				return ctx.Err() // stop feeding if a worker failed
			}
		}
		return nil
	})

	return g.Wait() // first non-nil error from any goroutine, or nil
}
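A bounded asyncio.Queue plays the role of the buffered channel in Python. The sketch below shows only the back-pressure mechanism; error propagation and first-error cancellation, which errgroup provides in the Go solution, are deliberately left out. The names pipeline, fake_upload and the stats dictionary are illustrative assumptions.

```python
import asyncio


async def pipeline(lines, upload, workers: int = 3):
    """Feed `lines` to `workers` consumers through a bounded queue."""
    jobs: asyncio.Queue = asyncio.Queue(maxsize=workers)  # the back-pressure point
    stats = {"max_queued": 0, "uploaded": []}

    async def worker():
        while True:
            line = await jobs.get()
            try:
                stats["uploaded"].append(await upload(line))
            finally:
                jobs.task_done()

    consumers = [asyncio.create_task(worker()) for _ in range(workers)]
    for line in lines:
        await jobs.put(line)  # suspends the producer while the queue is full
        stats["max_queued"] = max(stats["max_queued"], jobs.qsize())
    await jobs.join()  # wait until every queued line has been processed
    for consumer in consumers:
        consumer.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*consumers, return_exceptions=True)
    return stats


async def fake_upload(line: str) -> str:
    """Hypothetical slow uploader."""
    await asyncio.sleep(0.001)
    return line.upper()


LINES = [f"line-{i}" for i in range(20)]
stats = asyncio.run(pipeline(LINES, fake_upload, workers=3))
print(stats["max_queued"], len(stats["uploaded"]))
```

The queue never holds more than maxsize items, so memory use is bounded by the buffer size plus the items the workers currently hold, no matter how fast the producer is.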
Task 10 — Get CPU-bound Work off the Loop (Hard)
Scenario: An asyncio web service hashes uploaded files with a deliberately expensive KDF (scrypt) inside an async handler. Because hashing is pure CPU, it blocks the single event loop for hundreds of milliseconds — every other in-flight request stalls, and concurrency collapses to one.
import hashlib


async def store_file(data: bytes) -> str:
    # BUG: CPU-bound, runs ON the event loop, blocks all other coroutines
    # (maxmem is raised because n=2**15, r=8 needs slightly more than the 32 MiB default)
    digest = hashlib.scrypt(data, salt=SALT, n=2**15, r=8, p=1, maxmem=64 * 1024 * 1024, dklen=64)
    await db.save(digest.hex(), data)
    return digest.hex()
Instruction: Move the CPU-bound hashing off the event loop so other coroutines keep progressing. Use a process pool (since this is true CPU work, not I/O). Keep the function's signature and behaviour.
Solution
import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor

# One pool for the process lifetime; sized to CPU cores by default.
_cpu_pool = ProcessPoolExecutor()


def _hash(data: bytes) -> bytes:
    # Module-level function so it can be pickled and sent to a worker process.
    # maxmem is raised because n=2**15, r=8 needs slightly more than the 32 MiB default.
    return hashlib.scrypt(data, salt=SALT, n=2**15, r=8, p=1, maxmem=64 * 1024 * 1024, dklen=64)


async def store_file(data: bytes) -> str:
    loop = asyncio.get_running_loop()
    digest = await loop.run_in_executor(_cpu_pool, _hash, data)  # off-loop
    await db.save(digest.hex(), data)
    return digest.hex()
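The task's solution uses a process pool. As a smaller, self-contained sketch of the same idea, the snippet below offloads a hash to a worker thread with asyncio.to_thread (Python 3.9+) and runs a heartbeat coroutine alongside it to show that the loop keeps turning. A thread is only a reasonable substitute when the C routine releases the GIL, which CPython's OpenSSL-backed hashlib functions are generally understood to do; treat that as an assumption to verify. The KDF, salt and iteration count are illustrative stand-ins, not the task's scrypt parameters.

```python
import asyncio
import hashlib


def expensive_hash(data: bytes) -> bytes:
    """Stand-in KDF; the salt and iteration count are illustrative assumptions."""
    return hashlib.pbkdf2_hmac("sha256", data, b"demo-salt", 200_000)


async def heartbeat(ticks: list) -> None:
    # Appends a tick every 5 ms for as long as the event loop is free to run it.
    while True:
        ticks.append(1)
        await asyncio.sleep(0.005)


async def hash_off_loop(data: bytes):
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    try:
        digest = await asyncio.to_thread(expensive_hash, data)  # loop stays free
    finally:
        beat.cancel()
    return digest, len(ticks)


digest, ticks = asyncio.run(hash_off_loop(b"file contents"))
print(f"{ticks} heartbeat ticks while hashing")
```

If expensive_hash were called directly inside the coroutine, the heartbeat could not run until the hash finished. For pure-Python CPU work that holds the GIL, a process pool as in the solution remains the safer choice.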
Task 11 — Aggregate Errors Instead of Failing Fast (Hard)
Scenario: A status page pings six microservices and renders each one's health. The author used Promise.all, so the first service that is down throws and the entire page shows an error — even though five services are healthy. They want per-service results: each row shows up or down independently.
async function checkAll(services) {
  // BUG: one failure rejects everything; we lose the five successes
  const results = await Promise.all(services.map((s) => ping(s.url)));
  return services.map((s, i) => ({ name: s.name, ok: true, latency: results[i] }));
}
Instruction: Switch to a strategy that returns all outcomes — successes and failures — so the page renders every service's true status. Aggregate the failures into a summary the caller can act on.
Solution
async function checkAll(services) {
  const settled = await Promise.allSettled(services.map((s) => ping(s.url)));
  const report = services.map((s, i) => {
    const r = settled[i];
    return r.status === "fulfilled"
      ? { name: s.name, ok: true, latency: r.value }
      : { name: s.name, ok: false, error: String(r.reason) };
  });
  const failures = report.filter((r) => !r.ok);
  return { report, healthy: failures.length === 0, failures };
}
Task 12 — Async Map/Filter Pipeline (Hard)
Scenario: A log-processing job reads a large file line by line, parses JSON, keeps only ERROR entries, enriches each by calling a (slow) geo-IP service, and writes the result. The original loads the whole file into memory, then does the geo-IP calls one at a time, which is slow and memory-hungry. It also ignores demand: every line is read eagerly, regardless of how fast the downstream can write.
async function processLogs(path) {
  const text = await fs.promises.readFile(path, "utf8"); // whole file in memory
  const out = [];
  for (const line of text.split("\n")) {
    const entry = JSON.parse(line);
    if (entry.level !== "ERROR") continue;
    entry.geo = await geoIp(entry.ip); // serial, one slow call at a time
    out.push(entry);
  }
  return out;
}
Instruction: Rebuild this as a pull-based async pipeline using async generators: stream lines lazily (so memory stays flat), filter to errors, then map through the geo-IP enrichment with bounded concurrency so demand is respected and the slow service isn't flooded. Compose filter and map as reusable async-iterator combinators.
Solution
import { createReadStream } from "node:fs";
import { createInterface } from "node:readline";

// --- reusable async-iterator combinators (lazy, pull-based) ---
async function* asyncFilter(source, predicate) {
  for await (const item of source) {
    if (await predicate(item)) yield item;
  }
}

// Bounded-concurrency map that preserves demand: at most `concurrency`
// enrichments run at once; results are yielded as they become available
// (completion order, not input order).
async function* asyncMap(source, fn, concurrency = 8) {
  const inFlight = new Set();
  const buffer = [];
  let failure = null; // first error from fn, rethrown from the generator body

  async function pump(item) {
    try {
      buffer.push(await fn(item));
    } catch (err) {
      // Record instead of rejecting, so no in-flight promise is left with an
      // unhandled rejection while the generator is busy pulling from `source`.
      failure ??= { err };
    }
  }

  for await (const item of source) {
    const p = pump(item).finally(() => inFlight.delete(p));
    inFlight.add(p);
    if (inFlight.size >= concurrency) {
      await Promise.race(inFlight); // back-pressure: wait for a slot
    }
    if (failure) throw failure.err;
    while (buffer.length) yield buffer.shift();
  }
  await Promise.all(inFlight); // drain the tail
  if (failure) throw failure.err;
  while (buffer.length) yield buffer.shift();
}

// --- the pipeline ---
async function* lines(path) {
  const rl = createInterface({
    input: createReadStream(path, { encoding: "utf8" }),
    crlfDelay: Infinity,
  });
  for await (const line of rl) {
    if (line.trim()) yield line; // lazy: one line at a time, flat memory
  }
}

async function* processLogs(path) {
  const parsed = asyncMap(lines(path), (line) => JSON.parse(line), 1);
  const errorsOnly = asyncFilter(parsed, (e) => e.level === "ERROR");
  const enriched = asyncMap(errorsOnly, async (e) => ({ ...e, geo: await geoIp(e.ip) }), 8);
  yield* enriched;
}

// The consumer pulls at its own rate, so demand flows back up the pipeline:
// for await (const entry of processLogs("app.log")) await writeOut(entry);
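Python's async generators compose the same way. The sketch below shows only the pull-based composition, with a strictly sequential map; the bounded-concurrency map from the solution above is not reproduced. The in-memory RAW list and the enrich function are hypothetical stand-ins for the log file and the geo-IP service.

```python
import asyncio
import json


async def lines(raw):
    """Hypothetical lazy source; a real one would read a file incrementally."""
    for line in raw:
        await asyncio.sleep(0)  # yield to the loop, as real I/O would
        yield line


async def afilter(source, predicate):
    async for item in source:
        if predicate(item):
            yield item


async def amap(source, fn):
    async for item in source:
        yield await fn(item)  # one item at a time, pulled only on demand


async def parse(line: str) -> dict:
    return json.loads(line)


async def enrich(entry: dict) -> dict:
    """Hypothetical geo lookup."""
    await asyncio.sleep(0.001)
    return {**entry, "geo": f"geo-for-{entry['ip']}"}


def process_logs(raw):
    parsed = amap(lines(raw), parse)
    errors_only = afilter(parsed, lambda e: e["level"] == "ERROR")
    return amap(errors_only, enrich)


RAW = [
    '{"level": "INFO", "ip": "10.0.0.1"}',
    '{"level": "ERROR", "ip": "10.0.0.2"}',
    '{"level": "ERROR", "ip": "10.0.0.3"}',
]


async def collect():
    return [entry async for entry in process_logs(RAW)]


result = asyncio.run(collect())
print(result)
```

Nothing runs until the consumer asks for the first entry: each stage requests one item from the stage before it, which is how demand propagates as back-pressure.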
Self-Assessment
Score yourself honestly. For each, can you do it without looking back?
- Sequential vs. parallel. Given a function with several awaits, I can say which are independent and rewrite those with Promise.all / asyncio.gather, and explain why the rest must stay serial.
- Fail-fast vs. aggregate. I know when Promise.all / gather / TaskGroup is correct (atomic: one failure fails the whole operation) and when I want allSettled / return_exceptions=True (independent outcomes), and I can justify the choice.
- No floating promises. I never leave a created promise/coroutine without an awaiter or an explicit error handler, and I can describe what goes wrong if I do (crash, leak, silent drop, never-scheduled).
- Cancellation + timeout. I can add a timeout and external cancellation with AbortController (JS), context (Go), or wait_for / TaskGroup (Python), and I always clean up timers/resources in finally.
- Back-pressure. I can bound fan-out with a semaphore / p-limit / buffered channel, and I can explain why unbounded Promise.all(...map) or goroutine-per-item is a memory/DoS hazard.
- Off the loop. I can identify CPU-bound work blocking an event loop and offload it to a worker/process/thread pool, choosing the right executor for the GIL situation.
- Pull-based streams. I can express a transform as composable async-iterator combinators and explain how demand propagates as back-pressure.
If any box is unchecked, redo the matching task from scratch on a blank file.
Related Topics
- README.md — the chapter's positive rules and anti-pattern catalog
- junior.md — the foundational definitions these tasks build on
- find-bug.md — buggy async snippets where these defects hide
- optimize.md — performance-focused async refactors
- Functional Programming — composition, immutability, and pipelines that make async code easier to reason about
- Refactoring — the systematic techniques (Extract Function, Replace Loop with Pipeline) used to reshape the code above