Topik ini menjelaskan cara mengonfigurasi parameter client untuk ApsaraMQ for Kafka. Parameter yang dikonfigurasi dengan tepat secara langsung memengaruhi throughput pesan, keandalan pengiriman, dan stabilitas konsumen. Bagian berikut mencakup parameter produsen dan konsumen beserta nilai yang direkomendasikan serta panduan penyetelan untuk beban kerja produksi.
Parameter produsen
Pengiriman pesan
acks
Mengontrol jumlah acknowledgment broker yang diperlukan oleh produsen sebelum menganggap pengiriman berhasil.
| Nilai | Perilaku | Trade-off |
|---|---|---|
0 | Tidak ada acknowledgment dari broker. | Throughput tertinggi, risiko kehilangan data tertinggi. |
1 | Acknowledgment setelah leader menulis data. | Keseimbangan antara throughput dan daya tahan. Kehilangan data mungkin terjadi jika leader gagal sebelum follower mereplikasi data. |
all | Acknowledgment setelah leader dan semua in-sync replica menulis data. | Throughput terendah, daya tahan terkuat. Kehilangan data hanya terjadi jika leader dan semua in-sync replica gagal secara bersamaan. |
Nilai yang direkomendasikan: 1 untuk sebagian besar beban kerja yang memprioritaskan throughput dibandingkan daya tahan ketat.
retries
Jumlah maksimum percobaan ulang yang dilakukan produsen saat pengiriman gagal. Nilai yang lebih tinggi membantu produsen pulih dari kegagalan sementara pada broker, seperti pemilihan leader. Gabungkan dengan retry.backoff.ms untuk mengatur jeda antar percobaan ulang.
retry.backoff.ms
Jeda antara percobaan ulang pengiriman. Nilai yang terlalu rendah dapat menyebabkan badai percobaan ulang selama failover broker.
| Direkomendasikan | Bawaan | Unit |
|---|---|---|
| 1000 | -- | milidetik |
Pengelompokan (batching) dan throughput
Pengelompokan mengurangi overhead jaringan dengan menggabungkan beberapa catatan menjadi satu permintaan. Dua parameter mengontrol kapan sebuah batch dikirim: ukuran dan waktu.
batch.size
Ukuran maksimum satu batch per partisi. Saat batch mencapai ukuran ini, produsen segera mengirimkannya.
| Tipe | Bawaan | Nilai valid | Unit |
|---|---|---|---|
| int | 16384 | [0,...] | byte |
Pertahankan nilai bawaan 16384 untuk sebagian besar beban kerja. Nilai yang lebih kecil meningkatkan jumlah permintaan jaringan dan mengurangi throughput. Jika Anda menaikkan batch.size, pastikan buffer.memory cukup besar untuk menampung batch yang lebih besar.
linger.ms
Waktu maksimum yang ditunggu produsen agar batch terisi penuh sebelum dikirim. Ini bekerja seperti algoritma Nagle pada TCP: begitu batch mencapai batch.size, batch tersebut langsung dikirim terlepas dari timer linger. Jika batch masih di bawah batch.size saat linger.ms berakhir, produsen mengirim data yang telah terakumulasi.
| Direkomendasikan | Bawaan | Unit |
|---|---|---|
| 100 hingga 1000 | 0 | milidetik |
Nilai linger.ms yang lebih tinggi meningkatkan efisiensi batching dan throughput dengan mengorbankan latensi per pesan.
Manajemen memori
buffer.memory
Total memori yang dialokasikan produsen untuk buffering catatan yang belum dikirim di seluruh partisi. Jika pool ini habis, send() akan diblokir atau melemparkan exception tergantung pada max.block.ms. Buffer yang terlalu kecil menyebabkan alokasi memori lambat, throughput berkurang, atau timeout pengiriman.
Unit: byte. Bawaan: 33554432 (32 MB).
Rumus penentuan ukuran:
buffer.memory >= batch.size x jumlah_partisi x 2Contohnya, dengan batch.size=16384 dan 50 partisi:
16384 x 50 x 2 = 1.638.400 byte (~1,6 MB minimum)Jika Anda menaikkan batch.size demi throughput, sesuaikan buffer.memory secara proporsional.
Partisi
partitioner.class
Menentukan cara produsen menetapkan catatan ke partisi. Strategi sticky partitioning mengurangi jumlah batch yang tidak lengkap dengan mengisi batch satu partisi sebelum beralih ke partisi berikutnya.
| Versi client Kafka | Strategi bawaan |
|---|---|
| 2.4 dan lebih baru | Sticky partitioning (bawaan) |
| Sebelum 2,4 | Round-robin |
Jika client produsen Anda menggunakan versi lebih lama dari 2.4, atur secara eksplisit sticky partitioner untuk meningkatkan efisiensi batching.
Parameter konsumen
Penyetelan fetch
Parameter ini mengontrol jumlah data yang diambil konsumen per permintaan fetch. Penyetelan memengaruhi throughput dan latensi.
fetch.min.bytes
Jumlah minimum data yang diakumulasi broker sebelum mengembalikan respons fetch. Nilai yang lebih besar mengurangi frekuensi fetch dan beban CPU broker, sehingga meningkatkan throughput tetapi menambah latensi end-to-end pesan. Unit: byte.
Evaluasi laju pesan produsen sebelum menetapkan nilai ini. Jika pesan tiba secara perlahan, nilai fetch.min.bytes yang besar akan menambah penundaan yang tidak perlu.
fetch.max.wait.ms
Waktu maksimum yang ditunggu broker untuk mengakumulasi fetch.min.bytes sebelum mengembalikan respons. Unit: milidetik.
Perilaku berbeda berdasarkan tipe penyimpanan:
Penyimpanan lokal: Broker menunggu hingga
fetch.min.bytestercapai ataufetch.max.wait.msberakhir, mana yang lebih dulu terjadi.Penyimpanan cloud: Broker langsung mengembalikan respons saat data baru tiba, terlepas dari
fetch.min.bytes.
max.partition.fetch.bytes
Jumlah maksimum data yang dikembalikan broker per partisi dalam satu fetch. Unit: byte.
Manajemen session dan rebalancing
Parameter session dan polling yang salah konfigurasi merupakan penyebab paling umum dari rebalance konsumen yang tidak diinginkan. Rebalance menghentikan seluruh konsumsi dalam kelompok hingga partisi ditetapkan ulang, sehingga hindari pemicu rebalance yang tidak perlu.
session.timeout.ms
Waktu maksimum antar heartbeat sebelum broker menganggap konsumen mati dan memicu rebalance.
| Direkomendasikan | Rentang valid | Bawaan | Unit |
|---|---|---|---|
| 30000 hingga 60000 | 6000 hingga 300000 | 10000 | milidetik |
poll(). Pada versi Java sebelumnya atau client non-Java, heartbeat dikirim selama panggilan poll(), sehingga session.timeout.ms harus mempertimbangkan waktu pemrosesan data dan interval heartbeat.heartbeat.interval.ms tidak lebih dari sepertiga session.timeout.ms. Misalnya, jika session.timeout.ms adalah 45000, atur heartbeat.interval.ms menjadi 15000 atau lebih rendah.max.poll.records
Jumlah maksimum catatan yang dikembalikan dalam satu panggilan poll(). Jika konsumen tidak dapat memproses sebanyak ini sebelum tenggat poll() berikutnya, broker menganggapnya mati dan memicu rebalance.
Rumus penentuan ukuran:
max.poll.records < pesan_per_thread_per_detik x thread_konsumen x session_timeout_detikContohnya, dengan 500 msg/detik per thread, 4 thread, dan session timeout 45 detik:
500 x 4 x 45 = 90.000Atur max.poll.records di bawah nilai ini untuk memastikan konsumen selalu menyelesaikan pemrosesan sebelum session timeout.
max.poll.interval.ms
Interval maksimum antara panggilan poll() berturut-turut sebelum broker menghapus konsumen dari kelompok. Parameter ini hanya berlaku untuk client Java 0.10.1 dan lebih baru, di mana heartbeat berjalan pada thread terpisah.
| Bawaan | Unit |
|---|---|
| 300000 | milidetik |
Rumus penentuan ukuran:
max.poll.interval.ms > waktu_per_catatan x max.poll.recordsPada sebagian besar kasus, nilai bawaan 300000 (5 menit) sudah cukup. Naikkan hanya jika logika pemrosesan Anda sangat lambat.
Manajemen offset
enable.auto.commit
Mengontrol apakah konsumen secara otomatis melakukan commit offset pada interval tetap.
| Nilai | Perilaku |
|---|---|
true (bawaan) | Offset di-commit secara otomatis setiap auto.commit.interval.ms milidetik. Lebih mudah dikelola, tetapi dapat menyebabkan pemrosesan duplikat setelah crash. |
false | Aplikasi Anda harus memanggil commitSync() atau commitAsync() secara eksplisit. Gunakan ini untuk semantik at-least-once atau exactly-once bila dikombinasikan dengan pemrosesan idempoten. |
auto.commit.interval.ms
Interval untuk commit offset otomatis saat enable.auto.commit bernilai true.
| Bawaan | Unit |
|---|---|
| 1000 | milidetik |
Interval yang lebih pendek mengurangi jendela pesan duplikat setelah crash tetapi meningkatkan jumlah permintaan commit ke broker.
auto.offset.reset
Menentukan apa yang terjadi ketika konsumen tidak memiliki offset yang di-commit atau offset yang di-commit tidak valid (misalnya, offset telah dihapus karena kebijakan retensi).
| Nilai | Perilaku |
|---|---|
latest | Mulai mengonsumsi dari offset terbaru. Hanya pesan baru. |
earliest | Mulai mengonsumsi dari offset tertua yang tersedia. Memproses ulang semua pesan yang disimpan. |
none | Melemparkan exception. Gunakan ini ketika aplikasi Anda mengelola offset secara manual. |
Nilai yang direkomendasikan: latest. Menggunakan earliest menyebabkan konsumen memproses ulang semua pesan yang disimpan setiap kali menemui offset tidak valid, yang dapat menyebabkan pemrosesan duplikat dan lonjakan lag konsumen.
Jika aplikasi Anda menangani manajemen offset secara manual (misalnya, menyimpan offset di database eksternal), atur nilai ini menjadi none.
Referensi cepat parameter
Parameter produsen
| Parameter | Bawaan | Direkomendasikan | Unit |
|---|---|---|---|
acks | -- | 1 (throughput) atau all (daya tahan) | -- |
retries | -- | Atur berdasarkan kebutuhan ketersediaan | -- |
retry.backoff.ms | 100 | 1000 | ms |
batch.size | 16384 | 16384 (bawaan) | byte |
linger.ms | 0 | 100 hingga 1000 | ms |
buffer.memory | 33554432 | >= batch.size x partisi x 2 | byte |
partitioner.class | Sticky (2.4+) | Sticky partitioning | -- |
Parameter konsumen
| Parameter | Bawaan | Direkomendasikan | Unit |
|---|---|---|---|
fetch.min.bytes | 1 | Sesuaikan berdasarkan laju pesan produsen | byte |
fetch.max.wait.ms | 500 | -- | ms |
max.partition.fetch.bytes | 1048576 | -- | byte |
session.timeout.ms | 10000 | 30000 hingga 60000 | ms |
heartbeat.interval.ms | 3000 | <= 1/3 dari session.timeout.ms | ms |
max.poll.records | 500 | Lihat rumus penentuan ukuran | -- |
max.poll.interval.ms | 300000 | 300000 (bawaan) | ms |
enable.auto.commit | true | Bergantung pada semantik pengiriman | -- |
auto.commit.interval.ms | 1000 | 1000 (bawaan) | ms |
auto.offset.reset | latest | latest | -- |