The previous posts in this series argued what the orchestration layer is and why it is the part that persists. This post is the implementation reference. Not a framework tutorial: the patterns here are framework-agnostic and apply whether you are using LangGraph, building on raw SDK calls, or writing the orchestration layer from scratch. The goal is a working reference for the decisions that determine whether the system holds in production, not just whether it demos cleanly.
Code examples use TypeScript. The patterns translate directly to Python; the idioms differ but the discipline is identical.
The four primitives
Every production orchestrator derives from four types. Get these right first; everything else is specialisation on top of them.
// A unit of work moving through the system
type Task = {
id: string;
type: string; // the routing key
status: TaskStatus;
priority: number; // higher is sooner
payload: unknown; // validated at intake, not inside the orchestrator
result?: unknown;
error?: SerializedError;
agentId?: string; // the agent currently holding this task
parentId?: string; // for subtasks spawned mid-flight
retryCount: number;
createdAt: Date;
updatedAt: Date;
completedAt?: Date;
metadata: Record<string, unknown>;
};
type TaskStatus =
| 'queued'
| 'classifying'
| 'assigned'
| 'in_progress'
| 'awaiting_human'
| 'completed'
| 'failed'
| 'dead_lettered';
// An agent is a typed function: task in, result out
type Agent = {
id: string;
capabilities: string[]; // task types this agent handles; ['*'] means classifier
process: (task: Task, ctx: AgentContext) => Promise<AgentResult>;
};
// A tool is an external action an agent can call
type Tool<TParams, TResult> = {
name: string;
parameters: JSONSchema;
execute: (params: TParams, ctx: ToolContext) => Promise<TResult>;
};
// A handoff is a validated, typed payload between agents
type Handoff<T> = {
fromAgentId: string;
toAgentId: string;
taskId: string;
schemaId: string; // the registered schema this payload conforms to
payload: T;
timestamp: Date;
};
The Task type is the system’s primitive. Everything the orchestrator does, it does by reading and writing task state. Nothing happens outside a task; if work cannot be represented as a task, it is not work the orchestrator should touch.
Durable state: the task table
The orchestration layer’s state must live in a durable store, not in process memory. An orchestrator process that restarts mid-task must be able to resume that task from its last known state. This is not a performance consideration; it is a correctness invariant. The minimum viable schema:
CREATE TABLE tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
priority INTEGER NOT NULL DEFAULT 5,
payload JSONB NOT NULL,
result JSONB,
error JSONB,
agent_id TEXT,
parent_id UUID REFERENCES tasks(id),
retry_count INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
completed_at TIMESTAMPTZ,
metadata JSONB NOT NULL DEFAULT '{}'
);
-- The primary query path: next task to assign
CREATE INDEX tasks_queue_idx
ON tasks (status, priority DESC, created_at ASC)
WHERE status = 'queued';
CREATE INDEX tasks_type_status_idx ON tasks (type, status);
The checkpoint pattern enforces that state is written to the database before any external action is taken. If the process dies after the checkpoint but before the action completes, recovery picks up from the checkpoint and retries the action. If the process dies after the action but before a subsequent checkpoint, the action may execute twice, which is why all external actions must be idempotent.
The function takes two status values: inProgressStatus is written before the action executes (so recovery knows a step is in flight), and completedStatus is written after the action succeeds (so the task does not remain permanently in the in-progress state if the process dies before the caller’s next step):
async function checkpointAndAct<T>(
taskId: string,
inProgressStatus: TaskStatus,
completedStatus: TaskStatus,
action: () => Promise<T>
): Promise<T> {
// Write the intent before the act. Never the reverse.
await db.tasks.update(taskId, {
status: inProgressStatus,
updatedAt: new Date(),
});
let result: T;
try {
result = await action();
} catch (error) {
await db.tasks.update(taskId, {
status: 'failed',
error: serializeError(error),
updatedAt: new Date(),
});
throw error;
}
// Write the completion before returning. Without this, a process crash
// between the action completing and the caller's next step leaves the task
// permanently in inProgressStatus with no way to distinguish "in flight"
// from "completed but not yet advanced."
await db.tasks.update(taskId, {
status: completedStatus,
updatedAt: new Date(),
});
return result;
}
Routing: the classifier as an agent
Routing is itself a cognitive task, which means it belongs to an agent, not to the orchestrator’s core. The orchestrator dispatches to the classifier; the classifier returns a typed result; the orchestrator uses that result to assign the task. This separation matters because the routing logic evolves often, and you want it to be testable, versioned, and replaceable without touching the orchestrator core.
const ClassificationResult = z.object({
taskType: z.string(),
confidence: z.number().min(0).max(1),
targetAgentId: z.string(),
rationale: z.string().optional(),
});
class ClassifierAgent implements Agent {
id = 'classifier';
capabilities = ['*']; // receives every incoming task
async process(task: Task, ctx: AgentContext): Promise<AgentResult> {
const response = await ctx.llm.complete({
system: ctx.classifierSystemPrompt,
user: JSON.stringify(task.payload),
tools: [], // the classifier never calls tools
});
const parsed = ClassificationResult.parse(
JSON.parse(response.content)
);
return { classification: parsed };
}
}
// Inside the orchestrator's main loop
async function route(task: Task): Promise<void> {
const classifierResult = await checkpointAndAct(
task.id,
'classifying',
'queued', // return to queued so the recovery worker can re-examine the result
() => this.classifier.process(task, this.ctx)
);
const { taskType, confidence, targetAgentId } =
classifierResult.classification;
if (confidence < CONFIDENCE_THRESHOLD) {
await this.escalate(task, 'low_classification_confidence');
return;
}
await this.assign(task, targetAgentId);
}
One refinement that matters at volume: the classifier should return not just the primary classification but an ordered shortlist with confidence scores. This allows the orchestrator to fall back gracefully when the first-choice agent is saturated, and it provides richer signal for observability.
Typed contracts at every boundary
Every handoff between agents is an interface. Treat it as one. Define the schema upfront, register it centrally, validate at every boundary. The schema registry does not need to be elaborate; a plain object keyed by schema ID is sufficient for most systems:
import { z } from 'zod';
// Register schemas centrally
const schemaRegistry = {
'customer-enquiry-v1': z.object({
customerId: z.string().uuid(),
channel: z.enum(['email', 'chat', 'phone']),
intent: z.enum(['billing', 'technical', 'general', 'escalation']),
sentiment: z.enum(['positive', 'neutral', 'negative']),
enquiryText: z.string().min(1).max(4000),
suggestedDraft: z.string().optional(),
}),
'refund-recommendation-v1': z.object({
customerId: z.string().uuid(),
orderId: z.string(),
recommendedAmountCents: z.number().int().nonneg(),
reason: z.string(),
confidence: z.number().min(0).max(1),
requiresHumanApproval: z.boolean(),
}),
} satisfies Record<string, z.ZodTypeAny>;
// A typed handoff factory that validates at construction time
function handoff<K extends keyof typeof schemaRegistry>(
schemaId: K,
from: string,
to: string,
taskId: string,
payload: z.infer<(typeof schemaRegistry)[K]>
): Handoff<z.infer<(typeof schemaRegistry)[K]>> {
// parse throws on invalid; we want the error to surface here, not downstream
const validated = schemaRegistry[schemaId].parse(payload);
return { fromAgentId: from, toAgentId: to, taskId, schemaId, payload: validated, timestamp: new Date() };
}
Version the schemas. When agent A’s output format needs to change, create customer-enquiry-v2 alongside customer-enquiry-v1. Run both in parallel during the rollout period. The orchestrator handles both until downstream agents are upgraded, at which point v1 is deprecated. This is slower than an in-place edit and it is the only pattern that avoids a coordinated multi-service deployment for every schema change.
The orchestrator knows which schema version to use because each agent registration declares the schema IDs and versions it accepts. When dispatching a handoff, the orchestrator picks the highest version the receiving agent supports; if it only supports v1 and the producing agent now emits v2, the orchestrator routes to an adapter or holds the handoff until the downstream agent is upgraded. This is the part teams skip: they add v2 to the registry but have no mechanism to select between versions at dispatch time. Add a supportedSchemas: string[] field to the Agent type at the start and the version routing comes for free.
Human-in-the-loop: three tiers as code
Human-in-the-loop is a spectrum, not a switch. Three tiers cover most production requirements. Classify each action type at design time; encode the classification in configuration; make it visible to operators.
type HitlTier = 'auto' | 'confirm' | 'draft';
// Tier 1: auto: agent acts; operator sees it after
async function autoAction(
task: Task,
action: Action,
bus: EventBus
): Promise<void> {
await action.execute();
await bus.emit({
type: 'action.auto_executed',
taskId: task.id,
action: action.summary,
});
}
// Tier 2: confirm: agent prepares; operator approves before execution.
// The orchestrator does NOT block here. It submits the approval request,
// transitions the task to awaiting_human, and returns. A separate background
// worker polls for approved requests and resumes the task.
async function submitConfirmRequest(
task: Task,
action: Action,
approvalChannel: ApprovalChannel,
timeoutMs = 3_600_000 // default: 1 hour; expiry enforced by the worker
): Promise<void> {
const request = await approvalChannel.request({
taskId: task.id,
summary: action.summary,
context: action.context,
expiresAt: new Date(Date.now() + timeoutMs),
});
// Persist the approval request ID so the background worker can resume
// this task when the decision arrives.
await db.tasks.update(task.id, {
status: 'awaiting_human',
metadata: { approvalRequestId: request.id, pendingAction: action.serialize() },
updatedAt: new Date(),
});
// Return immediately. The orchestrator continues processing other tasks.
}
// The background worker calls this when it detects a decision.
async function resumeAfterApproval(
task: Task,
decision: 'approved' | 'rejected' | 'timed_out',
action: Action
): Promise<void> {
if (decision === 'approved') {
await action.execute();
}
await db.tasks.update(task.id, {
status: decision === 'approved' ? 'in_progress' : 'failed',
metadata: { ...task.metadata, approvalDecision: decision },
updatedAt: new Date(),
});
}
// Tier 3: draft: agent produces output; human takes the action manually
// The agent's job ends at the recommendation; nothing executes automatically
async function draftAction(
task: Task,
recommendation: Recommendation,
draftChannel: DraftChannel
): Promise<void> {
await draftChannel.submit({
taskId: task.id,
recommendation,
forHuman: recommendation.assignedTo,
});
await db.tasks.update(task.id, {
status: 'awaiting_human',
metadata: { draftSubmittedAt: new Date() },
});
}
The boundary between tiers is configuration, not code. An admin surface that lets an operator see every action type and its current tier, and change that tier with an audit log entry, is not a nice-to-have. It is how you manage the system safely as confidence grows.
Promoting an action from confirm to auto is a consequential decision. It should require a deliberate configuration change with a named approver, not a comment in a prompt.
The five observability event categories
Observability in an agent system is not the same as logging. Logs capture what happened for engineers. Observability events capture what happened for the orchestrator, so that it can be queried, aggregated, and surfaced to operators in a form they can use. Five categories cover the full lifecycle of a task: task lifecycle, action execution, human interaction, policy decisions, and schema events. All five are required; omitting policy and schema events is the most common gap, and it leaves the operator blind to cost ceiling trips and schema version mismatches:
type OrchestratorEvent =
// Task lifecycle
| { type: 'task.received'; taskId: string; taskType: string; priority: number }
| { type: 'task.classified'; taskId: string; targetAgentId: string; confidence: number }
| { type: 'task.assigned'; taskId: string; agentId: string }
| { type: 'task.completed'; taskId: string; agentId: string; durationMs: number }
| { type: 'task.failed'; taskId: string; agentId?: string; category: FailureCategory;
error: string }
| { type: 'task.escalated'; taskId: string; reason: string }
| { type: 'task.dead_lettered'; taskId: string; reason: string }
// Action execution
| { type: 'action.executed'; taskId: string; agentId: string; tool: string;
durationMs: number; costUsd?: number; idempotencyKey: string }
// Human interaction
| { type: 'human.prompted'; taskId: string; tier: HitlTier; actionSummary: string }
| { type: 'human.responded'; taskId: string; decision: string; latencyMs: number }
// Policy decisions
| { type: 'policy.ceiling_hit'; taskId: string; scope: BudgetScope; currentUsd: number;
ceilingUsd: number }
| { type: 'policy.breaker_tripped'; scope: BudgetScope; id: string }
// Schema events
| { type: 'schema.validated'; taskId: string; schemaId: string; version: string }
| { type: 'schema.rejected'; taskId: string; schemaId: string; version: string;
errors: string[] };
class EventBus {
async emit(event: OrchestratorEvent): Promise<void> {
// Durable first, then notify. Never the reverse.
await this.store.append({
...event,
timestamp: new Date(),
sessionId: this.sessionId,
});
// Downstream subscribers (operator UI, alerting) can fail without data loss
await this.notify(event).catch(err => this.log.error('notify failed', err));
}
}
Track cost at the event level, not the task level. Aggregating cost from events gives you cost per task, per agent, per tool, and per day without a separate reporting pipeline. The costUsd field on action.executed events should be computed from actual token usage at call time, not estimated after the fact.
The failure taxonomy
Not all failures are the same, and treating them the same is how you build a system that either retries forever or gives up too quickly. Three categories, three responses:
type FailureCategory = 'transient' | 'degraded' | 'structural';
class RecoveryHandler {
async handle(task: Task, error: unknown): Promise<void> {
const { category, reason } = this.classify(error);
switch (category) {
case 'transient':
// Rate limits, timeouts, intermittent API failures
// Retry with exponential backoff; dead-letter after MAX_RETRIES
if (task.retryCount < MAX_RETRIES) {
const delayMs = Math.min(1_000 * 2 ** task.retryCount, 30_000);
await this.scheduleRetry(task, delayMs);
} else {
await this.deadLetter(task, 'max_retries_exceeded');
}
break;
case 'degraded':
// Agent returned malformed output; schema validation failed;
// classification produced no viable route
// Human review, not retry: retrying will produce the same result
await this.escalate(task, `degraded: ${reason}`);
break;
case 'structural':
// Missing tool; invalid configuration; unregistered schema
// These are bugs, not runtime conditions: alert immediately
await this.deadLetter(task, `structural: ${reason}`);
await this.alertOnCall({ task, error, reason });
break;
}
}
}
The dead-letter queue is a first-class concern, not a discard bin. Every task that lands in dead_lettered status should be inspectable, retryable manually, and reportable in the operator surface. A growing dead-letter queue is a signal the system is degrading; an operator who cannot see it cannot act on it.
// Dead-lettering is a state transition, not a deletion
async function deadLetter(task: Task, reason: string): Promise<void> {
await db.tasks.update(task.id, {
status: 'dead_lettered',
metadata: {
...task.metadata,
deadLetterReason: reason,
deadLetteredAt: new Date(),
},
updatedAt: new Date(),
});
await bus.emit({ type: 'task.dead_lettered', taskId: task.id, reason });
}
Cost ceilings as circuit breakers
Cost ceilings must be enforced at the orchestrator level, not the agent level. An individual agent cannot see global spend; the orchestrator can. Implement three tiers:
type BudgetScope = 'task' | 'agent_daily' | 'system_monthly';
class BudgetGuard {
// Use an atomic increment: add the estimate to the running total and return
// the new total in one database operation. If the new total exceeds the ceiling,
// throw and let the caller debit the estimate back out. A plain read-then-compare
// is not safe under concurrent task processing: two tasks can each read the same
// current total, both pass the check, and both proceed, breaching the ceiling.
async reserve(scope: BudgetScope, id: string, estimatedCostUsd: number): Promise<void> {
const newTotal = await this.usage.increment(scope, id, estimatedCostUsd);
const ceiling = this.ceilings[scope];
if (newTotal > ceiling) {
// Roll back the reservation before throwing
await this.usage.increment(scope, id, -estimatedCostUsd);
throw new BudgetExceededError(scope, id, newTotal - estimatedCostUsd, ceiling);
}
}
// Called by RecoveryHandler when a BudgetExceededError surfaces
async tripCircuitBreaker(scope: BudgetScope, id: string): Promise<void> {
await this.breakers.open(scope, id);
await this.alertOnCall({ scope, id });
// The orchestrator stops accepting tasks in the affected scope
// until a human resets the breaker
}
}
The estimatedCostUsd parameter reflects an important constraint: the guard runs before the LLM call, so actual cost is not yet known. In practice, estimate from the prompt’s token count using the model’s published input rate, add a conservative multiplier for output tokens, and treat the estimate as a ceiling-check, not an accounting entry. Actual cost from the completed call (read from the API response’s usage field) is then recorded via the action.executed event and used to reconcile the reservation (increment by the actual cost, decrement the estimate). This two-pass approach (reserve before, reconcile after) accepts that the estimate will sometimes be off, which is why the per-task ceiling should include headroom.
Budget ceilings trip in the first weeks of operating any new system. That is not a bug in the ceilings; it is the ceilings doing their job. The right response is to tune the ceiling based on observed behaviour, not to raise it reflexively. Set the task ceiling first, observe the distribution for two weeks, then set the daily and monthly ceilings from data. Guessing initial ceilings is fine; never raising them without reviewing the data is not.
Testing non-deterministic systems
Two testing patterns are worth building from the start:
Record-and-replay captures real LLM interactions and replays them deterministically in regression tests. This gives you fast, stable tests for orchestration logic without hitting the model in CI:
class RecordingLLMClient {
constructor(
private real: LLMClient,
private recorder: InteractionRecorder
) {}
async complete(prompt: Prompt): Promise<LLMResponse> {
const response = await this.real.complete(prompt);
await this.recorder.record({ prompt, response, timestamp: new Date() });
return response;
}
}
class ReplayLLMClient {
async complete(prompt: Prompt): Promise<LLMResponse> {
// Hash the normalised prompt, not the raw object. Strip fields that vary
// per-invocation (timestamps, session IDs, request IDs) before hashing,
// or no recording will ever match a replay. Define a stable normalisation
// function and apply it identically at record time and replay time.
const recorded = await this.recorder.lookup(normalise(prompt));
if (!recorded) throw new Error(`No recording for prompt hash ${hash(normalise(prompt))}`);
return recorded.response;
}
}
The normalisation step is where most record-and-replay implementations fail. If the prompt includes a timestamp, a request ID, or any other per-invocation field, the hash computed at record time will not match the hash computed at replay time, and the replay client will throw for every prompt. Define the normalisation function explicitly, test it, and apply it consistently. Fields that vary per-invocation should either be stripped from the hash input or replaced with stable sentinels.
Contract tests validate handoff schemas independently of agent logic. They are fast, deterministic, and catch regressions from schema changes before they reach production:
describe('customer-enquiry-v1 contract', () => {
const valid = {
customerId: crypto.randomUUID(),
channel: 'email',
intent: 'billing',
sentiment: 'neutral',
enquiryText: 'My invoice looks incorrect.',
};
it('accepts a valid payload', () => {
expect(() => schemaRegistry['customer-enquiry-v1'].parse(valid)).not.toThrow();
});
it('rejects a missing required field', () => {
const { customerId: _, ...rest } = valid;
expect(() => schemaRegistry['customer-enquiry-v1'].parse(rest)).toThrow();
});
it('rejects an invalid enum value', () => {
expect(() =>
schemaRegistry['customer-enquiry-v1'].parse({ ...valid, channel: 'fax' })
).toThrow();
});
});
For the orchestrator’s routing logic, write unit tests against mock agents. The goal is not to test LLM outputs; it is to test the orchestrator’s decisions given a known classifier result:
describe('Orchestrator.route', () => {
it('escalates when classifier confidence is below threshold', async () => {
const classifier = mockClassifier({ confidence: 0.3, targetAgentId: 'agent-a' });
const escalate = jest.fn();
const orch = new Orchestrator({ classifier, escalate });
await orch.route(mockTask({ type: 'customer-enquiry' }));
expect(escalate).toHaveBeenCalledWith(
expect.objectContaining({ id: expect.any(String) }),
'low_classification_confidence'
);
});
it('assigns when confidence meets threshold', async () => {
const classifier = mockClassifier({ confidence: 0.9, targetAgentId: 'agent-a' });
const assign = jest.fn();
const orch = new Orchestrator({ classifier, assign });
await orch.route(mockTask({ type: 'customer-enquiry' }));
expect(assign).toHaveBeenCalledWith(expect.any(Object), 'agent-a');
});
});
The anti-patterns
Four patterns are common, feel natural, and fail in production.
Statefulness inside an agent. An agent that maintains in-process state between invocations becomes a bottleneck and a single point of failure. Agents should be stateless functions of their inputs. All state belongs in the task table, accessible to any agent instance regardless of which process is running it.
LLM calls inside the orchestrator core. The orchestrator must be deterministic, debuggable, and predictable. Every language model call inside the orchestrator introduces a source of nondeterminism in the component that should have none. Routing classifiers, policy evaluators, output synthesisers: all of these are agents the orchestrator dispatches to. The orchestrator core does not make model calls.
Trusting agent self-reporting on action completion. An agent that calls a tool cannot reliably attest that the call succeeded. Tools fail partially. Networks partition. The agent may not surface the failure correctly. The orchestrator’s source of truth on action completion is the event bus, not the agent’s return value. Build this assumption into the design: the orchestrator confirms completion from the event log, not from what the agent says.
Synchronous blocking at scale. A confirm-tier action that blocks the orchestrator’s event loop while awaiting human approval prevents all other work from progressing. Human approval is asynchronous by nature: the orchestrator submits the approval request, transitions the task to awaiting_human, and moves on to other work. A background worker polls for approved requests and resumes the relevant tasks. Blocking is almost always the first implementation and almost always the wrong one.
What this gives you
These patterns together produce a system with a specific set of guarantees: tasks survive process restarts; every action is recorded before it happens; every handoff is typed and validated; humans can approve or decline at any level of sensitivity; cost cannot run without bound; failures are classified and handled proportionately; and the system can be tested without hitting the live model in every test run.
None of this is novel. It is software engineering applied to a substrate that is less deterministic than a database and more expensive than a function call. The teams that succeed with agent systems are the teams that treat them that way from the start, not as a new category requiring new engineering intuitions, but as distributed systems that happen to use language models, carrying all the design obligations that entails.
The system that results from applying these patterns can be operated by a team that did not build it, investigated when it behaves unexpectedly, extended without touching the core, and run for two years without a rewrite. That is the bar. It is achievable on the first build if the build is done with these constraints in mind.