Streams process data piece by piece without loading everything into memory. Here's how to use them effectively.
Stream Types#
import { Readable, Writable, Duplex, Transform } from 'stream';

// Readable - source of data
// Examples: fs.createReadStream, http request, process.stdin

// Writable - destination for data
// Examples: fs.createWriteStream, http response, process.stdout

// Duplex - both readable and writable
// Examples: TCP socket, zlib streams

// Transform - modify data as it passes through
// Examples: zlib.createGzip, crypto streams

Reading Streams#
import { createReadStream } from 'fs';

const readable = createReadStream('large-file.txt');

// Event-based reading
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readable.on('end', () => {
  console.log('Finished reading');
});

readable.on('error', (err) => {
  console.error('Error:', err);
});

// Paused mode (manual reading)
readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});

// Pause and resume
readable.on('data', (chunk) => {
  readable.pause();
  processChunk(chunk).then(() => { // processChunk: your async handler
    readable.resume();
  });
});
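
By default, data events deliver Buffers. If you'd rather work with strings, set an encoding when creating the stream:

// Receive strings instead of Buffers
const utf8Stream = createReadStream('large-file.txt', { encoding: 'utf8' });

utf8Stream.on('data', (chunk) => {
  console.log(typeof chunk); // 'string'
});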

Writing Streams#
import { createWriteStream } from 'fs';

const writable = createWriteStream('output.txt');

// Write data
writable.write('Hello, ');
writable.write('World!');
writable.end('\n'); // Signal end

// Handle backpressure
function writeData(stream, data) {
  const canContinue = stream.write(data);

  if (!canContinue) {
    // Buffer full, wait for drain
    stream.once('drain', () => {
      console.log('Drained, can write more');
    });
  }
}

// Events
writable.on('finish', () => {
  console.log('All data written');
});

writable.on('error', (err) => {
  console.error('Write error:', err);
});

// Close vs finish
writable.on('close', () => {
  // Underlying resource closed (after finish)
});
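
The drain event pairs naturally with events.once, which turns the pattern above into an awaitable loop (writeAll is an illustrative name):

import { once } from 'events';

// Await 'drain' so writes never outrun the internal buffer
async function writeAll(stream, chunks) {
  for (const chunk of chunks) {
    if (!stream.write(chunk)) {
      await once(stream, 'drain'); // buffer full - wait before writing more
    }
  }
  stream.end();
  await once(stream, 'finish'); // everything flushed
}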

Piping Streams#
import { createReadStream, createWriteStream } from 'fs';
import { createGzip, createGunzip } from 'zlib';

// Basic pipe
const readable = createReadStream('input.txt');
const writable = createWriteStream('output.txt');

readable.pipe(writable);

// Chain transforms
createReadStream('file.txt')
  .pipe(createGzip())
  .pipe(createWriteStream('file.txt.gz'));

// Decompress
createReadStream('file.txt.gz')
  .pipe(createGunzip())
  .pipe(createWriteStream('file.txt'));

// Error handling with pipe - every stream needs its own handler
readable
  .on('error', handleError)
  .pipe(transform)
  .on('error', handleError)
  .pipe(writable)
  .on('error', handleError);

// Using pipeline (recommended)
import { pipeline } from 'stream/promises';

await pipeline(
  createReadStream('input.txt'),
  createGzip(),
  createWriteStream('output.txt.gz')
);
// Automatically handles errors and cleanup
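
In recent Node versions, pipeline also accepts an AbortSignal, so a long-running pipe can be cancelled:

const ac = new AbortController();
setTimeout(() => ac.abort(), 5000); // give up after 5 seconds

try {
  await pipeline(
    createReadStream('input.txt'),
    createGzip(),
    createWriteStream('output.txt.gz'),
    { signal: ac.signal }
  );
} catch (err) {
  if (err.name === 'AbortError') console.error('Pipeline cancelled');
  else throw err;
}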

Creating Readable Streams#
import { Readable } from 'stream';

// From array
const readable = Readable.from(['Hello', ' ', 'World']);

// Custom readable
class CounterStream extends Readable {
  constructor(max) {
    super();
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current <= this.max) {
      this.push(String(this.current++));
    } else {
      this.push(null); // Signal end
    }
  }
}

const counter = new CounterStream(5);
counter.on('data', (num) => console.log(num.toString())); // chunks arrive as Buffers
// 0, 1, 2, 3, 4, 5

// Async generator
async function* asyncGenerator() {
  for (let i = 0; i < 5; i++) {
    await new Promise(r => setTimeout(r, 100));
    yield `Item ${i}\n`;
  }
}

const asyncReadable = Readable.from(asyncGenerator());

// Object mode
const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1, name: 'John' });
    this.push({ id: 2, name: 'Jane' });
    this.push(null);
  }
});
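
Object-mode streams can be consumed like any other, for example with async iteration:

// Each iteration yields one pushed object, not a byte chunk
for await (const obj of objectStream) {
  console.log(obj.id, obj.name);
}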

Creating Writable Streams#
import { Writable } from 'stream';

// Custom writable
class LogStream extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`[LOG] ${chunk.toString()}`);
    callback(); // Signal completion
  }
}

const logger = new LogStream();
logger.write('Hello');
logger.write('World');
logger.end();

// With async operation
class DatabaseWriter extends Writable {
  constructor(db) {
    super({ objectMode: true });
    this.db = db;
  }

  async _write(record, encoding, callback) {
    try {
      await this.db.insert(record);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  // Batch writes
  async _writev(chunks, callback) {
    const records = chunks.map(({ chunk }) => chunk);
    try {
      await this.db.insertMany(records);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

// Using constructor options
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});
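
A sketch of driving DatabaseWriter with pipeline, where db and records are placeholders for your database client and data source:

import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

// records: any iterable of objects; db: a client with insert/insertMany (assumed)
await pipeline(
  Readable.from(records),
  new DatabaseWriter(db)
);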

Creating Transform Streams#
import { Transform } from 'stream';
import { createReadStream, createWriteStream } from 'fs';

// Uppercase transform
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// Usage
createReadStream('input.txt')
  .pipe(new UppercaseTransform())
  .pipe(createWriteStream('output.txt'));

// JSON line parser
class JSONLineParser extends Transform {
  constructor() {
    super({ objectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    for (const line of lines) {
      if (line.trim()) {
        try {
          this.push(JSON.parse(line));
        } catch (err) {
          return callback(err);
        }
      }
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        return callback(err);
      }
    }
    callback();
  }
}

// Using constructor options
const transform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});
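
For example, JSONLineParser can feed an object-mode writable via pipeline (records.jsonl is a placeholder filename):

import { Writable } from 'stream';
import { pipeline } from 'stream/promises';

await pipeline(
  createReadStream('records.jsonl'),
  new JSONLineParser(),
  new Writable({
    objectMode: true, // receives parsed objects rather than bytes
    write(record, encoding, callback) {
      console.log(record);
      callback();
    }
  })
);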

Duplex Streams#
import { Duplex } from 'stream';

// Simple duplex: echoes whatever is written back out the readable side
class EchoStream extends Duplex {
  _write(chunk, encoding, callback) {
    this.push(chunk); // forward written data to the readable side
    callback();
  }

  _final(callback) {
    this.push(null); // end the readable side once writing finishes
    callback();
  }

  _read() {
    // Data is pushed from _write, so there is nothing to do here
  }
}

// TCP socket is a duplex stream
import { createServer } from 'net';

const server = createServer((socket) => {
  // socket is duplex - read from client, write to client
  socket.on('data', (data) => {
    socket.write(`Echo: ${data}`);
  });
});
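
Exercising the echo stream:

const echo = new EchoStream();
echo.on('data', (chunk) => console.log(chunk.toString()));
echo.write('Hello');
echo.end('World'); // logs "Hello" then "World"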

Async Iteration#
import { createReadStream } from 'fs';

// Streams are async iterable
async function processFile(filename) {
  const stream = createReadStream(filename);

  for await (const chunk of stream) {
    console.log(`Chunk: ${chunk.length} bytes`);
  }
}

// Line by line with readline
import { createInterface } from 'readline';

async function processLines(filename) {
  const rl = createInterface({
    input: createReadStream(filename),
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    console.log(line);
  }
}

// Process large JSON lines file
async function processJSONLines(filename) {
  const rl = createInterface({
    input: createReadStream(filename)
  });

  for await (const line of rl) {
    const record = JSON.parse(line);
    await processRecord(record); // processRecord: your application logic
  }
}
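
When a payload is small enough to hold in memory, newer Node versions (16.7+) also ship convenience consumers; the filenames here are placeholders:

import { text, json } from 'stream/consumers';

// Collects the entire stream - fine for small payloads only
const contents = await text(createReadStream('small.txt'));
const config = await json(createReadStream('config.json'));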

Backpressure Handling#
import { Readable, Writable } from 'stream';

// Slow consumer causes backpressure
let i = 0;
const fast = new Readable({
  read() {
    while (i < 1000) {
      const canContinue = this.push(`Data ${i++}\n`);
      if (!canContinue) {
        // Internal buffer full, stop pushing until _read() is called again
        return;
      }
    }
    this.push(null); // only end once everything has been pushed
  }
});

const slow = new Writable({
  highWaterMark: 16, // Small buffer
  write(chunk, encoding, callback) {
    // Simulate slow write
    setTimeout(callback, 100);
  }
});

// pipe() handles backpressure automatically
fast.pipe(slow);

// Manual backpressure handling
function copyWithBackpressure(source, dest) {
  source.on('data', (chunk) => {
    const canContinue = dest.write(chunk);

    if (!canContinue) {
      source.pause();
      dest.once('drain', () => {
        source.resume();
      });
    }
  });

  source.on('end', () => {
    dest.end();
  });
}
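
When you don't need the manual version, pipeline gives the same flow control plus error propagation and cleanup (source and dest stand for any readable/writable pair):

import { pipeline } from 'stream/promises';

// Same flow control as copyWithBackpressure, with errors and cleanup handled
await pipeline(source, dest);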

HTTP with Streams#
import { createServer } from 'http';
import { createReadStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';

const server = createServer(async (req, res) => {
  // Stream file response
  if (req.url === '/file') {
    res.setHeader('Content-Type', 'text/plain');
    createReadStream('large-file.txt').pipe(res);
    return;
  }

  // Compressed response
  if (req.url === '/compressed') {
    res.setHeader('Content-Encoding', 'gzip');
    await pipeline(
      createReadStream('large-file.txt'),
      createGzip(),
      res
    );
    return;
  }

  // Stream request body
  if (req.method === 'POST') {
    const chunks = [];
    for await (const chunk of req) {
      chunks.push(chunk);
    }
    const body = Buffer.concat(chunks).toString();
    res.end(`Received: ${body.length} bytes`);
    return;
  }

  // End the response so unmatched requests don't hang
  res.statusCode = 404;
  res.end();
});
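
Request bodies can just as easily be streamed to disk instead of buffered. A sketch of an extra route for the handler above (the /upload route and upload.bin filename are made up, and createWriteStream would need importing from 'fs'):

// Inside the request handler: persist the body without buffering it in memory
if (req.method === 'PUT' && req.url === '/upload') {
  await pipeline(req, createWriteStream('upload.bin'));
  res.end('Saved');
  return;
}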

Error Handling#
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';

// With pipeline (recommended)
try {
  await pipeline(
    createReadStream('input.txt'),
    transformStream, // any Transform in your pipeline
    createWriteStream('output.txt')
  );
  console.log('Pipeline succeeded');
} catch (err) {
  console.error('Pipeline failed:', err);
}

// Manual error handling
const readable = createReadStream('input.txt');
const writable = createWriteStream('output.txt');

readable.on('error', (err) => {
  console.error('Read error:', err);
  writable.destroy();
});

writable.on('error', (err) => {
  console.error('Write error:', err);
  readable.destroy();
});

readable.pipe(writable);

// Cleanup on error
import { finished } from 'stream/promises';

const stream = createReadStream('file.txt');
try {
  await finished(stream);
} catch (err) {
  // Stream ended with error
} finally {
  stream.destroy();
}

Best Practices#
Stream Usage:
✓ Use pipeline() for piping
✓ Handle all error events
✓ Use objectMode for objects
✓ Implement _flush for transforms
Performance:
✓ Use streams for large data
✓ Set appropriate highWaterMark
✓ Handle backpressure
✓ Avoid unnecessary buffering
Memory:
✓ Don't collect all data in array
✓ Process chunks immediately
✓ Use async iteration
✓ Destroy unused streams
Avoid:
✗ Ignoring backpressure
✗ Missing error handlers
✗ Calling push() after null
✗ Large highWaterMark values
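
To make several of these points concrete, here is a minimal sketch that streams a large file with pipeline(), sets an explicit highWaterMark, and processes each chunk immediately instead of collecting it (the line-counting task is illustrative):

import { createReadStream } from 'fs';
import { Writable } from 'stream';
import { pipeline } from 'stream/promises';

let lines = 0;
await pipeline(
  createReadStream('large-file.txt', { highWaterMark: 64 * 1024 }),
  new Writable({
    write(chunk, encoding, callback) {
      for (const byte of chunk) lines += byte === 10 ? 1 : 0; // count '\n'
      callback(); // chunk handled; nothing accumulated in memory
    }
  })
);
console.log(`Lines: ${lines}`);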
Conclusion#
Streams enable efficient data processing by handling data in chunks rather than loading everything into memory. Use Readable for sources, Writable for destinations, Transform for processing, and Duplex for bidirectional communication. Always use pipeline() for piping multiple streams together, handle errors properly, and respect backpressure signals for robust stream processing.