Apache Kafka is a distributed streaming platform for building real-time data pipelines. This guide covers essential patterns for producing, consuming, and processing streaming data.
## Core Concepts
┌─────────────────────────────────────────────────────────────┐
│ KAFKA CLUSTER │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Broker │ │ Broker │ │ Broker │ │
│ │ 1 │ │ 2 │ │ 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Topic: orders │
│ ┌────────────┬────────────┬────────────┐ │
│ │ Partition 0│ Partition 1│ Partition 2│ │
│ │ [0,1,2,3] │ [0,1,2] │ [0,1,2,3,4] │
│ └────────────┴────────────┴────────────┘ │
└─────────────────────────────────────────────────────────────┘
Producers ─────────────────► Topics ─────────────────► Consumers
## Producer Implementation

### Basic Producer
1import { Kafka, Partitioners } from 'kafkajs';
2
3const kafka = new Kafka({
4 clientId: 'order-service',
5 brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
6 ssl: true,
7 sasl: {
8 mechanism: 'plain',
9 username: process.env.KAFKA_USERNAME!,
10 password: process.env.KAFKA_PASSWORD!,
11 },
12});
13
14const producer = kafka.producer({
15 createPartitioner: Partitioners.DefaultPartitioner,
16 idempotent: true, // Exactly-once semantics
17 maxInFlightRequests: 5,
18 transactionTimeout: 60000,
19});
20
21async function initProducer() {
22 await producer.connect();
23
24 // Handle disconnects
25 producer.on('producer.disconnect', () => {
26 console.error('Producer disconnected');
27 // Implement reconnection logic
28 });
29}
30
31async function sendEvent(topic: string, event: object, key?: string) {
32 try {
33 const result = await producer.send({
34 topic,
35 messages: [
36 {
37 key: key || undefined,
38 value: JSON.stringify(event),
39 headers: {
40 'correlation-id': crypto.randomUUID(),
41 'event-type': event.type,
42 'timestamp': Date.now().toString(),
43 },
44 },
45 ],
46 });
47
48 console.log('Message sent:', result);
49 return result;
50 } catch (error) {
51 console.error('Failed to send message:', error);
52 throw error;
53 }
54}Batched Producer#
1class BatchProducer {
2 private batch: Array<{ topic: string; messages: any[] }> = [];
3 private batchSize = 100;
4 private flushInterval = 1000;
5 private timer: NodeJS.Timeout | null = null;
6
7 constructor(private producer: Producer) {
8 this.startFlushTimer();
9 }
10
11 async send(topic: string, event: object, key?: string) {
12 const topicBatch = this.batch.find(b => b.topic === topic);
13
14 const message = {
15 key: key || undefined,
16 value: JSON.stringify(event),
17 };
18
19 if (topicBatch) {
20 topicBatch.messages.push(message);
21 } else {
22 this.batch.push({ topic, messages: [message] });
23 }
24
25 if (this.getTotalMessages() >= this.batchSize) {
26 await this.flush();
27 }
28 }
29
30 private getTotalMessages(): number {
31 return this.batch.reduce((sum, b) => sum + b.messages.length, 0);
32 }
33
34 async flush() {
35 if (this.batch.length === 0) return;
36
37 const currentBatch = this.batch;
38 this.batch = [];
39
40 await this.producer.sendBatch({
41 topicMessages: currentBatch,
42 });
43 }
44
45 private startFlushTimer() {
46 this.timer = setInterval(() => this.flush(), this.flushInterval);
47 }
48
49 async shutdown() {
50 if (this.timer) clearInterval(this.timer);
51 await this.flush();
52 }
53}Consumer Implementation#
### Basic Consumer
1const consumer = kafka.consumer({
2 groupId: 'order-processor',
3 sessionTimeout: 30000,
4 heartbeatInterval: 3000,
5 maxBytesPerPartition: 1048576, // 1MB
6 retry: {
7 retries: 5,
8 initialRetryTime: 100,
9 maxRetryTime: 30000,
10 },
11});
12
13async function startConsumer() {
14 await consumer.connect();
15
16 await consumer.subscribe({
17 topics: ['orders', 'payments'],
18 fromBeginning: false,
19 });
20
21 await consumer.run({
22 eachMessage: async ({ topic, partition, message }) => {
23 const event = JSON.parse(message.value!.toString());
24
25 console.log({
26 topic,
27 partition,
28 offset: message.offset,
29 key: message.key?.toString(),
30 event,
31 });
32
33 await processEvent(event);
34 },
35 });
36}
37
38// Graceful shutdown
39process.on('SIGTERM', async () => {
40 await consumer.disconnect();
41 process.exit(0);
42});Batch Consumer#
1await consumer.run({
2 eachBatch: async ({
3 batch,
4 resolveOffset,
5 heartbeat,
6 commitOffsetsIfNecessary,
7 isRunning,
8 isStale,
9 }) => {
10 for (const message of batch.messages) {
11 if (!isRunning() || isStale()) break;
12
13 try {
14 const event = JSON.parse(message.value!.toString());
15 await processEvent(event);
16
17 resolveOffset(message.offset);
18 await heartbeat();
19 } catch (error) {
20 console.error('Processing error:', error);
21 // Don't resolve offset - will retry
22 throw error;
23 }
24 }
25
26 await commitOffsetsIfNecessary();
27 },
28});Stream Processing#
### Kafka Streams with TypeScript
1import { KafkaStreams } from 'kafka-streams';
2
3const config = {
4 kafkaHost: 'kafka:9092',
5 groupId: 'stream-processor',
6 clientName: 'analytics-stream',
7};
8
9const kafkaStreams = new KafkaStreams(config);
10
11// Stream processing pipeline
12const stream = kafkaStreams.getKStream('page-views');
13
14stream
15 // Parse JSON
16 .map((message) => JSON.parse(message.value))
17
18 // Filter by event type
19 .filter((event) => event.type === 'page_view')
20
21 // Group by page
22 .branch([
23 (event) => event.page.startsWith('/products'),
24 (event) => event.page.startsWith('/checkout'),
25 ])
26 .map((branches) => {
27 const [productViews, checkoutViews] = branches;
28
29 productViews
30 .countByKey('product_id', 'tumbling', 60000) // 1 minute window
31 .to('product-view-counts');
32
33 checkoutViews
34 .countByKey('user_id', 'tumbling', 300000) // 5 minute window
35 .to('checkout-funnel');
36 });
37
38stream.start();Event Aggregation#
1interface PageViewEvent {
2 userId: string;
3 page: string;
4 timestamp: number;
5}
6
7interface SessionAggregate {
8 userId: string;
9 pages: string[];
10 startTime: number;
11 endTime: number;
12 duration: number;
13}
14
15class SessionAggregator {
16 private sessions: Map<string, SessionAggregate> = new Map();
17 private sessionTimeout = 30 * 60 * 1000; // 30 minutes
18
19 process(event: PageViewEvent): SessionAggregate | null {
20 const existing = this.sessions.get(event.userId);
21
22 if (existing) {
23 // Check if session expired
24 if (event.timestamp - existing.endTime > this.sessionTimeout) {
25 // Emit completed session
26 this.sessions.delete(event.userId);
27 this.startNewSession(event);
28 return existing;
29 }
30
31 // Update existing session
32 existing.pages.push(event.page);
33 existing.endTime = event.timestamp;
34 existing.duration = existing.endTime - existing.startTime;
35 return null;
36 }
37
38 this.startNewSession(event);
39 return null;
40 }
41
42 private startNewSession(event: PageViewEvent) {
43 this.sessions.set(event.userId, {
44 userId: event.userId,
45 pages: [event.page],
46 startTime: event.timestamp,
47 endTime: event.timestamp,
48 duration: 0,
49 });
50 }
51}Error Handling#
### Dead Letter Queue
1async function processWithDLQ(message: KafkaMessage, topic: string) {
2 const retryCount = parseInt(
3 message.headers?.['retry-count']?.toString() || '0'
4 );
5
6 try {
7 const event = JSON.parse(message.value!.toString());
8 await processEvent(event);
9 } catch (error) {
10 if (retryCount < 3) {
11 // Retry with exponential backoff
12 await producer.send({
13 topic: `${topic}-retry`,
14 messages: [{
15 key: message.key,
16 value: message.value,
17 headers: {
18 ...message.headers,
19 'retry-count': (retryCount + 1).toString(),
20 'original-topic': topic,
21 'error': error.message,
22 },
23 }],
24 });
25 } else {
26 // Send to DLQ
27 await producer.send({
28 topic: `${topic}-dlq`,
29 messages: [{
30 key: message.key,
31 value: message.value,
32 headers: {
33 ...message.headers,
34 'failed-at': new Date().toISOString(),
35 'error': error.message,
36 'stack': error.stack,
37 },
38 }],
39 });
40 }
41 }
42}Consumer Error Recovery#
1await consumer.run({
2 eachMessage: async ({ topic, partition, message }) => {
3 try {
4 await processMessage(message);
5 } catch (error) {
6 if (isRetryableError(error)) {
7 // Pause partition and retry after delay
8 consumer.pause([{ topic, partitions: [partition] }]);
9
10 setTimeout(() => {
11 consumer.resume([{ topic, partitions: [partition] }]);
12 }, 5000);
13
14 throw error; // Don't commit offset
15 }
16
17 // Non-retryable error - send to DLQ
18 await sendToDLQ(topic, message, error);
19 }
20 },
21});
22
23function isRetryableError(error: Error): boolean {
24 return (
25 error.message.includes('timeout') ||
26 error.message.includes('connection') ||
27 error.message.includes('temporarily unavailable')
28 );
29}Monitoring#
### Consumer Lag Monitoring
1const admin = kafka.admin();
2
3async function getConsumerLag(groupId: string) {
4 const groupDescription = await admin.describeGroups([groupId]);
5 const topics = await admin.fetchTopicOffsets(groupId);
6
7 const lag: Record<string, number> = {};
8
9 for (const [topic, partitions] of Object.entries(topics)) {
10 const topicOffsets = await admin.fetchTopicOffsets(topic);
11
12 let topicLag = 0;
13 for (const partition of partitions) {
14 const latestOffset = topicOffsets.find(
15 (o) => o.partition === partition.partition
16 );
17 if (latestOffset) {
18 topicLag += parseInt(latestOffset.offset) - parseInt(partition.offset);
19 }
20 }
21 lag[topic] = topicLag;
22 }
23
24 return lag;
25}
26
27// Expose as Prometheus metrics
28import { Gauge } from 'prom-client';
29
30const consumerLag = new Gauge({
31 name: 'kafka_consumer_lag',
32 help: 'Consumer group lag',
33 labelNames: ['topic', 'group'],
34});
35
36setInterval(async () => {
37 const lag = await getConsumerLag('order-processor');
38 for (const [topic, value] of Object.entries(lag)) {
39 consumerLag.set({ topic, group: 'order-processor' }, value);
40 }
41}, 10000);Best Practices#
- Use meaningful keys: For proper partitioning and ordering
- Set appropriate retention: Balance storage with replay needs
- Monitor consumer lag: Alert on growing lag
- Implement idempotency: Handle duplicate messages
- Use schemas: Avro or Protobuf for type safety
- Test failure scenarios: Network issues, broker failures
## Conclusion
Kafka enables scalable, fault-tolerant data streaming. Start with simple producers and consumers, then add stream processing as complexity grows. Focus on proper error handling and monitoring for production reliability.