Background Jobs · Queues · Node.js · Redis

Background Job Processing in Node.js

Handle long-running tasks reliably. From job queues to workers to monitoring and error handling.

Bootspring Team
Engineering
November 5, 2023
5 min read

Some tasks don't belong in request handlers—sending emails, processing images, generating reports. Background jobs handle these asynchronously, improving response times and reliability.

When to Use Background Jobs

Good candidates:

- Sending emails/notifications
- Processing uploads (images, videos)
- Generating reports
- Syncing with external services
- Batch data processing
- Scheduled tasks (cron)
- Webhook delivery
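The payoff is easiest to see in a request handler: enqueue the work and respond immediately, instead of making the user wait for the email to send. A minimal sketch of the pattern, using a hypothetical in-memory `pending` array in place of a real Redis-backed queue:

```typescript
// Hypothetical in-memory queue to illustrate the shape of the pattern;
// in production this would be a BullMQ queue backed by Redis.
type EmailJob = { userId: string };
const pending: EmailJob[] = [];

function enqueueEmail(job: EmailJob): void {
  pending.push(job); // a worker process drains this later
}

// The handler enqueues and returns right away,
// rather than sending the email inline.
function handleSignup(userId: string): { status: string } {
  enqueueEmail({ userId });
  return { status: 'ok' };
}
```

The response time no longer depends on the email provider's latency, and a provider outage fails the job, not the signup.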

BullMQ Setup

```typescript
import { Queue, Worker, Job } from 'bullmq';
import Redis from 'ioredis';

const connection = new Redis({
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT || '6379'),
  maxRetriesPerRequest: null,
});

// Create queues
const emailQueue = new Queue('emails', { connection });
const imageQueue = new Queue('image-processing', { connection });

// Add a job
async function sendWelcomeEmail(userId: string) {
  await emailQueue.add(
    'welcome-email',
    { userId },
    {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
      removeOnComplete: 100,
      removeOnFail: 1000,
    }
  );
}

// Add a delayed job
async function sendReminderEmail(userId: string) {
  await emailQueue.add(
    'reminder-email',
    { userId },
    {
      delay: 24 * 60 * 60 * 1000, // 24 hours
    }
  );
}

// Add a scheduled job (cron)
async function setupDailyReport() {
  await emailQueue.add(
    'daily-report',
    {},
    {
      repeat: {
        pattern: '0 9 * * *', // 9 AM daily
      },
    }
  );
}
```

Worker Implementation

```typescript
// Email worker
const emailWorker = new Worker(
  'emails',
  async (job: Job) => {
    const { userId } = job.data;

    switch (job.name) {
      case 'welcome-email':
        await sendWelcomeEmailToUser(userId);
        break;
      case 'reminder-email':
        await sendReminderEmailToUser(userId);
        break;
      case 'daily-report':
        await generateAndSendDailyReport();
        break;
      default:
        throw new Error(`Unknown job type: ${job.name}`);
    }

    return { success: true };
  },
  {
    connection,
    concurrency: 5,
    limiter: {
      max: 100,
      duration: 60000, // 100 jobs per minute
    },
  }
);

// Event handlers
emailWorker.on('completed', (job) => {
  logger.info('Job completed', {
    jobId: job.id,
    name: job.name,
    duration: job.finishedOn! - job.processedOn!,
  });
});

emailWorker.on('failed', (job, error) => {
  logger.error('Job failed', {
    jobId: job?.id,
    name: job?.name,
    error: error.message,
    attempts: job?.attemptsMade,
  });
});

emailWorker.on('error', (error) => {
  logger.error('Worker error', { error: error.message });
});
```

Job Patterns

Fan-Out

```typescript
// Process multiple items in parallel
async function processOrder(orderId: string) {
  const order = await prisma.order.findUnique({
    where: { id: orderId },
    include: { items: true },
  });

  if (!order) {
    throw new Error(`Order not found: ${orderId}`);
  }

  // Create child jobs for each item
  const jobs = order.items.map((item) => ({
    name: 'process-item',
    data: { orderId, itemId: item.id },
  }));

  await orderQueue.addBulk(jobs);
}
```

Job Chaining

```typescript
// Sequential processing: each stage enqueues the next
const pipelineQueue = new Queue('pipeline', { connection });

async function startImagePipeline(imageId: string) {
  await pipelineQueue.add('download', { imageId });
}

const pipelineWorker = new Worker(
  'pipeline',
  async (job) => {
    const { imageId } = job.data;

    switch (job.name) {
      case 'download': {
        const image = await downloadImage(imageId);
        await pipelineQueue.add('resize', { imageId, path: image.path });
        break;
      }
      case 'resize': {
        const resized = await resizeImage(job.data.path);
        await pipelineQueue.add('upload', { imageId, path: resized.path });
        break;
      }
      case 'upload': {
        const url = await uploadToS3(job.data.path);
        await prisma.image.update({
          where: { id: imageId },
          data: { url, status: 'processed' },
        });
        break;
      }
    }
  },
  { connection }
);
```

Job Dependencies (Flow)

```typescript
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({ connection });

async function createOrderFlow(orderId: string) {
  await flowProducer.add({
    name: 'complete-order',
    queueName: 'orders',
    data: { orderId },
    children: [
      {
        name: 'send-confirmation',
        queueName: 'emails',
        data: { orderId },
      },
      {
        name: 'update-inventory',
        queueName: 'inventory',
        data: { orderId },
      },
      {
        name: 'notify-shipping',
        queueName: 'shipping',
        data: { orderId },
      },
    ],
  });
}

// The parent job runs only after all children complete
```

Progress Tracking

```typescript
// Update progress from the worker
const worker = new Worker(
  'reports',
  async (job) => {
    const { items } = job.data;
    let processed = 0;

    for (const item of items) {
      await processItem(item);
      processed++;
      await job.updateProgress(Math.round((processed / items.length) * 100));
    }

    return { processed };
  },
  { connection }
);

// Check progress from the API
app.get('/api/jobs/:id/progress', async (req, res) => {
  const job = await reportsQueue.getJob(req.params.id);

  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }

  res.json({
    id: job.id,
    state: await job.getState(),
    progress: job.progress,
    data: job.returnvalue,
  });
});
```

Error Handling

```typescript
import { UnrecoverableError } from 'bullmq';

const worker = new Worker(
  'critical-jobs',
  async (job) => {
    try {
      await processJob(job.data);
    } catch (error) {
      // Temporary failures are safe to retry
      if (error instanceof TemporaryError) {
        throw error; // will retry with backoff
      }

      // Permanent failure: fail immediately, skipping remaining retries
      if (error instanceof ValidationError) {
        throw new UnrecoverableError(error.message);
      }

      // Log and rethrow
      logger.error('Job failed', {
        jobId: job.id,
        error: error.message,
      });
      throw error;
    }
  },
  {
    connection,
    settings: {
      backoffStrategy: (attemptsMade: number) => {
        // Custom backoff: 5s, 25s, ~2m, then capped at 10m
        return Math.min(Math.pow(5, attemptsMade) * 1000, 600000);
      },
    },
  }
);

// Dead letter queue for jobs that exhaust their retries
const deadLetterQueue = new Queue('dead-letter', { connection });

const dlqWorker = new Worker(
  'emails',
  async (job) => {
    // Process...
  },
  { connection }
);

dlqWorker.on('failed', async (job, error) => {
  if (job && job.attemptsMade >= (job.opts.attempts ?? 1)) {
    // Move to dead letter queue for later inspection or replay
    await deadLetterQueue.add('failed-email', {
      originalJob: job.data,
      error: error.message,
      attempts: job.attemptsMade,
    });
  }
});
```

Monitoring Dashboard

```typescript
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(imageQueue),
    new BullMQAdapter(reportsQueue),
  ],
  serverAdapter,
});

app.use('/admin/queues', requireAdmin, serverAdapter.getRouter());
```

Best Practices

DO:

✓ Make jobs idempotent
✓ Store minimal data in the job payload
✓ Set appropriate timeouts
✓ Monitor queue depths
✓ Handle job failures gracefully
✓ Use separate queues for different priorities

DON'T:

✗ Store large payloads in jobs
✗ Rely on job order
✗ Ignore failed jobs
✗ Run workers without monitoring
✗ Use infinite retries
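Bounded retries pair well with jittered backoff, so a batch of failing jobs doesn't retry at the same instant and stampede the downstream service. A pure helper (a hypothetical function, not part of BullMQ) that could be plugged into a worker's `backoffStrategy`:

```typescript
// Capped exponential backoff with full jitter (hypothetical helper).
// Attempt 1 -> up to 1s, attempt 2 -> up to 2s, ..., capped at 60s.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 60_000): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** (attempt - 1));
  // Full jitter: pick a random delay in [0, ceiling)
  return Math.floor(Math.random() * ceiling);
}
```

The cap keeps the worst-case delay predictable, and the jitter spreads retries out instead of synchronizing them.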

Conclusion

Background jobs improve user experience and system reliability. Use queues for anything that can be deferred, implement proper error handling, and monitor your queues.

The key is making jobs idempotent—they should be safe to run multiple times.
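As a sketch, idempotency can be as simple as checking a deduplication key before performing the side effect. Here the store is a hypothetical in-memory `Set`; in production you would use a database unique constraint or an atomic Redis key:

```typescript
// Record of already-sent emails (hypothetical in-memory store;
// use a DB unique constraint or Redis SET NX in production).
const sentEmails = new Set<string>();

async function sendWelcomeEmailIdempotent(userId: string): Promise<boolean> {
  const key = `welcome:${userId}`;
  if (sentEmails.has(key)) {
    return false; // already sent; a retried job becomes a no-op
  }
  sentEmails.add(key);
  // await actuallySendEmail(userId); // the real side effect goes here
  return true;
}
```

If the job crashes after sending and is retried, the second run finds the key and skips the send, so the user never receives a duplicate.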
