Node.js · Streams · Performance · Backend

Node.js Streams: A Comprehensive Guide

Process data efficiently with streams: from readable and writable streams to transforms and backpressure.

Bootspring Team
Engineering
December 28, 2022
6 min read

Streams process data piece by piece, enabling efficient handling of large files and real-time data without loading everything into memory.
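To make the memory difference concrete, here is a minimal sketch (copyBuffered, copyStreamed, large-file.txt, and copy.txt are illustrative names, not part of any API): the buffered version holds the entire file in memory before anything can happen, while the streamed version only ever holds one highWaterMark-sized chunk at a time.

import { readFile, writeFile } from 'fs/promises';
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';

// Buffered: the whole file is loaded into memory before it can be written out
async function copyBuffered(): Promise<void> {
  const contents = await readFile('large-file.txt');
  await writeFile('copy.txt', contents);
}

// Streamed: data flows through in small chunks, so memory use stays flat
// regardless of file size
async function copyStreamed(): Promise<void> {
  await pipeline(
    createReadStream('large-file.txt'),
    createWriteStream('copy.txt')
  );
}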

Stream Types

Readable: a source of data (fs.createReadStream, an HTTP request)
Writable: a destination for data (fs.createWriteStream, an HTTP response)
Duplex: both readable and writable (a TCP socket)
Transform: modifies data as it passes through (compression, encryption)

A minimal sketch constructing one of each type follows.
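This sketch uses only built-in modules; PassThrough stands in for a trivial Transform, and localhost:8080 is a placeholder address for the TCP socket.

import { createReadStream, createWriteStream } from 'fs';
import { PassThrough } from 'stream';
import { connect } from 'net';

// Readable: pulls data from a source
const fileSource = createReadStream('input.txt');

// Writable: pushes data to a destination
const fileSink = createWriteStream('copy.txt');

// Duplex: readable and writable at once (a TCP socket)
// localhost:8080 is a placeholder address for illustration
const socket = connect({ host: 'localhost', port: 8080 });

// Transform: a Duplex that rewrites data as it passes through
// (PassThrough is the built-in identity transform)
const identity = new PassThrough();

fileSource.pipe(identity).pipe(fileSink);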

Reading Streams

import { createReadStream } from 'fs';
import { pipeline } from 'stream/promises';

// Basic readable stream
const readable = createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024, // 64KB chunks
});

// Event-based reading (with utf8 encoding, chunks are strings)
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} characters`);
});

readable.on('end', () => {
  console.log('Finished reading');
});

readable.on('error', (err) => {
  console.error('Error:', err);
});

// Async iteration (preferred)
async function processFile(path: string): Promise<void> {
  const stream = createReadStream(path, { encoding: 'utf8' });

  for await (const chunk of stream) {
    console.log(`Processing ${chunk.length} characters`);
  }
}

// Pausing and resuming
readable.pause();
// Do something
readable.resume();

Writing Streams

import { createWriteStream } from 'fs';
import { Writable } from 'stream';

const writable = createWriteStream('output.txt');

// Write data
writable.write('Hello, ');
writable.write('World!\n');

// End stream (signals no more data)
writable.end('Final line');

// Handle events
writable.on('finish', () => {
  console.log('All data written');
});

writable.on('error', (err) => {
  console.error('Write error:', err);
});

// Handle backpressure
function writeData(stream: Writable, data: string[]): Promise<void> {
  return new Promise((resolve, reject) => {
    let i = 0;

    function write() {
      let ok = true;

      while (i < data.length && ok) {
        const chunk = data[i];
        i++;

        if (i === data.length) {
          // Last chunk: resolve once its write callback fires
          stream.write(chunk, (err) => {
            if (err) reject(err);
            else resolve();
          });
        } else {
          // write() returns false when the internal buffer is full
          ok = stream.write(chunk);
        }
      }

      if (i < data.length) {
        // Wait for the 'drain' event before writing more
        stream.once('drain', write);
      }
    }

    stream.on('error', reject);
    write();
  });
}

Transform Streams

import { Transform, TransformCallback } from 'stream';

// Custom transform stream
class UppercaseTransform extends Transform {
  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: TransformCallback
  ): void {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    callback();
  }
}

// Usage
const uppercase = new UppercaseTransform();
process.stdin.pipe(uppercase).pipe(process.stdout);

// Line-by-line transform
class LineSplitter extends Transform {
  private buffer = '';

  _transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() || '';

    for (const line of lines) {
      this.push(line + '\n');
    }

    callback();
  }

  _flush(callback: TransformCallback): void {
    if (this.buffer) {
      this.push(this.buffer);
    }
    callback();
  }
}

// JSON transform
class JSONParser extends Transform {
  constructor() {
    super({ objectMode: true }); // Enable object mode
  }

  _transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
    try {
      const obj = JSON.parse(chunk.toString());
      this.push(obj);
      callback();
    } catch (err) {
      callback(err as Error);
    }
  }
}

Pipeline

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip, createGunzip } from 'zlib';

// Compress file
async function compressFile(input: string, output: string): Promise<void> {
  await pipeline(
    createReadStream(input),
    createGzip(),
    createWriteStream(output)
  );
  console.log('Compression complete');
}

// Decompress file
async function decompressFile(input: string, output: string): Promise<void> {
  await pipeline(
    createReadStream(input),
    createGunzip(),
    createWriteStream(output)
  );
  console.log('Decompression complete');
}

// Multiple transforms
await pipeline(
  createReadStream('input.txt'),
  new LineSplitter(),
  new UppercaseTransform(),
  createWriteStream('output.txt')
);

// With error handling
try {
  await pipeline(source, transform, destination);
} catch (err) {
  console.error('Pipeline failed:', err);
}

HTTP Streaming

import http from 'http';
import fs, { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';

// Stream file response
const server = http.createServer(async (req, res) => {
  if (req.url === '/video') {
    const stat = await fs.promises.stat('video.mp4');

    res.writeHead(200, {
      'Content-Type': 'video/mp4',
      'Content-Length': stat.size,
    });

    await pipeline(createReadStream('video.mp4'), res);
  }
});

// Stream with range support (video seeking)
const rangeServer = http.createServer(async (req, res) => {
  const stat = await fs.promises.stat('video.mp4');
  const fileSize = stat.size;
  const range = req.headers.range;

  if (range) {
    const parts = range.replace(/bytes=/, '').split('-');
    const start = parseInt(parts[0], 10);
    const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;

    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${end}/${fileSize}`,
      'Accept-Ranges': 'bytes',
      'Content-Length': end - start + 1,
      'Content-Type': 'video/mp4',
    });

    createReadStream('video.mp4', { start, end }).pipe(res);
  } else {
    res.writeHead(200, {
      'Content-Length': fileSize,
      'Content-Type': 'video/mp4',
    });

    createReadStream('video.mp4').pipe(res);
  }
});

// Stream request body (assumes an Express app)
app.post('/upload', async (req, res) => {
  const writeStream = createWriteStream('upload.bin');

  await pipeline(req, writeStream);

  res.json({ message: 'Upload complete' });
});

Async Generators

import { Readable } from 'stream';

// Small helper used below to simulate slow data production
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Create readable from async generator
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield `Line ${i}\n`;
    await sleep(10);
  }
}

const readable = Readable.from(generateData());

// Database cursor as stream (db is a Prisma-style client)
async function* fetchUsers(batchSize = 100) {
  let cursor: string | undefined;

  while (true) {
    const users = await db.user.findMany({
      take: batchSize,
      cursor: cursor ? { id: cursor } : undefined,
      skip: cursor ? 1 : 0,
    });

    if (users.length === 0) break;

    for (const user of users) {
      yield user;
    }

    cursor = users[users.length - 1].id;
  }
}

// Stream to response (assumes an Express app)
app.get('/users/export', async (req, res) => {
  res.setHeader('Content-Type', 'application/json');
  res.write('[');

  let first = true;
  for await (const user of fetchUsers()) {
    if (!first) res.write(',');
    res.write(JSON.stringify(user));
    first = false;
  }

  res.write(']');
  res.end();
});

Backpressure

import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';

// Handling backpressure properly
function copyFile(source: string, dest: string): Promise<void> {
  return new Promise((resolve, reject) => {
    const readable = createReadStream(source);
    const writable = createWriteStream(dest);

    readable.on('data', (chunk) => {
      const canContinue = writable.write(chunk);

      if (!canContinue) {
        // Destination is full, pause reading
        readable.pause();

        writable.once('drain', () => {
          // Destination ready, resume reading
          readable.resume();
        });
      }
    });

    readable.on('end', () => {
      writable.end();
    });

    writable.on('finish', resolve);
    writable.on('error', reject);
    readable.on('error', reject);
  });
}

// Or use pipeline (handles backpressure automatically)
await pipeline(
  createReadStream(source),
  createWriteStream(dest)
);

Object Mode Streams

import { Transform, TransformCallback } from 'stream';

// Streams that handle objects instead of buffers
class ObjectTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(obj: any, encoding: string, callback: TransformCallback): void {
    // Transform object
    const transformed = {
      ...obj,
      processed: true,
      timestamp: new Date(),
    };

    this.push(transformed);
    callback();
  }
}

// Usage
const transform = new ObjectTransform();

transform.write({ id: 1, name: 'Test' });
transform.on('data', (obj) => {
  console.log(obj); // { id: 1, name: 'Test', processed: true, timestamp: ... }
});

Best Practices

Memory:
✓ Use streams for large files
✓ Set appropriate highWaterMark
✓ Handle backpressure
✓ Clean up on error

Error Handling:
✓ Always handle 'error' events
✓ Use pipeline for cleanup
✓ Destroy streams on error
✓ Check stream state

Performance:
✓ Tune chunk size for workload
✓ Use object mode for objects
✓ Avoid unnecessary transforms
✓ Profile memory usage

The error-handling and cleanup items are sketched below.
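As one illustration of those items, here is a minimal sketch (the file paths are placeholders) of destroying both ends of a manual pipe when either side fails, checking stream state before tearing down:

import { createReadStream, createWriteStream } from 'fs';

// input.txt / output.txt are placeholder paths
const source = createReadStream('input.txt');
const destination = createWriteStream('output.txt');

// Destroy both ends if either side fails, so no file descriptors or
// buffered data are left behind (pipe() alone does not do this for you).
function cleanup(err: Error) {
  console.error('Stream failed:', err);
  if (!source.destroyed) source.destroy();
  if (!destination.destroyed) destination.destroy();
}

source.on('error', cleanup);
destination.on('error', cleanup);

source.pipe(destination);

pipeline, shown earlier, performs this teardown automatically, which is why it is the preferred way to compose streams.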

Conclusion

Streams enable efficient data processing in Node.js. Use pipeline for composing streams safely, handle backpressure to prevent memory issues, and leverage async iteration for cleaner code. Streams are essential for processing large files and real-time data.
