Kafka · Data Streaming · Real-Time · Data Engineering

Apache Kafka: Building Real-Time Data Pipelines

Master Apache Kafka for real-time data streaming. Learn producers, consumers, and patterns for building scalable data pipelines.

Bootspring Team
Engineering
February 26, 2026
6 min read

Apache Kafka is a distributed streaming platform for building real-time data pipelines. This guide covers essential patterns for producing, consuming, and processing streaming data.

Core Concepts

```
┌───────────────────────────────────────────────────────────┐
│                       KAFKA CLUSTER                       │
├───────────────────────────────────────────────────────────┤
│                                                           │
│     ┌─────────┐       ┌─────────┐       ┌─────────┐       │
│     │ Broker  │       │ Broker  │       │ Broker  │       │
│     │    1    │       │    2    │       │    3    │       │
│     └─────────┘       └─────────┘       └─────────┘       │
│                                                           │
│  Topic: orders                                            │
│  ┌─────────────┬─────────────┬─────────────┐              │
│  │ Partition 0 │ Partition 1 │ Partition 2 │              │
│  │ [0,1,2,3]   │ [0,1,2]     │ [0,1,2,3,4] │              │
│  └─────────────┴─────────────┴─────────────┘              │
│                                                           │
└───────────────────────────────────────────────────────────┘

Producers ────────────► Topics ────────────► Consumers
```
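The diagram shows why message keys matter: all messages with the same key hash to the same partition, which is what preserves per-key ordering. The sketch below illustrates the idea with a simple FNV-1a hash; Kafka's default partitioner actually uses murmur2, so this is for intuition only, not a reimplementation.

```typescript
// Simplified sketch of key-based partitioning. Kafka's default partitioner
// uses murmur2 hashing; FNV-1a is used here purely for illustration.
function partitionForKey(key: string, numPartitions: number): number {
  let hash = 2166136261;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 16777619);
  }
  return Math.abs(hash) % numPartitions;
}

// The same key always maps to the same partition, so all events
// for one order are consumed in the order they were produced.
const p1 = partitionForKey('order-1042', 3);
const p2 = partitionForKey('order-1042', 3);
console.log(p1 === p2); // true: same key, same partition
```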

Producer Implementation

Basic Producer

```typescript
import { randomUUID } from 'node:crypto';
import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: process.env.KAFKA_USERNAME!,
    password: process.env.KAFKA_PASSWORD!,
  },
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  idempotent: true, // Broker deduplicates producer retries
  maxInFlightRequests: 1, // KafkaJS requires a single in-flight request when idempotent
  transactionTimeout: 60000,
});

async function initProducer() {
  await producer.connect();

  // Handle disconnects
  producer.on(producer.events.DISCONNECT, () => {
    console.error('Producer disconnected');
    // Implement reconnection logic
  });
}

async function sendEvent(
  topic: string,
  event: { type: string; [key: string]: unknown },
  key?: string
) {
  try {
    const result = await producer.send({
      topic,
      messages: [
        {
          key: key || undefined,
          value: JSON.stringify(event),
          headers: {
            'correlation-id': randomUUID(),
            'event-type': event.type,
            'timestamp': Date.now().toString(),
          },
        },
      ],
    });

    console.log('Message sent:', result);
    return result;
  } catch (error) {
    console.error('Failed to send message:', error);
    throw error;
  }
}
```

Batched Producer

```typescript
import type { Message, Producer } from 'kafkajs';

class BatchProducer {
  private batch: Array<{ topic: string; messages: Message[] }> = [];
  private batchSize = 100;
  private flushInterval = 1000;
  private timer: NodeJS.Timeout | null = null;

  constructor(private producer: Producer) {
    this.startFlushTimer();
  }

  async send(topic: string, event: object, key?: string) {
    const topicBatch = this.batch.find((b) => b.topic === topic);

    const message: Message = {
      key: key || undefined,
      value: JSON.stringify(event),
    };

    if (topicBatch) {
      topicBatch.messages.push(message);
    } else {
      this.batch.push({ topic, messages: [message] });
    }

    if (this.getTotalMessages() >= this.batchSize) {
      await this.flush();
    }
  }

  private getTotalMessages(): number {
    return this.batch.reduce((sum, b) => sum + b.messages.length, 0);
  }

  async flush() {
    if (this.batch.length === 0) return;

    // Swap the buffer first so messages added during the await are not lost
    const currentBatch = this.batch;
    this.batch = [];

    await this.producer.sendBatch({
      topicMessages: currentBatch,
    });
  }

  private startFlushTimer() {
    this.timer = setInterval(() => this.flush(), this.flushInterval);
  }

  async shutdown() {
    if (this.timer) clearInterval(this.timer);
    await this.flush();
  }
}
```

Consumer Implementation

Basic Consumer

```typescript
const consumer = kafka.consumer({
  groupId: 'order-processor',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576, // 1MB
  retry: {
    retries: 5,
    initialRetryTime: 100,
    maxRetryTime: 30000,
  },
});

async function startConsumer() {
  await consumer.connect();

  await consumer.subscribe({
    topics: ['orders', 'payments'],
    fromBeginning: false,
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value!.toString());

      console.log({
        topic,
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        event,
      });

      await processEvent(event);
    },
  });
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  await consumer.disconnect();
  process.exit(0);
});
```

Batch Consumer

```typescript
await consumer.run({
  eachBatch: async ({
    batch,
    resolveOffset,
    heartbeat,
    commitOffsetsIfNecessary,
    isRunning,
    isStale,
  }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break;

      try {
        const event = JSON.parse(message.value!.toString());
        await processEvent(event);

        resolveOffset(message.offset);
        await heartbeat();
      } catch (error) {
        console.error('Processing error:', error);
        // Don't resolve offset - will retry
        throw error;
      }
    }

    await commitOffsetsIfNecessary();
  },
});
```

Stream Processing

Kafka Streams with TypeScript

```typescript
// Uses the community `kafka-streams` Node library (nodefluent),
// not the JVM Kafka Streams API.
import { KafkaStreams } from 'kafka-streams';

const config = {
  kafkaHost: 'kafka:9092',
  groupId: 'stream-processor',
  clientName: 'analytics-stream',
};

const kafkaStreams = new KafkaStreams(config);

// Stream processing pipeline
const stream = kafkaStreams.getKStream('page-views');

// branch() terminates the chain and returns one stream per predicate
const [productViews, checkoutViews] = stream
  // Parse JSON
  .map((message) => JSON.parse(message.value.toString()))
  // Filter by event type
  .filter((event) => event.type === 'page_view')
  // Split by page prefix
  .branch([
    (event) => event.page.startsWith('/products'),
    (event) => event.page.startsWith('/checkout'),
  ]);

productViews
  .countByKey('product_id', 'count') // running count per product
  .to('product-view-counts');

checkoutViews
  .countByKey('user_id', 'count') // running count per user
  .to('checkout-funnel');

stream.start();
```

Event Aggregation

```typescript
interface PageViewEvent {
  userId: string;
  page: string;
  timestamp: number;
}

interface SessionAggregate {
  userId: string;
  pages: string[];
  startTime: number;
  endTime: number;
  duration: number;
}

class SessionAggregator {
  private sessions: Map<string, SessionAggregate> = new Map();
  private sessionTimeout = 30 * 60 * 1000; // 30 minutes

  process(event: PageViewEvent): SessionAggregate | null {
    const existing = this.sessions.get(event.userId);

    if (existing) {
      // Check if session expired
      if (event.timestamp - existing.endTime > this.sessionTimeout) {
        // Emit completed session
        this.sessions.delete(event.userId);
        this.startNewSession(event);
        return existing;
      }

      // Update existing session
      existing.pages.push(event.page);
      existing.endTime = event.timestamp;
      existing.duration = existing.endTime - existing.startTime;
      return null;
    }

    this.startNewSession(event);
    return null;
  }

  private startNewSession(event: PageViewEvent) {
    this.sessions.set(event.userId, {
      userId: event.userId,
      pages: [event.page],
      startTime: event.timestamp,
      endTime: event.timestamp,
      duration: 0,
    });
  }
}
```

Error Handling

Dead Letter Queue

```typescript
import type { KafkaMessage } from 'kafkajs';

async function processWithDLQ(message: KafkaMessage, topic: string) {
  const retryCount = parseInt(
    message.headers?.['retry-count']?.toString() || '0'
  );

  try {
    const event = JSON.parse(message.value!.toString());
    await processEvent(event);
  } catch (error: any) {
    if (retryCount < 3) {
      // Re-publish to a retry topic with an incremented retry counter
      await producer.send({
        topic: `${topic}-retry`,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'retry-count': (retryCount + 1).toString(),
            'original-topic': topic,
            'error': error.message,
          },
        }],
      });
    } else {
      // Retries exhausted - send to the dead letter queue
      await producer.send({
        topic: `${topic}-dlq`,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'failed-at': new Date().toISOString(),
            'error': error.message,
            'stack': error.stack,
          },
        }],
      });
    }
  }
}
```

Consumer Error Recovery

```typescript
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message);
    } catch (error: any) {
      if (isRetryableError(error)) {
        // Pause partition and retry after delay
        consumer.pause([{ topic, partitions: [partition] }]);

        setTimeout(() => {
          consumer.resume([{ topic, partitions: [partition] }]);
        }, 5000);

        throw error; // Don't commit offset
      }

      // Non-retryable error - send to DLQ
      await sendToDLQ(topic, message, error);
    }
  },
});

function isRetryableError(error: Error): boolean {
  return (
    error.message.includes('timeout') ||
    error.message.includes('connection') ||
    error.message.includes('temporarily unavailable')
  );
}
```

Monitoring

Consumer Lag Monitoring

```typescript
const admin = kafka.admin();

async function getConsumerLag(groupId: string, topics: string[]) {
  // Offsets the consumer group has committed, per topic/partition
  const committed = await admin.fetchOffsets({ groupId, topics });

  const lag: Record<string, number> = {};

  for (const { topic, partitions } of committed) {
    // Latest (high watermark) offsets for the topic
    const topicOffsets = await admin.fetchTopicOffsets(topic);

    let topicLag = 0;
    for (const partition of partitions) {
      const latestOffset = topicOffsets.find(
        (o) => o.partition === partition.partition
      );
      if (latestOffset) {
        topicLag += parseInt(latestOffset.offset) - parseInt(partition.offset);
      }
    }
    lag[topic] = topicLag;
  }

  return lag;
}

// Expose as Prometheus metrics
import { Gauge } from 'prom-client';

const consumerLag = new Gauge({
  name: 'kafka_consumer_lag',
  help: 'Consumer group lag',
  labelNames: ['topic', 'group'],
});

setInterval(async () => {
  const lag = await getConsumerLag('order-processor', ['orders', 'payments']);
  for (const [topic, value] of Object.entries(lag)) {
    consumerLag.set({ topic, group: 'order-processor' }, value);
  }
}, 10000);
```

Best Practices

  1. Use meaningful keys: For proper partitioning and ordering
  2. Set appropriate retention: Balance storage with replay needs
  3. Monitor consumer lag: Alert on growing lag
  4. Implement idempotency: Handle duplicate messages
  5. Use schemas: Avro or Protobuf for type safety
  6. Test failure scenarios: Network issues, broker failures
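Practice #4 deserves a sketch: Kafka guarantees at-least-once delivery by default, so consumers must tolerate duplicates. A minimal approach, assuming each event carries a unique ID, is to track processed IDs. The in-memory Set here is illustrative only; production systems would back this with a durable store such as Redis or a database table.

```typescript
// Minimal consumer-side idempotency sketch. The in-memory Set is for
// illustration; a real deployment needs a durable, shared store.
class IdempotentHandler {
  private processed = new Set<string>();

  // Runs `process` only if this eventId has not been seen before.
  // Returns true if the event was processed, false if it was a duplicate.
  handle(eventId: string, process: () => void): boolean {
    if (this.processed.has(eventId)) {
      return false; // duplicate delivery - skip
    }
    process();
    this.processed.add(eventId);
    return true;
  }
}

const handler = new IdempotentHandler();
let calls = 0;
handler.handle('evt-1', () => calls++); // processed
handler.handle('evt-1', () => calls++); // duplicate, skipped
console.log(calls); // 1
```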

Conclusion

Kafka enables scalable, fault-tolerant data streaming. Start with simple producers and consumers, then add stream processing as complexity grows. Focus on proper error handling and monitoring for production reliability.
