全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Percobaan pengiriman pesan

更新时间:Jul 02, 2025

ApsaraMQ for RocketMQ menyediakan fitur percobaan konsumsi. Jika konsumen gagal mengonsumsi pesan, ApsaraMQ for RocketMQ akan mengirim ulang pesan tersebut ke konsumen berdasarkan kebijakan percobaan konsumsi. Topik ini menjelaskan kebijakan percobaan konsumsi untuk pesan yang dikirim melalui HTTP dan TCP di ApsaraMQ for RocketMQ.

Catatan penggunaan

  • ID pesan tetap tidak berubah terlepas dari jumlah percobaan.

  • ApsaraMQ for RocketMQ hanya mengirim ulang pesan dalam mode konsumsi klustering. Jika konsumen gagal mengonsumsi pesan dalam mode konsumsi siaran, ApsaraMQ for RocketMQ tidak akan mengirim ulang pesan kepada konsumen. Dalam hal ini, konsumen akan terus mengonsumsi pesan baru.

Ikhtisar

Jika terjadi kegagalan konsumsi atau waktu habis saat mengonsumsi pesan, ApsaraMQ for RocketMQ mengirim ulang pesan tersebut ke konsumen setelah interval percobaan yang ditentukan telah berlalu. Jika konsumsi pesan masih gagal setelah jumlah maksimum percobaan tercapai, ApsaraMQ for RocketMQ mengirimkan pesan ke antrian pesan gagal. Untuk informasi lebih lanjut tentang antrian pesan gagal, lihat Antrian Pesan Gagal. Anda dapat mengonsumsi pesan dalam antrian pesan gagal untuk memulihkan bisnis Anda.

Berikut adalah item-item utama yang menggambarkan perilaku utama untuk percobaan konsumsi:

  • Interval Percobaan: Interval dari titik waktu ketika kegagalan konsumsi atau waktu habis terjadi hingga titik waktu ketika konsumsi pesan berikutnya dimulai.

  • Jumlah Maksimum Percobaan: Jumlah maksimum kali ApsaraMQ for RocketMQ dapat mengirim ulang pesan ke konsumen setelah pesan gagal dikonsumsi.

Jumlah maksimum percobaan untuk pesan yang dikirim melalui TCP berbeda dengan jumlah maksimum percobaan untuk pesan yang dikirim melalui HTTP. Untuk informasi lebih lanjut, lihat Kebijakan Percobaan untuk Pesan yang Dikirim melalui TCP dan Kebijakan Percobaan untuk Pesan yang Dikirim melalui HTTP.

Kebijakan percobaan untuk pesan yang dikirim melalui TCP

Status percobaan

Gambar berikut menunjukkan bagaimana status pesan berubah ketika konsumen mengonsumsi pesan tersebut.

The status of a message that is sent over TCP

  • Siap

    Pesan siap untuk dikonsumsi di broker ApsaraMQ for RocketMQ.

  • Inflight

    Pesan diperoleh dan sedang dikonsumsi oleh konsumen, tetapi hasil konsumsi belum dikembalikan.

  • MenungguPercobaan

    Jika pesan gagal dikonsumsi atau waktu habis saat mengonsumsi pesan, logika percobaan konsumsi dipicu. Jika jumlah maksimum percobaan belum tercapai, status pesan berubah menjadi Siap setelah interval percobaan berlalu. Pesan gagal yang berada dalam status Siap dapat dikonsumsi kembali. Anda dapat meningkatkan interval antara percobaan untuk mencegah percobaan berulang yang terlalu sering.

  • Commit

    Pesan telah dikonsumsi. Setelah konsumen mengembalikan respons sukses, konsumsi selesai.

  • DLQ

    Mekanisme yang digunakan untuk memastikan implementasi logika konsumsi. Jika fitur untuk menyimpan pesan gagal diaktifkan, pesan yang gagal dikonsumsi setelah jumlah maksimum percobaan tercapai dikirim ke topik pesan gagal. Anda dapat mengonsumsi pesan dalam topik pesan gagal untuk memulihkan bisnis Anda. Untuk informasi lebih lanjut, lihat Antrian Pesan Gagal.

消息间隔时间

Gambar di atas menunjukkan contoh proses percobaan. Dalam gambar tersebut, pesan tetap dalam status Siap selama 5 detik dan membutuhkan 6 detik untuk dikonsumsi.

Setiap kali pesan dicoba ulang, status pesan berubah dari Siap ke Sedang Diproses dan kemudian ke MenungguPercobaan. Interval percobaan mengacu pada interval dari titik waktu ketika kegagalan konsumsi atau waktu habis terjadi hingga titik waktu ketika konsumsi pesan berikutnya dimulai. Interval antara dua konsumsi berturut-turut mencakup interval percobaan, durasi konsumsi, dan periode waktu pesan tetap dalam status Siap. Contoh:

  • Pertama kali pesan dikirim untuk dikonsumsi, pesan masuk ke status Siap pada detik ke-0.

  • Pesan ditarik pada detik ke-5. Pada detik ke-6, terjadi kesalahan konsumsi, dan pesan yang menunjukkan kegagalan konsumsi dikembalikan oleh klien.

  • Pesan tidak dapat segera dicoba ulang karena interval percobaan 10 detik telah ditentukan.

  • Pada detik ke-21, pesan kembali masuk ke status Siap.

  • Lima detik kemudian, klien mulai mengonsumsi pesan lagi.

Interval konsumsi dihitung sebesar 21 detik menggunakan rumus berikut: Interval konsumsi = Durasi konsumsi + Interval percobaan + Durasi dalam status Siap = 6 + 10 + 5 = 21.

Interval percobaan dan jumlah percobaan

Protokol

Tipe pesan

Interval percobaan

Jumlah maksimum percobaan

TCP

Pesan terurut

Parameter suspendTimeMillis digunakan untuk menentukan interval percobaan untuk pesan terurut. Nilai valid: 10 hingga 30000. Unit: milidetik. Nilai default: 1000. Nilai default menunjukkan bahwa pesan terurut yang gagal dikirim ulang dengan interval 1 detik.

Parameter MaxReconsumeTimes digunakan untuk menentukan jumlah maksimum percobaan untuk pesan terurut. Tidak ada batas atas yang diberlakukan pada nilai parameter ini. Jika Anda membiarkan parameter ini kosong, jumlah maksimum percobaan adalah Integer.MAX.

Pesan tidak terurut

Interval percobaan untuk pesan tidak terurut bervariasi berdasarkan jumlah percobaan. Interval percobaan berkisar antara 10 detik hingga 2 jam. Anda tidak dapat menentukan interval percobaan kustom untuk pesan tidak terurut.

  • Jika jumlah maksimum percobaan untuk pesan tidak melebihi 16, pesan dikirim ulang pada interval yang telah ditentukan. Untuk informasi lebih lanjut, lihat Interval percobaan untuk pesan tidak terurut yang dikirim melalui TCP.

  • Jika jumlah maksimum percobaan untuk pesan melebihi 16, pesan dikirim ulang pada interval yang telah ditentukan untuk 16 percobaan pertama, dan pada interval 2 jam untuk percobaan berikutnya.

Parameter MaxReconsumeTimes digunakan untuk menentukan jumlah maksimum percobaan untuk pesan tidak terurut. Nilai default: 16. Tidak ada batas atas yang diberlakukan pada nilai parameter ini. Kami merekomendasikan agar Anda menggunakan nilai default.

Tabel 1. Interval Percobaan untuk Pesan Tidak Terurut yang Dikirim melalui TCP

Nomor

Interval

Nomor

Interval

1

10 detik

9

7 menit

2

30 detik

10

8 menit

3

1 menit

11

9 menit

4

2 menit

12

10 menit

5

3 menit

13

20 menit

6

4 menit

14

30 menit

7

5 menit

15

1 jam

8

6 menit

16

2 jam

Metode konfigurasi

Penting

Contoh-contoh berikut menggunakan pesan yang dikirim melalui TCP untuk menggambarkan metode konfigurasi kebijakan percobaan.

  • Aktifkan Fitur Percobaan Pesan

    Jika Anda ingin ApsaraMQ for RocketMQ mengirim ulang pesan yang gagal dikonsumsi dalam mode konsumsi klustering, gunakan salah satu metode berikut untuk mengimplementasikan metode MessageListener:

    • Metode 1: Kembalikan Action.ReconsumeLater. Kami merekomendasikan agar Anda menggunakan metode ini.

    • Metode 2: Kembalikan null.

    • Metode 3: Lempar pengecualian.

    Kode Contoh

    public class MessageListenerImpl implements MessageListener {
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            // Jika logika konsumsi melempar pengecualian, pesan akan dicoba ulang. 
            doConsumeMessage(message);
            // Metode 1: Kembalikan Action.ReconsumeLater dan coba ulang pesan. 
            return Action.ReconsumeLater;
            // Metode 2: Kembalikan null dan coba ulang pesan. 
            return null;
            // Metode 3: Lempar pengecualian dan coba ulang pesan. 
            throw new RuntimeException("Consumer Message exception");
        }
    }
  • Nonaktifkan Fitur Percobaan Pesan

    Jika Anda tidak ingin ApsaraMQ for RocketMQ mengirim ulang pesan yang gagal dikonsumsi dalam mode konsumsi klustering, konfigurasikan kode konsumsi pesan untuk menangkap semua pengecualian yang dilempar oleh logika konsumsi dan kembalikan Action.CommitMessage. Dengan cara ini, ApsaraMQ for RocketMQ tidak akan mengirim ulang pesan.

    Kode Contoh

    public class MessageListenerImpl implements MessageListener {
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                doConsumeMessage(message);
            } catch (Throwable e) {
                // Tangkap semua pengecualian yang dilempar oleh logika konsumsi dan kembalikan Action.CommitMessage.
                return Action.CommitMessage;
            }
            // Pesan diproses sesuai harapan dan Action.CommitMessage dikembalikan.
            return Action.CommitMessage;
        }
    }
  • Tentukan Interval Percobaan Kustom dan Jumlah Maksimum Percobaan

    Catatan

    Jika Anda ingin menentukan nilai kustom untuk konfigurasi log klien ApsaraMQ for RocketMQ Anda, perbarui SDK klien TCP Java Anda ke versi 1.2.2 atau lebih baru. Untuk informasi lebih lanjut, lihat Catatan Rilis.

    ApsaraMQ for RocketMQ memungkinkan Anda menentukan interval percobaan kustom dan jumlah maksimum percobaan saat Anda memulai konsumen. Anda tidak dapat menentukan interval percobaan kustom untuk pesan tidak terurut. Untuk informasi lebih lanjut tentang interval percobaan untuk pesan tidak terurut, lihat Interval Percobaan untuk Pesan Tidak Terurut yang Dikirim melalui TCP.

    Kode contoh berikut memberikan contoh cara menentukan interval kustom dan jumlah maksimum percobaan:

    Properties properties = new Properties();
    // Tetapkan jumlah maksimum percobaan yang diizinkan untuk pesan dalam grup konsumen tertentu menjadi 20. Nilainya adalah string. 
    properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
    // Tetapkan interval percobaan untuk pesan dalam grup konsumen tertentu menjadi 3.000 milidetik. Nilainya adalah string. 
    properties.put(PropertyKeyConst.SuspendTimeMillis,"3000");
    Consumer consumer = ONSFactory.createConsumer(properties);
    Penting

    Konfigurasi terbaru berlaku untuk konsumen dalam grup yang sama. Konfigurasi untuk konsumen yang dimulai pada titik waktu terbaru akan menimpa konfigurasi untuk konsumen yang dimulai pada titik waktu sebelumnya. Pastikan semua konsumen dalam grup konsumen menggunakan konfigurasi yang sama untuk interval percobaan dan jumlah maksimum percobaan. Jika semua konsumen dalam grup konsumen tidak menggunakan konfigurasi yang sama untuk interval percobaan dan jumlah maksimum percobaan, konfigurasi untuk konsumen akan saling menimpa.

  • Kueri Jumlah Percobaan

    Setelah konsumen menerima pesan, Anda dapat menggunakan metode berikut untuk menanyakan jumlah percobaan. Dalam kebanyakan kasus, Anda tidak perlu menanyakan interval percobaan.

    public class MessageListenerImpl implements MessageListener {
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            // Tanyakan jumlah percobaan. 
            System.out.println(message.getReconsumeTimes());
            return Action.CommitMessage;
        }
    }

Kebijakan percobaan untuk pesan yang dikirim melalui HTTP

Protokol

Tipe pesan

Interval percobaan

Jumlah maksimum percobaan

Konfigurasi

HTTP

Pesan terurut

1 menit

288

Metode percobaan pesan telah ditentukan sebelumnya oleh sistem dan tidak dapat diubah.

Pesan tidak terurut

5 menit

288

Konfigurasi telah ditentukan sebelumnya oleh sistem dan tidak dapat diubah.