Message queues decouple services and enable asynchronous communication. Here are the essential patterns for building reliable distributed systems.
Message Queue Concepts
Queue: FIFO storage for messages
Exchange: Routes messages to queues
Binding: Rules connecting exchanges to queues
Consumer: Processes messages from queue
Producer: Sends messages to exchange
Patterns:
- Work Queue: Distribute tasks among workers
- Pub/Sub: Broadcast to all subscribers
- Routing: Send to specific queues
- Topics: Pattern-based routing
- RPC: Request/reply over queue
Work Queue Pattern
import { randomUUID } from 'node:crypto';

import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';

// Redis connection shared by the queues and workers below.
// BullMQ workers use blocking commands and reject a connection whose
// `maxRetriesPerRequest` is not null, so it is disabled explicitly.
// The fallback URL matches what ioredis connects to when no URL is given.
const redis = new Redis(process.env.REDIS_URL ?? 'redis://127.0.0.1:6379', {
  maxRetriesPerRequest: null,
});

// Producer - Add jobs to queue
const emailQueue = new Queue('emails', { connection: redis });

/**
 * Adds a `send-email` job to `emailQueue`.
 *
 * The job is configured with 3 attempts, exponential backoff starting at
 * 1000 ms, and `removeOnComplete: 100` / `removeOnFail: 1000` so finished
 * jobs do not accumulate without bound.
 *
 * @param to      Recipient address.
 * @param subject Subject line.
 * @param body    Message body.
 * @returns The id of the job that was added.
 * @throws Error if the queue returns a job without an id.
 */
async function queueEmail(to: string, subject: string, body: string): Promise<string> {
  const job = await emailQueue.add(
    'send-email',
    { to, subject, body },
    {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
      removeOnComplete: 100,
      removeOnFail: 1000,
    },
  );

  // `job.id` is optional in the type; fail loudly instead of handing callers
  // `undefined` disguised as a string.
  if (job.id === undefined) {
    throw new Error('Queue did not assign an id to the email job');
  }

  return job.id;
}

// Consumer - Process jobs
// Handles jobs from the 'emails' queue: up to 5 at a time, and at most
// 100 jobs per 60 000 ms through the limiter.
const emailWorker = new Worker(
  'emails',
  async (job) => {
    const { to, subject, body } = job.data;

    console.log(`Processing email to ${to}`);

    await sendEmail(to, subject, body);

    // The returned object becomes the job's result.
    return { sent: true, timestamp: new Date() };
  },
  {
    connection: redis,
    concurrency: 5,
    limiter: {
      max: 100,
      duration: 60000, // 100 per minute
    },
  },
);

emailWorker.on('completed', (job) => {
  console.log(`Email ${job.id} sent successfully`);
});

// `job` may be undefined in the failed event, hence the optional chaining.
emailWorker.on('failed', (job, error) => {
  console.error(`Email ${job?.id} failed:`, error.message);
});

// Pub/Sub Pattern
1import Redis from 'ioredis';
2
// Publisher
/**
 * Publishes events to a Redis channel wrapped in a JSON envelope of the form
 * `{ id, timestamp, data }`.
 */
class EventPublisher {
  constructor(private redis: Redis) {}

  /**
   * Serializes `event` into an envelope and publishes it on `channel`.
   *
   * @param channel Redis channel name.
   * @param event   Payload stored under the envelope's `data` key; it is
   *                passed through `JSON.stringify`.
   */
  async publish(channel: string, event: unknown): Promise<void> {
    const message = JSON.stringify({
      // Imported from node:crypto rather than relying on a global `crypto`.
      id: randomUUID(),
      timestamp: new Date().toISOString(),
      data: event,
    });

    await this.redis.publish(channel, message);
  }
}

// Subscriber
/**
 * Receives messages on its own Redis connection and passes each parsed
 * message to the handlers registered for that channel, one after another.
 */
class EventSubscriber {
  private subscriber: Redis;
  private handlers: Map<string, ((event: any) => Promise<void>)[]> = new Map();

  /**
   * @param redisUrl URL used to open the connection this subscriber listens on.
   */
  constructor(redisUrl: string) {
    this.subscriber = new Redis(redisUrl);

    // The listener stays synchronous; `dispatch` handles its own errors, so
    // no rejected promise escapes into the event emitter.
    this.subscriber.on('message', (channel: string, message: string) => {
      void this.dispatch(channel, message);
    });
  }

  /**
   * Registers `handler` for `channel`. The Redis subscription is issued only
   * for the first handler of a channel; later handlers reuse it.
   */
  subscribe(channel: string, handler: (event: any) => Promise<void>): void {
    let channelHandlers = this.handlers.get(channel);

    if (!channelHandlers) {
      channelHandlers = [];
      this.handlers.set(channel, channelHandlers);

      // Log a failed SUBSCRIBE instead of leaving the rejection unhandled.
      // The handlers stay registered; the subscription is not retried here.
      this.subscriber.subscribe(channel).catch((error: unknown) => {
        console.error(`Failed to subscribe to ${channel}:`, error);
      });
    }

    channelHandlers.push(handler);
  }

  /** Parses one raw message and runs the channel's handlers sequentially. */
  private async dispatch(channel: string, message: string): Promise<void> {
    let event: unknown;
    try {
      event = JSON.parse(message);
    } catch (error) {
      // A malformed payload is dropped; it must not take the process down.
      console.error(`Discarding malformed message on ${channel}:`, error);
      return;
    }

    for (const handler of this.handlers.get(channel) ?? []) {
      try {
        await handler(event);
      } catch (error) {
        // One failing handler does not stop the remaining ones.
        console.error(`Handler error for ${channel}:`, error);
      }
    }
  }
}

// Usage
// The subscriber opens its own connection from the URL, separate from the
// `redis` connection the publisher uses. The fallback URL matches the
// address ioredis uses when no URL is supplied.
const publisher = new EventPublisher(redis);
const subscriber = new EventSubscriber(process.env.REDIS_URL ?? 'redis://127.0.0.1:6379');

// Both handlers are registered on 'orders'; each receives every order event.
subscriber.subscribe('orders', async (event) => {
  console.log('Order event:', event.data);
  await processOrder(event.data);
});

subscriber.subscribe('orders', async (event) => {
  await sendNotification(event.data);
});

await publisher.publish('orders', {
  type: 'ORDER_CREATED',
  orderId: '123',
  customerId: 'cust_456',
});

// Dead Letter Queue
// Main queue with dead letter handling
const orderQueue = new Queue('orders', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
  },
});

// Dead letter queue
const deadLetterQueue = new Queue('orders-dlq', { connection: redis });

// The processor only does the work and lets errors propagate so the queue
// can apply its retry policy.
const orderWorker = new Worker(
  'orders',
  async (job) => {
    await processOrder(job.data);
  },
  { connection: redis },
);

// Dead-lettering happens in the 'failed' event rather than inside the
// processor: `attemptsMade` is compared after the attempt has been recorded,
// so the check does not depend on whether the running attempt is counted yet.
// NOTE(review): confirm against the installed BullMQ version's Job docs.
orderWorker.on('failed', (job, error) => {
  if (!job) return;

  const maxAttempts = job.opts.attempts ?? 1;
  if (job.attemptsMade < maxAttempts) return; // will be retried

  deadLetterQueue
    .add('failed-order', {
      originalJob: job.data,
      error: error instanceof Error ? error.message : String(error),
      failedAt: new Date(),
      attempts: job.attemptsMade,
    })
    .catch((dlqError: unknown) => {
      // The listener cannot be awaited, so report a failed hand-off here.
      console.error(`Could not move order job ${job.id} to the DLQ:`, dlqError);
    });
});

// DLQ processor - manual review or retry
// Consumes 'orders-dlq': logs each dead-lettered order with its error and
// passes the full DLQ payload to `notifySupport`.
const dlqWorker = new Worker(
  'orders-dlq',
  async (job) => {
    const { originalJob, error } = job.data;

    // Log for investigation
    console.error('DLQ item:', {
      order: originalJob,
      error,
    });

    // Optionally notify support
    await notifySupport(job.data);
  },
  { connection: redis },
);

// Retry from DLQ
/**
 * Re-queues the original order stored in a dead-letter job and then removes
 * that job from the dead letter queue.
 *
 * The re-queue happens first, so a failure there leaves the DLQ entry intact.
 *
 * @param jobId Id of the job in `deadLetterQueue`.
 * @throws Error('Job not found') when no DLQ job has that id.
 */
async function retryDeadLetter(jobId: string): Promise<void> {
  const job = await deadLetterQueue.getJob(jobId);
  if (!job) throw new Error('Job not found');

  // Re-queue original job
  await orderQueue.add('process-order', job.data.originalJob);

  // Remove from DLQ
  await job.remove();
}

// Priority Queues
// Add jobs with priority
// Both jobs go to the same queue; the express order carries the smaller
// priority number.
await orderQueue.add(
  'process-order',
  { orderId: '123', type: 'express' },
  { priority: 1 }, // Lower number = higher priority
);

await orderQueue.add(
  'process-order',
  { orderId: '456', type: 'standard' },
  { priority: 5 },
);

// Separate queues for different priorities
const highPriorityQueue = new Queue('orders-high', { connection: redis });
const normalPriorityQueue = new Queue('orders-normal', { connection: redis });

// The two workers are configured independently; the high-priority queue is
// given twice the concurrency of the normal one. Each processor forwards
// `job.data`, matching how `processOrder` is called in the other examples.
const highWorker = new Worker(
  'orders-high',
  async (job) => processOrder(job.data),
  { connection: redis, concurrency: 10 },
);

const normalWorker = new Worker(
  'orders-normal',
  async (job) => processOrder(job.data),
  { connection: redis, concurrency: 5 },
);

// Delayed Messages
// Schedule job for later
await emailQueue.add(
  'send-reminder',
  { userId: '123', type: 'cart-abandonment' },
  {
    delay: 24 * 60 * 60 * 1000, // 24 hours
  },
);

// Recurring jobs
await emailQueue.add(
  'daily-digest',
  { type: 'digest' },
  {
    repeat: {
      pattern: '0 9 * * *', // Daily at 9 AM
    },
  },
);

// Job that runs once at specific time
const futureDate = new Date('2024-12-25T10:00:00Z');
await emailQueue.add(
  'christmas-email',
  { campaign: 'xmas-2024' },
  {
    // Once the target time has passed the difference is negative; clamp to 0
    // rather than relying on how the queue treats a negative delay.
    delay: Math.max(0, futureDate.getTime() - Date.now()),
  },
);

// Request/Reply Pattern
// RPC over message queue
/**
 * Request/reply client. Each call is enqueued on `requestQueue` with a
 * correlation id; a worker on the response queue settles the matching
 * pending call when a reply with that id arrives.
 */
class RpcClient {
  private responsePromises: Map<string, {
    resolve: (value: any) => void;
    reject: (error: Error) => void;
    timer: ReturnType<typeof setTimeout>;
  }> = new Map();

  /**
   * @param requestQueue  Queue that requests are added to.
   * @param responseQueue Queue replies arrive on; its name is used both for
   *                      the listening worker and for each request's `replyTo`.
   */
  constructor(
    private requestQueue: Queue,
    private responseQueue: Queue
  ) {
    // Listen for responses
    new Worker(
      this.responseQueue.name,
      async (job) => {
        const { correlationId, result, error } = job.data;
        const pending = this.responsePromises.get(correlationId);

        // Replies for unknown ids (already timed out, or not ours) are ignored.
        if (!pending) return;

        // Stop the timeout so it does not linger after the call is settled.
        clearTimeout(pending.timer);
        this.responsePromises.delete(correlationId);

        if (error) {
          pending.reject(new Error(error));
        } else {
          pending.resolve(result);
        }
      },
      { connection: redis }
    );
  }

  /**
   * Sends one request and waits for its reply.
   *
   * @param method  Name of the remote method.
   * @param params  Arguments forwarded to the remote method.
   * @param timeout Milliseconds to wait for a reply (default 30000).
   * @returns The `result` field of the reply.
   * @throws Error('RPC timeout') if no reply arrives in time, an Error built
   *         from the reply's `error` field, or whatever the enqueue throws.
   */
  async call<T>(method: string, params: any, timeout = 30000): Promise<T> {
    const correlationId = randomUUID();

    const response = new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        if (this.responsePromises.delete(correlationId)) {
          reject(new Error('RPC timeout'));
        }
      }, timeout);

      this.responsePromises.set(correlationId, { resolve, reject, timer });
    });

    // Mark the promise as handled: it may reject (timeout) while the enqueue
    // below is still in flight. The caller still sees the rejection through
    // the returned promise.
    response.catch(() => undefined);

    try {
      await this.requestQueue.add('rpc-request', {
        correlationId,
        method,
        params,
        replyTo: this.responseQueue.name,
      });
    } catch (error) {
      // The request never left: drop the pending entry and its timer so
      // nothing fires later for a call that has already failed.
      const pending = this.responsePromises.get(correlationId);
      if (pending) {
        clearTimeout(pending.timer);
        this.responsePromises.delete(correlationId);
      }
      throw error;
    }

    return response;
  }
}

// RPC Server
// Reply queues are cached by name so one Queue object is created per reply
// destination instead of one per request.
const replyQueues = new Map<string, Queue>();

/** Returns the cached Queue for `name`, creating it on first use. */
function getReplyQueue(name: string): Queue {
  let queue = replyQueues.get(name);
  if (!queue) {
    queue = new Queue(name, { connection: redis });
    replyQueues.set(name, queue);
  }
  return queue;
}

// Consumes 'rpc-requests', runs the requested method and posts either
// `{ correlationId, result }` or `{ correlationId, error }` to `replyTo`.
const rpcServer = new Worker(
  'rpc-requests',
  async (job) => {
    const { correlationId, method, params, replyTo } = job.data;

    let reply: { correlationId: string; result?: unknown; error?: string };
    try {
      reply = { correlationId, result: await executeMethod(method, params) };
    } catch (error) {
      // Only failures of the method itself become error replies.
      reply = {
        correlationId,
        error: error instanceof Error ? error.message : String(error),
      };
    }

    // A failure to enqueue the reply propagates and fails the job.
    await getReplyQueue(replyTo).add('response', reply);
  },
  { connection: redis },
);

// Usage
// Queue names match the ones used by the RPC server ('rpc-requests') and by
// the client's response listener ('rpc-responses').
const requestQueue = new Queue('rpc-requests', { connection: redis });
const responseQueue = new Queue('rpc-responses', { connection: redis });

const client = new RpcClient(requestQueue, responseQueue);
const result = await client.call('calculatePrice', { productId: '123' });

// Idempotency
// Skip messages that have already been processed
/**
 * Remembers the result of each processed message in Redis under
 * `processed:<messageId>` and returns the stored result instead of running
 * the processor again.
 *
 * The lookup and the write are separate commands, so two concurrent
 * deliveries of the same message can still both run the processor; this
 * guards against redelivery, not against simultaneous duplicates.
 */
class IdempotentWorker {
  /**
   * @param redis Client used for the `get` / `setex` calls.
   * @param ttl   Lifetime of a stored result, passed to `setex`
   *              (default 24 * 60 * 60).
   */
  constructor(
    private redis: Redis,
    private ttl = 24 * 60 * 60 // 24 hours
  ) {}

  /**
   * Runs `processor` unless a result for `messageId` is already stored.
   *
   * @param messageId Identifier used to build the Redis key.
   * @param processor Work to perform for a message not seen before.
   * @returns The processor's result on first processing; on a repeat, the
   *          stored result parsed from JSON (`null` when the original result
   *          was `undefined` or otherwise not representable in JSON).
   */
  async processOnce<T>(
    messageId: string,
    processor: () => Promise<T>
  ): Promise<T | null> {
    const key = `processed:${messageId}`;

    // Compare against null: any stored value, even an empty one, means the
    // message was processed.
    const existing = await this.redis.get(key);
    if (existing !== null) {
      console.log(`Message ${messageId} already processed`);
      return existing === '' ? null : JSON.parse(existing);
    }

    // Process
    const result = await processor();

    // JSON.stringify yields undefined for an undefined result; store 'null'
    // so processors that return nothing are still recorded as done.
    const serialized: string | undefined = JSON.stringify(result);
    await this.redis.setex(key, this.ttl, serialized ?? 'null');

    return result;
  }
}

const idempotentWorker = new IdempotentWorker(redis);

// Orders worker that uses the job id as the deduplication key.
const worker = new Worker(
  'orders',
  async (job) => {
    const jobId = job.id;
    // `job.id` is optional in the type; without one there is no key to
    // deduplicate on.
    if (jobId === undefined) {
      throw new Error('Cannot deduplicate an order job without an id');
    }

    return idempotentWorker.processOnce(jobId, async () => {
      return processOrder(job.data);
    });
  },
  { connection: redis },
);

// Monitoring
// Queue metrics
/**
 * Fetches the waiting, active, completed, failed and delayed job counts of
 * `queue`, issuing the five count calls concurrently.
 *
 * @param queue Queue to inspect.
 * @returns An object with one numeric count per state.
 */
async function getQueueMetrics(queue: Queue): Promise<{
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}> {
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
    queue.getDelayedCount(),
  ]);

  return { waiting, active, completed, failed, delayed };
}

// Expose metrics endpoint
// Responds with the counts for the orders and emails queues as JSON.
// NOTE(review): `app` is not defined in this listing; presumably an
// Express-style application — confirm.
app.get('/metrics/queues', async (_req, res) => {
  try {
    // The two queues are independent, so query them concurrently.
    const [orders, emails] = await Promise.all([
      getQueueMetrics(orderQueue),
      getQueueMetrics(emailQueue),
    ]);

    res.json({ orders, emails });
  } catch (error) {
    // Answer with an error instead of leaving the rejection unhandled and
    // the request without a response.
    console.error('Failed to collect queue metrics:', error);
    res.status(500).json({ error: 'Failed to collect queue metrics' });
  }
});

// Best Practices
Reliability:
✓ Use acknowledgments
✓ Implement dead letter queues
✓ Make handlers idempotent
✓ Set appropriate retries
Performance:
✓ Batch when possible
✓ Tune concurrency
✓ Monitor queue depth
✓ Scale consumers dynamically
Operations:
✓ Log job processing
✓ Alert on queue buildup
✓ Track processing time
✓ Plan for failures
Conclusion
Message queues enable reliable, scalable distributed systems. Use work queues for task distribution, pub/sub for broadcasts, and dead letter queues for failure handling. Always ensure idempotency and monitor queue health.