Streams process data piece by piece, enabling efficient handling of large files and real-time data without loading everything into memory.
Stream Types#
Readable: Source of data (fs.createReadStream, http request)
Writable: Destination for data (fs.createWriteStream, http response)
Duplex: Both readable and writable (TCP socket)
Transform: Modify data as it passes through (compression, encryption)
Reading Streams#
1import { createReadStream } from 'fs';
2import { pipeline } from 'stream/promises';
3
4// Basic readable stream
5const readable = createReadStream('large-file.txt', {
6 encoding: 'utf8',
7 highWaterMark: 64 * 1024, // 64KB chunks
8});
9
10// Event-based reading
11readable.on('data', (chunk) => {
12 console.log(`Received ${chunk.length} bytes`);
13});
14
15readable.on('end', () => {
16 console.log('Finished reading');
17});
18
19readable.on('error', (err) => {
20 console.error('Error:', err);
21});
22
23// Async iteration (preferred)
24async function processFile(path: string): Promise<void> {
25 const stream = createReadStream(path, { encoding: 'utf8' });
26
27 for await (const chunk of stream) {
28 console.log(`Processing ${chunk.length} characters`);
29 }
30}
31
32// Pausing and resuming
33readable.pause();
34// Do something
35readable.resume();Writing Streams#
1import { createWriteStream } from 'fs';
2
3const writable = createWriteStream('output.txt');
4
5// Write data
6writable.write('Hello, ');
7writable.write('World!\n');
8
9// End stream (signals no more data)
10writable.end('Final line');
11
12// Handle events
13writable.on('finish', () => {
14 console.log('All data written');
15});
16
17writable.on('error', (err) => {
18 console.error('Write error:', err);
19});
20
21// Handle backpressure
22function writeData(stream: Writable, data: string[]): Promise<void> {
23 return new Promise((resolve, reject) => {
24 let i = 0;
25
26 function write() {
27 let ok = true;
28
29 while (i < data.length && ok) {
30 const chunk = data[i];
31 i++;
32
33 if (i === data.length) {
34 // Last chunk
35 stream.write(chunk, (err) => {
36 if (err) reject(err);
37 else resolve();
38 });
39 } else {
40 // Check if we can continue
41 ok = stream.write(chunk);
42 }
43 }
44
45 if (i < data.length) {
46 // Wait for drain event
47 stream.once('drain', write);
48 }
49 }
50
51 stream.on('error', reject);
52 write();
53 });
54}Transform Streams#
1import { Transform, TransformCallback } from 'stream';
2
3// Custom transform stream
4class UppercaseTransform extends Transform {
5 _transform(
6 chunk: Buffer,
7 encoding: BufferEncoding,
8 callback: TransformCallback
9 ): void {
10 const upper = chunk.toString().toUpperCase();
11 this.push(upper);
12 callback();
13 }
14}
15
16// Usage
17const uppercase = new UppercaseTransform();
18process.stdin.pipe(uppercase).pipe(process.stdout);
19
20// Line-by-line transform
21class LineSplitter extends Transform {
22 private buffer = '';
23
24 _transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
25 this.buffer += chunk.toString();
26 const lines = this.buffer.split('\n');
27 this.buffer = lines.pop() || '';
28
29 for (const line of lines) {
30 this.push(line + '\n');
31 }
32
33 callback();
34 }
35
36 _flush(callback: TransformCallback): void {
37 if (this.buffer) {
38 this.push(this.buffer);
39 }
40 callback();
41 }
42}
43
44// JSON transform
45class JSONParser extends Transform {
46 constructor() {
47 super({ objectMode: true }); // Enable object mode
48 }
49
50 _transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
51 try {
52 const obj = JSON.parse(chunk.toString());
53 this.push(obj);
54 callback();
55 } catch (err) {
56 callback(err as Error);
57 }
58 }
59}Pipeline#
1import { pipeline } from 'stream/promises';
2import { createReadStream, createWriteStream } from 'fs';
3import { createGzip, createGunzip } from 'zlib';
4
5// Compress file
6async function compressFile(input: string, output: string): Promise<void> {
7 await pipeline(
8 createReadStream(input),
9 createGzip(),
10 createWriteStream(output)
11 );
12 console.log('Compression complete');
13}
14
15// Decompress file
16async function decompressFile(input: string, output: string): Promise<void> {
17 await pipeline(
18 createReadStream(input),
19 createGunzip(),
20 createWriteStream(output)
21 );
22 console.log('Decompression complete');
23}
24
25// Multiple transforms
26await pipeline(
27 createReadStream('input.txt'),
28 new LineSplitter(),
29 new UppercaseTransform(),
30 createWriteStream('output.txt')
31);
32
33// With error handling
34try {
35 await pipeline(source, transform, destination);
36} catch (err) {
37 console.error('Pipeline failed:', err);
38}HTTP Streaming#
1import http from 'http';
2import { createReadStream } from 'fs';
3import { pipeline } from 'stream/promises';
4
5// Stream file response
6const server = http.createServer(async (req, res) => {
7 if (req.url === '/video') {
8 const stat = await fs.promises.stat('video.mp4');
9
10 res.writeHead(200, {
11 'Content-Type': 'video/mp4',
12 'Content-Length': stat.size,
13 });
14
15 await pipeline(createReadStream('video.mp4'), res);
16 }
17});
18
19// Stream with range support (video seeking)
20const server = http.createServer(async (req, res) => {
21 const stat = await fs.promises.stat('video.mp4');
22 const fileSize = stat.size;
23 const range = req.headers.range;
24
25 if (range) {
26 const parts = range.replace(/bytes=/, '').split('-');
27 const start = parseInt(parts[0], 10);
28 const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
29
30 res.writeHead(206, {
31 'Content-Range': `bytes ${start}-${end}/${fileSize}`,
32 'Accept-Ranges': 'bytes',
33 'Content-Length': end - start + 1,
34 'Content-Type': 'video/mp4',
35 });
36
37 createReadStream('video.mp4', { start, end }).pipe(res);
38 } else {
39 res.writeHead(200, {
40 'Content-Length': fileSize,
41 'Content-Type': 'video/mp4',
42 });
43
44 createReadStream('video.mp4').pipe(res);
45 }
46});
47
48// Stream request body
49app.post('/upload', async (req, res) => {
50 const writeStream = createWriteStream('upload.bin');
51
52 await pipeline(req, writeStream);
53
54 res.json({ message: 'Upload complete' });
55});Async Generators#
1import { Readable } from 'stream';
2
3// Create readable from async generator
4async function* generateData() {
5 for (let i = 0; i < 100; i++) {
6 yield `Line ${i}\n`;
7 await sleep(10);
8 }
9}
10
11const readable = Readable.from(generateData());
12
13// Database cursor as stream
14async function* fetchUsers(batchSize = 100) {
15 let cursor: string | undefined;
16
17 while (true) {
18 const users = await db.user.findMany({
19 take: batchSize,
20 cursor: cursor ? { id: cursor } : undefined,
21 skip: cursor ? 1 : 0,
22 });
23
24 if (users.length === 0) break;
25
26 for (const user of users) {
27 yield user;
28 }
29
30 cursor = users[users.length - 1].id;
31 }
32}
33
34// Stream to response
35app.get('/users/export', async (req, res) => {
36 res.setHeader('Content-Type', 'application/json');
37 res.write('[');
38
39 let first = true;
40 for await (const user of fetchUsers()) {
41 if (!first) res.write(',');
42 res.write(JSON.stringify(user));
43 first = false;
44 }
45
46 res.write(']');
47 res.end();
48});Backpressure#
1// Handling backpressure properly
2function copyFile(source: string, dest: string): Promise<void> {
3 return new Promise((resolve, reject) => {
4 const readable = createReadStream(source);
5 const writable = createWriteStream(dest);
6
7 readable.on('data', (chunk) => {
8 const canContinue = writable.write(chunk);
9
10 if (!canContinue) {
11 // Destination is full, pause reading
12 readable.pause();
13
14 writable.once('drain', () => {
15 // Destination ready, resume reading
16 readable.resume();
17 });
18 }
19 });
20
21 readable.on('end', () => {
22 writable.end();
23 });
24
25 writable.on('finish', resolve);
26 writable.on('error', reject);
27 readable.on('error', reject);
28 });
29}
30
31// Or use pipeline (handles backpressure automatically)
32await pipeline(
33 createReadStream(source),
34 createWriteStream(dest)
35);Object Mode Streams#
1// Streams that handle objects instead of buffers
2class ObjectTransform extends Transform {
3 constructor() {
4 super({ objectMode: true });
5 }
6
7 _transform(obj: any, encoding: string, callback: TransformCallback): void {
8 // Transform object
9 const transformed = {
10 ...obj,
11 processed: true,
12 timestamp: new Date(),
13 };
14
15 this.push(transformed);
16 callback();
17 }
18}
19
20// Usage
21const transform = new ObjectTransform();
22
23transform.write({ id: 1, name: 'Test' });
24transform.on('data', (obj) => {
25 console.log(obj); // { id: 1, name: 'Test', processed: true, timestamp: ... }
26});Best Practices#
Memory:
✓ Use streams for large files
✓ Set appropriate highWaterMark
✓ Handle backpressure
✓ Clean up on error
Error Handling:
✓ Always handle 'error' events
✓ Use pipeline for cleanup
✓ Destroy streams on error
✓ Check stream state
Performance:
✓ Tune chunk size for workload
✓ Use object mode for objects
✓ Avoid unnecessary transforms
✓ Profile memory usage
Conclusion#
Streams enable efficient data processing in Node.js. Use pipeline for composing streams safely, handle backpressure to prevent memory issues, and leverage async iteration for cleaner code. Streams are essential for processing large files and real-time data.