Topik ini menjelaskan cara berlangganan pesan menggunakan TCP Client SDK untuk Java yang disediakan oleh ApsaraMQ for RocketMQ.
Mode langganan
ApsaraMQ for RocketMQ mendukung mode langganan berikut:
Langganan Klustering
Semua konsumen dengan ID grup yang sama mengonsumsi jumlah pesan yang sama. Sebagai contoh, sebuah topik berisi sembilan pesan dan grup konsumen terdiri dari tiga konsumen. Dalam mode konsumsi klustering, setiap konsumen mengonsumsi tiga pesan. Kode berikut menunjukkan cara mengonfigurasi mode langganan klustering:
// Konfigurasikan langganan klustering, yang merupakan mode default. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);Langganan Siaran
Setiap konsumen dengan ID grup yang sama mengonsumsi semua pesan sekali. Sebagai contoh, sebuah topik berisi sembilan pesan dan grup konsumen terdiri dari tiga konsumen. Dalam mode konsumsi siaran, setiap konsumen mengonsumsi sembilan pesan. Kode berikut menunjukkan cara mengonfigurasi mode langganan siaran:
// Konfigurasikan langganan siaran. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Pastikan langganan konsisten untuk semua instance konsumen dengan ID grup yang sama. Untuk informasi lebih lanjut, lihat Konsistensi Langganan.
Batas berbeda diterapkan pada mode langganan sebelumnya. Sebagai contoh, dalam mode langganan siaran, Anda tidak dapat mengirim atau menerima pesan terurut, mempertahankan kemajuan konsumsi, atau menyetel ulang offset konsumen. Untuk informasi lebih lanjut, lihat Konsumsi Klustering dan Konsumsi Siaran.
Mode untuk mendapatkan pesan
ApsaraMQ for RocketMQ memungkinkan Anda mendapatkan pesan menggunakan salah satu mode berikut:
Push: Pesan didorong dari ApsaraMQ for RocketMQ ke konsumen. Dalam mode push, ApsaraMQ for RocketMQ mendukung fitur konsumsi batch. Fitur ini memungkinkan pengiriman pesan ke konsumen dalam batch. Untuk informasi lebih lanjut, lihat Konsumsi Batch.
Pull: Pesan ditarik dari ApsaraMQ for RocketMQ oleh konsumen.
Dibandingkan dengan mode push, mode pull memberikan lebih banyak opsi dalam penerimaan pesan serta kebebasan lebih dalam menarik pesan. Untuk informasi lebih lanjut, lihat Metode dan Parameter.
Untuk menggunakan konsumen pull, pastikan bahwa instance ApsaraMQ for RocketMQ Anda adalah Edisi Platinum Perusahaan.
Konsumen pull hanya dapat mengakses instance ApsaraMQ for RocketMQ dalam virtual private cloud (VPC).
Kode contoh
Untuk informasi tentang kode contoh, lihat ApsaraMQ for RocketMQ Repositori Kode. Kode contoh berikut menunjukkan cara berlangganan pesan menggunakan mode push dan pull.
Mode Push
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. properties.put(PropertyKeyConst.GROUP_ID, "XXX"); // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. // AccessKey ID yang digunakan untuk otentikasi. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // Rahasia AccessKey yang digunakan untuk otentikasi. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); // Mode langganan klustering, yang merupakan mode default. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // Mode konsumsi siaran. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); Consumer consumer = ONSFactory.createConsumer(properties); // Berlangganan beberapa tag. consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Terima: " + message); return Action.CommitMessage; } }); // Berlangganan topik lain. Untuk berhenti berlangganan dari topik, hapus kode untuk langganan dan mulai ulang konsumen. // Berlangganan semua tag. consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Terima: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Konsumen Dimulai"); } }Mode Push (Konsumsi Batch)
PentingUntuk menggunakan fitur konsumsi batch yang disediakan oleh ApsaraMQ for RocketMQ, tingkatkan TCP Client SDK untuk Java Anda ke versi 1.8.7.3 atau lebih baru. Untuk informasi lebih lanjut, lihat Catatan Rilis.
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.batch.BatchConsumer; import com.aliyun.openservices.ons.api.batch.BatchMessageListener; import java.util.List; import java.util.Properties; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.tcp.example.MqConfig; public class SimpleBatchConsumer { public static void main(String[] args) { Properties consumerProperties = new Properties(); // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID); // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. // AccessKey ID yang digunakan untuk otentikasi. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // Rahasia AccessKey yang digunakan untuk otentikasi. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR); // Jumlah maksimum pesan yang akan dikonsumsi sekaligus. Dalam contoh ini, nilai tersebut diatur menjadi 128. Jika jumlah pesan yang disimpan dalam topik yang ditentukan mencapai nilai ini, SDK segera memanggil metode callback untuk konsumen untuk mengonsumsi pesan. Nilai valid: 1 hingga 1024. Nilai default: 32. consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128)); // Waktu tunggu maksimum antara dua batch berturut-turut. Dalam contoh ini, nilai tersebut ditentukan sebagai 10 detik. Jika waktu tunggu yang ditentukan tercapai, SDK segera memanggil metode callback untuk konsumen untuk mengonsumsi pesan. Nilai valid: 0 hingga 450. Nilai default: 0. Unit: detik. consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10)); BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties); batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() { @Override public Action consume(final List<Message> messages, ConsumeContext context) { System.out.printf("Ukuran-Batch: %d\n", messages.size()); // Proses pesan dalam batch. return Action.CommitMessage; } }); // Mulai konsumen untuk konsumsi batch. batchConsumer.start(); System.out.println("Konsumen berhasil dimulai."); // Tunggu periode waktu tertentu untuk mencegah proses keluar. try { Thread.sleep(200000); } catch (InterruptedException e) { e.printStackTrace(); } } }Mode Pull
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.PullConsumer; import com.aliyun.openservices.ons.api.TopicPartition; import java.util.List; import java.util.Properties; import java.util.Set; public class PullConsumerClient { public static void main(String[] args){ Properties properties = new Properties(); // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx"); // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. // AccessKey ID yang digunakan untuk otentikasi. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // Rahasia AccessKey yang digunakan untuk otentikasi. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx"); PullConsumer consumer = ONSFactory.createPullConsumer(properties); // Mulai konsumen. consumer.start(); // Query semua partisi dalam topic-xxx. Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx"); // Tentukan partisi dari mana Anda ingin menarik pesan. consumer.assign(topicPartitions); while (true) { // Tarik pesan. Tentukan periode timeout sebagai 3.000 milidetik. List<Message> messages = consumer.poll(3000); System.out.printf("Pesan diterima: %s %n", messages); } } }Untuk informasi tentang partisi dan offset, lihat Istilah.
Informasi tambahan
Jika konsumsi pesan gagal atau habis waktu, ApsaraMQ for RocketMQ akan mengirim ulang pesan berdasarkan mekanisme pengulangan pesan. Untuk informasi lebih lanjut, lihat Pengulangan Pesan.