Back to Blog
Node.js · Streams · I/O · Backend

Node.js Streams Basics Guide

Learn the fundamentals of Node.js streams for efficient data processing.

B
Bootspring Team
Engineering
September 1, 2018
7 min read

Streams process data piece by piece without loading everything into memory. Here's how to use them effectively.

Stream Types#

import { Readable, Writable, Duplex, Transform } from 'stream';

// Readable - source of data
// Examples: fs.createReadStream, http request, process.stdin

// Writable - destination for data
// Examples: fs.createWriteStream, http response, process.stdout

// Duplex - both readable and writable
// Examples: TCP socket, zlib streams

// Transform - modify data as it passes through
// Examples: zlib.createGzip, crypto streams

Reading Streams#

import { createReadStream } from 'fs';

const readable = createReadStream('large-file.txt');

// Event-based reading
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readable.on('end', () => {
  console.log('Finished reading');
});

readable.on('error', (err) => {
  console.error('Error:', err);
});

// Paused mode (manual reading)
readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});

// Pause and resume
readable.on('data', (chunk) => {
  readable.pause();
  processChunk(chunk).then(() => {
    readable.resume();
  });
});

Writing Streams#

import { createWriteStream } from 'fs';

const writable = createWriteStream('output.txt');

// Write data
writable.write('Hello, ');
writable.write('World!');
writable.end('\n'); // Signal end

// Handle backpressure
function writeData(stream, data) {
  const canContinue = stream.write(data);

  if (!canContinue) {
    // Buffer full, wait for drain
    stream.once('drain', () => {
      console.log('Drained, can write more');
    });
  }
}

// Events
writable.on('finish', () => {
  console.log('All data written');
});

writable.on('error', (err) => {
  console.error('Write error:', err);
});

// Close vs finish
writable.on('close', () => {
  // Underlying resource closed (after finish)
});

Piping Streams#

import { createReadStream, createWriteStream } from 'fs';
import { createGzip, createGunzip } from 'zlib';

// Basic pipe
const readable = createReadStream('input.txt');
const writable = createWriteStream('output.txt');

readable.pipe(writable);

// Chain transforms
createReadStream('file.txt')
  .pipe(createGzip())
  .pipe(createWriteStream('file.txt.gz'));

// Decompress
createReadStream('file.txt.gz')
  .pipe(createGunzip())
  .pipe(createWriteStream('file.txt'));

// Error handling with pipe
readable
  .on('error', handleError)
  .pipe(transform)
  .on('error', handleError)
  .pipe(writable)
  .on('error', handleError);

// Using pipeline (recommended)
import { pipeline } from 'stream/promises';

await pipeline(
  createReadStream('input.txt'),
  createGzip(),
  createWriteStream('output.txt.gz')
);
// Automatically handles errors and cleanup

Creating Readable Streams#

import { Readable } from 'stream';

// From array
const readable = Readable.from(['Hello', ' ', 'World']);

// Custom readable
class CounterStream extends Readable {
  constructor(max) {
    super();
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current <= this.max) {
      this.push(String(this.current++));
    } else {
      this.push(null); // Signal end
    }
  }
}

const counter = new CounterStream(5);
counter.on('data', (num) => console.log(num));
// 0, 1, 2, 3, 4, 5

// Async generator
async function* asyncGenerator() {
  for (let i = 0; i < 5; i++) {
    await new Promise(r => setTimeout(r, 100));
    yield `Item ${i}\n`;
  }
}

const asyncReadable = Readable.from(asyncGenerator());

// Object mode
const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1, name: 'John' });
    this.push({ id: 2, name: 'Jane' });
    this.push(null);
  }
});

Creating Writable Streams#

import { Writable } from 'stream';

// Custom writable
class LogStream extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`[LOG] ${chunk.toString()}`);
    callback(); // Signal completion
  }
}

const logger = new LogStream();
logger.write('Hello');
logger.write('World');
logger.end();

// With async operation
class DatabaseWriter extends Writable {
  constructor(db) {
    super({ objectMode: true });
    this.db = db;
  }

  async _write(record, encoding, callback) {
    try {
      await this.db.insert(record);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  // Batch writes
  async _writev(chunks, callback) {
    const records = chunks.map(({ chunk }) => chunk);
    try {
      await this.db.insertMany(records);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

// Using constructor options
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

Creating Transform Streams#

import { Transform } from 'stream';

// Uppercase transform
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// Usage
createReadStream('input.txt')
  .pipe(new UppercaseTransform())
  .pipe(createWriteStream('output.txt'));

// JSON line parser
class JSONLineParser extends Transform {
  constructor() {
    super({ objectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    for (const line of lines) {
      if (line.trim()) {
        try {
          this.push(JSON.parse(line));
        } catch (err) {
          return callback(err);
        }
      }
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        return callback(err);
      }
    }
    callback();
  }
}

// Using constructor options
const transform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

Duplex Streams#

import { Duplex } from 'stream';

// Simple duplex
class EchoStream extends Duplex {
  constructor() {
    super();
    this.data = [];
  }

  _write(chunk, encoding, callback) {
    this.data.push(chunk);
    callback();
  }

  _read() {
    if (this.data.length) {
      this.push(this.data.shift());
    } else {
      this.push(null);
    }
  }
}

// TCP socket is a duplex stream
import { createServer } from 'net';

const server = createServer((socket) => {
  // socket is duplex - read from client, write to client
  socket.on('data', (data) => {
    socket.write(`Echo: ${data}`);
  });
});

Async Iteration#

import { createReadStream } from 'fs';

// Streams are async iterable
async function processFile(filename) {
  const stream = createReadStream(filename);

  for await (const chunk of stream) {
    console.log(`Chunk: ${chunk.length} bytes`);
  }
}

// Line by line with readline
import { createInterface } from 'readline';

async function processLines(filename) {
  const rl = createInterface({
    input: createReadStream(filename),
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    console.log(line);
  }
}

// Process large JSON lines file
async function processJSONLines(filename) {
  const rl = createInterface({
    input: createReadStream(filename)
  });

  for await (const line of rl) {
    const record = JSON.parse(line);
    await processRecord(record);
  }
}

Backpressure Handling#

import { Readable, Writable } from 'stream';

// Slow consumer causes backpressure
const fast = new Readable({
  read() {
    for (let i = 0; i < 1000; i++) {
      const canContinue = this.push(`Data ${i}\n`);
      if (!canContinue) {
        // Internal buffer full, stop pushing
        break;
      }
    }
    this.push(null);
  }
});

const slow = new Writable({
  highWaterMark: 16, // Small buffer
  write(chunk, encoding, callback) {
    // Simulate slow write
    setTimeout(callback, 100);
  }
});

// pipe() handles backpressure automatically
fast.pipe(slow);

// Manual backpressure handling
function copyWithBackpressure(source, dest) {
  source.on('data', (chunk) => {
    const canContinue = dest.write(chunk);

    if (!canContinue) {
      source.pause();
      dest.once('drain', () => {
        source.resume();
      });
    }
  });

  source.on('end', () => {
    dest.end();
  });
}

HTTP with Streams#

import { createServer } from 'http';
import { createReadStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';

const server = createServer(async (req, res) => {
  // Stream file response
  if (req.url === '/file') {
    res.setHeader('Content-Type', 'text/plain');
    createReadStream('large-file.txt').pipe(res);
    return;
  }

  // Compressed response
  if (req.url === '/compressed') {
    res.setHeader('Content-Encoding', 'gzip');
    await pipeline(
      createReadStream('large-file.txt'),
      createGzip(),
      res
    );
    return;
  }

  // Stream request body
  if (req.method === 'POST') {
    const chunks = [];
    for await (const chunk of req) {
      chunks.push(chunk);
    }
    const body = Buffer.concat(chunks).toString();
    res.end(`Received: ${body.length} bytes`);
  }
});

Error Handling#

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';

// With pipeline (recommended)
try {
  await pipeline(
    createReadStream('input.txt'),
    transformStream,
    createWriteStream('output.txt')
  );
  console.log('Pipeline succeeded');
} catch (err) {
  console.error('Pipeline failed:', err);
}

// Manual error handling
const readable = createReadStream('input.txt');
const writable = createWriteStream('output.txt');

readable.on('error', (err) => {
  console.error('Read error:', err);
  writable.destroy();
});

writable.on('error', (err) => {
  console.error('Write error:', err);
  readable.destroy();
});

readable.pipe(writable);

// Cleanup on error
import { finished } from 'stream/promises';

const stream = createReadStream('file.txt');
try {
  await finished(stream);
} catch (err) {
  // Stream ended with error
} finally {
  stream.destroy();
}

Best Practices#

Stream Usage:
✓ Use pipeline() for piping
✓ Handle all error events
✓ Use objectMode for objects
✓ Implement _flush for transforms

Performance:
✓ Use streams for large data
✓ Set appropriate highWaterMark
✓ Handle backpressure
✓ Avoid unnecessary buffering

Memory:
✓ Don't collect all data in array
✓ Process chunks immediately
✓ Use async iteration
✓ Destroy unused streams

Avoid:
✗ Ignoring backpressure
✗ Missing error handlers
✗ Calling push() after null
✗ Large highWaterMark values

Conclusion#

Streams enable efficient data processing by handling data in chunks rather than loading everything into memory. Use Readable for sources, Writable for destinations, Transform for processing, and Duplex for bidirectional communication. Always use pipeline() for piping multiple streams together, handle errors properly, and respect backpressure signals for robust stream processing.

Share this article

Help spread the word about Bootspring