The Orchestrator Pattern
From Single Agent to Multi-Agent Systems
I was deep in a build, trying to prompt-engineer my way out of what turned out to be a system design problem. Then I noticed a file the system had generated in the background. It was doing exactly what I should have been doing: managing boundaries, handling state, and routing inputs.
Companion resource to The Orchestrator.py Epiphany · Deep dive on the architecture from Write in Your Voice with AI
Why Monolithic Agents Break
Most people start with a single, giant prompt that tries to do everything:
"You are a content pipeline. First classify the input, then research the topic, write a draft, critique it, humanize the voice, generate image prompts, and prepare it for publication..."
This collapses at scale. The agent loses context between stages, you can't inject human feedback mid-pipeline, and when something fails at stage 6 you have to restart from scratch. There's no state management, no recovery, no observability.
The fix? An orchestrator that routes work instead of doing it.
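Stripped to its essence, the split looks like this. A toy sketch with stand-in handler functions (the names here are hypothetical, not the production code): the orchestrator owns ordering and state, while each stage is a swappable function it calls. It routes work; it never does the work itself.

```javascript
// Stand-in "agents" — in a real system these would be LLM calls or
// HTTP requests to specialized services.
const handlers = {
  classify: (input) => ({ topic: input.slice(0, 20) }),
  draft: (input) => `Draft about ${input}`,
};

// The orchestrator knows the order of stages, not their implementation.
const STAGES = ['classify', 'draft'];

function runPipeline(input) {
  const state = { currentStage: null, completedStages: [], outputs: {} };
  for (const stage of STAGES) {
    state.currentStage = stage;
    state.outputs[stage] = handlers[stage](input); // route, don't execute
    state.completedStages.push(stage);
  }
  return state;
}
```

Because the handlers are just entries in a map, replacing one stage (or pausing between stages for human review) never touches the others.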
A Real Orchestrator
This is an anonymized version of an actual production orchestrator I built. It manages a multi-stage content pipeline where each stage is handled by a specialized agent on a Python backend. The Node.js orchestrator never touches the LLM directly — it manages state, routes work, and handles recovery.
```javascript
/**
 * ContentOrchestrator — proxy to a graph-based workflow engine API.
 *
 * Keeps the exact same interface that the API layer expects, but delegates
 * all LLM work to a Python FastAPI service running at WORKFLOW_ENGINE_URL.
 *
 * Translation layer:
 *   - startPipeline  → POST /pipeline/start (auto-runs classify + focus)
 *   - runStage       → classify/focus: read cached file; draft+: POST /pipeline/run-node
 *   - advanceStage   → update local pipeline-state.json only (graph handles progression)
 *   - submitFeedback → POST /pipeline/run-node with feedback payload
 *   - getState       → read local pipeline-state.json
 *   - saveState      → write local pipeline-state.json
 */

const fs = require('fs-extra');
const path = require('path');

const { fetch: engineFetch, Agent } = require('undici');
const _agent = new Agent({ headersTimeout: 1_200_000, bodyTimeout: 1_200_000 });

const WORKFLOW_ENGINE_URL = process.env.WORKFLOW_ENGINE_URL || 'http://localhost:8000';

const WORKSPACE = path.join(__dirname, '..');
const CONTENT = path.join(WORKSPACE, 'content');

// Stage → output filename
const STAGE_FILES = {
  classify: '01-classified.json',
  focus: '01b-focus-questions.json',
  research: '02-research.md',
  draft: '03-draft.md',
  images: '04-image_prompts.md',
  critique: '05-critique.md',
  humanize: '06-humanized.md',
  publish: '10-meta.json',
};

// Stages auto-run by the engine on startPipeline
const AUTO_STAGES = new Set(['classify', 'focus']);

class ContentOrchestrator {
  constructor(config = {}) {
    this.workspace = WORKSPACE;
    this.content = CONTENT;

    // Content type → ordered stage list
    this.stages = {
      article: ['classify', 'focus', 'research', 'draft', 'critique',
                'humanize', 'images', 'publish'],
    };
  }

  // -----------------------------------------------------------------------
  // HTTP helpers
  // -----------------------------------------------------------------------

  async _post(endpoint, body) {
    const url = `${WORKFLOW_ENGINE_URL}${endpoint}`;
    const res = await engineFetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
      dispatcher: _agent,
    });
    if (!res.ok) {
      const text = await res.text();
      throw new Error(
        `Engine API error (${res.status}) at ${endpoint}: ${text.slice(0, 400)}`
      );
    }
    return res.json();
  }

  async _get(endpoint) {
    const url = `${WORKFLOW_ENGINE_URL}${endpoint}`;
    const res = await engineFetch(url, { dispatcher: _agent });
    if (!res.ok) {
      const text = await res.text();
      throw new Error(
        `Engine API error (${res.status}) at ${endpoint}: ${text.slice(0, 400)}`
      );
    }
    return res.json();
  }

  // -----------------------------------------------------------------------
  // File helpers
  // -----------------------------------------------------------------------

  draftDir(slug, contentType = 'article') {
    return path.join(this.content, contentType, 'drafts', slug);
  }

  async readStageFile(slug, stage, contentType = 'article') {
    const filename = STAGE_FILES[stage];
    if (!filename) return null;
    const filepath = path.join(this.draftDir(slug, contentType), filename);
    if (!await fs.pathExists(filepath)) return null;
    return fs.readFile(filepath, 'utf8');
  }

  // -----------------------------------------------------------------------
  // State helpers (pipeline-state.json)
  // -----------------------------------------------------------------------

  async saveState(slug, state) {
    const statePath = path.join(
      this.content, state.contentType, 'drafts', slug,
      'pipeline-state.json'
    );
    await fs.ensureDir(path.dirname(statePath));
    await fs.writeJson(statePath, state, { spaces: 2 });
  }

  async getState(slug) {
    const contentTypes = Object.keys(this.stages);
    for (const contentType of contentTypes) {
      const statePath = path.join(
        this.content, contentType, 'drafts', slug,
        'pipeline-state.json'
      );
      if (await fs.pathExists(statePath)) {
        return fs.readJson(statePath);
      }
    }
    return null;
  }

  _buildState(slug, contentType, currentStage, extra = {}) {
    const stages = this.stages[contentType] || this.stages.article;
    const currentIdx = stages.indexOf(currentStage);
    return {
      slug,
      contentType,
      currentStage,
      status: 'active',
      createdAt: extra.createdAt || new Date().toISOString(),
      lastCompleted: new Date().toISOString(),
      stages,
      completedStages: stages.slice(0, Math.max(0, currentIdx)),
      feedback: extra.feedback || [],
    };
  }

  // -----------------------------------------------------------------------
  // Public API
  // -----------------------------------------------------------------------

  /**
   * Start a new pipeline run.
   * The engine auto-runs classify + focus, then pauses before research.
   */
  async startPipeline(slug, contentType = 'article', initialIdea = '') {
    console.log(`[orchestrator] startPipeline — slug: ${slug}`);

    await this._post('/pipeline/start', {
      slug,
      content_type: contentType,
      raw_input: initialIdea,
    });

    const state = this._buildState(slug, contentType, 'focus', {
      completedStages: ['classify'],
    });
    await this.saveState(slug, state);
    return await this.getState(slug);
  }

  /**
   * Accept author's answers to focus questions, then auto-run
   * research + draft stages.
   */
  async submitFocusAnswers(slug, focusAnswers) {
    const state = await this.getState(slug);
    if (!state) throw new Error(`Pipeline not found for ${slug}`);

    await this._post('/pipeline/submit-focus', {
      slug, focus_answers: focusAnswers
    });

    const withFocus = {
      ...state,
      currentStage: 'research',
      completedStages: [
        ...new Set([...state.completedStages, 'classify', 'focus'])
      ],
      lastCompleted: new Date().toISOString(),
    };
    await this.saveState(slug, withFocus);

    await this.runStage(slug, 'research');
    await this.runStage(slug, 'draft');
    return await this.getState(slug);
  }

  /**
   * Run a specific pipeline stage.
   *
   * - classify / focus: already done by startPipeline → read cached file
   * - draft / critique / humanize / publish: resume the graph engine,
   *   which runs that node and pauses before the next one
   */
  async runStage(slug, stage, context = {}) {
    const state = await this.getState(slug);
    if (!state) throw new Error(`Pipeline not found for ${slug}`);

    if (AUTO_STAGES.has(stage)) {
      const cached = await this.readStageFile(
        slug, stage, state.contentType
      );
      if (cached) return cached;

      await this._post('/pipeline/start', {
        slug,
        content_type: state.contentType,
        raw_input: context.idea || '',
      });
      return this.readStageFile(slug, stage, state.contentType);
    }

    // Resume the graph — runs the next node and pauses
    const feedback = context.feedback
      ? (typeof context.feedback === 'string'
        ? context.feedback
        : context.feedback.overall
          || context.feedback.message
          || JSON.stringify(context.feedback))
      : null;

    try {
      await this._post('/pipeline/run-node', {
        slug, stage, feedback, model: context.model || null
      });
    } catch (err) {
      // If the engine lost state (e.g. after restart),
      // re-seed checkpoint from existing files and retry
      if (err.message.includes('404')
        || err.message.toLowerCase().includes('no pipeline state')) {
        await this._post('/pipeline/seed', {
          slug, content_type: state.contentType
        });
        await this._post('/pipeline/run-node', {
          slug, stage, feedback, model: context.model || null
        });
      } else {
        throw err;
      }
    }

    const output = await this.readStageFile(
      slug, stage, state.contentType
    );

    // Advance local state
    const stages = this.stages[state.contentType] || this.stages.article;
    const nextIdx = stages.indexOf(stage) + 1;
    const nextStage = stages[nextIdx] || stage;
    const updatedState = {
      ...state,
      currentStage: nextStage,
      lastCompleted: new Date().toISOString(),
      completedStages: [...new Set([...state.completedStages, stage])],
    };
    await this.saveState(slug, updatedState);
    return output;
  }

  /**
   * Advance currentStage without running it.
   * Supports rewinding to a specific stage.
   */
  async advanceStage(slug, fromStage = null) {
    const state = await this.getState(slug);
    if (!state) throw new Error(`Pipeline not found for ${slug}`);

    const stages = state.stages
      || this.stages[state.contentType]
      || this.stages.article;

    if (fromStage && stages.includes(fromStage)) {
      const fromIdx = stages.indexOf(fromStage);
      state.currentStage = fromStage;
      state.completedStages = stages.slice(0, fromIdx);
      state.status = 'active';
      await this.saveState(slug, state);
      return state;
    }

    const currentIdx = stages.indexOf(state.currentStage);
    if (currentIdx === stages.length - 1) {
      state.completedStages = [
        ...new Set([...state.completedStages, state.currentStage])
      ];
      state.status = 'completed';
      await this.saveState(slug, state);
      return state;
    }

    const nextStage = stages[currentIdx + 1];
    state.completedStages = [
      ...new Set([...state.completedStages, state.currentStage])
    ];
    state.currentStage = nextStage;
    await this.saveState(slug, state);
    return state;
  }

  /**
   * Submit user feedback and re-run the current stage.
   */
  async submitFeedback(slug, feedback) {
    const state = await this.getState(slug);
    if (!state) throw new Error(`Pipeline not found for ${slug}`);

    const feedbackText = typeof feedback === 'string'
      ? feedback
      : feedback.overall || feedback.message
        || JSON.stringify(feedback);

    state.feedback = [
      ...(state.feedback || []),
      {
        // Wrap string feedback so we don't spread it character-by-character
        ...(typeof feedback === 'string' ? { message: feedback } : feedback),
        timestamp: new Date().toISOString()
      }
    ];
    await this.saveState(slug, state);

    await this.runStage(slug, state.currentStage, {
      feedback: feedbackText
    });
    return state;
  }

  /**
   * Health check for the workflow engine.
   */
  async healthCheck() {
    try {
      const res = await this._get('/health');
      return res.status === 'ok';
    } catch {
      return false;
    }
  }
}

module.exports = ContentOrchestrator;
```

Anatomy of the Pattern
Stage Map
The orchestrator defines ordered sequences of stages per content type. An article goes through classify → focus → research → draft → critique → humanize → images → publish. The orchestrator knows the order but not the implementation of each stage. (If this pipeline looks familiar, it's the same architecture from the Write in Your Voice guide — this is the code that runs it.)
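Because the order lives in data rather than code, adding a new content type is a one-line change. A minimal sketch of that idea (the `newsletter` type and the `nextStage` helper are hypothetical, not part of the orchestrator above):

```javascript
// Content type → ordered stage list. Order is data, not control flow.
const stages = {
  article: ['classify', 'focus', 'research', 'draft', 'critique',
            'humanize', 'images', 'publish'],
  newsletter: ['classify', 'draft', 'critique', 'publish'], // hypothetical
};

// Look up what comes after a stage; null means the pipeline is complete.
function nextStage(contentType, currentStage) {
  const order = stages[contentType] || stages.article;
  const idx = order.indexOf(currentStage);
  return order[idx + 1] || null;
}
```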
State as Source of Truth
A pipeline-state.json file tracks everything: current stage, completed stages, feedback history, timestamps. This makes the pipeline observable (you can always see where you are) and recoverable (you can restart from the last checkpoint).
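To see why that matters, consider what you can compute from the state file alone. A sketch under the state schema described here (`resumeFrom` is a hypothetical helper, not a method of the class): after a crash, the first stage not yet marked complete is the safe restart point.

```javascript
// A pipeline-state.json snapshot, as described in the text.
const state = {
  slug: 'orchestrator-pattern',
  currentStage: 'critique',
  stages: ['classify', 'focus', 'research', 'draft', 'critique',
           'humanize', 'images', 'publish'],
  completedStages: ['classify', 'focus', 'research', 'draft'],
};

// Recoverability in one line: the first stage not yet completed.
function resumeFrom(state) {
  return state.stages.find((s) => !state.completedStages.includes(s));
}
```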
Proxy, Not Processor
The orchestrator never calls an LLM directly. It sends HTTP requests to a backend engine that runs specialized agents for each stage. This separation means you can swap models per stage, scale agents independently, and test each piece in isolation.
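One concrete payoff of that separation: per-stage model routing becomes plain data in the request body. A sketch assuming the engine accepts a `model` field, as the orchestrator's `run-node` payload does (the model names and the per-stage mapping here are hypothetical):

```javascript
// Hypothetical per-stage model routing — cheap model for classification,
// stronger model for drafting and critique.
const MODEL_BY_STAGE = {
  classify: 'small-fast-model',
  draft: 'large-quality-model',
  critique: 'large-quality-model',
};

// Build the payload the orchestrator would POST to the engine.
function buildRunNodePayload(slug, stage, feedback = null) {
  return {
    slug,
    stage,
    feedback,
    model: MODEL_BY_STAGE[stage] || null, // null → engine's default model
  };
}
```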
Feedback Loops & Recovery
Humans can inject feedback at any stage and re-run it. The system can rewind to a previous stage. If the backend engine loses state after a restart, the orchestrator re-seeds the checkpoint from existing file artifacts — no work is lost.
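The rewind behavior is easiest to see pulled out as a pure function. A sketch of the same logic `advanceStage` applies when given a `fromStage` (this standalone `rewind` helper is illustrative, not part of the class): rewinding truncates `completedStages` to everything before the target and reactivates the run.

```javascript
// Rewind a pipeline to an earlier stage without mutating the input state.
function rewind(state, fromStage) {
  const idx = state.stages.indexOf(fromStage);
  if (idx === -1) return state; // unknown stage: no-op
  return {
    ...state,
    currentStage: fromStage,
    completedStages: state.stages.slice(0, idx), // everything before target
    status: 'active',
  };
}
```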
Prompt: Decompose a Complex Agent
Paste this prompt into your LLM along with your monolithic agent code. It will analyze your agent and propose a clean decomposition into specialized sub-agents with a dependency graph.
You are an expert software architect specializing in AI agent systems. I need help decomposing a complex, monolithic agent into smaller, specialized sub-agents.
## Your Task
Analyze the agent code or description I provide and:
1. **Identify Responsibility Boundaries**
- Map every distinct capability the agent handles
- Group related capabilities into logical domains
- Flag any capability that mixes concerns (e.g., data fetching + decision-making + output formatting in one function)
2. **Propose Sub-Agent Decomposition**
For each proposed sub-agent, specify:
- **Name**: A clear, role-based name (e.g., "ClassifierAgent", "ResearchAgent")
- **Single Responsibility**: One sentence describing what this agent owns
- **Inputs**: What data/context it needs
- **Outputs**: What it produces
- **Dependencies**: Which other sub-agents it needs to run before/after
3. **Identify the State Contract**
- What shared state do the sub-agents need?
- What's the minimal data each agent needs to do its job?
- Where are the natural "pause points" where a human should review?
4. **Draw the Dependency Graph**
- Which agents can run in parallel?
- Which must be sequential?
- Where are the feedback loops (agent A's output might send you back to agent B)?
## Output Format
```
## Decomposition Map
### Sub-Agent: [Name]
- Responsibility: [one sentence]
- Inputs: [list]
- Outputs: [list]
- Dependencies: [list of other sub-agents]
- Can run in parallel with: [list or "none"]
### Dependency Graph
[ASCII or mermaid diagram showing flow]
### Shared State Schema
[Minimal state object all agents read/write]
### Human Review Points
[Where the pipeline should pause for human input]
```
## Here is my agent to decompose:
[PASTE YOUR AGENT CODE OR DESCRIPTION HERE]

Prompt: Convert Single→Multi-Agent Router
Have a router that handles everything in one place? This prompt helps your LLM redesign it as a multi-agent orchestrator with proper state management, stage maps, and recovery patterns.
You are an expert at refactoring AI systems from single-agent architectures to multi-agent orchestration patterns. I have a complex router (single agent that handles everything) and I need to convert it into a multi-agent system with a central orchestrator.
## Your Task
Given the single-agent router code I provide:
1. **Analyze the Current Router**
- Identify every decision branch and routing path
- Map the implicit state machine (what states exist, what triggers transitions)
- Find where the agent is doing work vs. routing work
2. **Design the Orchestrator Layer**
Create an orchestrator that:
- **Owns the stage map**: Defines the ordered sequence of stages for each content/task type
- **Manages state**: Reads/writes a state file tracking current stage, completed stages, and feedback history
- **Routes, doesn't execute**: Delegates actual work to specialized agents/services via HTTP or function calls
- **Handles recovery**: Can re-seed state from existing outputs if the system restarts mid-pipeline
- **Supports feedback loops**: Accepts human feedback and re-runs the current stage with it
3. **Define the Agent Interfaces**
For each specialized agent the orchestrator will call:
- Input contract (what the orchestrator sends)
- Output contract (what the agent returns)
- Error handling (what happens when an agent fails)
4. **Implement State Management**
Design a state file (e.g., `pipeline-state.json`) that tracks:
```json
{
"slug": "unique-run-id",
"contentType": "blog",
"currentStage": "draft",
"status": "active",
"stages": ["classify", "research", "draft", "critique", "publish"],
"completedStages": ["classify", "research"],
"feedback": [],
"createdAt": "ISO-timestamp",
"lastCompleted": "ISO-timestamp"
}
```
5. **Handle Edge Cases**
- Stage rewinding (go back to a previous stage)
- Auto-run stages (some stages don't need human approval)
- Checkpoint recovery (engine restarts, state is lost)
- Feedback injection (human edits mid-pipeline)
## Key Principles
- **The orchestrator is a proxy, not a processor.** It routes work, manages state, and handles errors. It never does the LLM work itself.
- **Each stage produces a file artifact.** This makes the pipeline observable and recoverable.
- **State is the source of truth.** The state file knows what's done and what's next.
- **Human-in-the-loop by default.** The pipeline pauses between stages unless stages are marked as auto-run.
## Output Format
Provide:
1. The orchestrator class/module code
2. The state management schema
3. A diagram of the agent communication flow
4. Example of how to add a new stage to the pipeline
## Here is my single-agent router to convert:
[PASTE YOUR ROUTER CODE HERE]

Try This Week
Look at your current AI setup. If you have one giant prompt doing everything, copy the decomposition prompt above, paste in your agent, and see what comes back. The jump from "one prompt to rule them all" to an orchestrated pipeline is the same jump from scripting to software engineering.
These prompts work with any modern LLM — Claude, Gemini, and the rest. The orchestrator pattern itself is language-agnostic. Build it in Python, Node, Go, whatever your team uses.