Background jobs handle work that's too slow or unreliable for synchronous requests. Email sending, image processing, report generation—these operations belong in a job queue. Here's how to build reliable background processing systems.
Why Background Jobs?
Move Work Out of Requests
Synchronous (slow, risky):
POST /orders
→ Charge payment (500ms)
→ Send confirmation email (200ms)
→ Generate invoice PDF (300ms)
→ Update analytics (100ms)
→ Return response (1100ms+)
Asynchronous (fast, resilient):
POST /orders
→ Charge payment (500ms)
→ Queue: SendConfirmationEmail
→ Queue: GenerateInvoice
→ Queue: UpdateAnalytics
→ Return response (500ms)
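Here is a deliberately minimal in-memory sketch of the asynchronous flow. `InMemoryQueue`, `createOrder`, and the injected `charge` function are hypothetical names. Unlike Redis, PostgreSQL, or SQS, this queue loses every pending job on restart, and that is exactly the gap the technologies below fill.

```typescript
// Hypothetical in-memory queue: illustrates "enqueue, then return" only.
// Real queues persist jobs so they survive crashes and restarts.
type Job = { name: string; data: Record<string, unknown> };
type Handler = (data: Record<string, unknown>) => Promise<void>;

class InMemoryQueue {
  private jobs: Job[] = [];

  // Enqueueing is cheap: it only records the work to do later
  add(name: string, data: Record<string, unknown>): void {
    this.jobs.push({ name, data });
  }

  // A worker drains jobs outside the request/response cycle
  async drain(handlers: Record<string, Handler>): Promise<number> {
    let processed = 0;
    while (this.jobs.length > 0) {
      const job = this.jobs.shift()!;
      await handlers[job.name](job.data);
      processed++;
    }
    return processed;
  }

  get size(): number {
    return this.jobs.length;
  }
}

const queue = new InMemoryQueue();

// Request handler: only the payment stays on the critical path
async function createOrder(
  orderId: string,
  charge: (id: string) => Promise<void>,
): Promise<{ orderId: string; status: string }> {
  await charge(orderId);
  queue.add('SendConfirmationEmail', { orderId });
  queue.add('GenerateInvoice', { orderId });
  queue.add('UpdateAnalytics', { orderId });
  return { orderId, status: 'created' };
}
```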
Job Queue Benefits
✓ Faster response times
✓ Retry failed operations
✓ Rate limit external services
✓ Scale workers independently
✓ Survive service outages
✓ Schedule future work
Queue Technologies
Redis-Based (BullMQ)
import { Queue, Worker } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };

// Create queue
const emailQueue = new Queue('email', { connection });

// Add job
await emailQueue.add('send-confirmation', {
  to: 'user@example.com',
  orderId: '123',
});

// Process jobs
const worker = new Worker('email', async job => {
  await sendEmail(job.data.to, job.data.orderId);
}, { connection });
PostgreSQL-Based (Graphile Worker)
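import { Pool } from 'pg';
import { run, quickAddJob } from 'graphile-worker';

// Connection string taken from the environment
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Add job (the first argument is an options object, not the pool itself)
await quickAddJob({ pgPool: pool }, 'send_email', {
  to: 'user@example.com',
  orderId: '123',
});

// Worker
const runner = await run({
  pgPool: pool,
  taskList: {
    send_email: async (payload, helpers) => {
      await sendEmail(payload.to, payload.orderId);
    },
  },
});

// Resolves when the runner stops; keeps the process alive until then
await runner.promise;
Cloud Services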
// AWS SQS + Lambda (the consumer side is not shown)
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';

const sqs = new SQSClient({});

await sqs.send(new SendMessageCommand({
  QueueUrl: process.env.QUEUE_URL,
  MessageBody: JSON.stringify({
    type: 'send_email',
    data: { to: 'user@example.com', orderId: '123' },
  }),
}));

// Google Cloud Tasks: each task is an HTTP request delivered to your endpoint
import { CloudTasksClient } from '@google-cloud/tasks';

const client = new CloudTasksClient();
// Project, location, and queue names are placeholders
const queuePath = client.queuePath('my-project', 'us-central1', 'email');

await client.createTask({
  parent: queuePath,
  task: {
    httpRequest: {
      httpMethod: 'POST',
      url: 'https://api.example.com/tasks/send-email',
      headers: { 'Content-Type': 'application/json' },
      body: Buffer.from(JSON.stringify({ to: 'user@example.com' })).toString('base64'),
    },
  },
});
Job Design Patterns
Idempotency
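A check-then-insert pattern can race when two copies of the same job run at once: both see "not processed yet" and both do the work. A minimal in-process sketch of the fix is to remember the *promise* for each idempotency key, so concurrent duplicates share a single execution. `IdempotentRunner` is a hypothetical name. In production, a database row with a unique constraint on the key plays this role.

```typescript
// Hypothetical in-process idempotency guard keyed by an idempotency key.
class IdempotentRunner<T> {
  // Holds in-flight and finished executions, so a duplicate arriving while the
  // first run is still going gets the same promise instead of starting a second run
  private results = new Map<string, Promise<T>>();

  run(key: string, work: () => Promise<T>): Promise<T> {
    const existing = this.results.get(key);
    if (existing) return existing;

    const promise = work().catch((err) => {
      // Forget failures so a later retry can attempt the work again
      this.results.delete(key);
      throw err;
    });
    this.results.set(key, promise);
    return promise;
  }
}
```

Caching the promise rather than the result is the design choice that closes the race: the entry exists from the moment work starts, not only after it finishes.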
// Jobs may run more than once (retries, stalled-job recovery), so make them idempotent
async function processPayment(job) {
  const { orderId, idempotencyKey } = job.data;

  // Check if already processed
  const existing = await db.payments.findByIdempotencyKey(idempotencyKey);
  if (existing) {
    console.log(`Payment already processed: ${idempotencyKey}`);
    return existing;
  }

  // Pass the key to the payment provider as well (assuming it accepts one): if the
  // worker crashes after charging but before the insert below, the retry won't charge twice
  const result = await paymentService.charge(orderId, { idempotencyKey });

  // Store with the idempotency key; a unique constraint on that column stops two
  // concurrent runs that both passed the check above from both recording a payment
  await db.payments.create({
    ...result,
    idempotencyKey,
  });

  return result;
}
Job Chunking
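Offset-based paging (`skip`/`take`) can skip or repeat rows if the table changes while the export runs. A common alternative is keyset pagination, where each chunk job carries the last id it processed. In this sketch, an in-memory array stands in for a query such as `WHERE id > $cursor ORDER BY id LIMIT $limit`. `fetchChunk` and `processChunk` are hypothetical helpers.

```typescript
// Keyset ("cursor") pagination for chunked jobs: each job remembers the last id
// it handled, so inserts or deletes elsewhere in the table don't shift later pages.
type Row = { id: number; value: string };
type ChunkJob = { exportId: string; afterId: number; limit: number };

// Stand-in for: SELECT * FROM records WHERE id > afterId ORDER BY id LIMIT limit
function fetchChunk(table: Row[], afterId: number, limit: number): Row[] {
  return table
    .filter((row) => row.id > afterId)
    .sort((a, b) => a.id - b.id)
    .slice(0, limit);
}

// Processes one chunk and returns the payload for the next job, or null when done
function processChunk(table: Row[], job: ChunkJob, sink: Row[]): ChunkJob | null {
  const rows = fetchChunk(table, job.afterId, job.limit);
  if (rows.length === 0) return null; // nothing left: finalize the export here
  sink.push(...rows);
  return { ...job, afterId: rows[rows.length - 1].id };
}
```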
// Break large jobs into smaller chunks: each run handles one page and queues the next
async function processLargeExport(job) {
  const { exportId, offset = 0, limit = 1000 } = job.data;

  const records = await db.records.findMany({
    orderBy: { id: 'asc' }, // a stable order keeps pages from overlapping
    skip: offset,
    take: limit,
  });

  if (records.length === 0) {
    // All done
    await finalizeExport(exportId);
    return;
  }

  // Process this chunk
  await processRecords(exportId, records);

  // Queue next chunk
  await queue.add('process-export', {
    exportId,
    offset: offset + limit,
    limit,
  });
}
Job Dependencies
// Jobs that depend on other jobs. In BullMQ a parent job waits until all of its
// children have completed, so the step that must run last is the parent.
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({ connection });

await flowProducer.add({
  name: 'ship-order',
  queueName: 'order-fulfillment',
  data: { orderId: '123' },
  // Children run first (possibly in parallel); ship-order runs after both complete.
  // The 'payments' and 'inventory' queue names are examples.
  children: [
    { name: 'charge-payment', queueName: 'payments', data: { orderId: '123' } },
    { name: 'reserve-inventory', queueName: 'inventory', data: { orderId: '123' } },
  ],
});
Error Handling
Retry Strategies
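The arithmetic behind exponential backoff is easy to check by hand. The n-th retry waits `delay * 2^(n - 1)`, and `attempts` counts the first try, so five attempts means four retries. This sketch computes that schedule and adds a "full jitter" variant, a common technique for keeping many failed jobs from retrying in lockstep. Both functions are hypothetical illustrations, not the exact formula any particular queue library uses.

```typescript
// Delays before each retry: attempts includes the initial try, so there are attempts - 1 retries
function exponentialDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

// Full jitter: a random delay between 0 and the exponential value for this retry
function fullJitterDelay(
  retry: number,
  baseDelayMs: number,
  random: () => number = Math.random,
): number {
  return Math.floor(random() * baseDelayMs * 2 ** (retry - 1));
}
```

For example, `exponentialDelays(5, 1000)` returns `[1000, 2000, 4000, 8000]`.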
const queue = new Queue('email', {
  connection,
  defaultJobOptions: {
    attempts: 5, // 1 initial try + up to 4 retries
    backoff: {
      type: 'exponential',
      delay: 1000, // retries after 1s, 2s, 4s, 8s
    },
  },
});

// Per-job retry config (overrides the queue defaults)
await queue.add('send-email', data, {
  attempts: 3,
  backoff: {
    type: 'fixed',
    delay: 5000,
  },
});
Dead Letter Queue
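Independent of BullMQ, the retry-then-dead-letter flow comes down to a loop and a fallback. In this sketch, `runWithDeadLetter` and the array standing in for the dead-letter queue are hypothetical. The backoff wait between attempts is omitted for brevity.

```typescript
// Run a job up to maxAttempts times; if every attempt fails, record it for
// investigation instead of silently dropping it.
type DeadLetter<T> = { data: T; error: string; attempts: number; failedAt: string };

async function runWithDeadLetter<T>(
  data: T,
  handler: (data: T) => Promise<void>,
  maxAttempts: number,
  deadLetters: DeadLetter<T>[],
): Promise<boolean> {
  let lastError = '';
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(data);
      return true; // success: nothing to dead-letter
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
      // A real queue would wait for the backoff delay here before retrying
    }
  }
  deadLetters.push({ data, error: lastError, attempts: maxAttempts, failedAt: new Date().toISOString() });
  return false;
}
```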
const deadLetterQueue = new Queue('email-dead-letter', { connection });

const worker = new Worker('email', async job => {
  await sendEmail(job.data);
}, {
  connection,
});

worker.on('failed', async (job, err) => {
  // job is typed as possibly undefined; attempts defaults to 1 when not configured
  if (job && job.attemptsMade >= (job.opts.attempts ?? 1)) {
    // Final attempt failed: move to dead letter queue for investigation
    await deadLetterQueue.add('failed-email', {
      jobId: job.id,
      originalJob: job.data,
      error: err.message,
      failedAt: new Date().toISOString(),
    });
  }
});
Graceful Error Handling
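The retry/drop/alert decision in `processJob` can also be expressed as a pure function that is easy to unit test. This sketch assumes errors carry either a Node-style `code` or an HTTP `status`. Treating 429 and all 5xx responses as retryable widens the `isRetryable` check (which matches only 503) and is a judgment call. `classifyError` is a hypothetical name.

```typescript
type JobError = { code?: string; status?: number; message?: string };
type ErrorAction = 'retry' | 'drop' | 'alert-and-retry';

// Network-level failures that usually clear up on their own
const TRANSIENT_CODES = new Set(['ECONNREFUSED', 'ECONNRESET', 'ETIMEDOUT']);

function classifyError(error: JobError): ErrorAction {
  if (error.code && TRANSIENT_CODES.has(error.code)) return 'retry';
  if (error.status !== undefined) {
    // Rate limiting and server-side failures are usually worth retrying
    if (error.status === 429 || error.status >= 500) return 'retry';
    // Other 4xx responses mean the request itself is wrong; retrying won't help
    if (error.status >= 400) return 'drop';
  }
  // Anything unrecognized: alert a human, but still let the queue retry
  return 'alert-and-retry';
}
```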
async function processJob(job) {
  try {
    await doWork(job.data);
  } catch (error) {
    if (isRetryable(error)) {
      // Let it retry
      throw error;
    }

    if (isUserError(error)) {
      // Don't retry, log and continue
      await logUserError(job, error);
      return;
    }

    // Unexpected error - alert and retry
    await alertOncall(job, error);
    throw error;
  }
}

function isRetryable(error) {
  return error.code === 'ECONNREFUSED' ||
    error.code === 'ETIMEDOUT' ||
    error.status === 503;
}
Scheduling
Cron Jobs
// BullMQ repeatable jobs
await queue.add('daily-report', {}, {
  repeat: {
    pattern: '0 9 * * *', // 9 AM daily
  },
});

await queue.add('cleanup', {}, {
  repeat: {
    every: 3600000, // Every hour (in milliseconds)
  },
});
Delayed Jobs
// Send reminder 24 hours after signup
await queue.add('send-reminder', { userId }, {
  delay: 24 * 60 * 60 * 1000,
});

// Schedule for a specific time (clamped at 0 so a past date runs immediately)
await queue.add('scheduled-post', { postId }, {
  delay: Math.max(0, targetDate.getTime() - Date.now()),
});
Concurrency and Rate Limiting
Worker Concurrency
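A concurrency setting caps how many handlers are in flight at once and starts a new job as soon as one finishes. This standalone sketch (`processWithConcurrency` is a hypothetical name) shows that behaviour. Because Node runs JavaScript on one thread, such a cap mainly helps I/O-bound jobs. CPU-bound work like image processing needs separate processes or threads to actually run in parallel.

```typescript
// Run handler over jobs with at most `limit` handlers in flight at any moment
async function processWithConcurrency<T>(
  jobs: T[],
  limit: number,
  handler: (job: T) => Promise<void>,
): Promise<void> {
  let next = 0;
  // Each "lane" keeps pulling the next unclaimed job until none are left
  const lane = async (): Promise<void> => {
    while (next < jobs.length) {
      const job = jobs[next++];
      await handler(job);
    }
  };
  await Promise.all(Array.from({ length: Math.min(limit, jobs.length) }, lane));
}
```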
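const worker = new Worker('heavy-processing', async job => {
  await processImage(job.data);
}, {
  connection,
  // Up to 5 jobs in flight at once in this worker. JavaScript runs on one thread,
  // so CPU-bound work only overlaps if it runs in separate processes or threads.
  concurrency: 5,
});
Rate Limiting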
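A `{ max, duration }` limit can be read as "at most `max` jobs per `duration` window." This fixed-window sketch illustrates the idea with an injectable clock so its behaviour is deterministic. `FixedWindowLimiter` is hypothetical, and a queue library's limiter may differ in details such as how it shares state across workers.

```typescript
// Allow at most `max` acquisitions per `durationMs` window
class FixedWindowLimiter {
  private windowStart = 0;
  private count = 0;
  private readonly max: number;
  private readonly durationMs: number;
  private readonly now: () => number;

  constructor(max: number, durationMs: number, now: () => number = Date.now) {
    this.max = max;
    this.durationMs = durationMs;
    this.now = now;
  }

  // Returns 0 if the job may run now, otherwise the milliseconds to wait
  tryAcquire(): number {
    const t = this.now();
    if (t - this.windowStart >= this.durationMs) {
      this.windowStart = t; // start a fresh window
      this.count = 0;
    }
    if (this.count < this.max) {
      this.count++;
      return 0;
    }
    return this.windowStart + this.durationMs - t;
  }
}
```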
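// BullMQ applies the limit on the worker side, and it is global for the queue:
// at most 10 jobs per second across all workers
const worker = new Worker('external-api', async job => {
  await callApi(job.data);
}, {
  connection,
  limiter: {
    max: 10,        // 10 jobs
    duration: 1000, // per second
  },
});

// Or throttle just the outbound call with an in-process limiter
// (e.g. a Bottleneck instance, whose schedule() queues the function)
const apiWorker = new Worker('api-calls', async job => {
  await rateLimiter.schedule(() => callApi(job.data));
}, { connection });
Priority Queues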
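The ordering rule is small enough to sketch directly: lower numbers run first, and jobs with equal priority keep their arrival order. `PriorityJobQueue` is a hypothetical in-memory illustration. It uses a sorted array for readability; a real implementation would use a heap or a Redis sorted set.

```typescript
type PrioritizedJob = { name: string; priority: number };

class PriorityJobQueue {
  private jobs: PrioritizedJob[] = [];

  add(name: string, priority: number): void {
    // Insert before the first job with a strictly larger number, so equal
    // priorities stay first-in, first-out
    let i = this.jobs.findIndex((j) => j.priority > priority);
    if (i === -1) i = this.jobs.length;
    this.jobs.splice(i, 0, { name, priority });
  }

  // Removes and returns the name of the next job to run, if any
  next(): string | undefined {
    return this.jobs.shift()?.name;
  }
}
```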
// Higher priority jobs processed first: lower numbers mean higher priority (1 is highest)
await queue.add('urgent-email', data, { priority: 1 });
await queue.add('normal-email', data, { priority: 5 });
await queue.add('bulk-email', data, { priority: 10 });
Monitoring
Job Metrics
const worker = new Worker('email', processor, { connection });

worker.on('completed', (job) => {
  metrics.increment('jobs.completed', { queue: 'email' });
  // job.timestamp is when the job was created, so it would include queue wait time;
  // job.processedOn is when a worker picked the job up
  metrics.timing('jobs.duration', Date.now() - job.processedOn, { queue: 'email' });
  metrics.timing('jobs.wait', job.processedOn - job.timestamp, { queue: 'email' });
});

worker.on('failed', (job, err) => {
  metrics.increment('jobs.failed', { queue: 'email', error: err.name });
});

worker.on('stalled', (jobId) => {
  metrics.increment('jobs.stalled', { queue: 'email' });
  alertOncall(`Stalled job: ${jobId}`);
});
Queue Health
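Raw counts are easier to act on once they are turned into explicit checks. This sketch combines backlog size, the age of the oldest waiting job, and a simple "work waiting but nothing active" heuristic. `checkQueueHealth`, the field names, and the thresholds are all hypothetical and would need tuning per queue.

```typescript
type QueueStats = { waiting: number; active: number; oldestWaitingTimestamp?: number };
type HealthThresholds = { maxWaiting: number; maxOldestAgeMs: number };

// Returns a list of human-readable problems; an empty list means healthy
function checkQueueHealth(
  stats: QueueStats,
  limits: HealthThresholds,
  now: number = Date.now(),
): string[] {
  const problems: string[] = [];
  if (stats.waiting > limits.maxWaiting) {
    problems.push(`backlog: ${stats.waiting} waiting jobs (limit ${limits.maxWaiting})`);
  }
  if (stats.oldestWaitingTimestamp !== undefined) {
    const age = now - stats.oldestWaitingTimestamp;
    if (age > limits.maxOldestAgeMs) {
      problems.push(`latency: oldest job waiting ${age}ms (limit ${limits.maxOldestAgeMs}ms)`);
    }
  }
  // Heuristic: work is waiting but nothing is running, so workers may be down
  if (stats.waiting > 0 && stats.active === 0) {
    problems.push('no active jobs while work is waiting; are workers running?');
  }
  return problems;
}
```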
// Check queue depth
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const delayed = await queue.getDelayedCount();

if (waiting > 10000) {
  alert('Queue backing up');
}

// Track queue latency: asc = true returns the oldest waiting job first
const oldestWaiting = await queue.getJobs(['waiting'], 0, 0, true);
if (oldestWaiting.length) {
  const age = Date.now() - oldestWaiting[0].timestamp;
  metrics.gauge('queue.oldest_waiting', age);
}
Dashboard
// Bull Board for visualization
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const serverAdapter = new ExpressAdapter();
// Must match the mount path below so the UI's assets and API calls resolve
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(processingQueue),
  ],
  serverAdapter,
});

// The board can retry and delete jobs, so put this route behind authentication
app.use('/admin/queues', serverAdapter.getRouter());
Testing
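Tests against a live Redis are slow. One option is to build the processor from injected dependencies and call it directly with a fake job, leaving the Redis-backed test to cover only the wiring. In this sketch `makeEmailProcessor`, `EmailJob`, and the recipient check are hypothetical. Only the job fields the processor reads are faked.

```typescript
type EmailJob = { data: { to: string; orderId?: string } };
type SendEmail = (to: string, orderId?: string) => Promise<void>;

// Build the processor from its dependencies so tests can swap in fakes
function makeEmailProcessor(sendEmail: SendEmail) {
  return async (job: EmailJob): Promise<void> => {
    if (!job.data.to.includes('@')) {
      // Reject obviously bad input without calling the mail service
      throw new Error(`invalid recipient: ${job.data.to}`);
    }
    await sendEmail(job.data.to, job.data.orderId);
  };
}

// In production: new Worker('email', makeEmailProcessor(sendEmail), { connection })
```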
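import { Queue, Worker } from 'bullmq';

// Assumes a Redis instance reachable from the test environment
const connection = { host: 'localhost', port: 6379 };

describe('Email Worker', () => {
  let queue: Queue;
  let worker: Worker | undefined;

  beforeEach(async () => {
    queue = new Queue('test-email', { connection });
  });

  afterEach(async () => {
    // Close the worker even if an assertion failed, then remove all test jobs
    await worker?.close();
    worker = undefined;
    await queue.obliterate({ force: true });
    await queue.close();
  });

  it('sends the email when the job is processed', async () => {
    const sendEmail = jest.fn().mockResolvedValue(true);
    const emailWorker = new Worker('test-email', async (job) => {
      await sendEmail(job.data);
    }, { connection });
    worker = emailWorker;

    // Subscribe before adding the job so a fast worker can't complete it unobserved
    const completed = new Promise(resolve => emailWorker.once('completed', resolve));

    await queue.add('send', { to: 'test@example.com' });
    await completed;

    expect(sendEmail).toHaveBeenCalledWith({ to: 'test@example.com' });
  });
});
Conclusion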
Background jobs are essential for scalable, resilient applications. Choose a queue technology that fits your existing infrastructure (Redis, PostgreSQL, or a managed cloud queue), design jobs to be idempotent and safe to retry, and invest in monitoring and alerting.
Start simple—a basic queue with retries handles most use cases. Add complexity (priorities, dependencies, rate limiting) only when needed. The goal is reliable execution, not clever architecture.