
Event-Driven Architecture: A Practical Guide

Design scalable, loosely-coupled systems with event-driven architecture. From event sourcing to message brokers to patterns that work.

Bootspring Team
Engineering
April 15, 2025
7 min read

Event-driven architecture (EDA) decouples services through asynchronous events. Instead of calling each other directly, components publish events and subscribe to the ones they care about. Producers do not need to know who consumes their events, so consumers can be added, removed, or scaled independently.

Core Concepts

Events vs Commands vs Queries

Event:
- Something that happened (past tense)
- "OrderPlaced", "UserRegistered", "PaymentProcessed"
- Immutable fact
- Multiple consumers possible

Command:
- Request to do something (imperative)
- "PlaceOrder", "RegisterUser", "ProcessPayment"
- Single handler
- May be rejected

Query:
- Request for information
- "GetOrder", "FindUser", "ListPayments"
- Returns data
- No side effects
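The distinction can be sketched in a few lines of TypeScript. This is only an illustration: the type names, the in-memory `orders` map, and the two subscribers are hypothetical and not part of any particular framework.

```typescript
// Hypothetical names throughout; only the three categories come from the text.
type OrderPlaced = { kind: 'event'; type: 'OrderPlaced'; orderId: string };
type PlaceOrder = { kind: 'command'; type: 'PlaceOrder'; orderId: string; total: number };
type GetOrder = { kind: 'query'; type: 'GetOrder'; orderId: string };

const orders = new Map<string, { total: number }>();
const deliveries: string[] = [];

// Event: any number of subscribers react to a fact that already happened.
const subscribers: Array<(e: OrderPlaced) => void> = [
  (e) => { deliveries.push(`email:${e.orderId}`); },
  (e) => { deliveries.push(`inventory:${e.orderId}`); },
];

// Command: exactly one handler, which may reject the request.
function handlePlaceOrder(cmd: PlaceOrder): OrderPlaced {
  if (cmd.total <= 0) throw new Error('Order total must be positive');
  orders.set(cmd.orderId, { total: cmd.total });
  const placed: OrderPlaced = { kind: 'event', type: 'OrderPlaced', orderId: cmd.orderId };
  subscribers.forEach((notify) => notify(placed));
  return placed;
}

// Query: returns data and changes nothing.
function handleGetOrder(q: GetOrder): { total: number } | undefined {
  return orders.get(q.orderId);
}
```

A rejected command produces no event, which is why subscribers never need to validate the fact they receive.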

Event Structure

```typescript
interface Event<T = unknown> {
  id: string;            // Unique event ID
  type: string;          // Event type name
  timestamp: Date;       // When it occurred
  version: number;       // Schema version
  source: string;        // Originating service
  correlationId: string; // Request chain tracking
  data: T;               // Event payload
}

// Payload shape used by the example below
interface OrderPlacedData {
  orderId: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number }>;
  total: number;
}

// Example
const orderPlacedEvent: Event<OrderPlacedData> = {
  id: 'evt_abc123',
  type: 'order.placed',
  timestamp: new Date(),
  version: 1,
  source: 'order-service',
  correlationId: 'req_xyz789',
  data: {
    orderId: 'ord_123',
    customerId: 'cust_456',
    items: [/* ... */],
    total: 99.99,
  },
};
```
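Filling in the envelope by hand at every call site is error-prone, so a small factory can help. The sketch below is an assumption, not part of the original code: `createEvent` and its `options` parameter are hypothetical, and the interface is named `DomainEvent` here only to avoid clashing with the DOM's global `Event` type.

```typescript
import { randomUUID } from 'node:crypto';

// Same shape as the Event interface above, renamed for this standalone sketch.
interface DomainEvent<T = unknown> {
  id: string;
  type: string;
  timestamp: Date;
  version: number;
  source: string;
  correlationId: string;
  data: T;
}

// Hypothetical helper: callers supply the type and payload, the factory fills in the rest.
function createEvent<T>(
  type: string,
  data: T,
  options: { source: string; version?: number; correlationId?: string },
): DomainEvent<T> {
  return {
    id: `evt_${randomUUID()}`,
    type,
    timestamp: new Date(),
    version: options.version ?? 1,
    source: options.source,
    // Reuse the caller's correlation ID so the whole request chain can be traced.
    correlationId: options.correlationId ?? randomUUID(),
    data,
  };
}
```

Passing the incoming request's `correlationId` through every event it triggers is what makes the chain traceable later.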

Event Patterns

Publish-Subscribe

```typescript
// Publisher
class OrderService {
  constructor(
    private eventBus: EventBus,
    private repository: OrderRepository,
  ) {}

  async placeOrder(order: Order): Promise<void> {
    // Save order
    await this.repository.save(order);

    // Publish event (the bus is assumed to fill in envelope fields such as id and timestamp)
    await this.eventBus.publish({
      type: 'order.placed',
      data: {
        orderId: order.id,
        customerId: order.customerId,
        customerEmail: order.customerEmail, // read by EmailService
        items: order.items,
        total: order.total,
      },
    });
  }
}

// Subscribers
// @Subscribe is assumed to be a decorator provided by the event bus library.
class EmailService {
  constructor(private mailer: Mailer) {}

  @Subscribe('order.placed')
  async sendConfirmation(event: OrderPlacedEvent): Promise<void> {
    await this.mailer.send({
      to: event.data.customerEmail,
      template: 'order-confirmation',
      data: event.data,
    });
  }
}

class InventoryService {
  constructor(private inventory: Inventory) {}

  @Subscribe('order.placed')
  async reserveStock(event: OrderPlacedEvent): Promise<void> {
    for (const item of event.data.items) {
      await this.inventory.reserve(item.productId, item.quantity);
    }
  }
}

class AnalyticsService {
  constructor(private analytics: Analytics) {}

  @Subscribe('order.placed')
  async trackOrder(event: OrderPlacedEvent): Promise<void> {
    await this.analytics.track('purchase', {
      orderId: event.data.orderId,
      value: event.data.total,
    });
  }
}
```
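The `EventBus` used above is never defined in the article. A minimal in-memory version, useful for local development or tests, might look like the sketch below. `InMemoryEventBus`, its `subscribe` method, and the `BusEvent` shape are assumptions for illustration; a production bus would sit on top of a broker.

```typescript
interface BusEvent<T = unknown> {
  type: string;
  data: T;
}

type BusHandler<T> = (e: BusEvent<T>) => Promise<void> | void;

// Hypothetical in-memory bus: no persistence, no delivery guarantees.
class InMemoryEventBus {
  private handlers = new Map<string, BusHandler<any>[]>();

  subscribe<T>(type: string, handler: BusHandler<T>): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  // Delivers to every subscriber in turn; one failing subscriber does not block the others.
  async publish<T>(busEvent: BusEvent<T>): Promise<void> {
    for (const handler of this.handlers.get(busEvent.type) ?? []) {
      try {
        await handler(busEvent);
      } catch (error) {
        console.error(`Subscriber for ${busEvent.type} failed:`, error);
      }
    }
  }
}
```

Isolating subscriber failures reflects the point of the pattern: the publisher should not fail because one consumer is broken.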

Event Sourcing

```typescript
// Store events, not state
type AccountEvent =
  | { type: 'account.credited'; data: { amount: number; timestamp: Date } }
  | { type: 'account.debited'; data: { amount: number; timestamp: Date } };

class Account {
  private events: AccountEvent[] = [];
  private balance = 0;

  // Apply events to rebuild state
  private apply(event: AccountEvent): void {
    switch (event.type) {
      case 'account.credited':
        this.balance += event.data.amount;
        break;
      case 'account.debited':
        this.balance -= event.data.amount;
        break;
    }
  }

  // Record a new event and apply it to the current state
  private record(event: AccountEvent): void {
    this.events.push(event);
    this.apply(event);
  }

  // Commands validate first, then produce events
  credit(amount: number): void {
    if (amount <= 0) throw new Error('Invalid amount');
    this.record({ type: 'account.credited', data: { amount, timestamp: new Date() } });
  }

  debit(amount: number): void {
    if (amount <= 0) throw new Error('Invalid amount');
    if (amount > this.balance) throw new Error('Insufficient funds');
    this.record({ type: 'account.debited', data: { amount, timestamp: new Date() } });
  }

  // Rebuild from event history
  static fromEvents(events: AccountEvent[]): Account {
    const account = new Account();
    events.forEach(e => account.apply(e));
    account.events = [...events]; // copy so the caller's array is not shared
    return account;
  }
}
```
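Because state is derived purely from the event history, replaying a prefix of that history gives the state at any earlier point. The following sketch shows this with a plain fold. The simplified `LedgerEvent` shape, `applyLedgerEvent`, `balanceAfter`, and the sample `ledger` are all hypothetical.

```typescript
// Simplified event shape for this sketch only.
type LedgerEvent =
  | { type: 'account.credited'; amount: number }
  | { type: 'account.debited'; amount: number };

// State is a pure function of (previous state, event).
function applyLedgerEvent(balance: number, e: LedgerEvent): number {
  switch (e.type) {
    case 'account.credited':
      return balance + e.amount;
    case 'account.debited':
      return balance - e.amount;
  }
}

// Replay the first `count` events; by default, replay everything.
function balanceAfter(events: LedgerEvent[], count: number = events.length): number {
  return events.slice(0, count).reduce(applyLedgerEvent, 0);
}

const ledger: LedgerEvent[] = [
  { type: 'account.credited', amount: 100 },
  { type: 'account.debited', amount: 30 },
  { type: 'account.credited', amount: 5 },
];

console.log(balanceAfter(ledger));    // 75
console.log(balanceAfter(ledger, 2)); // 70
```

This is the property that makes audit trails and "what did the account look like last Tuesday" questions cheap to answer.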

CQRS (Command Query Responsibility Segregation)

```typescript
// Separate write and read models
// Collaborator types (CustomerRepository, EventStore, EventBus, Database) are illustrative.
class OrderCommandHandler {
  constructor(
    private customers: CustomerRepository,
    private eventStore: EventStore,
    private eventBus: EventBus,
  ) {}

  async handle(command: PlaceOrderCommand): Promise<void> {
    // Validate
    const customer = await this.customers.findById(command.customerId);
    if (!customer) throw new Error('Customer not found');

    // Create aggregate
    const order = Order.create(command);

    // Persist events
    await this.eventStore.save(order.id, order.uncommittedEvents);

    // Publish for read model updates
    for (const event of order.uncommittedEvents) {
      await this.eventBus.publish(event);
    }
  }
}

class OrderQueryHandler {
  constructor(private readDb: Database) {}

  // Optimized read model
  async getOrderSummary(orderId: string): Promise<OrderSummary> {
    return this.readDb.query(`
      SELECT o.*, c.name as customer_name
      FROM order_summaries o
      JOIN customers c ON o.customer_id = c.id
      WHERE o.id = $1
    `, [orderId]);
  }

  async getCustomerOrders(customerId: string): Promise<OrderSummary[]> {
    return this.readDb.query(`
      SELECT * FROM order_summaries
      WHERE customer_id = $1
      ORDER BY created_at DESC
    `, [customerId]);
  }
}

// Read model projector
class OrderProjector {
  constructor(private readDb: Database) {}

  @Subscribe('order.placed')
  async onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    // ON CONFLICT (PostgreSQL syntax, assuming id is the primary key) keeps the insert safe on redelivery
    await this.readDb.query(`
      INSERT INTO order_summaries (id, customer_id, total, status, created_at)
      VALUES ($1, $2, $3, 'placed', $4)
      ON CONFLICT (id) DO NOTHING
    `, [event.data.orderId, event.data.customerId, event.data.total, event.timestamp]);
  }

  @Subscribe('order.shipped')
  async onOrderShipped(event: OrderShippedEvent): Promise<void> {
    await this.readDb.query(`
      UPDATE order_summaries SET status = 'shipped', shipped_at = $2
      WHERE id = $1
    `, [event.data.orderId, event.timestamp]);
  }
}
```
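The projector idea can be tried without a database. The sketch below keeps the read model in a `Map` and is only an illustration: `OrderEvent`, `OrderSummaryRow`, `project`, and `listCustomerOrders` are hypothetical names, and a real read model would live in its own store.

```typescript
type OrderEvent =
  | { type: 'order.placed'; orderId: string; customerId: string; total: number }
  | { type: 'order.shipped'; orderId: string };

interface OrderSummaryRow {
  orderId: string;
  customerId: string;
  total: number;
  status: 'placed' | 'shipped';
}

// Read model: a denormalized view shaped for the queries we need.
const summaries = new Map<string, OrderSummaryRow>();

function project(e: OrderEvent): void {
  switch (e.type) {
    case 'order.placed':
      // Ignore duplicates so a redelivered event cannot reset the status.
      if (!summaries.has(e.orderId)) {
        summaries.set(e.orderId, {
          orderId: e.orderId,
          customerId: e.customerId,
          total: e.total,
          status: 'placed',
        });
      }
      break;
    case 'order.shipped': {
      const current = summaries.get(e.orderId);
      if (current) summaries.set(e.orderId, { ...current, status: 'shipped' });
      break;
    }
  }
}

// Query side: reads the projection only, never the write model.
function listCustomerOrders(customerId: string): OrderSummaryRow[] {
  return Array.from(summaries.values()).filter((s) => s.customerId === customerId);
}
```

The read model lags the write model by however long event delivery takes, so callers should expect eventual consistency.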

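Message Brokers

Redis Streams

```typescript
import Redis from 'ioredis';

const redis = new Redis();

// Publish: store the whole envelope so consumers can rebuild the event
async function publishEvent(stream: string, event: Event): Promise<void> {
  await redis.xadd(stream, '*',
    'id', event.id,
    'type', event.type,
    'version', String(event.version),
    'source', event.source,
    'correlationId', event.correlationId,
    'data', JSON.stringify(event.data),
    'timestamp', event.timestamp.toISOString(),
  );
}

// Rebuild an event from the flat [field, value, field, value, ...] array
function parseEvent(fields: string[]): Event {
  const record: Record<string, string> = {};
  for (let i = 0; i < fields.length; i += 2) {
    record[fields[i]] = fields[i + 1];
  }
  return {
    id: record.id,
    type: record.type,
    timestamp: new Date(record.timestamp),
    version: Number(record.version),
    source: record.source,
    correlationId: record.correlationId,
    data: JSON.parse(record.data),
  };
}

type StreamReply = [stream: string, messages: [id: string, fields: string[]][]][];

// Consumer group
async function consumeEvents(
  stream: string,
  group: string,
  consumer: string,
  handler: (event: Event) => Promise<void>,
): Promise<void> {
  // Create group if not exists
  try {
    await redis.xgroup('CREATE', stream, group, '0', 'MKSTREAM');
  } catch (e) {
    // BUSYGROUP means the group already exists; anything else is a real error
    if (!(e instanceof Error && e.message.includes('BUSYGROUP'))) throw e;
  }

  while (true) {
    const results = (await redis.xreadgroup(
      'GROUP', group, consumer,
      'COUNT', 10,
      'BLOCK', 5000,
      'STREAMS', stream, '>',
    )) as StreamReply | null;

    if (!results) continue;

    for (const [, messages] of results) {
      for (const [id, fields] of messages) {
        try {
          await handler(parseEvent(fields));
          await redis.xack(stream, group, id);
        } catch (error) {
          // Not acked: the message stays in the group's pending list. Reading with '>'
          // will not return it again, so reclaim it separately (for example with XAUTOCLAIM).
          console.error('Failed to process event:', id, error);
        }
      }
    }
  }
}
```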

RabbitMQ

```typescript
import amqp from 'amqplib';

class RabbitMQEventBus {
  private connection!: Awaited<ReturnType<typeof amqp.connect>>;
  private channel!: amqp.Channel;

  async connect(): Promise<void> {
    const url = process.env.RABBITMQ_URL;
    if (!url) throw new Error('RABBITMQ_URL is not set');

    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }

  async publish(event: Event): Promise<void> {
    const exchange = 'events';

    await this.channel.assertExchange(exchange, 'topic', { durable: true });

    // The event type doubles as the routing key
    this.channel.publish(
      exchange,
      event.type,
      Buffer.from(JSON.stringify(event)),
      { persistent: true },
    );
  }

  async subscribe(
    pattern: string,
    handler: (event: Event) => Promise<void>,
  ): Promise<void> {
    const exchange = 'events';
    const queue = `${pattern}-handler`;

    await this.channel.assertExchange(exchange, 'topic', { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, pattern);

    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const event = JSON.parse(msg.content.toString());
        await handler(event);
        this.channel.ack(msg);
      } catch (error) {
        // Requeue once. A message that already failed after redelivery is rejected
        // without requeue, so it is dropped (or dead-lettered if the queue has a
        // dead-letter exchange) instead of looping forever.
        this.channel.nack(msg, false, !msg.fields.redelivered);
      }
    });
  }
}
```
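The `pattern` passed to `bindQueue` on a topic exchange is matched against the routing key word by word, where `*` stands for exactly one word and `#` for zero or more. The broker does this matching itself; the function below is only a hypothetical re-implementation (`topicMatches` is not an amqplib API) to make the rules concrete.

```typescript
// Returns true when a routing key matches a topic binding pattern.
// Words are separated by dots; '*' matches exactly one word, '#' matches zero or more.
function topicMatches(pattern: string, routingKey: string): boolean {
  const match = (p: string[], k: string[]): boolean => {
    if (p.length === 0) return k.length === 0;
    const [head, ...rest] = p;
    if (head === '#') {
      // '#' either absorbs nothing, or absorbs one word and tries again.
      return match(rest, k) || (k.length > 0 && match(p, k.slice(1)));
    }
    if (k.length === 0) return false;
    return (head === '*' || head === k[0]) && match(rest, k.slice(1));
  };
  return match(pattern.split('.'), routingKey.split('.'));
}

console.log(topicMatches('order.*', 'order.placed'));     // true
console.log(topicMatches('order.*', 'order.item.added')); // false
console.log(topicMatches('order.#', 'order.item.added')); // true
```

With the event type as routing key, a subscriber bound to `order.*` receives `order.placed` and `order.shipped` but not nested types such as `order.item.added`.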

Error Handling

Dead Letter Queues

```typescript
interface DeadLetterQueue {
  add(entry: { event: Event; error: string; attempts: number; timestamp: Date }): Promise<void>;
}

class EventProcessor {
  private maxRetries = 3;

  constructor(private deadLetterQueue: DeadLetterQueue) {}

  async processWithRetry(
    event: Event,
    handler: (e: Event) => Promise<void>,
  ): Promise<void> {
    let attempts = 0;
    let lastError: unknown;

    while (attempts < this.maxRetries) {
      try {
        await handler(event);
        return;
      } catch (error) {
        attempts++;
        lastError = error;
        console.error(`Attempt ${attempts} failed:`, error);

        // Exponential backoff between attempts: 2s, then 4s
        if (attempts < this.maxRetries) {
          await this.delay(Math.pow(2, attempts) * 1000);
        }
      }
    }

    // Move to dead letter queue, keeping the last failure for diagnosis
    const reason = lastError instanceof Error ? lastError.message : String(lastError);
    await this.deadLetterQueue.add({
      event,
      error: `Max retries exceeded: ${reason}`,
      attempts,
      timestamp: new Date(),
    });
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
```
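The retry loop above waits `Math.pow(2, attempts) * 1000` milliseconds between attempts. If the delay calculation is pulled into its own function it becomes easy to test, cap, and randomize. The sketch below is one possible shape: `backoffDelay`, the 30 second cap, and the optional jitter source are assumptions, not part of the original code.

```typescript
// Delay in milliseconds before the next try, after `attempt` failures (1-based).
// Mirrors Math.pow(2, attempts) * 1000, with an upper bound added.
// `random` returns a factor in [0, 1]; the default of 1 disables jitter.
function backoffDelay(
  attempt: number,
  capMs: number = 30000,
  random: () => number = () => 1,
): number {
  const exponential = Math.min(capMs, Math.pow(2, attempt) * 1000);
  return Math.floor(exponential * random());
}

console.log(backoffDelay(1));  // 2000
console.log(backoffDelay(2));  // 4000
console.log(backoffDelay(10)); // 30000 (capped)
```

Passing `Math.random` as the third argument spreads retries out, which can help when many consumers fail at the same moment and would otherwise all retry together.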

Idempotency

```typescript
import Redis from 'ioredis';

class IdempotentHandler {
  constructor(private redis: Redis) {}

  async handle(
    event: Event,
    handler: (e: Event) => Promise<void>,
  ): Promise<void> {
    const key = `processed:${event.id}`;

    // Claim the event atomically: SET with NX succeeds only for the first caller,
    // so two consumers cannot both pass the check. The key expires after 7 days.
    const claimed = await this.redis.set(key, '1', 'EX', 86400 * 7, 'NX');
    if (claimed !== 'OK') {
      console.log(`Event ${event.id} already processed, skipping`);
      return;
    }

    try {
      // Process
      await handler(event);
    } catch (error) {
      // Release the claim so a retry can process the event.
      // If the process crashes before this point, the key stays set until the TTL expires.
      await this.redis.del(key);
      throw error;
    }
  }
}
```
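For unit tests it can be convenient to exercise the same deduplication logic without Redis. The sketch below is a hypothetical in-memory stand-in (`InMemoryIdempotentHandler` is not from the article) that keeps processed IDs in a `Set` and reports whether the handler actually ran.

```typescript
// Hypothetical in-memory stand-in for the Redis-backed handler; single process only.
class InMemoryIdempotentHandler {
  private processed = new Set<string>();

  // Resolves to true if the handler ran, false if the event was a duplicate.
  async handle<E extends { id: string }>(
    incoming: E,
    handler: (e: E) => Promise<void>,
  ): Promise<boolean> {
    if (this.processed.has(incoming.id)) return false;

    // Claim before processing so a concurrent duplicate is skipped.
    this.processed.add(incoming.id);
    try {
      await handler(incoming);
      return true;
    } catch (error) {
      // Release the claim so a later retry can run.
      this.processed.delete(incoming.id);
      throw error;
    }
  }
}
```

Brokers with at-least-once delivery will redeliver after a missed acknowledgement, so the duplicate path is worth covering in tests rather than treating as an edge case.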

Testing Event-Driven Systems

```typescript
describe('OrderService', () => {
  let eventBus: MockEventBus;
  let service: OrderService;

  beforeEach(() => {
    eventBus = new MockEventBus();
    // InMemoryOrderRepository is a hypothetical stub for the repository the service saves to
    service = new OrderService(eventBus, new InMemoryOrderRepository());
  });

  it('publishes OrderPlaced event', async () => {
    await service.placeOrder({
      customerId: 'cust_123',
      items: [{ productId: 'prod_1', quantity: 2 }],
    });

    expect(eventBus.publishedEvents).toHaveLength(1);
    expect(eventBus.publishedEvents[0].type).toBe('order.placed');
    expect(eventBus.publishedEvents[0].data.customerId).toBe('cust_123');
  });
});

describe('EmailService', () => {
  it('sends confirmation on OrderPlaced', async () => {
    const mailer = new MockMailer();
    const service = new EmailService(mailer);

    // Call the subscriber method directly; no bus is needed to test a handler
    await service.sendConfirmation({
      type: 'order.placed',
      data: {
        orderId: 'ord_123',
        customerEmail: 'test@example.com',
      },
    });

    expect(mailer.sentEmails).toHaveLength(1);
    expect(mailer.sentEmails[0].to).toBe('test@example.com');
  });
});
```
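The tests above rely on `MockEventBus` and `MockMailer` without defining them. One plausible shape, inferred only from how the tests use them (`publishedEvents`, `sentEmails`), is sketched below; the `PublishedEvent` and `SentEmail` interfaces are assumptions.

```typescript
interface PublishedEvent {
  type: string;
  data: Record<string, unknown>;
}

// Records every published event instead of delivering it.
class MockEventBus {
  publishedEvents: PublishedEvent[] = [];

  async publish(published: PublishedEvent): Promise<void> {
    this.publishedEvents.push(published);
  }
}

interface SentEmail {
  to: string;
  template: string;
  data: unknown;
}

// Records every email instead of sending it.
class MockMailer {
  sentEmails: SentEmail[] = [];

  async send(email: SentEmail): Promise<void> {
    this.sentEmails.push(email);
  }
}
```

Because publishers and subscribers only share the event shape, each side can be tested in isolation with doubles this small.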

Best Practices

Event Design

```typescript
// Good: Specific, meaningful events
'order.placed'
'order.item_added'
'order.shipped'
'order.delivered'

// Bad: Generic, unclear events
'order.updated'  // What changed?
'data.changed'   // Too vague
```

Event Versioning

```typescript
// Version in event type or payload
interface OrderPlacedV1 {
  orderId: string;
  items: string[];  // Just IDs
}

interface OrderPlacedV2 {
  orderId: string;
  items: Array<{    // Full item data
    productId: string;
    quantity: number;
    price: number;
  }>;
}

// Handle multiple versions
function handleOrderPlaced(event: Event): void {
  switch (event.version) {
    case 1:
      return handleV1(event.data as OrderPlacedV1);
    case 2:
      return handleV2(event.data as OrderPlacedV2);
    default:
      // Fail loudly instead of silently dropping an unknown version
      throw new Error(`Unsupported order.placed version: ${event.version}`);
  }
}
```
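An alternative to branching in every handler is to upgrade old payloads to the newest shape before they reach the handler, often called upcasting. The sketch below is hypothetical: `upcastOrderPlaced` and the `priceOf` lookup are invented for illustration, and since V1 carries no quantity or price, the defaults used here are assumptions that a real system would have to decide for itself.

```typescript
interface OrderPlacedV1 {
  orderId: string;
  items: string[];
}

interface OrderPlacedV2 {
  orderId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
}

// Hypothetical upcaster: quantity defaults to 1 (assumption) and the price
// comes from a caller-supplied lookup, because V1 does not record either.
function upcastOrderPlaced(
  v1: OrderPlacedV1,
  priceOf: (productId: string) => number,
): OrderPlacedV2 {
  return {
    orderId: v1.orderId,
    items: v1.items.map((productId) => ({
      productId,
      quantity: 1,
      price: priceOf(productId),
    })),
  };
}
```

With an upcaster in front, handlers only need to understand the latest version, at the cost of deciding once how missing fields are filled.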

Conclusion

Event-driven architecture enables scalable, loosely-coupled systems. Events capture what happened, subscribers react independently, and the system evolves without tight dependencies.

Start simple: basic pub/sub covers most needs. Add event sourcing and CQRS when you need audit trails or complex read models. The key is designing meaningful events that capture business reality.
