Back to Blog
Node.jsWorker ThreadsParallelPerformance

Node.js Worker Threads Guide

Master Node.js worker threads for CPU-intensive tasks. From basics to thread pools to shared memory.

B
Bootspring Team
Engineering
July 4, 2020
7 min read

Worker threads enable parallel JavaScript execution for CPU-intensive tasks. Here's how to use them effectively.

Basic Worker Thread#

1// main.js 2const { Worker, isMainThread, parentPort } = require('worker_threads'); 3 4if (isMainThread) { 5 // Main thread 6 const worker = new Worker(__filename); 7 8 worker.on('message', (result) => { 9 console.log('Result from worker:', result); 10 }); 11 12 worker.on('error', (error) => { 13 console.error('Worker error:', error); 14 }); 15 16 worker.on('exit', (code) => { 17 if (code !== 0) { 18 console.error(`Worker exited with code ${code}`); 19 } 20 }); 21 22 worker.postMessage({ num: 42 }); 23} else { 24 // Worker thread 25 parentPort.on('message', (data) => { 26 const result = heavyComputation(data.num); 27 parentPort.postMessage(result); 28 }); 29} 30 31function heavyComputation(n) { 32 let result = 0; 33 for (let i = 0; i < 1000000000; i++) { 34 result += Math.sqrt(n * i); 35 } 36 return result; 37}

Separate Worker File#

1// main.js 2const { Worker } = require('worker_threads'); 3const path = require('path'); 4 5function runWorker(data) { 6 return new Promise((resolve, reject) => { 7 const worker = new Worker(path.join(__dirname, 'worker.js'), { 8 workerData: data, 9 }); 10 11 worker.on('message', resolve); 12 worker.on('error', reject); 13 worker.on('exit', (code) => { 14 if (code !== 0) { 15 reject(new Error(`Worker stopped with exit code ${code}`)); 16 } 17 }); 18 }); 19} 20 21// worker.js 22const { workerData, parentPort } = require('worker_threads'); 23 24const result = processData(workerData); 25parentPort.postMessage(result); 26 27function processData(data) { 28 // CPU-intensive work 29 return data.map(item => item * 2); 30} 31 32// Usage 33async function main() { 34 const result = await runWorker([1, 2, 3, 4, 5]); 35 console.log(result); // [2, 4, 6, 8, 10] 36}

Worker Thread Pool#

1const { Worker } = require('worker_threads'); 2const os = require('os'); 3const path = require('path'); 4 5class WorkerPool { 6 constructor(workerPath, numWorkers = os.cpus().length) { 7 this.workerPath = workerPath; 8 this.numWorkers = numWorkers; 9 this.workers = []; 10 this.freeWorkers = []; 11 this.taskQueue = []; 12 13 this.initialize(); 14 } 15 16 initialize() { 17 for (let i = 0; i < this.numWorkers; i++) { 18 this.addWorker(); 19 } 20 } 21 22 addWorker() { 23 const worker = new Worker(this.workerPath); 24 25 worker.on('message', (result) => { 26 const { resolve } = worker.currentTask; 27 resolve(result); 28 this.freeWorkers.push(worker); 29 this.processQueue(); 30 }); 31 32 worker.on('error', (error) => { 33 const { reject } = worker.currentTask; 34 reject(error); 35 this.workers = this.workers.filter(w => w !== worker); 36 this.addWorker(); 37 }); 38 39 this.workers.push(worker); 40 this.freeWorkers.push(worker); 41 } 42 43 execute(data) { 44 return new Promise((resolve, reject) => { 45 this.taskQueue.push({ data, resolve, reject }); 46 this.processQueue(); 47 }); 48 } 49 50 processQueue() { 51 if (this.taskQueue.length === 0 || this.freeWorkers.length === 0) { 52 return; 53 } 54 55 const worker = this.freeWorkers.pop(); 56 const task = this.taskQueue.shift(); 57 58 worker.currentTask = task; 59 worker.postMessage(task.data); 60 } 61 62 async shutdown() { 63 await Promise.all( 64 this.workers.map(worker => worker.terminate()) 65 ); 66 } 67} 68 69// Usage 70const pool = new WorkerPool('./worker.js', 4); 71 72async function processItems(items) { 73 const results = await Promise.all( 74 items.map(item => pool.execute(item)) 75 ); 76 return results; 77}

Shared Memory with SharedArrayBuffer#

1// main.js 2const { Worker } = require('worker_threads'); 3 4// Create shared buffer 5const sharedBuffer = new SharedArrayBuffer(4 * 1024); // 4KB 6const sharedArray = new Int32Array(sharedBuffer); 7 8// Initialize data 9for (let i = 0; i < sharedArray.length; i++) { 10 sharedArray[i] = i; 11} 12 13// Create workers 14const workers = []; 15const numWorkers = 4; 16const chunkSize = Math.ceil(sharedArray.length / numWorkers); 17 18for (let i = 0; i < numWorkers; i++) { 19 const worker = new Worker('./shared-worker.js', { 20 workerData: { 21 buffer: sharedBuffer, 22 start: i * chunkSize, 23 end: Math.min((i + 1) * chunkSize, sharedArray.length), 24 }, 25 }); 26 workers.push(worker); 27} 28 29// Wait for all workers 30Promise.all( 31 workers.map(w => new Promise(resolve => w.on('message', resolve))) 32).then(() => { 33 console.log('Sum:', sharedArray.reduce((a, b) => a + b, 0)); 34}); 35 36// shared-worker.js 37const { workerData, parentPort } = require('worker_threads'); 38 39const { buffer, start, end } = workerData; 40const array = new Int32Array(buffer); 41 42// Process assigned chunk 43for (let i = start; i < end; i++) { 44 array[i] = array[i] * 2; 45} 46 47parentPort.postMessage('done');

Atomics for Synchronization#

1// Using Atomics for thread-safe operations 2const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); 3 4if (isMainThread) { 5 const sharedBuffer = new SharedArrayBuffer(4); 6 const counter = new Int32Array(sharedBuffer); 7 8 // Create multiple workers 9 const workers = []; 10 for (let i = 0; i < 4; i++) { 11 const worker = new Worker(__filename, { 12 workerData: { buffer: sharedBuffer }, 13 }); 14 workers.push(worker); 15 } 16 17 // Wait for all workers 18 Promise.all( 19 workers.map(w => new Promise(resolve => w.on('exit', resolve))) 20 ).then(() => { 21 console.log('Final count:', counter[0]); // Should be 4000000 22 }); 23} else { 24 const counter = new Int32Array(workerData.buffer); 25 26 // Atomically increment 1 million times 27 for (let i = 0; i < 1000000; i++) { 28 Atomics.add(counter, 0, 1); 29 } 30 31 process.exit(0); 32} 33 34// Wait and notify 35const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); 36 37if (isMainThread) { 38 const buffer = new SharedArrayBuffer(4); 39 const array = new Int32Array(buffer); 40 41 const worker = new Worker(__filename, { 42 workerData: { buffer }, 43 }); 44 45 setTimeout(() => { 46 array[0] = 42; 47 Atomics.notify(array, 0, 1); // Wake up one waiting thread 48 }, 1000); 49 50 worker.on('message', console.log); 51} else { 52 const array = new Int32Array(workerData.buffer); 53 54 // Wait until notified 55 Atomics.wait(array, 0, 0); // Wait while value is 0 56 57 parentPort.postMessage(`Value is: ${array[0]}`); 58}

Transferable Objects#

1// Transfer ArrayBuffer ownership 2const { Worker } = require('worker_threads'); 3 4const buffer = new ArrayBuffer(1024 * 1024); // 1MB 5 6const worker = new Worker('./worker.js'); 7 8// Transfer ownership (zero-copy) 9worker.postMessage(buffer, [buffer]); 10 11// buffer is now detached (length === 0) 12console.log(buffer.byteLength); // 0 13 14// worker.js 15const { parentPort } = require('worker_threads'); 16 17parentPort.on('message', (buffer) => { 18 const array = new Uint8Array(buffer); 19 // Process buffer... 20 21 // Transfer back 22 parentPort.postMessage(buffer, [buffer]); 23}); 24 25// MessageChannel for bidirectional communication 26const { Worker, MessageChannel } = require('worker_threads'); 27 28const { port1, port2 } = new MessageChannel(); 29 30const worker = new Worker('./channel-worker.js'); 31worker.postMessage({ port: port2 }, [port2]); 32 33port1.on('message', (msg) => { 34 console.log('Received:', msg); 35}); 36 37port1.postMessage('Hello from main!'); 38 39// channel-worker.js 40const { parentPort } = require('worker_threads'); 41 42parentPort.on('message', ({ port }) => { 43 port.on('message', (msg) => { 44 console.log('Worker received:', msg); 45 port.postMessage('Hello from worker!'); 46 }); 47});

CPU-Intensive Task Examples#

1// Image processing 2const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); 3 4if (isMainThread) { 5 async function processImage(imageData) { 6 return new Promise((resolve, reject) => { 7 const worker = new Worker(__filename, { 8 workerData: imageData, 9 }); 10 worker.on('message', resolve); 11 worker.on('error', reject); 12 }); 13 } 14 15 // Usage 16 const imageBuffer = loadImage('photo.jpg'); 17 const processed = await processImage(imageBuffer); 18} else { 19 const imageData = workerData; 20 21 // Apply filter 22 const result = applyGrayscaleFilter(imageData); 23 24 parentPort.postMessage(result); 25} 26 27// Cryptographic operations 28const crypto = require('crypto'); 29 30// Hash computation in worker 31if (isMainThread) { 32 const pool = new WorkerPool('./hash-worker.js', 4); 33 34 async function hashPasswords(passwords) { 35 return Promise.all( 36 passwords.map(pwd => pool.execute({ password: pwd })) 37 ); 38 } 39} else { 40 parentPort.on('message', ({ password }) => { 41 const hash = crypto.pbkdf2Sync( 42 password, 43 'salt', 44 100000, 45 64, 46 'sha512' 47 ); 48 parentPort.postMessage(hash.toString('hex')); 49 }); 50} 51 52// JSON parsing for large files 53const { Worker } = require('worker_threads'); 54const fs = require('fs'); 55 56async function parselargeJSON(filePath) { 57 return new Promise((resolve, reject) => { 58 const worker = new Worker(` 59 const { workerData, parentPort } = require('worker_threads'); 60 const fs = require('fs'); 61 62 const data = fs.readFileSync(workerData, 'utf8'); 63 const parsed = JSON.parse(data); 64 65 parentPort.postMessage(parsed); 66 `, { 67 eval: true, 68 workerData: filePath, 69 }); 70 71 worker.on('message', resolve); 72 worker.on('error', reject); 73 }); 74}

Error Handling#

1const { Worker } = require('worker_threads'); 2 3function createRobustWorker(workerPath, options = {}) { 4 return new Promise((resolve, reject) => { 5 const worker = new Worker(workerPath, options); 6 7 const timeout = setTimeout(() => { 8 worker.terminate(); 9 reject(new Error('Worker timed out')); 10 }, options.timeout || 30000); 11 12 worker.on('message', (result) => { 13 clearTimeout(timeout); 14 resolve(result); 15 }); 16 17 worker.on('error', (error) => { 18 clearTimeout(timeout); 19 reject(error); 20 }); 21 22 worker.on('exit', (code) => { 23 clearTimeout(timeout); 24 if (code !== 0) { 25 reject(new Error(`Worker exited with code ${code}`)); 26 } 27 }); 28 }); 29} 30 31// Graceful shutdown 32class ManagedWorkerPool { 33 constructor(workerPath, size) { 34 this.pool = new WorkerPool(workerPath, size); 35 this.isShuttingDown = false; 36 37 process.on('SIGTERM', () => this.shutdown()); 38 process.on('SIGINT', () => this.shutdown()); 39 } 40 41 async execute(data) { 42 if (this.isShuttingDown) { 43 throw new Error('Pool is shutting down'); 44 } 45 return this.pool.execute(data); 46 } 47 48 async shutdown() { 49 this.isShuttingDown = true; 50 await this.pool.shutdown(); 51 process.exit(0); 52 } 53}

Best Practices#

When to Use: ✓ CPU-intensive computations ✓ Image/video processing ✓ Large data parsing ✓ Cryptographic operations Design: ✓ Use worker pools for reuse ✓ Transfer large data, don't copy ✓ Use SharedArrayBuffer carefully ✓ Handle errors and timeouts Performance: ✓ Limit worker count to CPU cores ✓ Balance work distribution ✓ Minimize message passing ✓ Use Atomics for synchronization Avoid: ✗ I/O-bound tasks in workers ✗ Creating workers for small tasks ✗ Sharing complex objects ✗ Race conditions without Atomics

Conclusion#

Worker threads enable true parallelism for CPU-intensive tasks in Node.js. Use them for computations that would block the event loop. Implement worker pools for efficiency, SharedArrayBuffer for shared memory, and Atomics for synchronization. Remember that workers are best for CPU-bound work, not I/O operations.

Share this article

Help spread the word about Bootspring