Workflow Engine Architecture

How Bootspring orchestrates multi-phase workflows.

Overview#

The workflow engine manages complex, multi-phase processes that coordinate agents, track progress, and handle failures.

Workflow Architecture#

┌─────────────────────────────────────────────────────────────────────┐ │ WORKFLOW ENGINE │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ Workflow Definitions │ │ │ │ feature-dev | security-audit | database-migration | ... │ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ │ │ ┌──────────────────────────▼──────────────────────────────────┐ │ │ │ Orchestrator │ │ │ │ │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ │ │ Phase │ │ Agent │ │ Signal │ │ Rollback │ │ │ │ │ │ Manager │ │Coordinator│ │ Checker │ │ Handler │ │ │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌──────────────────────────▼──────────────────────────────────┐ │ │ │ Execution State │ │ │ │ Persistence | Progress Tracking | History │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘

Workflow Definition#

Definition Structure#

1interface WorkflowDefinition { 2 // Identity 3 name: string; 4 displayName: string; 5 description: string; 6 category: WorkflowCategory; 7 tier: Tier; 8 9 // Configuration 10 phases: Phase[]; 11 config: WorkflowConfig; 12 13 // Metadata 14 estimatedDuration: string; 15 tags: string[]; 16} 17 18interface Phase { 19 name: string; 20 description: string; 21 22 // Execution 23 agents: string[]; 24 tasks: Task[]; 25 parallel: boolean; 26 27 // Control 28 critical: boolean; 29 timeout?: number; 30 retries?: number; 31 32 // Completion 33 completionSignals: CompletionSignal[]; 34 35 // Recovery 36 rollbackStrategy?: RollbackStrategy; 37} 38 39interface Task { 40 id: string; 41 name: string; 42 description: string; 43 agent?: string; 44 action: TaskAction; 45 inputs: Record<string, unknown>; 46 outputs: string[]; 47}

Example Definition#

1# workflows/feature-development.yaml 2 3name: feature-development 4displayName: Feature Development 5description: End-to-end feature implementation workflow 6category: development 7tier: free 8 9estimatedDuration: 3-5 days 10tags: [development, feature, full-stack] 11 12config: 13 allowParallel: true 14 maxConcurrentPhases: 2 15 autoProgress: false 16 17phases: 18 - name: design 19 description: Design the feature architecture 20 agents: [ui-ux-expert, api-expert] 21 parallel: false 22 critical: true 23 24 tasks: 25 - id: design-ui 26 name: Design UI/UX 27 agent: ui-ux-expert 28 action: generate 29 inputs: 30 type: wireframe 31 requirements: "{{feature.requirements}}" 32 outputs: [ui-design] 33 34 - id: design-api 35 name: Design API 36 agent: api-expert 37 action: generate 38 inputs: 39 type: api-spec 40 requirements: "{{feature.requirements}}" 41 outputs: [api-spec] 42 43 completionSignals: 44 - type: manual 45 description: Design approved by stakeholder 46 47 - name: implementation 48 description: Implement frontend and backend 49 agents: [frontend-expert, backend-expert] 50 parallel: true 51 critical: true 52 53 tasks: 54 - id: impl-frontend 55 name: Implement Frontend 56 agent: frontend-expert 57 action: generate 58 inputs: 59 design: "{{design.ui-design}}" 60 61 - id: impl-backend 62 name: Implement Backend 63 agent: backend-expert 64 action: generate 65 inputs: 66 spec: "{{design.api-spec}}" 67 68 completionSignals: 69 - type: file_exists 70 paths: ["app/**/*.tsx", "app/api/**/*.ts"] 71 - type: test_passes 72 command: npm test 73 74 rollbackStrategy: 75 type: git_revert 76 scope: phase 77 78 - name: testing 79 description: Test the implementation 80 agents: [testing-expert] 81 parallel: false 82 critical: true 83 84 tasks: 85 - id: write-tests 86 name: Write Tests 87 agent: testing-expert 88 action: generate 89 inputs: 90 coverage: 80 91 92 completionSignals: 93 - type: test_passes 94 command: npm test -- --coverage 95 96 - name: review 97 description: Code review and security check 98 agents: [code-review-expert, security-expert] 99 parallel: true 100 critical: false 101 102 completionSignals: 103 - type: manual 104 description: Code review approved

Orchestrator#

Orchestrator Implementation#

1// core/workflows/orchestrator.ts 2 3export class WorkflowOrchestrator { 4 private registry: WorkflowRegistry; 5 private phaseManager: PhaseManager; 6 private agentCoordinator: AgentCoordinator; 7 private signalChecker: SignalChecker; 8 private stateManager: StateManager; 9 10 async start( 11 workflowName: string, 12 options: StartOptions 13 ): Promise<WorkflowExecution> { 14 // Load workflow definition 15 const workflow = this.registry.get(workflowName); 16 if (!workflow) { 17 throw new WorkflowNotFoundError(workflowName); 18 } 19 20 // Create execution 21 const execution = new WorkflowExecution({ 22 id: generateId(), 23 workflow: workflow.name, 24 status: 'running', 25 startedAt: new Date(), 26 phases: workflow.phases.map(p => ({ 27 name: p.name, 28 status: 'pending', 29 })), 30 }); 31 32 // Save initial state 33 await this.stateManager.save(execution); 34 35 // Start execution 36 this.executeAsync(execution, workflow, options); 37 38 return execution; 39 } 40 41 private async executeAsync( 42 execution: WorkflowExecution, 43 workflow: WorkflowDefinition, 44 options: StartOptions 45 ): Promise<void> { 46 try { 47 for (const phase of workflow.phases) { 48 // Check if we can start this phase 49 const canStart = await this.canStartPhase(execution, phase); 50 if (!canStart) { 51 if (phase.critical) { 52 throw new PhaseBlockedError(phase.name); 53 } 54 continue; 55 } 56 57 // Execute phase 58 await this.executePhase(execution, phase, options); 59 } 60 61 // Mark workflow complete 62 execution.status = 'completed'; 63 execution.completedAt = new Date(); 64 } catch (error) { 65 execution.status = 'failed'; 66 execution.error = error.message; 67 68 // Attempt rollback 69 await this.handleFailure(execution, workflow); 70 } 71 72 await this.stateManager.save(execution); 73 } 74 75 private async executePhase( 76 execution: WorkflowExecution, 77 phase: Phase, 78 options: StartOptions 79 ): Promise<void> { 80 const phaseExecution = execution.phases.find(p => p.name === phase.name)!; 81 phaseExecution.status = 'running'; 82 phaseExecution.startedAt = new Date(); 83 84 await this.stateManager.save(execution); 85 86 try { 87 // Execute tasks 88 if (phase.parallel) { 89 await this.executeTasksParallel(phase.tasks, execution); 90 } else { 91 await this.executeTasksSequential(phase.tasks, execution); 92 } 93 94 // Check completion signals 95 const completed = await this.checkCompletionSignals(phase); 96 if (!completed) { 97 throw new SignalsNotMetError(phase.name); 98 } 99 100 phaseExecution.status = 'completed'; 101 phaseExecution.completedAt = new Date(); 102 } catch (error) { 103 phaseExecution.status = 'failed'; 104 phaseExecution.error = error.message; 105 106 if (phase.critical) { 107 throw error; 108 } 109 } 110 } 111}

Phase Management#

Phase Manager#

1// core/workflows/phases.ts 2 3export class PhaseManager { 4 canStart(execution: WorkflowExecution, phase: Phase): boolean { 5 // Check dependencies 6 if (phase.dependsOn) { 7 for (const dep of phase.dependsOn) { 8 const depPhase = execution.phases.find(p => p.name === dep); 9 if (depPhase?.status !== 'completed') { 10 return false; 11 } 12 } 13 } 14 15 // Check inputs available 16 for (const task of phase.tasks) { 17 if (!this.inputsAvailable(task, execution)) { 18 return false; 19 } 20 } 21 22 return true; 23 } 24 25 getNextPhase(execution: WorkflowExecution): Phase | null { 26 for (const phaseExec of execution.phases) { 27 if (phaseExec.status === 'pending') { 28 const phase = this.getPhaseDefinition(phaseExec.name); 29 if (this.canStart(execution, phase)) { 30 return phase; 31 } 32 } 33 } 34 return null; 35 } 36}

Agent Coordination#

Coordinator Implementation#

1// core/workflows/agent-coordinator.ts 2 3export class AgentCoordinator { 4 private invoker: AgentInvoker; 5 private contextSharer: ContextSharer; 6 7 async coordinate( 8 agents: string[], 9 task: Task, 10 context: WorkflowContext 11 ): Promise<CoordinationResult> { 12 const results: AgentResult[] = []; 13 14 // Build shared context 15 const sharedContext = await this.contextSharer.build(context); 16 17 for (const agentName of agents) { 18 // Add previous results to context 19 const agentContext = { 20 ...sharedContext, 21 previousResults: results, 22 }; 23 24 // Invoke agent 25 const result = await this.invoker.invoke(agentName, task.description, { 26 context: agentContext, 27 task: task.id, 28 }); 29 30 results.push({ 31 agent: agentName, 32 task: task.id, 33 response: result, 34 }); 35 } 36 37 return { 38 task: task.id, 39 results, 40 synthesized: this.synthesize(results), 41 }; 42 } 43 44 private synthesize(results: AgentResult[]): string { 45 // Combine agent outputs intelligently 46 return results 47 .map(r => `## ${r.agent}\n${r.response.content}`) 48 .join('\n\n'); 49 } 50}

Completion Signals#

Signal Checker#

1// core/workflows/signals.ts 2 3export class SignalChecker { 4 private checkers: Map<SignalType, SignalHandler> = new Map([ 5 ['file_exists', new FileExistsChecker()], 6 ['test_passes', new TestPassesChecker()], 7 ['manual', new ManualChecker()], 8 ['api_response', new ApiResponseChecker()], 9 ]); 10 11 async check(signals: CompletionSignal[]): Promise<boolean> { 12 for (const signal of signals) { 13 const checker = this.checkers.get(signal.type); 14 if (!checker) { 15 throw new UnknownSignalTypeError(signal.type); 16 } 17 18 const passed = await checker.check(signal); 19 if (!passed) { 20 return false; 21 } 22 } 23 return true; 24 } 25} 26 27// Signal handlers 28class FileExistsChecker implements SignalHandler { 29 async check(signal: CompletionSignal): Promise<boolean> { 30 const paths = signal.paths as string[]; 31 for (const pattern of paths) { 32 const matches = await glob(pattern); 33 if (matches.length === 0) { 34 return false; 35 } 36 } 37 return true; 38 } 39} 40 41class TestPassesChecker implements SignalHandler { 42 async check(signal: CompletionSignal): Promise<boolean> { 43 const command = signal.command as string; 44 try { 45 await exec(command); 46 return true; 47 } catch { 48 return false; 49 } 50 } 51}

Rollback Handling#

Rollback Manager#

1// core/workflows/rollback.ts 2 3export class RollbackManager { 4 async rollback( 5 execution: WorkflowExecution, 6 phase: Phase 7 ): Promise<RollbackResult> { 8 const strategy = phase.rollbackStrategy; 9 if (!strategy) { 10 return { success: false, reason: 'No rollback strategy defined' }; 11 } 12 13 switch (strategy.type) { 14 case 'git_revert': 15 return this.gitRevert(execution, strategy); 16 17 case 'restore_backup': 18 return this.restoreBackup(execution, strategy); 19 20 case 'manual': 21 return this.requestManualRollback(execution, strategy); 22 23 default: 24 return { success: false, reason: 'Unknown rollback type' }; 25 } 26 } 27 28 private async gitRevert( 29 execution: WorkflowExecution, 30 strategy: RollbackStrategy 31 ): Promise<RollbackResult> { 32 const checkpoint = execution.checkpoints.find( 33 c => c.phase === strategy.scope 34 ); 35 36 if (!checkpoint) { 37 return { success: false, reason: 'No checkpoint found' }; 38 } 39 40 await exec(`git revert --no-commit ${checkpoint.commit}..HEAD`); 41 42 return { 43 success: true, 44 revertedTo: checkpoint.commit, 45 }; 46 } 47}

State Persistence#

State Manager#

1// core/workflows/state.ts 2 3export class StateManager { 4 private storage: StateStorage; 5 6 async save(execution: WorkflowExecution): Promise<void> { 7 const key = `workflow:${execution.id}`; 8 await this.storage.set(key, execution); 9 } 10 11 async load(executionId: string): Promise<WorkflowExecution | null> { 12 const key = `workflow:${executionId}`; 13 return this.storage.get(key); 14 } 15 16 async list(filter?: ExecutionFilter): Promise<WorkflowExecution[]> { 17 const pattern = 'workflow:*'; 18 const keys = await this.storage.keys(pattern); 19 20 const executions = await Promise.all( 21 keys.map(k => this.storage.get(k)) 22 ); 23 24 return this.applyFilter(executions, filter); 25 } 26}

Parallel Execution#

Parallel Phases#

1// core/workflows/parallel.ts 2 3export class ParallelExecutor { 4 async execute( 5 phases: Phase[], 6 execution: WorkflowExecution, 7 maxConcurrent: number 8 ): Promise<PhaseResult[]> { 9 const results: PhaseResult[] = []; 10 const running: Promise<PhaseResult>[] = []; 11 12 for (const phase of phases) { 13 // Wait if at max concurrency 14 if (running.length >= maxConcurrent) { 15 const completed = await Promise.race(running); 16 results.push(completed); 17 running.splice(running.indexOf(completed), 1); 18 } 19 20 // Start phase 21 const promise = this.executePhase(phase, execution); 22 running.push(promise); 23 } 24 25 // Wait for remaining 26 const remaining = await Promise.all(running); 27 results.push(...remaining); 28 29 return results; 30 } 31}