Node.js streams process data piece by piece, enabling efficient memory usage when working with large files and real-time data. Here's how to use them effectively.
Stream Types
import { Readable, Writable, Transform, Duplex } from 'stream';
import { pipeline } from 'stream/promises';
import fs from 'fs';

// Four stream types:
// 1. Readable  - source of data
// 2. Writable  - destination for data
// 3. Duplex    - both readable and writable
// 4. Transform - modify data as it passes through

// Reading a file as a stream: data arrives in chunks instead of one big buffer.
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024, // 64KB chunks
});

// on() returns the stream, so the handlers can be chained.
readStream
  .on('data', (chunk) => {
    console.log(`Received ${chunk.length} bytes`);
  })
  .on('end', () => {
    console.log('Finished reading');
  })
  .on('error', (err) => {
    console.error('Error:', err);
  });

Creating Custom Streams
1import { Readable, Writable, Transform } from 'stream';
2
3// Custom Readable stream
4class CounterStream extends Readable {
5 private current = 0;
6 private max: number;
7
8 constructor(max: number) {
9 super({ objectMode: true });
10 this.max = max;
11 }
12
13 _read() {
14 if (this.current < this.max) {
15 this.push({ count: this.current++ });
16 } else {
17 this.push(null); // Signal end of stream
18 }
19 }
20}
21
22// Usage
23const counter = new CounterStream(10);
24counter.on('data', (data) => console.log(data));
25
26// Custom Writable stream
27class LoggerStream extends Writable {
28 _write(
29 chunk: Buffer,
30 encoding: string,
31 callback: (error?: Error | null) => void
32 ) {
33 console.log(`[LOG] ${chunk.toString()}`);
34 callback(); // Signal completion
35 }
36
37 _writev(
38 chunks: Array<{ chunk: Buffer; encoding: string }>,
39 callback: (error?: Error | null) => void
40 ) {
41 // Handle multiple chunks at once (optional optimization)
42 chunks.forEach(({ chunk }) => {
43 console.log(`[LOG] ${chunk.toString()}`);
44 });
45 callback();
46 }
47}
48
49// Custom Transform stream
50class UppercaseTransform extends Transform {
51 _transform(
52 chunk: Buffer,
53 encoding: string,
54 callback: (error?: Error | null, data?: any) => void
55 ) {
56 const upperCased = chunk.toString().toUpperCase();
57 callback(null, upperCased);
58 }
59}
60
// Chain them together: read -> uppercase -> write.
const readable = fs.createReadStream('input.txt');
const transform = new UppercaseTransform();
const writable = fs.createWriteStream('output.txt');

// pipe() returns its destination, but the chain reads just as well split up.
readable.pipe(transform);
transform.pipe(writable);

Pipeline and Error Handling
1import { pipeline } from 'stream/promises';
2import { createReadStream, createWriteStream } from 'fs';
3import { createGzip, createGunzip } from 'zlib';
4
5// Using pipeline (recommended)
6async function compressFile(source: string, destination: string) {
7 await pipeline(
8 createReadStream(source),
9 createGzip(),
10 createWriteStream(destination)
11 );
12 console.log('Compression complete');
13}
14
15// Pipeline handles errors and cleanup automatically
16async function processFile() {
17 try {
18 await pipeline(
19 createReadStream('input.txt'),
20 new UppercaseTransform(),
21 createWriteStream('output.txt')
22 );
23 } catch (error) {
24 console.error('Pipeline failed:', error);
25 }
26}
27
// Multiple transforms: each stage receives the previous stage's output.
// NOTE(review): ParseJSONTransform / FilterTransform / MapTransform /
// StringifyTransform are illustrative only — they are not defined in this
// article (JSONParseTransform below is the closest concrete example).
async function processWithMultipleTransforms() {
  await pipeline(
    createReadStream('data.json'),
    new ParseJSONTransform(),
    new FilterTransform((item) => item.active),
    new MapTransform((item) => ({ ...item, processed: true })),
    new StringifyTransform(),
    createWriteStream('processed.json')
  );
}

Object Mode Streams
1import { Transform, Readable } from 'stream';
2
3// Object mode for non-buffer data
4class JSONParseTransform extends Transform {
5 constructor() {
6 super({
7 objectMode: true,
8 readableObjectMode: true,
9 writableObjectMode: false,
10 });
11 }
12
13 _transform(chunk: Buffer, encoding: string, callback: Function) {
14 try {
15 const lines = chunk.toString().split('\n').filter(Boolean);
16 for (const line of lines) {
17 this.push(JSON.parse(line));
18 }
19 callback();
20 } catch (error) {
21 callback(error);
22 }
23 }
24}
25
26// Database cursor as stream
27class DatabaseStream extends Readable {
28 private cursor: any;
29 private reading = false;
30
31 constructor(cursor: any) {
32 super({ objectMode: true });
33 this.cursor = cursor;
34 }
35
36 async _read() {
37 if (this.reading) return;
38 this.reading = true;
39
40 try {
41 const doc = await this.cursor.next();
42 if (doc) {
43 this.push(doc);
44 } else {
45 this.push(null); // End of stream
46 }
47 } catch (error) {
48 this.destroy(error as Error);
49 } finally {
50 this.reading = false;
51 }
52 }
53}
// Usage with MongoDB
// NOTE(review): `db` is assumed to be a connected driver handle in scope
// elsewhere — confirm against the surrounding application code.
async function streamFromDatabase() {
  const cursor = db.collection('users').find({});
  const stream = new DatabaseStream(cursor);

  // Readables are async-iterable, so for await drains the stream in order.
  for await (const user of stream) {
    console.log(user.name);
  }
}

Backpressure Handling
1import { Writable, Readable } from 'stream';
2
3// Manual backpressure handling
4function copyWithBackpressure(source: Readable, destination: Writable) {
5 source.on('data', (chunk) => {
6 // write() returns false if internal buffer is full
7 const canContinue = destination.write(chunk);
8
9 if (!canContinue) {
10 // Pause reading until drain
11 source.pause();
12 }
13 });
14
15 destination.on('drain', () => {
16 // Resume reading when buffer is drained
17 source.resume();
18 });
19
20 source.on('end', () => {
21 destination.end();
22 });
23}
24
25// Writable with backpressure
26class SlowWriter extends Writable {
27 constructor() {
28 super({ highWaterMark: 1024 }); // Small buffer
29 }
30
31 _write(chunk: Buffer, encoding: string, callback: Function) {
32 // Simulate slow write
33 setTimeout(() => {
34 console.log(`Wrote ${chunk.length} bytes`);
35 callback();
36 }, 100);
37 }
38}
// pipeline handles backpressure automatically.
// NOTE(review): top-level await requires an ES module context; `pipeline`
// and `createReadStream` are imported in earlier snippets, not this one.
await pipeline(
  createReadStream('large-file.txt'),
  new SlowWriter()
);

Async Iterators with Streams
1import { Readable } from 'stream';
2
3// Streams are async iterable
4async function processStream(stream: Readable) {
5 for await (const chunk of stream) {
6 console.log(chunk.toString());
7 }
8}
9
10// Create readable from async generator
11async function* generateData() {
12 for (let i = 0; i < 10; i++) {
13 await new Promise((resolve) => setTimeout(resolve, 100));
14 yield `Line ${i}\n`;
15 }
16}
// Wrap the async generator in a Readable: each yielded value is one chunk.
const stream = Readable.from(generateData());

// Create readable from array: each element becomes one chunk.
const arrayStream = Readable.from(['a', 'b', 'c']);

// Process line by line
import readline from 'readline';
// Read a file one line at a time without loading it all into memory.
async function processLines(filePath: string) {
  const input = createReadStream(filePath);

  const reader = readline.createInterface({
    input,
    crlfDelay: Infinity, // treat \r\n as a single line break
  });

  // The interface is async-iterable, one iteration per line.
  for await (const line of reader) {
    console.log(`Line: ${line}`);
  }
}

Practical Examples
1// CSV Parser Transform
2class CSVParser extends Transform {
3 private headers: string[] | null = null;
4 private buffer = '';
5
6 constructor() {
7 super({ objectMode: true });
8 }
9
10 _transform(chunk: Buffer, encoding: string, callback: Function) {
11 this.buffer += chunk.toString();
12 const lines = this.buffer.split('\n');
13 this.buffer = lines.pop() || '';
14
15 for (const line of lines) {
16 if (!line.trim()) continue;
17
18 const values = line.split(',').map((v) => v.trim());
19
20 if (!this.headers) {
21 this.headers = values;
22 } else {
23 const obj: Record<string, string> = {};
24 this.headers.forEach((header, i) => {
25 obj[header] = values[i];
26 });
27 this.push(obj);
28 }
29 }
30
31 callback();
32 }
33
34 _flush(callback: Function) {
35 if (this.buffer.trim()) {
36 const values = this.buffer.split(',').map((v) => v.trim());
37 if (this.headers) {
38 const obj: Record<string, string> = {};
39 this.headers.forEach((header, i) => {
40 obj[header] = values[i];
41 });
42 this.push(obj);
43 }
44 }
45 callback();
46 }
47}
48
49// HTTP streaming response
50import { createServer } from 'http';
51
52const server = createServer(async (req, res) => {
53 if (req.url === '/download') {
54 res.setHeader('Content-Type', 'application/octet-stream');
55 res.setHeader('Content-Disposition', 'attachment; filename="large.zip"');
56
57 await pipeline(
58 createReadStream('large.zip'),
59 res
60 );
61 }
62});
// Upload with streams: persist the request body without buffering it all.
import { IncomingMessage } from 'http';

async function handleUpload(req: IncomingMessage) {
  // The request itself is a Readable, so it pipes straight to disk.
  const destination = createWriteStream('uploaded-file.bin');
  await pipeline(req, destination);
  console.log('Upload complete');
}

Memory-Efficient Processing
// Process large JSON file with the third-party `stream-json` package, which
// tokenizes JSON incrementally instead of JSON.parse-ing the whole file.
import { parser } from 'stream-json';
import { streamArray } from 'stream-json/streamers/StreamArray';

// NOTE(review): this local `pipeline` const shadows the `pipeline` function
// imported from 'stream/promises' in earlier snippets — consider renaming.
async function processLargeJSON(filePath: string) {
  const pipeline = createReadStream(filePath)
    .pipe(parser())
    .pipe(streamArray());

  // streamArray emits { key, value } pairs, one per top-level array element.
  for await (const { value } of pipeline) {
    // Process each item
    await processItem(value);
  }
}
15
16// Batch processing
17class BatchTransform extends Transform {
18 private batch: any[] = [];
19 private batchSize: number;
20
21 constructor(batchSize: number) {
22 super({ objectMode: true });
23 this.batchSize = batchSize;
24 }
25
26 _transform(item: any, encoding: string, callback: Function) {
27 this.batch.push(item);
28
29 if (this.batch.length >= this.batchSize) {
30 this.push(this.batch);
31 this.batch = [];
32 }
33
34 callback();
35 }
36
37 _flush(callback: Function) {
38 if (this.batch.length > 0) {
39 this.push(this.batch);
40 }
41 callback();
42 }
43}
// Usage: parse NDJSON, batch 100 records at a time, bulk-insert each batch.
// NOTE(review): `db` is assumed to be in scope elsewhere; top-level await
// requires an ES module context.
await pipeline(
  createReadStream('data.ndjson'),
  new JSONParseTransform(),
  new BatchTransform(100),
  new Writable({
    objectMode: true,
    write(batch, encoding, callback) {
      // Insert batch to database
      // NOTE(review): a rejected insertMany is silently dropped here —
      // consider .then(() => callback(), callback) to propagate failures.
      db.insertMany(batch).then(() => callback());
    },
  })
);

Best Practices
Design:
✓ Use pipeline() for chaining
✓ Handle errors in all streams
✓ Set appropriate highWaterMark
✓ Use objectMode for non-binary data
Performance:
✓ Process in chunks, not all at once
✓ Respect backpressure
✓ Use streams for large files
✓ Avoid loading entire files into memory
Error Handling:
✓ Always handle 'error' event
✓ Use pipeline for automatic cleanup
✓ Implement _destroy() for cleanup
✓ Propagate errors correctly
Conclusion
Node.js streams enable efficient processing of large data sets with constant memory usage. Use Readable for data sources, Writable for destinations, and Transform for processing. Always use pipeline() for proper error handling and backpressure management. Streams are essential for scalable Node.js applications.