ApsaraMQ for RocketMQ mendukung pengiriman pesan normal dalam mode sinkron, asinkron, dan satu arah. Dokumen ini menjelaskan prinsip kerja, skenario penggunaan, serta menyediakan contoh kode untuk ketiga mode tersebut. Selain itu, dokumen ini juga membandingkan ketiga mode transmisi.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Unduh Community Edition dari SDK for Java versi 4.5.2 atau yang lebih baru. Untuk informasi lebih lanjut, kunjungi Halaman Unduhan RocketMQ.
Persiapkan lingkungan. Untuk detail lebih lanjut, lihat Persiapkan Lingkungan.
Buat pair AccessKey pada akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pair AccessKey.
Transmisi Sinkron
Cara Kerja Transmisi Sinkron
Dalam mode transmisi sinkron, pengirim mengirimkan pesan hanya setelah menerima respons untuk pesan sebelumnya dari broker ApsaraMQ for RocketMQ.

Skema Penggunaan
Mode ini cocok digunakan untuk mengirim notifikasi penting melalui email, pesan pendaftaran, dan pesan promosi.
Contoh Kode
import java.util.Date; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQProducer { /** * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda. * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))); } public static void main(String[] args) throws MQClientException { /** * Buat produser dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser: *DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); */ DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null); /** * Tentukan Alibaba Cloud sebagai saluran akses. Jika Anda ingin menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong. */ producer.setAccessChannel(AccessChannel.CLOUD); /** * Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80. */ producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { // Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang. System.out.println(new Date() + " Pengiriman pesan mq gagal."); e.printStackTrace(); } } // Sebelum keluar dari aplikasi, matikan objek produser. // Catatan: Operasi ini opsional. producer.shutdown(); } }
Transmisi Asinkron
Cara Kerja Transmisi Asinkron
Dalam mode transmisi asinkron, pengirim mengirimkan pesan tanpa menunggu respons dari broker ApsaraMQ for RocketMQ. Jika menggunakan mode ini di ApsaraMQ for RocketMQ, Anda perlu menulis logika implementasi operasi SendCallback. Pengirim langsung mengirimkan pesan lain setelah mengirim pesan pertama tanpa menunggu respons dari broker. Pengirim kemudian memanggil operasi SendCallback untuk menerima dan memproses respons dari broker.

Skema Penggunaan
Mode ini ideal untuk proses yang memerlukan waktu lama dalam skenario bisnis dengan sensitivitas tinggi terhadap waktu respons. Contohnya, setelah mengunggah video, callback digunakan untuk memicu transkoding. Setelah transkoding selesai, callback digunakan untuk memberikan hasil transkoding.
Contoh Kode
import java.util.Date; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQAsyncProducer { /** * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda. * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))); } public static void main(String[] args) throws MQClientException { /** * Buat produser dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser: * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); */ DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null); /** * Tentukan Alibaba Cloud sebagai saluran akses. Sebelum Anda menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong. */ producer.setAccessChannel(AccessChannel.CLOUD); /** * Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80. */ producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // Pesan dikirim ke konsumen. System.out.println("pengiriman pesan berhasil. msgId= " + result.getMsgId()); } @Override public void onException(Throwable throwable) { // Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang. System.out.println("pengiriman pesan gagal."); throwable.printStackTrace(); } }); } catch (Exception e) { // Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang. System.out.println(new Date() + " Pengiriman pesan mq gagal."); e.printStackTrace(); } } // Sebelum keluar dari aplikasi, matikan objek produser. // Catatan: Operasi ini opsional. producer.shutdown(); } }
Transmisi Satu Arah
Cara Kerja Transmisi Satu Arah
Dalam mode transmisi satu arah, produser hanya mengirimkan pesan tanpa menunggu respons dari broker ApsaraMQ for RocketMQ atau memicu fungsi callback. Dalam mode ini, pesan dapat dikirim dalam mikrodetik.

Skema Penggunaan
Mode ini cocok untuk skenario di mana pesan dikirim dalam waktu singkat namun memiliki persyaratan keandalan rendah, seperti pengumpulan log.
Contoh Kode
import java.util.Date; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQOnewayProducer { /** * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda. * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))); } public static void main(String[] args) throws MQClientException { /** * Buat produser dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser: * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); */ DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null); /** * Tentukan Alibaba Cloud sebagai saluran akses. Sebelum Anda menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong. */ producer.setAccessChannel(AccessChannel.CLOUD); /** * Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80. */ producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } catch (Exception e) { // Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang. System.out.println(new Date() + " Pengiriman pesan mq gagal."); e.printStackTrace(); } } // Sebelum keluar dari aplikasi, matikan objek produser. // Catatan: Operasi ini opsional. producer.shutdown(); } }
Perbandingan di antara ketiga mode transmisi
Mode transmisi | TPS | Respons | Keandalan |
Sinkron | Tinggi | Ya | Tidak ada kehilangan pesan |
Asinkron | Tinggi | Ya | Tidak ada kehilangan pesan |
Satu arah | Paling tinggi | Tidak | Kemungkinan kehilangan pesan |
Berlangganan pesan normal
Anda hanya dapat menggunakan metode berikut untuk berlangganan pesan normal. Berikut adalah contoh kode:
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
/**
* ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda.
* Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* Buat konsumen dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
* Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
// Titik akhir dari instance ApsaraMQ for RocketMQ.
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
// Atur parameter AccessChannel ke CLOUD. Jika Anda ingin menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong.
consumer.setAccessChannel(AccessChannel.CLOUD);
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
consumer.subscribe("YOUR TOPIC", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Terima Pesan Baru: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}