Event sourcing stores state as a sequence of events rather than current values. Here's how to implement it effectively.
What is Event Sourcing#
Traditional state storage:
- Store current state
- Update in place
- History lost
Event sourcing:
- Store events that caused state changes
- Derive current state from events
- Complete history preserved
Example:
Instead of: { balance: 150 }
Store events:
1. AccountOpened { balance: 100 }
2. MoneyDeposited { amount: 100 }
3. MoneyWithdrawn { amount: 50 }
Current state derived: { balance: 150 }
Event Definition#
1// Base event interface
2interface DomainEvent {
3 id: string;
4 aggregateId: string;
5 aggregateType: string;
6 type: string;
7 timestamp: Date;
8 version: number;
9 data: unknown;
10 metadata?: Record<string, unknown>;
11}
12
13// Account events
14interface AccountOpenedEvent extends DomainEvent {
15 type: 'AccountOpened';
16 data: {
17 accountId: string;
18 ownerId: string;
19 initialBalance: number;
20 };
21}
22
23interface MoneyDepositedEvent extends DomainEvent {
24 type: 'MoneyDeposited';
25 data: {
26 amount: number;
27 source: string;
28 };
29}
30
31interface MoneyWithdrawnEvent extends DomainEvent {
32 type: 'MoneyWithdrawn';
33 data: {
34 amount: number;
35 destination: string;
36 };
37}
38
39type AccountEvent =
40 | AccountOpenedEvent
41 | MoneyDepositedEvent
42 | MoneyWithdrawnEvent;Event Store#
1interface EventStore {
2 append(
3 aggregateId: string,
4 events: DomainEvent[],
5 expectedVersion: number
6 ): Promise<void>;
7
8 getEvents(
9 aggregateId: string,
10 fromVersion?: number
11 ): Promise<DomainEvent[]>;
12
13 getAllEvents(fromPosition?: number): Promise<DomainEvent[]>;
14}
15
16// PostgreSQL implementation
17class PostgresEventStore implements EventStore {
18 async append(
19 aggregateId: string,
20 events: DomainEvent[],
21 expectedVersion: number
22 ): Promise<void> {
23 await prisma.$transaction(async (tx) => {
24 // Optimistic concurrency check
25 const currentVersion = await tx.event.count({
26 where: { aggregateId },
27 });
28
29 if (currentVersion !== expectedVersion) {
30 throw new ConcurrencyError(
31 `Expected version ${expectedVersion}, found ${currentVersion}`
32 );
33 }
34
35 // Append events
36 await tx.event.createMany({
37 data: events.map((event, index) => ({
38 id: event.id,
39 aggregateId: event.aggregateId,
40 aggregateType: event.aggregateType,
41 type: event.type,
42 version: expectedVersion + index + 1,
43 data: event.data,
44 metadata: event.metadata,
45 timestamp: event.timestamp,
46 })),
47 });
48 });
49 }
50
51 async getEvents(
52 aggregateId: string,
53 fromVersion = 0
54 ): Promise<DomainEvent[]> {
55 return prisma.event.findMany({
56 where: {
57 aggregateId,
58 version: { gt: fromVersion },
59 },
60 orderBy: { version: 'asc' },
61 });
62 }
63}Aggregate#
1// Base aggregate
2abstract class Aggregate {
3 private uncommittedEvents: DomainEvent[] = [];
4 protected version = 0;
5
6 abstract apply(event: DomainEvent): void;
7
8 protected raise(event: Omit<DomainEvent, 'version'>): void {
9 this.version++;
10 const versionedEvent = { ...event, version: this.version };
11
12 this.apply(versionedEvent);
13 this.uncommittedEvents.push(versionedEvent);
14 }
15
16 getUncommittedEvents(): DomainEvent[] {
17 return [...this.uncommittedEvents];
18 }
19
20 clearUncommittedEvents(): void {
21 this.uncommittedEvents = [];
22 }
23
24 loadFromHistory(events: DomainEvent[]): void {
25 events.forEach((event) => {
26 this.apply(event);
27 this.version = event.version;
28 });
29 }
30}
31
32// Account aggregate
33class Account extends Aggregate {
34 private id: string = '';
35 private ownerId: string = '';
36 private balance: number = 0;
37 private isOpen: boolean = false;
38
39 static open(id: string, ownerId: string, initialBalance: number): Account {
40 const account = new Account();
41
42 account.raise({
43 id: crypto.randomUUID(),
44 aggregateId: id,
45 aggregateType: 'Account',
46 type: 'AccountOpened',
47 timestamp: new Date(),
48 data: { accountId: id, ownerId, initialBalance },
49 });
50
51 return account;
52 }
53
54 deposit(amount: number, source: string): void {
55 if (!this.isOpen) {
56 throw new Error('Account is not open');
57 }
58
59 this.raise({
60 id: crypto.randomUUID(),
61 aggregateId: this.id,
62 aggregateType: 'Account',
63 type: 'MoneyDeposited',
64 timestamp: new Date(),
65 data: { amount, source },
66 });
67 }
68
69 withdraw(amount: number, destination: string): void {
70 if (!this.isOpen) {
71 throw new Error('Account is not open');
72 }
73
74 if (this.balance < amount) {
75 throw new Error('Insufficient funds');
76 }
77
78 this.raise({
79 id: crypto.randomUUID(),
80 aggregateId: this.id,
81 aggregateType: 'Account',
82 type: 'MoneyWithdrawn',
83 timestamp: new Date(),
84 data: { amount, destination },
85 });
86 }
87
88 apply(event: DomainEvent): void {
89 switch (event.type) {
90 case 'AccountOpened':
91 this.id = event.data.accountId;
92 this.ownerId = event.data.ownerId;
93 this.balance = event.data.initialBalance;
94 this.isOpen = true;
95 break;
96
97 case 'MoneyDeposited':
98 this.balance += event.data.amount;
99 break;
100
101 case 'MoneyWithdrawn':
102 this.balance -= event.data.amount;
103 break;
104 }
105 }
106
107 getBalance(): number {
108 return this.balance;
109 }
110}Repository#
1class AccountRepository {
2 constructor(private eventStore: EventStore) {}
3
4 async save(account: Account): Promise<void> {
5 const events = account.getUncommittedEvents();
6
7 if (events.length === 0) {
8 return;
9 }
10
11 const aggregateId = events[0].aggregateId;
12 const expectedVersion = events[0].version - 1;
13
14 await this.eventStore.append(aggregateId, events, expectedVersion);
15 account.clearUncommittedEvents();
16 }
17
18 async getById(id: string): Promise<Account | null> {
19 const events = await this.eventStore.getEvents(id);
20
21 if (events.length === 0) {
22 return null;
23 }
24
25 const account = new Account();
26 account.loadFromHistory(events);
27
28 return account;
29 }
30}
31
32// Usage
33const repository = new AccountRepository(eventStore);
34
35// Create account
36const account = Account.open('acc-123', 'user-456', 1000);
37await repository.save(account);
38
39// Load and modify
40const loaded = await repository.getById('acc-123');
41loaded.deposit(500, 'ATM');
42loaded.withdraw(200, 'transfer');
43await repository.save(loaded);Projections#
1// Read model for account balances
2interface AccountProjection {
3 id: string;
4 ownerId: string;
5 balance: number;
6 transactionCount: number;
7 lastActivityAt: Date;
8}
9
10class AccountProjectionHandler {
11 async handle(event: DomainEvent): Promise<void> {
12 switch (event.type) {
13 case 'AccountOpened':
14 await prisma.accountProjection.create({
15 data: {
16 id: event.data.accountId,
17 ownerId: event.data.ownerId,
18 balance: event.data.initialBalance,
19 transactionCount: 0,
20 lastActivityAt: event.timestamp,
21 },
22 });
23 break;
24
25 case 'MoneyDeposited':
26 await prisma.accountProjection.update({
27 where: { id: event.aggregateId },
28 data: {
29 balance: { increment: event.data.amount },
30 transactionCount: { increment: 1 },
31 lastActivityAt: event.timestamp,
32 },
33 });
34 break;
35
36 case 'MoneyWithdrawn':
37 await prisma.accountProjection.update({
38 where: { id: event.aggregateId },
39 data: {
40 balance: { decrement: event.data.amount },
41 transactionCount: { increment: 1 },
42 lastActivityAt: event.timestamp,
43 },
44 });
45 break;
46 }
47 }
48}
49
50// Event subscriber
51async function processEvents(): Promise<void> {
52 const handler = new AccountProjectionHandler();
53 let lastPosition = await getLastProcessedPosition();
54
55 const events = await eventStore.getAllEvents(lastPosition);
56
57 for (const event of events) {
58 await handler.handle(event);
59 await saveLastProcessedPosition(event.position);
60 }
61}Snapshots#
1interface Snapshot {
2 aggregateId: string;
3 version: number;
4 state: unknown;
5 createdAt: Date;
6}
7
8class SnapshotRepository {
9 async save(aggregateId: string, version: number, state: unknown): Promise<void> {
10 await prisma.snapshot.upsert({
11 where: { aggregateId },
12 create: { aggregateId, version, state, createdAt: new Date() },
13 update: { version, state, createdAt: new Date() },
14 });
15 }
16
17 async get(aggregateId: string): Promise<Snapshot | null> {
18 return prisma.snapshot.findUnique({ where: { aggregateId } });
19 }
20}
21
22// Repository with snapshots
23class AccountRepositoryWithSnapshots {
24 private snapshotFrequency = 100; // Snapshot every 100 events
25
26 async getById(id: string): Promise<Account | null> {
27 const account = new Account();
28
29 // Load from snapshot if available
30 const snapshot = await this.snapshotRepo.get(id);
31 if (snapshot) {
32 account.loadFromSnapshot(snapshot);
33 }
34
35 // Load events after snapshot
36 const fromVersion = snapshot?.version ?? 0;
37 const events = await this.eventStore.getEvents(id, fromVersion);
38
39 if (events.length === 0 && !snapshot) {
40 return null;
41 }
42
43 account.loadFromHistory(events);
44
45 // Create snapshot if needed
46 if (events.length >= this.snapshotFrequency) {
47 await this.snapshotRepo.save(id, account.version, account.getState());
48 }
49
50 return account;
51 }
52}Best Practices#
Events:
✓ Make events immutable
✓ Include all necessary data
✓ Use past tense naming
✓ Version event schemas
Storage:
✓ Use append-only storage
✓ Implement optimistic concurrency
✓ Create snapshots for performance
✓ Archive old events
Projections:
✓ Build for specific queries
✓ Handle idempotency
✓ Enable rebuilding
✓ Monitor projection lag
Conclusion#
Event sourcing provides complete audit history and enables temporal queries. Start with clear event definitions, implement proper aggregate handling, and build projections for read performance. The pattern works well for domains where history and audit trails are important.