Node.js · Streams · Performance · Backend

Node.js Streams: A Complete Guide

Master Node.js streams for efficient data processing. From readable and writable streams to transforms and backpressure handling.

Bootspring Team
Engineering
March 8, 2022
7 min read

Streams process data piece by piece, enabling efficient memory usage for large files and real-time data. Here's how to use them effectively.
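To see the difference, here is a quick sketch contrasting a fully buffered file copy with a streamed one (the file names are hypothetical). The buffered version holds the entire file in memory; the streamed version holds roughly one chunk at a time.

import fs from 'fs';
import { pipeline } from 'stream/promises';

// Buffered: the whole file sits in memory before any of it is written
const data = await fs.promises.readFile('big.log');
await fs.promises.writeFile('copy.log', data);

// Streamed: only one chunk (64KB by default) is in memory at a time
await pipeline(
  fs.createReadStream('big.log'),
  fs.createWriteStream('copy.log')
);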

Stream Types

import { Readable, Writable, Transform, Duplex } from 'stream';
import fs from 'fs';

// Four stream types:
// 1. Readable - source of data
// 2. Writable - destination for data
// 3. Duplex - both readable and writable
// 4. Transform - modify data as it passes through

// Reading a file as a stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024, // 64KB chunks
});

readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Error:', err);
});
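The 'data' listener above switches the stream into flowing mode. Streams also have a paused mode where you pull chunks explicitly with read(); a minimal sketch of that style (attach one style or the other, not both, since 'readable' takes precedence):

readStream.on('readable', () => {
  let chunk;
  // read() returns null when the internal buffer is empty
  while ((chunk = readStream.read()) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});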

Creating Custom Streams

import fs from 'fs';
import { Readable, Writable, Transform } from 'stream';

// Custom Readable stream
class CounterStream extends Readable {
  private current = 0;
  private max: number;

  constructor(max: number) {
    super({ objectMode: true });
    this.max = max;
  }

  _read() {
    if (this.current < this.max) {
      this.push({ count: this.current++ });
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

// Usage
const counter = new CounterStream(10);
counter.on('data', (data) => console.log(data));

// Custom Writable stream
class LoggerStream extends Writable {
  _write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void
  ) {
    console.log(`[LOG] ${chunk.toString()}`);
    callback(); // Signal completion
  }

  _writev(
    chunks: Array<{ chunk: Buffer; encoding: string }>,
    callback: (error?: Error | null) => void
  ) {
    // Handle multiple buffered chunks at once (optional optimization)
    chunks.forEach(({ chunk }) => {
      console.log(`[LOG] ${chunk.toString()}`);
    });
    callback();
  }
}

// Custom Transform stream
class UppercaseTransform extends Transform {
  _transform(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null, data?: any) => void
  ) {
    callback(null, chunk.toString().toUpperCase());
  }
}

// Chain them together
const readable = fs.createReadStream('input.txt');
const transform = new UppercaseTransform();
const writable = fs.createWriteStream('output.txt');

readable.pipe(transform).pipe(writable);
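Duplex streams appear in the type list but not in the examples above. As a sketch, here is a minimal Duplex that echoes everything written to it back out its readable side, roughly what the built-in PassThrough does (backpressure is ignored for brevity):

import { Duplex } from 'stream';

class EchoDuplex extends Duplex {
  _write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void
  ) {
    this.push(chunk); // Hand written data straight to the readable side
    callback();
  }

  _read() {
    // Nothing to do: data is pushed from _write
  }

  _final(callback: (error?: Error | null) => void) {
    this.push(null); // End the readable side when writing finishes
    callback();
  }
}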

Pipeline and Error Handling

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// Using pipeline (recommended)
async function compressFile(source: string, destination: string) {
  await pipeline(
    createReadStream(source),
    createGzip(),
    createWriteStream(destination)
  );
  console.log('Compression complete');
}

// pipeline handles errors and cleanup automatically
async function processFile() {
  try {
    await pipeline(
      createReadStream('input.txt'),
      new UppercaseTransform(),
      createWriteStream('output.txt')
    );
  } catch (error) {
    console.error('Pipeline failed:', error);
  }
}

// Multiple transforms (custom object-mode transforms, defined separately)
async function processWithMultipleTransforms() {
  await pipeline(
    createReadStream('data.json'),
    new ParseJSONTransform(),
    new FilterTransform((item) => item.active),
    new MapTransform((item) => ({ ...item, processed: true })),
    new StringifyTransform(),
    createWriteStream('processed.json')
  );
}
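The last example relies on custom transforms (ParseJSONTransform, FilterTransform, MapTransform, StringifyTransform) that aren't defined in this post. As a sketch, the filter and map helpers might look like this, assuming object-mode items:

import { Transform } from 'stream';

class FilterTransform extends Transform {
  constructor(private predicate: (item: any) => boolean) {
    super({ objectMode: true });
  }

  _transform(
    item: any,
    encoding: string,
    callback: (error?: Error | null) => void
  ) {
    if (this.predicate(item)) this.push(item); // Drop items that fail the test
    callback();
  }
}

class MapTransform extends Transform {
  constructor(private fn: (item: any) => any) {
    super({ objectMode: true });
  }

  _transform(
    item: any,
    encoding: string,
    callback: (error?: Error | null, data?: any) => void
  ) {
    callback(null, this.fn(item)); // Emit the mapped item
  }
}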

Object Mode Streams

import { Transform, Readable } from 'stream';

// Object mode for non-buffer data: Buffers in, parsed objects out
class JSONParseTransform extends Transform {
  private buffer = '';

  constructor() {
    super({
      readableObjectMode: true, // push parsed objects
      writableObjectMode: false, // accept raw Buffers/strings
    });
  }

  _transform(chunk: Buffer, encoding: string, callback: Function) {
    try {
      // Buffer the remainder so a line split across chunks still parses
      this.buffer += chunk.toString();
      const lines = this.buffer.split('\n');
      this.buffer = lines.pop() || '';
      for (const line of lines) {
        if (line.trim()) this.push(JSON.parse(line));
      }
      callback();
    } catch (error) {
      callback(error);
    }
  }

  _flush(callback: Function) {
    try {
      if (this.buffer.trim()) this.push(JSON.parse(this.buffer));
      callback();
    } catch (error) {
      callback(error);
    }
  }
}

// Database cursor as stream
class DatabaseStream extends Readable {
  private cursor: any;
  private reading = false;

  constructor(cursor: any) {
    super({ objectMode: true });
    this.cursor = cursor;
  }

  async _read() {
    if (this.reading) return;
    this.reading = true;

    try {
      const doc = await this.cursor.next();
      if (doc) {
        this.push(doc);
      } else {
        this.push(null); // End of stream
      }
    } catch (error) {
      this.destroy(error as Error);
    } finally {
      this.reading = false;
    }
  }
}

// Usage with MongoDB (db is assumed to be a connected client)
async function streamFromDatabase() {
  const cursor = db.collection('users').find({});
  const stream = new DatabaseStream(cursor);

  for await (const user of stream) {
    console.log(user.name);
  }
}
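Putting JSONParseTransform to work, a sketch that streams a hypothetical newline-delimited JSON file (events.ndjson) into an object-mode consumer:

import { createReadStream } from 'fs';
import { Writable } from 'stream';
import { pipeline } from 'stream/promises';

await pipeline(
  createReadStream('events.ndjson'),
  new JSONParseTransform(),
  new Writable({
    objectMode: true,
    write(event, encoding, callback) {
      console.log(event); // Each chunk is now a parsed object
      callback();
    },
  })
);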

Backpressure Handling

import { Writable, Readable } from 'stream';
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';

// Manual backpressure handling
function copyWithBackpressure(source: Readable, destination: Writable) {
  source.on('data', (chunk) => {
    // write() returns false if the internal buffer is full
    const canContinue = destination.write(chunk);

    if (!canContinue) {
      // Pause reading until 'drain'
      source.pause();
    }
  });

  destination.on('drain', () => {
    // Resume reading once the buffer has drained
    source.resume();
  });

  source.on('end', () => {
    destination.end();
  });
}

// Writable with a small buffer to trigger backpressure quickly
class SlowWriter extends Writable {
  constructor() {
    super({ highWaterMark: 1024 }); // Small buffer
  }

  _write(chunk: Buffer, encoding: string, callback: Function) {
    // Simulate a slow write
    setTimeout(() => {
      console.log(`Wrote ${chunk.length} bytes`);
      callback();
    }, 100);
  }
}

// pipeline handles backpressure automatically
await pipeline(
  createReadStream('large-file.txt'),
  new SlowWriter()
);
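When you are writing directly (rather than piping) and can use async/await, events.once() gives a tidier version of the same pause-until-drain dance:

import { once } from 'events';
import { Writable } from 'stream';

async function writeAll(
  destination: Writable,
  chunks: Iterable<Buffer | string>
) {
  for (const chunk of chunks) {
    if (!destination.write(chunk)) {
      // Buffer is full: wait for 'drain' before writing more
      await once(destination, 'drain');
    }
  }
  destination.end();
}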

Async Iterators with Streams

import readline from 'readline';
import { createReadStream } from 'fs';
import { Readable } from 'stream';

// Streams are async iterable
async function processStream(stream: Readable) {
  for await (const chunk of stream) {
    console.log(chunk.toString());
  }
}

// Create a readable from an async generator
async function* generateData() {
  for (let i = 0; i < 10; i++) {
    await new Promise((resolve) => setTimeout(resolve, 100));
    yield `Line ${i}\n`;
  }
}

const stream = Readable.from(generateData());

// Create a readable from an array
const arrayStream = Readable.from(['a', 'b', 'c']);

// Process a file line by line
async function processLines(filePath: string) {
  const fileStream = createReadStream(filePath);

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  for await (const line of rl) {
    console.log(`Line: ${line}`);
  }
}
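On modern Node versions, async generators also slot directly into pipeline() as transform stages, which is often lighter than writing a Transform subclass for one-off logic. A sketch:

import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';

await pipeline(
  createReadStream('input.txt'),
  async function* (source) {
    // Each chunk flows through this generator as a transform stage
    for await (const chunk of source) {
      yield chunk.toString().toUpperCase();
    }
  },
  createWriteStream('output.txt')
);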

Practical Examples

import { Transform } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createServer, IncomingMessage } from 'http';

// CSV Parser Transform
class CSVParser extends Transform {
  private headers: string[] | null = null;
  private buffer = '';

  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk: Buffer, encoding: string, callback: Function) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() || '';

    for (const line of lines) {
      if (!line.trim()) continue;

      const values = line.split(',').map((v) => v.trim());

      if (!this.headers) {
        this.headers = values;
      } else {
        const obj: Record<string, string> = {};
        this.headers.forEach((header, i) => {
          obj[header] = values[i];
        });
        this.push(obj);
      }
    }

    callback();
  }

  _flush(callback: Function) {
    if (this.buffer.trim()) {
      const values = this.buffer.split(',').map((v) => v.trim());
      if (this.headers) {
        const obj: Record<string, string> = {};
        this.headers.forEach((header, i) => {
          obj[header] = values[i];
        });
        this.push(obj);
      }
    }
    callback();
  }
}

// HTTP streaming response
const server = createServer(async (req, res) => {
  if (req.url === '/download') {
    res.setHeader('Content-Type', 'application/octet-stream');
    res.setHeader('Content-Disposition', 'attachment; filename="large.zip"');

    try {
      await pipeline(createReadStream('large.zip'), res);
    } catch (error) {
      // Client disconnects surface here; pipeline destroys both streams
      console.error('Download aborted:', error);
    }
  }
});

// Upload with streams
async function handleUpload(req: IncomingMessage) {
  const writeStream = createWriteStream('uploaded-file.bin');

  await pipeline(req, writeStream);

  console.log('Upload complete');
}
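Wiring the CSVParser into a pipeline, assuming a hypothetical users.csv whose first row holds the headers:

import { createReadStream } from 'fs';
import { Writable } from 'stream';
import { pipeline } from 'stream/promises';

await pipeline(
  createReadStream('users.csv'),
  new CSVParser(),
  new Writable({
    objectMode: true,
    write(row, encoding, callback) {
      console.log(row); // e.g. { name: 'Ada', email: 'ada@example.com' }
      callback();
    },
  })
);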

Memory-Efficient Processing

import { createReadStream } from 'fs';
import { Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';
// stream-json is a third-party package: npm install stream-json
import { parser } from 'stream-json';
import { streamArray } from 'stream-json/streamers/StreamArray';

// Process a large JSON array file item by item
async function processLargeJSON(filePath: string) {
  const source = createReadStream(filePath)
    .pipe(parser())
    .pipe(streamArray());

  for await (const { value } of source) {
    // Process each item (processItem is application-specific)
    await processItem(value);
  }
}

// Batch processing
class BatchTransform extends Transform {
  private batch: any[] = [];
  private batchSize: number;

  constructor(batchSize: number) {
    super({ objectMode: true });
    this.batchSize = batchSize;
  }

  _transform(item: any, encoding: string, callback: Function) {
    this.batch.push(item);

    if (this.batch.length >= this.batchSize) {
      this.push(this.batch);
      this.batch = [];
    }

    callback();
  }

  _flush(callback: Function) {
    if (this.batch.length > 0) {
      this.push(this.batch);
    }
    callback();
  }
}

// Usage: batch NDJSON records into database inserts (db assumed from earlier)
await pipeline(
  createReadStream('data.ndjson'),
  new JSONParseTransform(),
  new BatchTransform(100),
  new Writable({
    objectMode: true,
    write(batch, encoding, callback) {
      db.insertMany(batch)
        .then(() => callback())
        .catch(callback);
    },
  })
);

Best Practices

Design:
✓ Use pipeline() for chaining
✓ Handle errors in all streams
✓ Set an appropriate highWaterMark
✓ Use objectMode for non-binary data

Performance:
✓ Process in chunks, not all at once
✓ Respect backpressure
✓ Use streams for large files
✓ Avoid loading entire files into memory

Error Handling:
✓ Always handle the 'error' event
✓ Use pipeline for automatic cleanup
✓ Implement _destroy() for cleanup (see the sketch below)
✓ Propagate errors correctly
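As a sketch of that _destroy() point: a hypothetical writable that owns a file descriptor and releases it whether the stream finishes normally or is destroyed mid-flight:

import fs from 'fs';
import { Writable } from 'stream';

class FdWriter extends Writable {
  constructor(private fd: number) {
    super();
  }

  _write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void
  ) {
    fs.write(this.fd, chunk, (err) => callback(err));
  }

  _destroy(
    error: Error | null,
    callback: (error?: Error | null) => void
  ) {
    // Runs on destroy() and after 'finish': always release the descriptor
    fs.close(this.fd, (closeErr) => callback(error ?? closeErr));
  }
}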

Conclusion

Node.js streams enable efficient processing of large data sets with constant memory usage. Use Readable for data sources, Writable for destinations, and Transform for processing. Always use pipeline() for proper error handling and backpressure management. Streams are essential for scalable Node.js applications.
