Streams enable efficient data processing for large datasets. Here's how to use them.
Stream Types#
1const { Readable, Writable, Transform, Duplex } = require('stream');
2const fs = require('fs');
3
4// Four types of streams:
5// 1. Readable - source of data
6// 2. Writable - destination for data
7// 3. Duplex - both readable and writable
8// 4. Transform - modifies data as it passes through
9
10// Reading a file as stream
11const readStream = fs.createReadStream('large-file.txt');
12readStream.on('data', (chunk) => {
13 console.log(`Received ${chunk.length} bytes`);
14});
15
16// Writing to a stream
17const writeStream = fs.createWriteStream('output.txt');
18writeStream.write('Hello, ');
19writeStream.write('World!');
writeStream.end();
Readable Streams#
1const { Readable } = require('stream');
2
// Creating a readable stream: emits the integers 0..max (inclusive) as
// strings, one per _read() call, then signals end-of-stream.
class CounterStream extends Readable {
  constructor(max) {
    super();
    this.max = max;
    this.current = 0;
  }

  _read() {
    // Once the counter passes max, push(null) terminates the stream.
    if (this.current > this.max) {
      this.push(null);
      return;
    }
    this.push(String(this.current));
    this.current += 1;
  }
}
19
// Drive the counter in flowing mode; chunks arrive as Buffers.
const counter = new CounterStream(10);
counter.on('data', (chunk) => console.log(chunk.toString()));

// Readable.from() wraps any iterable as a stream.
const numbers = Readable.from([1, 2, 3, 4, 5]);

// An async generator also works as a stream source.
async function* generateData() {
  let i = 0;
  while (i < 5) {
    yield `Item ${i}\n`;
    i += 1;
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
}

const asyncStream = Readable.from(generateData());
36// Object mode for non-string data
37const objectStream = new Readable({
38 objectMode: true,
39 read() {
40 this.push({ id: 1, name: 'Item' });
41 this.push(null);
42 },
});
Writable Streams#
1const { Writable } = require('stream');
2
// Creating a writable stream: echoes every chunk to the console with a
// LOG: prefix.
class LogStream extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`LOG: ${chunk.toString()}`);
    callback(); // tell the stream machinery this chunk is done
  }

  // Optional batch hook: invoked with all currently buffered chunks.
  _writev(chunks, callback) {
    for (const { chunk } of chunks) {
      console.log(`LOG: ${chunk.toString()}`);
    }
    callback();
  }
}
18
// Exercise the logger: two writes, then close the stream.
const logger = new LogStream();
logger.write('Hello');
logger.write('World');
logger.end();
// Object mode writable: collects records in memory and reports a
// summary once the producer calls end().
class DatabaseWriter extends Writable {
  constructor() {
    super({ objectMode: true });
    this.records = [];
  }

  _write(record, encoding, callback) {
    this.records.push(record);
    console.log(`Saved record: ${record.id}`);
    callback();
  }

  // Runs after end() has been called, before 'finish' is emitted.
  _final(callback) {
    console.log(`Total records: ${this.records.length}`);
    callback();
  }
}
42
// Handle errors — every stream should have an 'error' listener.
const writer = new DatabaseWriter();
writer.on('error', (err) => console.error('Write error:', err));
writer.on('finish', () => console.log('All writes complete'));
Transform Streams#
1const { Transform } = require('stream');
2
// Basic transform: upper-cases every chunk passing through.
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // callback(err, data) delivers the output directly — equivalent to
    // this.push(data); callback();
    callback(null, chunk.toString().toUpperCase());
  }
}
10
// JSON parsing transform: converts newline-delimited JSON (NDJSON)
// chunks into parsed objects (objectMode output). Partial lines are
// buffered across chunk boundaries.
class JSONParser extends Transform {
  constructor() {
    super({ objectMode: true });
    this.buffer = ''; // trailing incomplete line from the previous chunk
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // keep incomplete line for the next chunk

    for (const line of lines) {
      if (!line.trim()) continue;
      try {
        this.push(JSON.parse(line));
      } catch (err) {
        // Deliver parse failures through the callback instead of
        // this.emit('error', ...): emitting manually bypasses the
        // stream's destroy/teardown logic and pipeline() error
        // propagation, and would keep parsing after a failure.
        callback(err);
        return;
      }
    }
    callback();
  }

  _flush(callback) {
    // Parse whatever remains after the final chunk (no trailing newline).
    if (!this.buffer.trim()) {
      callback();
      return;
    }
    try {
      this.push(JSON.parse(this.buffer));
      callback();
    } catch (err) {
      callback(err);
    }
  }
}
46
47// Chunking transform
48class ChunkSplitter extends Transform {
49 constructor(chunkSize) {
50 super();
51 this.chunkSize = chunkSize;
52 this.buffer = Buffer.alloc(0);
53 }
54
55 _transform(chunk, encoding, callback) {
56 this.buffer = Buffer.concat([this.buffer, chunk]);
57
58 while (this.buffer.length >= this.chunkSize) {
59 this.push(this.buffer.slice(0, this.chunkSize));
60 this.buffer = this.buffer.slice(this.chunkSize);
61 }
62 callback();
63 }
64
65 _flush(callback) {
66 if (this.buffer.length > 0) {
67 this.push(this.buffer);
68 }
69 callback();
70 }
}
Piping Streams#
1const fs = require('fs');
2const zlib = require('zlib');
3
// Basic piping: copy input.txt to output.txt.
const source = fs.createReadStream('input.txt');
const dest = fs.createWriteStream('output.txt');
source.pipe(dest);

// Chain transforms: compress file.txt into file.txt.gz.
fs.createReadStream('file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('file.txt.gz'));

// And decompress back out.
fs.createReadStream('file.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('file-restored.txt'));

// pipeline() propagates errors and destroys every stream in the chain
// on failure — bare .pipe() chains do neither, so prefer pipeline().
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('input.txt'),
  new UppercaseTransform(),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

// Promise-based variant from stream/promises.
const { pipeline: pipelinePromise } = require('stream/promises');
38
39async function processFile() {
40 await pipelinePromise(
41 fs.createReadStream('input.txt'),
42 new UppercaseTransform(),
43 fs.createWriteStream('output.txt')
44 );
45 console.log('Done');
}
Backpressure Handling#
1const { Writable, Readable } = require('stream');
2
// Manual backpressure handling: pause the reader whenever the writer's
// internal buffer is full, resume once it drains.
const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  // write() returns false once the highWaterMark is exceeded.
  if (!writable.write(chunk)) {
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

readable.on('end', () => {
  writable.end();
});

// Automatic with pipe (handles backpressure)
readable.pipe(writable);
25
// Slow consumer simulation: each chunk takes 100 ms to "process".
class SlowWriter extends Writable {
  constructor() {
    super({ highWaterMark: 1024 }); // internal buffer limit in bytes
  }

  _write(chunk, encoding, callback) {
    // Defer the completion callback to mimic slow downstream work.
    setTimeout(() => {
      console.log(`Processed ${chunk.length} bytes`);
      callback();
    }, 100);
  }
}
40
41// Fast producer
42class FastReader extends Readable {
43 constructor() {
44 super({ highWaterMark: 1024 });
45 this.counter = 0;
46 }
47
48 _read() {
49 if (this.counter < 100) {
50 const data = `Data chunk ${this.counter++}\n`;
51 const canPush = this.push(data);
52 if (!canPush) {
53 console.log('Backpressure: reader paused');
54 }
55 } else {
56 this.push(null);
57 }
58 }
}
Async Iteration#
1const fs = require('fs');
2const readline = require('readline');
3
// Async iteration over a stream: for await replaces 'data' listeners.
async function processLines(filename) {
  const stream = fs.createReadStream(filename);

  for await (const chunk of stream) {
    console.log(`Chunk: ${chunk.length} bytes`);
  }
}
12
// Line-by-line processing via readline's async iterator.
// crlfDelay: Infinity treats \r\n as a single line break.
async function readLines(filename) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filename),
    crlfDelay: Infinity,
  });

  for await (const line of rl) {
    console.log(`Line: ${line}`);
  }
}
24
// Custom async iterable: turns a chunk stream into arrays of CSV
// fields, buffering partial lines across chunk boundaries.
async function* csvParser(stream) {
  let buffer = '';

  for await (const chunk of stream) {
    buffer += chunk.toString();
    const lines = buffer.split('\n');
    buffer = lines.pop(); // last element may be an incomplete line

    yield* lines.map((line) => line.split(','));
  }

  // Flush a final line that lacked a trailing newline.
  if (buffer.trim()) {
    yield buffer.split(',');
  }
}
43
44// Usage
45async function parseCSV(filename) {
46 const stream = fs.createReadStream(filename);
47
48 for await (const row of csvParser(stream)) {
49 console.log(row);
50 }
}
HTTP Streaming#
1const http = require('http');
2const fs = require('fs');
3
// Stream a file as the HTTP response instead of buffering it in memory.
const server = http.createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  fs.createReadStream('large-file.txt').pipe(res);
});
10
// Stream with compression: gzip on the fly when the client supports it.
const zlib = require('zlib');

const compressedServer = http.createServer((req, res) => {
  const acceptEncoding = req.headers['accept-encoding'] || '';

  if (!acceptEncoding.includes('gzip')) {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    fs.createReadStream('file.txt').pipe(res);
    return;
  }

  res.writeHead(200, {
    'Content-Type': 'text/plain',
    'Content-Encoding': 'gzip',
  });
  fs.createReadStream('file.txt')
    .pipe(zlib.createGzip())
    .pipe(res);
});
30
31// Stream request body
32const uploadServer = http.createServer((req, res) => {
33 if (req.method === 'POST') {
34 const writeStream = fs.createWriteStream('upload.txt');
35 req.pipe(writeStream);
36
37 writeStream.on('finish', () => {
38 res.writeHead(200);
39 res.end('Upload complete');
40 });
41 }
});
Error Handling#
1const { pipeline } = require('stream/promises');
2
// Proper error handling: pipeline() from stream/promises rejects on any
// stage failure and destroys every stream in the chain, so no manual
// cleanup is needed here.
async function safeProcess() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      new Transform({
        transform(chunk, encoding, callback) {
          try {
            // Process chunk
            callback(null, chunk);
          } catch (err) {
            callback(err);
          }
        },
      }),
      fs.createWriteStream('output.txt')
    );
  } catch (err) {
    console.error('Stream processing failed:', err);
    // Cleanup handled automatically by pipeline
  }
}
25
// AbortController for cancellation. It is a global since Node.js 15,
// so the third-party 'abort-controller' package is unnecessary here
// (this article already requires Node 15+ for stream/promises).
28
// Copy input.txt to output.txt, aborting cleanly if `signal` fires.
async function cancellableProcess(signal) {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      fs.createWriteStream('output.txt'),
      { signal }
    );
  } catch (err) {
    // pipeline() rejects with an AbortError when the signal triggers.
    if (err.name !== 'AbortError') {
      throw err;
    }
    console.log('Processing cancelled');
  }
}
44
// Demo: trigger cancellation after five seconds.
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
cancellableProcess(controller.signal);
Best Practices#
Memory Management:
✓ Use streams for large data
✓ Set appropriate highWaterMark
✓ Handle backpressure properly
✓ Clean up resources on error
Error Handling:
✓ Use pipeline() over pipe()
✓ Handle errors on all streams
✓ Implement _destroy() for cleanup
✓ Use AbortController for cancellation
Performance:
✓ Use object mode sparingly
✓ Batch small writes
✓ Consider worker threads for CPU tasks
✓ Profile memory usage
Patterns:
✓ Prefer transform streams
✓ Use async iteration when possible
✓ Chain streams with pipeline
✓ Implement proper backpressure
Conclusion#
Node.js streams enable efficient processing of large data sets. Use readable streams for data sources, writable for destinations, and transform for modifications. Handle backpressure properly, use pipeline for error handling, and leverage async iteration for cleaner code.