Message Queues

Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.

When to Use

Use message queues when:

  • Long-running operations block HTTP requests (report generation, video processing)
  • Service decoupling required (microservices, event-driven architecture)
  • Guaranteed delivery needed (payment processing, order fulfillment)
  • Event streaming for analytics (log aggregation, metrics pipelines)
  • Workflow orchestration for complex processes (multi-step sagas, human-in-the-loop)
  • Background job processing (email sending, image resizing)

Multi-Language Support

This skill provides patterns for:

  • Python: Celery + Redis, confluent-kafka, Temporal
  • TypeScript: BullMQ + Redis, kafkajs, @temporalio/client
  • Rust: lapin (RabbitMQ), rdkafka
  • Go: Asynq + Redis, franz-go (Kafka), Temporal SDK

Broker Selection Decision Tree

Event Streaming / Log Aggregation

→ Apache Kafka

  • Throughput: 500K-1M msg/s
  • Replay events (event sourcing)
  • Exactly-once semantics
  • Long-term retention
  • Use: Analytics pipelines, CQRS, event sourcing

Simple Background Jobs

→ Task Queues

  • Python → Celery + Redis
  • TypeScript → BullMQ + Redis
  • Go → Asynq + Redis
  • Use: Email sending, report generation, webhooks

Complex Workflows / Sagas

→ Temporal

  • Durable execution (survives restarts)
  • Saga pattern support
  • Human-in-the-loop workflows
  • Use: Order processing, AI agent orchestration

Request-Reply / RPC Patterns

→ NATS

  • Built-in request-reply
  • Sub-millisecond latency
  • Cloud-native, simple operations
  • Use: Microservices RPC, IoT command/control
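
For reference, request-reply with the nats-py client looks roughly like this (a minimal sketch; the server URL, subject name, and payloads are illustrative):

import asyncio

import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")

    # Responder: reply to each request on the subject
    async def handle(msg):
        await msg.respond(b"pong")

    await nc.subscribe("svc.ping", cb=handle)

    # Requester: block for a reply, raising TimeoutError after 1s
    reply = await nc.request("svc.ping", b"ping", timeout=1.0)
    print(reply.data)  # b"pong"

    await nc.close()

asyncio.run(main())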

Complex Message Routing

→ RabbitMQ

  • Exchanges (direct, topic, fanout, headers)
  • Dead letter exchanges
  • Message TTL, priorities
  • Use: Multi-consumer patterns, pub/sub
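
As an illustration of topic routing with pika (a sketch; the exchange, queue, and routing-key names are made up):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()

# Topic exchange: consumers bind with patterns like "order.*"
ch.exchange_declare(exchange="events", exchange_type="topic")
ch.queue_declare(queue="order-events")
ch.queue_bind(queue="order-events", exchange="events", routing_key="order.*")

# Matches the "order.*" binding and is routed to order-events
ch.basic_publish(exchange="events", routing_key="order.created",
                 body=b'{"order_id": "ord_123"}')
conn.close()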

Already Using Redis

→ Redis Streams

  • No new infrastructure
  • Simple consumer groups
  • Moderate throughput (100K+ msg/s)
  • Use: Notification queues, simple job queues
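
A minimal producer/consumer-group sketch with redis-py (stream, group, and key names are illustrative; handle_job is a hypothetical callback):

import redis

r = redis.Redis()

# Producer: append an entry to the stream
r.xadd("jobs", {"task": "send_email", "to": "user@example.com"})

# One-time setup: consumer group reading from the beginning
try:
    r.xgroup_create("jobs", "workers", id="0", mkstream=True)
except redis.ResponseError:
    pass  # group already exists

# Consumer: claim new entries for this group, process, acknowledge
entries = r.xreadgroup("workers", "worker-1", {"jobs": ">"}, count=10, block=5000)
for _stream, messages in entries:
    for msg_id, fields in messages:
        handle_job(fields)  # hypothetical job handler
        r.xack("jobs", "workers", msg_id)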

Performance Comparison

Broker          Throughput       Latency (p99)   Best For
Kafka           500K-1M msg/s    10-50ms         Event streaming
NATS JetStream  200K-400K msg/s  Sub-ms to 5ms   Cloud-native microservices
RabbitMQ        50K-100K msg/s   5-20ms          Task queues, complex routing
Redis Streams   100K+ msg/s      Sub-ms          Simple queues, caching

Quick Start Examples

Kafka Producer/Consumer (Python)

from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue  # log and skip broker/partition errors
    process_order(msg.value())

Celery Background Jobs (Python)

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        # Retry with a 60s delay, up to max_retries
        raise self.retry(exc=e, countdown=60)

BullMQ Job Processing (TypeScript)

import { Queue, Worker } from 'bullmq'

const queue = new Queue('webhooks', {
  connection: { host: 'localhost', port: 6379 }
})

// Enqueue job
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// Process jobs
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })

Temporal Workflow Orchestration

from temporalio import workflow
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # reserve_inventory and charge_payment are @activity.defn
        # functions registered on the worker elsewhere

        # Step 1: Reserve inventory
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Step 2: Charge payment
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Order {order_id} completed"

Core Patterns

Event Naming Convention

Use: Domain.Entity.Action.Version

Examples:

  • order.created.v1
  • user.profile.updated.v2
  • payment.failed.v1
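
A tiny validation sketch for this convention (the regex is an illustrative interpretation, not a canonical grammar):

import re

# Accepts dotted lowercase segments ending in a version suffix,
# e.g. "order.created.v1" or "user.profile.updated.v2"
EVENT_TYPE = re.compile(r"^[a-z][a-z_]*(\.[a-z][a-z_]*)+\.v\d+$")

assert EVENT_TYPE.match("order.created.v1")
assert EVENT_TYPE.match("user.profile.updated.v2")
assert not EVENT_TYPE.match("OrderCreated")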

Event Schema Structure

{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}

Dead Letter Queue Pattern

Route failed messages to a dead letter queue (DLQ) once retries are exhausted:

from celery.exceptions import Reject

@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        # Park the message for manual inspection, then reject without requeue
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)

Idempotency for Exactly-Once Processing

from fastapi import Header

@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # Check if already processed
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}

    result = process_payment_logic(payment_data)
    # Remember this key for 24 hours
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}

Frontend Integration

Job Status Updates via SSE

import asyncio

from sse_starlette.sse import EventSourceResponse

# FastAPI endpoint for real-time job status
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)

            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": task.info.get('progress', 0)}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": task.result}
                break

            await asyncio.sleep(0.5)

    return EventSourceResponse(event_generator())

React Component

import { useState, useEffect } from 'react'

// toast and ProgressBar come from the app's UI library
export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)

    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })

    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: JSON.parse(e.data) })
      eventSource.close()
    })

    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}

Common Anti-Patterns to Avoid

1. Synchronous API for Long Operations

# ❌ BAD: Blocks request thread
@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5 minutes!
    return report

# ✅ GOOD: Enqueue background job
@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}

2. Non-Idempotent Consumers

Most brokers guarantee at-least-once delivery, so duplicate deliveries will happen. Always implement idempotency checks to prevent duplicate processing, as in the sketch below.
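
A minimal dedup check, assuming each event carries a unique event_id and a Redis client is available (the key prefix and TTL are illustrative):

def handle_event(event: dict):
    # SET NX succeeds only for the first writer of this key
    first_time = redis_client.set(
        f"processed:{event['event_id']}", 1, nx=True, ex=86400
    )
    if not first_time:
        return  # duplicate delivery, skip
    apply_side_effects(event)  # hypothetical business logic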

3. Ignoring Dead Letter Queues

Always configure DLQs so failed messages can be inspected and replayed; in RabbitMQ, for example, this is a per-queue argument, as sketched below.
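
A sketch of declaring a queue with a dead letter exchange in pika (exchange and queue names are illustrative):

import pika

ch = pika.BlockingConnection(pika.ConnectionParameters("localhost")).channel()

# Rejected or expired messages from work-queue are re-routed via dlx
ch.exchange_declare(exchange="dlx", exchange_type="fanout")
ch.queue_declare(queue="dead-letters")
ch.queue_bind(queue="dead-letters", exchange="dlx")

ch.queue_declare(queue="work-queue",
                 arguments={"x-dead-letter-exchange": "dlx"})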

4. Using Kafka for Request-Reply

Kafka is built for asynchronous event streaming; use NATS or HTTP/gRPC for request-reply patterns (see the NATS example above).
