Node.js · Streams · Performance · Data Processing

Node.js Stream Processing

Master Node.js streams. From readable and writable to transform streams and backpressure handling.

Bootspring Team
Engineering
November 1, 2020
7 min read

Streams let you process data piece by piece instead of loading entire datasets into memory, keeping memory usage flat no matter how large the input is. Here's how to use them.

Stream Types

```javascript
const { Readable, Writable, Transform, Duplex } = require('stream');
const fs = require('fs');

// Four types of streams:
// 1. Readable - source of data
// 2. Writable - destination for data
// 3. Duplex - both readable and writable
// 4. Transform - modifies data as it passes through

// Reading a file as a stream
const readStream = fs.createReadStream('large-file.txt');
readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

// Writing to a stream
const writeStream = fs.createWriteStream('output.txt');
writeStream.write('Hello, ');
writeStream.write('World!');
writeStream.end();
```
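
The snippet above lists Duplex streams but doesn't demonstrate one. Here's a minimal sketch, assuming an in-memory echo channel (EchoChannel is a hypothetical name): data written to the writable side is re-emitted on the readable side.

```javascript
const { Duplex } = require('stream');

// Minimal Duplex sketch: an in-memory echo channel. The writable and
// readable sides are independent; _write hands each chunk straight to
// the readable side.
class EchoChannel extends Duplex {
  _write(chunk, encoding, callback) {
    this.push(chunk); // Re-emit on the readable side
    callback();
  }

  _read() {
    // Data is pushed from _write, so nothing to do here
  }

  _final(callback) {
    this.push(null); // End the readable side once writing ends
    callback();
  }
}

const echo = new EchoChannel();
echo.on('data', (chunk) => console.log(`Echo: ${chunk}`));
echo.write('ping');
echo.end();
```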

Readable Streams

```javascript
const { Readable } = require('stream');

// Creating a readable stream
class CounterStream extends Readable {
  constructor(max) {
    super();
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current <= this.max) {
      this.push(String(this.current++));
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

const counter = new CounterStream(10);
counter.on('data', (chunk) => console.log(chunk.toString()));

// Using Readable.from() for iterables
const numbers = Readable.from([1, 2, 3, 4, 5]);

// Async generator as source
async function* generateData() {
  for (let i = 0; i < 5; i++) {
    yield `Item ${i}\n`;
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}

const asyncStream = Readable.from(generateData());

// Object mode for non-string data
const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1, name: 'Item' });
    this.push(null);
  },
});
```

Writable Streams

```javascript
const { Writable } = require('stream');

// Creating a writable stream
class LogStream extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`LOG: ${chunk.toString()}`);
    callback(); // Signal completion
  }

  _writev(chunks, callback) {
    // Handle multiple buffered chunks at once
    chunks.forEach(({ chunk }) => {
      console.log(`LOG: ${chunk.toString()}`);
    });
    callback();
  }
}

const logger = new LogStream();
logger.write('Hello');
logger.write('World');
logger.end();

// Object mode writable
class DatabaseWriter extends Writable {
  constructor() {
    super({ objectMode: true });
    this.records = [];
  }

  _write(record, encoding, callback) {
    this.records.push(record);
    console.log(`Saved record: ${record.id}`);
    callback();
  }

  _final(callback) {
    console.log(`Total records: ${this.records.length}`);
    callback();
  }
}

// Handle errors
const writer = new DatabaseWriter();
writer.on('error', (err) => console.error('Write error:', err));
writer.on('finish', () => console.log('All writes complete'));
```

Transform Streams

```javascript
const { Transform } = require('stream');

// Basic transform
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// Newline-delimited JSON parsing transform
class JSONParser extends Transform {
  constructor() {
    // Input is text, output is objects, so only the readable side
    // needs object mode
    super({ readableObjectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep the incomplete last line

    for (const line of lines) {
      if (line.trim()) {
        try {
          this.push(JSON.parse(line));
        } catch (err) {
          return callback(err); // Report errors via the callback
        }
      }
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        return callback(err);
      }
    }
    callback();
  }
}

// Chunking transform
class ChunkSplitter extends Transform {
  constructor(chunkSize) {
    super();
    this.chunkSize = chunkSize;
    this.buffer = Buffer.alloc(0);
  }

  _transform(chunk, encoding, callback) {
    this.buffer = Buffer.concat([this.buffer, chunk]);

    while (this.buffer.length >= this.chunkSize) {
      this.push(this.buffer.slice(0, this.chunkSize));
      this.buffer = this.buffer.slice(this.chunkSize);
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.length > 0) {
      this.push(this.buffer);
    }
    callback();
  }
}
```
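
To see JSONParser at work, pipe a newline-delimited JSON file through it. A short usage sketch (data.ndjson is a hypothetical path):

```javascript
const fs = require('fs');

// Each 'data' event delivers one parsed object, since the readable
// side is in object mode
fs.createReadStream('data.ndjson')
  .pipe(new JSONParser())
  .on('data', (obj) => console.log('Parsed:', obj))
  .on('error', (err) => console.error('Parse error:', err));
```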

Piping Streams

```javascript
const fs = require('fs');
const zlib = require('zlib');

// Basic piping
const source = fs.createReadStream('input.txt');
const dest = fs.createWriteStream('output.txt');
source.pipe(dest);

// Chain multiple transforms
fs.createReadStream('file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('file.txt.gz'));

// Decompress
fs.createReadStream('file.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('file-restored.txt'));

// Pipeline with error handling (recommended)
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('input.txt'),
  new UppercaseTransform(),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

// Promise-based pipeline
const { pipeline: pipelinePromise } = require('stream/promises');

async function processFile() {
  await pipelinePromise(
    fs.createReadStream('input.txt'),
    new UppercaseTransform(),
    fs.createWriteStream('output.txt')
  );
  console.log('Done');
}
```

Backpressure Handling

```javascript
const fs = require('fs');
const { Writable, Readable } = require('stream');

// Manual backpressure handling
const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Pause reading until the writer drains
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

readable.on('end', () => {
  writable.end();
});

// Alternatively, pipe() handles all of the above automatically:
// readable.pipe(writable);

// Slow consumer simulation
class SlowWriter extends Writable {
  constructor() {
    super({ highWaterMark: 1024 }); // Internal buffer size in bytes
  }

  _write(chunk, encoding, callback) {
    // Simulate slow processing
    setTimeout(() => {
      console.log(`Processed ${chunk.length} bytes`);
      callback();
    }, 100);
  }
}

// Fast producer
class FastReader extends Readable {
  constructor() {
    super({ highWaterMark: 1024 });
    this.counter = 0;
  }

  _read() {
    if (this.counter < 100) {
      const data = `Data chunk ${this.counter++}\n`;
      const canPush = this.push(data);
      if (!canPush) {
        console.log('Backpressure: internal buffer full');
      }
    } else {
      this.push(null);
    }
  }
}
```

Async Iteration

```javascript
const fs = require('fs');
const readline = require('readline');

// Async iteration over a stream
async function processLines(filename) {
  const stream = fs.createReadStream(filename);

  for await (const chunk of stream) {
    console.log(`Chunk: ${chunk.length} bytes`);
  }
}

// Line-by-line processing
async function readLines(filename) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filename),
    crlfDelay: Infinity,
  });

  for await (const line of rl) {
    console.log(`Line: ${line}`);
  }
}

// Custom async iterable stream
async function* csvParser(stream) {
  let buffer = '';

  for await (const chunk of stream) {
    buffer += chunk.toString();
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Keep the incomplete last line

    for (const line of lines) {
      yield line.split(',');
    }
  }

  if (buffer.trim()) {
    yield buffer.split(',');
  }
}

// Usage
async function parseCSV(filename) {
  const stream = fs.createReadStream(filename);

  for await (const row of csvParser(stream)) {
    console.log(row);
  }
}
```
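
In recent Node versions, pipeline() also accepts async generator functions as transform stages, which pairs naturally with the generator pattern above. A minimal sketch, with hypothetical file paths:

```javascript
const fs = require('fs');
const { pipeline } = require('stream/promises');

// An async generator as a pipeline transform stage.
// input.txt and upper.txt are hypothetical paths.
async function uppercaseFile() {
  await pipeline(
    fs.createReadStream('input.txt'),
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    fs.createWriteStream('upper.txt')
  );
  console.log('Done');
}

uppercaseFile().catch(console.error);
```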

HTTP Streaming

```javascript
const http = require('http');
const fs = require('fs');

// Stream a file response
const server = http.createServer((req, res) => {
  const stream = fs.createReadStream('large-file.txt');
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  stream.pipe(res);
});

// Stream with compression
const zlib = require('zlib');

const compressedServer = http.createServer((req, res) => {
  const acceptEncoding = req.headers['accept-encoding'] || '';

  if (acceptEncoding.includes('gzip')) {
    res.writeHead(200, {
      'Content-Type': 'text/plain',
      'Content-Encoding': 'gzip',
    });
    fs.createReadStream('file.txt')
      .pipe(zlib.createGzip())
      .pipe(res);
  } else {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    fs.createReadStream('file.txt').pipe(res);
  }
});

// Stream a request body to disk
const uploadServer = http.createServer((req, res) => {
  if (req.method === 'POST') {
    const writeStream = fs.createWriteStream('upload.txt');
    req.pipe(writeStream);

    writeStream.on('finish', () => {
      res.writeHead(200);
      res.end('Upload complete');
    });
  }
});
```
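
One caveat with the upload handler above: req.pipe(writeStream) won't destroy the write stream if the request errors mid-upload. A sketch of a safer variant using pipeline() (safeUploadServer is a hypothetical name):

```javascript
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');

// pipeline() tears down both streams and reports the error if either
// side fails mid-upload
const safeUploadServer = http.createServer((req, res) => {
  if (req.method !== 'POST') {
    res.writeHead(405);
    return res.end();
  }

  pipeline(req, fs.createWriteStream('upload.txt'), (err) => {
    if (err) {
      console.error('Upload failed:', err);
      res.writeHead(500);
      res.end('Upload failed');
    } else {
      res.writeHead(200);
      res.end('Upload complete');
    }
  });
});
```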

Error Handling

```javascript
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Proper error handling with pipeline
async function safeProcess() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      new Transform({
        transform(chunk, encoding, callback) {
          try {
            // Process chunk
            callback(null, chunk);
          } catch (err) {
            callback(err);
          }
        },
      }),
      fs.createWriteStream('output.txt')
    );
  } catch (err) {
    console.error('Stream processing failed:', err);
    // Cleanup is handled automatically by pipeline
  }
}

// AbortController for cancellation (a global since Node 15, which
// stream/promises requires anyway)
async function cancellableProcess(signal) {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      fs.createWriteStream('output.txt'),
      { signal }
    );
  } catch (err) {
    if (err.name === 'AbortError') {
      console.log('Processing cancelled');
    } else {
      throw err;
    }
  }
}

const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
cancellableProcess(controller.signal);
```

Best Practices

Memory Management:
✓ Use streams for large data
✓ Set an appropriate highWaterMark
✓ Handle backpressure properly
✓ Clean up resources on error

Error Handling:
✓ Use pipeline() over pipe()
✓ Handle errors on all streams
✓ Implement _destroy() for cleanup
✓ Use AbortController for cancellation

Performance:
✓ Use object mode sparingly
✓ Batch small writes (see the cork()/uncork() sketch below)
✓ Consider worker threads for CPU-bound tasks
✓ Profile memory usage

Patterns:
✓ Prefer transform streams
✓ Use async iteration when possible
✓ Chain streams with pipeline
✓ Implement proper backpressure
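
"Batch small writes" refers to cork() and uncork(): while a writable stream is corked, writes accumulate in memory and are flushed together, cutting down on underlying system calls. A minimal sketch (batched.txt is a hypothetical path):

```javascript
const fs = require('fs');

const out = fs.createWriteStream('batched.txt');

// While corked, writes are buffered instead of hitting the file
// descriptor one by one
out.cork();
for (let i = 0; i < 100; i++) {
  out.write(`row ${i}\n`);
}

// Uncork on the next tick so everything queued in this tick flushes
// as a single batch
process.nextTick(() => out.uncork());
```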

Conclusion

Node.js streams enable efficient processing of large datasets. Use readable streams for data sources, writable streams for destinations, and transform streams for modifications. Handle backpressure properly, reach for pipeline() for error handling, and use async iteration for cleaner code.
