Some tasks don't belong in request handlers—sending emails, processing images, generating reports. Background jobs handle these asynchronously, improving response times and reliability.
When to Use Background Jobs
Good candidates:
- Sending emails/notifications
- Processing uploads (images, videos)
- Generating reports
- Syncing with external services
- Batch data processing
- Scheduled tasks (cron)
- Webhooks delivery
BullMQ Setup
import { Queue, Worker, Job, UnrecoverableError } from 'bullmq';
import Redis from 'ioredis';
3
// Shared Redis connection used by every queue and worker in these examples.
// `maxRetriesPerRequest: null` is set because BullMQ requires it on
// connections used by workers (they issue blocking commands that must not be
// aborted by ioredis' per-request retry limit).
const connection = new Redis({
  host: process.env.REDIS_HOST,
  // Pass the radix explicitly so the port is always parsed as decimal.
  port: Number.parseInt(process.env.REDIS_PORT || '6379', 10),
  maxRetriesPerRequest: null,
});

// Create queues
const emailQueue = new Queue('emails', { connection });
const imageQueue = new Queue('image-processing', { connection });
13
14// Add job
/**
 * Enqueue a `welcome-email` job for the given user.
 *
 * The job is configured with 3 attempts and exponential backoff starting at
 * one second; `removeOnComplete` / `removeOnFail` cap how many finished jobs
 * are retained (100 completed, 1000 failed).
 */
async function sendWelcomeEmail(userId: string) {
  const retryBackoff = { type: 'exponential', delay: 1000 };
  const jobOptions = {
    attempts: 3,
    backoff: retryBackoff,
    removeOnComplete: 100,
    removeOnFail: 1000,
  };

  await emailQueue.add('welcome-email', { userId }, jobOptions);
}
30
31// Add delayed job
/** Enqueue a `reminder-email` job for the user, delayed by 24 hours. */
async function sendReminderEmail(userId: string) {
  const oneDayMs = 24 * 60 * 60 * 1000; // 24 hours
  await emailQueue.add('reminder-email', { userId }, { delay: oneDayMs });
}
41
42// Add scheduled job (cron)
/** Register the repeating `daily-report` job (cron pattern: 9 AM daily). */
async function setupDailyReport() {
  const dailyAtNine = { pattern: '0 9 * * *' }; // 9 AM daily
  await emailQueue.add('daily-report', {}, { repeat: dailyAtNine });
}

Worker Implementation
1// Email worker
// Email worker: consumes the `emails` queue and dispatches on the job name.
const emailWorker = new Worker(
  'emails',
  async (job: Job) => {
    const { userId } = job.data;
    const jobName = job.name;

    if (jobName === 'welcome-email') {
      await sendWelcomeEmailToUser(userId);
    } else if (jobName === 'reminder-email') {
      await sendReminderEmailToUser(userId);
    } else if (jobName === 'daily-report') {
      await generateAndSendDailyReport();
    } else {
      // Throw so an unrecognised job fails instead of being reported as done.
      throw new Error(`Unknown job type: ${job.name}`);
    }

    return { success: true };
  },
  {
    connection,
    concurrency: 5,
    // 100 jobs per minute
    limiter: { max: 100, duration: 60000 },
  }
);
32
33// Event handlers
// Event handlers

// Log each completed job together with its processing duration.
emailWorker.on('completed', (job) => {
  // `processedOn` / `finishedOn` are optional timestamps. Compute the
  // duration only when both are present; asserting with `!` would log NaN
  // if either were missing.
  const { processedOn, finishedOn } = job;
  const duration =
    processedOn != null && finishedOn != null
      ? finishedOn - processedOn
      : undefined;

  logger.info('Job completed', {
    jobId: job.id,
    name: job.name,
    duration,
  });
});

// Log each failed attempt. `job` may be undefined here, hence the `?.`.
emailWorker.on('failed', (job, error) => {
  logger.error('Job failed', {
    jobId: job?.id,
    name: job?.name,
    error: error.message,
    attempts: job?.attemptsMade,
  });
});

// Log errors raised by the worker itself rather than by a specific job.
emailWorker.on('error', (error) => {
  logger.error('Worker error', { error: error.message });
});

Job Patterns
Fan-Out
1// Process multiple items in parallel
/**
 * Fan out an order: enqueue one `process-item` job per order item so the
 * items can be processed in parallel.
 *
 * @param orderId - ID of the order whose items should be enqueued.
 * @throws Error if no order with the given ID exists.
 */
async function processOrder(orderId: string) {
  const order = await prisma.order.findUnique({
    where: { id: orderId },
    include: { items: true },
  });

  // `findUnique` resolves to null when nothing matches. Fail with a clear
  // message instead of a TypeError on `order.items`.
  if (!order) {
    throw new Error(`Order not found: ${orderId}`);
  }

  // Create child jobs for each item
  const jobs = order.items.map((item) => ({
    name: 'process-item',
    data: { orderId, itemId: item.id },
  }));

  await orderQueue.addBulk(jobs);
}

Job Chaining
1// Sequential processing
// Queue shared by every stage of the image pipeline.
const pipelineQueue = new Queue('pipeline', { connection });

/** Kick off the pipeline for an image by enqueueing its `download` stage. */
async function startImagePipeline(imageId: string) {
  const firstStagePayload = { imageId };
  await pipelineQueue.add('download', firstStagePayload);
}
7
// Pipeline worker: each stage enqueues the next one when it finishes, so the
// stages run sequentially (download -> resize -> upload).
const pipelineWorker = new Worker(
  'pipeline',
  async (job) => {
    const { imageId } = job.data;

    // Each case is wrapped in braces so its `const` is scoped to that case
    // rather than to the whole switch.
    switch (job.name) {
      case 'download': {
        const image = await downloadImage(imageId);
        await pipelineQueue.add('resize', { imageId, path: image.path });
        break;
      }

      case 'resize': {
        const resized = await resizeImage(job.data.path);
        await pipelineQueue.add('upload', { imageId, path: resized.path });
        break;
      }

      case 'upload': {
        const url = await uploadToS3(job.data.path);
        await prisma.image.update({
          where: { id: imageId },
          data: { url, status: 'processed' },
        });
        break;
      }

      default:
        // Fail unknown stages instead of silently marking them completed
        // (same behavior as the email worker).
        throw new Error(`Unknown job type: ${job.name}`);
    }
  },
  { connection }
);

Job Dependencies (Flow)
1import { FlowProducer } from 'bullmq';
2
const flowProducer = new FlowProducer({ connection });

/**
 * Create a job flow for an order: a parent `complete-order` job on the
 * `orders` queue with three child jobs, each carrying `{ orderId }`.
 */
async function createOrderFlow(orderId: string) {
  const childSteps = [
    { name: 'send-confirmation', queueName: 'emails' },
    { name: 'update-inventory', queueName: 'inventory' },
    { name: 'notify-shipping', queueName: 'shipping' },
  ];

  await flowProducer.add({
    name: 'complete-order',
    queueName: 'orders',
    data: { orderId },
    children: childSteps.map((step) => ({ ...step, data: { orderId } })),
  });
}
29
// Parent job runs after all children complete

Progress Tracking
1// Update progress from worker
// Reports worker: handles the items one at a time and publishes the
// percentage completed after each item.
const worker = new Worker(
  'reports',
  async (job) => {
    const { items } = job.data;
    let completed = 0;

    for (const [index, item] of items.entries()) {
      await processItem(item);
      completed = index + 1;
      const percent = Math.round((completed / items.length) * 100);
      await job.updateProgress(percent);
    }

    return { processed: completed };
  },
  { connection }
);
18
19// Check progress from API
// Check progress from API: responds with the job's state, progress and
// return value, or 404 when the job does not exist.
app.get('/api/jobs/:id/progress', async (req, res) => {
  const { id: jobId } = req.params;
  const job = await reportsQueue.getJob(jobId);

  if (job) {
    const state = await job.getState();
    res.json({
      id: job.id,
      state,
      progress: job.progress,
      data: job.returnvalue,
    });
    return;
  }

  return res.status(404).json({ error: 'Job not found' });
});

Error Handling
// Worker for `critical-jobs` that separates retryable failures from
// permanent ones.
const worker = new Worker(
  'critical-jobs',
  async (job) => {
    try {
      await processJob(job.data);
    } catch (error: unknown) {
      // Determine if retryable
      if (error instanceof TemporaryError) {
        throw error; // Will retry
      }

      // Permanent failure - don't retry. Throwing UnrecoverableError makes
      // BullMQ fail the job without using its remaining attempts. Calling
      // job.moveToFailed() and then returning would leave the worker trying
      // to complete a job that has already been moved.
      if (error instanceof ValidationError) {
        throw new UnrecoverableError(error.message);
      }

      // Log and rethrow. The caught value is `unknown`, so narrow it before
      // reading `.message`.
      logger.error('Job failed', {
        jobId: job.id,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  },
  {
    connection,
    settings: {
      // NOTE: a custom strategy is only applied to jobs that were added with
      // `backoff: { type: 'custom' }`.
      backoffStrategy: (attemptsMade: number) => {
        // Custom backoff: 1s, 5s, 25s, ~2m, 10m (capped at 10 minutes).
        // `attemptsMade` is 1 for the first retry, so subtract 1 to start
        // the sequence at 5^0 = 1 second.
        const exponent = Math.max(attemptsMade - 1, 0);
        return Math.min(Math.pow(5, exponent) * 1000, 600000);
      },
    },
  }
);
36
37// Dead letter queue for failed jobs
// Dead letter queue for failed jobs
const dlqWorker = new Worker(
  'emails',
  async (job) => {
    // Process...
  },
  { connection }
);

// Once a job has used up all of its attempts, copy it to the dead letter
// queue so it can be inspected or replayed later.
dlqWorker.on('failed', async (job, error) => {
  // The `failed` event can fire without a job object; nothing to forward.
  if (!job) {
    return;
  }

  // `attempts` is optional; when unset the job gets a single attempt.
  const maxAttempts = job.opts.attempts ?? 1;
  if (job.attemptsMade < maxAttempts) {
    return; // Attempts remain, so the job will be retried.
  }

  try {
    // Move to dead letter queue
    await deadLetterQueue.add('failed-email', {
      originalJob: job.data,
      error: error.message,
      attempts: job.attemptsMade,
    });
  } catch (dlqError: unknown) {
    // A rejection inside an async event listener would otherwise surface as
    // an unhandled promise rejection.
    logger.error('Failed to add job to dead letter queue', {
      jobId: job.id,
      error: dlqError instanceof Error ? dlqError.message : String(dlqError),
    });
  }
});

Monitoring Dashboard
1import { createBullBoard } from '@bull-board/api';
2import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
3import { ExpressAdapter } from '@bull-board/express';
4
// Bull Board dashboard served under /admin/queues.
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

// Queues shown in the dashboard, each wrapped in a BullMQ adapter.
const monitoredQueues = [emailQueue, imageQueue, reportsQueue];
createBullBoard({
  queues: monitoredQueues.map((queue) => new BullMQAdapter(queue)),
  serverAdapter,
});

// `requireAdmin` runs before the dashboard router on this path.
app.use('/admin/queues', requireAdmin, serverAdapter.getRouter());

Best Practices
DO:
✓ Make jobs idempotent
✓ Store minimal data in job payload
✓ Set appropriate timeouts
✓ Monitor queue depths
✓ Handle job failures gracefully
✓ Use separate queues for priorities
DON'T:
✗ Store large payloads in jobs
✗ Rely on job order
✗ Ignore failed jobs
✗ Run workers without monitoring
✗ Use infinite retries
Conclusion
Background jobs improve user experience and system reliability. Use queues for anything that can be deferred, implement proper error handling, and monitor your queues.
The key is making jobs idempotent—they should be safe to run multiple times.