Event-driven architecture (EDA) decouples systems through asynchronous events. Instead of direct calls between services, components publish and subscribe to events. This creates flexible, scalable systems that handle change gracefully.
Core Concepts#
Events vs Commands vs Queries#
Event:
- Something that happened (past tense)
- "OrderPlaced", "UserRegistered", "PaymentProcessed"
- Immutable fact
- Multiple consumers possible
Command:
- Request to do something (imperative)
- "PlaceOrder", "RegisterUser", "ProcessPayment"
- Single handler
- May be rejected
Query:
- Request for information
- "GetOrder", "FindUser", "ListPayments"
- Returns data
- No side effects
Event Structure#
1interface Event<T = unknown> {
2 id: string; // Unique event ID
3 type: string; // Event type name
4 timestamp: Date; // When it occurred
5 version: number; // Schema version
6 source: string; // Originating service
7 correlationId: string; // Request chain tracking
8 data: T; // Event payload
9}
10
11// Example
12const orderPlacedEvent: Event<OrderPlacedData> = {
13 id: 'evt_abc123',
14 type: 'order.placed',
15 timestamp: new Date(),
16 version: 1,
17 source: 'order-service',
18 correlationId: 'req_xyz789',
19 data: {
20 orderId: 'ord_123',
21 customerId: 'cust_456',
22 items: [...],
23 total: 99.99,
24 },
25};Event Patterns#
Publish-Subscribe#
1// Publisher
2class OrderService {
3 constructor(private eventBus: EventBus) {}
4
5 async placeOrder(order: Order): Promise<void> {
6 // Save order
7 await this.repository.save(order);
8
9 // Publish event
10 await this.eventBus.publish({
11 type: 'order.placed',
12 data: {
13 orderId: order.id,
14 customerId: order.customerId,
15 items: order.items,
16 total: order.total,
17 },
18 });
19 }
20}
21
22// Subscribers
23class EmailService {
24 @Subscribe('order.placed')
25 async sendConfirmation(event: OrderPlacedEvent): Promise<void> {
26 await this.mailer.send({
27 to: event.data.customerEmail,
28 template: 'order-confirmation',
29 data: event.data,
30 });
31 }
32}
33
34class InventoryService {
35 @Subscribe('order.placed')
36 async reserveStock(event: OrderPlacedEvent): Promise<void> {
37 for (const item of event.data.items) {
38 await this.inventory.reserve(item.productId, item.quantity);
39 }
40 }
41}
42
43class AnalyticsService {
44 @Subscribe('order.placed')
45 async trackOrder(event: OrderPlacedEvent): Promise<void> {
46 await this.analytics.track('purchase', {
47 orderId: event.data.orderId,
48 value: event.data.total,
49 });
50 }
51}Event Sourcing#
1// Store events, not state
2class Account {
3 private events: Event[] = [];
4 private balance: number = 0;
5
6 // Apply events to rebuild state
7 private apply(event: Event): void {
8 switch (event.type) {
9 case 'account.credited':
10 this.balance += event.data.amount;
11 break;
12 case 'account.debited':
13 this.balance -= event.data.amount;
14 break;
15 }
16 }
17
18 // Commands produce events
19 credit(amount: number): void {
20 if (amount <= 0) throw new Error('Invalid amount');
21
22 const event = {
23 type: 'account.credited',
24 data: { amount, timestamp: new Date() },
25 };
26
27 this.events.push(event);
28 this.apply(event);
29 }
30
31 debit(amount: number): void {
32 if (amount > this.balance) throw new Error('Insufficient funds');
33
34 const event = {
35 type: 'account.debited',
36 data: { amount, timestamp: new Date() },
37 };
38
39 this.events.push(event);
40 this.apply(event);
41 }
42
43 // Rebuild from event history
44 static fromEvents(events: Event[]): Account {
45 const account = new Account();
46 events.forEach(e => account.apply(e));
47 account.events = events;
48 return account;
49 }
50}CQRS (Command Query Responsibility Segregation)#
1// Separate write and read models
2class OrderCommandHandler {
3 async handle(command: PlaceOrderCommand): Promise<void> {
4 // Validate
5 const customer = await this.customers.findById(command.customerId);
6 if (!customer) throw new Error('Customer not found');
7
8 // Create aggregate
9 const order = Order.create(command);
10
11 // Persist events
12 await this.eventStore.save(order.id, order.uncommittedEvents);
13
14 // Publish for read model updates
15 for (const event of order.uncommittedEvents) {
16 await this.eventBus.publish(event);
17 }
18 }
19}
20
21class OrderQueryHandler {
22 // Optimized read model
23 async getOrderSummary(orderId: string): Promise<OrderSummary> {
24 return this.readDb.query(`
25 SELECT o.*, c.name as customer_name
26 FROM order_summaries o
27 JOIN customers c ON o.customer_id = c.id
28 WHERE o.id = $1
29 `, [orderId]);
30 }
31
32 async getCustomerOrders(customerId: string): Promise<OrderSummary[]> {
33 return this.readDb.query(`
34 SELECT * FROM order_summaries
35 WHERE customer_id = $1
36 ORDER BY created_at DESC
37 `, [customerId]);
38 }
39}
40
41// Read model projector
42class OrderProjector {
43 @Subscribe('order.placed')
44 async onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
45 await this.readDb.query(`
46 INSERT INTO order_summaries (id, customer_id, total, status, created_at)
47 VALUES ($1, $2, $3, 'placed', $4)
48 `, [event.data.orderId, event.data.customerId, event.data.total, event.timestamp]);
49 }
50
51 @Subscribe('order.shipped')
52 async onOrderShipped(event: OrderShippedEvent): Promise<void> {
53 await this.readDb.query(`
54 UPDATE order_summaries SET status = 'shipped', shipped_at = $2
55 WHERE id = $1
56 `, [event.data.orderId, event.timestamp]);
57 }
58}Message Brokers#
Redis Streams#
1import Redis from 'ioredis';
2
3const redis = new Redis();
4
5// Publish
6async function publishEvent(stream: string, event: Event): Promise<void> {
7 await redis.xadd(stream, '*',
8 'type', event.type,
9 'data', JSON.stringify(event.data),
10 'timestamp', event.timestamp.toISOString(),
11 );
12}
13
14// Consumer group
15async function consumeEvents(
16 stream: string,
17 group: string,
18 consumer: string,
19 handler: (event: Event) => Promise<void>,
20): Promise<void> {
21 // Create group if not exists
22 try {
23 await redis.xgroup('CREATE', stream, group, '0', 'MKSTREAM');
24 } catch (e) {
25 // Group exists
26 }
27
28 while (true) {
29 const results = await redis.xreadgroup(
30 'GROUP', group, consumer,
31 'COUNT', 10,
32 'BLOCK', 5000,
33 'STREAMS', stream, '>',
34 );
35
36 if (!results) continue;
37
38 for (const [, messages] of results) {
39 for (const [id, fields] of messages) {
40 const event = parseEvent(fields);
41
42 try {
43 await handler(event);
44 await redis.xack(stream, group, id);
45 } catch (error) {
46 console.error('Failed to process event:', id, error);
47 }
48 }
49 }
50 }
51}RabbitMQ#
1import amqp from 'amqplib';
2
3class RabbitMQEventBus {
4 private connection: amqp.Connection;
5 private channel: amqp.Channel;
6
7 async connect(): Promise<void> {
8 this.connection = await amqp.connect(process.env.RABBITMQ_URL);
9 this.channel = await this.connection.createChannel();
10 }
11
12 async publish(event: Event): Promise<void> {
13 const exchange = 'events';
14
15 await this.channel.assertExchange(exchange, 'topic', { durable: true });
16
17 this.channel.publish(
18 exchange,
19 event.type,
20 Buffer.from(JSON.stringify(event)),
21 { persistent: true },
22 );
23 }
24
25 async subscribe(
26 pattern: string,
27 handler: (event: Event) => Promise<void>,
28 ): Promise<void> {
29 const exchange = 'events';
30 const queue = `${pattern}-handler`;
31
32 await this.channel.assertExchange(exchange, 'topic', { durable: true });
33 await this.channel.assertQueue(queue, { durable: true });
34 await this.channel.bindQueue(queue, exchange, pattern);
35
36 this.channel.consume(queue, async (msg) => {
37 if (!msg) return;
38
39 try {
40 const event = JSON.parse(msg.content.toString());
41 await handler(event);
42 this.channel.ack(msg);
43 } catch (error) {
44 // Reject and requeue
45 this.channel.nack(msg, false, true);
46 }
47 });
48 }
49}Error Handling#
Dead Letter Queues#
1class EventProcessor {
2 private maxRetries = 3;
3
4 async processWithRetry(
5 event: Event,
6 handler: (e: Event) => Promise<void>,
7 ): Promise<void> {
8 let attempts = 0;
9
10 while (attempts < this.maxRetries) {
11 try {
12 await handler(event);
13 return;
14 } catch (error) {
15 attempts++;
16 console.error(`Attempt ${attempts} failed:`, error);
17
18 if (attempts < this.maxRetries) {
19 await this.delay(Math.pow(2, attempts) * 1000);
20 }
21 }
22 }
23
24 // Move to dead letter queue
25 await this.deadLetterQueue.add({
26 event,
27 error: 'Max retries exceeded',
28 attempts,
29 timestamp: new Date(),
30 });
31 }
32
33 private delay(ms: number): Promise<void> {
34 return new Promise(resolve => setTimeout(resolve, ms));
35 }
36}Idempotency#
1class IdempotentHandler {
2 constructor(private redis: Redis) {}
3
4 async handle(
5 event: Event,
6 handler: (e: Event) => Promise<void>,
7 ): Promise<void> {
8 const key = `processed:${event.id}`;
9
10 // Check if already processed
11 const processed = await this.redis.get(key);
12 if (processed) {
13 console.log(`Event ${event.id} already processed, skipping`);
14 return;
15 }
16
17 // Process
18 await handler(event);
19
20 // Mark as processed (with TTL)
21 await this.redis.setex(key, 86400 * 7, '1'); // 7 days
22 }
23}Testing Event-Driven Systems#
1describe('OrderService', () => {
2 let eventBus: MockEventBus;
3 let service: OrderService;
4
5 beforeEach(() => {
6 eventBus = new MockEventBus();
7 service = new OrderService(eventBus);
8 });
9
10 it('publishes OrderPlaced event', async () => {
11 await service.placeOrder({
12 customerId: 'cust_123',
13 items: [{ productId: 'prod_1', quantity: 2 }],
14 });
15
16 expect(eventBus.publishedEvents).toHaveLength(1);
17 expect(eventBus.publishedEvents[0].type).toBe('order.placed');
18 expect(eventBus.publishedEvents[0].data.customerId).toBe('cust_123');
19 });
20});
21
22describe('EmailService', () => {
23 it('sends confirmation on OrderPlaced', async () => {
24 const mailer = new MockMailer();
25 const service = new EmailService(mailer);
26
27 await service.handleOrderPlaced({
28 type: 'order.placed',
29 data: {
30 orderId: 'ord_123',
31 customerEmail: 'test@example.com',
32 },
33 });
34
35 expect(mailer.sentEmails).toHaveLength(1);
36 expect(mailer.sentEmails[0].to).toBe('test@example.com');
37 });
38});Best Practices#
Event Design#
1// Good: Specific, meaningful events
2'order.placed'
3'order.item_added'
4'order.shipped'
5'order.delivered'
6
7// Bad: Generic, unclear events
8'order.updated' // What changed?
9'data.changed' // Too vagueEvent Versioning#
1// Version in event type or payload
2interface OrderPlacedV1 {
3 orderId: string;
4 items: string[]; // Just IDs
5}
6
7interface OrderPlacedV2 {
8 orderId: string;
9 items: Array<{ // Full item data
10 productId: string;
11 quantity: number;
12 price: number;
13 }>;
14}
15
16// Handle multiple versions
17function handleOrderPlaced(event: Event): void {
18 switch (event.version) {
19 case 1:
20 return handleV1(event.data as OrderPlacedV1);
21 case 2:
22 return handleV2(event.data as OrderPlacedV2);
23 }
24}Conclusion#
Event-driven architecture enables scalable, loosely-coupled systems. Events capture what happened, subscribers react independently, and the system evolves without tight dependencies.
Start simple—basic pub/sub covers most needs. Add event sourcing and CQRS when you need audit trails or complex read models. The key is designing meaningful events that capture business reality.