Dokumen ini menjelaskan praktik terbaik untuk konsumen ApsaraMQ for Kafka guna membantu mengurangi kesalahan dalam konsumsi pesan.
Proses konsumsi pesan
Konsumen ApsaraMQ for Kafka mengikuti proses berikut untuk mengonsumsi pesan: meminta data, mengeksekusi logika konsumsi, dan meminta data kembali. Gambar berikut menggambarkan proses tersebut.
Pemerataan beban
Setiap grup di ApsaraMQ for Kafka terdiri dari beberapa instance konsumen. Anda dapat memulai beberapa konsumen dan mengatur parameter group.id ke nilai yang sama. Konsumen dalam grup yang sama akan mengonsumsi pesan dari topik yang berlangganan dalam mode pemerataan beban.
Sebagai contoh, Grup A berlangganan Topik A, dan konsumen C1, C2, dan C3 dimulai dalam grup tersebut. Dalam kasus ini, setiap pesan yang diterima oleh Topik A hanya dikirimkan ke salah satu dari C1, C2, atau C3. Secara default, ApsaraMQ for Kafka mendistribusikan pesan secara merata ke konsumen untuk menyeimbangkan beban konsumsi.
Untuk menerapkan pemerataan beban dalam konsumsi, ApsaraMQ for Kafka mendistribusikan partisi dari topik yang berlangganan secara merata ke setiap konsumen. Jumlah konsumen tidak boleh lebih besar dari jumlah partisi. Jika tidak, konsumen tertentu mungkin tidak ditugaskan partisi dan dapat masuk ke keadaan idle. Pemerataan beban dipicu ketika konsumen pertama kali dimulai, di-restart, ditambahkan, atau dihapus.
Rebalance sering pada klien konsumen
Rebalance terjadi pada klien konsumen jika detak jantung klien habis waktu. Untuk mencegah rebalance, Anda dapat mengubah parameter terkait atau meningkatkan laju konsumsi. Untuk informasi lebih lanjut, lihat Mengapa Rebalance Sering Terjadi pada Klien Konsumen Saya?.
Partisi
Jumlah partisi memengaruhi jumlah konsumen bersamaan.
Pesan dalam satu partisi hanya dapat dikonsumsi oleh satu konsumen dalam grup yang sama. Jumlah konsumen tidak boleh lebih besar dari jumlah partisi. Jika tidak, konsumen tertentu mungkin tidak ditugaskan partisi dan dapat masuk ke keadaan idle.
Secara default, jumlah partisi diatur menjadi 12 di Konsol ApsaraMQ for Kafka. Ini dapat memenuhi persyaratan bisnis dalam sebagian besar skenario. Anda dapat meningkatkan nilai tersebut sesuai dengan kebutuhan bisnis Anda. Kami merekomendasikan Anda mengatur jumlah partisi ke nilai dalam rentang 12 hingga 100. Nilai kurang dari 12 dapat memengaruhi kinerja produksi dan konsumsi pesan. Nilai lebih dari 100 dapat memicu rebalance pada klien konsumen.
Anda tidak dapat mengurangi jumlah partisi setelah peningkatan partisi. Kami merekomendasikan Anda sedikit meningkatkan jumlah partisi.
Mode langganan
ApsaraMQ for Kafka mendukung mode langganan berikut:
Satu grup berlangganan ke beberapa topik
Satu grup dapat berlangganan ke beberapa topik. Dalam mode langganan ini, pesan dari beberapa topik dikonsumsi secara merata oleh konsumen dalam grup. Sebagai contoh, Grup A berlangganan ke Topik A, Topik B, dan Topik C. Pesan dari ketiga topik dikonsumsi secara merata oleh konsumen dalam Grup A.
Contoh kode:
String topicStr = kafkaProperties.getProperty("topic"); String[] topics = topicStr.split(","); for (String topic: topics) { subscribedTopics.add(topic.trim()); } consumer.subscribe(subscribedTopics);Beberapa grup berlangganan ke satu topik
Beberapa grup dapat berlangganan ke topik yang sama. Dalam mode langganan ini, setiap grup secara terpisah mengonsumsi semua pesan dari topik tersebut. Sebagai contoh, Grup A dan Grup B berlangganan ke Topik A. Setiap pesan yang diterima oleh Topik A dikirimkan ke konsumen dalam Grup A dan Grup B. Proses pengiriman tersebut independen satu sama lain tanpa saling memengaruhi.
Satu grup untuk satu aplikasi
Kami merekomendasikan Anda mengonfigurasi satu grup untuk satu aplikasi. Ini berarti bahwa kode yang digunakan oleh setiap aplikasi untuk mengonsumsi pesan bervariasi. Jika Anda ingin menulis potongan kode yang berbeda dalam aplikasi yang sama, Anda harus menyiapkan file kafka.properties yang berbeda, seperti kafka1.properties dan kafka2.properties.
Offset konsumen
Di ApsaraMQ for Kafka, setiap topik memiliki beberapa partisi. Setiap partisi menghitung jumlah total pesan, yang dikenal sebagai offset maksimum.
Konsumen ApsaraMQ for Kafka mengonsumsi pesan dalam partisi secara berurutan dan mencatat jumlah pesan yang dikonsumsi, yang dikenal sebagai offset konsumen.
Jumlah pesan yang belum dikonsumsi dihitung dengan mengurangkan offset konsumen dari offset maksimum. Angka ini menunjukkan jumlah pesan yang terakumulasi.
Komit offset konsumen
ApsaraMQ for Kafka menyediakan parameter berikut untuk konsumen untuk mengomitm offset konsumen:
enable.auto.commit: menentukan apakah akan mengaktifkan komit otomatis. Nilai default: true.
auto.commit.interval.ms: interval di mana offset konsumen dikomit secara otomatis. Nilai default: 1000. Unit: milidetik.
Setelah Anda mengonfigurasi parameter sebelumnya, klien memeriksa waktu offset konsumen terakhir dikomit sebelum setiap permintaan poll. Jika interval antara waktu offset terakhir dikomit dan waktu saat ini melebihi interval yang ditentukan oleh parameter auto.commit.interval.ms, klien mengomit offset konsumen.
Jika Anda mengatur parameter enable.auto.commit ke true, Anda harus memastikan bahwa semua data yang terakhir di-poll telah dikonsumsi sebelum setiap poll. Jika tidak, pesan yang belum dikonsumsi mungkin dilewati.
Untuk mengomit offset konsumen secara manual, atur parameter enable.auto.commit ke false dan panggil fungsi commit(offsets).
Reset offset konsumen
Offset konsumen direset dalam skenario berikut:
Tidak ada offset yang dikomit ke broker. Sebagai contoh, konsumen pertama kali terhubung ke broker.
Pesan ditarik dari offset yang tidak valid. Sebagai contoh, offset maksimum dalam partisi adalah 10, tetapi konsumen mulai mengonsumsi dari Offset 11.
Pada klien Java, Anda dapat mengatur parameter auto.offset.reset ke salah satu nilai berikut untuk menentukan cara mereset offset konsumen:
latest: mereset offset konsumen ke offset maksimum.
earliest: mereset offset konsumen ke offset minimum.
none: tidak mereset offset konsumen.
Kami merekomendasikan Anda mengatur parameter ini ke latest alih-alih earliest. Ini mencegah konsumen mengonsumsi pesan dari awal karena offset yang tidak valid.
Jika Anda mengomit offset dengan memanggil fungsi commit(offsets), Anda dapat mengatur parameter ini ke none.
Tarik pesan besar
Selama konsumsi pesan, klien secara proaktif menarik pesan dari broker. Jika Anda ingin menarik pesan besar, Anda dapat mengontrol laju tarik dengan mengonfigurasi parameter berikut:
max.poll.records: jumlah maksimum pesan yang dikembalikan untuk satu pemanggilan metode poll. Jika setiap pesan melebihi 1 MB dalam ukuran, kami merekomendasikan Anda mengatur parameter ini ke 1.
fetch.max.bytes: jumlah maksimum data yang dikembalikan untuk permintaan fetch. Atur parameter ini ke nilai yang sedikit lebih besar dari ukuran pesan tunggal.
max.partition.fetch.bytes: jumlah maksimum data yang dikembalikan untuk partisi untuk permintaan fetch. Atur parameter ini ke nilai yang sedikit lebih besar dari ukuran pesan tunggal.
Klien menarik pesan besar satu per satu.
Duplikasi pesan dan idempotensi konsumsi
Semantik pengiriman di ApsaraMQ for Kafka adalah setidaknya sekali. Ini berarti bahwa pesan dikirim setidaknya sekali untuk memastikan bahwa pesan tidak hilang. Namun, ini tidak menjamin bahwa pesan tidak diduplikasi. Saat terjadi kesalahan jaringan atau klien di-restart, sejumlah kecil pesan mungkin dikirim ulang. Jika konsumen sensitif terhadap duplikasi pesan, seperti dalam skenario transaksi online, idempotensi konsumsi harus diimplementasikan.
Jika aplikasi Anda adalah aplikasi database, Anda dapat melakukan operasi berikut untuk menerapkan pemeriksaan idempotensi:
Saat Anda mengirim pesan, tentukan kunci sebagai ID transaksi unik.
Saat Anda mengonsumsi pesan, periksa apakah kunci telah dikonsumsi. Jika kunci telah dikonsumsi, lewati pesan. Jika kunci belum dikonsumsi, konsumsi pesan sekali.
Jika aplikasi Anda tidak sensitif terhadap duplikasi beberapa pesan, pemeriksaan idempotensi tidak diperlukan.
Kegagalan konsumsi
Pesan di ApsaraMQ for Kafka dikonsumsi satu per satu dalam partisi. Jika konsumen gagal mengeksekusi logika konsumsi setelah konsumen menerima pesan, Anda dapat menggunakan salah satu metode berikut untuk pemecahan masalah. Contoh kegagalan tersebut adalah pesan gagal diproses karena data kotor di server aplikasi.
Tetap mencoba mengeksekusi logika konsumsi saat gagal. Metode ini dapat memblokir thread konsumsi pada pesan saat ini dan menyebabkan akumulasi pesan.
ApsaraMQ for Kafka tidak menentukan logika untuk memproses pesan yang gagal. Anda dapat mengekspor pesan yang gagal atau menyimpan pesan di layanan. Sebagai contoh, Anda dapat membuat topik yang didedikasikan untuk menyimpan pesan yang gagal. Kemudian, Anda dapat secara teratur memeriksa pesan yang gagal, menganalisis penyebabnya, dan mengambil tindakan yang sesuai.
Latensi konsumsi
Di ApsaraMQ for Kafka, klien secara proaktif menarik pesan dari broker. Jika klien dapat segera mengonsumsi data, latensi rendah. Jika latensi tinggi, periksa apakah pesan terakumulasi dan kemudian tingkatkan laju konsumsi berdasarkan persyaratan bisnis Anda.
Blok konsumsi dan akumulasi pesan
Akumulasi pesan adalah masalah paling umum pada klien konsumen. Masalah ini mungkin disebabkan oleh alasan berikut:
Laju konsumsi lebih rendah daripada laju produksi. Dalam hal ini, Anda harus meningkatkan laju konsumsi. Untuk informasi lebih lanjut, lihat Tingkatkan Laju Konsumsi.
Thread konsumen diblokir.
Setelah konsumen menerima pesan, konsumen memulai panggilan jarak jauh untuk mengeksekusi logika konsumsi. Jika konsumen menunggu hasil panggilan selama proses ini, konsumen mungkin terus menunggu. Ini menyebabkan proses konsumsi terhenti.
Konsumen harus berusaha mencegah thread konsumsi diblokir. Jika konsumen perlu menunggu hasil panggilan, kami merekomendasikan Anda menentukan periode timeout untuk menunggu. Dengan cara ini, jika tidak ada hasil yang dikembalikan setelah periode timeout berakhir, konsumsi dianggap gagal dan pesan berikutnya dapat terus dikonsumsi.
Tingkatkan laju konsumsi
Anda dapat menggunakan salah satu metode berikut untuk meningkatkan laju konsumsi:
Tambah konsumen
Anda dapat menambah konsumen dalam proses dan memastikan bahwa setiap konsumen sesuai dengan satu thread. Alternatifnya, Anda dapat menerapkan beberapa proses konsumen. Jika jumlah konsumen melebihi jumlah partisi, laju konsumsi tidak dapat ditingkatkan dan konsumen tertentu menjadi idle.
Tambah thread konsumsi
Menambah konsumen sama dengan menambah thread konsumsi untuk meningkatkan laju konsumsi. Oleh karena itu, metode penting untuk meningkatkan kinerja adalah menambah thread konsumsi. Anda dapat melakukan langkah-langkah berikut untuk menambah thread konsumsi:
Tentukan pool thread.
Poll data.
Kirim data ke pool thread untuk pemrosesan bersamaan.
Setelah hasil pemrosesan bersamaan dikembalikan, poll data lagi.
Filter pesan
ApsaraMQ for Kafka tidak menyediakan semantik untuk penyaringan pesan. Anda dapat menggunakan salah satu metode berikut untuk menyaring pesan:
Jika Anda ingin menyaring hanya beberapa jenis pesan, Anda dapat menggunakan beberapa topik.
Jika Anda ingin menyaring banyak jenis pesan, kami merekomendasikan Anda menyaring pesan berdasarkan bisnis di klien.
Anda dapat menggunakan salah satu metode atau mengintegrasikan kedua metode berdasarkan persyaratan bisnis Anda.
Pesan siaran
ApsaraMQ for Kafka tidak menyediakan semantik untuk penyiaran pesan. Anda dapat mensimulasikan penyiaran pesan dengan membuat grup yang berbeda.
Langganan
Untuk memudahkan pemecahan masalah, kami merekomendasikan konsumen dalam grup yang sama untuk berlangganan ke topik yang sama.