Tags: CQRS, Architecture, Event Sourcing, DDD

CQRS Implementation Guide

Implement Command Query Responsibility Segregation. From separate models to event sourcing to eventual consistency.

Bootspring Team
Engineering
March 12, 2023
6 min read

Command Query Responsibility Segregation (CQRS) separates read and write operations into different models, so each side can be scaled, optimized, and evolved independently.

CQRS Basics

Traditional CRUD:
- Single model for read/write
- Same database for all operations
- Trade-offs between read and write optimization

CQRS:
- Separate models for commands (writes) and queries (reads)
- Optimized data stores for each
- Can scale independently
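To make the contrast concrete, here is a minimal in-memory sketch of the split. Every name in it (`ProductWriteModel`, `ProductListItem`, `upsertProduct`, `listProducts`, `project`) is hypothetical, and the two `Map`s merely stand in for separate write and read stores.

```typescript
// Hypothetical write model: the shape that business rules are checked against.
interface ProductWriteModel {
  id: string;
  name: string;
  priceCents: number;
}

// Hypothetical read model: pre-formatted for display, no business rules.
interface ProductListItem {
  id: string;
  label: string;
}

const writeStore = new Map<string, ProductWriteModel>();
const readStore = new Map<string, ProductListItem>();

// Stand-in for a projection: copies a write-side change into the read store.
function project(p: ProductWriteModel): void {
  readStore.set(p.id, {
    id: p.id,
    label: `${p.name} ($${(p.priceCents / 100).toFixed(2)})`,
  });
}

// Command: validates and mutates the write store, returns nothing.
function upsertProduct(p: ProductWriteModel): void {
  if (p.priceCents < 0) throw new Error('Price cannot be negative');
  writeStore.set(p.id, { ...p });
  project(p);
}

// Query: reads only from the read store and never mutates anything.
function listProducts(): ProductListItem[] {
  return Array.from(readStore.values());
}

upsertProduct({ id: 'p1', name: 'Mug', priceCents: 1250 });
const listed = listProducts();
```

Here the projection runs synchronously inside the command for brevity; in a real system it would usually run asynchronously, which is where eventual consistency comes from.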

Simple CQRS Structure

```typescript
// Commands - write operations
interface CreateOrderCommand {
  type: 'CreateOrder';
  customerId: string;
  items: OrderItem[];
}

interface CancelOrderCommand {
  type: 'CancelOrder';
  orderId: string;
  reason: string;
}

type Command = CreateOrderCommand | CancelOrderCommand;

// Command handler
class OrderCommandHandler {
  constructor(
    private repository: OrderRepository,
    private eventBus: EventBus
  ) {}

  async handle(command: Command): Promise<void> {
    switch (command.type) {
      case 'CreateOrder':
        await this.createOrder(command);
        break;
      case 'CancelOrder':
        await this.cancelOrder(command);
        break;
    }
  }

  private async createOrder(cmd: CreateOrderCommand): Promise<void> {
    const order = Order.create(cmd.customerId, cmd.items);
    await this.repository.save(order);

    await this.eventBus.publish({
      type: 'OrderCreated',
      data: {
        orderId: order.id,
        customerId: order.customerId,
        items: order.items,
        total: order.total,
      },
    });
  }

  private async cancelOrder(cmd: CancelOrderCommand): Promise<void> {
    const order = await this.repository.get(cmd.orderId);
    order.cancel(cmd.reason);
    await this.repository.save(order);

    await this.eventBus.publish({
      type: 'OrderCancelled',
      data: {
        orderId: order.id,
        reason: cmd.reason,
      },
    });
  }
}
```
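A `switch` over a command union silently does nothing if a new command type is added to the union but not to the handler. One common TypeScript idiom for catching this at compile time is an exhaustiveness check with `never`. The sketch below is self-contained and uses simplified stand-ins (`route`, `assertNever`, a reduced `items` shape) that are illustrative assumptions rather than part of the handler above.

```typescript
interface CreateOrderCommand {
  type: 'CreateOrder';
  customerId: string;
  items: { productId: string; quantity: number }[];
}

interface CancelOrderCommand {
  type: 'CancelOrder';
  orderId: string;
  reason: string;
}

type Command = CreateOrderCommand | CancelOrderCommand;

// If a Command variant is not handled, `value` is no longer `never`
// and the call site fails to compile.
function assertNever(value: never): never {
  throw new Error(`Unhandled command: ${JSON.stringify(value)}`);
}

// Hypothetical router that returns a description instead of doing real work.
function route(command: Command): string {
  switch (command.type) {
    case 'CreateOrder':
      return `create order for ${command.customerId}`;
    case 'CancelOrder':
      return `cancel order ${command.orderId}`;
    default:
      return assertNever(command);
  }
}

const routed = route({ type: 'CancelOrder', orderId: 'o-1', reason: 'duplicate' });
```

Adding a third variant to `Command` without a matching `case` turns the `default` branch into a type error, so the omission is caught before the code ships.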

Query Side

```typescript
// Queries - read operations
interface GetOrderQuery {
  orderId: string;
}

interface ListOrdersQuery {
  customerId: string;
  status?: string;
  limit?: number;
  cursor?: string;
}

// Read model - optimized for queries
interface OrderReadModel {
  id: string;
  customerName: string;
  customerEmail: string;
  items: {
    productName: string;
    quantity: number;
    price: number;
  }[];
  total: number;
  status: string;
  createdAt: Date;
}

// Query handler
class OrderQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async getOrder(query: GetOrderQuery): Promise<OrderReadModel | null> {
    return this.readDb.orders.findUnique({
      where: { id: query.orderId },
    });
  }

  async listOrders(query: ListOrdersQuery): Promise<PaginatedResult<OrderReadModel>> {
    const limit = query.limit || 20;

    const rows = await this.readDb.orders.findMany({
      where: {
        customerId: query.customerId,
        status: query.status,
      },
      // Fetch one extra row so we know whether another page exists.
      take: limit + 1,
      cursor: query.cursor ? { id: query.cursor } : undefined,
      // Assumes Prisma-style cursors, which include the cursor row itself.
      skip: query.cursor ? 1 : 0,
      orderBy: { createdAt: 'desc' },
    });

    const hasMore = rows.length > limit;
    const data = rows.slice(0, limit);

    return {
      data,
      hasMore,
      nextCursor: hasMore ? data[data.length - 1]?.id : undefined,
    };
  }
}
```
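Cursor pagination is easy to get subtly wrong, for example by repeating the cursor row on the next page or by reporting `hasMore` when the last page happens to be exactly full. The sketch below shows one common approach, fetching one extra row, over a plain in-memory array. `Row`, `Page`, and `paginate` are hypothetical names for illustration.

```typescript
interface Row {
  id: string;
  createdAt: number;
}

interface Page<T> {
  data: T[];
  hasMore: boolean;
  nextCursor: string | undefined;
}

// `rows` must already be sorted in the order the pages should follow.
function paginate(rows: Row[], limit: number, cursor?: string): Page<Row> {
  // Start just after the cursor row. An unknown cursor makes findIndex
  // return -1, so paging restarts from the beginning.
  const start = cursor ? rows.findIndex((r) => r.id === cursor) + 1 : 0;

  // Take one extra row to learn whether another page exists.
  const chunk = rows.slice(start, start + limit + 1);
  const hasMore = chunk.length > limit;
  const data = chunk.slice(0, limit);

  return {
    data,
    hasMore,
    nextCursor: hasMore ? data[data.length - 1]?.id : undefined,
  };
}

const rows: Row[] = [
  { id: 'a', createdAt: 3 },
  { id: 'b', createdAt: 2 },
  { id: 'c', createdAt: 1 },
];

const firstPage = paginate(rows, 2);
const secondPage = paginate(rows, 2, firstPage.nextCursor);
```

With three rows and a limit of two, the first page holds `a` and `b` with `hasMore` set, and the second page holds only `c` with no further cursor.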

Projections (Updating Read Models)

```typescript
// Event handler that updates read models
class OrderProjection {
  constructor(
    private readDb: ReadDatabase,
    // The original listing used these services without injecting them;
    // the type names here are placeholders.
    private customerService: CustomerService,
    private productService: ProductService
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.type) {
      case 'OrderCreated':
        await this.onOrderCreated(event.data);
        break;
      case 'OrderCancelled':
        await this.onOrderCancelled(event.data);
        break;
      case 'OrderShipped':
        await this.onOrderShipped(event.data);
        break;
    }
  }

  private async onOrderCreated(data: OrderCreatedEvent): Promise<void> {
    // Fetch additional data for denormalization
    const customer = await this.customerService.get(data.customerId);
    const products = await this.productService.getMany(
      data.items.map((i) => i.productId)
    );

    await this.readDb.orders.create({
      data: {
        id: data.orderId,
        customerId: data.customerId,
        customerName: customer.name,
        customerEmail: customer.email,
        items: data.items.map((item) => ({
          productId: item.productId,
          // Non-null assertion: throws if a product is missing from the lookup.
          productName: products.find((p) => p.id === item.productId)!.name,
          quantity: item.quantity,
          price: item.price,
        })),
        total: data.total,
        status: 'pending',
        // This is projection time, not event time. If the event carries a
        // timestamp, prefer it so that replays produce the same rows.
        createdAt: new Date(),
      },
    });
  }

  private async onOrderCancelled(data: OrderCancelledEvent): Promise<void> {
    await this.readDb.orders.update({
      where: { id: data.orderId },
      data: {
        status: 'cancelled',
        cancelledAt: new Date(),
        cancellationReason: data.reason,
      },
    });
  }

  private async onOrderShipped(data: OrderShippedEvent): Promise<void> {
    await this.readDb.orders.update({
      where: { id: data.orderId },
      data: {
        status: 'shipped',
        shippedAt: new Date(),
        trackingNumber: data.trackingNumber,
      },
    });
  }
}
```
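Many event buses deliver at least once, so a projection may receive the same event twice. A minimal sketch of one way to make a projection idempotent is to remember which event IDs have already been applied. The `eventId` field, the simplified `OrderEvent` shape, and the in-memory `Map` and `Set` are assumptions for illustration; in practice the processed-ID record would live in the read database alongside the rows it protects.

```typescript
interface OrderEvent {
  eventId: string; // assumed to be unique per event
  type: 'OrderCreated' | 'OrderCancelled';
  orderId: string;
}

interface OrderRow {
  id: string;
  status: 'pending' | 'cancelled';
}

class InMemoryOrderProjection {
  readonly rows = new Map<string, OrderRow>();
  private readonly seen = new Set<string>();

  handle(event: OrderEvent): void {
    // Duplicate delivery: the event was already applied, so skip it.
    if (this.seen.has(event.eventId)) return;

    switch (event.type) {
      case 'OrderCreated':
        this.rows.set(event.orderId, { id: event.orderId, status: 'pending' });
        break;
      case 'OrderCancelled': {
        const row = this.rows.get(event.orderId);
        if (row) row.status = 'cancelled';
        break;
      }
    }

    this.seen.add(event.eventId);
  }
}

const projection = new InMemoryOrderProjection();
const created: OrderEvent = { eventId: 'e1', type: 'OrderCreated', orderId: 'o1' };

projection.handle(created);
projection.handle({ eventId: 'e2', type: 'OrderCancelled', orderId: 'o1' });
projection.handle(created); // redelivery must not reset the order to 'pending'
```

Without the `seen` check, the redelivered `OrderCreated` would overwrite the cancelled row and the read model would drift from the write side.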

Event Sourcing Integration

```typescript
import { randomUUID } from 'node:crypto';

// Event store
interface Event {
  id: string;
  aggregateId: string;
  type: string;
  data: any;
  version: number;
  timestamp: Date;
}

class EventStore {
  constructor(private db: Database) {}

  async save(aggregateId: string, events: Event[], expectedVersion: number): Promise<void> {
    // Optimistic concurrency check. Note that this read and the inserts
    // below are not atomic on their own; a unique index on
    // (aggregateId, version) is advisable so a concurrent writer fails
    // instead of interleaving.
    const currentVersion = await this.getVersion(aggregateId);

    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, found ${currentVersion}`
      );
    }

    await this.db.transaction(async (tx) => {
      for (let i = 0; i < events.length; i++) {
        await tx.events.create({
          data: {
            ...events[i],
            version: expectedVersion + i + 1,
          },
        });
      }
    });
  }

  async getEvents(aggregateId: string, fromVersion = 0): Promise<Event[]> {
    return this.db.events.findMany({
      where: {
        aggregateId,
        version: { gt: fromVersion },
      },
      orderBy: { version: 'asc' },
    });
  }

  async getVersion(aggregateId: string): Promise<number> {
    const lastEvent = await this.db.events.findFirst({
      where: { aggregateId },
      orderBy: { version: 'desc' },
    });
    return lastEvent?.version ?? 0;
  }
}

// Rebuild aggregate from events
class OrderAggregate {
  // Assigned when the first OrderCreated event is applied.
  private state!: OrderState;
  private version: number = 0;
  private uncommittedEvents: Event[] = [];

  static async load(eventStore: EventStore, id: string): Promise<OrderAggregate> {
    const events = await eventStore.getEvents(id);
    const aggregate = new OrderAggregate();

    for (const event of events) {
      aggregate.apply(event);
      aggregate.version = event.version;
    }

    return aggregate;
  }

  private apply(event: Event): void {
    switch (event.type) {
      case 'OrderCreated':
        this.state = {
          id: event.data.orderId,
          status: 'pending',
          items: event.data.items,
        };
        break;
      case 'OrderCancelled':
        this.state.status = 'cancelled';
        break;
    }
  }

  cancel(reason: string): void {
    if (this.state.status !== 'pending') {
      throw new Error('Can only cancel pending orders');
    }

    const event: Event = {
      id: randomUUID(),
      aggregateId: this.state.id,
      type: 'OrderCancelled',
      data: { reason },
      version: this.version + this.uncommittedEvents.length + 1,
      timestamp: new Date(),
    };

    this.uncommittedEvents.push(event);
    this.apply(event);
  }

  getUncommittedEvents(): Event[] {
    return this.uncommittedEvents;
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }
}
```
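The `expectedVersion` argument is what protects an aggregate from lost updates: two writers that loaded the same version cannot both append. The following in-memory sketch walks through that conflict. `InMemoryEventStore`, `StoredEvent`, and `append` are hypothetical simplifications, and the stream length stands in for the version, which assumes versions start at 1 and have no gaps.

```typescript
interface StoredEvent {
  aggregateId: string;
  type: string;
  version: number;
}

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  // Appends events only if the stream is still at `expectedVersion`.
  append(aggregateId: string, types: string[], expectedVersion: number): number {
    const stream = this.streams.get(aggregateId) ?? [];

    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, found ${stream.length}`
      );
    }

    types.forEach((type, i) => {
      stream.push({ aggregateId, type, version: expectedVersion + i + 1 });
    });
    this.streams.set(aggregateId, stream);
    return stream.length;
  }
}

const store = new InMemoryEventStore();
store.append('order-1', ['OrderCreated'], 0); // stream is now at version 1

// Two writers both loaded the aggregate at version 1.
const afterFirstWriter = store.append('order-1', ['OrderShipped'], 1);

let conflictDetected = false;
try {
  store.append('order-1', ['OrderCancelled'], 1); // stale version
} catch (err) {
  conflictDetected = err instanceof Error && err.name === 'ConcurrencyError';
}
```

The second writer typically responds to the conflict by reloading the aggregate, re-running its business rules against the new state, and retrying or giving up.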

API Layer

```typescript
import type { Request, Response } from 'express';

// Express routes with CQRS
// Assumes auth middleware populates `req.user`, which needs a type
// augmentation on Express's Request.
class OrderController {
  constructor(
    private commandHandler: OrderCommandHandler,
    private queryHandler: OrderQueryHandler
  ) {}

  // Commands
  async createOrder(req: Request, res: Response): Promise<void> {
    await this.commandHandler.handle({
      type: 'CreateOrder',
      customerId: req.user.id,
      items: req.body.items,
    });

    // 202 Accepted: the read model may not reflect the order yet.
    // The handler returns void, so no order ID is available here. If
    // clients need to poll for the order, have the client supply the ID
    // or have the handler return it.
    res.status(202).json({ message: 'Order creation initiated' });
  }

  async cancelOrder(req: Request, res: Response): Promise<void> {
    await this.commandHandler.handle({
      type: 'CancelOrder',
      orderId: req.params.id,
      reason: req.body.reason,
    });

    res.status(202).json({ message: 'Order cancellation initiated' });
  }

  // Queries
  async getOrder(req: Request, res: Response): Promise<void> {
    const order = await this.queryHandler.getOrder({
      orderId: req.params.id,
    });

    if (!order) {
      res.status(404).json({ error: 'Order not found' });
      return;
    }

    res.json(order);
  }

  async listOrders(req: Request, res: Response): Promise<void> {
    const orders = await this.queryHandler.listOrders({
      customerId: req.user.id,
      status: req.query.status as string,
      // Always pass a radix; fall back to 20 when the value is missing or invalid.
      limit: parseInt(req.query.limit as string, 10) || 20,
      cursor: req.query.cursor as string,
    });

    res.json(orders);
  }
}
```

Eventual Consistency

```typescript
// Small helper: resolve after `ms` milliseconds.
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Handle eventual consistency in UI
// `api` is the application's HTTP client (not shown).
class OrderService {
  async createOrder(items: OrderItem[]): Promise<Order> {
    // Send command. This assumes the POST response includes the new order's ID.
    const result = await api.post('/orders', { items });
    const orderId = result.orderId;

    // Poll for read model update
    return this.waitForOrder(orderId);
  }

  private async waitForOrder(orderId: string, maxAttempts = 10): Promise<Order> {
    for (let i = 0; i < maxAttempts; i++) {
      try {
        return await api.get(`/orders/${orderId}`);
      } catch (error) {
        // Give up after the final attempt; otherwise back off exponentially.
        if (i === maxAttempts - 1) throw error;
        await sleep(100 * Math.pow(2, i));
      }
    }
    // Only reached when maxAttempts is 0 or negative.
    throw new Error('Order not found after creation');
  }
}

// Or use WebSocket for real-time updates
socket.on('OrderCreated', (order) => {
  addOrderToUI(order);
});
```
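It is worth checking how long a polling loop like this can block the user. With a 100 ms base that doubles each time and ten attempts, the waits between attempts add up to roughly 51 seconds in the worst case. The sketch below computes the schedule and shows the effect of capping each delay; `backoffDelays` and the `capMs` parameter are illustrative additions, not part of the service above.

```typescript
// Delays (in ms) slept between attempts. No sleep follows the final attempt,
// so there is one fewer delay than there are attempts.
function backoffDelays(maxAttempts: number, baseMs = 100, capMs = Infinity): number[] {
  const delays: number[] = [];
  for (let i = 0; i < maxAttempts - 1; i++) {
    delays.push(Math.min(capMs, baseMs * Math.pow(2, i)));
  }
  return delays;
}

const sum = (xs: number[]): number => xs.reduce((a, b) => a + b, 0);

// Uncapped: 100, 200, 400, ... doubling up to 25600.
const uncapped = backoffDelays(10);
const worstCaseUncappedMs = sum(uncapped); // 51100

// Capped at 2 seconds: growth stops once a delay would exceed the cap.
const capped = backoffDelays(10, 100, 2000);
const worstCaseCappedMs = sum(capped); // 11100
```

A cap, a smaller attempt count, or a switch to push-based updates keeps the worst case within what a user will tolerate.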

Best Practices

When to Use CQRS:
✓ Complex domain logic
✓ Different read/write scaling needs
✓ Multiple read model representations
✓ Audit/history requirements

When to Avoid:
✗ Simple CRUD applications
✗ Small teams/projects
✗ Strong, immediate consistency required on reads

Implementation:
✓ Start simple, add complexity as needed
✓ Handle eventual consistency in UI
✓ Monitor projection lag
✓ Plan for replay scenarios
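The last two implementation points can be sketched briefly. Replay means discarding a read model and rebuilding it by folding the event log from the start; lag is how far the projection trails the head of the log. The sketch assumes each event carries a global, monotonically increasing `position`, which is an assumption of this example rather than something defined earlier in the article. `LoggedEvent`, `rebuild`, and `projectionLag` are hypothetical names.

```typescript
interface LoggedEvent {
  position: number; // assumed global sequence number, increasing by one
  type: 'OrderCreated' | 'OrderCancelled';
  orderId: string;
}

type ReadModel = Map<string, 'pending' | 'cancelled'>;

// Applies one event to the read model.
function applyEvent(model: ReadModel, event: LoggedEvent): void {
  if (event.type === 'OrderCreated') {
    model.set(event.orderId, 'pending');
  } else if (model.has(event.orderId)) {
    model.set(event.orderId, 'cancelled');
  }
}

// Replay: start from an empty read model and fold the whole log into it.
function rebuild(log: LoggedEvent[]): ReadModel {
  const model: ReadModel = new Map();
  for (const event of log) applyEvent(model, event);
  return model;
}

// Lag: number of events the projection has not processed yet.
function projectionLag(log: LoggedEvent[], lastProjectedPosition: number): number {
  const head = log[log.length - 1]?.position ?? 0;
  return head - lastProjectedPosition;
}

const log: LoggedEvent[] = [
  { position: 1, type: 'OrderCreated', orderId: 'o1' },
  { position: 2, type: 'OrderCreated', orderId: 'o2' },
  { position: 3, type: 'OrderCancelled', orderId: 'o1' },
];

const rebuilt = rebuild(log);
const lag = projectionLag(log, 1); // projection has only processed position 1
```

Replay only gives the same result every time if event handlers are deterministic, which is one reason to take timestamps and lookups from the event itself where possible.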

Conclusion

CQRS separates concerns and enables independent optimization of reads and writes. Start with simple command/query separation, add event sourcing if you need audit trails, and carefully handle eventual consistency. The complexity is justified when you have different scaling needs or complex domain logic.
