Akumulasi pesan, juga dikenal sebagai consumer lag, merupakan metrik pemantauan umum dalam penggunaan Kafka. Memahami dan menangani akumulasi pesan sangat penting untuk menjaga stabilitas sistem, kinerja real-time, serta konsistensi data.
Apa itu akumulasi pesan Kafka
Akumulasi pesan Kafka terjadi ketika konsumen tidak mampu mengimbangi kecepatan penulisan pesan oleh produsen, sehingga pesan yang belum dikonsumsi menumpuk di suatu partisi.
Total akumulasi pesan = Latest offset (semua partisi) - Consumer offset (semua partisi)
Semakin tinggi total akumulasi pesan, semakin parah masalahnya.
Total akumulasi pesan mendekati nol menunjukkan bahwa konsumen mampu mengimbangi laju produksi.
Topic: test (Partition 0)
+----+----+----+----+----+----+----+
| M1 | M2 | M3 | M4 | M5 | M6 | M7 | ← 7 messages written
+----+----+----+----+----+----+----+
↑ ↑
Consumer offset M3 Latest offset (M7)
Topic: test (Partition 1)
+----+----+----+----+----+----+
| M1 | M2 | M3 | M4 | M5 | M6 | ← 6 messages written
+----+----+----+----+----+----+
↑ ↑
Consumer offset M3 Latest offset (M6)
Current total message accumulation = 7 - 3 + 6 - 3 = 7
7 unconsumed messages → 7 accumulated messagesUntuk menyelesaikan beberapa peringatan tertentu, Anda dapat menggunakan ApsaraMQ for Kafka untuk mengatur ulang consumer offset pada partisi topik menjadi 0. Saat consumer offset bernilai 0, akumulasinya juga 0.
Jika consumer offset tidak tersedia—karena konsumen belum melakukan commit offset atau offset telah kedaluwarsa dan dihapus—dan terdapat thread konsumen dalam Group yang sedang online, maka
Total akumulasi pesan = Latest offset (semua partisi) - Earliest offset (semua partisi). Jika semua thread konsumen dalam Group sedang offline, akumulasinya adalah 0.
Akar penyebab akumulasi pesan
Kapasitas pemrosesan konsumen tidak mencukupi: logika pemrosesan kompleks, I/O lambat, atau bottleneck CPU/memori.
Kenaikan tiba-tiba pada laju produksi: puncak lalu lintas atau impor batch.
Konsumen sering mengalami gangguan atau restart: crash, jeda garbage collection (GC) yang lama, atau pembaruan deployment.
Penyeimbangan ulang (rebalancing) yang sering: konsumen sering bergabung atau keluar dari Group, timeout heartbeat, atau session timeout.
Masalah pada kode konsumen: infinite loop, exception yang tidak ditangani, atau interval yang terlalu lama antara panggilan
poll().Pembatasan laju (rate limiting) pada konsumen: laju konsumsi mencapai batas reserved atau elastic dari instans.
Commit offset yang tertunda atau gagal: hal ini menyebabkan penarikan ulang pesan dan akumulasi palsu.
Cara melihat akumulasi pesan
Untuk detail mengenai metrik akumulasi, lihat dokumentasi sesuai tipe instans Anda:
Subscription/Pay-as-you-go: Prometheus monitoring
Serverless: Dashboard
Dampak akumulasi pesan
Menurunnya kinerja real-time: latensi pemrosesan data meningkat dan memengaruhi pengambilan keputusan bisnis.
Respons sistem lebih lambat: thread konsumen yang terblokir dapat menyebabkan timeout dan memicu circuit breaker.
Risiko penyeimbangan ulang meningkat: keterlambatan pemrosesan oleh konsumen menyebabkan heartbeat timeout, yang memicu rebalancing partisi. Rebalancing yang sering memperpanjang periode jeda konsumsi, meningkatkan kemungkinan penarikan ulang pesan, serta memperburuk latensi konsumsi, sehingga menciptakan siklus umpan balik negatif.
Risiko error out-of-memory (OOM): jika konsumen tidak segera memproses pesan setelah memanggil
poll(), banyak pesan menumpuk di buffer memori client, yang dapat menyebabkan overflow memori heap.
Cara menangani dan mengoptimalkan akumulasi pesan
Tingkatkan kapasitas throughput konsumen:
Tambahkan instans konsumen: tambahkan lebih banyak konsumen ke Group yang sama. Jumlah partisi harus lebih besar dari atau sama dengan jumlah konsumen (
Jumlah partisi ≥ Jumlah konsumen).Tingkatkan jumlah partisi: hal ini meningkatkan tingkat paralelisme.
Gunakan pemrosesan asinkron: jadikan operasi yang memakan waktu sebagai asinkron agar loop poll berjalan lebih cepat.
Gunakan pemrosesan batch: proses beberapa pesan sekaligus.
Sesuaikan parameter konsumen:
Parameter
Nilai yang direkomendasikan
Deskripsi
max.poll.records1 hingga 500
Tarik lebih sedikit pesan pada setiap poll untuk mengurangi overhead jaringan.
fetch.min.bytes1 KB hingga 1 MB
Tingkatkan throughput dan kurangi polling kosong.
fetch.max.wait.ms500 ms
Tunggu lebih banyak data untuk dikembalikan sekaligus.
session.timeout.ms30 s
Hindari salah mengidentifikasi konsumen sebagai down.
heartbeat.interval.ms≤ session.timeout / 3
Pertahankan heartbeat normal.
enable.auto.committrue
Autocommit direkomendasikan.
Ambil tindakan darurat sementara:
Jika akumulasi terlalu besar untuk diproses dengan cepat, Anda dapat mengatur ulang consumer offset ke latest offset.