Event-driven architecture decouples components by communicating through events. Publishers emit events without knowing who listens. Subscribers react without knowing who published.
Node.js EventEmitter#
1import { EventEmitter } from 'events';
2
3interface UserEvents {
4 'user:created': (user: User) => void;
5 'user:updated': (user: User, changes: Partial<User>) => void;
6 'user:deleted': (userId: string) => void;
7}
8
9class TypedEventEmitter extends EventEmitter {
10 emit<K extends keyof UserEvents>(
11 event: K,
12 ...args: Parameters<UserEvents[K]>
13 ): boolean {
14 return super.emit(event, ...args);
15 }
16
17 on<K extends keyof UserEvents>(event: K, listener: UserEvents[K]): this {
18 return super.on(event, listener);
19 }
20}
21
22const userEvents = new TypedEventEmitter();
23
24// Subscribe
25userEvents.on('user:created', (user) => {
26 console.log(`User created: ${user.id}`);
27});
28
29userEvents.on('user:created', async (user) => {
30 await sendWelcomeEmail(user.email);
31});
32
33// Publish
34userEvents.emit('user:created', { id: '1', email: 'test@example.com', name: 'Test' });Application Event Bus#
1type EventHandler<T = any> = (payload: T) => void | Promise<void>;
2
3class EventBus {
4 private handlers = new Map<string, EventHandler[]>();
5
6 subscribe<T>(event: string, handler: EventHandler<T>): () => void {
7 if (!this.handlers.has(event)) {
8 this.handlers.set(event, []);
9 }
10 this.handlers.get(event)!.push(handler);
11
12 // Return unsubscribe function
13 return () => {
14 const handlers = this.handlers.get(event);
15 if (handlers) {
16 const index = handlers.indexOf(handler);
17 if (index > -1) handlers.splice(index, 1);
18 }
19 };
20 }
21
22 async publish<T>(event: string, payload: T): Promise<void> {
23 const handlers = this.handlers.get(event) || [];
24
25 await Promise.all(
26 handlers.map(async (handler) => {
27 try {
28 await handler(payload);
29 } catch (error) {
30 console.error(`Error in event handler for ${event}:`, error);
31 }
32 })
33 );
34 }
35}
36
37// Singleton
38export const eventBus = new EventBus();
39
40// Usage
41eventBus.subscribe('order:placed', async (order: Order) => {
42 await updateInventory(order.items);
43});
44
45eventBus.subscribe('order:placed', async (order: Order) => {
46 await sendOrderConfirmation(order);
47});
48
49await eventBus.publish('order:placed', order);Domain Events#
1interface DomainEvent {
2 type: string;
3 payload: any;
4 occurredAt: Date;
5 aggregateId: string;
6}
7
8// Event definitions
9const OrderEvents = {
10 PLACED: 'order.placed',
11 CONFIRMED: 'order.confirmed',
12 SHIPPED: 'order.shipped',
13 DELIVERED: 'order.delivered',
14 CANCELLED: 'order.cancelled',
15} as const;
16
17interface OrderPlacedEvent extends DomainEvent {
18 type: typeof OrderEvents.PLACED;
19 payload: {
20 orderId: string;
21 customerId: string;
22 items: OrderItem[];
23 total: number;
24 };
25}
26
27// Event creator
28function createEvent<T extends DomainEvent>(
29 type: T['type'],
30 aggregateId: string,
31 payload: T['payload']
32): T {
33 return {
34 type,
35 aggregateId,
36 payload,
37 occurredAt: new Date(),
38 } as T;
39}
40
41// Order aggregate with events
42class Order {
43 private events: DomainEvent[] = [];
44
45 constructor(
46 public readonly id: string,
47 public customerId: string,
48 public items: OrderItem[],
49 public status: string = 'pending'
50 ) {}
51
52 place(): void {
53 this.status = 'placed';
54 this.events.push(
55 createEvent<OrderPlacedEvent>(OrderEvents.PLACED, this.id, {
56 orderId: this.id,
57 customerId: this.customerId,
58 items: this.items,
59 total: this.calculateTotal(),
60 })
61 );
62 }
63
64 getUncommittedEvents(): DomainEvent[] {
65 return [...this.events];
66 }
67
68 clearEvents(): void {
69 this.events = [];
70 }
71
72 private calculateTotal(): number {
73 return this.items.reduce((sum, item) => sum + item.price * item.quantity, 0);
74 }
75}Message Queue Integration#
1import { Queue, Worker } from 'bullmq';
2import Redis from 'ioredis';
3
4const redis = new Redis(process.env.REDIS_URL);
5
6// Event queue
7const eventQueue = new Queue('events', { connection: redis });
8
9// Publish to queue
10async function publishEvent(event: DomainEvent): Promise<void> {
11 await eventQueue.add(event.type, event, {
12 attempts: 3,
13 backoff: {
14 type: 'exponential',
15 delay: 1000,
16 },
17 });
18}
19
20// Event handlers registry
21const eventHandlers = new Map<string, ((event: DomainEvent) => Promise<void>)[]>();
22
23function registerHandler(
24 eventType: string,
25 handler: (event: DomainEvent) => Promise<void>
26): void {
27 if (!eventHandlers.has(eventType)) {
28 eventHandlers.set(eventType, []);
29 }
30 eventHandlers.get(eventType)!.push(handler);
31}
32
33// Worker to process events
34const eventWorker = new Worker(
35 'events',
36 async (job) => {
37 const event = job.data as DomainEvent;
38 const handlers = eventHandlers.get(event.type) || [];
39
40 for (const handler of handlers) {
41 await handler(event);
42 }
43 },
44 { connection: redis }
45);
46
47// Register handlers
48registerHandler(OrderEvents.PLACED, async (event: OrderPlacedEvent) => {
49 await inventoryService.reserve(event.payload.items);
50});
51
52registerHandler(OrderEvents.PLACED, async (event: OrderPlacedEvent) => {
53 await notificationService.sendOrderConfirmation(event.payload.customerId);
54});Event Sourcing#
1interface Event {
2 id: string;
3 type: string;
4 aggregateId: string;
5 payload: any;
6 version: number;
7 timestamp: Date;
8}
9
10class EventStore {
11 constructor(private db: Database) {}
12
13 async save(aggregateId: string, events: Event[], expectedVersion: number): Promise<void> {
14 const currentVersion = await this.getVersion(aggregateId);
15
16 if (currentVersion !== expectedVersion) {
17 throw new ConcurrencyError(
18 `Expected version ${expectedVersion}, but found ${currentVersion}`
19 );
20 }
21
22 await this.db.transaction(async (tx) => {
23 for (let i = 0; i < events.length; i++) {
24 await tx.events.create({
25 data: {
26 ...events[i],
27 version: expectedVersion + i + 1,
28 },
29 });
30 }
31 });
32 }
33
34 async getEvents(aggregateId: string, fromVersion = 0): Promise<Event[]> {
35 return this.db.events.findMany({
36 where: {
37 aggregateId,
38 version: { gt: fromVersion },
39 },
40 orderBy: { version: 'asc' },
41 });
42 }
43
44 async getVersion(aggregateId: string): Promise<number> {
45 const lastEvent = await this.db.events.findFirst({
46 where: { aggregateId },
47 orderBy: { version: 'desc' },
48 });
49 return lastEvent?.version ?? 0;
50 }
51}
52
53// Aggregate rebuilding
54class OrderAggregate {
55 private state: OrderState = { status: 'draft', items: [] };
56 private version = 0;
57
58 apply(event: Event): void {
59 switch (event.type) {
60 case 'OrderCreated':
61 this.state = { ...this.state, id: event.payload.id, customerId: event.payload.customerId };
62 break;
63 case 'ItemAdded':
64 this.state.items.push(event.payload.item);
65 break;
66 case 'OrderPlaced':
67 this.state.status = 'placed';
68 break;
69 }
70 this.version = event.version;
71 }
72
73 static async load(eventStore: EventStore, id: string): Promise<OrderAggregate> {
74 const events = await eventStore.getEvents(id);
75 const aggregate = new OrderAggregate();
76
77 for (const event of events) {
78 aggregate.apply(event);
79 }
80
81 return aggregate;
82 }
83}CQRS Pattern#
1// Commands
2interface Command {
3 type: string;
4}
5
6interface PlaceOrderCommand extends Command {
7 type: 'PlaceOrder';
8 customerId: string;
9 items: OrderItem[];
10}
11
12// Command handler
13class OrderCommandHandler {
14 constructor(
15 private eventStore: EventStore,
16 private eventBus: EventBus
17 ) {}
18
19 async handle(command: PlaceOrderCommand): Promise<string> {
20 const orderId = crypto.randomUUID();
21
22 const events: Event[] = [
23 {
24 id: crypto.randomUUID(),
25 type: 'OrderPlaced',
26 aggregateId: orderId,
27 payload: {
28 customerId: command.customerId,
29 items: command.items,
30 },
31 version: 1,
32 timestamp: new Date(),
33 },
34 ];
35
36 await this.eventStore.save(orderId, events, 0);
37
38 // Publish for read model updates
39 for (const event of events) {
40 await this.eventBus.publish(event.type, event);
41 }
42
43 return orderId;
44 }
45}
46
47// Read model (projection)
48class OrderReadModel {
49 constructor(private db: Database) {}
50
51 async handle(event: Event): Promise<void> {
52 switch (event.type) {
53 case 'OrderPlaced':
54 await this.db.orderViews.create({
55 data: {
56 id: event.aggregateId,
57 customerId: event.payload.customerId,
58 itemCount: event.payload.items.length,
59 status: 'placed',
60 createdAt: event.timestamp,
61 },
62 });
63 break;
64
65 case 'OrderShipped':
66 await this.db.orderViews.update({
67 where: { id: event.aggregateId },
68 data: { status: 'shipped', shippedAt: event.timestamp },
69 });
70 break;
71 }
72 }
73}
74
75// Query
76async function getCustomerOrders(customerId: string): Promise<OrderView[]> {
77 return db.orderViews.findMany({
78 where: { customerId },
79 orderBy: { createdAt: 'desc' },
80 });
81}Saga Pattern#
1// Orchestration-based saga
2class OrderSaga {
3 constructor(
4 private orderService: OrderService,
5 private paymentService: PaymentService,
6 private inventoryService: InventoryService,
7 private eventBus: EventBus
8 ) {}
9
10 async execute(orderId: string): Promise<void> {
11 const order = await this.orderService.get(orderId);
12
13 try {
14 // Step 1: Reserve inventory
15 await this.inventoryService.reserve(order.items);
16
17 // Step 2: Process payment
18 await this.paymentService.charge(order.customerId, order.total);
19
20 // Step 3: Confirm order
21 await this.orderService.confirm(orderId);
22
23 await this.eventBus.publish('OrderCompleted', { orderId });
24 } catch (error) {
25 // Compensating transactions
26 await this.compensate(orderId, error);
27 }
28 }
29
30 private async compensate(orderId: string, error: Error): Promise<void> {
31 // Reverse inventory reservation
32 await this.inventoryService.release(orderId);
33
34 // Refund if payment was made
35 await this.paymentService.refund(orderId);
36
37 // Mark order as failed
38 await this.orderService.fail(orderId, error.message);
39
40 await this.eventBus.publish('OrderFailed', { orderId, reason: error.message });
41 }
42}Best Practices#
Event Design:
✓ Use past tense (OrderPlaced, not PlaceOrder)
✓ Include all relevant data
✓ Make events immutable
✓ Version your events
Implementation:
✓ Handle failures gracefully
✓ Make handlers idempotent
✓ Use transactions for consistency
✓ Monitor event processing
Avoid:
✗ Circular event chains
✗ Business logic in handlers
✗ Blocking event processing
✗ Losing events on failure
Conclusion#
Event-driven architecture enables loose coupling and scalability. Start with in-process events, graduate to message queues as needed, and consider event sourcing for audit trails and complex domains.