Node.js · Streams · Performance · I/O

# Node.js Stream API Guide

Master Node.js streams for efficient data processing with readable, writable, and transform streams.

Bootspring Team
Engineering
December 26, 2018
6 min read

Streams are Node.js's way of handling data piece by piece, enabling efficient processing of large files and real-time data. Here's how to use them.

## Stream Types

```javascript
import { Readable, Writable, Transform, Duplex } from 'node:stream';

// Readable  - source of data
// Writable  - destination for data
// Transform - modify data as it passes through
// Duplex    - both readable and writable
```
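Several familiar Node built-ins map directly onto these four types; a quick sketch (nothing here is specific to this guide):

```javascript
import { Readable, Writable, Transform, Duplex } from 'node:stream';
import { createGzip } from 'node:zlib';
import { Socket } from 'node:net';

const source = process.stdin;  // Readable: a source of data
const dest = process.stdout;   // Writable: a destination
const gzip = createGzip();     // Transform: modifies bytes in transit
const socket = new Socket();   // Duplex: readable and writable (e.g. TCP)

console.log(
  source instanceof Readable,
  dest instanceof Writable,
  gzip instanceof Transform,
  socket instanceof Duplex
); // all true
```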

## Reading Files with Streams

```javascript
import { createReadStream } from 'node:fs';

// Basic file reading
const readStream = createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024, // read in 64KB chunks
});

readStream.on('data', (chunk) => {
  // With an encoding set, chunks are strings, so length is in characters
  console.log(`Received ${chunk.length} characters`);
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Error:', err);
});
```
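Besides flowing mode (`'data'` events), readable streams also support paused mode, where you pull chunks explicitly with `read()` after a `'readable'` event. A minimal sketch using an in-memory stream instead of a file, so it runs anywhere:

```javascript
import { Readable } from 'node:stream';

// Paused mode: wait for 'readable', then pull chunks with read()
const stream = Readable.from(['alpha', 'beta', 'gamma']);

const chunks = [];
stream.on('readable', () => {
  let chunk;
  while ((chunk = stream.read()) !== null) {
    chunks.push(chunk);
  }
});

// Wait for the stream to finish before inspecting the result
await new Promise((resolve) => stream.on('end', resolve));
console.log(chunks.join(',')); // alpha,beta,gamma
```

Paused mode is useful when the consumer, not the producer, should control the pace.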

## Writing Files with Streams

```javascript
import { createWriteStream } from 'node:fs';

const writeStream = createWriteStream('output.txt');

writeStream.write('Hello, ');
writeStream.write('World!');
writeStream.end('\n');

writeStream.on('finish', () => {
  console.log('Finished writing');
});

writeStream.on('error', (err) => {
  console.error('Error:', err);
});
```
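When you issue many tiny writes, `cork()` and `uncork()` let the stream batch them; if the writable implements `_writev`, the whole corked batch arrives in a single call. A small sketch with an in-memory sink (the `'+'` join is just to make the batching visible):

```javascript
import { Writable } from 'node:stream';

const batches = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    batches.push(chunk.toString());
    callback();
  },
  // writev receives all corked chunks as one batch
  writev(chunks, callback) {
    batches.push(chunks.map((c) => c.chunk.toString()).join('+'));
    callback();
  },
});

sink.cork();
sink.write('a');
sink.write('b');
sink.write('c');
sink.uncork(); // flushes the batch: writev sees all three chunks

console.log(batches); // [ 'a+b+c' ]
```

The docs suggest deferring `uncork()` with `process.nextTick()` when corking inside an event handler; calling it synchronously, as here, flushes immediately.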

## Piping Streams

```javascript
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip, createGunzip } from 'node:zlib';

// Copy file
createReadStream('source.txt')
  .pipe(createWriteStream('destination.txt'));

// Compress file
createReadStream('data.txt')
  .pipe(createGzip())
  .pipe(createWriteStream('data.txt.gz'));

// Decompress file
createReadStream('data.txt.gz')
  .pipe(createGunzip())
  .pipe(createWriteStream('data.txt'));
```

## Pipeline with Error Handling

```javascript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

async function compress(input, output) {
  await pipeline(
    createReadStream(input),
    createGzip(),
    createWriteStream(output)
  );
  console.log('Compression complete');
}

compress('data.txt', 'data.txt.gz').catch(console.error);
```
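`pipeline` also accepts an `AbortSignal` in a trailing options object, which gives you cancellation for free: on abort it rejects with an `AbortError` and destroys every stream in the chain. A sketch with an in-memory source standing in for a slow file:

```javascript
import { pipeline } from 'node:stream/promises';
import { Readable, Writable } from 'node:stream';
import { setTimeout as sleep } from 'node:timers/promises';

const ac = new AbortController();

// A deliberately slow source
const slowSource = Readable.from((async function* () {
  for (let i = 0; i < 1000; i++) {
    yield `chunk ${i}\n`;
    await sleep(10);
  }
})());

const sink = new Writable({
  write(chunk, encoding, callback) { callback(); },
});

setTimeout(() => ac.abort(), 50); // cancel after 50ms

let caught;
try {
  await pipeline(slowSource, sink, { signal: ac.signal });
} catch (err) {
  caught = err;
}
console.log(caught.name); // AbortError
```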

## Custom Readable Stream

```javascript
import { Readable } from 'node:stream';

// Using a class
class CounterStream extends Readable {
  constructor(max) {
    super({ encoding: 'utf8' }); // emit strings rather than Buffers
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current < this.max) {
      this.push(String(this.current++));
    } else {
      this.push(null); // Signal end
    }
  }
}

const counter = new CounterStream(5);
counter.on('data', (num) => console.log(num));
// 0, 1, 2, 3, 4

// Using Readable.from
const arrayStream = Readable.from(['a', 'b', 'c']);

// Async generator
async function* generateData() {
  for (let i = 0; i < 5; i++) {
    yield `Item ${i}\n`;
    await new Promise((r) => setTimeout(r, 100));
  }
}

const asyncStream = Readable.from(generateData());
```

## Custom Writable Stream

```javascript
import { Writable } from 'node:stream';

class LogStream extends Writable {
  constructor(options) {
    super(options);
    this.logs = [];
  }

  _write(chunk, encoding, callback) {
    const log = {
      timestamp: new Date().toISOString(),
      message: chunk.toString(),
    };
    this.logs.push(log);
    console.log(`[${log.timestamp}] ${log.message}`);
    callback();
  }

  _final(callback) {
    console.log(`Total logs: ${this.logs.length}`);
    callback();
  }
}

const logger = new LogStream();
logger.write('First message');
logger.write('Second message');
logger.end();
```

## Transform Streams

```javascript
import { Transform } from 'node:stream';

// Uppercase transform
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// Newline-delimited JSON parser: Buffers in, parsed objects out
class JSONParser extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    for (const line of lines) {
      if (line.trim()) {
        try {
          this.push(JSON.parse(line));
        } catch (err) {
          callback(err);
          return;
        }
      }
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        callback(err);
        return;
      }
    }
    callback();
  }
}
```

## Object Mode Streams

```javascript
import { Transform } from 'node:stream';

// Process objects instead of buffers
const objectTransform = new Transform({
  objectMode: true,
  transform(user, encoding, callback) {
    this.push({
      ...user,
      fullName: `${user.firstName} ${user.lastName}`,
      createdAt: new Date(),
    });
    callback();
  },
});

// Usage
objectTransform.write({ firstName: 'John', lastName: 'Doe' });
objectTransform.on('data', (user) => console.log(user));
```

## Duplex Streams

```javascript
import { Duplex } from 'node:stream';

// Echo written data straight back out the readable side
class EchoStream extends Duplex {
  _write(chunk, encoding, callback) {
    this.push(chunk); // forward to the readable side
    callback();
  }

  _final(callback) {
    this.push(null); // end the readable side when writing ends
    callback();
  }

  _read() {
    // Data is pushed from _write, so nothing to do here
  }
}
```

## HTTP Streaming

```javascript
import http from 'node:http';
import { createReadStream } from 'node:fs';

const server = http.createServer((req, res) => {
  if (req.url === '/video') {
    res.setHeader('Content-Type', 'video/mp4');
    createReadStream('video.mp4').pipe(res);
  } else if (req.url === '/large-file') {
    res.setHeader('Content-Type', 'application/octet-stream');
    res.setHeader('Content-Disposition', 'attachment; filename="data.csv"');
    createReadStream('large-data.csv').pipe(res);
  } else {
    res.statusCode = 404; // always respond, or the request hangs
    res.end('Not found');
  }
});

server.listen(3000);
```

## Stream Composition

```javascript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';

// Line splitter: Buffers in, one string per line out
// (simplified: a chunk boundary can split a line; see JSONParser above
// for a version that buffers incomplete lines)
const lineSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    lines.forEach((line) => {
      if (line.trim()) this.push(line + '\n');
    });
    callback();
  },
});

// Filter: keep only lines containing 'ERROR'
const filter = new Transform({
  objectMode: true,
  transform(line, encoding, callback) {
    if (line.includes('ERROR')) {
      this.push(line);
    }
    callback();
  },
});

// Process log file
await pipeline(
  createReadStream('app.log'),
  lineSplitter,
  filter,
  createWriteStream('errors.log')
);
```

## Async Iteration

```javascript
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

// Read file line by line
async function processLines(filename) {
  const fileStream = createReadStream(filename);
  const rl = createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  for await (const line of rl) {
    console.log(`Line: ${line}`);
  }
}

// Iterate over a stream directly
async function processStream(stream) {
  for await (const chunk of stream) {
    console.log(`Chunk: ${chunk.length} bytes`);
  }
}
```

## Backpressure Handling

```javascript
import { createReadStream, createWriteStream } from 'node:fs';

const readable = createReadStream('large-file.txt');
const writable = createWriteStream('output.txt');

readable.on('data', (chunk) => {
  // Check if we can write more
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Pause until drained
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

readable.on('end', () => {
  writable.end();
});
```
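You can observe the backpressure signal itself with a tiny in-memory writable: `write()` returns `false` once buffered bytes reach `highWaterMark`, and `'drain'` fires when the buffer empties. The 8-byte mark below is artificially small, purely for demonstration:

```javascript
import { Writable } from 'node:stream';

const slow = new Writable({
  highWaterMark: 8, // tiny buffer to trigger backpressure quickly
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // simulate slow I/O
  },
});

const first = slow.write('abcd');  // 4 bytes buffered: under the mark
const second = slow.write('efgh'); // 8 bytes buffered: at the mark
console.log(first, second); // true false

slow.on('drain', () => console.log('drained'));
```

Note that `pipe` and `pipeline` perform exactly this pause/resume dance for you; the manual version above is mainly useful when you cannot pipe directly.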

## Memory-Efficient Processing

```javascript
import { createReadStream } from 'node:fs';
import { createHash } from 'node:crypto';

// Hash a large file without loading it all into memory
async function hashFile(filename) {
  return new Promise((resolve, reject) => {
    const hash = createHash('sha256');
    const stream = createReadStream(filename);

    stream.on('data', (chunk) => hash.update(chunk));
    stream.on('end', () => resolve(hash.digest('hex')));
    stream.on('error', reject);
  });
}

// Count lines in a large file
async function countLines(filename) {
  let count = 0;
  const stream = createReadStream(filename);

  for await (const chunk of stream) {
    // Iterating a Buffer yields byte values; 10 is '\n'
    for (const byte of chunk) {
      if (byte === 10) count++;
    }
  }

  return count;
}
```

## Best Practices

Performance:

- ✓ Use streams for large files
- ✓ Set an appropriate highWaterMark
- ✓ Handle backpressure
- ✓ Use pipeline for error handling

Error Handling:

- ✓ Always handle 'error' events
- ✓ Use pipeline/promises
- ✓ Clean up resources
- ✓ Handle stream destruction

Patterns:

- ✓ Pipe for simple cases
- ✓ Pipeline for complex chains
- ✓ Object mode for structured data
- ✓ Async iteration for simplicity

Avoid:

- ✗ Loading large files into memory
- ✗ Ignoring backpressure
- ✗ Missing error handlers
- ✗ Not ending streams properly
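Put together, the recommended shape for most jobs is a pipeline of small single-purpose transforms. A self-contained sketch using in-memory streams in place of files:

```javascript
import { pipeline } from 'node:stream/promises';
import { Readable, Transform, Writable } from 'node:stream';

// One small transform per concern; pipeline wires up backpressure
// and error propagation, and destroys every stream on failure.
const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

const out = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    out.push(chunk.toString());
    callback();
  },
});

await pipeline(Readable.from(['hello ', 'streams']), upper, sink);
console.log(out.join('')); // HELLO STREAMS
```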

## Conclusion

Node.js streams enable efficient processing of large data sets by handling data in chunks. Use readable streams for data sources, writable streams for destinations, and transform streams for data manipulation. Always use pipeline for proper error handling and cleanup, and be mindful of backpressure when processing data faster than it can be written.
