Worker Threads enable parallel JavaScript execution. Here's how to use them for CPU-intensive work.
Basic Worker#
// main.js
const { Worker } = require('worker_threads');

// Spawn a worker thread; workerData is cloned and handed to the worker.
const worker = new Worker('./worker.js', {
  workerData: { numbers: [1, 2, 3, 4, 5] },
});

// The worker reports its result via a message.
worker.on('message', (result) => {
  console.log('Result:', result);
});

// Uncaught exceptions inside the worker surface here.
worker.on('error', (error) => {
  console.error('Worker error:', error);
});

// A non-zero exit code means the worker died abnormally.
worker.on('exit', (code) => {
  if (code !== 0) {
    console.error(`Worker stopped with exit code ${code}`);
  }
});

// worker.js
const { parentPort, workerData } = require('worker_threads');

// CPU-bound work: sum of squares of the input numbers.
function heavyComputation(numbers) {
  let total = 0;
  for (const n of numbers) {
    total += n * n;
  }
  return total;
}

const result = heavyComputation(workerData.numbers);
parentPort.postMessage(result);

Promise Wrapper#
1// worker-wrapper.ts
2import { Worker } from 'worker_threads';
3
4interface WorkerOptions<T> {
5 workerPath: string;
6 workerData: T;
7 timeout?: number;
8}
9
10export function runWorker<T, R>({
11 workerPath,
12 workerData,
13 timeout = 30000,
14}: WorkerOptions<T>): Promise<R> {
15 return new Promise((resolve, reject) => {
16 const worker = new Worker(workerPath, { workerData });
17
18 const timer = setTimeout(() => {
19 worker.terminate();
20 reject(new Error('Worker timeout'));
21 }, timeout);
22
23 worker.on('message', (result: R) => {
24 clearTimeout(timer);
25 resolve(result);
26 });
27
28 worker.on('error', (error) => {
29 clearTimeout(timer);
30 reject(error);
31 });
32
33 worker.on('exit', (code) => {
34 clearTimeout(timer);
35 if (code !== 0) {
36 reject(new Error(`Worker exited with code ${code}`));
37 }
38 });
39 });
40}
41
42// Usage
43const result = await runWorker<{ data: number[] }, number>({
44 workerPath: './compute-worker.js',
45 workerData: { data: [1, 2, 3, 4, 5] },
46 timeout: 10000,
});

Worker Pool#
1// worker-pool.ts
2import { Worker } from 'worker_threads';
3import os from 'os';
4
5interface Task<T, R> {
6 data: T;
7 resolve: (result: R) => void;
8 reject: (error: Error) => void;
9}
10
11class WorkerPool<T, R> {
12 private workers: Worker[] = [];
13 private freeWorkers: Worker[] = [];
14 private taskQueue: Task<T, R>[] = [];
15 private workerPath: string;
16
17 constructor(workerPath: string, poolSize = os.cpus().length) {
18 this.workerPath = workerPath;
19
20 for (let i = 0; i < poolSize; i++) {
21 this.addNewWorker();
22 }
23 }
24
25 private addNewWorker(): void {
26 const worker = new Worker(this.workerPath);
27
28 worker.on('message', (result: R) => {
29 const task = (worker as any).currentTask as Task<T, R>;
30 task.resolve(result);
31 this.freeWorkers.push(worker);
32 this.processQueue();
33 });
34
35 worker.on('error', (error) => {
36 const task = (worker as any).currentTask as Task<T, R>;
37 if (task) {
38 task.reject(error);
39 }
40 // Replace failed worker
41 this.workers = this.workers.filter(w => w !== worker);
42 this.addNewWorker();
43 });
44
45 this.workers.push(worker);
46 this.freeWorkers.push(worker);
47 }
48
49 private processQueue(): void {
50 if (this.taskQueue.length === 0 || this.freeWorkers.length === 0) {
51 return;
52 }
53
54 const worker = this.freeWorkers.pop()!;
55 const task = this.taskQueue.shift()!;
56
57 (worker as any).currentTask = task;
58 worker.postMessage(task.data);
59 }
60
61 run(data: T): Promise<R> {
62 return new Promise((resolve, reject) => {
63 this.taskQueue.push({ data, resolve, reject });
64 this.processQueue();
65 });
66 }
67
68 async close(): Promise<void> {
69 await Promise.all(this.workers.map(w => w.terminate()));
70 }
71}
72
73// Usage
74const pool = new WorkerPool<number[], number>('./sum-worker.js', 4);
75
76const results = await Promise.all([
77 pool.run([1, 2, 3]),
78 pool.run([4, 5, 6]),
79 pool.run([7, 8, 9]),
80 pool.run([10, 11, 12]),
81]);
82
await pool.close();

Inline Worker#
1import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
2
3function runInlineWorker<T, R>(fn: (data: T) => R, data: T): Promise<R> {
4 return new Promise((resolve, reject) => {
5 const workerCode = `
6 const { parentPort, workerData } = require('worker_threads');
7 const fn = ${fn.toString()};
8 const result = fn(workerData);
9 parentPort.postMessage(result);
10 `;
11
12 const worker = new Worker(workerCode, {
13 eval: true,
14 workerData: data,
15 });
16
17 worker.on('message', resolve);
18 worker.on('error', reject);
19 });
20}
21
22// Usage
23const result = await runInlineWorker(
24 (nums: number[]) => nums.reduce((a, b) => a + b, 0),
25 [1, 2, 3, 4, 5]
);

Shared Memory#
1// main.ts
2import { Worker } from 'worker_threads';
3
4// Create shared buffer
5const sharedBuffer = new SharedArrayBuffer(4 * 1024); // 4KB
6const sharedArray = new Int32Array(sharedBuffer);
7
8// Initialize data
9for (let i = 0; i < 1000; i++) {
10 sharedArray[i] = i;
11}
12
13const worker = new Worker('./shared-worker.js', {
14 workerData: { sharedBuffer },
15});
16
17worker.on('message', () => {
18 console.log('First 10 values:', sharedArray.slice(0, 10));
19});
20
21// shared-worker.js
22const { parentPort, workerData } = require('worker_threads');
23
24const sharedArray = new Int32Array(workerData.sharedBuffer);
25
26// Modify shared memory
27for (let i = 0; i < sharedArray.length; i++) {
28 sharedArray[i] = sharedArray[i] * 2;
29}
30
parentPort.postMessage('done');

Atomics for Synchronization#
1// Using Atomics for thread-safe operations
2import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
3
4if (isMainThread) {
5 const sharedBuffer = new SharedArrayBuffer(4);
6 const sharedArray = new Int32Array(sharedBuffer);
7
8 // Start multiple workers
9 const workers = Array.from({ length: 4 }, () =>
10 new Worker(__filename, { workerData: { sharedBuffer } })
11 );
12
13 Promise.all(
14 workers.map(w => new Promise(resolve => w.on('exit', resolve)))
15 ).then(() => {
16 console.log('Final count:', sharedArray[0]);
17 // Should be 4000 (4 workers * 1000 increments)
18 });
19} else {
20 const sharedArray = new Int32Array(workerData.sharedBuffer);
21
22 // Atomic increment
23 for (let i = 0; i < 1000; i++) {
24 Atomics.add(sharedArray, 0, 1);
25 }
26
27 process.exit(0);
28}
29
30// Using Atomics.wait and Atomics.notify
31const sharedBuffer = new SharedArrayBuffer(4);
32const sharedArray = new Int32Array(sharedBuffer);
33
34// Worker waits
35Atomics.wait(sharedArray, 0, 0); // Wait while value is 0
36
37// Main thread notifies
38Atomics.store(sharedArray, 0, 1);
39Atomics.notify(sharedArray, 0, 1); // Wake one waiting workerMessage Channels#
import { Worker, MessageChannel } from 'worker_threads';

// A MessageChannel is a pair of linked ports, enabling direct
// worker-to-worker communication that bypasses the main thread.
const channel = new MessageChannel();

const worker1 = new Worker('./worker1.js', {
  workerData: { port: channel.port1 },
  transferList: [channel.port1], // ports must be transferred, not cloned
});

const worker2 = new Worker('./worker2.js', {
  workerData: { port: channel.port2 },
  transferList: [channel.port2],
});

// worker1.js
const { workerData } = require('worker_threads');
const { port } = workerData;

port.on('message', (msg) => {
  console.log('Worker 1 received:', msg);
});

// Sends directly to worker2 through the channel.
port.postMessage('Hello from Worker 1');

// worker2.js
const { workerData } = require('worker_threads');
const { port } = workerData;

port.on('message', (msg) => {
  console.log('Worker 2 received:', msg);
  port.postMessage('Hello back from Worker 2');
});

Practical Example: Image Processing#
// image-processor.ts
import { Worker } from 'worker_threads';
import path from 'path';

interface ProcessOptions {
  width: number;
  height: number;
  quality: number;
}

/**
 * Offloads a sharp resize/encode to a worker thread and resolves with
 * the resulting JPEG buffer. Rejects if the worker errors or exits
 * without producing a result (the original hung in that case).
 */
async function processImage(
  imagePath: string,
  options: ProcessOptions
): Promise<Buffer> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(
      path.join(__dirname, 'image-worker.js'),
      {
        workerData: { imagePath, options },
      }
    );

    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`Image worker exited with code ${code}`));
      }
    });
  });
}

// image-worker.js
const { parentPort, workerData } = require('worker_threads');
const sharp = require('sharp');

// Named `run` rather than `process` — the original shadowed Node's
// global `process` object inside the worker.
async function run() {
  const { imagePath, options } = workerData;

  const result = await sharp(imagePath)
    .resize(options.width, options.height)
    .jpeg({ quality: options.quality })
    .toBuffer();

  parentPort.postMessage(result);
}

// Fail loudly instead of leaving an unhandled rejection; the non-zero
// exit code rejects processImage via its 'exit' handler.
run().catch((err) => {
  console.error(err);
  process.exit(1);
});

Express Integration#
import express from 'express';
import { Worker } from 'worker_threads';
import path from 'path';
// Reuses the helpers defined earlier in this article:
// import { WorkerPool } from './worker-pool';
// import { runWorker } from './worker-wrapper';

const app = express();
app.use(express.json()); // without a body parser, req.body is undefined

// Worker pool for CPU tasks
const pool = new WorkerPool(
  path.join(__dirname, 'compute-worker.js'),
  4
);

// Under strict TS, a `catch` variable is `unknown` — narrow before
// touching `.message` (the original's `error.message` fails to compile).
const errorMessage = (error: unknown): string =>
  error instanceof Error ? error.message : String(error);

app.post('/compute', async (req, res) => {
  try {
    const result = await pool.run(req.body.data);
    res.json({ result });
  } catch (error) {
    res.status(500).json({ error: errorMessage(error) });
  }
});

// Don't block event loop
app.get('/heavy', async (req, res) => {
  try {
    const result = await runWorker({
      workerPath: './heavy-worker.js',
      workerData: req.query,
    });
    res.json(result);
  } catch (error) {
    // The original let rejections escape, leaving the request hanging.
    res.status(500).json({ error: errorMessage(error) });
  }
});

Best Practices#
When to Use:
✓ CPU-intensive computations
✓ Image/video processing
✓ Cryptographic operations
✓ Complex parsing/serialization
When NOT to Use:
✗ I/O bound operations
✗ Simple tasks (overhead > benefit)
✗ Operations needing main thread access
✗ Tasks requiring DOM access (a browser Web Worker concern — Node.js has no DOM in any thread)
Patterns:
✓ Use worker pools for reuse
✓ Transfer large data with transferList
✓ Use SharedArrayBuffer for shared state
✓ Implement proper error handling
Conclusion#
Worker Threads enable true parallelism for CPU-intensive tasks. Use worker pools for efficiency, SharedArrayBuffer for shared state, and Atomics for synchronization. Reserve workers for computationally heavy operations where the parallelism benefit outweighs the overhead.