全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Tiga mode untuk mengirim pesan normal

更新时间:Jun 28, 2025

ApsaraMQ for RocketMQ mendukung pengiriman pesan normal dalam mode sinkron, asinkron, dan satu arah. Dokumen ini menjelaskan prinsip kerja, skenario penggunaan, serta menyediakan contoh kode untuk ketiga mode tersebut. Selain itu, dokumen ini juga membandingkan ketiga mode transmisi.

Prasyarat

Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:

  • Unduh Community Edition dari SDK for Java versi 4.5.2 atau yang lebih baru. Untuk informasi lebih lanjut, kunjungi Halaman Unduhan RocketMQ.

  • Persiapkan lingkungan. Untuk detail lebih lanjut, lihat Persiapkan Lingkungan.

  • Buat pair AccessKey pada akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pair AccessKey.

Transmisi Sinkron

  • Cara Kerja Transmisi Sinkron

    Dalam mode transmisi sinkron, pengirim mengirimkan pesan hanya setelah menerima respons untuk pesan sebelumnya dari broker ApsaraMQ for RocketMQ.同步发送

  • Skema Penggunaan

    Mode ini cocok digunakan untuk mengirim notifikasi penting melalui email, pesan pendaftaran, dan pesan promosi.

  • Contoh Kode

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQProducer {
        /**
        * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda.
        * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
        */
        private static RPCHook getAclRPCHook() {
    	  return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * Buat produser dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
             * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
             *DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * Tentukan Alibaba Cloud sebagai saluran akses. Jika Anda ingin menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong.
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80.
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang.
                    System.out.println(new Date() + " Pengiriman pesan mq gagal.");
                    e.printStackTrace();
                }
            }
    
            // Sebelum keluar dari aplikasi, matikan objek produser.
            // Catatan: Operasi ini opsional.
            producer.shutdown();
        }
    }

Transmisi Asinkron

  • Cara Kerja Transmisi Asinkron

    Dalam mode transmisi asinkron, pengirim mengirimkan pesan tanpa menunggu respons dari broker ApsaraMQ for RocketMQ. Jika menggunakan mode ini di ApsaraMQ for RocketMQ, Anda perlu menulis logika implementasi operasi SendCallback. Pengirim langsung mengirimkan pesan lain setelah mengirim pesan pertama tanpa menunggu respons dari broker. Pengirim kemudian memanggil operasi SendCallback untuk menerima dan memproses respons dari broker.

    异步发送

  • Skema Penggunaan

    Mode ini ideal untuk proses yang memerlukan waktu lama dalam skenario bisnis dengan sensitivitas tinggi terhadap waktu respons. Contohnya, setelah mengunggah video, callback digunakan untuk memicu transkoding. Setelah transkoding selesai, callback digunakan untuk memberikan hasil transkoding.

  • Contoh Kode

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQAsyncProducer {
        /**
        * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda.
        * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
        */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * Buat produser dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
             * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * Tentukan Alibaba Cloud sebagai saluran akses. Sebelum Anda menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong.
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80.
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override public void onSuccess(SendResult result) {
                            // Pesan dikirim ke konsumen.
                            System.out.println("pengiriman pesan berhasil. msgId= " + result.getMsgId());
                        }
    
                        @Override public void onException(Throwable throwable) {
                            // Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang.
                            System.out.println("pengiriman pesan gagal.");
                            throwable.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    // Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang.
                    System.out.println(new Date() + " Pengiriman pesan mq gagal.");
                    e.printStackTrace();
                }
            }
    
            // Sebelum keluar dari aplikasi, matikan objek produser.
            // Catatan: Operasi ini opsional.
            producer.shutdown();
        }
    }

Transmisi Satu Arah

  • Cara Kerja Transmisi Satu Arah

    Dalam mode transmisi satu arah, produser hanya mengirimkan pesan tanpa menunggu respons dari broker ApsaraMQ for RocketMQ atau memicu fungsi callback. Dalam mode ini, pesan dapat dikirim dalam mikrodetik.

    单向发送

  • Skema Penggunaan

    Mode ini cocok untuk skenario di mana pesan dikirim dalam waktu singkat namun memiliki persyaratan keandalan rendah, seperti pengumpulan log.

  • Contoh Kode

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQOnewayProducer {
        /**
        * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda.
        * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
        */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * Buat produser dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
             * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * Tentukan Alibaba Cloud sebagai saluran akses. Sebelum Anda menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong.
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80.
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.sendOneway(msg);
                } catch (Exception e) {
                    // Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang.
                    System.out.println(new Date() + " Pengiriman pesan mq gagal.");
                    e.printStackTrace();
                }
            }
    
            // Sebelum keluar dari aplikasi, matikan objek produser.
            // Catatan: Operasi ini opsional.
            producer.shutdown();
        }
    }

Perbandingan di antara ketiga mode transmisi

Mode transmisi

TPS

Respons

Keandalan

Sinkron

Tinggi

Ya

Tidak ada kehilangan pesan

Asinkron

Tinggi

Ya

Tidak ada kehilangan pesan

Satu arah

Paling tinggi

Tidak

Kemungkinan kehilangan pesan

Berlangganan pesan normal

Anda hanya dapat menggunakan metode berikut untuk berlangganan pesan normal. Berikut adalah contoh kode:

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda.
    * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * Buat konsumen dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
         * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // Titik akhir dari instance ApsaraMQ for RocketMQ.
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Atur parameter AccessChannel ke CLOUD. Jika Anda ingin menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong.
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Terima Pesan Baru: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}