Distributed Message Queues
Asynchronous communication patterns with message brokers
Message Queues
Message queues enable asynchronous communication between services by allowing producers to send messages that consumers process independently. This decoupling improves scalability, reliability, and flexibility.
Why Use Message Queues?
Decoupling
Services don't need to know about each other:
Without Queue:
Order Service → User Service → Email Service → Analytics
With Queue:
Order Service → Queue ← User Service
                      ← Email Service
                      ← Analytics
Load Leveling
Handle traffic spikes by buffering requests:
Traffic Spike:          Normal Processing:
████████████            ████
████████████     →      ████
████████████            ████
    Queue               Workers
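A minimal sketch of load leveling, assuming the same hypothetical queue client (send/receive/ack) used in the Reliability snippet below; the module path, handleBurst, and processOrder names are illustrative. The producer enqueues a burst immediately, while a fixed pool of workers drains the queue at its own pace, so downstream services never see the spike directly.
import { queue, processOrder } from './orders'; // Hypothetical module; same names as the snippet below

// Producer: absorb the spike by enqueueing work instead of calling services directly
async function handleBurst(requests: { orderId: string }[]) {
  await Promise.all(requests.map((request) => queue.send(request)));
}

// Worker: pulls one message at a time, so the processing rate stays steady
async function worker() {
  while (true) {
    const message = await queue.receive(); // Waits until a message is available
    await processOrder(message.data);
    await message.ack();
  }
}

// The number of workers caps concurrent load on downstream services
const WORKER_COUNT = 4;
for (let i = 0; i < WORKER_COUNT; i++) {
  void worker();
}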
Reliability
Messages persist until successfully processed:
// Producer sends message
await queue.send({ orderId: '123', action: 'process' });

// Consumer processes with acknowledgment
const message = await queue.receive();
try {
  await processOrder(message.data);
  await message.ack(); // Confirm processing
} catch (error) {
  await message.nack(); // Return to queue
}
Message Queue Patterns
Point-to-Point (Queue)
One producer, one consumer per message.
Producer → Queue → Consumer
                      ↓
              Message goes to
              ONE consumer only
Publish-Subscribe (Topic)
One producer, multiple subscribers receive copies.
Publisher → Topic → Subscriber A
                  → Subscriber B
                  → Subscriber C

All subscribers receive
every message
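A sketch of both patterns with the kafkajs client used in the Kafka section below; the group names are illustrative. Consumers that share a groupId split the topic's partitions between them, so each message reaches only one of them (point-to-point), while consumers in a different group receive their own copy of every message (publish-subscribe).
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'my-app', brokers: ['kafka1:9092'] });

// Start one consumer in the given consumer group
async function startConsumer(groupId: string) {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders' });
  await consumer.run({
    eachMessage: async ({ message }) => {
      console.log(`[${groupId}] ${message.value?.toString()}`);
    },
  });
}

// Point-to-point: two consumers in the SAME group share partitions,
// so each message is processed by only one of them
await startConsumer('order-processor');
await startConsumer('order-processor');

// Publish-subscribe: a consumer in a DIFFERENT group independently
// receives every message as well
await startConsumer('order-analytics');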
Fan-Out
Broadcast to multiple queues for different processing.
Order Created → Fan-out Exchange
                  ├→ Inventory Queue → Inventory Service
                  ├→ Email Queue     → Email Service
                  └→ Analytics Queue → Analytics Service
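A sketch of the fan-out pattern with a RabbitMQ fanout exchange, using the amqplib client shown in the RabbitMQ section below; the exchange and queue names are illustrative.
import amqp from 'amqplib';

const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();

// A fanout exchange copies every message to all bound queues, ignoring routing keys
await channel.assertExchange('order-events', 'fanout', { durable: true });

// Each downstream service consumes from its own queue bound to the exchange
for (const queue of ['inventory', 'email', 'analytics']) {
  await channel.assertQueue(queue);
  await channel.bindQueue(queue, 'order-events', '');
}

// One publish delivers a copy to the inventory, email, and analytics queues
channel.publish('order-events', '', Buffer.from(JSON.stringify({ orderId: '123' })));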
Popular Message Queues
Apache Kafka
Distributed streaming platform designed for high throughput.
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
});

// Producer
const producer = kafka.producer();
await producer.connect();
await producer.send({
  topic: 'orders',
  messages: [
    { key: 'order-123', value: JSON.stringify({ amount: 100 }) },
  ],
});

// Consumer
const consumer = kafka.consumer({ groupId: 'order-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log(`Received: ${message.value}`);
  },
});
Best for:
- High throughput (millions of messages/sec)
- Event streaming and replay (see the replay sketch below)
- Log aggregation
- Real-time analytics
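Replay is possible because Kafka keeps messages in a retained log rather than deleting them on consumption. A minimal sketch, assuming the 'orders' topic above: a consumer group with no committed offsets and fromBeginning: true re-reads every retained message.
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'replay-app', brokers: ['kafka1:9092'] });

// A new group id has no committed offsets, so fromBeginning replays
// each partition from the earliest retained offset
const consumer = kafka.consumer({ groupId: 'orders-replay' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });
await consumer.run({
  eachMessage: async ({ partition, message }) => {
    console.log(`replayed partition ${partition} offset ${message.offset}: ${message.value}`);
  },
});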
RabbitMQ
Traditional message broker with flexible routing.
import amqp from 'amqplib';

// Producer
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue('tasks');
channel.sendToQueue('tasks', Buffer.from(JSON.stringify({ task: 'process' })));

// Consumer
await channel.consume('tasks', (msg) => {
  if (msg) {
    const task = JSON.parse(msg.content.toString());
    processTask(task);
    channel.ack(msg);
  }
});
Best for:
- Complex routing requirements (see the topic-exchange sketch below)
- Request-reply patterns
- Traditional job queues
- Lower latency requirements
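A sketch of what complex routing looks like in practice, using an amqplib topic exchange; the exchange, queue, and binding patterns are illustrative. Queues bind with patterns ('*' matches one word, '#' matches zero or more), and each published message is routed only to the queues whose patterns match its routing key.
import amqp from 'amqplib';

const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();

// Topic exchanges route on dot-separated routing keys matched against binding patterns
await channel.assertExchange('events', 'topic', { durable: true });

// billing only cares about payments; audit wants every order event
await channel.assertQueue('billing');
await channel.bindQueue('billing', 'events', 'order.payment.*');
await channel.assertQueue('audit');
await channel.bindQueue('audit', 'events', 'order.#');

// Matches both bindings: delivered to billing and audit
channel.publish('events', 'order.payment.captured', Buffer.from('{"orderId":"123"}'));

// Matches only 'order.#': delivered to audit alone
channel.publish('events', 'order.created', Buffer.from('{"orderId":"124"}'));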
Amazon SQS
Fully managed queue service.
import { SQS } from '@aws-sdk/client-sqs';

const sqs = new SQS({ region: 'us-east-1' });

// Send message
await sqs.sendMessage({
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  MessageBody: JSON.stringify({ orderId: '123' }),
});

// Receive message
const response = await sqs.receiveMessage({
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  MaxNumberOfMessages: 10,
});
Best for:
- AWS-native applications
- Simple queue needs
- Serverless architectures
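One thing the receive call above leaves out: SQS does not remove a received message, it only hides it for the visibility timeout, and the consumer must delete it after successful processing or it will be redelivered. A sketch of a full receive-process-delete loop; handleOrder is an illustrative placeholder.
import { SQS } from '@aws-sdk/client-sqs';

const sqs = new SQS({ region: 'us-east-1' });
const QueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue';

// Illustrative processing step
async function handleOrder(order: { orderId: string }): Promise<void> {
  console.log('processing order', order.orderId);
}

const response = await sqs.receiveMessage({
  QueueUrl,
  MaxNumberOfMessages: 10,
  WaitTimeSeconds: 20,   // Long polling: wait up to 20s for messages to arrive
  VisibilityTimeout: 60, // Hide each message for 60s while it is being processed
});

for (const message of response.Messages ?? []) {
  await handleOrder(JSON.parse(message.Body!));
  // Deleting acts as the acknowledgment; skip it and the message reappears
  // after the visibility timeout expires
  await sqs.deleteMessage({ QueueUrl, ReceiptHandle: message.ReceiptHandle });
}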
Delivery Guarantees
At-Most-Once
Message may be lost, never duplicated.
Producer → Queue → Consumer
                      ↓
             If consumer crashes,
             message is lost

Implementation: No acknowledgment, fire-and-forget
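A sketch of at-most-once consumption with the amqplib client from the RabbitMQ section above: with noAck: true the broker considers the message delivered the moment it is pushed to the consumer, so a crash inside the handler loses it.
import amqp from 'amqplib';

const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue('tasks');

// noAck: true = auto-acknowledge on delivery (fire-and-forget on the consumer side).
// If the process crashes inside this callback, the broker will NOT redeliver the message.
await channel.consume(
  'tasks',
  (msg) => {
    if (msg) {
      console.log('processing', msg.content.toString()); // Work lost if we crash here
    }
  },
  { noAck: true },
);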
At-Least-Once
Message will be delivered, may be duplicated.
Producer → Queue → Consumer (crash!)
                      ↓
            Message redelivered
            to another consumer

Implementation: Acknowledge after processing
const message = await queue.receive();
await processMessage(message); // May run twice
await message.ack();
Exactly-Once
Message delivered exactly once—the holy grail.
Implementation: Requires idempotent processing or transactional guarantees
// Idempotent processing
async function processOrder(orderId: string, data: OrderData) {
  // Check if already processed
  const existing = await db.orders.findById(orderId);
  if (existing) {
    return existing; // Already processed, skip
  }
  // Process and store
  return db.orders.create({ id: orderId, ...data });
}
Kafka vs RabbitMQ
| Feature | Kafka | RabbitMQ |
|---|---|---|
| Model | Log-based | Queue-based |
| Throughput | Very High | High |
| Message Retention | Configurable (days) | Until consumed |
| Replay | Yes | No |
| Routing | Topics/Partitions | Exchanges/Bindings |
| Ordering | Per partition | Per queue |
| Protocol | Custom | AMQP |
Handling Failures
Dead Letter Queues (DLQ)
Messages that fail processing go to a separate queue.
async function processWithDLQ(message: Message) {
  try {
    await processMessage(message);
    await message.ack();
  } catch (error) {
    if (message.retryCount >= 3) {
      // Send to dead letter queue
      await dlq.send(message);
      await message.ack();
    } else {
      // Retry with backoff
      await message.nack({ delay: Math.pow(2, message.retryCount) * 1000 });
    }
  }
}
Idempotency Keys
Ensure processing the same message twice has the same effect.
async function processPayment(payment: Payment) {
  const idempotencyKey = `payment:${payment.orderId}:${payment.amount}`;

  // Check if already processed
  const processed = await redis.get(idempotencyKey);
  if (processed) {
    return JSON.parse(processed);
  }

  // Process payment
  const result = await paymentGateway.charge(payment);

  // Store result for idempotency
  await redis.setex(idempotencyKey, 86400, JSON.stringify(result));
  return result;
}
Best Practices
- Make Consumers Idempotent: Assume messages may be delivered multiple times
- Use Dead Letter Queues: Don't lose failed messages
- Set Appropriate Timeouts: Visibility timeout > processing time
- Monitor Queue Depth: Alert on growing backlogs
- Order When Necessary: Use partition keys for ordering guarantees
- Batch for Throughput: Process messages in batches when possible (see the sketch below)
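A sketch of batch processing with kafkajs's eachBatch handler, reusing the 'orders' topic and consumer group from the Kafka section; handleOrder is an illustrative per-message handler. Offsets are resolved per message, so a crash mid-batch only replays the unprocessed remainder.
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'my-app', brokers: ['kafka1:9092'] });
const consumer = kafka.consumer({ groupId: 'order-processor' });

// Illustrative per-message handler
async function handleOrder(order: { orderId: string; amount?: number }) { /* ... */ }

await consumer.connect();
await consumer.subscribe({ topic: 'orders' });

await consumer.run({
  eachBatchAutoResolve: false, // Resolve offsets manually as each message succeeds
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      await handleOrder(JSON.parse(message.value!.toString()));
      resolveOffset(message.offset); // Mark this offset as processed
      await heartbeat();             // Keep the consumer group session alive
    }
  },
});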