Topik ini menjelaskan cara klien ApsaraMQ for RocketMQ mengonsumsi pesan, alasan terjadinya akumulasi pesan, serta langkah-langkah untuk menangani akumulasi pesan dan penundaan saat menggunakan SDK Java untuk mengirim dan menerima pesan melalui TCP. Ini membantu Anda merencanakan sumber daya dan mengonfigurasi pengaturan sebelum penyebaran bisnis, serta menyesuaikan logika bisnis selama operasi & pemeliharaan (O&M) untuk mencegah masalah yang disebabkan oleh akumulasi pesan dan penundaan.
Informasi latar belakang
Jika laju konsumsi pesan pada klien tidak dapat mengimbangi laju produksi pesan pada broker selama pemrosesan pesan, maka akan terjadi akumulasi pesan yang menyebabkan penundaan dalam konsumsi pesan. Kami menyarankan Anda memperhatikan akumulasi pesan dan penundaan dalam skenario berikut:
Pesan terus-menerus terakumulasi karena kemampuan konsumsi sistem hilir tidak sesuai dengan kemampuan produksi sistem hulu. Selain itu, konsumsi pesan tidak dapat pulih secara otomatis.
Sistem bisnis memiliki persyaratan tinggi terhadap konsumsi pesan real-time sehingga bahkan penundaan yang disebabkan oleh akumulasi pesan sementara tidak dapat diterima.
Bagaimana cara klien mengonsumsi pesan?
Gambar berikut menunjukkan cara klien ApsaraMQ for RocketMQ mengonsumsi pesan melalui TCP.
Menggunakan klien untuk mengonsumsi pesan dalam mode push mencakup dua fase berikut:
Fase 1: Tarik pesan. Klien menarik pesan dari broker ApsaraMQ for RocketMQ dalam batch menggunakan polling panjang. Pesan yang ditarik kemudian disimpan dalam antrian buffer lokal.
Ketika pesan ditarik dalam batch, throughput tinggi dapat dicapai di jaringan internal tipikal. Sebagai contoh, transaksi per detik (TPS) server spesifikasi rendah (4 vCPU dan 8 GB memori) dengan satu thread dan satu partisi bisa mencapai puluhan ribu. Untuk server dengan beberapa partisi, TPS bisa mencapai ratusan ribu. Oleh karena itu, hambatan dalam akumulasi pesan tidak berasal dari fase ini.
Fase 2: Kirim pesan ke thread konsumsi. Klien mengirimkan pesan yang disimpan secara lokal ke thread konsumsi, lalu thread tersebut menggunakan logika konsumsi pesan untuk memproses pesan.
Kemampuan konsumsi klien bergantung pada kompleksitas (durasi konsumsi) logika bisnis dan konkurensi konsumsi. Jika logika bisnis kompleks dan banyak waktu diperlukan untuk memproses satu pesan, throughput keseluruhan tidak dapat tinggi. Ketika jumlah pesan yang disimpan dalam antrian buffer lokal mencapai batas atas, klien akan berhenti menarik pesan dari broker.
Mekanisme konsumsi sisi klien yang disebutkan di atas menunjukkan bahwa hambatan dalam akumulasi pesan disebabkan oleh kemampuan konsumsi klien lokal, yaitu durasi konsumsi dan konkurensi konsumsi. Untuk mencegah dan menyelesaikan masalah yang disebabkan oleh akumulasi pesan, Anda harus mengonfigurasi durasi konsumsi dan konkurensi dengan benar. Perhatikan bahwa durasi konsumsi lebih diprioritaskan daripada konkurensi konsumsi.
Durasi konsumsi
Faktor-faktor yang memengaruhi durasi konsumsi mencakup CPU, komputasi dalam memori, dan operasi I/O eksternal. Dalam kebanyakan kasus, rekursi dan loop tidak didefinisikan dalam kode. Oleh karena itu, waktu komputasi internal hampir dapat diabaikan dibandingkan dengan operasi I/O eksternal. Operasi I/O eksternal biasanya mencakup logika bisnis berikut:
Baca dan tulis data pada database eksternal, seperti database MySQL.
Baca dan tulis data pada sistem cache eksternal, seperti Redis.
Panggilan sistem hilir, seperti panggilan Dubbo dan panggilan antarmuka HTTP hilir.
Anda harus menyusun logika dan kapasitas sistem dari panggilan eksternal semacam itu untuk memahami waktu yang diharapkan dikonsumsi oleh setiap panggilan. Ini memungkinkan Anda menentukan apakah waktu yang dikonsumsi oleh operasi I/O dalam logika konsumsi masuk akal. Dalam kebanyakan kasus, pesan terakumulasi karena durasi konsumsi meningkat akibat pengecualian layanan atau batas kapasitas di sistem hilir.
Sebagai contoh, Anda menentukan logika konsumsi pesan untuk menulis data ke database, dan durasi konsumsi setiap pesan adalah 1 milidetik. Saat volume pesan kecil, tidak ada pengecualian yang terjadi. Namun, selama promosi penjualan, jumlah pesan yang ditulis ke database per detik meningkat secara signifikan, dan batas kapasitas database segera tercapai. Akibatnya, durasi konsumsi setiap pesan meningkat menjadi 100 milidetik, menyebabkan penurunan tajam dalam laju konsumsi. Anda tidak dapat menyelesaikan masalah ini hanya dengan mengubah konkurensi konsumsi klien ApsaraMQ for RocketMQ. Sebaliknya, Anda harus meningkatkan kapasitas database.
Konkurensi konsumsi
Tabel berikut menjelaskan metode untuk menghitung konkurensi konsumsi di ApsaraMQ for RocketMQ.
Tipe pesan | Konkurensi konsumsi |
Pesan normal | Jumlah thread per node × Jumlah node |
Pesan terjadwal dan tertunda | |
Pesan transaksional | |
Pesan terurut | Min (Jumlah thread per node × Jumlah node, Jumlah partisi) |
Konkurensi konsumsi klien ditentukan oleh jumlah thread per node dan jumlah node. Dalam kebanyakan kasus, Anda harus terlebih dahulu menyesuaikan jumlah thread pada satu node. Jika sumber daya perangkat keras pada node telah mencapai batas atas, Anda harus menambahkan node untuk meningkatkan konkurensi konsumsi.
Konkurensi konsumsi pesan terurut juga dibatasi oleh jumlah partisi dalam sebuah topik. Untuk mengevaluasi jumlah partisi, hubungi Layanan Pelanggan Alibaba Cloud.
Saat menetapkan konkurensi konsumsi pada satu node, jumlah thread yang terlalu besar dapat menyebabkan overhead besar dalam pergantian thread. Gunakan model berikut untuk menghitung jumlah thread optimal pada satu node dalam lingkungan ideal:
Jumlah vCPU dari satu node adalah C.
Waktu yang dikonsumsi untuk pergantian thread diabaikan, dan operasi I/O tidak mengonsumsi sumber daya CPU.
Thread memiliki cukup pesan yang menunggu untuk diproses, dan memori mencukupi.
Dalam logika, waktu CPU adalah T1, dan waktu operasi I/O eksternal adalah T2.
Sebuah thread tunggal dapat mencapai TPS sebesar 1/(T1 + T2). Jika pemanfaatan CPU mencapai nilai yang diinginkan 100%, Anda harus menetapkan C × (T1 + T2)/T1 thread untuk membuat satu node mencapai kemampuan konsumsi maksimumnya.
Jumlah maksimum thread dalam contoh ini hanya merupakan data teoretis yang diperoleh di bawah lingkungan ideal. Dalam lingkungan aplikasi aktual, kami menyarankan Anda secara bertahap meningkatkan jumlah thread, mengamati efeknya, lalu melakukan penyesuaian.
Bagaimana cara mencegah akumulasi pesan dan penundaan?
Untuk mencegah akumulasi pesan dan penundaan yang tidak terduga, periksa dan susun logika bisnis pada tahap awal desain. Susun baseline kinerja untuk operasi bisnis normal, sehingga Anda dapat dengan cepat mengidentifikasi masalah ketika terjadi kegagalan. Tugas utamanya adalah menyusun durasi konsumsi dan konkurensi.
Susun Durasi Konsumsi
Peroleh durasi konsumsi dengan melakukan uji stres dan menganalisis logika kode operasi yang memakan waktu. Untuk informasi lebih lanjut tentang cara memeriksa durasi konsumsi, lihat Periksa Durasi Konsumsi. Perhatikan hal-hal berikut saat menyusun durasi konsumsi:
Periksa apakah kompleksitas komputasi logika konsumsi pesan terlalu tinggi dan apakah kode memiliki cacat, seperti loop tak terbatas dan rekursi.
Periksa apakah operasi I/O dalam logika konsumsi pesan, seperti panggilan eksternal dan penyimpanan baca/tulis, diperlukan. Selain itu, periksa apakah solusi, seperti cache lokal, dapat digunakan untuk menghindari operasi ini.
Periksa apakah operasi yang kompleks dan memakan waktu dalam logika konsumsi pesan dapat diproses secara asinkron. Jika demikian, periksa apakah mereka akan menghasilkan logika yang membingungkan. Misalnya, konsumsi selesai, tetapi operasi asinkron belum.
Tetapkan Konkurensi Konsumsi
Secara bertahap tingkatkan jumlah thread pada satu node. Kemudian, amati metrik node untuk mendapatkan jumlah thread konsumsi optimal dan throughput maksimum pada node.
Setelah mendapatkan jumlah thread optimal dan throughput pada satu node, hitung jumlah node yang diperlukan. Anda dapat melakukannya berdasarkan lalu lintas puncak sistem hulu dan hilir, menggunakan rumus berikut: Jumlah node = Lalu lintas puncak/Throughput satu thread.
Bagaimana cara menangani akumulasi pesan dan penundaan?
Konfigurasikan Peringatan Akumulasi Pesan.
Tetapkan aturan peringatan dengan menggunakan fitur pemantauan dan peringatan yang disediakan oleh ApsaraMQ for RocketMQ untuk memantau dan menangani akumulasi pesan. Untuk informasi lebih lanjut tentang cara menetapkan aturan peringatan, lihat Konfigurasikan Peringatan Akumulasi Pesan.
Tangani Pesan Terakumulasi.
Untuk informasi tentang cara menangani pesan terakumulasi, lihat Bagaimana Cara Menangani Pesan Terakumulasi?