Back to Blog
Event SourcingArchitectureDatabaseCQRS

Event Sourcing Fundamentals

Understand event sourcing. From event stores to projections to rebuilding state from events.

B
Bootspring Team
Engineering
September 5, 2022
6 min read

Event sourcing stores state as a sequence of events rather than current values. Here's how to implement it effectively.

What is Event Sourcing#

Traditional state storage: - Store current state - Update in place - History lost Event sourcing: - Store events that caused state changes - Derive current state from events - Complete history preserved Example: Instead of: { balance: 150 } Store events: 1. AccountOpened { balance: 100 } 2. MoneyDeposited { amount: 100 } 3. MoneyWithdrawn { amount: 50 } Current state derived: { balance: 150 }

Event Definition#

1// Base event interface 2interface DomainEvent { 3 id: string; 4 aggregateId: string; 5 aggregateType: string; 6 type: string; 7 timestamp: Date; 8 version: number; 9 data: unknown; 10 metadata?: Record<string, unknown>; 11} 12 13// Account events 14interface AccountOpenedEvent extends DomainEvent { 15 type: 'AccountOpened'; 16 data: { 17 accountId: string; 18 ownerId: string; 19 initialBalance: number; 20 }; 21} 22 23interface MoneyDepositedEvent extends DomainEvent { 24 type: 'MoneyDeposited'; 25 data: { 26 amount: number; 27 source: string; 28 }; 29} 30 31interface MoneyWithdrawnEvent extends DomainEvent { 32 type: 'MoneyWithdrawn'; 33 data: { 34 amount: number; 35 destination: string; 36 }; 37} 38 39type AccountEvent = 40 | AccountOpenedEvent 41 | MoneyDepositedEvent 42 | MoneyWithdrawnEvent;

Event Store#

1interface EventStore { 2 append( 3 aggregateId: string, 4 events: DomainEvent[], 5 expectedVersion: number 6 ): Promise<void>; 7 8 getEvents( 9 aggregateId: string, 10 fromVersion?: number 11 ): Promise<DomainEvent[]>; 12 13 getAllEvents(fromPosition?: number): Promise<DomainEvent[]>; 14} 15 16// PostgreSQL implementation 17class PostgresEventStore implements EventStore { 18 async append( 19 aggregateId: string, 20 events: DomainEvent[], 21 expectedVersion: number 22 ): Promise<void> { 23 await prisma.$transaction(async (tx) => { 24 // Optimistic concurrency check 25 const currentVersion = await tx.event.count({ 26 where: { aggregateId }, 27 }); 28 29 if (currentVersion !== expectedVersion) { 30 throw new ConcurrencyError( 31 `Expected version ${expectedVersion}, found ${currentVersion}` 32 ); 33 } 34 35 // Append events 36 await tx.event.createMany({ 37 data: events.map((event, index) => ({ 38 id: event.id, 39 aggregateId: event.aggregateId, 40 aggregateType: event.aggregateType, 41 type: event.type, 42 version: expectedVersion + index + 1, 43 data: event.data, 44 metadata: event.metadata, 45 timestamp: event.timestamp, 46 })), 47 }); 48 }); 49 } 50 51 async getEvents( 52 aggregateId: string, 53 fromVersion = 0 54 ): Promise<DomainEvent[]> { 55 return prisma.event.findMany({ 56 where: { 57 aggregateId, 58 version: { gt: fromVersion }, 59 }, 60 orderBy: { version: 'asc' }, 61 }); 62 } 63}

Aggregate#

1// Base aggregate 2abstract class Aggregate { 3 private uncommittedEvents: DomainEvent[] = []; 4 protected version = 0; 5 6 abstract apply(event: DomainEvent): void; 7 8 protected raise(event: Omit<DomainEvent, 'version'>): void { 9 this.version++; 10 const versionedEvent = { ...event, version: this.version }; 11 12 this.apply(versionedEvent); 13 this.uncommittedEvents.push(versionedEvent); 14 } 15 16 getUncommittedEvents(): DomainEvent[] { 17 return [...this.uncommittedEvents]; 18 } 19 20 clearUncommittedEvents(): void { 21 this.uncommittedEvents = []; 22 } 23 24 loadFromHistory(events: DomainEvent[]): void { 25 events.forEach((event) => { 26 this.apply(event); 27 this.version = event.version; 28 }); 29 } 30} 31 32// Account aggregate 33class Account extends Aggregate { 34 private id: string = ''; 35 private ownerId: string = ''; 36 private balance: number = 0; 37 private isOpen: boolean = false; 38 39 static open(id: string, ownerId: string, initialBalance: number): Account { 40 const account = new Account(); 41 42 account.raise({ 43 id: crypto.randomUUID(), 44 aggregateId: id, 45 aggregateType: 'Account', 46 type: 'AccountOpened', 47 timestamp: new Date(), 48 data: { accountId: id, ownerId, initialBalance }, 49 }); 50 51 return account; 52 } 53 54 deposit(amount: number, source: string): void { 55 if (!this.isOpen) { 56 throw new Error('Account is not open'); 57 } 58 59 this.raise({ 60 id: crypto.randomUUID(), 61 aggregateId: this.id, 62 aggregateType: 'Account', 63 type: 'MoneyDeposited', 64 timestamp: new Date(), 65 data: { amount, source }, 66 }); 67 } 68 69 withdraw(amount: number, destination: string): void { 70 if (!this.isOpen) { 71 throw new Error('Account is not open'); 72 } 73 74 if (this.balance < amount) { 75 throw new Error('Insufficient funds'); 76 } 77 78 this.raise({ 79 id: crypto.randomUUID(), 80 aggregateId: this.id, 81 aggregateType: 'Account', 82 type: 'MoneyWithdrawn', 83 timestamp: new Date(), 84 data: { amount, destination }, 85 }); 86 } 87 88 apply(event: DomainEvent): void { 89 switch (event.type) { 90 case 'AccountOpened': 91 this.id = event.data.accountId; 92 this.ownerId = event.data.ownerId; 93 this.balance = event.data.initialBalance; 94 this.isOpen = true; 95 break; 96 97 case 'MoneyDeposited': 98 this.balance += event.data.amount; 99 break; 100 101 case 'MoneyWithdrawn': 102 this.balance -= event.data.amount; 103 break; 104 } 105 } 106 107 getBalance(): number { 108 return this.balance; 109 } 110}

Repository#

1class AccountRepository { 2 constructor(private eventStore: EventStore) {} 3 4 async save(account: Account): Promise<void> { 5 const events = account.getUncommittedEvents(); 6 7 if (events.length === 0) { 8 return; 9 } 10 11 const aggregateId = events[0].aggregateId; 12 const expectedVersion = events[0].version - 1; 13 14 await this.eventStore.append(aggregateId, events, expectedVersion); 15 account.clearUncommittedEvents(); 16 } 17 18 async getById(id: string): Promise<Account | null> { 19 const events = await this.eventStore.getEvents(id); 20 21 if (events.length === 0) { 22 return null; 23 } 24 25 const account = new Account(); 26 account.loadFromHistory(events); 27 28 return account; 29 } 30} 31 32// Usage 33const repository = new AccountRepository(eventStore); 34 35// Create account 36const account = Account.open('acc-123', 'user-456', 1000); 37await repository.save(account); 38 39// Load and modify 40const loaded = await repository.getById('acc-123'); 41loaded.deposit(500, 'ATM'); 42loaded.withdraw(200, 'transfer'); 43await repository.save(loaded);

Projections#

1// Read model for account balances 2interface AccountProjection { 3 id: string; 4 ownerId: string; 5 balance: number; 6 transactionCount: number; 7 lastActivityAt: Date; 8} 9 10class AccountProjectionHandler { 11 async handle(event: DomainEvent): Promise<void> { 12 switch (event.type) { 13 case 'AccountOpened': 14 await prisma.accountProjection.create({ 15 data: { 16 id: event.data.accountId, 17 ownerId: event.data.ownerId, 18 balance: event.data.initialBalance, 19 transactionCount: 0, 20 lastActivityAt: event.timestamp, 21 }, 22 }); 23 break; 24 25 case 'MoneyDeposited': 26 await prisma.accountProjection.update({ 27 where: { id: event.aggregateId }, 28 data: { 29 balance: { increment: event.data.amount }, 30 transactionCount: { increment: 1 }, 31 lastActivityAt: event.timestamp, 32 }, 33 }); 34 break; 35 36 case 'MoneyWithdrawn': 37 await prisma.accountProjection.update({ 38 where: { id: event.aggregateId }, 39 data: { 40 balance: { decrement: event.data.amount }, 41 transactionCount: { increment: 1 }, 42 lastActivityAt: event.timestamp, 43 }, 44 }); 45 break; 46 } 47 } 48} 49 50// Event subscriber 51async function processEvents(): Promise<void> { 52 const handler = new AccountProjectionHandler(); 53 let lastPosition = await getLastProcessedPosition(); 54 55 const events = await eventStore.getAllEvents(lastPosition); 56 57 for (const event of events) { 58 await handler.handle(event); 59 await saveLastProcessedPosition(event.position); 60 } 61}

Snapshots#

1interface Snapshot { 2 aggregateId: string; 3 version: number; 4 state: unknown; 5 createdAt: Date; 6} 7 8class SnapshotRepository { 9 async save(aggregateId: string, version: number, state: unknown): Promise<void> { 10 await prisma.snapshot.upsert({ 11 where: { aggregateId }, 12 create: { aggregateId, version, state, createdAt: new Date() }, 13 update: { version, state, createdAt: new Date() }, 14 }); 15 } 16 17 async get(aggregateId: string): Promise<Snapshot | null> { 18 return prisma.snapshot.findUnique({ where: { aggregateId } }); 19 } 20} 21 22// Repository with snapshots 23class AccountRepositoryWithSnapshots { 24 private snapshotFrequency = 100; // Snapshot every 100 events 25 26 async getById(id: string): Promise<Account | null> { 27 const account = new Account(); 28 29 // Load from snapshot if available 30 const snapshot = await this.snapshotRepo.get(id); 31 if (snapshot) { 32 account.loadFromSnapshot(snapshot); 33 } 34 35 // Load events after snapshot 36 const fromVersion = snapshot?.version ?? 0; 37 const events = await this.eventStore.getEvents(id, fromVersion); 38 39 if (events.length === 0 && !snapshot) { 40 return null; 41 } 42 43 account.loadFromHistory(events); 44 45 // Create snapshot if needed 46 if (events.length >= this.snapshotFrequency) { 47 await this.snapshotRepo.save(id, account.version, account.getState()); 48 } 49 50 return account; 51 } 52}

Best Practices#

Events: ✓ Make events immutable ✓ Include all necessary data ✓ Use past tense naming ✓ Version event schemas Storage: ✓ Use append-only storage ✓ Implement optimistic concurrency ✓ Create snapshots for performance ✓ Archive old events Projections: ✓ Build for specific queries ✓ Handle idempotency ✓ Enable rebuilding ✓ Monitor projection lag

Conclusion#

Event sourcing provides complete audit history and enables temporal queries. Start with clear event definitions, implement proper aggregate handling, and build projections for read performance. The pattern works well for domains where history and audit trails are important.

Share this article

Help spread the word about Bootspring