Topik ini menyediakan contoh kode untuk mengirim dan menerima pesan menggunakan Apache RocketMQ Java SDK.
Kode contoh
Jika Anda menggunakan instans Serverless, perhatikan versi SDK dan persyaratan lainnya untuk akses jaringan publik. Untuk informasi selengkapnya, lihat Persyaratan Versi SDK untuk Akses Jaringan Publik.
SDK protokol gRPC
Untuk contoh kode RocketMQ-Spring, lihat rocketmq-v5-client-spring-boot-samples.
SDK
rocketmq-client-javamenggunakan protokol gRPC. Berikut ini adalah contoh kode untuk SDK ini.PentingSaat mengirim pesan transaksional menggunakan SDK berbasis protokol gRPC, pemeriksaan transaksi akan tertunda jika Anda tidak menetapkan topik saat memulai produsen. Jika pesan tidak dikirim dalam waktu empat jam, pesan setengah-transaksional tersebut mungkin dibuang. Oleh karena itu, pastikan Anda menetapkan topik saat memulai produsen transaksi.
Tipe pesan
Kode contoh untuk mengirim pesan
Contoh kode untuk berlangganan pesan
Pengiriman sinkron: ProducerNormalMessageExample.java
Pengiriman asinkron: AsyncProducerExample.java
Berlangganan sinkron: SimpleConsumerExample.java
Berlangganan asinkron: AsyncSimpleConsumerExample.java
Tidak ada
SDK protokol Remoting
Untuk contoh kode RocketMQ-Spring, lihat rocketmq-spring-boot-samples.
SDK
rocketmq-clientmenggunakan protokol Remoting. Berikut ini adalah contoh kode untuk SDK ini.Pesan normal
Kirim Pesan Normal (Sinkron)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQProducer { /** * Jika Anda mengakses instans melalui jaringan publik, Anda harus mengonfigurasi RPCHook dengan nama pengguna dan kata sandi instans. * Anda dapat memperoleh nama pengguna dan kata sandi di tab Pengenalan Identitas Cerdas pada konsol Resource Access Management. * Penting: Jangan gunakan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda. * Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu menginisialisasi RPCHook. Server secara otomatis memperoleh informasi berdasarkan VPC. * Jika Anda menggunakan instans Serverless, Anda harus menetapkan nama pengguna dan kata sandi untuk akses jaringan publik. Jika akses tanpa kata sandi melalui jaringan internal diaktifkan, Anda tidak perlu menetapkan nama pengguna dan kata sandi untuk akses jaringan internal. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // Saat Anda menggunakan titik akhir publik, konfigurasikan RPCHook. DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // Saat Anda menggunakan titik akhir VPC, Anda tidak perlu mengonfigurasi RPCHook. // Jika Anda menggunakan instans Serverless, Anda harus mengonfigurasi RPCHook. // DefaultMQProducer producer = new DefaultMQProducer(); // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. producer.setProducerGroup("YOUR GROUP ID"); // Tetapkan jenis koneksi ke Alibaba Cloud. Ini diperlukan untuk menggunakan fitur jejak pesan cloud. Jika Anda tidak mengaktifkan jejak pesan, Anda tidak perlu menjalankan kode ini. producer.setAccessChannel(AccessChannel.CLOUD); // Untuk SDK versi V5.3.0 dan yang lebih baru, Anda juga harus menambahkan parameter EnableTrace selain menyetel AccessChannel untuk mengaktifkan jejak pesan. producer.setEnableTrace(true); // Tetapkan ini ke titik akhir yang Anda peroleh dari konsol ApsaraMQ for RocketMQ. Contoh: rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Penting: Masukkan nama domain dan port yang disediakan di konsol. Jangan tambahkan awalan http:// atau https://. Jangan gunakan alamat IP yang telah di-resolve. producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { // Jika pesan gagal dikirim, coba kirim ulang atau simpan datanya untuk diproses nanti. System.out.println(new Date() + " Gagal mengirim pesan mq."); e.printStackTrace(); } } // Sebelum aplikasi keluar, hapus objek produsen. // Catatan: Menghapus objek produsen menghemat memori sistem. Untuk mengirim pesan secara sering, jangan hapus objek produsen. producer.shutdown(); } }Kirim Pesan Normal (Asinkron)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; import java.util.concurrent.TimeUnit; public class RocketMQAsyncProducer { /** * Jika Anda mengakses instans melalui jaringan publik, Anda harus mengonfigurasi RPCHook dengan nama pengguna dan kata sandi instans. * Anda dapat memperoleh nama pengguna dan kata sandi di tab Pengenalan Identitas Cerdas pada konsol Resource Access Management. * Penting: Jangan gunakan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda. * Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu menginisialisasi RPCHook. Server secara otomatis memperoleh informasi berdasarkan VPC. * Jika Anda menggunakan instans Serverless, Anda harus menetapkan nama pengguna dan kata sandi untuk akses jaringan publik. Jika akses tanpa kata sandi melalui jaringan internal diaktifkan, Anda tidak perlu menetapkan nama pengguna dan kata sandi untuk akses jaringan internal. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException, InterruptedException { // Saat Anda menggunakan titik akhir publik, konfigurasikan RPCHook. DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // Saat Anda menggunakan titik akhir VPC, Anda tidak perlu mengonfigurasi RPCHook. // Jika Anda menggunakan instans Serverless, Anda harus mengonfigurasi RPCHook. // DefaultMQProducer producer = new DefaultMQProducer(); // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. producer.setProducerGroup("YOUR GROUP ID"); // Tetapkan jenis koneksi ke Alibaba Cloud. Ini diperlukan untuk menggunakan fitur jejak pesan cloud. Jika Anda tidak mengaktifkan jejak pesan, Anda tidak perlu menjalankan kode ini. producer.setAccessChannel(AccessChannel.CLOUD); // Untuk SDK versi V5.3.0 dan yang lebih baru, Anda juga harus menambahkan parameter EnableTrace selain menyetel AccessChannel untuk mengaktifkan jejak pesan. producer.setEnableTrace(true); // Tetapkan ini ke titik akhir yang Anda peroleh dari konsol ApsaraMQ for RocketMQ. Contoh: rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Penting: Masukkan nama domain dan port yang disediakan di konsol. Jangan tambahkan awalan http:// atau https://. Jangan gunakan alamat IP yang telah di-resolve. producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // Pesan berhasil dikirim. System.out.println("berhasil mengirim pesan. msgId= " + result.getMsgId()); } @Override public void onException(Throwable throwable) { // Jika pesan gagal dikirim, coba kirim ulang atau simpan datanya untuk diproses nanti. System.out.println("gagal mengirim pesan."); throwable.printStackTrace(); } }); } catch (Exception e) { // Jika pesan gagal dikirim, coba kirim ulang atau simpan datanya untuk diproses nanti. System.out.println(new Date() + " Gagal mengirim pesan mq."); e.printStackTrace(); } } // Blokir thread saat ini selama 3 detik untuk menunggu hasil pengiriman asinkron. TimeUnit.SECONDS.sleep(3); // Sebelum aplikasi keluar, hapus objek produsen. // Catatan: Menghapus objek produsen menghemat memori sistem. Untuk mengirim pesan secara sering, jangan hapus objek produsen. producer.shutdown(); } }Kirim Pesan Normal (One-way)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQOnewayProducer { /** * Jika Anda mengakses instans melalui jaringan publik, Anda harus mengonfigurasi RPCHook dengan nama pengguna dan kata sandi instans. * Anda dapat memperoleh nama pengguna dan kata sandi di tab Pengenalan Identitas Cerdas pada konsol Resource Access Management. * Penting: Jangan gunakan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda. * Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu menginisialisasi RPCHook. Server secara otomatis memperoleh informasi berdasarkan VPC. * Jika Anda menggunakan instans Serverless, Anda harus menetapkan nama pengguna dan kata sandi untuk akses jaringan publik. Jika akses tanpa kata sandi melalui jaringan internal diaktifkan, Anda tidak perlu menetapkan nama pengguna dan kata sandi untuk akses jaringan internal. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // Saat Anda menggunakan titik akhir publik, konfigurasikan RPCHook. DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // Saat Anda menggunakan titik akhir VPC, Anda tidak perlu mengonfigurasi RPCHook. // Jika Anda menggunakan instans Serverless, Anda harus mengonfigurasi RPCHook. // DefaultMQProducer producer = new DefaultMQProducer(); // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. producer.setProducerGroup("YOUR GROUP ID"); // Tetapkan jenis koneksi ke Alibaba Cloud. Ini diperlukan untuk menggunakan fitur jejak pesan cloud. Jika Anda tidak mengaktifkan jejak pesan, Anda tidak perlu menjalankan kode ini. producer.setAccessChannel(AccessChannel.CLOUD); // Untuk SDK versi V5.3.0 dan yang lebih baru, Anda juga harus menambahkan parameter EnableTrace selain menyetel AccessChannel untuk mengaktifkan jejak pesan. producer.setEnableTrace(true); // Tetapkan ini ke titik akhir yang Anda peroleh dari konsol ApsaraMQ for RocketMQ. Contoh: rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Penting: Masukkan nama domain dan port yang disediakan di konsol. Jangan tambahkan awalan http:// atau https://. Jangan gunakan alamat IP yang telah di-resolve. producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } catch (Exception e) { // Jika pesan gagal dikirim, coba kirim ulang atau simpan datanya untuk diproses nanti. System.out.println(new Date() + " Gagal mengirim pesan mq."); e.printStackTrace(); } } // Sebelum aplikasi keluar, hapus objek produsen. // Catatan: Menghapus objek produsen menghemat memori sistem. Untuk mengirim pesan secara sering, jangan hapus objek produsen. producer.shutdown(); } }Berlangganan Pesan Normal
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import java.util.List; public class RocketMQPushConsumer { /** * Jika Anda mengakses instans melalui jaringan publik, Anda harus mengonfigurasi RPCHook dengan nama pengguna dan kata sandi instans. * Anda dapat memperoleh nama pengguna dan kata sandi di tab Pengenalan Identitas Cerdas pada konsol Resource Access Management. * Penting: Jangan gunakan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda. * Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu menginisialisasi RPCHook. Server secara otomatis memperoleh informasi berdasarkan VPC. * Jika Anda menggunakan instans Serverless, Anda harus menetapkan nama pengguna dan kata sandi untuk akses jaringan publik. Jika akses tanpa kata sandi melalui jaringan internal diaktifkan, Anda tidak perlu menetapkan nama pengguna dan kata sandi untuk akses jaringan internal. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // Saat Anda menggunakan titik akhir publik, konfigurasikan RPCHook. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook()); // Saat Anda menggunakan titik akhir VPC, Anda tidak perlu mengonfigurasi RPCHook. // Jika Anda menggunakan instans Serverless, Anda harus mengonfigurasi RPCHook. // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. consumer.setConsumerGroup("YOUR GROUP ID"); // Tetapkan jenis koneksi ke Alibaba Cloud. Ini diperlukan untuk menggunakan fitur jejak pesan cloud. Jika Anda tidak mengaktifkan jejak pesan, Anda tidak perlu menjalankan kode ini. consumer.setAccessChannel(AccessChannel.CLOUD); // Untuk SDK versi V5.3.0 dan yang lebih baru, Anda juga harus menambahkan parameter EnableTrace selain menyetel AccessChannel untuk mengaktifkan jejak pesan. consumer.setEnableTrace(true); // Tetapkan ini ke titik akhir yang Anda peroleh dari konsol ApsaraMQ for RocketMQ. Contoh: rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Penting: Masukkan nama domain dan port yang disediakan di konsol. Jangan tambahkan awalan http:// atau https://. Jangan gunakan alamat IP yang telah di-resolve. consumer.setNamesrvAddr("YOUR ACCESS POINT"); // Tetapkan ini ke topik yang Anda buat di konsol ApsaraMQ for RocketMQ. consumer.subscribe("YOUR TOPIC", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("Menerima Pesan Baru: %s %n", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }Pesan terurut
Kirim Pesan Terurut
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.List; public class RocketMQOrderProducer { /** * Jika Anda mengakses instans melalui jaringan publik, Anda harus mengonfigurasi RPCHook dengan nama pengguna dan kata sandi instans. * Anda dapat memperoleh nama pengguna dan kata sandi di tab Pengenalan Identitas Cerdas pada konsol Resource Access Management. * Penting: Jangan gunakan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda. * Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu menginisialisasi RPCHook. Server secara otomatis memperoleh informasi berdasarkan VPC. * Jika Anda menggunakan instans Serverless, Anda harus menetapkan nama pengguna dan kata sandi untuk akses jaringan publik. Jika akses tanpa kata sandi melalui jaringan internal diaktifkan, Anda tidak perlu menetapkan nama pengguna dan kata sandi untuk akses jaringan internal. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // Saat Anda menggunakan titik akhir publik, konfigurasikan RPCHook. DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // Saat Anda menggunakan titik akhir VPC, Anda tidak perlu mengonfigurasi RPCHook. // Jika Anda menggunakan instans Serverless, Anda harus mengonfigurasi RPCHook. // DefaultMQProducer producer = new DefaultMQProducer(); // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. producer.setProducerGroup("YOUR GROUP ID"); // Tetapkan jenis koneksi ke Alibaba Cloud. Ini diperlukan untuk menggunakan fitur jejak pesan cloud. Jika Anda tidak mengaktifkan jejak pesan, Anda tidak perlu menjalankan kode ini. producer.setAccessChannel(AccessChannel.CLOUD); // Untuk SDK versi V5.3.0 dan yang lebih baru, Anda juga harus menambahkan parameter EnableTrace selain menyetel AccessChannel untuk mengaktifkan jejak pesan. producer.setEnableTrace(true); // Tetapkan ini ke titik akhir yang Anda peroleh dari konsol ApsaraMQ for RocketMQ. Contoh: rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Penting: Masukkan nama domain dan port yang disediakan di konsol. Jangan tambahkan awalan http:// atau https://. Jangan gunakan alamat IP yang telah di-resolve. producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { int orderId = i % 10; Message msg = new Message("YOUR ORDER TOPIC", "YOUR MESSAGE TAG", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // Penting: Anda harus menyetel item konfigurasi ini untuk memastikan bahwa pesan terurut didistribusikan secara merata ke antrian. // Di V5.x, Anda dapat mengganti baris kode berikut dengan msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + ""); msg.putUserProperty("__SHARDINGKEY", orderId + ""); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // Pilih algoritma pemilihan partisi yang sesuai dengan kebutuhan Anda untuk memastikan bahwa parameter yang sama menghasilkan hasil yang sama. Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }Berlangganan Pesan Terurut
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import java.util.List; public class RocketMQOrderConsumer { private static RPCHook getAclRPCHook() { /** * Jika Anda mengakses instans melalui jaringan publik, Anda harus mengonfigurasi RPCHook dengan nama pengguna dan kata sandi instans. * Anda dapat memperoleh nama pengguna dan kata sandi di tab Pengenalan Identitas Cerdas pada konsol Resource Access Management. * Penting: Jangan gunakan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda. * Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu menginisialisasi RPCHook. Server secara otomatis memperoleh informasi berdasarkan VPC. * Jika Anda menggunakan instans Serverless, Anda harus menetapkan nama pengguna dan kata sandi untuk akses jaringan publik. Jika akses tanpa kata sandi melalui jaringan internal diaktifkan, Anda tidak perlu menetapkan nama pengguna dan kata sandi untuk akses jaringan internal. */ return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // Saat Anda menggunakan titik akhir publik, konfigurasikan RPCHook. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook()); // Saat Anda menggunakan titik akhir VPC, Anda tidak perlu mengonfigurasi RPCHook. // Jika Anda menggunakan instans Serverless, Anda harus mengonfigurasi RPCHook. // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. consumer.setConsumerGroup("YOUR GROUP ID"); // Tetapkan jenis koneksi ke Alibaba Cloud. Ini diperlukan untuk menggunakan fitur jejak pesan cloud. Jika Anda tidak mengaktifkan jejak pesan, Anda tidak perlu menjalankan kode ini. consumer.setAccessChannel(AccessChannel.CLOUD); // Untuk SDK versi V5.3.0 dan yang lebih baru, Anda juga harus menambahkan parameter EnableTrace selain menyetel AccessChannel untuk mengaktifkan jejak pesan. consumer.setEnableTrace(true); // Tetapkan ini ke titik akhir yang Anda peroleh dari konsol ApsaraMQ for RocketMQ. Contoh: rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Penting: Masukkan nama domain dan port yang disediakan di konsol. Jangan tambahkan awalan http:// atau https://. Jangan gunakan alamat IP yang telah di-resolve. consumer.setNamesrvAddr("YOUR ACCESS POINT"); consumer.subscribe("YOUR ORDER TOPIC", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Menerima Pesan Baru: %s %n", Thread.currentThread().getName(), msgs); // Jika konsumsi gagal, kembalikan ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT untuk menangguhkan dan mencoba lagi. return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }Pesan terjadwal dan tertunda
Kirim Pesan Terjadwal atau Tertunda
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQDelayProducer { /** * Jika Anda mengakses instans melalui jaringan publik, Anda harus mengonfigurasi RPCHook dengan nama pengguna dan kata sandi instans. * Anda dapat memperoleh nama pengguna dan kata sandi di tab Pengenalan Identitas Cerdas pada konsol Resource Access Management. * Penting: Jangan gunakan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda. * Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu menginisialisasi RPCHook. Server secara otomatis memperoleh informasi berdasarkan VPC. * Jika Anda menggunakan instans Serverless, Anda harus menetapkan nama pengguna dan kata sandi untuk akses jaringan publik. Jika akses tanpa kata sandi melalui jaringan internal diaktifkan, Anda tidak perlu menetapkan nama pengguna dan kata sandi untuk akses jaringan internal. */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // Saat Anda menggunakan titik akhir publik, konfigurasikan RPCHook. DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // Saat Anda menggunakan titik akhir VPC, Anda tidak perlu mengonfigurasi RPCHook. // Jika Anda menggunakan instans Serverless, Anda harus mengonfigurasi RPCHook. // DefaultMQProducer producer = new DefaultMQProducer(); // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. producer.setProducerGroup("YOUR GROUP ID"); // Tetapkan jenis koneksi ke Alibaba Cloud. Ini diperlukan untuk menggunakan fitur jejak pesan cloud. Jika Anda tidak mengaktifkan jejak pesan, Anda tidak perlu menjalankan kode ini. producer.setAccessChannel(AccessChannel.CLOUD); // Untuk SDK versi V5.3.0 dan yang lebih baru, Anda juga harus menambahkan parameter EnableTrace selain menyetel AccessChannel untuk mengaktifkan jejak pesan. producer.setEnableTrace(true); // Tetapkan ini ke titik akhir yang Anda peroleh dari konsol ApsaraMQ for RocketMQ. Contoh: rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Penting: Masukkan nama domain dan port yang disediakan di konsol. Jangan tambahkan awalan http:// atau https://. Jangan gunakan alamat IP yang telah di-resolve. producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { // Tetapkan ini ke topik yang Anda buat di konsol ApsaraMQ for RocketMQ. Message msg = new Message("YOUR TOPIC", // Tetapkan tag pesan. "YOUR MESSAGE TAG", // Isi pesan. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // Untuk mengirim pesan tertunda, tetapkan waktu tunda dalam milidetik (ms). Pesan akan dikirimkan setelah penundaan yang ditentukan. Misalnya, pesan dikirimkan setelah 3 detik. long delayTime = System.currentTimeMillis() + 3000; msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime)); // Untuk mengirim pesan terjadwal, tetapkan waktu spesifik untuk pengiriman. Misalnya, pesan dikirimkan pada pukul 18:45:00 tanggal 2021-08-10. // Format waktunya adalah yyyy-MM-dd HH:mm:ss. Jika waktu yang ditentukan lebih awal dari waktu saat ini, pesan akan langsung dikirimkan ke konsumen. // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime(); // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { // Jika pesan gagal dikirim, coba kirim ulang atau simpan datanya untuk diproses nanti. System.out.println(new Date() + " Gagal mengirim pesan mq."); e.printStackTrace(); } } // Sebelum aplikasi keluar, hapus objek produsen. // Catatan: Menghapus objek produsen menghemat memori sistem. Untuk mengirim pesan secara sering, jangan hapus objek produsen. producer.shutdown(); } }Kode contoh untuk berlangganan pesan terjadwal dan tertunda sama dengan kode untuk pesan normal.
Pesan transaksional
Kirim Pesan Transaksional
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQTransactionProducer { private static RPCHook getAclRPCHook() { /** * Jika Anda mengakses instans melalui jaringan publik, Anda harus mengonfigurasi RPCHook dengan nama pengguna dan kata sandi instans. * Anda dapat memperoleh nama pengguna dan kata sandi di tab Pengenalan Identitas Cerdas pada konsol Resource Access Management. * Penting: Jangan gunakan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda. * Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu menginisialisasi RPCHook. Server secara otomatis memperoleh informasi berdasarkan VPC. * Jika Anda menggunakan instans Serverless, Anda harus menetapkan nama pengguna dan kata sandi untuk akses jaringan publik. Jika akses tanpa kata sandi melalui jaringan internal diaktifkan, Anda tidak perlu menetapkan nama pengguna dan kata sandi untuk akses jaringan internal. */ return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // Saat Anda menggunakan titik akhir publik, konfigurasikan RPCHook. // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. Catatan: ID grup untuk pesan transaksional tidak dapat digunakan bersama dengan jenis pesan lainnya. TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook()); // Saat Anda menggunakan titik akhir VPC, Anda tidak perlu mengonfigurasi RPCHook. // Jika Anda menggunakan instans Serverless, Anda harus mengonfigurasi RPCHook. // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID"); // Tetapkan jenis koneksi ke Alibaba Cloud. Ini diperlukan untuk menggunakan fitur jejak pesan cloud. Jika Anda tidak mengaktifkan jejak pesan, Anda tidak perlu menjalankan kode ini. transactionMQProducer.setAccessChannel(AccessChannel.CLOUD); // Untuk SDK versi V5.3.0 dan yang lebih baru, Anda juga harus menambahkan parameter EnableTrace selain menyetel AccessChannel untuk mengaktifkan jejak pesan. transactionMQProducer.setEnableTrace(true); // Tetapkan ini ke titik akhir yang Anda peroleh dari konsol ApsaraMQ for RocketMQ. Contoh: rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Penting: Masukkan nama domain dan port yang disediakan di konsol. Jangan tambahkan awalan http:// atau https://. Jangan gunakan alamat IP yang telah di-resolve. transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT"); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("Mulai mengeksekusi transaksi lokal: " + msg); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("Menerima permintaan pemeriksaan transaksi, MsgId: " + msg.getMsgId()); return LocalTransactionState.COMMIT_MESSAGE; } }); transactionMQProducer.start(); for (int i = 0; i < 10; i++) { try { Message message = new Message("YOUR TRANSACTION TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null); assert sendResult != null; } catch (Exception e) { e.printStackTrace(); } } } }Kode contoh untuk berlangganan pesan transaksional sama dengan kode untuk pesan normal.
Panduan Versi Akses Jaringan Publik Instans Serverless
Untuk mengakses instans Serverless ApsaraMQ for RocketMQ melalui jaringan publik, SDK Anda harus memenuhi persyaratan versi berikut. Anda juga harus menambahkan kode yang ditentukan ke aplikasi Anda.
Ganti InstanceId dengan ID instans aktual Anda.
Versi SDK: rocketmq-client ≥ 5.2.0
Untuk mengirim pesan, tambahkan kode berikut:
producer.setNamespaceV2("InstanceId");Untuk mengonsumsi pesan, tambahkan kode berikut:
consumer.setNamespaceV2("InstanceId");Versi SDK: rocketmq-client-java ≥ 5.0.6
Untuk mengirim dan mengonsumsi pesan, tambahkan kode berikut:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();