Worker threads enable parallel JavaScript execution for CPU-intensive tasks. Here's how to use them effectively.
Basic Worker Thread#
// main.js
const { Worker, isMainThread, parentPort } = require('worker_threads');

if (isMainThread) {
  // Main thread: spawn a worker that re-runs this same file.
  const worker = new Worker(__filename);

  worker.on('message', (result) => {
    console.log('Result from worker:', result);
  });

  worker.on('error', (error) => {
    console.error('Worker error:', error);
  });

  worker.on('exit', (code) => {
    if (code !== 0) {
      console.error(`Worker exited with code ${code}`);
    }
  });

  // Kick off one job.
  worker.postMessage({ num: 42 });
} else {
  // Worker thread: answer each incoming message with the computed result.
  parentPort.on('message', (data) => {
    const result = heavyComputation(data.num);
    parentPort.postMessage(result);
  });
}

31function heavyComputation(n) {
32 let result = 0;
33 for (let i = 0; i < 1000000000; i++) {
34 result += Math.sqrt(n * i);
35 }
36 return result;
}

Separate Worker File#
// main.js
const { Worker } = require('worker_threads');
const path = require('path');

/**
 * Run worker.js once with the given payload.
 * Resolves with the worker's first message; rejects on a worker error
 * or a non-zero exit code.
 */
function runWorker(data) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(path.join(__dirname, 'worker.js'), {
      workerData: data,
    });

    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`Worker stopped with exit code ${code}`));
      }
    });
  });
}

// worker.js
const { workerData, parentPort } = require('worker_threads');

// Compute once at startup and report the result back to the parent.
const result = processData(workerData);
parentPort.postMessage(result);

// Stand-in for CPU-intensive work: double every element of the input array.
function processData(data) {
  const doubled = [];
  for (const item of data) {
    doubled.push(item * 2);
  }
  return doubled;
}

32// Usage
33async function main() {
34 const result = await runWorker([1, 2, 3, 4, 5]);
35 console.log(result); // [2, 4, 6, 8, 10]
}

Worker Thread Pool#
const { Worker } = require('worker_threads');
const os = require('os');
const path = require('path');

/**
 * Fixed-size pool of worker threads. Tasks submitted via execute() are
 * queued and dispatched to the next free worker; a crashed worker is
 * replaced so the pool keeps its configured size.
 */
class WorkerPool {
  constructor(workerPath, numWorkers = os.cpus().length) {
    this.workerPath = workerPath;
    this.numWorkers = numWorkers;
    this.workers = [];      // every live worker
    this.freeWorkers = [];  // workers with no task in flight
    this.taskQueue = [];    // pending { data, resolve, reject } entries

    this.initialize();
  }

  initialize() {
    for (let i = 0; i < this.numWorkers; i++) {
      this.addWorker();
    }
  }

  addWorker() {
    const worker = new Worker(this.workerPath);

    worker.on('message', (result) => {
      const { resolve } = worker.currentTask;
      worker.currentTask = null; // the worker is idle again
      resolve(result);
      this.freeWorkers.push(worker);
      this.processQueue();
    });

    worker.on('error', (error) => {
      // An error can fire while the worker is idle, so guard the task.
      if (worker.currentTask) {
        worker.currentTask.reject(error);
        worker.currentTask = null;
      }
      // Remove the broken worker from BOTH lists (a stale freeWorkers
      // entry would dispatch tasks to a dead worker), then replace it
      // and keep draining the queue so pending tasks are not stranded.
      this.workers = this.workers.filter(w => w !== worker);
      this.freeWorkers = this.freeWorkers.filter(w => w !== worker);
      this.addWorker();
      this.processQueue();
    });

    this.workers.push(worker);
    this.freeWorkers.push(worker);
  }

  /**
   * Queue one task and return a promise for its result.
   */
  execute(data) {
    return new Promise((resolve, reject) => {
      this.taskQueue.push({ data, resolve, reject });
      this.processQueue();
    });
  }

  processQueue() {
    // Dispatch as many queued tasks as there are free workers.
    while (this.taskQueue.length > 0 && this.freeWorkers.length > 0) {
      const worker = this.freeWorkers.pop();
      const task = this.taskQueue.shift();

      worker.currentTask = task;
      worker.postMessage(task.data);
    }
  }

  /**
   * Terminate every worker; in-flight tasks are abandoned.
   */
  async shutdown() {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}

68
69// Usage
70const pool = new WorkerPool('./worker.js', 4);
71
72async function processItems(items) {
73 const results = await Promise.all(
74 items.map(item => pool.execute(item))
75 );
76 return results;
}

Shared Memory with SharedArrayBuffer#
// main.js
const { Worker } = require('worker_threads');

// Create shared buffer (4KB => 1024 Int32 slots).
const sharedBuffer = new SharedArrayBuffer(4 * 1024); // 4KB
const sharedArray = new Int32Array(sharedBuffer);

// Initialize data
for (let i = 0; i < sharedArray.length; i++) {
  sharedArray[i] = i;
}

// Create workers — each one owns a disjoint [start, end) slice so the
// threads never write to the same index.
const workers = [];
const numWorkers = 4;
const chunkSize = Math.ceil(sharedArray.length / numWorkers);

for (let i = 0; i < numWorkers; i++) {
  const worker = new Worker('./shared-worker.js', {
    workerData: {
      buffer: sharedBuffer,
      start: i * chunkSize,
      end: Math.min((i + 1) * chunkSize, sharedArray.length),
    },
  });
  workers.push(worker);
}

// Wait for all workers to report back, then sum the shared array.
Promise.all(
  workers.map(w => new Promise(resolve => w.on('message', resolve)))
).then(() => {
  console.log('Sum:', sharedArray.reduce((a, b) => a + b, 0));
});

// shared-worker.js
const { workerData, parentPort } = require('worker_threads');

const { buffer, start, end } = workerData;
const array = new Int32Array(buffer);

// Double every element in this worker's assigned chunk [start, end).
for (let i = start; i < end; i++) {
  array[i] = array[i] * 2;
}

parentPort.postMessage('done');

Atomics for Synchronization#
// Using Atomics for thread-safe operations
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
  const sharedBuffer = new SharedArrayBuffer(4);
  const counter = new Int32Array(sharedBuffer);

  // Create multiple workers, all sharing the same 4-byte counter.
  const workers = [];
  for (let i = 0; i < 4; i++) {
    const worker = new Worker(__filename, {
      workerData: { buffer: sharedBuffer },
    });
    workers.push(worker);
  }

  // Wait for all workers to exit before reading the result.
  Promise.all(
    workers.map(w => new Promise(resolve => w.on('exit', resolve)))
  ).then(() => {
    // Read with Atomics.load so the access pairs with the workers'
    // Atomics.add — never mix plain reads with atomic writes.
    console.log('Final count:', Atomics.load(counter, 0)); // Should be 4000000
  });
} else {
  const counter = new Int32Array(workerData.buffer);

  // Atomically increment 1 million times — no lost updates across threads.
  for (let i = 0; i < 1000000; i++) {
    Atomics.add(counter, 0, 1);
  }

  process.exit(0);
}

34// Wait and notify
35const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
36
37if (isMainThread) {
38 const buffer = new SharedArrayBuffer(4);
39 const array = new Int32Array(buffer);
40
41 const worker = new Worker(__filename, {
42 workerData: { buffer },
43 });
44
45 setTimeout(() => {
46 array[0] = 42;
47 Atomics.notify(array, 0, 1); // Wake up one waiting thread
48 }, 1000);
49
50 worker.on('message', console.log);
51} else {
52 const array = new Int32Array(workerData.buffer);
53
54 // Wait until notified
55 Atomics.wait(array, 0, 0); // Wait while value is 0
56
57 parentPort.postMessage(`Value is: ${array[0]}`);
}

Transferable Objects#
// Transfer ArrayBuffer ownership
const { Worker } = require('worker_threads');

const buffer = new ArrayBuffer(1024 * 1024); // 1MB

const worker = new Worker('./worker.js');

// Transfer ownership (zero-copy): the second argument is the transfer list.
worker.postMessage(buffer, [buffer]);

// buffer is now detached (length === 0)
console.log(buffer.byteLength); // 0

// worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', (buffer) => {
  const array = new Uint8Array(buffer);
  // Process buffer...

  // Transfer back
  parentPort.postMessage(buffer, [buffer]);
});

25// MessageChannel for bidirectional communication
26const { Worker, MessageChannel } = require('worker_threads');
27
28const { port1, port2 } = new MessageChannel();
29
30const worker = new Worker('./channel-worker.js');
31worker.postMessage({ port: port2 }, [port2]);
32
33port1.on('message', (msg) => {
34 console.log('Received:', msg);
35});
36
37port1.postMessage('Hello from main!');
38
39// channel-worker.js
40const { parentPort } = require('worker_threads');
41
42parentPort.on('message', ({ port }) => {
43 port.on('message', (msg) => {
44 console.log('Worker received:', msg);
45 port.postMessage('Hello from worker!');
46 });
});

CPU-Intensive Task Examples#
// Image processing
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
  /**
   * Offload one image-processing job to a fresh worker thread.
   * Returns a promise for the processed image.
   */
  function processImage(imageData) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: imageData,
      });
      worker.on('message', resolve);
      worker.on('error', reject);
    });
  }

  // Usage — `await` is only legal inside an async function in CommonJS,
  // so wrap the call in an async IIFE instead of awaiting at block level.
  (async () => {
    const imageBuffer = loadImage('photo.jpg'); // loadImage defined elsewhere
    const processed = await processImage(imageBuffer);
  })();
} else {
  const imageData = workerData;

  // Apply filter (applyGrayscaleFilter is assumed to be defined elsewhere).
  const result = applyGrayscaleFilter(imageData);

  parentPort.postMessage(result);
}

// Cryptographic operations
const crypto = require('crypto');

// Hash computation in worker
if (isMainThread) {
  const pool = new WorkerPool('./hash-worker.js', 4);

  // Hash many passwords in parallel across the pool.
  async function hashPasswords(passwords) {
    return Promise.all(
      passwords.map(pwd => pool.execute({ password: pwd }))
    );
  }
} else {
  parentPort.on('message', ({ password }) => {
    // SECURITY: the fixed 'salt' string is for demonstration only.
    // Real password hashing must use a unique random salt per password
    // (e.g. crypto.randomBytes(16)) stored alongside the hash.
    const hash = crypto.pbkdf2Sync(
      password,
      'salt',
      100000, // iterations — pbkdf2Sync is CPU-bound, hence the worker
      64,     // derived key length in bytes
      'sha512'
    );
    parentPort.postMessage(hash.toString('hex'));
  });
}

// JSON parsing for large files
const { Worker } = require('worker_threads');
const fs = require('fs');

/**
 * Parse a large JSON file off the main thread using an inline (eval) worker.
 * Rejects on worker errors (including fs/JSON failures, which surface as
 * 'error' events) and on a non-zero exit code, so the promise can never
 * hang if the worker dies silently.
 */
async function parselargeJSON(filePath) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(`
      const { workerData, parentPort } = require('worker_threads');
      const fs = require('fs');

      const data = fs.readFileSync(workerData, 'utf8');
      const parsed = JSON.parse(data);

      parentPort.postMessage(parsed);
    `, {
      eval: true,
      workerData: filePath,
    });

    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`Worker exited with code ${code}`));
      }
    });
  });
}
}

Error Handling#
const { Worker } = require('worker_threads');

/**
 * Run a worker once with a hard deadline.
 * Resolves with the worker's first message; rejects on error, non-zero
 * exit, or when the deadline (options.timeout, default 30s) expires.
 */
function createRobustWorker(workerPath, options = {}) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerPath, options);

    const timeout = setTimeout(() => {
      worker.terminate();
      reject(new Error('Worker timed out'));
    }, options.timeout ?? 30000); // ?? (not ||) so an explicit timeout of 0 is honoured

    worker.on('message', (result) => {
      clearTimeout(timeout);
      resolve(result);
    });

    worker.on('error', (error) => {
      clearTimeout(timeout);
      reject(error);
    });

    worker.on('exit', (code) => {
      clearTimeout(timeout);
      if (code !== 0) {
        reject(new Error(`Worker exited with code ${code}`));
      }
    });
  });
}

31// Graceful shutdown
32class ManagedWorkerPool {
33 constructor(workerPath, size) {
34 this.pool = new WorkerPool(workerPath, size);
35 this.isShuttingDown = false;
36
37 process.on('SIGTERM', () => this.shutdown());
38 process.on('SIGINT', () => this.shutdown());
39 }
40
41 async execute(data) {
42 if (this.isShuttingDown) {
43 throw new Error('Pool is shutting down');
44 }
45 return this.pool.execute(data);
46 }
47
48 async shutdown() {
49 this.isShuttingDown = true;
50 await this.pool.shutdown();
51 process.exit(0);
52 }
}

Best Practices#
When to Use:
✓ CPU-intensive computations
✓ Image/video processing
✓ Large data parsing
✓ Cryptographic operations
Design:
✓ Use worker pools for reuse
✓ Transfer large data, don't copy
✓ Use SharedArrayBuffer carefully
✓ Handle errors and timeouts
Performance:
✓ Limit worker count to CPU cores
✓ Balance work distribution
✓ Minimize message passing
✓ Use Atomics for synchronization
Avoid:
✗ I/O-bound tasks in workers
✗ Creating workers for small tasks
✗ Sharing complex objects
✗ Race conditions without Atomics
Conclusion#
Worker threads enable true parallelism for CPU-intensive tasks in Node.js. Use them for computations that would block the event loop. Implement worker pools for efficiency, SharedArrayBuffer for shared memory, and Atomics for synchronization. Remember that workers are best for CPU-bound work, not I/O operations.