全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發普通訊息(三種方式)

更新時間:Jul 01, 2024

阿里雲雲訊息佇列 RocketMQ 版提供三種方式來發送普通訊息:同步發送、非同步發送和單向(Oneway)發送。本文介紹了每種發送方式的原理、使用情境、範例程式碼,以及三種發送方式的對比;此外還提供了訂閱普通訊息的範例程式碼。

前提條件

您已完成以下操作:

同步發送

  • 原理

    同步發送是指訊息發送方發出一條訊息後,會在收到服務端返迴響應之後才發下一條訊息的通訊方式。同步發送

  • 應用情境

    此種方式應用情境非常廣泛,例如重要通知訊息、報名簡訊通知、營銷簡訊系統等。

  • 範例程式碼

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQProducer {
        /**
        * 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
        * 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        */
        private static RPCHook getAclRPCHook() {
    	  return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             *建立Producer,並開啟訊息軌跡。設定為您在阿里雲訊息佇列RocketMQ版控制台建立的Group ID。
             *如果不想開啟訊息軌跡,可以按照如下方式建立:
             *DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             *設定使用接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             *設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“http://MQ_INST_XXXX.aliyuncs.com:80”。
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    //訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            //在應用退出前,銷毀Producer對象。
            //注意:如果不銷毀也沒有問題。
            producer.shutdown();
        }
    }

非同步發送

  • 原理

    非同步發送是指發送方發出一條訊息後,不等服務端返迴響應,接著發送下一條訊息的通訊方式。阿里雲雲訊息佇列 RocketMQ 版的非同步發送,需要實現非同步發送回調介面(SendCallback)。訊息發送方在發送了一條訊息後,不需要等待服務端響應即可發送第二條訊息。發送方通過回調介面接收服務端響應,並處理響應結果。

    非同步發送

  • 應用情境

    非同步發送一般用於鏈路耗時較長,對回應時間較為敏感的業務情境,例如,您視頻上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。

  • 範例程式碼

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQAsyncProducer {
        /**
        * 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
        * 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * 建立Producer,並開啟訊息軌跡。設定為您在阿里雲訊息佇列RocketMQ版控制台建立的Group ID。
             * 如果不想開啟訊息軌跡,可以按照如下方式建立:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * 設定使用接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“http://MQ_INST_XXXX.aliyuncs.com:80”。
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override public void onSuccess(SendResult result) {
                            // 消費發送成功。
                            System.out.println("send message success. msgId= " + result.getMsgId());
                        }
    
                        @Override public void onException(Throwable throwable) {
                            // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                            System.out.println("send message failed.");
                            throwable.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // 在應用退出前,銷毀Producer對象。
            // 注意:如果不銷毀也沒有問題。
            producer.shutdown();
        }
    }

單向(Oneway)發送

  • 原理

    發送方只負責發送訊息,不等待服務端返迴響應且沒有回呼函數觸發,即只發送請求不等待應答。此方式發送訊息的過程耗時非常短,一般在微秒層級。

    單向發送

  • 應用情境

    適用於某些耗時非常短,但對可靠性要求並不高的情境,例如日誌收集。

  • 範例程式碼

    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQOnewayProducer {
        /**
        * 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
        * 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * 建立Producer,並開啟訊息軌跡。設定為您在阿里雲訊息佇列RocketMQ版控制台建立的Group ID。
             * 如果不想開啟訊息軌跡,可以按照如下方式建立:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * 設定使用接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“http://MQ_INST_XXXX.aliyuncs.com:80”。
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.sendOneway(msg);
                } catch (Exception e) {
                    // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // 在應用退出前,銷毀Producer對象。
            // 注意:如果不銷毀也沒有問題。
            producer.shutdown();
        }
    }

三種發送方式的對比

發送方式

發送TPS

發送結果反饋

可靠性

同步發送

不丟失

非同步發送

不丟失

單向發送

最快

可能丟失

訂閱普通訊息

訂閱普通訊息的方式只有以下一種。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
    * 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * 建立Consumer,並開啟訊息軌跡。設定為您在阿里雲訊息佇列RocketMQ版控制台建立的Group ID。
         * 如果不想開啟訊息軌跡,可以按照如下方式建立:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        //設定為阿里雲訊息佇列RocketMQ版執行個體的存取點。
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        //阿里雲上訊息軌跡需要設定為CLOUD方式,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // 設定為您在阿里雲訊息佇列RocketMQ版控制台上建立的Topic。
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}