belajarkoding: an Indonesian web development learning platform. Articles, cheat sheets, roadmaps, and code challenges for Indonesian developers.
© 2026 BelajarKoding. All rights reserved.
Part of the Galih Pratama ecosystem
Apache Kafka Cheat Sheet: a quick reference for Apache Kafka covering topics, partitions, consumer groups, producers, CLI commands, and the Python client. Perfect for developers building an event-driven architecture.
Python · 8 min read · 1,507 words
# Basic Concepts
# Kafka Architecture
Producer --> [Topic: Partition 0, Partition 1, Partition 2] --> Consumer Group
Topic = Stream/category of data (like a table in a database)
Partition = Unit of parallelism within a topic (ordered)
Offset = Position of a message within a partition; a consumer tracks the offset it has read up to
Broker = Kafka server
Consumer Group = Group of consumers that share the load
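The sketch below makes the topic/partition/offset vocabulary concrete with a toy in-memory model. `ToyTopic` is a hypothetical helper written for illustration and is not part of any Kafka client. A real broker persists partitions to disk and replicates them. The offset rules shown here still hold: offsets are counted per partition, and a consumer's position is the next offset it will read.

```python
class ToyTopic:
    """Toy stand-in for a Kafka topic: each partition is an append-only list."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition: int, value: str) -> int:
        """Append a record to one partition and return its offset in that partition."""
        log = self.partitions[partition]
        log.append(value)
        return len(log) - 1

    def read(self, partition: int, offset: int, max_records: int = 10):
        """Fetch up to max_records (offset, value) pairs starting at `offset`."""
        log = self.partitions[partition]
        end = min(len(log), offset + max_records)
        return [(i, log[i]) for i in range(offset, end)]


topic = ToyTopic(num_partitions=3)
topic.append(0, "order-1")  # offset 0 in partition 0
topic.append(0, "order-2")  # offset 1 in partition 0
topic.append(1, "order-3")  # offset 0 in partition 1: offsets are per partition

position = 0                     # consumer position in partition 0
batch = topic.read(0, position)  # [(0, 'order-1'), (1, 'order-2')]
position = batch[-1][0] + 1      # next offset to read: the value a consumer commits
print(batch, position)
```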
# Main Components
| Component | Role |
|---|---|
| Producer | Sends messages to a topic |
| Consumer | Reads messages from a topic |
| Broker | Kafka server (a cluster can have multiple brokers) |
| ZooKeeper | Cluster coordination (Kafka 4.0+ uses KRaft, without ZooKeeper) |
| Topic | Category/stream of data |
| Partition | Unit of parallelism within a topic, replicated across brokers |
# Create topic
kafka-topics.sh --create \
--topic orders \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# List topics
kafka-topics.sh --list \
--bootstrap-server localhost:9092
# Produce to a topic (interactive: each line you type is one message)
kafka-console-producer.sh \
--topic orders \
--bootstrap-server localhost:9092
# Produce from a file (each line becomes one message)
kafka-console-producer.sh \
--topic orders \
--bootstrap-server localhost:9092 < orders.json
# Consume from the beginning of the topic
kafka-console-consumer.sh \
--topic orders \
--from-beginning \
--bootstrap-server localhost:9092
# Consume only new messages (real time)
kafka-console-consumer.sh \
--topic orders \
--bootstrap-server localhost:9092
# Consumer Group Management
# List consumer groups
kafka-consumer-groups.sh \
--list \
--bootstrap-server localhost:9092
# Describe group (lag, offset)
kafka-consumer-groups.sh \
--describe \
--group order-processors \
--bootstrap-server localhost:9092
# Python Client (confluent-kafka)
# Producer config
from confluent_kafka import Producer
import json

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'order-service',
}
# Minimal consumer config
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
}
# Manual commit: disable auto commit and commit after processing
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'enable.auto.commit': False,  # Manual commit
}
consumer = Consumer(conf)
# Consumer with Manual Partition Assignment
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer(conf)
# Manually assign specific partitions (assign() bypasses group rebalancing)
consumer.assign([
    TopicPartition('orders', 0, offset=0),  # partition 0, start from offset 0
    TopicPartition('orders', 1, offset=5),  # partition 1, start from offset 5
])
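# Partitioning by Key
# Messages with the same key always go to the same partition (as long as the partition count does not change)
# Useful for per-entity ordering guarantees
# data, data2: your serialized payloads
producer.produce('orders', key='user-123', value=data)
producer.produce('orders', key='user-123', value=data2)
# Both messages are guaranteed to land in the same partition, so they stay in order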
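The "same key, same partition" rule comes from hashing the key and taking the result modulo the partition count. This sketch uses `zlib.crc32` as a stand-in hash, which is an assumption. Real clients use their own hash functions (the Java client uses murmur2), so the partition numbers below will not match a real cluster. The sketch also shows why adding partitions changes where existing keys are routed.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Pick a partition as hash(key) % num_partitions (illustrative CRC32 hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Same key + same partition count -> always the same partition (per-key ordering)
assert partition_for("user-123", 3) == partition_for("user-123", 3)

# Growing the topic from 3 to 6 partitions remaps a share of existing keys,
# so old and new messages for those keys end up in different partitions.
keys = [f"user-{i}" for i in range(1000)]
moved = sum(partition_for(k, 3) != partition_for(k, 6) for k in keys)
print(f"{moved} of {len(keys)} keys changed partition")
```

This is why keyed topics are often created with enough partitions up front rather than grown later.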
# Consumer Group Rebalancing
When a consumer joins or leaves the group:
1. Kafka triggers a rebalance
2. Partitions are reassigned among the current consumers
3. A consumer that receives a new partition resumes from that partition's committed offset
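The reassignment in steps 1-3 can be pictured with a simplified round-robin assignor. `assign_round_robin` is a hypothetical helper and not the real group protocol. The actual assignors (range, round-robin, sticky, cooperative-sticky) run inside the clients. They all keep the same invariant as this sketch: each partition is owned by exactly one consumer in the group.

```python
def assign_round_robin(partitions, consumers):
    """Spread partitions over group members round-robin; each partition gets one owner."""
    if not consumers:
        raise ValueError("a group needs at least one consumer")
    members = sorted(consumers)
    assignment = {m: [] for m in members}
    for i, p in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(p)
    return assignment


partitions = [0, 1, 2, 3, 4, 5]
before = assign_round_robin(partitions, ["c1", "c2", "c3"])
after = assign_round_robin(partitions, ["c1", "c2"])  # c3 left -> rebalance
print(before)  # {'c1': [0, 3], 'c2': [1, 4], 'c3': [2, 5]}
print(after)   # {'c1': [0, 2, 4], 'c2': [1, 3, 5]}
```

Partition 3 moved from c1 to c2 even though c1 never left. Sticky and cooperative assignors try to avoid that kind of unnecessary movement.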
Tips:
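- Make processing idempotent (a message may be processed again after a rebalance)
- Use cooperative rebalancing (Kafka 2.4+) to minimize disruption (in confluent-kafka: 'partition.assignment.strategy': 'cooperative-sticky')
# Delivery Semantics
# Kafka default: at-least-once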
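# A message can be processed more than once if the consumer crashes after processing but before committing
# Fix: make processing idempotent
processed_ids = set()  # in-memory only; use a durable store (e.g. a DB unique key) in production

def process_message(msg):
    msg_id = extract_id(msg)  # placeholder: pull a unique ID out of the message
    if msg_id in processed_ids:
        return  # Skip, already processed
    do_work(msg)  # placeholder for the actual side effect
    processed_ids.add(msg_id)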
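The crash-before-commit scenario can be simulated without a broker. `consume` is a hypothetical helper. Here `processed_ids` stands in for a durable store (for example a table with a unique key) that survives the crash, and that durability is what makes the retry safe.

```python
def consume(log, committed, processed_ids, effects, crash_at=None):
    """Process records from `committed` on; return the new committed offset.

    If crash_at is set, the consumer 'crashes' right after processing that
    offset, before committing, so the old committed offset is returned.
    """
    for offset in range(committed, len(log)):
        msg_id, payload = log[offset]
        if msg_id not in processed_ids:  # idempotency check
            effects.append(payload)      # the side effect (e.g. a DB write)
            processed_ids.add(msg_id)    # recorded together with the effect
        if offset == crash_at:
            return committed             # crashed: nothing was committed
    return len(log)                      # commit = next offset to read


log = [("m1", "charge A"), ("m2", "charge B"), ("m3", "charge C")]
processed, effects = set(), []
committed = consume(log, 0, processed, effects, crash_at=1)  # dies after m2
committed = consume(log, committed, processed, effects)      # m1, m2 redelivered
print(committed, effects)  # 3 ['charge A', 'charge B', 'charge C']
```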
# Retention by time (default: 7 days = 604800000 ms)
kafka-configs.sh --alter \
--topic orders \
--add-config retention.ms=604800000 \
--bootstrap-server localhost:9092
# Retention by size (1 GiB per partition)
kafka-configs.sh --alter \
--topic orders \
--add-config retention.bytes=1073741824 \
--bootstrap-server localhost:9092
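The two large numbers above are unit conversions: `retention.ms` is in milliseconds and `retention.bytes` is in bytes per partition. The sketch below computes both values and builds the same `kafka-configs.sh` arguments. The helper names are hypothetical, and the sketch assumes the `--topic` form used in this cheat sheet.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 ms
GIB = 1024 ** 3                   # 1,073,741,824 bytes


def retention_ms(days: float) -> int:
    return int(days * MS_PER_DAY)


def retention_bytes(gib: float) -> int:
    return int(gib * GIB)


def alter_topic_config(topic: str, configs: dict) -> list:
    """Build kafka-configs.sh args; several configs are joined with commas."""
    pairs = ",".join(f"{name}={value}" for name, value in configs.items())
    return ["kafka-configs.sh", "--alter", "--topic", topic,
            "--add-config", pairs, "--bootstrap-server", "localhost:9092"]


print(retention_ms(7))     # 604800000
print(retention_bytes(1))  # 1073741824
print(" ".join(alter_topic_config("orders", {"retention.ms": retention_ms(3)})))
# kafka-configs.sh --alter --topic orders --add-config retention.ms=259200000 --bootstrap-server localhost:9092
```

`retention.bytes` applies per partition, so a 1 GiB limit on a 3-partition topic allows up to roughly 3 GiB per replica. In practice usage can be a bit higher, because Kafka deletes whole log segments.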
# Python: kafka-python (Alternative Library)
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
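# Event Sourcing
# Store every state change as an event stream
# Topic: account-events
# Key: account_id
# Value: {"type": "deposit", "amount": 100, "timestamp": "..."}
# Rebuild state from events (assumes a kafka-python consumer created with
# consumer_timeout_ms, so the loop ends once the topic is drained)
def rebuild_state(consumer, account_id):
    balance = 0
    for msg in consumer:
        key = msg.key.decode('utf-8') if msg.key else None  # keys arrive as bytes
        if key == account_id:
            event = json.loads(msg.value)
            if event['type'] == 'deposit':
                balance += event['amount']
            elif event['type'] == 'withdraw':
                balance -= event['amount']
    return balance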
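Keeping the fold separate from the consumer makes the rebuild logic testable without a broker. In this sketch `apply_event` and the `Record` tuple are hypothetical stand-ins. A `Record` mimics a kafka-python message with bytes `key` and `value`, and the balance rules match `rebuild_state` above.

```python
import json
from collections import namedtuple

Record = namedtuple("Record", ["key", "value"])  # stand-in for a consumed message


def apply_event(balance: int, event: dict) -> int:
    """Apply one account event to the balance; unknown event types are ignored."""
    if event["type"] == "deposit":
        return balance + event["amount"]
    if event["type"] == "withdraw":
        return balance - event["amount"]
    return balance


def rebuild_from_records(records, account_id: str) -> int:
    balance = 0
    for rec in records:
        if rec.key.decode("utf-8") == account_id:
            balance = apply_event(balance, json.loads(rec.value))
    return balance


events = [
    Record(b"acc-1", b'{"type": "deposit", "amount": 100}'),
    Record(b"acc-2", b'{"type": "deposit", "amount": 50}'),
    Record(b"acc-1", b'{"type": "withdraw", "amount": 30}'),
]
print(rebuild_from_records(events, "acc-1"))  # 70
```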
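# CQRS
# Write side: produce commands to Kafka (confluent-kafka Producer)
producer.produce('commands', value=json.dumps({
    'type': 'create_order',
    'data': {'product': 'laptop', 'qty': 1},
}))
# Read side: consume events and update the read model (kafka-python consumer)
consumer = KafkaConsumer('events', ...)
for msg in consumer:
    event = json.loads(msg.value)
    if event['type'] == 'order_created':
        db.execute('INSERT INTO order_view ...', event['data'])  # db: placeholder read-model connection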
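On the read side, the consumer loop acts as a projection: it folds events into a view that is easy to query. Below is a minimal in-memory sketch in which a dict stands in for the `order_view` table. The `order_shipped` event type and the `order_id` field are hypothetical additions for illustration.

```python
def project(events):
    """Fold order events into an order_view-like dict keyed by order_id."""
    view = {}
    for event in events:
        data = event["data"]
        if event["type"] == "order_created":
            # Upsert: replaying the whole stream rebuilds the same view
            view[data["order_id"]] = {
                "product": data["product"], "qty": data["qty"], "status": "created",
            }
        elif event["type"] == "order_shipped" and data["order_id"] in view:
            view[data["order_id"]]["status"] = "shipped"
    return view


events = [
    {"type": "order_created", "data": {"order_id": "o-1", "product": "laptop", "qty": 1}},
    {"type": "order_created", "data": {"order_id": "o-2", "product": "mouse", "qty": 2}},
    {"type": "order_shipped", "data": {"order_id": "o-1"}},
]
print(project(events)["o-1"]["status"])  # shipped
```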
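# Saga Pattern (Distributed Transactions)
# The coordinator publishes a command (confluent-kafka Producer)
producer.produce('saga', key='saga-123', value=json.dumps({
    'step': 1,
    'action': 'reserve_inventory',
    'data': {'item': 'laptop', 'qty': 1},
}))
# Each service consumes the command, processes it, and publishes either the next step or a compensating action
# If a step fails, publish compensating actions for the steps that already succeeded
# (e.g. if payment fails after reserve_inventory, publish 'release_inventory')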
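The orchestration logic can be sketched without Kafka: run the steps in order and, on failure, compensate the completed steps in reverse order. `run_saga` is hypothetical, and each action here is a plain function call. In the Kafka version, each step would be a command published to a topic, with its outcome arriving as an event. The step names other than `reserve_inventory` are made up.

```python
def run_saga(steps):
    """Run (name, action, compensate) steps; on failure undo finished steps in reverse."""
    done, log = [], []
    for name, action, compensate in steps:
        try:
            action()
            log.append(f"done:{name}")
            done.append((name, compensate))
        except Exception:
            log.append(f"failed:{name}")
            for prev_name, undo in reversed(done):
                undo()
                log.append(f"compensated:{prev_name}")
            return False, log
    return True, log


def decline_payment():
    raise RuntimeError("payment declined")


steps = [
    ("reserve_inventory", lambda: None, lambda: None),
    ("charge_payment", decline_payment, lambda: None),
    ("ship_order", lambda: None, lambda: None),
]
ok, log = run_saga(steps)
print(ok, log)  # False ['done:reserve_inventory', 'failed:charge_payment', 'compensated:reserve_inventory']
```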
# Monitoring
# Describe all topics
kafka-topics.sh --bootstrap-server localhost:9092 --describe
# Under-replicated partitions
kafka-topics.sh --bootstrap-server localhost:9092 --describe \
--under-replicated-partitions
# Consumer group lag (prints GROUP, TOPIC, LOG-END-OFFSET, LAG)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups |
awk '{print $1, $2, $5, $6}' | column -t
# Comparison with Alternatives
| Feature | Kafka | RabbitMQ | Redis Streams |
|---|---|---|---|
| Model | Log (append-only) | Queue | Log |
| Ordering | Per partition | Per queue | Per stream |
| Replay | Yes (by offset) | No | Yes (by ID) |
| Retention | Configurable | Ack-based | Configurable |
| Throughput | Very high | Medium | High |
| Latency | Medium | Low | Low |
| Consumer groups | Yes | Yes | Yes |
| Partitioning | Yes | Yes (sharding) | Yes |
# Glossary
Topic: Stream/category of data. Messages are sent to a topic and read from a topic.
Partition: Unit of parallelism within a topic. Messages within a single partition are guaranteed to be ordered.
Offset: Position of a message within a partition. Consumers track offsets to know where they are reading.
Consumer Group: A group of consumers that share a topic. Each partition is read by exactly one consumer in the group.
Replication: Each partition is copied to multiple brokers (per the replication factor) for fault tolerance.
ISR (In-Sync Replicas): Replicas that are fully caught up with the partition leader. By default, only ISR members can become leader if the leader fails.
Retention: How long messages are kept in Kafka, by time or by size.
Compaction: Cleanup policy that keeps only the latest value per key, like a changelog.
Rebalance: The process of reassigning partitions to consumers when a consumer joins or leaves the group.
ACL: Access Control List. Permissions to produce to, consume from, or administer specific topics.
# More Topic Commands
# Describe a topic (partition details, replicas)
kafka-topics.sh --describe \
--topic orders \
--bootstrap-server localhost:9092
# Delete topic
kafka-topics.sh --delete \
--topic orders \
--bootstrap-server localhost:9092
# Alter partitions (increase only; existing keys may map to different partitions afterwards)
kafka-topics.sh --alter \
--topic orders \
--partitions 6 \
--bootstrap-server localhost:9092
# More Console Producer Options
# With a key (input format key:value)
kafka-console-producer.sh \
--topic orders \
--bootstrap-server localhost:9092 \
--property "key.separator=:" \
--property "parse.key=true"
# Input: order-123:{"product": "laptop", "price": 999}
# With headers (Kafka 3.2+): each input line is headers<TAB>value
kafka-console-producer.sh \
--topic orders \
--bootstrap-server localhost:9092 \
--property "parse.headers=true"
# Input: event_type:created,source:api<TAB>{"product": "laptop"}
# More Console Consumer Options
# Print keys
kafka-console-consumer.sh \
--topic orders \
--property "print.key=true" \
--property "key.separator= - " \
--from-beginning \
--bootstrap-server localhost:9092
# Consume as part of a consumer group (offsets are committed for the group)
kafka-console-consumer.sh \
--topic orders \
--group order-processors \
--bootstrap-server localhost:9092
# View offsets per partition (current offset, log-end offset, lag)
kafka-consumer-groups.sh \
--group order-processors \
--describe \
--bootstrap-server localhost:9092
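The LAG column in that output is the log-end offset minus the group's committed offset, computed per partition. The helper below is hypothetical and reproduces that arithmetic. A partition with no committed offset shows as `-` in the CLI and as `None` here.

```python
def consumer_lag(committed, log_end):
    """Per-partition lag = log-end offset - committed offset (None if nothing committed)."""
    lag = {}
    for partition, end in log_end.items():
        current = committed.get(partition)
        lag[partition] = None if current is None else end - current
    return lag


lag = consumer_lag({0: 120, 1: 95}, {0: 150, 1: 95, 2: 40})
total = sum(v for v in lag.values() if v is not None)
print(lag, total)  # {0: 30, 1: 0, 2: None} 30
```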
# Resetting Consumer Group Offsets
# The group must be inactive (no running consumers); without --execute the tool only shows a dry run
# Reset offsets to earliest
kafka-consumer-groups.sh \
--reset-offsets \
--group order-processors \
--topic orders \
--to-earliest \
--execute \
--bootstrap-server localhost:9092
# Reset offsets to latest
kafka-consumer-groups.sh \
--reset-offsets \
--group order-processors \
--topic orders \
--to-latest \
--execute \
--bootstrap-server localhost:9092
# Shift offsets (-2 = move back 2 messages on each partition)
kafka-consumer-groups.sh \
--reset-offsets \
--group order-processors \
--topic orders \
--shift-by -2 \
--execute \
--bootstrap-server localhost:9092
# Reset to a specific timestamp
kafka-consumer-groups.sh \
--reset-offsets \
--group order-processors \
--topic orders \
--to-datetime 2026-06-21T00:00:00.000 \
--execute \
--bootstrap-server localhost:9092
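The sketch below shows what two of the reset options compute for a single partition whose valid offsets run from `earliest` (log start) to `latest` (log end). These helpers are hypothetical illustrations, not the tool's code. In `shift_by`, an offset that lands outside the valid range is clamped. In `to_datetime`, the result is the first offset whose record timestamp is at or after the target. If no record is that new, this sketch falls back to the log end.

```python
def shift_by(current, n, earliest, latest):
    """--shift-by n: move the offset by n, clamped to [earliest, latest]."""
    return max(earliest, min(latest, current + n))


def to_datetime(timestamps_ms, earliest, target_ms):
    """timestamps_ms[i] is the timestamp of the record at offset earliest + i."""
    for i, ts in enumerate(timestamps_ms):
        if ts >= target_ms:
            return earliest + i
    return earliest + len(timestamps_ms)  # nothing that new: log end


earliest, latest = 100, 104
timestamps = [1_000, 1_000, 2_000, 3_000]  # offsets 100..103
print(shift_by(103, -2, earliest, latest))       # 101
print(shift_by(101, -5, earliest, latest))       # 100 (clamped)
print(to_datetime(timestamps, earliest, 1_500))  # 102
print(to_datetime(timestamps, earliest, 9_999))  # 104 (log end)
```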
# Python Producer (confluent-kafka)
# Uses the imports and `conf` dict from the Python Client (confluent-kafka) section above
producer = Producer(conf)

def delivery_report(err, msg):
    """Called once per message with the delivery result; served from poll() or flush()."""
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Produce message (async)
order_id = 123  # example value
producer.produce(
    'orders',
    key=str(order_id),
    value=json.dumps({'product': 'laptop', 'price': 999}).encode('utf-8'),
    callback=delivery_report,
)
# Flush to make sure all queued messages are delivered (blocks)
producer.flush()
# Produce with headers
producer.produce(
    'orders',
    key='order-123',
    value=b'{"product": "laptop"}',
    headers=[('event_type', b'created'), ('source', b'api')],
)
# "Synchronous" produce: produce() has no sync option, so call flush() right after
# to block until this message is delivered (use sparingly, it hurts throughput)
producer.produce('orders', value=b'urgent')
producer.flush()
# Python Consumer (confluent-kafka): Poll Loop
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest',  # or 'latest'; used when the group has no valid committed offset
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000,
}
consumer = Consumer(conf)
# Subscribe to one or more topics
consumer.subscribe(['orders', 'payments'])
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        # Process message
        key = msg.key().decode('utf-8') if msg.key() else None
        value = msg.value().decode('utf-8')
        topic = msg.topic()
        partition = msg.partition()
        offset = msg.offset()
        headers = dict(msg.headers()) if msg.headers() else {}
        print(f'[{topic}][{partition}][{offset}] key={key} value={value}')
except KeyboardInterrupt:
    pass
finally:
    consumer.close()  # commits final offsets (auto commit) and leaves the group cleanly
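# Manual Commit: Poll Loop
# Uses the manual-commit `consumer` ('enable.auto.commit': False) created above;
# process_order is a placeholder for your own handler
consumer.subscribe(['orders'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue
    try:
        # Process message
        process_order(msg.value())
        # Commit only after success (stores msg.offset() + 1 for this partition)
        consumer.commit(message=msg, asynchronous=False)
    except Exception as e:
        print(f'Error processing: {e}')
        # Skipping the commit does not retry by itself: the consumer keeps reading,
        # and a later successful commit on this partition moves past this message.
        # It is redelivered only if the consumer restarts or rebalances first.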
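The point that a later commit moves past a failed message is easy to miss, so here is a broker-free simulation of commit-after-success on one partition. `run_partition` is a hypothetical helper. Its offset arithmetic mirrors confluent-kafka's `commit(message=msg)`, which stores `msg.offset() + 1`.

```python
def run_partition(records, handler):
    """Commit after each success; return (committed offset, failed offsets)."""
    committed = 0
    failed = []
    for offset, value in enumerate(records):
        try:
            handler(value)
            committed = offset + 1  # like commit(message=msg): next offset to read
        except Exception:
            failed.append(offset)   # no commit for this one...
    return committed, failed


def handler(value):
    if value == "bad":
        raise ValueError("cannot process")


print(run_partition(["a", "bad", "c"], handler))  # (3, [1])
```

The committed offset ends at 3, past the failed offset 1, so a restart would not redeliver that message. Two common ways to avoid skipping it silently are to stop and `seek()` back to the failed offset, or to publish the failed message to a separate dead-letter topic. Both are design choices, not built-in behavior.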
# Seek to a specific offset (the partition must already be assigned to this consumer)
consumer.seek(TopicPartition('orders', 0, offset=100))
# Custom partitioner (Python): kafka-python only, e.g. KafkaProducer(partitioner=UserPartitioner());
# confluent-kafka does not accept a Python partitioner function
class UserPartitioner:
    def __call__(self, key, all_partitions, available):
        # Route by user_id modulo partition count; assumes numeric keys such as b'123'
        user_id = int(key.decode())
        return all_partitions[user_id % len(all_partitions)]
# Log Compaction
# Compact a topic (keep only the latest value per key)
kafka-configs.sh --alter \
--topic user-state \
--add-config cleanup.policy=compact \
--bootstrap-server localhost:9092
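This sketch shows compaction in miniature. Only the newest record per key survives, and original offsets are kept, so the compacted log has gaps. A `None` value (a tombstone) deletes the key. `compact` is a hypothetical simplification. Real Kafka never compacts the active segment, and it keeps tombstones for `delete.retention.ms` before dropping them. This sketch drops tombstones immediately.

```python
def compact(records):
    """records: (key, value) pairs in offset order. Returns surviving (offset, key, value)."""
    latest = {}
    for offset, (key, value) in enumerate(records):
        latest[key] = (offset, value)  # later records overwrite earlier ones
    survivors = sorted(
        ((offset, key, value) for key, (offset, value) in latest.items()),
        key=lambda item: item[0],      # keep original offset order
    )
    return [(offset, key, value) for offset, key, value in survivors if value is not None]


records = [("user-1", "free"), ("user-2", "free"), ("user-1", "pro"), ("user-2", None)]
print(compact(records))  # [(2, 'user-1', 'pro')]
```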
# kafka-python: Producer and Consumer with Options
from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',  # Wait for all in-sync replicas to confirm
    retries=3,
    max_in_flight_requests_per_connection=1,  # Preserve order on retry
)
# Send
future = producer.send('orders', key='user-123', value={'product': 'book'})
metadata = future.get(timeout=10)  # Block until the broker confirms
print(f'Sent to partition {metadata.partition} at offset {metadata.offset}')
producer.flush()
producer.close()
# Consumer
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processors',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=10000,  # Stop iterating after 10 s without messages
)
for message in consumer:
    print(f'{message.topic}:{message.partition}:{message.offset}')
    print(f'  Key: {message.key}')
    print(f'  Value: {message.value}')
    consumer.commit()  # Manual commit (synchronous)