
The Orchestrator Pattern

From Single Agent to Multi-Agent Systems

I was deep in a build, trying to prompt-engineer my way out of what turned out to be a system design problem. Then I noticed a file the system had generated in the background. It was doing exactly what I should have been doing: managing boundaries, handling state, and routing inputs.

Companion resource to The Orchestrator.py Epiphany · Deep dive on the architecture from Write in Your Voice with AI

Why Monolithic Agents Break

Most people start with a single, giant prompt that tries to do everything:

"You are a content pipeline. First classify the input, then research the topic, write a draft, critique it, humanize the voice, generate image prompts, and prepare it for publication..."

This collapses at scale. The agent loses context between stages, you can't inject human feedback mid-pipeline, and when something fails at stage 6 you have to restart from scratch. There's no state management, no recovery, no observability.

The fix? An orchestrator that routes work instead of doing it.

A Real Orchestrator

This is an anonymized version of an actual production orchestrator I built. It manages a multi-stage content pipeline where each stage is handled by a specialized agent on a Python backend. The Node.js orchestrator never touches the LLM directly — it manages state, routes work, and handles recovery.

content-orchestrator.js
/**
 * ContentOrchestrator — proxy to a graph-based workflow engine API.
 *
 * Keeps the exact same interface that the API layer expects, but delegates
 * all LLM work to a Python FastAPI service running at WORKFLOW_ENGINE_URL.
 *
 * Translation layer:
 * - startPipeline      → POST /pipeline/start (auto-runs classify + focus)
 * - submitFocusAnswers → POST /pipeline/submit-focus, then auto-runs research + draft
 * - runStage           → classify/focus: read cached file; draft+: POST /pipeline/run-node
 * - advanceStage       → update local pipeline-state.json only (graph handles progression)
 * - submitFeedback     → record feedback, then re-run the current stage
 * - getState           → read local pipeline-state.json
 * - saveState          → write local pipeline-state.json
 */

const fs = require('fs-extra');
const path = require('path');

const { fetch: engineFetch, Agent } = require('undici');
const _agent = new Agent({ headersTimeout: 1_200_000, bodyTimeout: 1_200_000 });

const WORKFLOW_ENGINE_URL = process.env.WORKFLOW_ENGINE_URL || 'http://localhost:8000';

const WORKSPACE = path.join(__dirname, '..');
const CONTENT = path.join(WORKSPACE, 'content');

// Stage → output filename
const STAGE_FILES = {
  classify: '01-classified.json',
  focus: '01b-focus-questions.json',
  research: '02-research.md',
  draft: '03-draft.md',
  images: '04-image_prompts.md',
  critique: '05-critique.md',
  humanize: '06-humanized.md',
  publish: '10-meta.json',
};

// Stages auto-run by the engine on startPipeline
const AUTO_STAGES = new Set(['classify', 'focus']);

class ContentOrchestrator {
  constructor(config = {}) {
    this.workspace = WORKSPACE;
    this.content = CONTENT;

    // Content type → ordered stage list
    this.stages = {
      article: ['classify', 'focus', 'research', 'draft', 'critique',
                'humanize', 'images', 'publish'],
    };
  }

  // -----------------------------------------------------------------------
  // HTTP helpers
  // -----------------------------------------------------------------------

  async _post(endpoint, body) {
    const url = `${WORKFLOW_ENGINE_URL}${endpoint}`;
    const res = await engineFetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
      dispatcher: _agent,
    });
    if (!res.ok) {
      const text = await res.text();
      throw new Error(
        `Engine API error (${res.status}) at ${endpoint}: ${text.slice(0, 400)}`
      );
    }
    return res.json();
  }

  async _get(endpoint) {
    const url = `${WORKFLOW_ENGINE_URL}${endpoint}`;
    const res = await engineFetch(url, { dispatcher: _agent });
    if (!res.ok) {
      const text = await res.text();
      throw new Error(
        `Engine API error (${res.status}) at ${endpoint}: ${text.slice(0, 400)}`
      );
    }
    return res.json();
  }

  // -----------------------------------------------------------------------
  // File helpers
  // -----------------------------------------------------------------------

  draftDir(slug, contentType = 'article') {
    return path.join(this.content, contentType, 'drafts', slug);
  }

  async readStageFile(slug, stage, contentType = 'article') {
    const filename = STAGE_FILES[stage];
    if (!filename) return null;
    const filepath = path.join(this.draftDir(slug, contentType), filename);
    if (!await fs.pathExists(filepath)) return null;
    return fs.readFile(filepath, 'utf8');
  }

  // -----------------------------------------------------------------------
  // State helpers (pipeline-state.json)
  // -----------------------------------------------------------------------

  async saveState(slug, state) {
    const statePath = path.join(
      this.content, state.contentType, 'drafts', slug,
      'pipeline-state.json'
    );
    await fs.ensureDir(path.dirname(statePath));
    await fs.writeJson(statePath, state, { spaces: 2 });
  }

  async getState(slug) {
    const contentTypes = Object.keys(this.stages);
    for (const contentType of contentTypes) {
      const statePath = path.join(
        this.content, contentType, 'drafts', slug,
        'pipeline-state.json'
      );
      if (await fs.pathExists(statePath)) {
        return fs.readJson(statePath);
      }
    }
    return null;
  }

  _buildState(slug, contentType, currentStage, extra = {}) {
    const stages = this.stages[contentType] || this.stages.article;
    const currentIdx = stages.indexOf(currentStage);
    return {
      slug,
      contentType,
      currentStage,
      status: 'active',
      createdAt: extra.createdAt || new Date().toISOString(),
      lastCompleted: new Date().toISOString(),
      stages,
      completedStages: stages.slice(0, Math.max(0, currentIdx)),
      feedback: extra.feedback || [],
    };
  }

  // -----------------------------------------------------------------------
  // Public API
  // -----------------------------------------------------------------------

  /**
   * Start a new pipeline run.
   * The engine auto-runs classify + focus, then pauses before research.
   */
  async startPipeline(slug, contentType = 'article', initialIdea = '') {
    console.log(`[orchestrator] startPipeline — slug: ${slug}`);

    await this._post('/pipeline/start', {
      slug,
      content_type: contentType,
      raw_input: initialIdea,
    });

    // _buildState derives completedStages from the stage order:
    // currentStage 'focus' → completedStages ['classify']
    const state = this._buildState(slug, contentType, 'focus');
    await this.saveState(slug, state);
    return await this.getState(slug);
  }

  /**
   * Accept author's answers to focus questions, then auto-run
   * research + draft stages.
   */
  async submitFocusAnswers(slug, focusAnswers) {
    const state = await this.getState(slug);
    if (!state) throw new Error(`Pipeline not found for ${slug}`);

    await this._post('/pipeline/submit-focus', {
      slug, focus_answers: focusAnswers
    });

    const withFocus = {
      ...state,
      currentStage: 'research',
      completedStages: [
        ...new Set([...state.completedStages, 'classify', 'focus'])
      ],
      lastCompleted: new Date().toISOString(),
    };
    await this.saveState(slug, withFocus);

    await this.runStage(slug, 'research');
    await this.runStage(slug, 'draft');
    return await this.getState(slug);
  }

  /**
   * Run a specific pipeline stage.
   *
   * - classify / focus: already done by startPipeline → read cached file
   * - draft / critique / humanize / publish: resume the graph engine via
   *   /pipeline/run-node, which runs that node and pauses before the next one
   */
  async runStage(slug, stage, context = {}) {
    const state = await this.getState(slug);
    if (!state) throw new Error(`Pipeline not found for ${slug}`);

    if (AUTO_STAGES.has(stage)) {
      const cached = await this.readStageFile(
        slug, stage, state.contentType
      );
      if (cached) return cached;

      await this._post('/pipeline/start', {
        slug,
        content_type: state.contentType,
        raw_input: context.idea || '',
      });
      return this.readStageFile(slug, stage, state.contentType);
    }

    // Resume the graph — runs the next node and pauses
    const feedback = context.feedback
      ? (typeof context.feedback === 'string'
          ? context.feedback
          : context.feedback.overall
            || context.feedback.message
            || JSON.stringify(context.feedback))
      : null;

    try {
      await this._post('/pipeline/run-node', {
        slug, stage, feedback, model: context.model || null
      });
    } catch (err) {
      // If the engine lost state (e.g. after restart),
      // re-seed checkpoint from existing files and retry
      if (err.message.includes('404')
          || err.message.toLowerCase().includes('no pipeline state')) {
        await this._post('/pipeline/seed', {
          slug, content_type: state.contentType
        });
        await this._post('/pipeline/run-node', {
          slug, stage, feedback, model: context.model || null
        });
      } else {
        throw err;
      }
    }

    const output = await this.readStageFile(
      slug, stage, state.contentType
    );

    // Advance local state
    const stages = this.stages[state.contentType] || this.stages.article;
    const nextIdx = stages.indexOf(stage) + 1;
    const nextStage = stages[nextIdx] || stage;
    const updatedState = {
      ...state,
      currentStage: nextStage,
      lastCompleted: new Date().toISOString(),
      completedStages: [...new Set([...state.completedStages, stage])],
    };
    await this.saveState(slug, updatedState);
    return output;
  }

  /**
   * Advance currentStage without running it.
   * Supports rewinding to a specific stage.
   */
  async advanceStage(slug, fromStage = null) {
    const state = await this.getState(slug);
    if (!state) throw new Error(`Pipeline not found for ${slug}`);

    const stages = state.stages
      || this.stages[state.contentType]
      || this.stages.article;

    if (fromStage && stages.includes(fromStage)) {
      const fromIdx = stages.indexOf(fromStage);
      state.currentStage = fromStage;
      state.completedStages = stages.slice(0, fromIdx);
      state.status = 'active';
      await this.saveState(slug, state);
      return state;
    }

    const currentIdx = stages.indexOf(state.currentStage);
    if (currentIdx === stages.length - 1) {
      state.completedStages = [
        ...new Set([...state.completedStages, state.currentStage])
      ];
      state.status = 'completed';
      await this.saveState(slug, state);
      return state;
    }

    const nextStage = stages[currentIdx + 1];
    state.completedStages = [
      ...new Set([...state.completedStages, state.currentStage])
    ];
    state.currentStage = nextStage;
    await this.saveState(slug, state);
    return state;
  }

  /**
   * Submit user feedback and re-run the current stage.
   */
  async submitFeedback(slug, feedback) {
    const state = await this.getState(slug);
    if (!state) throw new Error(`Pipeline not found for ${slug}`);

    const feedbackText = typeof feedback === 'string'
      ? feedback
      : feedback.overall || feedback.message
        || JSON.stringify(feedback);

    state.feedback = [
      ...(state.feedback || []),
      { ...feedback, timestamp: new Date().toISOString() }
    ];
    await this.saveState(slug, state);

    await this.runStage(slug, state.currentStage, {
      feedback: feedbackText
    });
    // Return the post-run state (runStage advanced it), not the stale copy
    return this.getState(slug);
  }

  /**
   * Health check for the workflow engine.
   */
  async healthCheck() {
    try {
      const res = await this._get('/health');
      return res.status === 'ok';
    } catch {
      return false;
    }
  }
}

module.exports = ContentOrchestrator;
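For context, here's how an API layer might drive the class end to end. This is a sketch — the `runArticle` wrapper and its inlined stage list are illustrative, not part of the production code:

```javascript
// Drives a full article run. Between runStage calls is where a human
// reviews the output and (optionally) submits feedback before continuing.
async function runArticle(orchestrator, slug, idea, focusAnswers) {
  // classify + focus auto-run, then the engine pauses
  await orchestrator.startPipeline(slug, 'article', idea);

  // the author answers the focus questions; research + draft then auto-run
  await orchestrator.submitFocusAnswers(slug, focusAnswers);

  // remaining stages run one at a time, pausing after each
  for (const stage of ['critique', 'humanize', 'images', 'publish']) {
    await orchestrator.runStage(slug, stage);
  }
  return orchestrator.getState(slug);
}
```

In practice each of those `runStage` calls sits behind its own API route, so the pauses between stages are real pauses, not just `await`s.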

Anatomy of the Pattern

1. Stage Map

The orchestrator defines ordered sequences of stages per content type. An article goes through classify → focus → research → draft → critique → humanize → images → publish. The orchestrator knows the order but not the implementation of each stage. (If this pipeline looks familiar, it's the same architecture from the Write in Your Voice guide — this is the code that runs it.)
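Because the stage map is plain data, supporting a new content type is a config change rather than new routing logic. A sketch — the `newsletter` type and its stage list are hypothetical:

```javascript
// The stage map is plain data: adding a content type is a config change.
// 'newsletter' and its shorter stage list are hypothetical examples.
const stages = {
  article: ['classify', 'focus', 'research', 'draft', 'critique',
            'humanize', 'images', 'publish'],
  newsletter: ['classify', 'draft', 'humanize', 'publish'],
};

// The orchestrator only ever asks "what comes after stage X for type Y?"
function nextStage(contentType, current) {
  const order = stages[contentType] || stages.article; // same fallback as the class
  const idx = order.indexOf(current);
  return order[idx + 1] || null; // null → pipeline complete
}
```

`nextStage('newsletter', 'draft')` goes straight to `humanize` — research never runs for that type, and no routing code changed.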

2. State as Source of Truth

A pipeline-state.json file tracks everything: current stage, completed stages, feedback history, timestamps. This makes the pipeline observable (you can always see where you are) and recoverable (you can restart from the last checkpoint).
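For illustration, here's what a mid-run state file might contain (the values are made up), and why observability falls out of it for free:

```javascript
// Illustrative pipeline-state.json contents, mid-run:
const state = {
  slug: 'orchestrator-pattern',
  contentType: 'article',
  currentStage: 'critique',
  status: 'active',
  stages: ['classify', 'focus', 'research', 'draft', 'critique',
           'humanize', 'images', 'publish'],
  completedStages: ['classify', 'focus', 'research', 'draft'],
  feedback: [],
  createdAt: '2025-01-01T00:00:00.000Z',
  lastCompleted: '2025-01-01T00:12:00.000Z',
};

// Progress reporting is a pure function of state — no engine round-trip.
function progress(s) {
  return `${s.completedStages.length}/${s.stages.length} stages done, ` +
         `currently at "${s.currentStage}"`;
}
// progress(state) → '4/8 stages done, currently at "critique"'
```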

3. Proxy, Not Processor

The orchestrator never calls an LLM directly. It sends HTTP requests to a backend engine that runs specialized agents for each stage. This separation means you can swap models per stage, scale agents independently, and test each piece in isolation.
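One payoff of the proxy split: model choice becomes per-stage configuration. A sketch — the model names are placeholders, and the `context.model` hook is the one `runStage` above already forwards to the engine:

```javascript
// Hypothetical per-stage model map — names are placeholders, not real
// model IDs. This is just configuration on top of the existing
// `context.model` pass-through in runStage.
const STAGE_MODELS = {
  research: 'large-context-model',
  draft: 'strong-writing-model',
  critique: 'cheap-fast-model',
};

function modelFor(stage) {
  return STAGE_MODELS[stage] || null; // null → let the engine pick its default
}

// e.g. await orchestrator.runStage(slug, 'draft', { model: modelFor('draft') });
```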

4. Feedback Loops &amp; Recovery

Humans can inject feedback at any stage and re-run it. The system can rewind to a previous stage. If the backend engine loses state after a restart, the orchestrator re-seeds the checkpoint from existing file artifacts — no work is lost.
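The recovery move in `runStage` generalizes to a small retry-once helper: attempt the call; if the engine reports lost state, re-seed the checkpoint and retry exactly once. A sketch of that shape:

```javascript
// Mirrors the error checks in the orchestrator's runStage.
function isLostState(err) {
  return err.message.includes('404')
    || err.message.toLowerCase().includes('no pipeline state');
}

// Try the call; on lost state, re-seed the checkpoint and retry once.
async function withReseed(run, reseed) {
  try {
    return await run();
  } catch (err) {
    if (!isLostState(err)) throw err;
    await reseed();  // e.g. POST /pipeline/seed to rebuild from stage files
    return run();    // a second failure propagates — no infinite retry
  }
}
```

Because every stage writes a file artifact, the checkpoint can always be reconstructed: the engine process is disposable, the files are not.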

Prompt: Decompose a Complex Agent

Paste this prompt into your LLM along with your monolithic agent code. It will analyze your agent and propose a clean decomposition into specialized sub-agents with a dependency graph.

decompose-agent-prompt.md
You are an expert software architect specializing in AI agent systems. I need help decomposing a complex, monolithic agent into smaller, specialized sub-agents.

## Your Task

Analyze the agent code or description I provide and:

1. **Identify Responsibility Boundaries**
   - Map every distinct capability the agent handles
   - Group related capabilities into logical domains
   - Flag any capability that mixes concerns (e.g., data fetching + decision-making + output formatting in one function)

2. **Propose Sub-Agent Decomposition**
   For each proposed sub-agent, specify:
   - **Name**: A clear, role-based name (e.g., "ClassifierAgent", "ResearchAgent")
   - **Single Responsibility**: One sentence describing what this agent owns
   - **Inputs**: What data/context it needs
   - **Outputs**: What it produces
   - **Dependencies**: Which other sub-agents it needs to run before/after

3. **Identify the State Contract**
   - What shared state do the sub-agents need?
   - What's the minimal data each agent needs to do its job?
   - Where are the natural "pause points" where a human should review?

4. **Draw the Dependency Graph**
   - Which agents can run in parallel?
   - Which must be sequential?
   - Where are the feedback loops (agent A's output might send you back to agent B)?

## Output Format

```
## Decomposition Map

### Sub-Agent: [Name]
- Responsibility: [one sentence]
- Inputs: [list]
- Outputs: [list]
- Dependencies: [list of other sub-agents]
- Can run in parallel with: [list or "none"]

### Dependency Graph
[ASCII or mermaid diagram showing flow]

### Shared State Schema
[Minimal state object all agents read/write]

### Human Review Points
[Where the pipeline should pause for human input]
```

## Here is my agent to decompose:

[PASTE YOUR AGENT CODE OR DESCRIPTION HERE]

Prompt: Convert Single→Multi-Agent Router

Have a router that handles everything in one place? This prompt helps your LLM redesign it as a multi-agent orchestrator with proper state management, stage maps, and recovery patterns.

multi-agent-router-prompt.md
You are an expert at refactoring AI systems from single-agent architectures to multi-agent orchestration patterns. I have a complex router (single agent that handles everything) and I need to convert it into a multi-agent system with a central orchestrator.

## Your Task

Given the single-agent router code I provide:

1. **Analyze the Current Router**
   - Identify every decision branch and routing path
   - Map the implicit state machine (what states exist, what triggers transitions)
   - Find where the agent is doing work vs. routing work

2. **Design the Orchestrator Layer**
   Create an orchestrator that:
   - **Owns the stage map**: Defines the ordered sequence of stages for each content/task type
   - **Manages state**: Reads/writes a state file tracking current stage, completed stages, and feedback history
   - **Routes, doesn't execute**: Delegates actual work to specialized agents/services via HTTP or function calls
   - **Handles recovery**: Can re-seed state from existing outputs if the system restarts mid-pipeline
   - **Supports feedback loops**: Accepts human feedback and re-runs the current stage with it

3. **Define the Agent Interfaces**
   For each specialized agent the orchestrator will call:
   - Input contract (what the orchestrator sends)
   - Output contract (what the agent returns)
   - Error handling (what happens when an agent fails)

4. **Implement State Management**
   Design a state file (e.g., `pipeline-state.json`) that tracks:
   ```json
   {
     "slug": "unique-run-id",
     "contentType": "blog",
     "currentStage": "draft",
     "status": "active",
     "stages": ["classify", "research", "draft", "critique", "publish"],
     "completedStages": ["classify", "research"],
     "feedback": [],
     "createdAt": "ISO-timestamp",
     "lastCompleted": "ISO-timestamp"
   }
   ```

5. **Handle Edge Cases**
   - Stage rewinding (go back to a previous stage)
   - Auto-run stages (some stages don't need human approval)
   - Checkpoint recovery (engine restarts, state is lost)
   - Feedback injection (human edits mid-pipeline)

## Key Principles

- **The orchestrator is a proxy, not a processor.** It routes work, manages state, and handles errors. It never does the LLM work itself.
- **Each stage produces a file artifact.** This makes the pipeline observable and recoverable.
- **State is the source of truth.** The state file knows what's done and what's next.
- **Human-in-the-loop by default.** The pipeline pauses between stages unless stages are marked as auto-run.

## Output Format

Provide:
1. The orchestrator class/module code
2. The state management schema
3. A diagram of the agent communication flow
4. Example of how to add a new stage to the pipeline

## Here is my single-agent router to convert:

[PASTE YOUR ROUTER CODE HERE]

Try This Week

Look at your current AI setup. If you have one giant prompt doing everything, copy the decomposition prompt above, paste in your agent, and see what comes back. The jump from "one prompt to rule them all" to an orchestrated pipeline is the same jump from scripting to software engineering.

These prompts work with any modern LLM — Claude, Gemini, and others. The orchestrator pattern itself is language-agnostic: build it in Python, Node, Go, whatever your team uses.
