Back to Blog
Node.js · Worker Threads · Performance · Concurrency

Node.js Worker Threads for CPU Tasks

Use Worker Threads for CPU-intensive operations. From basic usage to pools to practical patterns.

B
Bootspring Team
Engineering
June 21, 2021
7 min read

Worker Threads enable parallel JavaScript execution. Here's how to use them for CPU-intensive work.

Basic Worker#

// main.js
const { Worker } = require('worker_threads');

// Start worker.js on its own thread; the input travels via workerData.
const worker = new Worker('./worker.js', {
  workerData: { numbers: [1, 2, 3, 4, 5] },
});

// The worker posts exactly one message: the computed result.
worker.on('message', (result) => console.log('Result:', result));

// Exceptions thrown inside the worker surface here.
worker.on('error', (error) => console.error('Worker error:', error));

// Only report abnormal termination; exit code 0 is a clean shutdown.
worker.on('exit', (code) => {
  if (code === 0) {
    return;
  }
  console.error(`Worker stopped with exit code ${code}`);
});

// worker.js
const { parentPort, workerData } = require('worker_threads');

/**
 * Returns the sum of the squares of `numbers`.
 * Stands in for any CPU-bound calculation.
 */
function heavyComputation(numbers) {
  return numbers.reduce((total, value) => total + value * value, 0);
}

// Compute on this thread and hand the answer back to the parent.
parentPort.postMessage(heavyComputation(workerData.numbers));

Promise Wrapper#

// worker-wrapper.ts
import { Worker } from 'worker_threads';

interface WorkerOptions<T> {
  workerPath: string;
  workerData: T;
  timeout?: number;
}

/**
 * Runs `workerPath` on a new worker thread and resolves with the first
 * message the worker posts.
 *
 * The returned promise always settles exactly once:
 * - resolves with the first posted message;
 * - rejects with the worker's error if it throws;
 * - rejects with "Worker timeout" (and terminates the worker) after `timeout` ms;
 * - rejects if the worker exits before posting anything, including a clean
 *   exit with code 0, which would otherwise leave the caller waiting forever.
 *
 * @param workerPath Path of the worker script.
 * @param workerData Value exposed to the worker as `workerData`.
 * @param timeout Maximum run time in milliseconds (default 30000).
 */
export function runWorker<T, R>({
  workerPath,
  workerData,
  timeout = 30000,
}: WorkerOptions<T>): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    const worker = new Worker(workerPath, { workerData });
    let settled = false;

    // Funnel every outcome through one place so the timer is always cleared
    // and later events (e.g. the 'exit' caused by terminate()) are ignored.
    const settle = (finish: () => void): void => {
      if (settled) {
        return;
      }
      settled = true;
      clearTimeout(timer);
      finish();
    };

    const timer = setTimeout(() => {
      settle(() => reject(new Error('Worker timeout')));
      void worker.terminate();
    }, timeout);

    worker.once('message', (result: R) => settle(() => resolve(result)));

    worker.once('error', (error: Error) => settle(() => reject(error)));

    worker.once('exit', (code) => {
      settle(() =>
        reject(
          new Error(
            code !== 0
              ? `Worker exited with code ${code}`
              : 'Worker exited without posting a result'
          )
        )
      );
    });
  });
}

// Usage
const result = await runWorker<{ data: number[] }, number>({
  workerPath: './compute-worker.js',
  workerData: { data: [1, 2, 3, 4, 5] },
  timeout: 10000,
});

Worker Pool#

// worker-pool.ts
import { Worker } from 'worker_threads';
import os from 'os';

interface Task<T, R> {
  data: T;
  resolve: (result: R) => void;
  reject: (error: Error) => void;
}

/**
 * Fixed-size pool of long-lived workers.
 *
 * Each task is posted to an idle worker with `postMessage`; the worker is
 * expected to answer with exactly one message per task. Tasks submitted while
 * every worker is busy wait in a FIFO queue.
 */
class WorkerPool<T, R> {
  private workers: Worker[] = [];
  private freeWorkers: Worker[] = [];
  private taskQueue: Task<T, R>[] = [];
  // Task currently assigned to each busy worker.
  private readonly activeTasks = new Map<Worker, Task<T, R>>();
  private workerPath: string;
  private closed = false;

  /**
   * @param workerPath Script every worker in the pool runs.
   * @param poolSize Number of workers to start (defaults to the CPU count).
   */
  constructor(workerPath: string, poolSize = os.cpus().length) {
    this.workerPath = workerPath;

    for (let i = 0; i < poolSize; i++) {
      this.addNewWorker();
    }
  }

  /** Starts one worker, wires its events and marks it idle. */
  private addNewWorker(): void {
    const worker = new Worker(this.workerPath);

    worker.on('message', (result: R) => {
      const task = this.activeTasks.get(worker);
      if (!task) {
        // Unsolicited or duplicate message: the worker is already idle, so
        // re-queuing it would hand the same worker two tasks at once.
        return;
      }
      this.activeTasks.delete(worker);
      task.resolve(result);
      this.freeWorkers.push(worker);
      this.processQueue();
    });

    worker.on('error', (error: Error) => {
      const task = this.activeTasks.get(worker);
      this.activeTasks.delete(worker);
      task?.reject(error);

      // Drop the dead worker from both lists so it is never handed a task.
      this.workers = this.workers.filter((w) => w !== worker);
      this.freeWorkers = this.freeWorkers.filter((w) => w !== worker);

      if (!this.closed) {
        // Replace failed worker and let it pick up anything still queued;
        // otherwise the queue stalls once every original worker has failed.
        this.addNewWorker();
        this.processQueue();
      }
    });

    this.workers.push(worker);
    this.freeWorkers.push(worker);
  }

  /** Hands queued tasks to idle workers until one of the two runs out. */
  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.freeWorkers.length > 0) {
      const worker = this.freeWorkers.pop();
      const task = this.taskQueue.shift();
      if (!worker || !task) {
        return;
      }

      this.activeTasks.set(worker, task);
      worker.postMessage(task.data);
    }
  }

  /**
   * Queues `data` for processing.
   * @returns The worker's reply; rejects if the worker fails or the pool is closed.
   */
  run(data: T): Promise<R> {
    return new Promise<R>((resolve, reject) => {
      if (this.closed) {
        reject(new Error('Worker pool closed'));
        return;
      }
      this.taskQueue.push({ data, resolve, reject });
      this.processQueue();
    });
  }

  /** Rejects all unfinished tasks and terminates every worker. */
  async close(): Promise<void> {
    this.closed = true;

    const closedError = new Error('Worker pool closed');
    for (const task of this.taskQueue.splice(0)) {
      task.reject(closedError);
    }
    for (const task of this.activeTasks.values()) {
      task.reject(closedError);
    }
    this.activeTasks.clear();
    this.freeWorkers = [];

    await Promise.all(this.workers.map((w) => w.terminate()));
    this.workers = [];
  }
}

// Usage
const pool = new WorkerPool<number[], number>('./sum-worker.js', 4);

const results = await Promise.all([
  pool.run([1, 2, 3]),
  pool.run([4, 5, 6]),
  pool.run([7, 8, 9]),
  pool.run([10, 11, 12]),
]);

await pool.close();

Inline Worker#

import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

/**
 * Runs `fn(data)` on a throwaway worker thread and resolves with its result.
 *
 * `fn` is shipped to the worker as source text (`fn.toString()`), so it must
 * be self-contained: it cannot reference variables, imports or helpers from
 * the surrounding scope. Only pass trusted functions, because the text is
 * evaluated as code in the worker.
 *
 * The promise rejects if `fn` throws, or if the worker exits before posting a
 * result (for example when `fn` calls `process.exit()`); without that the
 * caller would wait forever.
 *
 * @param fn Synchronous, self-contained function to execute in the worker.
 * @param data Structured-cloneable argument passed to `fn`.
 */
function runInlineWorker<T, R>(fn: (data: T) => R, data: T): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    const workerCode = `
      const { parentPort, workerData } = require('worker_threads');
      const fn = ${fn.toString()};
      const result = fn(workerData);
      parentPort.postMessage(result);
    `;

    const worker = new Worker(workerCode, {
      eval: true,
      workerData: data,
    });

    worker.once('message', resolve);
    worker.once('error', reject);
    // After a successful 'message' this reject is a no-op; it only matters
    // when the worker ends without ever posting a result.
    worker.once('exit', (code) => {
      reject(new Error(`Inline worker exited with code ${code} before posting a result`));
    });
  });
}

// Usage
const result = await runInlineWorker(
  (nums: number[]) => nums.reduce((a, b) => a + b, 0),
  [1, 2, 3, 4, 5]
);

Shared Memory#

// main.ts
import { Worker } from 'worker_threads';

// Create shared buffer
const sharedBuffer = new SharedArrayBuffer(4 * 1024); // 4KB
const sharedArray = new Int32Array(sharedBuffer);

// Initialize data: the first 1000 slots hold their own index.
sharedArray.set(Array.from({ length: 1000 }, (_, index) => index));

// The buffer is shared, not copied: the worker writes to the same memory.
const worker = new Worker('./shared-worker.js', {
  workerData: { sharedBuffer },
});

// The worker posts once it has finished, so its writes are visible here.
worker.on('message', () => {
  console.log('First 10 values:', sharedArray.slice(0, 10));
});

// shared-worker.js
const { parentPort, workerData } = require('worker_threads');

// View over the very same memory the main thread initialised.
const view = new Int32Array(workerData.sharedBuffer);

// Modify shared memory: double every element in place.
view.forEach((value, index) => {
  view[index] = value * 2;
});

parentPort.postMessage('done');

Atomics for Synchronization#

// Using Atomics for thread-safe operations
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

if (isMainThread) {
  const sharedBuffer = new SharedArrayBuffer(4);
  const sharedArray = new Int32Array(sharedBuffer);

  // Start multiple workers; each one re-runs this file and takes the else branch.
  const workers = Array.from({ length: 4 }, () =>
    new Worker(__filename, { workerData: { sharedBuffer } })
  );

  Promise.all(
    workers.map(w => new Promise(resolve => w.on('exit', resolve)))
  ).then(() => {
    console.log('Final count:', Atomics.load(sharedArray, 0));
    // Should be 4000 (4 workers * 1000 increments)

    waitNotifyDemo();
  });
} else {
  const sharedArray = new Int32Array(workerData.sharedBuffer);

  // Atomic increment: no updates are lost even though 4 threads write slot 0.
  for (let i = 0; i < 1000; i++) {
    Atomics.add(sharedArray, 0, 1);
  }

  process.exit(0);
}

/**
 * Using Atomics.wait and Atomics.notify.
 *
 * The blocking wait runs inside a worker. Calling Atomics.wait at the top
 * level of this file would block the main thread forever, because nothing
 * else could run to store the new value and notify.
 */
function waitNotifyDemo(): void {
  const signalBuffer = new SharedArrayBuffer(4);
  const signal = new Int32Array(signalBuffer);

  // Worker waits
  const waiter = new Worker(
    `
    const { parentPort, workerData } = require('worker_threads');
    const signal = new Int32Array(workerData.signalBuffer);
    Atomics.wait(signal, 0, 0); // Wait while value is 0
    parentPort.postMessage(Atomics.load(signal, 0));
    `,
    { eval: true, workerData: { signalBuffer } }
  );

  waiter.on('message', (value) => {
    console.log('Worker woke up with value:', value);
  });

  // Main thread notifies. If this runs before the worker starts waiting, the
  // worker sees a non-zero value and Atomics.wait returns immediately.
  Atomics.store(signal, 0, 1);
  Atomics.notify(signal, 0, 1); // Wake one waiting worker
}

Message Channels#

import { Worker, MessageChannel } from 'worker_threads';

// Create channel for direct worker-to-worker communication
const { port1, port2 } = new MessageChannel();

// Each end of the channel is transferred (not copied) to one worker, so the
// two workers can talk to each other without going through the main thread.
const worker1 = new Worker('./worker1.js', {
  workerData: { port: port1 },
  transferList: [port1],
});

const worker2 = new Worker('./worker2.js', {
  workerData: { port: port2 },
  transferList: [port2],
});

// worker1.js
const { port } = require('worker_threads').workerData;

// Log whatever the other worker sends.
port.on('message', (msg) => {
  console.log('Worker 1 received:', msg);
});

// Open the conversation.
port.postMessage('Hello from Worker 1');

// worker2.js
const replyPort = require('worker_threads').workerData.port;

// Log each incoming message and answer it.
replyPort.on('message', (msg) => {
  console.log('Worker 2 received:', msg);
  replyPort.postMessage('Hello back from Worker 2');
});

Practical Example: Image Processing#

// image-processor.ts
import { Worker } from 'worker_threads';
import path from 'path';

interface ProcessOptions {
  width: number;
  height: number;
  quality: number;
}

/**
 * Resizes the image at `imagePath` and re-encodes it as JPEG on a worker thread.
 *
 * @param imagePath Path of the source image, read by the worker.
 * @param options Target width/height and JPEG quality.
 * @returns The encoded JPEG bytes.
 *
 * Rejects if the worker throws or exits before posting a result.
 */
function processImage(
  imagePath: string,
  options: ProcessOptions
): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const worker = new Worker(
      path.join(__dirname, 'image-worker.js'),
      {
        workerData: { imagePath, options },
      }
    );

    worker.once('message', (bytes: Uint8Array) => {
      // A Buffer posted from a worker arrives as a plain Uint8Array; re-wrap
      // the same memory so callers really get the Buffer the signature promises.
      resolve(Buffer.from(bytes.buffer, bytes.byteOffset, bytes.byteLength));
    });
    worker.once('error', reject);
    // No-op after a successful 'message'; prevents a hang if the worker ends
    // without ever posting a result.
    worker.once('exit', (code) => {
      reject(new Error(`Image worker exited with code ${code} before returning a result`));
    });
  });
}

// image-worker.js
const { parentPort, workerData } = require('worker_threads');
const sharp = require('sharp');

/**
 * Resizes and JPEG-encodes the image described by workerData, then posts the
 * bytes to the parent. Deliberately not named `process`, which would shadow
 * the Node.js global of the same name inside this module.
 */
async function resizeToJpeg() {
  const { imagePath, options } = workerData;

  const result = await sharp(imagePath)
    .resize(options.width, options.height)
    .jpeg({ quality: options.quality })
    .toBuffer();

  parentPort.postMessage(result);
}

resizeToJpeg();

Express Integration#

import express from 'express';
import { Worker } from 'worker_threads';
import path from 'path';

const app = express();

// Parse JSON request bodies; without this req.body is undefined in POST /compute.
app.use(express.json());

// Worker pool for CPU tasks
const pool = new WorkerPool(
  path.join(__dirname, 'compute-worker.js'),
  4
);

/** Extracts a printable message from a caught value of unknown type. */
const toErrorMessage = (error: unknown): string =>
  error instanceof Error ? error.message : String(error);

// Runs the posted data through the shared pool.
app.post('/compute', async (req, res) => {
  try {
    const result = await pool.run(req.body.data);
    res.json({ result });
  } catch (error) {
    res.status(500).json({ error: toErrorMessage(error) });
  }
});

// Don't block event loop
app.get('/heavy', async (req, res) => {
  // A rejected promise in an async handler must be caught here; otherwise the
  // request is left hanging and the rejection goes unhandled.
  try {
    const result = await runWorker({
      workerPath: './heavy-worker.js',
      workerData: req.query,
    });

    res.json(result);
  } catch (error) {
    res.status(500).json({ error: toErrorMessage(error) });
  }
});

Best Practices#

When to Use:
✓ CPU-intensive computations
✓ Image/video processing
✓ Cryptographic operations
✓ Complex parsing/serialization

When NOT to Use:
✗ I/O-bound operations
✗ Simple tasks (overhead > benefit)
✗ Operations needing main thread access
✗ Tasks requiring DOM access

Patterns:
✓ Use worker pools for reuse
✓ Transfer large data with transferList
✓ Use SharedArrayBuffer for shared state
✓ Implement proper error handling

Conclusion#

Worker Threads enable true parallelism for CPU-intensive tasks. Use worker pools for efficiency, SharedArrayBuffer for shared state, and Atomics for synchronization. Reserve workers for computationally heavy operations where the parallelism benefit outweighs the overhead.

Share this article

Help spread the word about Bootspring