ApsaraMQ for RocketMQ menyediakan fitur percobaan konsumsi. Jika konsumen gagal mengonsumsi pesan, ApsaraMQ for RocketMQ akan mengirim ulang pesan tersebut ke konsumen berdasarkan kebijakan percobaan konsumsi. Topik ini menjelaskan kebijakan percobaan konsumsi untuk pesan yang dikirim melalui HTTP dan TCP di ApsaraMQ for RocketMQ.
Catatan penggunaan
ID pesan tetap tidak berubah terlepas dari jumlah percobaan.
ApsaraMQ for RocketMQ hanya mengirim ulang pesan dalam mode konsumsi klustering. Jika konsumen gagal mengonsumsi pesan dalam mode konsumsi siaran, ApsaraMQ for RocketMQ tidak akan mengirim ulang pesan kepada konsumen. Dalam hal ini, konsumen akan terus mengonsumsi pesan baru.
Ikhtisar
Jika terjadi kegagalan konsumsi atau waktu habis saat mengonsumsi pesan, ApsaraMQ for RocketMQ mengirim ulang pesan tersebut ke konsumen setelah interval percobaan yang ditentukan telah berlalu. Jika konsumsi pesan masih gagal setelah jumlah maksimum percobaan tercapai, ApsaraMQ for RocketMQ mengirimkan pesan ke antrian pesan gagal. Untuk informasi lebih lanjut tentang antrian pesan gagal, lihat Antrian Pesan Gagal. Anda dapat mengonsumsi pesan dalam antrian pesan gagal untuk memulihkan bisnis Anda.
Berikut adalah item-item utama yang menggambarkan perilaku utama untuk percobaan konsumsi:
Interval Percobaan: Interval dari titik waktu ketika kegagalan konsumsi atau waktu habis terjadi hingga titik waktu ketika konsumsi pesan berikutnya dimulai.
Jumlah Maksimum Percobaan: Jumlah maksimum kali ApsaraMQ for RocketMQ dapat mengirim ulang pesan ke konsumen setelah pesan gagal dikonsumsi.
Jumlah maksimum percobaan untuk pesan yang dikirim melalui TCP berbeda dengan jumlah maksimum percobaan untuk pesan yang dikirim melalui HTTP. Untuk informasi lebih lanjut, lihat Kebijakan Percobaan untuk Pesan yang Dikirim melalui TCP dan Kebijakan Percobaan untuk Pesan yang Dikirim melalui HTTP.
Kebijakan percobaan untuk pesan yang dikirim melalui TCP
Status percobaan
Gambar berikut menunjukkan bagaimana status pesan berubah ketika konsumen mengonsumsi pesan tersebut.

Siap
Pesan siap untuk dikonsumsi di broker ApsaraMQ for RocketMQ.
Inflight
Pesan diperoleh dan sedang dikonsumsi oleh konsumen, tetapi hasil konsumsi belum dikembalikan.
MenungguPercobaan
Jika pesan gagal dikonsumsi atau waktu habis saat mengonsumsi pesan, logika percobaan konsumsi dipicu. Jika jumlah maksimum percobaan belum tercapai, status pesan berubah menjadi Siap setelah interval percobaan berlalu. Pesan gagal yang berada dalam status Siap dapat dikonsumsi kembali. Anda dapat meningkatkan interval antara percobaan untuk mencegah percobaan berulang yang terlalu sering.
Commit
Pesan telah dikonsumsi. Setelah konsumen mengembalikan respons sukses, konsumsi selesai.
DLQ
Mekanisme yang digunakan untuk memastikan implementasi logika konsumsi. Jika fitur untuk menyimpan pesan gagal diaktifkan, pesan yang gagal dikonsumsi setelah jumlah maksimum percobaan tercapai dikirim ke topik pesan gagal. Anda dapat mengonsumsi pesan dalam topik pesan gagal untuk memulihkan bisnis Anda. Untuk informasi lebih lanjut, lihat Antrian Pesan Gagal.

Gambar di atas menunjukkan contoh proses percobaan. Dalam gambar tersebut, pesan tetap dalam status Siap selama 5 detik dan membutuhkan 6 detik untuk dikonsumsi.
Setiap kali pesan dicoba ulang, status pesan berubah dari Siap ke Sedang Diproses dan kemudian ke MenungguPercobaan. Interval percobaan mengacu pada interval dari titik waktu ketika kegagalan konsumsi atau waktu habis terjadi hingga titik waktu ketika konsumsi pesan berikutnya dimulai. Interval antara dua konsumsi berturut-turut mencakup interval percobaan, durasi konsumsi, dan periode waktu pesan tetap dalam status Siap. Contoh:
Pertama kali pesan dikirim untuk dikonsumsi, pesan masuk ke status Siap pada detik ke-0.
Pesan ditarik pada detik ke-5. Pada detik ke-6, terjadi kesalahan konsumsi, dan pesan yang menunjukkan kegagalan konsumsi dikembalikan oleh klien.
Pesan tidak dapat segera dicoba ulang karena interval percobaan 10 detik telah ditentukan.
Pada detik ke-21, pesan kembali masuk ke status Siap.
Lima detik kemudian, klien mulai mengonsumsi pesan lagi.
Interval konsumsi dihitung sebesar 21 detik menggunakan rumus berikut: Interval konsumsi = Durasi konsumsi + Interval percobaan + Durasi dalam status Siap = 6 + 10 + 5 = 21.
Interval percobaan dan jumlah percobaan
Protokol | Tipe pesan | Interval percobaan | Jumlah maksimum percobaan |
TCP | Pesan terurut | Parameter suspendTimeMillis digunakan untuk menentukan interval percobaan untuk pesan terurut. Nilai valid: 10 hingga 30000. Unit: milidetik. Nilai default: 1000. Nilai default menunjukkan bahwa pesan terurut yang gagal dikirim ulang dengan interval 1 detik. | Parameter MaxReconsumeTimes digunakan untuk menentukan jumlah maksimum percobaan untuk pesan terurut. Tidak ada batas atas yang diberlakukan pada nilai parameter ini. Jika Anda membiarkan parameter ini kosong, jumlah maksimum percobaan adalah Integer.MAX. |
Pesan tidak terurut | Interval percobaan untuk pesan tidak terurut bervariasi berdasarkan jumlah percobaan. Interval percobaan berkisar antara 10 detik hingga 2 jam. Anda tidak dapat menentukan interval percobaan kustom untuk pesan tidak terurut.
| Parameter MaxReconsumeTimes digunakan untuk menentukan jumlah maksimum percobaan untuk pesan tidak terurut. Nilai default: 16. Tidak ada batas atas yang diberlakukan pada nilai parameter ini. Kami merekomendasikan agar Anda menggunakan nilai default. |
Tabel 1. Interval Percobaan untuk Pesan Tidak Terurut yang Dikirim melalui TCP
Nomor | Interval | Nomor | Interval |
1 | 10 detik | 9 | 7 menit |
2 | 30 detik | 10 | 8 menit |
3 | 1 menit | 11 | 9 menit |
4 | 2 menit | 12 | 10 menit |
5 | 3 menit | 13 | 20 menit |
6 | 4 menit | 14 | 30 menit |
7 | 5 menit | 15 | 1 jam |
8 | 6 menit | 16 | 2 jam |
Metode konfigurasi
Contoh-contoh berikut menggunakan pesan yang dikirim melalui TCP untuk menggambarkan metode konfigurasi kebijakan percobaan.
Aktifkan Fitur Percobaan Pesan
Jika Anda ingin ApsaraMQ for RocketMQ mengirim ulang pesan yang gagal dikonsumsi dalam mode konsumsi klustering, gunakan salah satu metode berikut untuk mengimplementasikan metode MessageListener:
Metode 1: Kembalikan Action.ReconsumeLater. Kami merekomendasikan agar Anda menggunakan metode ini.
Metode 2: Kembalikan null.
Metode 3: Lempar pengecualian.
Kode Contoh
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { // Jika logika konsumsi melempar pengecualian, pesan akan dicoba ulang. doConsumeMessage(message); // Metode 1: Kembalikan Action.ReconsumeLater dan coba ulang pesan. return Action.ReconsumeLater; // Metode 2: Kembalikan null dan coba ulang pesan. return null; // Metode 3: Lempar pengecualian dan coba ulang pesan. throw new RuntimeException("Consumer Message exception"); } }Nonaktifkan Fitur Percobaan Pesan
Jika Anda tidak ingin ApsaraMQ for RocketMQ mengirim ulang pesan yang gagal dikonsumsi dalam mode konsumsi klustering, konfigurasikan kode konsumsi pesan untuk menangkap semua pengecualian yang dilempar oleh logika konsumsi dan kembalikan Action.CommitMessage. Dengan cara ini, ApsaraMQ for RocketMQ tidak akan mengirim ulang pesan.
Kode Contoh
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { // Tangkap semua pengecualian yang dilempar oleh logika konsumsi dan kembalikan Action.CommitMessage. return Action.CommitMessage; } // Pesan diproses sesuai harapan dan Action.CommitMessage dikembalikan. return Action.CommitMessage; } }Tentukan Interval Percobaan Kustom dan Jumlah Maksimum Percobaan
CatatanJika Anda ingin menentukan nilai kustom untuk konfigurasi log klien ApsaraMQ for RocketMQ Anda, perbarui SDK klien TCP Java Anda ke versi 1.2.2 atau lebih baru. Untuk informasi lebih lanjut, lihat Catatan Rilis.
ApsaraMQ for RocketMQ memungkinkan Anda menentukan interval percobaan kustom dan jumlah maksimum percobaan saat Anda memulai konsumen. Anda tidak dapat menentukan interval percobaan kustom untuk pesan tidak terurut. Untuk informasi lebih lanjut tentang interval percobaan untuk pesan tidak terurut, lihat Interval Percobaan untuk Pesan Tidak Terurut yang Dikirim melalui TCP.
Kode contoh berikut memberikan contoh cara menentukan interval kustom dan jumlah maksimum percobaan:
Properties properties = new Properties(); // Tetapkan jumlah maksimum percobaan yang diizinkan untuk pesan dalam grup konsumen tertentu menjadi 20. Nilainya adalah string. properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); // Tetapkan interval percobaan untuk pesan dalam grup konsumen tertentu menjadi 3.000 milidetik. Nilainya adalah string. properties.put(PropertyKeyConst.SuspendTimeMillis,"3000"); Consumer consumer = ONSFactory.createConsumer(properties);PentingKonfigurasi terbaru berlaku untuk konsumen dalam grup yang sama. Konfigurasi untuk konsumen yang dimulai pada titik waktu terbaru akan menimpa konfigurasi untuk konsumen yang dimulai pada titik waktu sebelumnya. Pastikan semua konsumen dalam grup konsumen menggunakan konfigurasi yang sama untuk interval percobaan dan jumlah maksimum percobaan. Jika semua konsumen dalam grup konsumen tidak menggunakan konfigurasi yang sama untuk interval percobaan dan jumlah maksimum percobaan, konfigurasi untuk konsumen akan saling menimpa.
Kueri Jumlah Percobaan
Setelah konsumen menerima pesan, Anda dapat menggunakan metode berikut untuk menanyakan jumlah percobaan. Dalam kebanyakan kasus, Anda tidak perlu menanyakan interval percobaan.
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { // Tanyakan jumlah percobaan. System.out.println(message.getReconsumeTimes()); return Action.CommitMessage; } }
Kebijakan percobaan untuk pesan yang dikirim melalui HTTP
Protokol | Tipe pesan | Interval percobaan | Jumlah maksimum percobaan | Konfigurasi |
HTTP | Pesan terurut | 1 menit | 288 | Metode percobaan pesan telah ditentukan sebelumnya oleh sistem dan tidak dapat diubah. |
Pesan tidak terurut | 5 menit | 288 | Konfigurasi telah ditentukan sebelumnya oleh sistem dan tidak dapat diubah. |