Event-driven architecture (EDA) enables loosely coupled, scalable systems. This guide covers core patterns, implementation strategies, and best practices.
Core Concepts
Events vs Commands vs Queries
/**
 * Event: something that has already happened. Named in the past tense and
 * immutable, so every field is `readonly`.
 */
interface OrderPlaced {
  readonly type: 'OrderPlaced';
  readonly orderId: string;
  readonly customerId: string;
  readonly items: readonly OrderItem[];
  readonly timestamp: Date;
}

/**
 * Command: a request to do something. Named in the imperative, in contrast
 * to an event, which records something that already happened.
 */
interface PlaceOrder {
  type: 'PlaceOrder';
  customerId: string;
  items: OrderItem[];
}

/** Query: a request for information. */
interface GetOrderStatus {
  type: 'GetOrderStatus';
  orderId: string;
}

Event Flow
┌─────────┐    Command     ┌──────────┐     Event     ┌──────────┐
│ Client  │ ─────────────> │ Service  │ ────────────> │  Queue   │
└─────────┘                └──────────┘               └──────────┘
                                                           │
                       ┌─────────────────┬─────────────────┤
                       │                 │                 │
                       ▼                 ▼                 ▼
                  ┌──────────┐      ┌──────────┐      ┌──────────┐
                  │ Service  │      │ Service  │      │ Service  │
                  │    A     │      │    B     │      │    C     │
                  └──────────┘      └──────────┘      └──────────┘
Event Sourcing
Store events as the source of truth:
/** Event store: persists domain events, grouped into streams by id. */
interface EventStore {
  /** Appends `events` to the stream identified by `streamId`. */
  append(streamId: string, events: DomainEvent[]): Promise<void>;

  /**
   * Reads the events of the stream identified by `streamId`.
   * `fromVersion` is optional; presumably the position to start reading
   * from -- confirm against the concrete implementation.
   */
  read(streamId: string, fromVersion?: number): Promise<DomainEvent[]>;
}

/**
 * Domain events of an order, modelled as a discriminated union: the `type`
 * literal selects which additional fields are present.
 */
type OrderEvent =
  | { type: 'OrderCreated'; orderId: string; customerId: string }
  | { type: 'ItemAdded'; orderId: string; item: OrderItem }
  | { type: 'ItemRemoved'; orderId: string; itemId: string }
  | { type: 'OrderSubmitted'; orderId: string; timestamp: Date }
  | { type: 'OrderShipped'; orderId: string; trackingNumber: string };

/**
 * Aggregate: an order whose state is produced solely by applying events.
 *
 * State-changing methods validate the request, build an event, apply it to
 * the in-memory state and remember it as "uncommitted" so the caller can
 * persist it afterwards.
 */
class Order {
  private state: OrderState;
  private uncommittedEvents: OrderEvent[] = [];

  /**
   * @param id Identifier of the order. Exposed read-only because callers
   *           need it, e.g. as the stream id when appending the events.
   */
  constructor(public readonly id: string) {
    this.state = { status: 'draft', items: [] };
  }

  /**
   * Rebuilds an order by replaying `events` in order. Replayed events are
   * not recorded as uncommitted.
   */
  static fromEvents(id: string, events: OrderEvent[]): Order {
    const order = new Order(id);
    for (const event of events) {
      order.apply(event);
    }
    return order;
  }

  /**
   * Adds an item to the order.
   * @throws Error if the order is no longer in the `draft` state.
   */
  addItem(item: OrderItem): void {
    if (this.state.status !== 'draft') {
      throw new Error('Cannot modify submitted order');
    }

    this.record({ type: 'ItemAdded', orderId: this.id, item });
  }

  /**
   * Submits the order, after which it can no longer be modified.
   * @throws Error if the order is no longer in the `draft` state, so a
   *         second call cannot record a duplicate `OrderSubmitted` event.
   */
  submit(): void {
    if (this.state.status !== 'draft') {
      throw new Error('Order already submitted');
    }

    this.record({ type: 'OrderSubmitted', orderId: this.id, timestamp: new Date() });
  }

  /**
   * Returns the events recorded since this instance was created.
   * A copy is returned so callers cannot alter the aggregate's own list.
   */
  getUncommittedEvents(): OrderEvent[] {
    return [...this.uncommittedEvents];
  }

  /** Applies a newly produced event and remembers it for persistence. */
  private record(event: OrderEvent): void {
    this.apply(event);
    this.uncommittedEvents.push(event);
  }

  /** Folds a single event into the current state. */
  private apply(event: OrderEvent): void {
    switch (event.type) {
      case 'ItemAdded':
        this.state.items.push(event.item);
        break;
      case 'ItemRemoved':
        this.state.items = this.state.items.filter((i) => i.id !== event.itemId);
        break;
      case 'OrderSubmitted':
        this.state.status = 'submitted';
        break;
      case 'OrderCreated':
      case 'OrderShipped':
        // These events do not change the state tracked by this aggregate.
        break;
    }
  }
}

CQRS (Command Query Responsibility Segregation)
Separate read and write models:
/**
 * Write model (commands): turns a `PlaceOrder` command into order events,
 * stores them and publishes them.
 */
class OrderCommandHandler {
  constructor(
    private readonly eventStore: EventStore,
    private readonly eventBus: EventBus
  ) {}

  /**
   * Creates a new order from `command`, adds every requested item and
   * submits it, then appends the resulting events to the event store and
   * publishes them on the event bus.
   */
  async handle(command: PlaceOrder): Promise<void> {
    const order = new Order(generateId());

    for (const item of command.items) {
      order.addItem(item);
    }
    order.submit();

    // Record who owns the order. Without an OrderCreated event the
    // command's customerId is lost, and a projection that creates its entry
    // on OrderCreated never learns that the order exists.
    const created: OrderEvent = {
      type: 'OrderCreated',
      orderId: order.id,
      customerId: command.customerId,
    };
    const events: OrderEvent[] = [created, ...order.getUncommittedEvents()];

    // NOTE(review): append and publish are two independent writes. If
    // publish fails after append succeeded, the events are stored but never
    // delivered to subscribers.
    await this.eventStore.append(order.id, events);
    await this.eventBus.publish(events);
  }
}

/**
 * Read model (queries): an in-memory projection of orders, kept up to date
 * by feeding it order events and queried by id or by customer.
 */
class OrderReadModel {
  private readonly orders = new Map<string, OrderSummary>();

  /**
   * Updates the projection from a single event. Events that refer to an
   * order the projection has not seen an `OrderCreated` for are ignored.
   */
  async handleEvent(event: OrderEvent): Promise<void> {
    // Each case has its own block so its locals stay scoped to that case.
    switch (event.type) {
      case 'OrderCreated': {
        this.orders.set(event.orderId, {
          id: event.orderId,
          customerId: event.customerId,
          items: [],
          status: 'draft',
        });
        break;
      }
      case 'ItemAdded': {
        this.orders.get(event.orderId)?.items.push(event.item);
        break;
      }
      case 'ItemRemoved': {
        const summary = this.orders.get(event.orderId);
        if (summary) {
          summary.items = summary.items.filter((i) => i.id !== event.itemId);
        }
        break;
      }
      case 'OrderSubmitted': {
        const summary = this.orders.get(event.orderId);
        // NOTE(review): assumes OrderSummary's status accepts 'submitted',
        // as the aggregate's state does -- confirm against OrderSummary.
        if (summary) summary.status = 'submitted';
        break;
      }
      case 'OrderShipped': {
        const summary = this.orders.get(event.orderId);
        if (summary) summary.status = 'shipped';
        break;
      }
    }
  }

  /** Returns the summary of one order, or `undefined` if it is unknown. */
  getOrder(orderId: string): OrderSummary | undefined {
    return this.orders.get(orderId);
  }

  /** Returns the summaries of all orders that belong to `customerId`. */
  getOrdersByCustomer(customerId: string): OrderSummary[] {
    return [...this.orders.values()].filter((o) => o.customerId === customerId);
  }
}

Message Brokers
Implementing with Redis Streams
import Redis from 'ioredis';

/**
 * Event bus on top of Redis Streams. Uses one connection for commands and a
 * second one for the blocking consumer-group reads.
 */
class RedisEventBus {
  private redis: Redis;
  private subscriber: Redis;

  constructor(url: string) {
    this.redis = new Redis(url);
    this.subscriber = new Redis(url);
  }

  /**
   * Appends `event` to `stream` with XADD. The entry stores the event type,
   * the JSON-serialised event under `data`, and the publish time in
   * milliseconds; `'*'` asks Redis to generate the entry id.
   */
  async publish(stream: string, event: DomainEvent): Promise<void> {
    await this.redis.xadd(
      stream,
      '*',
      'type', event.type,
      'data', JSON.stringify(event),
      'timestamp', Date.now().toString()
    );
  }

  /**
   * Consumes `stream` as `consumer` of consumer group `group`, calling
   * `handler` for every entry and acknowledging it once the handler
   * resolves. Loops indefinitely, so the returned promise only settles if
   * Redis itself reports an error.
   *
   * An entry whose payload cannot be decoded or whose handler throws is
   * logged and left unacknowledged; the loop then continues with the next
   * entry instead of terminating the whole subscription.
   */
  async subscribe(
    stream: string,
    group: string,
    consumer: string,
    handler: (event: DomainEvent) => Promise<void>
  ): Promise<void> {
    // Reply shape of XREADGROUP: one [stream, entries] pair per stream,
    // each entry being [id, [field, value, field, value, ...]].
    type StreamReply = [string, [string, string[]][]][];

    await this.ensureGroup(stream, group);

    while (true) {
      const reply = (await this.subscriber.xreadgroup(
        'GROUP', group, consumer,
        'BLOCK', 5000,
        'COUNT', 10,
        'STREAMS', stream, '>'
      )) as StreamReply | null;

      // null means the BLOCK timeout elapsed without new entries.
      if (!reply) continue;

      for (const [, entries] of reply) {
        for (const [id, fields] of entries) {
          try {
            await handler(this.decode(fields));
            await this.subscriber.xack(stream, group, id);
          } catch (error) {
            // NOTE(review): the entry stays pending for this consumer and is
            // not re-read by the '>' cursor; add a reclaim / dead-letter
            // path if failed entries must be retried.
            console.error(`Failed to process entry ${id} of stream ${stream}`, error);
          }
        }
      }
    }
  }

  /**
   * Creates the consumer group (and the stream, via MKSTREAM) if needed.
   * Only the "group already exists" reply (BUSYGROUP) is ignored; any other
   * failure, such as a connection error, is rethrown.
   */
  private async ensureGroup(stream: string, group: string): Promise<void> {
    try {
      await this.redis.xgroup('CREATE', stream, group, '0', 'MKSTREAM');
    } catch (error: unknown) {
      const alreadyExists = error instanceof Error && error.message.includes('BUSYGROUP');
      if (!alreadyExists) throw error;
    }
  }

  /**
   * Extracts and parses the `data` field from an entry's flat
   * [field, value, ...] list, looking it up by name rather than by position.
   * @throws Error if the entry has no `data` field or it is not valid JSON.
   */
  private decode(fields: string[]): DomainEvent {
    for (let i = 0; i + 1 < fields.length; i += 2) {
      if (fields[i] === 'data') {
        return JSON.parse(fields[i + 1]) as DomainEvent;
      }
    }
    throw new Error('Stream entry has no "data" field');
  }
}

Implementing with RabbitMQ
import amqp from 'amqplib';

/**
 * Event bus on top of RabbitMQ topic exchanges. `connect` must be called
 * before `publish` or `subscribe`.
 */
class RabbitMQEventBus {
  // Undefined until connect() has completed.
  private channel?: amqp.Channel;

  /** Opens a connection to `url` and creates the channel used afterwards. */
  async connect(url: string): Promise<void> {
    const connection = await amqp.connect(url);
    this.channel = await connection.createChannel();
  }

  /**
   * Publishes `event` to the durable topic exchange `exchange`, using the
   * event type as routing key and marking the message persistent.
   * @throws Error if `connect` has not been called.
   */
  async publish(exchange: string, event: DomainEvent): Promise<void> {
    const channel = this.requireChannel();
    await channel.assertExchange(exchange, 'topic', { durable: true });

    channel.publish(
      exchange,
      event.type,
      Buffer.from(JSON.stringify(event)),
      { persistent: true }
    );
  }

  /**
   * Binds the durable queue `queue` to `exchange` with routing `pattern`
   * and passes every received event to `handler`.
   *
   * A message is acknowledged after the handler resolves. If the handler
   * throws, the message is requeued once; a message that fails again after
   * redelivery, or whose body is not valid JSON, is rejected without
   * requeueing so that it cannot loop forever.
   * @throws Error if `connect` has not been called.
   */
  async subscribe(
    exchange: string,
    queue: string,
    pattern: string,
    handler: (event: DomainEvent) => Promise<void>
  ): Promise<void> {
    const channel = this.requireChannel();
    await channel.assertExchange(exchange, 'topic', { durable: true });
    await channel.assertQueue(queue, { durable: true });
    await channel.bindQueue(queue, exchange, pattern);

    // Awaited so that a failure to register the consumer rejects subscribe().
    await channel.consume(queue, async (msg) => {
      if (!msg) return;

      let event: DomainEvent;
      try {
        event = JSON.parse(msg.content.toString()) as DomainEvent;
      } catch (error) {
        // A body that cannot be parsed will never succeed; do not requeue.
        console.error(`Discarding malformed message from queue ${queue}`, error);
        channel.nack(msg, false, false);
        return;
      }

      try {
        await handler(event);
        channel.ack(msg);
      } catch (error) {
        // Retry once: requeue only if this delivery was the first attempt.
        const retry = !msg.fields.redelivered;
        console.error(`Handler failed for message from queue ${queue}`, error);
        channel.nack(msg, false, retry);
      }
    });
  }

  /** Returns the open channel or fails with a descriptive error. */
  private requireChannel(): amqp.Channel {
    if (!this.channel) {
      throw new Error('RabbitMQEventBus is not connected; call connect() first');
    }
    return this.channel;
  }
}

Saga Pattern
Coordinate distributed transactions:
/**
 * One step of a saga: an action plus the compensating action that undoes
 * it. Both receive the saga's shared context of type `T`.
 */
interface SagaStep<T> {
  /** Performs the step. */
  execute(context: T): Promise<void>;
  /** Undoes the effects of a previously executed step. */
  compensate(context: T): Promise<void>;
}

/**
 * Saga for placing an order: charge the payment, reserve the inventory and
 * create the shipment, undoing the completed steps if a later one fails.
 */
class OrderSaga {
  // Steps run in this order; each stores on the context what its own
  // compensation needs later.
  private readonly steps: SagaStep<OrderContext>[] = [
    {
      async execute(ctx) {
        ctx.paymentId = await paymentService.charge(ctx.amount);
      },
      async compensate(ctx) {
        await paymentService.refund(ctx.paymentId);
      },
    },
    {
      async execute(ctx) {
        ctx.inventoryReservation = await inventoryService.reserve(ctx.items);
      },
      async compensate(ctx) {
        await inventoryService.release(ctx.inventoryReservation);
      },
    },
    {
      async execute(ctx) {
        ctx.shipmentId = await shippingService.createShipment(ctx.orderId);
      },
      async compensate(ctx) {
        await shippingService.cancelShipment(ctx.shipmentId);
      },
    },
  ];

  /**
   * Runs all steps in order. If a step throws, the steps that had already
   * completed are compensated in reverse order and the original error is
   * rethrown. The failing step itself is not compensated. A compensation
   * that throws is logged and does not stop the remaining compensations.
   */
  async execute(context: OrderContext): Promise<void> {
    // Number of steps that finished successfully so far.
    let completed = 0;

    try {
      for (const step of this.steps) {
        await step.execute(context);
        completed += 1;
      }
    } catch (error) {
      // Compensate in reverse order
      for (let stepIndex = completed - 1; stepIndex >= 0; stepIndex--) {
        try {
          await this.steps[stepIndex].compensate(context);
        } catch (compensateError) {
          console.error(`Failed to compensate step ${stepIndex}`, compensateError);
        }
      }
      throw error;
    }
  }
}

Best Practices
- Idempotent handlers: Events may be delivered multiple times
- Event versioning: Plan for schema evolution
- Dead letter queues: Handle poison messages
- Monitoring: Track event processing latency and failures
- Event ordering: Use partitioning for ordered processing
Conclusion
Event-driven architecture enables scalable, loosely coupled systems. Start with simple pub/sub patterns and evolve to event sourcing and CQRS as complexity grows.