Command Query Responsibility Segregation separates read and write operations into different models. This enables independent scaling, optimization, and complexity management.
CQRS Basics#
Traditional CRUD:
- Single model for read/write
- Same database for all operations
- Trade-offs between read and write optimization
CQRS:
- Separate models for commands (writes) and queries (reads)
- Optimized data stores for each
- Can scale independently
Simple CQRS Structure#
1// Commands - write operations
2interface CreateOrderCommand {
3 type: 'CreateOrder';
4 customerId: string;
5 items: OrderItem[];
6}
7
8interface CancelOrderCommand {
9 type: 'CancelOrder';
10 orderId: string;
11 reason: string;
12}
13
14type Command = CreateOrderCommand | CancelOrderCommand;
15
16// Command handler
17class OrderCommandHandler {
18 constructor(
19 private repository: OrderRepository,
20 private eventBus: EventBus
21 ) {}
22
23 async handle(command: Command): Promise<void> {
24 switch (command.type) {
25 case 'CreateOrder':
26 await this.createOrder(command);
27 break;
28 case 'CancelOrder':
29 await this.cancelOrder(command);
30 break;
31 }
32 }
33
34 private async createOrder(cmd: CreateOrderCommand): Promise<void> {
35 const order = Order.create(cmd.customerId, cmd.items);
36 await this.repository.save(order);
37
38 await this.eventBus.publish({
39 type: 'OrderCreated',
40 data: {
41 orderId: order.id,
42 customerId: order.customerId,
43 items: order.items,
44 total: order.total,
45 },
46 });
47 }
48
49 private async cancelOrder(cmd: CancelOrderCommand): Promise<void> {
50 const order = await this.repository.get(cmd.orderId);
51 order.cancel(cmd.reason);
52 await this.repository.save(order);
53
54 await this.eventBus.publish({
55 type: 'OrderCancelled',
56 data: {
57 orderId: order.id,
58 reason: cmd.reason,
59 },
60 });
61 }
62}Query Side#
1// Queries - read operations
2interface GetOrderQuery {
3 orderId: string;
4}
5
6interface ListOrdersQuery {
7 customerId: string;
8 status?: string;
9 limit?: number;
10 cursor?: string;
11}
12
13// Read model - optimized for queries
14interface OrderReadModel {
15 id: string;
16 customerName: string;
17 customerEmail: string;
18 items: {
19 productName: string;
20 quantity: number;
21 price: number;
22 }[];
23 total: number;
24 status: string;
25 createdAt: Date;
26}
27
28// Query handler
29class OrderQueryHandler {
30 constructor(private readDb: ReadDatabase) {}
31
32 async getOrder(query: GetOrderQuery): Promise<OrderReadModel | null> {
33 return this.readDb.orders.findUnique({
34 where: { id: query.orderId },
35 });
36 }
37
38 async listOrders(query: ListOrdersQuery): Promise<PaginatedResult<OrderReadModel>> {
39 const orders = await this.readDb.orders.findMany({
40 where: {
41 customerId: query.customerId,
42 status: query.status,
43 },
44 take: query.limit || 20,
45 cursor: query.cursor ? { id: query.cursor } : undefined,
46 orderBy: { createdAt: 'desc' },
47 });
48
49 return {
50 data: orders,
51 hasMore: orders.length === (query.limit || 20),
52 nextCursor: orders[orders.length - 1]?.id,
53 };
54 }
55}Projections (Updating Read Models)#
1// Event handler that updates read models
2class OrderProjection {
3 constructor(private readDb: ReadDatabase) {}
4
5 async handle(event: DomainEvent): Promise<void> {
6 switch (event.type) {
7 case 'OrderCreated':
8 await this.onOrderCreated(event.data);
9 break;
10 case 'OrderCancelled':
11 await this.onOrderCancelled(event.data);
12 break;
13 case 'OrderShipped':
14 await this.onOrderShipped(event.data);
15 break;
16 }
17 }
18
19 private async onOrderCreated(data: OrderCreatedEvent): Promise<void> {
20 // Fetch additional data for denormalization
21 const customer = await this.customerService.get(data.customerId);
22 const products = await this.productService.getMany(
23 data.items.map((i) => i.productId)
24 );
25
26 await this.readDb.orders.create({
27 data: {
28 id: data.orderId,
29 customerId: data.customerId,
30 customerName: customer.name,
31 customerEmail: customer.email,
32 items: data.items.map((item) => ({
33 productId: item.productId,
34 productName: products.find((p) => p.id === item.productId)!.name,
35 quantity: item.quantity,
36 price: item.price,
37 })),
38 total: data.total,
39 status: 'pending',
40 createdAt: new Date(),
41 },
42 });
43 }
44
45 private async onOrderCancelled(data: OrderCancelledEvent): Promise<void> {
46 await this.readDb.orders.update({
47 where: { id: data.orderId },
48 data: {
49 status: 'cancelled',
50 cancelledAt: new Date(),
51 cancellationReason: data.reason,
52 },
53 });
54 }
55
56 private async onOrderShipped(data: OrderShippedEvent): Promise<void> {
57 await this.readDb.orders.update({
58 where: { id: data.orderId },
59 data: {
60 status: 'shipped',
61 shippedAt: new Date(),
62 trackingNumber: data.trackingNumber,
63 },
64 });
65 }
66}Event Sourcing Integration#
1// Event store
2interface Event {
3 id: string;
4 aggregateId: string;
5 type: string;
6 data: any;
7 version: number;
8 timestamp: Date;
9}
10
11class EventStore {
12 constructor(private db: Database) {}
13
14 async save(aggregateId: string, events: Event[], expectedVersion: number): Promise<void> {
15 const currentVersion = await this.getVersion(aggregateId);
16
17 if (currentVersion !== expectedVersion) {
18 throw new ConcurrencyError(
19 `Expected version ${expectedVersion}, found ${currentVersion}`
20 );
21 }
22
23 await this.db.transaction(async (tx) => {
24 for (let i = 0; i < events.length; i++) {
25 await tx.events.create({
26 data: {
27 ...events[i],
28 version: expectedVersion + i + 1,
29 },
30 });
31 }
32 });
33 }
34
35 async getEvents(aggregateId: string, fromVersion = 0): Promise<Event[]> {
36 return this.db.events.findMany({
37 where: {
38 aggregateId,
39 version: { gt: fromVersion },
40 },
41 orderBy: { version: 'asc' },
42 });
43 }
44
45 async getVersion(aggregateId: string): Promise<number> {
46 const lastEvent = await this.db.events.findFirst({
47 where: { aggregateId },
48 orderBy: { version: 'desc' },
49 });
50 return lastEvent?.version ?? 0;
51 }
52}
53
54// Rebuild aggregate from events
55class OrderAggregate {
56 private state: OrderState;
57 private version: number = 0;
58 private uncommittedEvents: Event[] = [];
59
60 static async load(eventStore: EventStore, id: string): Promise<OrderAggregate> {
61 const events = await eventStore.getEvents(id);
62 const aggregate = new OrderAggregate();
63
64 for (const event of events) {
65 aggregate.apply(event);
66 aggregate.version = event.version;
67 }
68
69 return aggregate;
70 }
71
72 private apply(event: Event): void {
73 switch (event.type) {
74 case 'OrderCreated':
75 this.state = {
76 id: event.data.orderId,
77 status: 'pending',
78 items: event.data.items,
79 };
80 break;
81 case 'OrderCancelled':
82 this.state.status = 'cancelled';
83 break;
84 }
85 }
86
87 cancel(reason: string): void {
88 if (this.state.status !== 'pending') {
89 throw new Error('Can only cancel pending orders');
90 }
91
92 const event: Event = {
93 id: crypto.randomUUID(),
94 aggregateId: this.state.id,
95 type: 'OrderCancelled',
96 data: { reason },
97 version: this.version + this.uncommittedEvents.length + 1,
98 timestamp: new Date(),
99 };
100
101 this.uncommittedEvents.push(event);
102 this.apply(event);
103 }
104
105 getUncommittedEvents(): Event[] {
106 return this.uncommittedEvents;
107 }
108
109 clearUncommittedEvents(): void {
110 this.uncommittedEvents = [];
111 }
112}API Layer#
1// Express routes with CQRS
2class OrderController {
3 constructor(
4 private commandHandler: OrderCommandHandler,
5 private queryHandler: OrderQueryHandler
6 ) {}
7
8 // Commands
9 async createOrder(req: Request, res: Response): Promise<void> {
10 await this.commandHandler.handle({
11 type: 'CreateOrder',
12 customerId: req.user.id,
13 items: req.body.items,
14 });
15
16 res.status(202).json({ message: 'Order creation initiated' });
17 }
18
19 async cancelOrder(req: Request, res: Response): Promise<void> {
20 await this.commandHandler.handle({
21 type: 'CancelOrder',
22 orderId: req.params.id,
23 reason: req.body.reason,
24 });
25
26 res.status(202).json({ message: 'Order cancellation initiated' });
27 }
28
29 // Queries
30 async getOrder(req: Request, res: Response): Promise<void> {
31 const order = await this.queryHandler.getOrder({
32 orderId: req.params.id,
33 });
34
35 if (!order) {
36 res.status(404).json({ error: 'Order not found' });
37 return;
38 }
39
40 res.json(order);
41 }
42
43 async listOrders(req: Request, res: Response): Promise<void> {
44 const orders = await this.queryHandler.listOrders({
45 customerId: req.user.id,
46 status: req.query.status as string,
47 limit: parseInt(req.query.limit as string) || 20,
48 cursor: req.query.cursor as string,
49 });
50
51 res.json(orders);
52 }
53}Eventual Consistency#
1// Handle eventual consistency in UI
2class OrderService {
3 async createOrder(items: OrderItem[]): Promise<{ orderId: string }> {
4 // Send command
5 const result = await api.post('/orders', { items });
6 const orderId = result.orderId;
7
8 // Poll for read model update
9 const order = await this.waitForOrder(orderId);
10 return order;
11 }
12
13 private async waitForOrder(orderId: string, maxAttempts = 10): Promise<Order> {
14 for (let i = 0; i < maxAttempts; i++) {
15 try {
16 const order = await api.get(`/orders/${orderId}`);
17 return order;
18 } catch (error) {
19 if (i === maxAttempts - 1) throw error;
20 await sleep(100 * Math.pow(2, i));
21 }
22 }
23 throw new Error('Order not found after creation');
24 }
25}
26
27// Or use WebSocket for real-time updates
28socket.on('OrderCreated', (order) => {
29 addOrderToUI(order);
30});Best Practices#
When to Use CQRS:
✓ Complex domain logic
✓ Different read/write scaling needs
✓ Multiple read model representations
✓ Audit/history requirements
When to Avoid:
✗ Simple CRUD applications
✗ Small teams/projects
✗ Real-time consistency required
Implementation:
✓ Start simple, add complexity as needed
✓ Handle eventual consistency in UI
✓ Monitor projection lag
✓ Plan for replay scenarios
Conclusion#
CQRS separates concerns and enables independent optimization of reads and writes. Start with simple command/query separation, add event sourcing if you need audit trails, and carefully handle eventual consistency. The complexity is justified when you have different scaling needs or complex domain logic.