Message Queue · Architecture · Distributed Systems · RabbitMQ

Message Queue Patterns for Distributed Systems

Build reliable distributed systems with message queues. From pub/sub to work queues to dead letter handling.

Bootspring Team
Engineering
February 5, 2023
7 min read

Message queues decouple services and enable asynchronous communication. Here are the essential patterns for building reliable distributed systems.

Message Queue Concepts

Queue: FIFO storage for messages
Exchange: Routes messages to queues
Binding: Rules connecting exchanges to queues
Consumer: Processes messages from a queue
Producer: Sends messages to an exchange

Patterns:
- Work Queue: distribute tasks among workers
- Pub/Sub: broadcast to all subscribers
- Routing: send to specific queues
- Topics: pattern-based routing
- RPC: request/reply over a queue
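Topic routing is the least obvious of these: a routing key like `orders.created.eu` is matched against patterns in which `*` stands for exactly one word and `#` for zero or more, following RabbitMQ's topic-exchange convention. A minimal sketch of that matching logic (the `matchTopic` helper is our own illustration, not a library API):

```typescript
// Topic routing: '*' matches exactly one word, '#' matches zero or more,
// mirroring RabbitMQ topic-exchange semantics. Words are dot-separated.
function matchTopic(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // Does pattern[i..] match key[j..]?
  const match = (i: number, j: number): boolean => {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' absorbs zero or more words
      for (let skip = j; skip <= k.length; skip++) {
        if (match(i + 1, skip)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  };

  return match(0, 0);
}
```

With a real broker the exchange performs this match; the sketch only shows why `orders.*` catches `orders.created` but not `orders.created.eu`.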

Work Queue Pattern

```typescript
import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL!);

// Producer - Add jobs to queue
const emailQueue = new Queue('emails', { connection: redis });

async function queueEmail(to: string, subject: string, body: string): Promise<string> {
  const job = await emailQueue.add(
    'send-email',
    { to, subject, body },
    {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
      removeOnComplete: 100,
      removeOnFail: 1000,
    }
  );

  return job.id!;
}

// Consumer - Process jobs
const emailWorker = new Worker(
  'emails',
  async (job) => {
    const { to, subject, body } = job.data;

    console.log(`Processing email to ${to}`);

    await sendEmail(to, subject, body);

    return { sent: true, timestamp: new Date() };
  },
  {
    connection: redis,
    concurrency: 5,
    limiter: {
      max: 100,
      duration: 60000, // 100 per minute
    },
  }
);

emailWorker.on('completed', (job, result) => {
  console.log(`Email ${job.id} sent successfully`);
});

emailWorker.on('failed', (job, error) => {
  console.error(`Email ${job?.id} failed:`, error.message);
});
```
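With `attempts: 3` and an exponential backoff of 1000 ms, the wait between retries roughly doubles each time. A sketch of that schedule (the `maxMs` cap is our own addition for illustration, not part of the config above):

```typescript
// Exponential backoff: attempt 1 waits baseMs, attempt 2 waits 2x baseMs,
// attempt 3 waits 4x baseMs, and so on, capped at maxMs.
function backoffDelay(attempt: number, baseMs: number, maxMs = 60_000): number {
  const delay = baseMs * 2 ** (attempt - 1);
  return Math.min(delay, maxMs);
}
```

Capping matters: without it, a job retried ten times with a 1-second base would wait over eight minutes between attempts.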

Pub/Sub Pattern

```typescript
import Redis from 'ioredis';

// Publisher
class EventPublisher {
  constructor(private redis: Redis) {}

  async publish(channel: string, event: any): Promise<void> {
    const message = JSON.stringify({
      id: crypto.randomUUID(),
      timestamp: new Date().toISOString(),
      data: event,
    });

    await this.redis.publish(channel, message);
  }
}

// Subscriber
class EventSubscriber {
  private subscriber: Redis;
  private handlers: Map<string, ((event: any) => Promise<void>)[]> = new Map();

  constructor(redisUrl: string) {
    this.subscriber = new Redis(redisUrl);

    this.subscriber.on('message', async (channel, message) => {
      const event = JSON.parse(message);
      const handlers = this.handlers.get(channel) || [];

      for (const handler of handlers) {
        try {
          await handler(event);
        } catch (error) {
          console.error(`Handler error for ${channel}:`, error);
        }
      }
    });
  }

  subscribe(channel: string, handler: (event: any) => Promise<void>): void {
    if (!this.handlers.has(channel)) {
      this.handlers.set(channel, []);
      this.subscriber.subscribe(channel);
    }
    this.handlers.get(channel)!.push(handler);
  }
}

// Usage
const publisher = new EventPublisher(redis);
const subscriber = new EventSubscriber(process.env.REDIS_URL!);

subscriber.subscribe('orders', async (event) => {
  console.log('Order event:', event.data);
  await processOrder(event.data);
});

subscriber.subscribe('orders', async (event) => {
  await sendNotification(event.data);
});

await publisher.publish('orders', {
  type: 'ORDER_CREATED',
  orderId: '123',
  customerId: 'cust_456',
});
```
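The key contrast with the work queue pattern is delivery semantics: pub/sub fans each message out to every subscriber, while a work queue hands each message to exactly one worker. An in-memory miniature of both (names here are illustrative, not a library API):

```typescript
type Handler = (msg: string) => void;

// Pub/Sub: every subscriber receives every message (fan-out)
class InMemoryBus {
  private subs: Handler[] = [];
  subscribe(h: Handler) { this.subs.push(h); }
  publish(msg: string) { this.subs.forEach((h) => h(msg)); }
}

// Work queue: each message goes to exactly one worker (round-robin here)
class InMemoryWorkQueue {
  private workers: Handler[] = [];
  private next = 0;
  register(w: Handler) { this.workers.push(w); }
  dispatch(msg: string) {
    this.workers[this.next % this.workers.length](msg);
    this.next++;
  }
}
```

Picking the wrong one is a classic bug: putting order processing behind pub/sub means every consumer processes every order once per subscriber.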

Dead Letter Queue

```typescript
// Main queue with dead letter handling
const orderQueue = new Queue('orders', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
  },
});

// Dead letter queue
const deadLetterQueue = new Queue('orders-dlq', { connection: redis });

const orderWorker = new Worker(
  'orders',
  async (job) => {
    const order = job.data;

    try {
      await processOrder(order);
    } catch (error) {
      // Check if should go to DLQ
      if (job.attemptsMade >= job.opts.attempts!) {
        await deadLetterQueue.add('failed-order', {
          originalJob: job.data,
          error: error.message,
          failedAt: new Date(),
          attempts: job.attemptsMade,
        });
      }
      throw error;
    }
  },
  { connection: redis }
);

// DLQ processor - manual review or retry
const dlqWorker = new Worker(
  'orders-dlq',
  async (job) => {
    const { originalJob, error } = job.data;

    // Log for investigation
    console.error('DLQ item:', {
      order: originalJob,
      error,
    });

    // Optionally notify support
    await notifySupport(job.data);
  },
  { connection: redis }
);

// Retry from DLQ
async function retryDeadLetter(jobId: string): Promise<void> {
  const job = await deadLetterQueue.getJob(jobId);
  if (!job) throw new Error('Job not found');

  // Re-queue original job
  await orderQueue.add('process-order', job.data.originalJob);

  // Remove from DLQ
  await job.remove();
}
```

Priority Queues

```typescript
// Add jobs with priority
await orderQueue.add(
  'process-order',
  { orderId: '123', type: 'express' },
  { priority: 1 } // Lower number = higher priority
);

await orderQueue.add(
  'process-order',
  { orderId: '456', type: 'standard' },
  { priority: 5 }
);

// Separate queues for different priorities
const highPriorityQueue = new Queue('orders-high', { connection: redis });
const normalPriorityQueue = new Queue('orders-normal', { connection: redis });

// Process high priority first
const highWorker = new Worker(
  'orders-high',
  processOrder,
  { connection: redis, concurrency: 10 }
);

const normalWorker = new Worker(
  'orders-normal',
  processOrder,
  { connection: redis, concurrency: 5 }
);
```
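The ordering rule behind priority options like these is simple but worth seeing concretely: jobs with a lower priority number run first, and jobs with equal priority keep FIFO order. A small in-memory sketch of that selection (our own illustration, not how any particular broker implements it internally):

```typescript
// Lower number = higher priority; FIFO among equal priorities.
interface Job {
  name: string;
  priority: number;
  seq: number; // insertion order, used as the FIFO tiebreaker
}

class PriorityPicker {
  private jobs: Job[] = [];
  private seq = 0;

  add(name: string, priority: number) {
    this.jobs.push({ name, priority, seq: this.seq++ });
  }

  next(): string | undefined {
    if (this.jobs.length === 0) return undefined;
    this.jobs.sort((a, b) => a.priority - b.priority || a.seq - b.seq);
    return this.jobs.shift()!.name;
  }
}
```

Real implementations use a heap or per-priority lists rather than re-sorting, but the observable behavior is the same.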

Delayed Messages

```typescript
// Schedule job for later
await emailQueue.add(
  'send-reminder',
  { userId: '123', type: 'cart-abandonment' },
  {
    delay: 24 * 60 * 60 * 1000, // 24 hours
  }
);

// Recurring jobs
await emailQueue.add(
  'daily-digest',
  { type: 'digest' },
  {
    repeat: {
      pattern: '0 9 * * *', // Daily at 9 AM
    },
  }
);

// Job that runs once at specific time
const futureDate = new Date('2024-12-25T10:00:00Z');
await emailQueue.add(
  'christmas-email',
  { campaign: 'xmas-2024' },
  {
    delay: futureDate.getTime() - Date.now(),
  }
);
```
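The "run at a specific time" case above computes `futureDate.getTime() - Date.now()` inline, which goes negative if the target time has already passed. A small hedged helper (our own, not a BullMQ API) that clamps to zero so stale schedules run immediately instead of producing a negative delay:

```typescript
// "Run at T" becomes "wait T - now milliseconds"; clamp negative delays
// (target time already passed) to zero, i.e. run immediately.
function delayUntil(runAt: Date, now: Date = new Date()): number {
  return Math.max(0, runAt.getTime() - now.getTime());
}
```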

Request/Reply Pattern

```typescript
// RPC over message queue
class RpcClient {
  private responsePromises: Map<string, {
    resolve: (value: any) => void;
    reject: (error: Error) => void;
  }> = new Map();

  constructor(
    private requestQueue: Queue,
    private responseQueue: Queue
  ) {
    // Listen for responses
    new Worker(
      'rpc-responses',
      async (job) => {
        const { correlationId, result, error } = job.data;
        const pending = this.responsePromises.get(correlationId);

        if (pending) {
          if (error) {
            pending.reject(new Error(error));
          } else {
            pending.resolve(result);
          }
          this.responsePromises.delete(correlationId);
        }
      },
      { connection: redis }
    );
  }

  async call<T>(method: string, params: any, timeout = 30000): Promise<T> {
    const correlationId = crypto.randomUUID();

    const promise = new Promise<T>((resolve, reject) => {
      this.responsePromises.set(correlationId, { resolve, reject });

      // Timeout
      setTimeout(() => {
        if (this.responsePromises.has(correlationId)) {
          this.responsePromises.delete(correlationId);
          reject(new Error('RPC timeout'));
        }
      }, timeout);
    });

    await this.requestQueue.add('rpc-request', {
      correlationId,
      method,
      params,
      replyTo: 'rpc-responses',
    });

    return promise;
  }
}

// RPC Server
const rpcServer = new Worker(
  'rpc-requests',
  async (job) => {
    const { correlationId, method, params, replyTo } = job.data;
    const replyQueue = new Queue(replyTo, { connection: redis });

    try {
      const result = await executeMethod(method, params);
      await replyQueue.add('response', { correlationId, result });
    } catch (error) {
      await replyQueue.add('response', {
        correlationId,
        error: error.message,
      });
    }
  },
  { connection: redis }
);

// Usage
const client = new RpcClient(requestQueue, responseQueue);
const result = await client.call('calculatePrice', { productId: '123' });
```
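The heart of queue-based RPC is correlation: the reply carries the same id as the request, so the client can route it back to the right pending caller. Isolated from the queue plumbing, that bookkeeping looks like this (a sketch with hypothetical names, mirroring the `responsePromises` map above):

```typescript
// Match asynchronous replies to their original requests by correlation id.
class CorrelationTable<T> {
  private pending = new Map<string, (result: T) => void>();

  register(id: string, resolve: (result: T) => void) {
    this.pending.set(id, resolve);
  }

  // Called when a reply arrives; returns false for unknown or late replies
  // (e.g. a response that lands after the caller already timed out).
  settle(id: string, result: T): boolean {
    const resolve = this.pending.get(id);
    if (!resolve) return false;
    this.pending.delete(id);
    resolve(result);
    return true;
  }
}
```

Deleting the entry on settle (and on timeout, as the full client does) is essential; otherwise the map leaks one entry per lost reply.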

Idempotency

```typescript
// Make at-least-once delivery safe: process each message id at most once
class IdempotentWorker {
  constructor(
    private redis: Redis,
    private ttl = 24 * 60 * 60 // 24 hours, in seconds
  ) {}

  async processOnce<T>(
    messageId: string,
    processor: () => Promise<T>
  ): Promise<T | null> {
    const key = `processed:${messageId}`;

    // Check if already processed
    // (note: get-then-setex is not atomic; for strict guarantees, claim
    // the key first with SET ... NX before running the processor)
    const existing = await this.redis.get(key);
    if (existing) {
      console.log(`Message ${messageId} already processed`);
      return JSON.parse(existing);
    }

    // Process
    const result = await processor();

    // Mark as processed
    await this.redis.setex(key, this.ttl, JSON.stringify(result));

    return result;
  }
}

const idempotentWorker = new IdempotentWorker(redis);

const worker = new Worker(
  'orders',
  async (job) => {
    return idempotentWorker.processOnce(job.id!, async () => {
      return processOrder(job.data);
    });
  },
  { connection: redis }
);
```
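Stripped of Redis, the dedupe logic is just a cache keyed by message id: a redelivered message returns the stored result instead of re-running the handler. An in-memory sketch (illustrative names, single-process only):

```typescript
// Remember results by message id so a redelivered message returns the
// cached result instead of running the handler again.
class Deduper<T> {
  private seen = new Map<string, T>();
  private calls = 0;

  process(messageId: string, handler: () => T): T {
    const cached = this.seen.get(messageId);
    if (cached !== undefined) return cached;
    this.calls++;
    const result = handler();
    this.seen.set(messageId, result);
    return result;
  }

  get handlerCalls() {
    return this.calls;
  }
}
```

The Redis version above is the same idea made durable and shared across worker processes, with a TTL so the dedupe set does not grow forever.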

Monitoring

```typescript
// Queue metrics
async function getQueueMetrics(queue: Queue) {
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
    queue.getDelayedCount(),
  ]);

  return { waiting, active, completed, failed, delayed };
}

// Expose metrics endpoint
app.get('/metrics/queues', async (req, res) => {
  const metrics = {
    orders: await getQueueMetrics(orderQueue),
    emails: await getQueueMetrics(emailQueue),
  };

  res.json(metrics);
});
```
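Once metrics are exposed, the usual next step is an alert rule on queue depth. A simple sketch: fire when the backlog (waiting plus delayed) or the failure count crosses a threshold. The thresholds and the `shouldAlert` name are illustrative, not recommendations:

```typescript
interface QueueMetrics {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

// Alert when the backlog or the failure count exceeds its threshold.
function shouldAlert(m: QueueMetrics, maxBacklog = 1000, maxFailed = 50): boolean {
  return m.waiting + m.delayed > maxBacklog || m.failed > maxFailed;
}
```

In practice you would alert on the rate of change as well; a queue that drains slowly but steadily is healthier than one whose backlog is growing.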

Best Practices

Reliability:
✓ Use acknowledgments
✓ Implement dead letter queues
✓ Make handlers idempotent
✓ Set appropriate retry limits

Performance:
✓ Batch when possible
✓ Tune concurrency
✓ Monitor queue depth
✓ Scale consumers dynamically

Operations:
✓ Log job processing
✓ Alert on queue buildup
✓ Track processing time
✓ Plan for failures

Conclusion

Message queues enable reliable, scalable distributed systems. Use work queues for task distribution, pub/sub for broadcasts, and dead letter queues for failure handling. Always ensure idempotency and monitor queue health.
