ApsaraMQ for RocketMQ mendukung dua jenis konsumen: Push consumer dan Simple consumer. Topik ini menjelaskan penggunaan, prinsip implementasi, keandalan, mekanisme pengulangan (retry), serta skenario yang sesuai untuk masing-masing jenis.
Informasi latar belakang
ApsaraMQ for RocketMQ menyediakan berbagai jenis konsumen untuk skenario bisnis yang berbeda. Setiap jenis konsumen memiliki metode integrasi dan kontrol yang berbeda. Menjawab pertanyaan-pertanyaan berikut dapat membantu Anda memilih jenis konsumen yang paling sesuai dengan skenario bisnis Anda.
Konsumsi konkuren: Bagaimana cara konsumen menggunakan mekanisme multi-threaded untuk memproses pesan dan meningkatkan efisiensi pemrosesan?
Pemrosesan pesan sinkron dan asinkron: Untuk skenario integrasi yang berbeda, konsumen mungkin perlu mendistribusikan pesan yang diterima secara asinkron ke logika bisnis untuk diproses. Bagaimana cara mengimplementasikan pemrosesan pesan asinkron?
Pemrosesan pesan yang andal: Bagaimana cara konsumen mengembalikan tanggapan setelah memproses pesan? Bagaimana penanganan retry dilakukan jika terjadi pengecualian untuk memastikan pemrosesan pesan yang andal?
Untuk jawaban atas pertanyaan-pertanyaan tersebut, lihat bagian Push consumer dan Simple consumer.
Tinjauan fitur

Seperti yang ditunjukkan pada gambar di atas, ketika sebuah konsumen di ApsaraMQ for RocketMQ memproses sebuah pesan, pesan tersebut melewati tahapan-tahapan berikut: menerima pesan, memproses pesan, dan mengirimkan status konsumsi.
Untuk tahapan-tahapan tersebut, ApsaraMQ for RocketMQ menyediakan dua jenis konsumen: Push dan Simple. Kedua jenis konsumen ini menggunakan metode dan antarmuka implementasi yang berbeda untuk memenuhi kebutuhan konsumsi Anda dalam berbagai skenario bisnis. Tabel berikut menjelaskan perbedaan keduanya.
Jika skenario bisnis Anda berubah, atau jika jenis konsumen yang sedang Anda gunakan tidak lagi sesuai dengan bisnis Anda, Anda dapat mengganti jenis konsumen tersebut. Mengganti jenis konsumen tidak akan memengaruhi sumber daya ApsaraMQ for RocketMQ atau pemrosesan bisnis yang sudah ada.
Fitur | Push consumer | Simple consumer |
Metode antarmuka | Mengembalikan hasil konsumsi melalui callback listener. Logika konsumsi harus ditangani di dalam listener. | Aplikasi Anda mengimplementasikan pemrosesan pesan dan memanggil antarmuka untuk mengembalikan hasil konsumsi. |
Manajemen konkurensi | SDK mengelola konkurensi. | Logika aplikasi Anda mengelola thread konsumsi. |
Fleksibilitas | Sangat terenkapsulasi dan kurang fleksibel. | Bersifat atomik dan sangat dapat dikustomisasi. |
Skenario | Ideal untuk skenario pengembangan yang tidak memerlukan alur kerja khusus. | Ideal untuk skenario pengembangan yang memerlukan alur kerja bisnis yang sangat dikustomisasi. |
Kelas SDK yang sesuai | PushConsumer, LitePushConsumer | SimpleConsumer |
Konsumen tipe Push
Push consumer adalah konsumen yang sangat terenkapsulasi. Anda hanya perlu memproses pesan dan mengembalikan hasil konsumsi melalui listener pesan. Kit pengembangan perangkat lunak (SDK) klien ApsaraMQ for RocketMQ menangani pengambilan pesan, pengiriman status konsumsi, dan retry konsumsi.
Penggunaan
Menggunakan Push consumer cukup mudah. Anda mendaftarkan listener pesan selama inisialisasi konsumen dan mengimplementasikan logika pemrosesan pesan di dalam listener tersebut. SDK ApsaraMQ for RocketMQ menangani pengambilan pesan, pemanggilan listener, dan pemrosesan ulang di latar belakang. Untuk contoh kode, lihat kelas SDK yang sesuai.
Listener pesan untuk Push consumer mengembalikan salah satu dari tiga hasil berikut:
Konsumsi berhasil: Misalnya, saat menggunakan SDK untuk Java,
ConsumeResult.SUCCESSdikembalikan. Ini menunjukkan bahwa pesan berhasil diproses. Server kemudian memperbarui progres konsumsi berdasarkan hasil ini.Konsumsi gagal: Misalnya, saat menggunakan SDK untuk Java,
ConsumeResult.FAILUREdikembalikan. Ini menunjukkan bahwa pesan gagal diproses. Sistem kemudian menentukan apakah akan melakukan retry konsumsi berdasarkan kebijakan retry.Kegagalan tak terduga: Jika terjadi pengecualian (exception), hasilnya dianggap sebagai kegagalan konsumsi. Sistem kemudian menentukan apakah akan melakukan retry konsumsi berdasarkan kebijakan retry.
Ketika Push consumer mengonsumsi pesan, jika terjadi pemblokiran tak terduga dalam logika pemrosesan sehingga pesan tidak dapat diproses, SDK akan mengalami timeout. SDK kemudian secara paksa mengirimkan hasil kegagalan konsumsi dan menangani pesan berdasarkan kebijakan retry. Untuk informasi lebih lanjut tentang timeout pesan, lihat Kebijakan retry PushConsumer.
Ketika terjadi timeout konsumsi, SDK mengirimkan hasil kegagalan konsumsi. Namun, thread konsumsi saat ini mungkin tidak merespons interupsi dan mungkin terus memproses pesan tersebut.
Mekanisme internal
Untuk Push consumer, pemrosesan pesan real-time didasarkan pada model thread Reactor khas dalam SDK. Seperti yang ditunjukkan pada gambar berikut, SDK memiliki thread long-polling bawaan. Thread ini menarik pesan secara asinkron ke dalam antrian cache internal SDK. Pesan-pesan tersebut kemudian dikirimkan ke thread konsumen, yang memicu listener untuk menjalankan logika konsumsi lokal.

Keandalan dan retry
Untuk Push consumer, satu-satunya batas antara SDK klien dan logika konsumsi adalah antarmuka listener pesan. SDK klien secara ketat menentukan apakah sebuah pesan berhasil dikonsumsi berdasarkan hasil yang dikembalikan oleh listener dan melakukan retry untuk memastikan keandalan. Semua pesan harus diproses secara sinkron. Hasil panggilan harus dikembalikan ketika antarmuka listener selesai. Distribusi asinkron tidak diperbolehkan. Untuk informasi lebih lanjut tentang retry pesan, lihat Kebijakan retry PushConsumer.
Ketika menggunakan PushConsumer untuk mengonsumsi pesan, jangan memproses pesan dengan cara-cara berikut. Jika tidak, ApsaraMQ for RocketMQ tidak dapat menjamin keandalan pesan.
Metode salah 1: Mengembalikan hasil sukses sebelum pemrosesan pesan selesai. Jika pemrosesan pesan kemudian gagal, server ApsaraMQ for RocketMQ tidak mengetahui kegagalan tersebut dan tidak akan melakukan retry konsumsi.
Metode salah 2: Mendistribusikan ulang pesan ke thread kustom lainnya di dalam listener pesan dan mengembalikan hasil konsumsi lebih awal. Jika pemrosesan pesan kemudian gagal, server ApsaraMQ for RocketMQ juga tidak mengetahui kegagalan tersebut dan tidak akan melakukan retry konsumsi.
Jaminan urutan
Berdasarkan definisi pesan terurut di ApsaraMQ for RocketMQ, jika kelompok konsumen diatur ke mode konsumsi terurut, Push consumer secara ketat mengikuti urutan pesan saat memanggil listener pesan. Urutan konsumsi dijamin tanpa perubahan apa pun pada logika bisnis.
Pemrosesan pesan terurut memerlukan pengiriman status secara sinkron. Jika logika bisnis Anda mengimplementasikan distribusi asinkron kustom, ApsaraMQ for RocketMQ tidak dapat menjamin urutan pesan.
Skenario
Push consumer secara ketat menerapkan pemrosesan pesan sinkron dan timeout pemrosesan untuk setiap pesan. Jenis ini cocok untuk skenario berikut:
Waktu pemrosesan pesan dapat diprediksi: Jika waktu pemrosesan tidak pasti dan pesan sering memakan waktu lebih lama dari yang diharapkan, jaminan keandalan Push consumer sering memicu retry. Hal ini dapat menyebabkan banyak pesan duplikat.
Tidak ada pemrosesan asinkron atau kustomisasi tingkat lanjut: Push consumer membatasi model thread untuk logika konsumsi. SDK klien secara internal memicu pemrosesan pesan dengan throughput maksimum. Model ini menyederhanakan pengembangan tetapi tidak memungkinkan pemrosesan asinkron atau alur kerja kustom.
Kelas SDK yang sesuai
ApsaraMQ for RocketMQ menyediakan dua kelas SDK untuk Push consumer: PushConsumer dan LitePushConsumer.
PushConsumer: Cocok untuk mengonsumsi pesan dari topik yang bukan tipe Lite.
LitePushConsumer: Khusus untuk mengonsumsi pesan dari topik tipe Lite. Memungkinkan kontrol konsumsi pada tingkat granularitas topik Lite.
PushConsumer
Contoh kode:
// Contoh konsumsi: Gunakan PushConsumer untuk mengonsumsi pesan normal.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// Setel kelompok konsumen.
.setConsumerGroup("Your ConsumerGroup")
// Setel titik akhir.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Setel hubungan langganan yang telah ditentukan sebelumnya.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Setel listener pesan.
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// Konsumsi pesan dan kembalikan hasil pemrosesan.
return ConsumeResult.SUCCESS;
}
})
.build();LitePushConsumer
Contoh kode:
// Contoh konsumsi: Gunakan LitePushConsumer untuk mengonsumsi pesan normal.
ClientServiceProvider provider = ClientServiceProvider.loadService();
LitePushConsumer litePushConsumer = provider.newLitePushConsumerBuilder()
// Setel titik akhir.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Setel topik.
.bindTopic("Your Topic")
// Setel kelompok konsumen.
.setConsumerGroup("Your ConsumerGroup")
// Setel listener pesan.
.setMessageListener(messageView -> {
// Konsumsi pesan dan kembalikan hasil pemrosesan.
return ConsumeResult.SUCCESS;
})
.build();
// Berlangganan ke topik Lite target.
litePushConsumer.subscribeLite("Your Lite Topic 1");
litePushConsumer.subscribeLite("Your Lite Topic 2");Simple consumer
Simple consumer adalah jenis konsumen yang mendukung operasi atomik untuk pemrosesan pesan. Logika bisnis Anda memanggil operasi untuk mengambil pesan, mengirimkan status konsumsi, dan melakukan retry konsumsi.
Penggunaan
Menggunakan Simple consumer melibatkan pemanggilan beberapa operasi API dari logika bisnis Anda. Anda memanggil operasi untuk mengambil pesan dan mendistribusikannya ke thread bisnis untuk diproses. Setelah pemrosesan selesai, Anda memanggil operasi commit untuk mengembalikan hasil ke server. Untuk contoh kode, lihat kelas SDK yang sesuai.
Retry untuk keandalan
Untuk Simple consumer, kit pengembangan perangkat lunak (SDK) klien dan server berkomunikasi menggunakan operasi ReceiveMessage dan AckMessage. Jika SDK klien berhasil memproses pesan, SDK tersebut memanggil operasi AckMessage. Jika pemrosesan gagal, jangan mengirim respons ACK. Hal ini akan memicu retry setelah durasi invisibilitas pesan berakhir. Untuk informasi lebih lanjut, lihat Kebijakan retry SimpleConsumer.
Urutan pesan yang dijamin
Simple consumer memproses pesan terurut dari ApsaraMQ for RocketMQ sesuai urutan penyimpanannya. Untuk sekelompok pesan yang harus tetap terurut, jika pesan sebelumnya belum diproses, pesan berikutnya tidak dapat diambil.
Skenario
SimpleConsumer menyediakan operasi API atomik untuk mengambil pesan dan mengirimkan hasil konsumsi. Metode ini lebih fleksibel dibandingkan menggunakan PushConsumer. SimpleConsumer cocok untuk skenario berikut:
Durasi pemrosesan pesan tidak dapat diprediksi: Gunakan SimpleConsumer jika durasi pemrosesan pesan tidak dapat diperkirakan atau jika pesan sering memerlukan waktu lama untuk diproses. Anda dapat menentukan perkiraan durasi pemrosesan selama konsumsi. Jika perkiraan tersebut tidak sesuai dengan kebutuhan bisnis Anda, Anda dapat mengubahnya dengan memanggil operasi API.
Skenario kustom tingkat lanjut: SDK SimpleConsumer tidak memiliki enkapsulasi thread yang kompleks. Logika bisnis Anda memiliki kontrol penuh. Hal ini memungkinkan Anda mengimplementasikan skenario tingkat lanjut seperti distribusi asinkron dan konsumsi batch.
Laju konsumsi kustom: Dengan SimpleConsumer, logika bisnis Anda secara aktif memanggil operasi untuk mengambil pesan. Hal ini memungkinkan Anda menyesuaikan frekuensi pengambilan pesan untuk mengontrol laju konsumsi.
Kelas SDK yang sesuai
ApsaraMQ for RocketMQ menyediakan satu kelas SDK untuk Simple consumer: SimpleConsumer. Kelas ini menawarkan berbagai operasi kustom. SimpleConsumer tidak dapat mengonsumsi pesan dari topik Lite.
SimpleConsumer
Kode berikut memberikan contoh:
// Contoh konsumsi: Gunakan SimpleConsumer untuk mengonsumsi pesan normal. Secara aktif ambil pesan, proses, dan kirimkan hasilnya.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// Setel kelompok konsumen.
.setConsumerGroup("Your ConsumerGroup")
// Setel titik akhir.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Setel hubungan langganan yang telah ditentukan sebelumnya.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List<MessageView> messageViewList = null;
try {
// SimpleConsumer harus secara aktif mengambil dan memproses pesan.
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// Setelah pemrosesan selesai, aktif panggil ack() untuk mengirimkan hasil konsumsi.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// Jika pengambilan gagal karena alasan seperti throttling sistem, Anda harus menginisiasi ulang permintaan untuk mengambil pesan.
e.printStackTrace();
}SimpleConsumer terutama melibatkan operasi API berikut:
Nama Antarmuka | Fitur utama | Parameter yang dapat dimodifikasi |
| Konsumen secara aktif memanggil operasi ini untuk mengambil pesan dari server. Catatan Karena server menggunakan penyimpanan terdistribusi, operasi ini mungkin mengembalikan hasil kosong meskipun pesan tersedia di server. Untuk mengatasi hal ini, panggil kembali operasi ReceiveMessage atau tingkatkan konkurensi pemanggilan ReceiveMessage. |
|
| Setelah konsumen berhasil mengonsumsi pesan, konsumen secara aktif memanggil operasi ini untuk mengembalikan tanggapan sukses ke server. | Tidak ada |
| Dalam skenario retry konsumsi, konsumen dapat memanggil operasi ini untuk mengubah durasi pemrosesan pesan, yang mengontrol interval retry. | Durasi invisibilitas pesan: Panggil operasi ini untuk mengubah nilai durasi invisibilitas pesan yang telah ditentukan sebelumnya dalam operasi |
Saran
Kontrol waktu konsumsi untuk push consumer
Kontrol secara ketat waktu konsumsi pesan untuk Push consumer guna menghindari pemrosesan ulang akibat timeout. Jika aplikasi Anda sering memproses pesan yang memerlukan waktu lama, gunakan Simple consumer dan atur durasi invisibilitas pesan.