Event-Driven Architecture for Node.js Applications

Build decoupled systems with events. From EventEmitter to message queues to event sourcing patterns.

Bootspring Team
Engineering
August 5, 2023
6 min read

Event-driven architecture decouples components by communicating through events. Publishers emit events without knowing who listens. Subscribers react without knowing who published.

Node.js EventEmitter

```typescript
import { EventEmitter } from 'events';

interface UserEvents {
  'user:created': (user: User) => void;
  'user:updated': (user: User, changes: Partial<User>) => void;
  'user:deleted': (userId: string) => void;
}

class TypedEventEmitter extends EventEmitter {
  emit<K extends keyof UserEvents>(
    event: K,
    ...args: Parameters<UserEvents[K]>
  ): boolean {
    return super.emit(event, ...args);
  }

  on<K extends keyof UserEvents>(event: K, listener: UserEvents[K]): this {
    return super.on(event, listener);
  }
}

const userEvents = new TypedEventEmitter();

// Subscribe
userEvents.on('user:created', (user) => {
  console.log(`User created: ${user.id}`);
});

userEvents.on('user:created', async (user) => {
  await sendWelcomeEmail(user.email);
});

// Publish
userEvents.emit('user:created', { id: '1', email: 'test@example.com', name: 'Test' });
```
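One detail worth keeping in mind when using the emitter above: `emit` is synchronous. As a minimal sketch (the `log` array and the plain, untyped emitter are illustrative assumptions, not part of the original example), listeners run in registration order before `emit` returns, and the return value only reports whether any listener was registered:

```typescript
import { EventEmitter } from 'events';

const emitter = new EventEmitter();
const log: string[] = [];

// Two listeners for the same event, registered in this order
emitter.on('user:created', (id: string) => {
  log.push(`first:${id}`);
});
emitter.on('user:created', (id: string) => {
  log.push(`second:${id}`);
});

log.push('before emit');
// emit() calls every listener synchronously, then returns true
const hadListeners = emitter.emit('user:created', '1');
log.push('after emit');

// No listener is registered for this event, so emit() returns false
const unheard = emitter.emit('user:deleted', '1');

console.log(log, hadListeners, unheard);
```

A consequence is that an `async` listener, like the welcome-email one above, is started but not awaited by `emit`, so its errors do not reach the caller. Handle errors inside such listeners, or consider constructing the emitter with Node's `captureRejections: true` option, which by default reports rejected listener promises through the emitter's `'error'` event.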

Application Event Bus

```typescript
type EventHandler<T = any> = (payload: T) => void | Promise<void>;

class EventBus {
  private handlers = new Map<string, EventHandler[]>();

  subscribe<T>(event: string, handler: EventHandler<T>): () => void {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event)!.push(handler);

    // Return unsubscribe function
    return () => {
      const handlers = this.handlers.get(event);
      if (handlers) {
        const index = handlers.indexOf(handler);
        if (index > -1) handlers.splice(index, 1);
      }
    };
  }

  async publish<T>(event: string, payload: T): Promise<void> {
    const handlers = this.handlers.get(event) || [];

    await Promise.all(
      handlers.map(async (handler) => {
        try {
          await handler(payload);
        } catch (error) {
          console.error(`Error in event handler for ${event}:`, error);
        }
      })
    );
  }
}

// Singleton
export const eventBus = new EventBus();

// Usage
eventBus.subscribe('order:placed', async (order: Order) => {
  await updateInventory(order.items);
});

eventBus.subscribe('order:placed', async (order: Order) => {
  await sendOrderConfirmation(order);
});

await eventBus.publish('order:placed', order);
```
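The bus above accepts any string as an event name, so a typo or a mismatched payload only shows up at runtime. One possible refinement, sketched here under the assumption of a hypothetical `AppEvents` map (the names `AppEvents`, `TypedEventBus`, and the payload shapes are illustrative), is to tie each event name to its payload type:

```typescript
// Hypothetical event map: event name -> payload type
interface AppEvents {
  'order:placed': { orderId: string; total: number };
  'order:cancelled': { orderId: string; reason: string };
}

type Handler<T> = (payload: T) => void | Promise<void>;

class TypedEventBus<Events> {
  private handlers = new Map<keyof Events, Handler<unknown>[]>();

  subscribe<K extends keyof Events>(event: K, handler: Handler<Events[K]>): () => void {
    const list = this.handlers.get(event) ?? [];
    this.handlers.set(event, list);
    // Stored untyped internally; the public signature keeps callers type-safe
    const stored = handler as Handler<unknown>;
    list.push(stored);

    // Return unsubscribe function
    return () => {
      const index = list.indexOf(stored);
      if (index > -1) list.splice(index, 1);
    };
  }

  async publish<K extends keyof Events>(event: K, payload: Events[K]): Promise<void> {
    // Copy the list so handlers that unsubscribe during publish do not skip others
    const list = (this.handlers.get(event) ?? []).slice();
    await Promise.all(
      list.map(async (handler) => {
        try {
          await handler(payload);
        } catch (error) {
          console.error(`Error in event handler for ${String(event)}:`, error);
        }
      })
    );
  }
}

const bus = new TypedEventBus<AppEvents>();
const unsubscribe = bus.subscribe('order:placed', (order) => {
  console.log(`Order ${order.orderId} placed, total ${order.total}`);
});
// bus.publish('order:placed', { orderId: 'o-1' }); // compile error: 'total' is missing
```

The trade-off is that every event must be declared in one place, which is usually acceptable for an in-process bus.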

Domain Events

```typescript
interface DomainEvent {
  type: string;
  payload: any;
  occurredAt: Date;
  aggregateId: string;
}

// Event definitions
const OrderEvents = {
  PLACED: 'order.placed',
  CONFIRMED: 'order.confirmed',
  SHIPPED: 'order.shipped',
  DELIVERED: 'order.delivered',
  CANCELLED: 'order.cancelled',
} as const;

interface OrderPlacedEvent extends DomainEvent {
  type: typeof OrderEvents.PLACED;
  payload: {
    orderId: string;
    customerId: string;
    items: OrderItem[];
    total: number;
  };
}

// Event creator
function createEvent<T extends DomainEvent>(
  type: T['type'],
  aggregateId: string,
  payload: T['payload']
): T {
  return {
    type,
    aggregateId,
    payload,
    occurredAt: new Date(),
  } as T;
}

// Order aggregate with events
class Order {
  private events: DomainEvent[] = [];

  constructor(
    public readonly id: string,
    public customerId: string,
    public items: OrderItem[],
    public status: string = 'pending'
  ) {}

  place(): void {
    this.status = 'placed';
    this.events.push(
      createEvent<OrderPlacedEvent>(OrderEvents.PLACED, this.id, {
        orderId: this.id,
        customerId: this.customerId,
        items: this.items,
        total: this.calculateTotal(),
      })
    );
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.events];
  }

  clearEvents(): void {
    this.events = [];
  }

  private calculateTotal(): number {
    return this.items.reduce((sum, item) => sum + item.price * item.quantity, 0);
  }
}
```
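Because every domain event above carries a literal `type`, a union of event interfaces can be narrowed with a `switch`, and the compiler can flag event types a handler forgot to cover. A small sketch with hypothetical, simplified event shapes (`OrderPlaced`, `OrderCancelled`, and `describeEvent` are illustrative names, not part of the original code):

```typescript
interface OrderPlaced {
  type: 'order.placed';
  aggregateId: string;
  payload: { total: number };
}

interface OrderCancelled {
  type: 'order.cancelled';
  aggregateId: string;
  payload: { reason: string };
}

// Discriminated union: the `type` field tells the compiler which payload applies
type OrderEvent = OrderPlaced | OrderCancelled;

function describeEvent(event: OrderEvent): string {
  switch (event.type) {
    case 'order.placed':
      // Narrowed to OrderPlaced: payload.total is available here
      return `Order ${event.aggregateId} placed, total ${event.payload.total}`;
    case 'order.cancelled':
      // Narrowed to OrderCancelled: payload.reason is available here
      return `Order ${event.aggregateId} cancelled: ${event.payload.reason}`;
    default: {
      // Adding a new member to OrderEvent without a case makes this line fail to compile
      const unhandled: never = event;
      throw new Error(`Unhandled event: ${JSON.stringify(unhandled)}`);
    }
  }
}

console.log(
  describeEvent({ type: 'order.placed', aggregateId: 'o-1', payload: { total: 42 } })
);
```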

Message Queue Integration

```typescript
import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';

// BullMQ workers require maxRetriesPerRequest: null on the ioredis connection.
// The localhost fallback is an assumption for local development.
const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379', {
  maxRetriesPerRequest: null,
});

// Event queue
const eventQueue = new Queue('events', { connection: redis });

// Publish to queue
async function publishEvent(event: DomainEvent): Promise<void> {
  await eventQueue.add(event.type, event, {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
  });
}

// Event handlers registry
type QueueHandler = (event: DomainEvent) => Promise<void>;
const eventHandlers = new Map<string, QueueHandler[]>();

// Generic so a handler can declare the specific event type it expects
function registerHandler<T extends DomainEvent>(
  eventType: T['type'],
  handler: (event: T) => Promise<void>
): void {
  if (!eventHandlers.has(eventType)) {
    eventHandlers.set(eventType, []);
  }
  eventHandlers.get(eventType)!.push(handler as QueueHandler);
}

// Worker to process events
const eventWorker = new Worker(
  'events',
  async (job) => {
    // Job data is serialized as JSON, so occurredAt arrives as a string, not a Date
    const event = job.data as DomainEvent;
    const handlers = eventHandlers.get(event.type) || [];

    // If any handler throws, the whole job is retried and earlier handlers run again
    for (const handler of handlers) {
      await handler(event);
    }
  },
  { connection: redis }
);

// Register handlers
registerHandler(OrderEvents.PLACED, async (event: OrderPlacedEvent) => {
  await inventoryService.reserve(event.payload.items);
});

registerHandler(OrderEvents.PLACED, async (event: OrderPlacedEvent) => {
  await notificationService.sendOrderConfirmation(event.payload.customerId);
});
```
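With `attempts: 3` and handlers run one after another inside a single job, a failure in a later handler causes the earlier ones to run again on retry. One way to make that safe is to wrap each handler so it skips events it has already processed. The sketch below is a minimal illustration: it assumes each event has a unique `id` (the `DomainEvent` interface above does not define one), and it uses an in-memory `Set` where a real system would need a durable store.

```typescript
interface QueuedEvent {
  id: string; // assumed unique per event; not part of DomainEvent above
  type: string;
  payload: unknown;
}

type IdempotentHandler = (event: QueuedEvent) => Promise<void>;

// In-memory stand-in for a durable store (for example a database table with a unique key)
const processed = new Set<string>();

// Wraps a handler so repeated deliveries of the same event are ignored
function idempotent(handlerName: string, handler: IdempotentHandler): IdempotentHandler {
  return async (event) => {
    const key = `${handlerName}:${event.id}`;
    if (processed.has(key)) return; // already handled on an earlier attempt

    await handler(event);

    // Record only after success so a failed attempt is retried
    processed.add(key);
  };
}

// Hypothetical handler: the wrapped version can be registered in place of the original
const reserveInventory = idempotent('reserve-inventory', async (event) => {
  console.log(`Reserving inventory for event ${event.id}`);
});
```

This check-then-record sequence is not atomic, so two concurrent deliveries of the same event could both pass the check; a durable implementation would typically rely on a unique constraint or an atomic set-if-absent operation instead.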

Event Sourcing

```typescript
interface Event {
  id: string;
  type: string;
  aggregateId: string;
  payload: any;
  version: number;
  timestamp: Date;
}

class ConcurrencyError extends Error {}

class EventStore {
  constructor(private db: Database) {}

  async save(aggregateId: string, events: Event[], expectedVersion: number): Promise<void> {
    // Optimistic concurrency check. This read happens outside the transaction,
    // so a unique constraint on (aggregateId, version) is still needed to
    // reject two writers that pass the check at the same time.
    const currentVersion = await this.getVersion(aggregateId);

    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but found ${currentVersion}`
      );
    }

    await this.db.transaction(async (tx) => {
      for (let i = 0; i < events.length; i++) {
        await tx.events.create({
          data: {
            ...events[i],
            version: expectedVersion + i + 1,
          },
        });
      }
    });
  }

  async getEvents(aggregateId: string, fromVersion = 0): Promise<Event[]> {
    return this.db.events.findMany({
      where: {
        aggregateId,
        version: { gt: fromVersion },
      },
      orderBy: { version: 'asc' },
    });
  }

  async getVersion(aggregateId: string): Promise<number> {
    const lastEvent = await this.db.events.findFirst({
      where: { aggregateId },
      orderBy: { version: 'desc' },
    });
    return lastEvent?.version ?? 0;
  }
}

// Aggregate rebuilding
class OrderAggregate {
  private state: OrderState = { status: 'draft', items: [] };
  private version = 0;

  apply(event: Event): void {
    switch (event.type) {
      case 'OrderCreated':
        this.state = { ...this.state, id: event.payload.id, customerId: event.payload.customerId };
        break;
      case 'ItemAdded':
        this.state.items.push(event.payload.item);
        break;
      case 'OrderPlaced':
        this.state.status = 'placed';
        break;
    }
    this.version = event.version;
  }

  static async load(eventStore: EventStore, id: string): Promise<OrderAggregate> {
    const events = await eventStore.getEvents(id);
    const aggregate = new OrderAggregate();

    for (const event of events) {
      aggregate.apply(event);
    }

    return aggregate;
  }
}
```
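Rebuilding an aggregate amounts to a left fold over its event history. A pure-reducer variant of `apply`, one that returns new state instead of mutating, makes that explicit and also lets you replay a prefix of the history to see the state at an earlier version. The following is a sketch only, with simplified, hypothetical event shapes and in-memory data (`StoredEvent`, `applyEvent`, and `replay` are illustrative names):

```typescript
type StoredEvent =
  | { type: 'OrderCreated'; version: number; payload: { customerId: string } }
  | { type: 'ItemAdded'; version: number; payload: { item: string } }
  | { type: 'OrderPlaced'; version: number };

interface ReplayedOrder {
  status: 'draft' | 'placed';
  customerId?: string;
  items: string[];
  version: number;
}

const initialState: ReplayedOrder = { status: 'draft', items: [], version: 0 };

// Pure reducer: takes the previous state and one event, returns the next state
function applyEvent(state: ReplayedOrder, event: StoredEvent): ReplayedOrder {
  switch (event.type) {
    case 'OrderCreated':
      return { ...state, customerId: event.payload.customerId, version: event.version };
    case 'ItemAdded':
      return { ...state, items: [...state.items, event.payload.item], version: event.version };
    case 'OrderPlaced':
      return { ...state, status: 'placed', version: event.version };
  }
}

// Replaying is a fold of applyEvent over the events in version order
function replay(events: StoredEvent[]): ReplayedOrder {
  return events.reduce(applyEvent, initialState);
}

const orderHistory: StoredEvent[] = [
  { type: 'OrderCreated', version: 1, payload: { customerId: 'c-1' } },
  { type: 'ItemAdded', version: 2, payload: { item: 'book' } },
  { type: 'OrderPlaced', version: 3 },
];

const currentState = replay(orderHistory);
const stateAtVersion2 = replay(orderHistory.slice(0, 2));

console.log(currentState.status, stateAtVersion2.status);
```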

CQRS Pattern

```typescript
import { randomUUID } from 'crypto';

// Commands
interface Command {
  type: string;
}

interface PlaceOrderCommand extends Command {
  type: 'PlaceOrder';
  customerId: string;
  items: OrderItem[];
}

// Command handler
class OrderCommandHandler {
  constructor(
    private eventStore: EventStore,
    private eventBus: EventBus
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    const orderId = randomUUID();

    const events: Event[] = [
      {
        id: randomUUID(),
        type: 'OrderPlaced',
        aggregateId: orderId,
        payload: {
          customerId: command.customerId,
          items: command.items,
        },
        version: 1,
        timestamp: new Date(),
      },
    ];

    await this.eventStore.save(orderId, events, 0);

    // Publish for read model updates. Saving and publishing are separate steps,
    // so a crash between them leaves read models behind the event store.
    for (const event of events) {
      await this.eventBus.publish(event.type, event);
    }

    return orderId;
  }
}

// Read model (projection)
class OrderReadModel {
  constructor(private db: Database) {}

  async handle(event: Event): Promise<void> {
    switch (event.type) {
      case 'OrderPlaced':
        await this.db.orderViews.create({
          data: {
            id: event.aggregateId,
            customerId: event.payload.customerId,
            itemCount: event.payload.items.length,
            status: 'placed',
            createdAt: event.timestamp,
          },
        });
        break;

      case 'OrderShipped':
        await this.db.orderViews.update({
          where: { id: event.aggregateId },
          data: { status: 'shipped', shippedAt: event.timestamp },
        });
        break;
    }
  }
}

// Query
async function getCustomerOrders(customerId: string): Promise<OrderView[]> {
  return db.orderViews.findMany({
    where: { customerId },
    orderBy: { createdAt: 'desc' },
  });
}
```
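Since the read model is derived entirely from events, it can be discarded and rebuilt by replaying the event log, which is also how a projection can catch up after missing events. The sketch below illustrates the idea with an in-memory `Map` standing in for `db.orderViews`; the event shapes and the names `LoggedEvent`, `InMemoryOrderViews`, and `rebuild` are hypothetical simplifications.

```typescript
type LoggedEvent =
  | { type: 'OrderPlaced'; aggregateId: string; customerId: string; itemCount: number }
  | { type: 'OrderShipped'; aggregateId: string };

interface OrderSummary {
  id: string;
  customerId: string;
  itemCount: number;
  status: 'placed' | 'shipped';
}

class InMemoryOrderViews {
  private views = new Map<string, OrderSummary>();

  // Projection: translate each event into a change to the query-side view
  handle(event: LoggedEvent): void {
    switch (event.type) {
      case 'OrderPlaced':
        this.views.set(event.aggregateId, {
          id: event.aggregateId,
          customerId: event.customerId,
          itemCount: event.itemCount,
          status: 'placed',
        });
        break;
      case 'OrderShipped': {
        const view = this.views.get(event.aggregateId);
        if (view) this.views.set(event.aggregateId, { ...view, status: 'shipped' });
        break;
      }
    }
  }

  // Query side: reads only the view, never the event log
  byCustomer(customerId: string): OrderSummary[] {
    return Array.from(this.views.values()).filter((v) => v.customerId === customerId);
  }

  // Rebuild a fresh read model by replaying the full log in order
  static rebuild(eventLog: LoggedEvent[]): InMemoryOrderViews {
    const model = new InMemoryOrderViews();
    for (const entry of eventLog) model.handle(entry);
    return model;
  }
}

const eventLog: LoggedEvent[] = [
  { type: 'OrderPlaced', aggregateId: 'o-1', customerId: 'c-1', itemCount: 2 },
  { type: 'OrderPlaced', aggregateId: 'o-2', customerId: 'c-2', itemCount: 1 },
  { type: 'OrderShipped', aggregateId: 'o-1' },
];

const rebuilt = InMemoryOrderViews.rebuild(eventLog);
console.log(rebuilt.byCustomer('c-1'));
```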

Saga Pattern

```typescript
// Orchestration-based saga
class OrderSaga {
  constructor(
    private orderService: OrderService,
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private eventBus: EventBus
  ) {}

  async execute(orderId: string): Promise<void> {
    const order = await this.orderService.get(orderId);
    // Track which steps succeeded so compensation only undoes work that happened
    const done = { inventoryReserved: false, paymentCharged: false };

    try {
      // Step 1: Reserve inventory
      await this.inventoryService.reserve(order.items);
      done.inventoryReserved = true;

      // Step 2: Process payment
      await this.paymentService.charge(order.customerId, order.total);
      done.paymentCharged = true;

      // Step 3: Confirm order
      await this.orderService.confirm(orderId);

      await this.eventBus.publish('OrderCompleted', { orderId });
    } catch (error) {
      // Compensating transactions
      const reason = error instanceof Error ? error.message : String(error);
      await this.compensate(orderId, reason, done);
    }
  }

  private async compensate(
    orderId: string,
    reason: string,
    done: { inventoryReserved: boolean; paymentCharged: boolean }
  ): Promise<void> {
    // Undo completed steps in reverse order: refund only if payment was made
    if (done.paymentCharged) await this.paymentService.refund(orderId);

    // Reverse inventory reservation only if it succeeded
    if (done.inventoryReserved) await this.inventoryService.release(orderId);

    // Mark order as failed
    await this.orderService.fail(orderId, reason);

    await this.eventBus.publish('OrderFailed', { orderId, reason });
  }
}
```
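The same orchestration idea can be expressed generically: pair each step with its compensating action, and on failure undo only the steps that completed, in reverse order. The runner below is a rough sketch under that assumption; `SagaStep` and `runSaga` are hypothetical names, and it deliberately leaves out concerns such as persisting saga state or retrying a failed compensation.

```typescript
interface SagaStep {
  name: string;
  run: () => Promise<void>;
  compensate: () => Promise<void>;
}

interface SagaResult {
  ok: boolean;
  failedStep?: string;
}

async function runSaga(steps: SagaStep[]): Promise<SagaResult> {
  const completed: SagaStep[] = [];

  for (const step of steps) {
    try {
      await step.run();
      completed.push(step);
    } catch {
      // The failed step did not complete, so only earlier steps are compensated,
      // most recent first
      for (const finished of completed.reverse()) {
        await finished.compensate();
      }
      return { ok: false, failedStep: step.name };
    }
  }

  return { ok: true };
}

// Hypothetical usage: the payment step fails, so only the reservation is undone
const sagaLog: string[] = [];

const orderSteps: SagaStep[] = [
  {
    name: 'reserve-inventory',
    run: async () => { sagaLog.push('reserve'); },
    compensate: async () => { sagaLog.push('release'); },
  },
  {
    name: 'charge-payment',
    run: async () => { throw new Error('card declined'); },
    compensate: async () => { sagaLog.push('refund'); },
  },
  {
    name: 'confirm-order',
    run: async () => { sagaLog.push('confirm'); },
    compensate: async () => { sagaLog.push('unconfirm'); },
  },
];

const sagaOutcome = runSaga(orderSteps);
sagaOutcome.then((result) => console.log(result, sagaLog));
```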

Best Practices

Event Design:
✓ Use past tense (OrderPlaced, not PlaceOrder)
✓ Include all relevant data
✓ Make events immutable
✓ Version your events

Implementation:
✓ Handle failures gracefully
✓ Make handlers idempotent
✓ Use transactions for consistency
✓ Monitor event processing

Avoid:
✗ Circular event chains
✗ Business logic in handlers
✗ Blocking event processing
✗ Losing events on failure
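To make "version your events" concrete: one common approach is to store a schema version on each event and convert older shapes to the newest one when reading, so handlers only deal with a single shape. The sketch below assumes a hypothetical `schemaVersion` field and a made-up schema change (a `currency` field added in version 2, defaulting to `'USD'` for old events); none of these names come from the code earlier in this article.

```typescript
interface OrderPlacedV1 {
  type: 'OrderPlaced';
  schemaVersion: 1;
  payload: { customerId: string; total: number };
}

interface OrderPlacedV2 {
  type: 'OrderPlaced';
  schemaVersion: 2;
  payload: { customerId: string; total: number; currency: string };
}

type AnyOrderPlaced = OrderPlacedV1 | OrderPlacedV2;

// Upcaster: converts any stored version to the latest shape without touching stored data
function upcast(event: AnyOrderPlaced): OrderPlacedV2 {
  if (event.schemaVersion === 2) return event;

  return {
    type: 'OrderPlaced',
    schemaVersion: 2,
    // Assumed default for events written before the currency field existed
    payload: { ...event.payload, currency: 'USD' },
  };
}

const legacyEvent: OrderPlacedV1 = {
  type: 'OrderPlaced',
  schemaVersion: 1,
  payload: { customerId: 'c-1', total: 25 },
};

const upgraded = upcast(legacyEvent);
console.log(upgraded.schemaVersion, upgraded.payload.currency);
```

Because the stored event is never modified, this also fits the "make events immutable" guideline: old events stay as written and are only reinterpreted on read.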

Conclusion

Event-driven architecture enables loose coupling and scalability. Start with in-process events, graduate to message queues as needed, and consider event sourcing for audit trails and complex domains.
