Message queues decouple services, enable async processing, and improve system resilience. Here's a deep dive into popular options and when to use each.
Why Message Queues?
Synchronous (without queue):
  Client → Service A → Service B → Service C → Response
                 ↓ failure = entire request fails

Asynchronous (with queue):
  Client → Service A → Queue → Response (immediate)
                         ↓
               Service B processes later
               Service C processes later

Benefits:
- Faster response times
- Fault tolerance
- Load leveling
- Temporal decoupling
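To make load leveling and temporal decoupling concrete, here is a minimal sketch using a toy in-memory queue. The `InMemoryQueue` class and `simulateBurst` function are hypothetical names for illustration only; a real broker adds persistence, acknowledgments, and network transport. The producer enqueues a burst all at once, while the consumer drains it at its own fixed rate.

```typescript
// A toy in-memory queue: illustrative only, not a substitute for a real broker.
class InMemoryQueue<T> {
  private items: T[] = [];

  // Producers return immediately after enqueueing.
  enqueue(item: T): void {
    this.items.push(item);
  }

  // Consumers pull at most `max` items per call, at their own pace.
  dequeueBatch(max: number): T[] {
    return this.items.splice(0, max);
  }

  get depth(): number {
    return this.items.length;
  }
}

// Enqueue a burst, then drain it `perTick` items at a time,
// recording the queue depth after each consumer tick.
function simulateBurst(burstSize: number, perTick: number): number[] {
  const queue = new InMemoryQueue<number>();
  for (let i = 0; i < burstSize; i++) queue.enqueue(i);

  const depths: number[] = [];
  while (queue.depth > 0) {
    queue.dequeueBatch(perTick);
    depths.push(queue.depth);
  }
  return depths;
}

console.log(simulateBurst(10, 3)); // [ 7, 4, 1, 0 ]
```

The burst is absorbed by the queue rather than by the consumer, which never handles more than three items per tick.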
Queue Comparison
Feature       RabbitMQ     Redis Streams     AWS SQS
─────────────────────────────────────────────────────────
Protocol      AMQP         Redis protocol    HTTP
Ordering      Per-queue    Per-stream        FIFO option
Persistence   Yes          Yes (AOF/RDB)     Yes
Max message   No limit     512MB             256KB
Consumers     Many         Consumer groups   Many
Complexity    Medium       Low               Low
Managed       CloudAMQP    Redis Cloud       AWS native
RabbitMQ
Basic Publish/Subscribe
import amqp from 'amqplib';

// Publisher
async function publish() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const queue = 'tasks';
  await channel.assertQueue(queue, { durable: true }); // Queue survives broker restarts

  const message = { taskId: '123', type: 'process_image' };
  channel.sendToQueue(
    queue,
    Buffer.from(JSON.stringify(message)),
    { persistent: true } // Message is written to disk
  );

  console.log('Message sent:', message);

  await channel.close();
  await connection.close();
}

// Consumer
async function consume() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const queue = 'tasks';
  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(1); // Process one at a time

  console.log('Waiting for messages...');

  channel.consume(queue, async (msg) => {
    if (!msg) return; // null means the consumer was cancelled

    const task = JSON.parse(msg.content.toString());
    console.log('Processing:', task);

    try {
      await processTask(task); // Application-specific handler (not shown)
      channel.ack(msg);
    } catch (error) {
      // Requeue on failure. A message that always fails will loop forever,
      // so pair this with a retry limit or a dead letter queue.
      channel.nack(msg, false, true);
    }
  });
}

Exchanges and Routing
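The topic bindings used in this section, such as `order.*` and `*.created`, match routing keys word by word, with words separated by dots. In AMQP topic exchanges, `*` stands for exactly one word and `#` for zero or more words. The sketch below approximates that rule as a pure function so the matching can be checked without a broker. `topicMatches` is a hypothetical helper, not part of amqplib, and the broker's own implementation may treat edge cases differently.

```typescript
// Approximate AMQP topic matching: '*' = exactly one word, '#' = zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(pi: number, ki: number): boolean {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (pi === p.length) return ki === k.length;

    if (p[pi] === '#') {
      // '#' may absorb zero or more of the remaining words.
      for (let next = ki; next <= k.length; next++) {
        if (match(pi + 1, next)) return true;
      }
      return false;
    }

    // Any other pattern word needs a key word to compare against.
    if (ki === k.length) return false;
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('order.*', 'order.created'));    // true
console.log(topicMatches('*.created', 'user.created'));   // true
console.log(topicMatches('order.*', 'user.created'));     // false
console.log(topicMatches('order.*', 'order.created.eu')); // false
console.log(topicMatches('order.#', 'order.created.eu')); // true
```

This is why a message published with `order.created` reaches queues bound with either `order.*` or `*.created`, while `user.created` reaches only the latter.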
// The functions below assume `connection` is an open amqplib connection,
// created with amqp.connect() as in the publisher example.

// Direct exchange - route by exact key match
async function setupDirectExchange() {
  const channel = await connection.createChannel();

  await channel.assertExchange('notifications', 'direct', { durable: true });

  // Bind queues to specific routing keys
  await channel.assertQueue('email-queue');
  await channel.bindQueue('email-queue', 'notifications', 'email');

  await channel.assertQueue('sms-queue');
  await channel.bindQueue('sms-queue', 'notifications', 'sms');

  // Publish with routing key
  channel.publish(
    'notifications',
    'email', // routing key
    Buffer.from(JSON.stringify({ to: 'user@example.com', subject: 'Hello' }))
  );
}

// Topic exchange - pattern matching
async function setupTopicExchange() {
  const channel = await connection.createChannel();

  await channel.assertExchange('events', 'topic', { durable: true });

  // Bind with patterns ('*' matches exactly one word)
  await channel.assertQueue('order-events');
  await channel.bindQueue('order-events', 'events', 'order.*');

  await channel.assertQueue('all-created');
  await channel.bindQueue('all-created', 'events', '*.created');

  // Publish
  channel.publish('events', 'order.created', Buffer.from('...')); // Both queues
  channel.publish('events', 'user.created', Buffer.from('...')); // all-created only
}

// Fanout exchange - broadcast to all
async function setupFanoutExchange() {
  const channel = await connection.createChannel();

  await channel.assertExchange('broadcasts', 'fanout', { durable: true });

  // All bound queues receive all messages (the routing key is ignored)
  await channel.assertQueue('service-a');
  await channel.bindQueue('service-a', 'broadcasts', '');

  await channel.assertQueue('service-b');
  await channel.bindQueue('service-b', 'broadcasts', '');

  channel.publish('broadcasts', '', Buffer.from('...')); // Both receive
}

Dead Letter Queues
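// Assumes `connection` is an open amqplib connection
async function setupDeadLetterQueue() {
  const channel = await connection.createChannel();

  // Dead letter exchange
  await channel.assertExchange('dlx', 'direct', { durable: true });
  await channel.assertQueue('dead-letters', { durable: true });
  await channel.bindQueue('dead-letters', 'dlx', 'failed');

  // Main queue with DLX. Messages that are rejected without requeue, or that
  // expire, are republished to 'dlx' with the routing key 'failed'.
  // Redeclaring an existing 'tasks' queue with different arguments fails,
  // so declare it with these options from the start.
  await channel.assertQueue('tasks', {
    durable: true,
    deadLetterExchange: 'dlx',
    deadLetterRoutingKey: 'failed',
    messageTtl: 300000, // Messages unconsumed for 5 minutes expire
  });
}

Redis Streams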
Basic Operations
import Redis from 'ioredis';

const redis = new Redis();

// Add to stream
async function addToStream() {
  const id = await redis.xadd(
    'orders',
    '*', // Auto-generate ID
    'orderId', '123',
    'status', 'pending',
    'amount', '99.99'
  );
  console.log('Added with ID:', id);
}

// Read from stream
async function readStream() {
  const messages = await redis.xread(
    'COUNT', 10,
    'BLOCK', 5000, // Block for 5 seconds
    'STREAMS', 'orders', '0' // From beginning
  );

  // xread resolves to null if the block times out with no entries
  for (const [stream, entries] of messages || []) {
    for (const [id, fields] of entries) {
      console.log(`ID: ${id}`, Object.fromEntries(pairs(fields)));
    }
  }
}

// Helper to convert flat array to pairs
function pairs(arr: string[]): [string, string][] {
  const result: [string, string][] = [];
  for (let i = 0; i < arr.length; i += 2) {
    result.push([arr[i], arr[i + 1]]);
  }
  return result;
}

Consumer Groups
// Create consumer group (MKSTREAM creates the stream if it does not exist).
// This call fails with a BUSYGROUP error if the group already exists.
await redis.xgroup('CREATE', 'orders', 'order-processors', '0', 'MKSTREAM');

// Consumer reads from group
async function consumeGroup(consumerName: string) {
  while (true) {
    const messages = await redis.xreadgroup(
      'GROUP', 'order-processors', consumerName,
      'COUNT', 1,
      'BLOCK', 5000,
      'STREAMS', 'orders', '>' // Only messages never delivered to this group
    );

    if (!messages) continue; // Block timed out with nothing to read

    for (const [stream, entries] of messages) {
      for (const [id, fields] of entries) {
        try {
          await processOrder(Object.fromEntries(pairs(fields)));
          await redis.xack('orders', 'order-processors', id);
        } catch (error) {
          console.error('Failed to process:', id);
          // Not acknowledged: the entry stays in the group's pending list.
          // Reading with '>' will not return it again, so it must be
          // re-read by ID or claimed (see checkPending below).
        }
      }
    }
  }
}

// Check pending messages
async function checkPending() {
  const pending = await redis.xpending('orders', 'order-processors');
  console.log('Pending messages:', pending);

  // Claim old pending messages for another consumer
  const claimed = await redis.xclaim(
    'orders', 'order-processors', 'worker-2',
    60000, // Min idle time in milliseconds
    '1234-0' // Message ID (placeholder)
  );
}

AWS SQS
Standard Queue
import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';

const sqs = new SQSClient({ region: 'us-east-1' });
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue';

// Send message
async function sendMessage(data: object) {
  await sqs.send(new SendMessageCommand({
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify(data),
    MessageAttributes: {
      Type: {
        DataType: 'String',
        StringValue: 'OrderCreated',
      },
    },
  }));
}

// Receive and process
async function processMessages() {
  const response = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: queueUrl,
    MaxNumberOfMessages: 10,
    WaitTimeSeconds: 20, // Long polling
    MessageAttributeNames: ['All'],
  }));

  for (const message of response.Messages || []) {
    try {
      const data = JSON.parse(message.Body!);
      await handleMessage(data); // Application-specific handler (not shown)

      // Delete after successful processing
      await sqs.send(new DeleteMessageCommand({
        QueueUrl: queueUrl,
        ReceiptHandle: message.ReceiptHandle!,
      }));
    } catch (error) {
      console.error('Failed to process:', message.MessageId);
      // Message will return to queue after visibility timeout
    }
  }
}

FIFO Queue
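The FIFO example in this section uses a random UUID as the deduplication ID, with a note that a content-based ID is the alternative. A random ID means a retried send of the same payload is treated as a new message. The sketch below shows one way to derive the ID from the message body instead. `contentDedupId` is a hypothetical helper, and hashing the JSON string is an assumption for illustration; SQS can also do this on the service side when content-based deduplication is enabled on the queue.

```typescript
import { createHash } from 'node:crypto';

// Derive a deterministic deduplication ID from the message body.
// Note: JSON.stringify is sensitive to key order, so { a: 1, b: 2 } and
// { b: 2, a: 1 } produce different IDs.
function contentDedupId(data: object): string {
  return createHash('sha256').update(JSON.stringify(data)).digest('hex');
}

const first = contentDedupId({ action: 'create', userId: '123' });
const retry = contentDedupId({ action: 'create', userId: '123' });
const other = contentDedupId({ action: 'update', userId: '123' });

console.log(first === retry); // true: a retried send maps to the same ID
console.log(first === other); // false: different content, different ID
```

The resulting 64-character hex string would be passed as `MessageDeduplicationId` in place of the random UUID.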
import { randomUUID } from 'node:crypto';

// FIFO queue URL ends with .fifo
const fifoQueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue.fifo';

async function sendFifoMessage(data: object, groupId: string) {
  await sqs.send(new SendMessageCommand({
    QueueUrl: fifoQueueUrl,
    MessageBody: JSON.stringify(data),
    MessageGroupId: groupId, // Required for FIFO
    // A random ID never deduplicates retries; use a content-based ID for that
    MessageDeduplicationId: randomUUID(),
  }));
}

// Messages with same group ID processed in order
await sendFifoMessage({ action: 'create', userId: '123' }, 'user-123');
await sendFifoMessage({ action: 'update', userId: '123' }, 'user-123');
await sendFifoMessage({ action: 'delete', userId: '123' }, 'user-123');

Dead Letter Queue
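The `maxReceiveCount` setting used in this section controls how many times a message can be received before SQS moves it to the dead letter queue. The sketch below simulates that rule without calling AWS. `simulateRedrive` and its parameters are hypothetical names, and the model is simplified: it assumes every receive that is not followed by a delete counts as a failure.

```typescript
type RedriveOutcome = { deliveries: number; deadLettered: boolean };

// Simulate a message that succeeds on a given attempt (or never, if null)
// against a queue with the given maxReceiveCount.
function simulateRedrive(
  maxReceiveCount: number,
  succeedsOnAttempt: number | null,
): RedriveOutcome {
  let receiveCount = 0;
  while (true) {
    // Once the receive count has reached the limit, the next delivery
    // attempt sends the message to the dead letter queue instead.
    if (receiveCount >= maxReceiveCount) {
      return { deliveries: receiveCount, deadLettered: true };
    }
    receiveCount++;
    if (succeedsOnAttempt !== null && receiveCount === succeedsOnAttempt) {
      return { deliveries: receiveCount, deadLettered: false }; // Processed and deleted
    }
  }
}

console.log(simulateRedrive(3, 2));    // { deliveries: 2, deadLettered: false }
console.log(simulateRedrive(3, null)); // { deliveries: 3, deadLettered: true }
```

A low limit moves poison messages aside quickly, while a higher one gives transient failures more chances to recover.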
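import { SetQueueAttributesCommand } from '@aws-sdk/client-sqs';

// Attach the DLQ through the source queue's RedrivePolicy attribute,
// which SQS expects as a JSON string
await sqs.send(new SetQueueAttributesCommand({
  QueueUrl: queueUrl,
  Attributes: {
    RedrivePolicy: JSON.stringify({
      deadLetterTargetArn: 'arn:aws:sqs:us-east-1:123456789:my-dlq',
      maxReceiveCount: 3, // After 3 failures, move to DLQ
    }),
  },
}));

const dlqUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-dlq';

// Process DLQ
async function processDLQ() {
  const response = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: dlqUrl,
    MaxNumberOfMessages: 10,
    AttributeNames: ['All'], // Needed for message.Attributes to be populated
  }));

  for (const message of response.Messages || []) {
    // Log for investigation
    console.error('Failed message:', {
      id: message.MessageId,
      body: message.Body,
      attributes: message.Attributes,
    });

    // Either fix and reprocess or delete
  }
}

Patterns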
Competing Consumers
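As a broker-free illustration of the pattern, the sketch below runs several workers against one shared in-memory list. All names here (`runWorkers`, `Handled`) are hypothetical, and the array stands in for the queue. Because each worker removes a task before processing it, no two workers handle the same task.

```typescript
type Handled = { task: string; worker: string };

// Run `workerCount` workers that compete for tasks from one shared list.
async function runWorkers(tasks: string[], workerCount: number): Promise<Handled[]> {
  const pending = [...tasks];
  const handled: Handled[] = [];

  async function worker(name: string): Promise<void> {
    while (true) {
      const task = pending.shift(); // Taking a task removes it for everyone else
      if (task === undefined) return; // Queue drained
      await new Promise((resolve) => setTimeout(resolve, 1)); // Pretend to work
      handled.push({ task, worker: name });
    }
  }

  await Promise.all(
    Array.from({ length: workerCount }, (_, i) => worker(`worker-${i}`)),
  );
  return handled;
}

runWorkers(['a', 'b', 'c', 'd', 'e'], 2).then((handled) => {
  console.log(handled.length); // 5
});
```

A real broker provides the same guarantee across processes and machines, and adds redelivery when a worker fails before acknowledging.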
// Multiple consumers process from same queue
// Work is distributed automatically

async function startConsumers(count: number) {
  for (let i = 0; i < count; i++) {
    startConsumer(`worker-${i}`); // Your consume loop, e.g. consumeGroup above
  }
}

// Each message is delivered to one consumer at a time. With at-least-once
// delivery it can be redelivered after a failure, so keep handlers idempotent.

Publish-Subscribe
// RabbitMQ fanout or topic exchange
// Redis pub/sub
// SNS + SQS fan-out

// Each message goes to all subscribers
async function setupPubSub() {
  // A Redis connection in subscriber mode cannot run regular commands,
  // so subscribe on a separate connection
  const subscriber = redis.duplicate();

  // Subscribers (each gets all messages published after it subscribes)
  await subscriber.subscribe('events');
  subscriber.on('message', (channel, message) => {
    console.log(`Received on ${channel}:`, message);
  });

  // Publisher
  await redis.publish('events', JSON.stringify({ type: 'order.created' }));
}

Request-Reply
import { randomUUID } from 'node:crypto';

// Synchronous-style communication over async queue
// Assumes `channel` is an open amqplib channel
async function requestReply(request: object): Promise<object> {
  const correlationId = randomUUID();
  const replyQueue = `reply-${correlationId}`;

  // Create temporary reply queue
  await channel.assertQueue(replyQueue, { exclusive: true, autoDelete: true });

  // Send request
  channel.sendToQueue('requests', Buffer.from(JSON.stringify(request)), {
    correlationId,
    replyTo: replyQueue,
  });

  // Wait for reply
  return new Promise((resolve, reject) => {
    const timeout = setTimeout(() => reject(new Error('Timeout')), 30000);

    channel.consume(replyQueue, (msg) => {
      if (msg?.properties.correlationId === correlationId) {
        clearTimeout(timeout);
        resolve(JSON.parse(msg.content.toString()));
      }
    }, { noAck: true }); // Replies are not acknowledged individually
  });
}

Choosing a Queue
Use RabbitMQ when:
- Need complex routing (topic, headers)
- AMQP protocol required
- Message acknowledgment critical
- Multiple consumers per message type
Use Redis when:
- Already using Redis
- Simple queue needs
- Speed is critical
- Stream processing
Use SQS when:
- AWS infrastructure
- Managed service preferred
- Simple point-to-point queueing (add SNS for pub/sub fan-out)
- Scale without management
Conclusion
Message queues are fundamental to distributed systems. Start with the simplest option that meets your needs: often Redis for simple cases, RabbitMQ for complex routing, or SQS for AWS-native applications.
Focus on reliability: use persistent messages, dead letter queues, and idempotent consumers.
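Idempotent consumers matter because all three systems can deliver the same message more than once. The sketch below shows one minimal way to guard a handler with a record of processed message IDs. `makeIdempotent` is a hypothetical helper, and the in-memory `Set` is an assumption for illustration; in production the record would usually live in a shared, durable store so it survives restarts and works across consumers.

```typescript
// Wrap a handler so each message ID is processed at most once per process.
// Returns true if the handler ran, false if the message was a duplicate.
function makeIdempotent<T>(
  handler: (payload: T) => void,
): (messageId: string, payload: T) => boolean {
  const seen = new Set<string>();
  return (messageId, payload) => {
    if (seen.has(messageId)) return false; // Duplicate delivery: skip
    handler(payload);
    seen.add(messageId); // Record only after the handler succeeds
    return true;
  };
}

let total = 0;
const applyPayment = makeIdempotent<number>((amount) => {
  total += amount;
});

applyPayment('msg-1', 50);
applyPayment('msg-1', 50); // Redelivered duplicate, ignored
applyPayment('msg-2', 25);
console.log(total); // 75
```

Recording the ID only after the handler returns means a failed attempt can still be retried on redelivery.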