Message queues enable asynchronous, decoupled communication. This guide covers implementing reliable messaging.
RabbitMQ Basics#
Publisher#
import amqp from 'amqplib';

// amqplib falls back to amqp://localhost when no URL is supplied; spelling the
// fallback out keeps that behavior while avoiding passing `undefined` to connect().
const connection = await amqp.connect(process.env.RABBITMQ_URL ?? 'amqp://localhost');
const channel = await connection.createChannel();

// Ensure queue exists. `durable: true` makes the queue definition survive a
// broker restart; message persistence is requested per message below.
await channel.assertQueue('orders', { durable: true });

/**
 * Publishes an order to the `orders` queue as a persistent JSON message.
 *
 * @param order - Order to serialize as the message body; `order.id` is sent
 *   as the AMQP `messageId`.
 * @returns The value reported by `channel.sendToQueue`: `false` signals that
 *   the channel's write buffer is full and the caller should wait for the
 *   channel's `'drain'` event before publishing more; `true` otherwise.
 */
function publishOrder(order: Order): boolean {
  const body = Buffer.from(JSON.stringify(order));

  return channel.sendToQueue('orders', body, {
    persistent: true,
    contentType: 'application/json',
    messageId: order.id,
  });
}
Consumer#
await channel.assertQueue('orders', { durable: true });
await channel.prefetch(10); // At most 10 unacknowledged messages in flight

// Awaited so that a failure to register the consumer surfaces here instead of
// becoming an unhandled rejection.
await channel.consume('orders', async (msg) => {
  // amqplib delivers `null` when the broker cancels this consumer.
  if (!msg) return;

  // NOTE: the payload shape is not validated here; add schema validation if
  // publishers are not fully trusted.
  let order: Order;
  try {
    order = JSON.parse(msg.content.toString());
  } catch (error) {
    // A payload that cannot be parsed will never succeed on retry. Reject it
    // without requeueing so it is dead-lettered (when the queue has a DLX) or
    // discarded, rather than redelivered forever.
    console.error('Rejecting malformed order message', error);
    channel.nack(msg, false, false);
    return;
  }

  try {
    await processOrder(order);
    channel.ack(msg);
  } catch (error) {
    // Give a failed message exactly one more attempt: requeue on the first
    // failure, reject without requeue once the broker has already redelivered it.
    const requeue = !msg.fields.redelivered;
    console.error(`Order processing failed (requeue=${requeue})`, error);
    channel.nack(msg, false, requeue);
  }
});
Exchange Patterns#
// Topic exchange for routing
await channel.assertExchange('events', 'topic', { durable: true });

// Subscribe to patterns.
// Queues are declared and bound BEFORE anything is published: an exchange
// drops messages whose routing key matches no binding, and binding a queue
// that does not exist fails with a channel error.
// NOTE(review): assumes these queues are durable wherever else they are
// declared; redeclaring with different options is rejected by the broker.
await channel.assertQueue('notifications', { durable: true });
await channel.assertQueue('analytics', { durable: true });
await channel.bindQueue('notifications', 'events', 'order.*');
await channel.bindQueue('analytics', 'events', '#'); // All events

// Publish with routing key. `persistent: true` asks the broker to write the
// message to disk so it survives a restart once routed to a durable queue.
const eventBody = Buffer.from(JSON.stringify(event));
const publishOptions = { persistent: true, contentType: 'application/json' };
channel.publish('events', 'order.created', eventBody, publishOptions);
channel.publish('events', 'order.shipped', eventBody, publishOptions);
AWS SQS#
Standard Queue#
import { SQS } from '@aws-sdk/client-sqs';

const sqs = new SQS({ region: 'us-east-1' });

// Read the queue URL once so send, receive and delete always target the same queue.
const queueUrl = process.env.QUEUE_URL;

// Send message
await sqs.sendMessage({
  QueueUrl: queueUrl,
  MessageBody: JSON.stringify(payload),
  MessageAttributes: {
    EventType: { DataType: 'String', StringValue: 'order.created' },
  },
});

// Receive messages
const response = await sqs.receiveMessage({
  QueueUrl: queueUrl,
  MaxNumberOfMessages: 10,
  WaitTimeSeconds: 20, // Long polling
  MessageAttributeNames: ['All'],
});

// `Messages` is absent when the poll returns nothing.
for (const message of response.Messages ?? []) {
  try {
    await processMessage(message);

    // Delete only after successful processing.
    await sqs.deleteMessage({
      QueueUrl: queueUrl,
      ReceiptHandle: message.ReceiptHandle,
    });
  } catch (error) {
    // Isolate failures per message so one bad message does not abort the rest
    // of the batch. The failed message is not deleted, so SQS makes it visible
    // again after the visibility timeout and it will be retried.
    console.error(`Failed to process SQS message ${message.MessageId ?? '<unknown>'}`, error);
  }
}
FIFO Queue#
// Build the FIFO request first, then send it in a single call.
const fifoMessage = {
  QueueUrl: process.env.FIFO_QUEUE_URL,
  MessageBody: JSON.stringify(payload),
  // Orders from same user processed in order
  MessageGroupId: userId,
  // Prevent duplicates
  MessageDeduplicationId: orderId,
};

await sqs.sendMessage(fifoMessage);
Dead Letter Queues#
// RabbitMQ DLQ
// Declare the dead-letter topology first. If the exchange named in
// `deadLetterExchange` is missing, or no queue is bound for the routing key,
// rejected messages are dropped instead of being captured.
await channel.assertExchange('dlx', 'direct', { durable: true });
await channel.assertQueue('orders-dlq', { durable: true });
await channel.bindQueue('orders-dlq', 'dlx', 'orders-dlq');

// Work queue: messages rejected without requeue are republished to `dlx`
// with routing key `orders-dlq`.
// NOTE(review): if `orders` already exists without these dead-letter
// arguments, the broker rejects this declaration; the queue must be created
// with them from the start (or configured through a policy).
await channel.assertQueue('orders', {
  durable: true,
  deadLetterExchange: 'dlx',
  deadLetterRoutingKey: 'orders-dlq',
});

// Reject to DLQ after retries.
// `msg` and `retryCount` come from the surrounding consumer (not shown here).
if (retryCount >= 3) {
  channel.nack(msg, false, false); // Don't requeue: message is dead-lettered
} else {
  channel.nack(msg, false, true); // Requeue for another attempt
}
Message Patterns#
Request-Reply#
// Request
const correlationId = crypto.randomUUID();
// Server-named exclusive queue: only this connection can consume the replies.
const replyQueue = await channel.assertQueue('', { exclusive: true });

channel.sendToQueue('rpc-queue', Buffer.from(request), {
  correlationId,
  replyTo: replyQueue.queue,
});

// Wait for reply
const REPLY_TIMEOUT_MS = 30000;
// A caller-chosen consumer tag lets the consumer be cancelled without waiting
// for the tag the broker would otherwise assign.
const replyConsumerTag = `rpc-reply-${correlationId}`;

const reply = await new Promise((resolve, reject) => {
  // Stop listening once the call has settled so consumers do not accumulate.
  const stopConsuming = () => {
    channel.cancel(replyConsumerTag).catch(() => {
      // Best effort: the channel may already be closed.
    });
  };

  // Without a deadline the promise would hang forever if the server never replies.
  const timer = setTimeout(() => {
    stopConsuming();
    reject(new Error(`RPC reply timed out after ${REPLY_TIMEOUT_MS} ms`));
  }, REPLY_TIMEOUT_MS);

  channel
    .consume(
      replyQueue.queue,
      (msg) => {
        // Ignore consumer-cancel notifications (null) and replies to other requests.
        if (!msg || msg.properties.correlationId !== correlationId) return;

        clearTimeout(timer);
        stopConsuming();
        try {
          resolve(JSON.parse(msg.content.toString()));
        } catch (error) {
          // A reply that is not valid JSON fails the call instead of throwing
          // inside the consumer callback.
          reject(error);
        }
      },
      // noAck: replies are never acknowledged manually, so let the broker
      // treat them as acknowledged on delivery.
      { noAck: true, consumerTag: replyConsumerTag },
    )
    .catch((error) => {
      clearTimeout(timer);
      reject(error);
    });
});
Pub/Sub with SNS + SQS#
import { SNS } from '@aws-sdk/client-sns';

const sns = new SNS({});

// Publish to topic: assemble the request, then hand it to the client.
const orderCreatedNotification = {
  TopicArn: process.env.ORDERS_TOPIC_ARN,
  Message: JSON.stringify(order),
  MessageAttributes: {
    eventType: { DataType: 'String', StringValue: 'order.created' },
  },
};

await sns.publish(orderCreatedNotification);

// Multiple SQS queues subscribe to the topic
// - notifications-queue
// - analytics-queue
// - inventory-queue
Best Practices#
- Use persistent messages on durable queues: Both are required for messages to survive broker restarts
- Implement idempotency: Handle duplicate messages
- Set message TTL: Prevent queue buildup
- Use dead letter queues: Capture failed messages
- Monitor queue depth: Alert on growing backlogs
- Graceful shutdown: Stop consuming new messages, then finish and acknowledge in-flight ones before closing the connection
Choose RabbitMQ for complex routing, SQS for AWS-native simplicity.