
Message Queues Deep Dive: RabbitMQ, Redis, and SQS

Choose and implement the right message queue for your needs. Compare RabbitMQ, Redis, and SQS with practical examples.

Bootspring Team
Engineering
January 25, 2025
7 min read

Message queues decouple services, enable async processing, and improve system resilience. Here's a deep dive into popular options and when to use each.

Why Message Queues?

Synchronous (without queue):

  Client → Service A → Service B → Service C → Response
                ↓
     one failure = entire request fails

Asynchronous (with queue):

  Client → Service A → Queue → Response (immediate)
                         ↓
             Service B processes later
             Service C processes later

Benefits:

- Faster response times
- Fault tolerance
- Load leveling
- Temporal decoupling
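The contrast above can be sketched with an in-memory queue standing in for a real broker (`Job`, `enqueue`, and `drain` are illustrative names, not a library API):

```typescript
// Minimal in-memory sketch of temporal decoupling.
type Job = { id: number; payload: string };

const queue: Job[] = [];

// Producer: enqueue and return immediately -- no waiting on downstream work.
function enqueue(job: Job): void {
  queue.push(job);
}

// Consumer: drains the queue later, at its own pace.
function drain(handler: (job: Job) => void): number {
  let processed = 0;
  while (queue.length > 0) {
    handler(queue.shift()!);
    processed++;
  }
  return processed;
}

// The producer finishes even though nothing has been processed yet.
enqueue({ id: 1, payload: 'resize-image' });
enqueue({ id: 2, payload: 'send-email' });
```

The producer's "response" happens at enqueue time; the consumer's work happens whenever `drain` runs, which is the temporal decoupling a broker provides.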

Queue Comparison

Feature        RabbitMQ     Redis Streams      AWS SQS
────────────────────────────────────────────────────────
Protocol       AMQP         Redis Protocol     HTTP
Ordering       Per-queue    Per-stream         FIFO option
Persistence    Yes          Yes (AOF/RDB)      Yes
Max message    No limit     512MB              256KB
Consumers      Many         Consumer groups    Many
Complexity     Medium       Low                Low
Managed        CloudAMQP    Redis Cloud        AWS native

RabbitMQ

Basic Publish/Subscribe

```typescript
import amqp from 'amqplib';

// Publisher
async function publish() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const queue = 'tasks';
  await channel.assertQueue(queue, { durable: true });

  const message = { taskId: '123', type: 'process_image' };
  channel.sendToQueue(
    queue,
    Buffer.from(JSON.stringify(message)),
    { persistent: true }
  );

  console.log('Message sent:', message);

  await channel.close();
  await connection.close();
}

// Consumer
async function consume() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const queue = 'tasks';
  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(1); // Process one at a time

  console.log('Waiting for messages...');

  channel.consume(queue, async (msg) => {
    if (!msg) return;

    const task = JSON.parse(msg.content.toString());
    console.log('Processing:', task);

    try {
      await processTask(task);
      channel.ack(msg);
    } catch (error) {
      // Requeue on failure
      channel.nack(msg, false, true);
    }
  });
}
```

Exchanges and Routing

```typescript
// These examples assume an open `connection` from amqp.connect(...)

// Direct exchange - route by exact key match
async function setupDirectExchange() {
  const channel = await connection.createChannel();

  await channel.assertExchange('notifications', 'direct', { durable: true });

  // Bind queues to specific routing keys
  await channel.assertQueue('email-queue');
  await channel.bindQueue('email-queue', 'notifications', 'email');

  await channel.assertQueue('sms-queue');
  await channel.bindQueue('sms-queue', 'notifications', 'sms');

  // Publish with routing key
  channel.publish(
    'notifications',
    'email', // routing key
    Buffer.from(JSON.stringify({ to: 'user@example.com', subject: 'Hello' }))
  );
}

// Topic exchange - pattern matching
async function setupTopicExchange() {
  const channel = await connection.createChannel();

  await channel.assertExchange('events', 'topic', { durable: true });

  // Bind with patterns
  await channel.assertQueue('order-events');
  await channel.bindQueue('order-events', 'events', 'order.*');

  await channel.assertQueue('all-created');
  await channel.bindQueue('all-created', 'events', '*.created');

  // Publish
  channel.publish('events', 'order.created', Buffer.from('...')); // Both queues
  channel.publish('events', 'user.created', Buffer.from('...')); // all-created only
}

// Fanout exchange - broadcast to all
async function setupFanoutExchange() {
  const channel = await connection.createChannel();

  await channel.assertExchange('broadcasts', 'fanout', { durable: true });

  // All bound queues receive all messages; routing key is ignored
  await channel.assertQueue('service-a');
  await channel.bindQueue('service-a', 'broadcasts', '');

  await channel.assertQueue('service-b');
  await channel.bindQueue('service-b', 'broadcasts', '');

  channel.publish('broadcasts', '', Buffer.from('...')); // Both receive
}
```

Dead Letter Queues

```typescript
// Assumes an open `connection` from amqp.connect(...)
async function setupDeadLetterQueue() {
  const channel = await connection.createChannel();

  // Dead letter exchange
  await channel.assertExchange('dlx', 'direct', { durable: true });
  await channel.assertQueue('dead-letters', { durable: true });
  await channel.bindQueue('dead-letters', 'dlx', 'failed');

  // Main queue with DLX: messages rejected (nack with requeue=false)
  // or expired by TTL are routed to 'dead-letters' via the 'failed' key
  await channel.assertQueue('tasks', {
    durable: true,
    deadLetterExchange: 'dlx',
    deadLetterRoutingKey: 'failed',
    messageTtl: 300000, // 5 minute timeout
  });
}
```

Redis Streams

Basic Operations

```typescript
import Redis from 'ioredis';

const redis = new Redis();

// Add to stream
async function addToStream() {
  const id = await redis.xadd(
    'orders',
    '*', // Auto-generate ID
    'orderId', '123',
    'status', 'pending',
    'amount', '99.99'
  );
  console.log('Added with ID:', id);
}

// Read from stream
async function readStream() {
  const messages = await redis.xread(
    'COUNT', 10,
    'BLOCK', 5000, // Block for 5 seconds
    'STREAMS', 'orders', '0' // From beginning
  );

  for (const [stream, entries] of messages || []) {
    for (const [id, fields] of entries) {
      console.log(`ID: ${id}`, Object.fromEntries(pairs(fields)));
    }
  }
}

// Helper to convert Redis's flat field array to key/value pairs
function pairs(arr: string[]): [string, string][] {
  const result: [string, string][] = [];
  for (let i = 0; i < arr.length; i += 2) {
    result.push([arr[i], arr[i + 1]]);
  }
  return result;
}
```

Consumer Groups

```typescript
// Create consumer group
await redis.xgroup('CREATE', 'orders', 'order-processors', '0', 'MKSTREAM');

// Consumer reads from group
async function consumeGroup(consumerName: string) {
  while (true) {
    const messages = await redis.xreadgroup(
      'GROUP', 'order-processors', consumerName,
      'COUNT', 1,
      'BLOCK', 5000,
      'STREAMS', 'orders', '>' // Only new messages
    );

    if (!messages) continue;

    for (const [stream, entries] of messages) {
      for (const [id, fields] of entries) {
        try {
          await processOrder(Object.fromEntries(pairs(fields)));
          await redis.xack('orders', 'order-processors', id);
        } catch (error) {
          console.error('Failed to process:', id);
          // Will be redelivered
        }
      }
    }
  }
}

// Check pending messages
async function checkPending() {
  const pending = await redis.xpending('orders', 'order-processors');
  console.log('Pending messages:', pending);

  // Claim old pending messages for another consumer
  const claimed = await redis.xclaim(
    'orders', 'order-processors', 'worker-2',
    60000, // Min idle time
    '1234-0' // Message ID
  );
}
```

AWS SQS

Standard Queue

```typescript
import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';

const sqs = new SQSClient({ region: 'us-east-1' });
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue';

// Send message
async function sendMessage(data: object) {
  await sqs.send(new SendMessageCommand({
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify(data),
    MessageAttributes: {
      Type: {
        DataType: 'String',
        StringValue: 'OrderCreated',
      },
    },
  }));
}

// Receive and process
async function processMessages() {
  const response = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: queueUrl,
    MaxNumberOfMessages: 10,
    WaitTimeSeconds: 20, // Long polling
    MessageAttributeNames: ['All'],
  }));

  for (const message of response.Messages || []) {
    try {
      const data = JSON.parse(message.Body!);
      await handleMessage(data);

      // Delete after successful processing
      await sqs.send(new DeleteMessageCommand({
        QueueUrl: queueUrl,
        ReceiptHandle: message.ReceiptHandle!,
      }));
    } catch (error) {
      console.error('Failed to process:', message.MessageId);
      // Message will return to queue after visibility timeout
    }
  }
}
```

FIFO Queue

```typescript
// FIFO queue URL ends with .fifo
const fifoQueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue.fifo';

async function sendFifoMessage(data: object, groupId: string) {
  await sqs.send(new SendMessageCommand({
    QueueUrl: fifoQueueUrl,
    MessageBody: JSON.stringify(data),
    MessageGroupId: groupId, // Required for FIFO
    MessageDeduplicationId: crypto.randomUUID(), // Or enable content-based deduplication
  }));
}

// Messages with the same group ID are processed in order
await sendFifoMessage({ action: 'create', userId: '123' }, 'user-123');
await sendFifoMessage({ action: 'update', userId: '123' }, 'user-123');
await sendFifoMessage({ action: 'delete', userId: '123' }, 'user-123');
```
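One point worth emphasizing: FIFO ordering is guaranteed per message group, not across the whole queue. A small simulation (illustrative names only, not the SQS API) shows how messages partition by `MessageGroupId`:

```typescript
// Simulation of FIFO-queue semantics: arrival order is preserved within each
// group, while different groups are independent and may interleave.
type FifoMessage = { groupId: string; body: string };

function partitionByGroup(messages: FifoMessage[]): Map<string, string[]> {
  const groups = new Map<string, string[]>();
  for (const msg of messages) {
    const list = groups.get(msg.groupId) ?? [];
    list.push(msg.body); // order within the group matches send order
    groups.set(msg.groupId, list);
  }
  return groups;
}

const sent: FifoMessage[] = [
  { groupId: 'user-123', body: 'create' },
  { groupId: 'user-456', body: 'create' },
  { groupId: 'user-123', body: 'update' },
  { groupId: 'user-123', body: 'delete' },
];

const byGroup = partitionByGroup(sent);
```

Choosing a good group ID (here, one per user) is what lets a FIFO queue scale: groups are processed in parallel while each user's operations stay ordered.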

Dead Letter Queue

```typescript
import { SetQueueAttributesCommand } from '@aws-sdk/client-sqs';

// Attach the DLQ via the source queue's RedrivePolicy attribute
await sqs.send(new SetQueueAttributesCommand({
  QueueUrl: queueUrl,
  Attributes: {
    RedrivePolicy: JSON.stringify({
      deadLetterTargetArn: 'arn:aws:sqs:us-east-1:123456789:my-dlq',
      maxReceiveCount: 3, // After 3 failed receives, move to DLQ
    }),
  },
}));

// Process the DLQ (dlqUrl: URL of the dead-letter queue)
async function processDLQ() {
  const response = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: dlqUrl,
    MaxNumberOfMessages: 10,
  }));

  for (const message of response.Messages || []) {
    // Log for investigation
    console.error('Failed message:', {
      id: message.MessageId,
      body: message.Body,
      attributes: message.Attributes,
    });

    // Either fix and reprocess, or delete
  }
}
```

Patterns

Competing Consumers

```typescript
// Multiple consumers process from the same queue;
// the broker distributes work automatically.

async function startConsumers(count: number) {
  for (let i = 0; i < count; i++) {
    startConsumer(`worker-${i}`); // startConsumer: any consumer loop from above
  }
}

// Each message is processed by exactly one consumer
```
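A minimal simulation of the pattern, with an in-memory task list standing in for the broker (all names are illustrative):

```typescript
// Competing consumers over one shared queue: each task is delivered to
// exactly one worker. A real broker performs this distribution for you.
const tasks: string[] = ['t1', 't2', 't3', 't4', 't5'];

function startWorkers(count: number): Map<string, string[]> {
  const assignments = new Map<string, string[]>();
  for (let i = 0; i < count; i++) assignments.set(`worker-${i}`, []);

  let next = 0;
  while (tasks.length > 0) {
    const task = tasks.shift()!; // atomic pop: no two workers share a task
    assignments.get(`worker-${next % count}`)!.push(task);
    next++;
  }
  return assignments;
}
```

Adding workers increases throughput without any coordination between them, which is why this is the default pattern for scaling background processing.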

Publish-Subscribe

```typescript
// RabbitMQ fanout/topic exchange, Redis pub/sub, or SNS + SQS fan-out:
// each message goes to all subscribers.

// Redis needs separate connections for publishing and subscribing --
// a client in subscriber mode cannot issue other commands.
const publisher = new Redis();
const subscriber = new Redis();

async function setupPubSub() {
  // Subscriber (each subscriber gets all messages)
  await subscriber.subscribe('events');
  subscriber.on('message', (channel, message) => {
    console.log(`Received on ${channel}:`, message);
  });

  // Publisher
  await publisher.publish('events', JSON.stringify({ type: 'order.created' }));
}
```

Request-Reply

```typescript
// Synchronous-style communication over an async queue.
// Assumes an open amqplib `channel` as in the earlier examples.
async function requestReply(request: object): Promise<object> {
  const correlationId = crypto.randomUUID();
  const replyQueue = `reply-${correlationId}`;

  // Create temporary reply queue
  await channel.assertQueue(replyQueue, { exclusive: true, autoDelete: true });

  // Send request
  channel.sendToQueue('requests', Buffer.from(JSON.stringify(request)), {
    correlationId,
    replyTo: replyQueue,
  });

  // Wait for the matching reply
  return new Promise((resolve, reject) => {
    const timeout = setTimeout(() => reject(new Error('Timeout')), 30000);

    channel.consume(replyQueue, (msg) => {
      if (msg?.properties.correlationId === correlationId) {
        clearTimeout(timeout);
        resolve(JSON.parse(msg.content.toString()));
      }
    }, { noAck: true });
  });
}
```

Choosing a Queue

Use RabbitMQ when:

- You need complex routing (topic or headers exchanges)
- The AMQP protocol is required
- Per-message acknowledgment is critical
- Multiple consumers subscribe per message type

Use Redis when:

- You're already running Redis
- Queue needs are simple
- Speed is critical
- You want stream processing

Use SQS when:

- You're on AWS infrastructure
- A managed service is preferred
- Simple pub/sub suffices
- You want scale without operational overhead

Conclusion

Message queues are fundamental to distributed systems. Start with the simplest option that meets your needs—often Redis for simple cases, RabbitMQ for complex routing, or SQS for AWS-native applications.

Focus on reliability: use persistent messages, dead letter queues, and idempotent consumers.
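Idempotency deserves a concrete sketch. One common approach, shown here with an in-memory set (in production this would typically live in Redis or a database), is to record processed message IDs and skip duplicates, so at-least-once redelivery is harmless:

```typescript
// Idempotent consumer sketch: remember which message IDs have already been
// handled, so a redelivered message does not repeat its side effect.
// The set and function names are illustrative.
const processed = new Set<string>();

function handleOnce(messageId: string, effect: () => void): boolean {
  if (processed.has(messageId)) return false; // duplicate delivery: skip
  effect();
  processed.add(messageId);
  return true;
}
```

Whatever queue you pick, pairing this consumer-side guard with persistent messages and a dead letter queue covers the most common failure modes.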
