Referensi cepat message queue patterns. Point-to-point, pub/sub, work queue, dead letter queue, retry backoff, idempotency. Perfect buat developer yang bangun sistem async dan event-driven.
Login atau daftar akun gratis untuk membaca cheat sheet ini.
Cheat sheet ini membahas pola-pola message queue yang umum dipakai di sistem terdistribusi, lengkap dengan contoh kode untuk RabbitMQ, Redis Streams, dan Amazon SQS/SNS. Memahami pola ini penting agar sistem kamu scalable, reliable, dan fault-tolerant.
Message queue adalah sistem komunikasi async antar komponen aplikasi. Producer mengirim pesan ke queue, consumer mengambil dan memproses pesan tersebut. Queue memutus (decouple) producer dari consumer sehingga keduanya bisa bekerja dengan kecepatan berbeda.
| Komponen | Fungsi |
|---|---|
| Producer / Publisher | Mengirim pesan ke queue atau exchange |
| Consumer / Subscriber | Menerima dan memproses pesan |
| Queue | Penampung pesan sementara |
| Exchange / Topic | Router yang mendistribusikan pesan (RabbitMQ) |
| Broker | Server yang menyimpan dan mengelola pesan |
Producer ----> [Broker: Queue/Exchange] ----> Consumer
|
+---> Dead Letter Queue (jika gagal)Satu producer, satu consumer. Setiap pesan diproses oleh tepat satu consumer.
Producer ----> [Queue] ----> ConsumerContoh RabbitMQ:
import pika
# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True
Satu producer, banyak consumer. Setiap consumer menerima copy pesan yang sama.
+---> Consumer A
Producer ----> [Exchange]
+---> Consumer B
+---> Consumer CContoh RabbitMQ dengan fanout exchange:
import pika
# Publisher
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type=
Satu queue, banyak consumer. Setiap pesan diproses oleh tepat satu consumer, tapi beban tersebar.
+---> Worker 1
Producer ----> [Queue]
+---> Worker 2
+---> Worker 3Contoh dengan fair dispatch di RabbitMQ:
# Pastikan prefetch_count = 1 untuk fair dispatch
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
import time
print(f" [x] Processing {body}")
time.sleep(body.count(b'.'))
Producer mengirim pesan dan menunggu balasan dari consumer. Dipakai untuk pola RPC async.
import pika
import uuid
import json
class RpcClient:
def __init__
| Tipe | Routing | Contoh Kasus |
|---|---|---|
direct | Routing key harus sama persis | Filter by severity |
fanout | Broadcast ke semua queue | Pub/sub, logs |
topic | Routing key dengan pattern wildcard | Hierarchical routing |
headers | Berdasarkan header message, bukan routing key | Complex routing |
Topic exchange memakai routing key dengan pattern yang dipisahkan titik. Wildcard: * (satu kata), # (nol atau lebih kata).
# Publisher
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = 'kern.critical'
channel.basic_publish(
exchange='topic_logs',
routing_key=
# Bind queue berdasarkan header
channel.queue_bind(
exchange='headers_logs',
queue=queue_name,
routing_key='',
arguments={
'format': 'pdf',
'type': 'report',
'x-match'
Redis Streams adalah struktur data log append-only yang mendukung consumer group. Cocok untuk message queue ringan tanpa broker terpisah.
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# Producer: tambah entry ke stream
entry_id = r.xadd('mystream', {
Consumer group memungkinkan multiple consumer membagi stream secara paralel, mirip work queue.
# Buat consumer group
r.xgroup_create('mystream', 'mygroup', id='0', mkstream=True)
# Consumer ambil pesan dari group
messages = r.xreadgroup(
groupname='mygroup',
consumername
Pesan yang belum di ack tetap berada di pending entry list (PEL). Kalau consumer mati, consumer lain bisa claim pesannya.
# Lihat pending entries
pending = r.xpending('mystream', 'mygroup')
print(pending)
# Claim pesan yang idle terlalu lama
claimed = r.xautoclaim(
'mystream', 'mygroup', 'consumer_2',
min_idle_time=30000,
start_id
| Aspek | Redis Streams | RabbitMQ |
|---|---|---|
| Kompleksitas | Rendah (in-memory) | Sedang (broker terpisah) |
| Persistence | Optional (RDB/AOF) | Default persistent |
| Routing | Linear, consumer group | Exchange (direct, fanout, topic, header) |
| Throughput | Sangat tinggi | Tinggi |
| Fitur lanjutan | Terbatas | Rich (DLX, priority, TTL) |
| Cocok untuk | Streaming, log | Kompleks routing, enterprise |
SQS adalah managed queue service dari AWS. Tipe: Standard (at-least-once, tidak terurut) dan FIFO (exactly-once, terurut).
import boto3
sqs = boto3.client('sqs', region_name='us-east-1')
# Buat queue
response = sqs.create_queue(QueueName='my-queue'
Saat consumer menerima pesan, pesan jadi invisible untuk consumer lain selama visibility timeout. Kalau tidak di ack (delete), pesan akan muncul lagi.
# Set visibility timeout per queue
sqs.set_queue_attributes(
QueueUrl=queue_url,
Attributes={'VisibilityTimeout': '120'}
)
# Change visibility timeout per message
sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout
SNS adalah pub/sub messaging. Topic mengirim ke multiple subscriber (SQS, Lambda, HTTP, email).
sns = boto3.client('sns', region_name='us-east-1')
# Buat topic
topic = sns.create_topic(Name='order-events')
topic_arn = topic['TopicArn']
# Subscribe SQS queue ke SNS topic
Fanout adalah pola di mana satu SNS topic mengirim ke multiple SQS queue, setiap queue mendapat copy pesan.
+---> SQS Queue A (email service)
SNS Topic ----> +---> SQS Queue B (inventory service)
+---> SQS Queue C (analytics)DLQ adalah queue tempat pesan yang gagal diproses dipindahkan setelah mencapai batas retry. Mencegah pesan beracun (poison pill) mengulang terus.
# Deklarasi DLQ
channel.queue_declare(queue='main_queue', arguments={
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'dlq',
'x-max-retries': 3
})
channel.queue_declare(queue='dlq'
# Buat DLQ
dlq = sqs.create_queue(QueueName='my-dlq')
dlq_arn = sqs.get_queue_attributes(
QueueUrl=dlq['QueueUrl'],
AttributeNames=['QueueArn']
)['Attributes'][
| Strategi | Cara Kerja | Cocok Untuk |
|---|---|---|
| Fixed delay | Tunggu waktu tetap (mis. 5 detik) antar retry | Sederhana, error sementara |
| Linear backoff | Tunggu makin lama secara linear | Beban moderat |
| Exponential backoff | Tunggu 2x lebih lama setiap retry | Sistem sibuk, menghindari overload |
| Exponential + jitter | Exponential backoff dengan random noise | Distribusi beban, mencegah thundering herd |
import time
import random
def retry_with_backoff(func, max_retries=5, base_delay=1.0, max_delay=60.0):
for attempt in range(max_retries):
# Pattern: TTL queue untuk delayed retry
# 1. Main queue dengan DLX
channel.queue_declare(queue='main', arguments={
'x-dead-letter-exchange': 'retry_exchange'
})
# 2. Retry queue dengan TTL, DLX kembali ke main
channel.queue_declare(queue='retry_30s'
Idempotency berarti memproses pesan yang sama berkali-kali menghasilkan efek yang sama dengan memproses sekali. Penting karena message queue bisa mengirim pesan duplikat (at-least-once delivery).
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
processed = {}
@app.post("/api/order")
async def create_order(request: Request):
idempotency_key = request.headers.get(
| Garansi | Arti | Trade-off |
|---|---|---|
| At-most-once | Pesan mungkin hilang, tidak duplikat | Cepat tapi bisa kehilangan data |
| At-least-once | Pesan tidak hilang, mungkin duplikat | Butuh idempotency |
| Exactly-once | Tidak hilang, tidak duplikat | Sulit, biasanya simulasi dengan at-least-once + idempotency |