Streams are Node.js's way of handling data piece by piece, enabling efficient processing of large files and real-time data. Here's how to use them.
Stream Types#
import { Readable, Writable, Transform, Duplex } from 'node:stream';

// Readable  - a source that produces data
// Writable  - a destination that consumes data
// Transform - rewrites data as it flows through
// Duplex - both readable and writable
Reading Files with Streams#
import { createReadStream } from 'node:fs';

// Basic file reading: the file is delivered in chunks rather than
// loaded into memory all at once.
const readStream = createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024, // 64KB chunks
});

readStream
  .on('data', (chunk) => {
    console.log(`Received ${chunk.length} bytes`);
  })
  .on('end', () => {
    console.log('Finished reading');
  })
  .on('error', (err) => {
    console.error('Error:', err);
  });
Writing Files with Streams#
import { createWriteStream } from 'node:fs';

const writeStream = createWriteStream('output.txt');

// Queue a couple of chunks, then close the stream with a final one.
writeStream.write('Hello, ');
writeStream.write('World!');
writeStream.end('\n');

writeStream
  .on('finish', () => {
    console.log('Finished writing');
  })
  .on('error', (err) => {
    console.error('Error:', err);
  });
Piping Streams#
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip, createGunzip } from 'node:zlib';

// Copy file
// Note: .pipe() does not forward errors between streams; prefer
// pipeline() (next section) for production code.
createReadStream('source.txt').pipe(createWriteStream('destination.txt'));

// Compress file
createReadStream('data.txt')
  .pipe(createGzip())
  .pipe(createWriteStream('data.txt.gz'));

// Decompress file
createReadStream('data.txt.gz')
  .pipe(createGunzip())
  .pipe(createWriteStream('data.txt'));
Pipeline with Error Handling#
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

/**
 * Gzip-compress `input` into `output`.
 * pipeline() wires the stages together, propagates errors from any
 * stage, and destroys all streams on failure — unlike .pipe().
 */
async function compress(input, output) {
  const source = createReadStream(input);
  const gzip = createGzip();
  const destination = createWriteStream(output);
  await pipeline(source, gzip, destination);
  console.log('Compression complete');
}

compress('data.txt', 'data.txt.gz').catch(console.error);
Custom Readable Stream#
import { Readable } from 'node:stream';

// Using class
/** Readable that emits the strings '0', '1', ... up to (max - 1). */
class CounterStream extends Readable {
  constructor(max) {
    // Without `encoding`, pushed strings are re-encoded and delivered
    // as Buffers; setting utf8 makes consumers see the strings
    // themselves, matching the expected output below.
    super({ encoding: 'utf8' });
    this.max = max;
    this.current = 0;
  }

  // Called whenever the consumer wants more data.
  _read() {
    if (this.current < this.max) {
      this.push(String(this.current++));
    } else {
      this.push(null); // Signal end
    }
  }
}

const counter = new CounterStream(5);
counter.on('data', (num) => console.log(num));
// 0, 1, 2, 3, 4

// Using Readable.from
const arrayStream = Readable.from(['a', 'b', 'c']);

// Async generator
async function* generateData() {
  for (let i = 0; i < 5; i++) {
    yield `Item ${i}\n`;
    await new Promise((r) => setTimeout(r, 100));
  }
}

const asyncStream = Readable.from(generateData());
Custom Writable Stream#
import { Writable } from 'node:stream';

/** Writable sink that timestamps each chunk and keeps it in memory. */
class LogStream extends Writable {
  constructor(options) {
    super(options);
    this.logs = [];
  }

  // One entry per write() call.
  _write(chunk, encoding, callback) {
    const entry = {
      timestamp: new Date().toISOString(),
      message: chunk.toString(),
    };
    this.logs.push(entry);
    console.log(`[${entry.timestamp}] ${entry.message}`);
    callback();
  }

  // Runs once, after end() and before 'finish'.
  _final(callback) {
    console.log(`Total logs: ${this.logs.length}`);
    callback();
  }
}

const logger = new LogStream();
logger.write('First message');
logger.write('Second message');
logger.end();
Transform Streams#
import { Transform } from 'node:stream';

// Uppercase transform
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // callback(null, data) is shorthand for push(data) + callback().
    callback(null, chunk.toString().toUpperCase());
  }
}

// JSON parser transform: emits one object per complete newline-
// delimited JSON line, buffering partial lines across chunks.
class JSONParser extends Transform {
  constructor() {
    super({ objectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.buffer + chunk.toString()).split('\n');
    this.buffer = pieces.pop(); // Keep incomplete line
    for (const piece of pieces) {
      if (!piece.trim()) continue;
      try {
        this.push(JSON.parse(piece));
      } catch (err) {
        callback(err);
        return;
      }
    }
    callback();
  }

  // Parse whatever is left in the buffer when the input ends.
  _flush(callback) {
    if (!this.buffer.trim()) {
      callback();
      return;
    }
    try {
      this.push(JSON.parse(this.buffer));
      callback();
    } catch (err) {
      callback(err);
    }
  }
}
Object Mode Streams#
import { Transform } from 'node:stream';

// Process objects instead of buffers
const objectTransform = new Transform({
  objectMode: true,
  transform(user, encoding, callback) {
    const fullName = `${user.firstName} ${user.lastName}`;
    callback(null, { ...user, fullName, createdAt: new Date() });
  },
});

// Usage
objectTransform.write({ firstName: 'John', lastName: 'Doe' });
objectTransform.on('data', (user) => console.log(user));
Duplex Streams#
import { Duplex } from 'node:stream';

/**
 * Echoes every chunk written to its writable side back out of its
 * readable side.
 *
 * Fix vs. the naive version: _read() must not push(null) merely
 * because the queue is momentarily empty — that permanently ends the
 * readable side even though more writes may still arrive (and a later
 * push would throw "push after EOF"). EOF is only signalled after
 * end() has been called AND the queue has drained.
 */
class EchoStream extends Duplex {
  constructor() {
    super();
    this.data = [];
    this.ended = false;   // set once the writable side finishes
    this.eofSent = false; // guard against pushing null twice
  }

  _write(chunk, encoding, callback) {
    this.data.push(chunk);
    this._read(); // wake the readable side if a reader is waiting
    callback();
  }

  // Writable side is done: allow the readable side to end.
  _final(callback) {
    this.ended = true;
    this._read();
    callback();
  }

  _read() {
    while (this.data.length) {
      // Stop early if the internal buffer is full (backpressure).
      if (!this.push(this.data.shift())) return;
    }
    if (this.ended && !this.eofSent) {
      this.eofSent = true;
      this.push(null);
    }
  }
}
HTTP Streaming#
import http from 'node:http';
import { createReadStream } from 'node:fs';

// Stream files straight to the response — nothing is buffered in
// memory, so arbitrarily large files can be served.
const server = http.createServer((req, res) => {
  if (req.url === '/video') {
    res.setHeader('Content-Type', 'video/mp4');
    createReadStream('video.mp4').pipe(res);
  } else if (req.url === '/large-file') {
    res.setHeader('Content-Type', 'application/octet-stream');
    res.setHeader('Content-Disposition', 'attachment; filename="data.csv"');
    createReadStream('large-data.csv').pipe(res);
  } else {
    // Without this fallback, unmatched requests would never receive a
    // response and the client would hang until timeout.
    res.statusCode = 404;
    res.end('Not Found');
  }
});
Stream Composition#
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';

// Filter transform: keep only lines that mention ERROR.
const makeFilter = () =>
  new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      if (line.includes('ERROR')) {
        this.push(line);
      }
      callback();
    },
  });

// Line splitter. Fix vs. the naive version: a line that straddles two
// chunks must be buffered, not emitted broken in half — the remainder
// is kept in `tail` and flushed when the input ends.
const makeLineSplitter = () => {
  let tail = '';
  return new Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
      const pieces = (tail + chunk.toString()).split('\n');
      tail = pieces.pop(); // keep the incomplete line
      for (const line of pieces) {
        if (line.trim()) this.push(line + '\n');
      }
      callback();
    },
    flush(callback) {
      if (tail.trim()) this.push(tail + '\n');
      callback();
    },
  });
};

// Standalone instances, as in the original example.
const filter = makeFilter();
const lineSplitter = makeLineSplitter();

/** Extract the ERROR lines of `input` into `output`. */
async function processLog(input, output) {
  await pipeline(
    createReadStream(input),
    makeLineSplitter(),
    makeFilter(),
    createWriteStream(output)
  );
}

// Process log file (a missing input is reported, not fatal).
try {
  await processLog('app.log', 'errors.log');
} catch (err) {
  console.error('Log processing failed:', err);
}
Async Iteration#
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

/**
 * Read a file line by line and log each line.
 * crlfDelay: Infinity treats \r\n as a single line break.
 */
async function processLines(filename) {
  const rl = createInterface({
    input: createReadStream(filename),
    crlfDelay: Infinity,
  });

  for await (const line of rl) {
    console.log(`Line: ${line}`);
  }
}

/** Iterate over a stream directly, logging each chunk's size. */
async function processStream(stream) {
  for await (const chunk of stream) {
    console.log(`Chunk: ${chunk.length} bytes`);
  }
}
Backpressure Handling#
import { createReadStream, createWriteStream } from 'node:fs';

const readable = createReadStream('large-file.txt');
const writable = createWriteStream('output.txt');

// write() returns false once the writable's internal buffer is full;
// stop reading until 'drain' says there is room again.
readable.on('data', (chunk) => {
  if (!writable.write(chunk)) {
    readable.pause();
    writable.once('drain', () => readable.resume());
  }
});

// Close the writable once the readable is exhausted.
readable.on('end', () => writable.end());
Memory-Efficient Processing#
import { createReadStream } from 'node:fs';
import { createHash } from 'node:crypto';

/**
 * SHA-256 of a file, streamed chunk by chunk — constant memory no
 * matter how large the file is.
 */
function hashFile(filename) {
  return new Promise((resolve, reject) => {
    const hash = createHash('sha256');
    createReadStream(filename)
      .on('data', (chunk) => hash.update(chunk))
      .on('end', () => resolve(hash.digest('hex')))
      .on('error', reject);
  });
}

/** Count lines in a file without loading it into memory. */
async function countLines(filename) {
  let total = 0;
  for await (const chunk of createReadStream(filename)) {
    for (const byte of chunk) {
      if (byte === 10) total++; // 10 === '\n'
    }
  }
  return total;
}
Best Practices#
Performance:
✓ Use streams for large files
✓ Set appropriate highWaterMark
✓ Handle backpressure
✓ Use pipeline for error handling
Error Handling:
✓ Always handle 'error' events
✓ Use pipeline/promises
✓ Clean up resources
✓ Handle stream destruction
Patterns:
✓ Pipe for simple cases
✓ Pipeline for complex chains
✓ Object mode for structured data
✓ Async iteration for simplicity
Avoid:
✗ Loading large files into memory
✗ Ignoring backpressure
✗ Missing error handlers
✗ Not ending streams properly
Conclusion#
Node.js streams enable efficient processing of large data sets by handling data in chunks. Use readable streams for data sources, writable streams for destinations, and transform streams for data manipulation. Always use pipeline for proper error handling and cleanup, and be mindful of backpressure when processing data faster than it can be written.