全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發定時訊息和延時訊息

更新時間:Nov 23, 2024

本文提供使用社區版TCP協議下的Java SDK收發定時訊息和延時訊息的範例程式碼。

背景資訊

  • 定時訊息:Producer將訊息發送到雲訊息佇列 RocketMQ 版服務端,但並不期望立馬投遞這條訊息,而是延遲到在目前時間點之後的某一個時間投遞到Consumer進行消費,該訊息即定時訊息。
  • 延時訊息:Producer將訊息發送到雲訊息佇列 RocketMQ 版服務端,但並不期望立馬投遞這條訊息,而是延遲一定時間後才投遞到Consumer進行消費,該訊息即延時訊息。

更多資訊,請參見定時和延時訊息

重要

社區版的Apache RocketMQ和阿里雲雲訊息佇列 RocketMQ 版配置方式不同,實現效果也有所差異。社區版的Apache RocketMQ支援延時訊息,但不支援定時訊息,因此沒有專門的定時訊息介面。阿里雲雲訊息佇列 RocketMQ 版不僅同時支援配置延時訊息和定時訊息,並且定時和延時時間可以精確到秒級、擁有更高的並發性。建議您優先使用雲上定時延時的方式,使用方法,請參考以下步驟。

前提條件

您已完成以下操作:

發送定時訊息或延時訊息

發送定時訊息或延時訊息的範例程式碼如下。

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
     * 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         *建立Producer,並開啟訊息軌跡。設定為您在阿里雲訊息佇列RocketMQ版控制台建立的Group ID。
         *如果不想開啟訊息軌跡,可以按照如下方式建立:
         *DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
        /**
         *設定使用接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
        /**
         *設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“http://MQ_INST_XXXX.aliyuncs.com:80”。
         */
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                /*設定為您在訊息佇列RocketMQ版控制台建立的Topic。*/
                Message msg = new Message("YOUR TOPIC",
                        /*設定訊息的Tag。*/
                        "YOUR MESSAGE TAG",
                        /*訊息內容。*/
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                /*發送延時訊息,需要設定延時時間,單位毫秒(ms),訊息將在指定延時時間後投遞,例如訊息將在3秒後投遞。*/
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                /**
                 *若需要發送定時訊息,則需要設定定時時間,訊息將在指定時間進行投遞,例如訊息將在2021-08-10 18:45:00投遞。
                 *定時時間格式為:yyyy-MM-dd HH:mm:ss,若設定的時間戳記在目前時間之前,則訊息將被立即投遞給Consumer。
                 * long timeStamp = newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                 * msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                 */
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                //訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        //在應用退出前,銷毀Producer對象。
        //注意:如果不銷毀也沒有問題。
        producer.shutdown();
    }
}

訂閱定時訊息或延時訊息

訂閱定時訊息或延時訊息的範例程式碼如下。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
    * 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * 建立Consumer,並開啟訊息軌跡。設定為您在阿里雲訊息佇列RocketMQ版控制台建立的Group ID。
         * 如果不想開啟訊息軌跡,可以按照如下方式建立:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        //設定為阿里雲訊息佇列RocketMQ版執行個體的存取點。
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        //阿里雲上訊息軌跡需要設定為CLOUD方式,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // 設定為您在阿里雲訊息佇列RocketMQ版控制台上建立的Topic。
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}