すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:スケジュールされたメッセージと遅延メッセージの送受信

最終更新日:Nov 22, 2024

このトピックでは、Community EditionのTCPクライアントSDK for Javaを使用して、スケジュールされたメッセージと遅延されたメッセージを送受信するためのサンプルコードを提供します。

背景情報

  • スケジュールされたメッセージ: スケジュールされたメッセージは、ApsaraMQ for RocketMQブローカーによって指定された時点でコンシューマに配信されるメッセージです。

  • 遅延メッセージ: 遅延メッセージは、ApsaraMQ for RocketMQブローカーによって指定された期間後にコンシューマに配信されるメッセージです。

詳細については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。

重要

スケジュールされたメッセージと遅延メッセージの設定方法と結果は、Apache RocketMQとApsaraMQ for RocketMQの間で異なります。 Apache RocketMQは遅延メッセージをサポートしますが、スケジュールメッセージはサポートしません。 したがって、スケジュールメッセージに専用のインターフェイスは使用できません。 ApsaraMQ for RocketMQは、遅延メッセージとスケジュール済みメッセージをサポートしています。 これにより、秒単位の正確なスケジュール時間と遅延期間を設定でき、同時実行性が高くなります。 クラウド上でスケジュールメッセージと遅延メッセージを送受信することを推奨します。 詳細は、以下のセクションをご参照ください。

前提条件

開始する前に、次の操作が実行されていることを確認してください。

  • SDK for Java 4.5.2以降のCommunity Editionがダウンロードされます。 詳細については、RocketMQのダウンロードページをご覧ください。

  • 環境を整えます。 詳細については、「環境の準備」をご参照ください。

  • Alibaba CloudアカウントにAccessKeyペアが作成されます。 詳細については、「AccessKey の作成」をご参照ください。

スケジュールされたメッセージと遅延メッセージの送信

次のサンプルコードは、Community EditionのJava用TCPクライアントSDKを使用して、スケジュールされたメッセージと遅延されたメッセージを送信する方法の例を示しています。

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
    /**
    * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    */ 
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * Create a producer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
         * If you do not want to enable the message trace feature, you can use the following method to create a producer:
         *DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
        /**
         * Specify Alibaba Cloud as the access channel. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
        /**
         * The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
         */
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try { 
                /* Specify the topic that you created in the ApsaraMQ for RocketMQ console. */
                Message msg = new Message("YOUR TOPIC",
                    /* The message tag. */
                    "YOUR MESSAGE TAG",
                    // The message body. */
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                /* The delay period after which the delayed message is sent. Unit: milliseconds. For example, if you want the delayed message to be sent after 3 seconds, specify the value as 3000. */
                long delayTime = System.currentTimeMillis()+3000;
                msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(delayTime));
      
                /**
                * The point in time at which the scheduled message is sent. For example, if you want the scheduled message to be sent at 18:45:00 on August 10, 2021, specify the value as 2021-08-10 18:45:00. 
                * The value of this parameter is in the format of yyyy-MM-dd HH:mm:ss. If you specify a time that is earlier than the current time, the message is immediately sent to the consumer. 
                * long timeStamp = newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                * msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                */
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // The logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer object. 
        // Note: This operation is optional. 
        producer.shutdown();
    }
}

定期メッセージと遅延メッセージの購読

次のサンプルコードは、Community EditionのJava用TCPクライアントSDKを使用して、スケジュールされたメッセージと遅延したメッセージをサブスクライブする方法の例を示しています。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * Create a consumer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
         * If you do not want to enable the message trace feature, you can use the following method to create a producer:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // The endpoint of the ApsaraMQ for RocketMQ instance. 
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Set the AccessChannel parameter to CLOUD. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}