Control Structures — Senior Level¶
Table of Contents¶
- Introduction
- Concurrency Control Structures
- State Machines
- Backpressure and Flow Control
- Code Examples
- Summary
Introduction¶
Focus: "How do control structures scale in concurrent, distributed systems?"
At the senior level, control structures extend to concurrent programming: goroutine coordination with select, thread synchronization patterns, event loops, and state machines for complex workflows.
Concurrency Control Structures¶
Go — select (Multiplexed Channel Control)¶
package main
import (
"context"
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
go func() { time.Sleep(1 * time.Second); ch1 <- "result from service A" }()
go func() { time.Sleep(2 * time.Second); ch2 <- "result from service B" }()
for i := 0; i < 2; i++ {
select {
case msg := <-ch1:
fmt.Println("Got:", msg)
case msg := <-ch2:
fmt.Println("Got:", msg)
case <-ctx.Done():
fmt.Println("Timeout!")
return
}
}
}
Java — CompletableFuture Composition¶
import java.util.concurrent.*;
public class AsyncControl {
public static void main(String[] args) throws Exception {
var serviceA = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "result from service A";
});
var serviceB = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "result from service B";
});
// Wait for first to complete
var first = CompletableFuture.anyOf(serviceA, serviceB);
System.out.println("First: " + first.get());
// Wait for all
CompletableFuture.allOf(serviceA, serviceB).join();
System.out.println("A: " + serviceA.get());
System.out.println("B: " + serviceB.get());
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) {}
}
}
Python — asyncio Event Loop¶
import asyncio
async def service_a():
await asyncio.sleep(1)
return "result from service A"
async def service_b():
await asyncio.sleep(2)
return "result from service B"
async def main():
# Wait for first to complete
done, pending = await asyncio.wait(
[asyncio.create_task(service_a()), asyncio.create_task(service_b())],
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
print(f"First: {task.result()}")
for task in pending:
task.cancel()
# Wait for all
results = await asyncio.gather(service_a(), service_b())
print(f"All: {results}")
asyncio.run(main())
State Machines¶
Connection State Machine¶
stateDiagram-v2 [*] --> Disconnected Disconnected --> Connecting: connect() Connecting --> Connected: success Connecting --> Disconnected: failure Connected --> Disconnecting: disconnect() Disconnecting --> Disconnected: done Connected --> Connected: send/receive
Go¶
type State int
const (
Disconnected State = iota
Connecting
Connected
Disconnecting
)
type Connection struct {
state State
}
func (c *Connection) Handle(event string) error {
switch c.state {
case Disconnected:
if event == "connect" {
c.state = Connecting
// initiate connection...
c.state = Connected
return nil
}
case Connected:
switch event {
case "send":
// send data
return nil
case "disconnect":
c.state = Disconnecting
// cleanup...
c.state = Disconnected
return nil
}
}
return fmt.Errorf("invalid event %q in state %d", event, c.state)
}
Java¶
enum State { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING }
class Connection {
private State state = State.DISCONNECTED;
public void handle(String event) {
state = switch (state) {
case DISCONNECTED -> {
if ("connect".equals(event)) yield State.CONNECTED;
else throw new IllegalStateException("Invalid: " + event);
}
case CONNECTED -> switch (event) {
case "send" -> State.CONNECTED;
case "disconnect" -> State.DISCONNECTED;
default -> throw new IllegalStateException("Invalid: " + event);
};
default -> throw new IllegalStateException("Invalid state: " + state);
};
}
}
Python¶
from enum import Enum
class State(Enum):
DISCONNECTED = "disconnected"
CONNECTING = "connecting"
CONNECTED = "connected"
DISCONNECTING = "disconnecting"
class Connection:
def __init__(self):
self.state = State.DISCONNECTED
self._transitions = {
State.DISCONNECTED: {"connect": State.CONNECTED},
State.CONNECTED: {"send": State.CONNECTED, "disconnect": State.DISCONNECTED},
}
def handle(self, event):
valid = self._transitions.get(self.state, {})
if event not in valid:
raise ValueError(f"Invalid event '{event}' in state {self.state}")
self.state = valid[event]
Backpressure and Flow Control¶
Producer-Consumer with Bounded Buffer¶
Go¶
func producer(ch chan<- int, count int) {
for i := 0; i < count; i++ {
ch <- i // blocks if channel is full (backpressure)
}
close(ch)
}
func consumer(ch <-chan int, done chan<- bool) {
for val := range ch {
time.Sleep(10 * time.Millisecond) // slow consumer
fmt.Println("consumed:", val)
}
done <- true
}
func main() {
ch := make(chan int, 10) // buffer size = backpressure threshold
done := make(chan bool)
go producer(ch, 100)
go consumer(ch, done)
<-done
}
Java¶
import java.util.concurrent.*;
public class ProducerConsumer {
public static void main(String[] args) throws Exception {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// Producer
var producer = new Thread(() -> {
for (int i = 0; i < 100; i++) {
try { queue.put(i); } catch (InterruptedException e) { break; }
}
});
// Consumer
var consumer = new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
int val = queue.take();
Thread.sleep(10);
System.out.println("consumed: " + val);
} catch (InterruptedException e) { break; }
}
});
producer.start(); consumer.start();
producer.join(); consumer.join();
}
}
Python¶
import asyncio
async def producer(queue, count):
for i in range(count):
await queue.put(i) # blocks if queue is full
await queue.put(None) # sentinel
async def consumer(queue):
while True:
val = await queue.get()
if val is None:
break
await asyncio.sleep(0.01)
print(f"consumed: {val}")
async def main():
queue = asyncio.Queue(maxsize=10)
await asyncio.gather(producer(queue, 100), consumer(queue))
asyncio.run(main())
Code Examples¶
Circuit Breaker Pattern¶
Go¶
type CircuitBreaker struct {
mu sync.Mutex
failures int
threshold int
state string // "closed", "open", "half-open"
lastFailTime time.Time
timeout time.Duration
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case "open":
if time.Since(cb.lastFailTime) > cb.timeout {
cb.state = "half-open"
} else {
return errors.New("circuit breaker is open")
}
}
err := fn()
if err != nil {
cb.failures++
cb.lastFailTime = time.Now()
if cb.failures >= cb.threshold {
cb.state = "open"
}
return err
}
cb.failures = 0
cb.state = "closed"
return nil
}
Summary¶
At the senior level, control structures include concurrency primitives (select, CompletableFuture, asyncio), state machines for complex workflows, and backpressure mechanisms for flow control. These patterns are essential for building reliable, scalable systems.