Workflow Engine Architecture
How Bootspring orchestrates multi-phase workflows.
Overview
The workflow engine manages complex, multi-phase processes that coordinate agents, track progress, and handle failures.
Workflow Architecture
┌─────────────────────────────────────────────────────────────────────┐
│                           WORKFLOW ENGINE                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                     Workflow Definitions                      │  │
│  │    feature-dev | security-audit | database-migration | ...    │  │
│  └───────────────────────────────┬───────────────────────────────┘  │
│                                  │                                  │
│  ┌───────────────────────────────▼───────────────────────────────┐  │
│  │                         Orchestrator                          │  │
│  │                                                               │  │
│  │  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐   │  │
│  │  │   Phase   │  │   Agent   │  │  Signal   │  │ Rollback  │   │  │
│  │  │  Manager  │  │Coordinator│  │  Checker  │  │  Handler  │   │  │
│  │  └───────────┘  └───────────┘  └───────────┘  └───────────┘   │  │
│  │                                                               │  │
│  └───────────────────────────────┬───────────────────────────────┘  │
│                                  │                                  │
│  ┌───────────────────────────────▼───────────────────────────────┐  │
│  │                        Execution State                        │  │
│  │           Persistence | Progress Tracking | History           │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Workflow Definition
Definition Structure
/**
 * Declarative description of a workflow: its identity, the ordered phases it
 * runs, engine configuration, and descriptive metadata.
 */
interface WorkflowDefinition {
  // ---- Identity ----

  /** Name the workflow is registered and looked up under. */
  name: string;
  /** Human-readable title. */
  displayName: string;
  /** Short summary of what the workflow does. */
  description: string;
  /** Category the workflow is filed under. */
  category: WorkflowCategory;
  /** Tier the workflow belongs to. */
  tier: Tier;

  // ---- Configuration ----

  /** Phases of the workflow; the orchestrator iterates them in array order. */
  phases: Phase[];
  /** Workflow-level settings. */
  config: WorkflowConfig;

  // ---- Metadata ----

  /** Free-form duration estimate (e.g. "3-5 days"). */
  estimatedDuration: string;
  /** Labels attached to the workflow. */
  tags: string[];
}
17
/**
 * One stage of a workflow: the agents and tasks it runs, how failures are
 * treated, and the signals that mark it as done.
 */
interface Phase {
  /** Phase name; execution state and dependencies refer to the phase by it. */
  name: string;
  /** Short summary of what the phase does. */
  description: string;

  // ---- Execution ----

  /** Names of the agents that take part in this phase. */
  agents: string[];
  /** Tasks run during this phase. */
  tasks: Task[];
  /** When true the phase's tasks run in parallel, otherwise sequentially. */
  parallel: boolean;

  // ---- Control ----

  /** When true, a failed or blocked phase fails the whole workflow. */
  critical: boolean;
  /** Optional time limit for the phase (units not specified here). */
  timeout?: number;
  /** Optional retry count for the phase. */
  retries?: number;
  /**
   * Names of phases that must have status `completed` before this phase may
   * start. Omit when the phase has no prerequisites.
   */
  dependsOn?: string[];

  // ---- Completion ----

  /** Signals that must all pass for the phase to count as completed. */
  completionSignals: CompletionSignal[];

  // ---- Recovery ----

  /** How to undo the phase's work; when absent, no rollback is possible. */
  rollbackStrategy?: RollbackStrategy;
}
38
39interface Task {
40 id: string;
41 name: string;
42 description: string;
43 agent?: string;
44 action: TaskAction;
45 inputs: Record<string, unknown>;
46 outputs: string[];
47}
Example Definition
1# workflows/feature-development.yaml
2
3name: feature-development
4displayName: Feature Development
5description: End-to-end feature implementation workflow
6category: development
7tier: free
8
9estimatedDuration: 3-5 days
10tags: [development, feature, full-stack]
11
12config:
13 allowParallel: true
14 maxConcurrentPhases: 2
15 autoProgress: false
16
17phases:
18 - name: design
19 description: Design the feature architecture
20 agents: [ui-ux-expert, api-expert]
21 parallel: false
22 critical: true
23
24 tasks:
25 - id: design-ui
26 name: Design UI/UX
27 agent: ui-ux-expert
28 action: generate
29 inputs:
30 type: wireframe
31 requirements: "{{feature.requirements}}"
32 outputs: [ui-design]
33
34 - id: design-api
35 name: Design API
36 agent: api-expert
37 action: generate
38 inputs:
39 type: api-spec
40 requirements: "{{feature.requirements}}"
41 outputs: [api-spec]
42
43 completionSignals:
44 - type: manual
45 description: Design approved by stakeholder
46
47 - name: implementation
48 description: Implement frontend and backend
49 agents: [frontend-expert, backend-expert]
50 parallel: true
51 critical: true
52
53 tasks:
54 - id: impl-frontend
55 name: Implement Frontend
56 agent: frontend-expert
57 action: generate
58 inputs:
59 design: "{{design.ui-design}}"
60
61 - id: impl-backend
62 name: Implement Backend
63 agent: backend-expert
64 action: generate
65 inputs:
66 spec: "{{design.api-spec}}"
67
68 completionSignals:
69 - type: file_exists
70 paths: ["app/**/*.tsx", "app/api/**/*.ts"]
71 - type: test_passes
72 command: npm test
73
74 rollbackStrategy:
75 type: git_revert
76 scope: phase
77
78 - name: testing
79 description: Test the implementation
80 agents: [testing-expert]
81 parallel: false
82 critical: true
83
84 tasks:
85 - id: write-tests
86 name: Write Tests
87 agent: testing-expert
88 action: generate
89 inputs:
90 coverage: 80
91
92 completionSignals:
93 - type: test_passes
94 command: npm test -- --coverage
95
96 - name: review
97 description: Code review and security check
98 agents: [code-review-expert, security-expert]
99 parallel: true
100 critical: false
101
102 completionSignals:
103 - type: manual
104 description: Code review approved
Orchestrator
Orchestrator Implementation
1// core/workflows/orchestrator.ts
2
3export class WorkflowOrchestrator {
4 private registry: WorkflowRegistry;
5 private phaseManager: PhaseManager;
6 private agentCoordinator: AgentCoordinator;
7 private signalChecker: SignalChecker;
8 private stateManager: StateManager;
9
10 async start(
11 workflowName: string,
12 options: StartOptions
13 ): Promise<WorkflowExecution> {
14 // Load workflow definition
15 const workflow = this.registry.get(workflowName);
16 if (!workflow) {
17 throw new WorkflowNotFoundError(workflowName);
18 }
19
20 // Create execution
21 const execution = new WorkflowExecution({
22 id: generateId(),
23 workflow: workflow.name,
24 status: 'running',
25 startedAt: new Date(),
26 phases: workflow.phases.map(p => ({
27 name: p.name,
28 status: 'pending',
29 })),
30 });
31
32 // Save initial state
33 await this.stateManager.save(execution);
34
35 // Start execution
36 this.executeAsync(execution, workflow, options);
37
38 return execution;
39 }
40
41 private async executeAsync(
42 execution: WorkflowExecution,
43 workflow: WorkflowDefinition,
44 options: StartOptions
45 ): Promise<void> {
46 try {
47 for (const phase of workflow.phases) {
48 // Check if we can start this phase
49 const canStart = await this.canStartPhase(execution, phase);
50 if (!canStart) {
51 if (phase.critical) {
52 throw new PhaseBlockedError(phase.name);
53 }
54 continue;
55 }
56
57 // Execute phase
58 await this.executePhase(execution, phase, options);
59 }
60
61 // Mark workflow complete
62 execution.status = 'completed';
63 execution.completedAt = new Date();
64 } catch (error) {
65 execution.status = 'failed';
66 execution.error = error.message;
67
68 // Attempt rollback
69 await this.handleFailure(execution, workflow);
70 }
71
72 await this.stateManager.save(execution);
73 }
74
75 private async executePhase(
76 execution: WorkflowExecution,
77 phase: Phase,
78 options: StartOptions
79 ): Promise<void> {
80 const phaseExecution = execution.phases.find(p => p.name === phase.name)!;
81 phaseExecution.status = 'running';
82 phaseExecution.startedAt = new Date();
83
84 await this.stateManager.save(execution);
85
86 try {
87 // Execute tasks
88 if (phase.parallel) {
89 await this.executeTasksParallel(phase.tasks, execution);
90 } else {
91 await this.executeTasksSequential(phase.tasks, execution);
92 }
93
94 // Check completion signals
95 const completed = await this.checkCompletionSignals(phase);
96 if (!completed) {
97 throw new SignalsNotMetError(phase.name);
98 }
99
100 phaseExecution.status = 'completed';
101 phaseExecution.completedAt = new Date();
102 } catch (error) {
103 phaseExecution.status = 'failed';
104 phaseExecution.error = error.message;
105
106 if (phase.critical) {
107 throw error;
108 }
109 }
110 }
111}
Phase Management
Phase Manager
1// core/workflows/phases.ts
2
3export class PhaseManager {
4 canStart(execution: WorkflowExecution, phase: Phase): boolean {
5 // Check dependencies
6 if (phase.dependsOn) {
7 for (const dep of phase.dependsOn) {
8 const depPhase = execution.phases.find(p => p.name === dep);
9 if (depPhase?.status !== 'completed') {
10 return false;
11 }
12 }
13 }
14
15 // Check inputs available
16 for (const task of phase.tasks) {
17 if (!this.inputsAvailable(task, execution)) {
18 return false;
19 }
20 }
21
22 return true;
23 }
24
25 getNextPhase(execution: WorkflowExecution): Phase | null {
26 for (const phaseExec of execution.phases) {
27 if (phaseExec.status === 'pending') {
28 const phase = this.getPhaseDefinition(phaseExec.name);
29 if (this.canStart(execution, phase)) {
30 return phase;
31 }
32 }
33 }
34 return null;
35 }
36}
Agent Coordination
Coordinator Implementation
1// core/workflows/agent-coordinator.ts
2
3export class AgentCoordinator {
4 private invoker: AgentInvoker;
5 private contextSharer: ContextSharer;
6
7 async coordinate(
8 agents: string[],
9 task: Task,
10 context: WorkflowContext
11 ): Promise<CoordinationResult> {
12 const results: AgentResult[] = [];
13
14 // Build shared context
15 const sharedContext = await this.contextSharer.build(context);
16
17 for (const agentName of agents) {
18 // Add previous results to context
19 const agentContext = {
20 ...sharedContext,
21 previousResults: results,
22 };
23
24 // Invoke agent
25 const result = await this.invoker.invoke(agentName, task.description, {
26 context: agentContext,
27 task: task.id,
28 });
29
30 results.push({
31 agent: agentName,
32 task: task.id,
33 response: result,
34 });
35 }
36
37 return {
38 task: task.id,
39 results,
40 synthesized: this.synthesize(results),
41 };
42 }
43
44 private synthesize(results: AgentResult[]): string {
45 // Combine agent outputs intelligently
46 return results
47 .map(r => `## ${r.agent}\n${r.response.content}`)
48 .join('\n\n');
49 }
50}
Completion Signals
Signal Checker
1// core/workflows/signals.ts
2
/**
 * Evaluates completion signals by dispatching each one to the handler
 * registered for its type.
 */
export class SignalChecker {
  /** Handler registry, keyed by signal type. */
  private checkers = new Map<SignalType, SignalHandler>([
    ['file_exists', new FileExistsChecker()],
    ['test_passes', new TestPassesChecker()],
    ['manual', new ManualChecker()],
    ['api_response', new ApiResponseChecker()],
  ]);

  /**
   * Checks the signals one at a time, in order, stopping at the first that
   * does not pass.
   *
   * @returns `true` when every signal passes (including an empty list),
   *   `false` as soon as one fails.
   * @throws UnknownSignalTypeError If a signal's type has no registered handler.
   */
  async check(signals: CompletionSignal[]): Promise<boolean> {
    for (const current of signals) {
      const handler = this.checkers.get(current.type);
      if (handler === undefined) {
        throw new UnknownSignalTypeError(current.type);
      }

      const ok = await handler.check(current);
      if (!ok) {
        return false;
      }
    }

    return true;
  }
}
26
27// Signal handlers
/**
 * Passes when every glob pattern in the signal's `paths` matches at least one
 * entry.
 */
class FileExistsChecker implements SignalHandler {
  /**
   * Globs the patterns one at a time, in order, and stops at the first
   * pattern with no matches.
   *
   * @returns `false` if any pattern matches nothing, otherwise `true`.
   */
  async check(signal: CompletionSignal): Promise<boolean> {
    for (const globPattern of signal.paths as string[]) {
      const { length: hitCount } = await glob(globPattern);
      if (hitCount === 0) {
        return false;
      }
    }

    return true;
  }
}
40
41class TestPassesChecker implements SignalHandler {
42 async check(signal: CompletionSignal): Promise<boolean> {
43 const command = signal.command as string;
44 try {
45 await exec(command);
46 return true;
47 } catch {
48 return false;
49 }
50 }
51}
Rollback Handling
Rollback Manager
1// core/workflows/rollback.ts
2
3export class RollbackManager {
4 async rollback(
5 execution: WorkflowExecution,
6 phase: Phase
7 ): Promise<RollbackResult> {
8 const strategy = phase.rollbackStrategy;
9 if (!strategy) {
10 return { success: false, reason: 'No rollback strategy defined' };
11 }
12
13 switch (strategy.type) {
14 case 'git_revert':
15 return this.gitRevert(execution, strategy);
16
17 case 'restore_backup':
18 return this.restoreBackup(execution, strategy);
19
20 case 'manual':
21 return this.requestManualRollback(execution, strategy);
22
23 default:
24 return { success: false, reason: 'Unknown rollback type' };
25 }
26 }
27
28 private async gitRevert(
29 execution: WorkflowExecution,
30 strategy: RollbackStrategy
31 ): Promise<RollbackResult> {
32 const checkpoint = execution.checkpoints.find(
33 c => c.phase === strategy.scope
34 );
35
36 if (!checkpoint) {
37 return { success: false, reason: 'No checkpoint found' };
38 }
39
40 await exec(`git revert --no-commit ${checkpoint.commit}..HEAD`);
41
42 return {
43 success: true,
44 revertedTo: checkpoint.commit,
45 };
46 }
47}
State Persistence
State Manager
1// core/workflows/state.ts
2
3export class StateManager {
4 private storage: StateStorage;
5
6 async save(execution: WorkflowExecution): Promise<void> {
7 const key = `workflow:${execution.id}`;
8 await this.storage.set(key, execution);
9 }
10
11 async load(executionId: string): Promise<WorkflowExecution | null> {
12 const key = `workflow:${executionId}`;
13 return this.storage.get(key);
14 }
15
16 async list(filter?: ExecutionFilter): Promise<WorkflowExecution[]> {
17 const pattern = 'workflow:*';
18 const keys = await this.storage.keys(pattern);
19
20 const executions = await Promise.all(
21 keys.map(k => this.storage.get(k))
22 );
23
24 return this.applyFilter(executions, filter);
25 }
26}
Parallel Execution
Parallel Phases
1// core/workflows/parallel.ts
2
/** Runs phases concurrently while capping how many are in flight at once. */
export class ParallelExecutor {
  /**
   * Executes `phases`, keeping at most `maxConcurrent` running at a time.
   *
   * Phases are started in array order; a new phase starts as soon as a slot
   * frees up. Each phase contributes exactly one result, and results are
   * returned in the same order as `phases`.
   *
   * @param phases Phases to run.
   * @param execution Execution the phases belong to; passed to each phase run.
   * @param maxConcurrent Maximum number of phases in flight. Values below 1
   *   (or NaN) are treated as 1 so the call cannot stall.
   * @returns One result per phase, in `phases` order.
   * @throws Rejects with the first phase failure; phases already started keep
   *   running.
   */
  async execute(
    phases: Phase[],
    execution: WorkflowExecution,
    maxConcurrent: number
  ): Promise<PhaseResult[]> {
    const limit = maxConcurrent >= 1 ? maxConcurrent : 1;
    const results: PhaseResult[] = new Array<PhaseResult>(phases.length);

    // Promise.race resolves to a *value*, which cannot identify the promise
    // that finished. Each slot therefore removes itself from the set when it
    // settles, so the set always holds only the phases still running.
    const inFlight = new Set<Promise<void>>();

    for (const [index, phase] of phases.entries()) {
      // Wait if at max concurrency
      if (inFlight.size >= limit) {
        await Promise.race(inFlight);
      }

      // Start phase
      const slot: Promise<void> = this.executePhase(phase, execution)
        .then((result: PhaseResult) => {
          results[index] = result;
        })
        .finally(() => {
          inFlight.delete(slot);
        });
      inFlight.add(slot);
    }

    // Wait for remaining
    await Promise.all(inFlight);

    return results;
  }
}