すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:スケジュールされたメッセージと遅延メッセージの送受信

最終更新日:Mar 11, 2026

ApsaraMQ for RocketMQ は、2 種類の時間ベースのメッセージ配信をサポートしています。特定の時点で配信されるスケジュールされたメッセージと、一定の遅延後に配信される遅延メッセージです。どちらも __STARTDELIVERTIME プロパティを使用して、ブローカーがコンシューマーにメッセージを配信するタイミングを制御します。

以下の Java サンプルコードは、TCP クライアント SDK (Community Edition) を使用して、スケジュールされたメッセージと遅延メッセージを送受信し、サブスクライブする方法を示しています。

仕組み

プロデューサーが __STARTDELIVERTIME プロパティを持つメッセージを送信すると、ApsaraMQ for RocketMQ のブローカーは指定された配信時間までメッセージを保持します。その時点になると、ブローカーはサブスクライブしているコンシューマーに、通常のメッセージと同様にメッセージを配信します。

  • 遅延メッセージ: __STARTDELIVERTIMESystem.currentTimeMillis() + delayMillis に設定します。ブローカーは、遅延時間が経過した後にメッセージを配信します。

  • スケジュールされたメッセージ: __STARTDELIVERTIME をミリ秒単位の未来の UNIX タイムスタンプに設定します。ブローカーは、その正確な時刻にメッセージを配信します。

指定された時刻が過去の場合、ブローカーはメッセージを即座に配信します。

スケジュールされたメッセージと遅延メッセージの使い分け

タイプユースケース
遅延一定時間待機した後にアクションをトリガーする作成から 30 分後に未払いの注文をキャンセルする
スケジュール済み特定の時刻にアクションをトリガーする毎週月曜日の 09:00 に通知を送信する

Apache RocketMQ との違い

Apache RocketMQ は遅延メッセージをサポートしていますが、スケジュールされたメッセージはサポートしておらず、スケジュールされたメッセージ専用のインターフェイスも存在しません。

ApsaraMQ for RocketMQ は、以下の追加機能を提供します:

  • 遅延メッセージとスケジュールされたメッセージの両方のタイプ

  • 秒単位の配信時間精度

  • 時間ベースのメッセージ処理におけるより高い同時実行数

重要

Apache RocketMQ と ApsaraMQ for RocketMQ では、設定方法と結果が異なります。クラウド上の ApsaraMQ for RocketMQ については、このページのサンプルコードを使用してください。

前提条件

開始する前に、以下が準備されていることを確認してください:

スケジュールされたメッセージと遅延メッセージの送信

スケジュールされたメッセージと遅延メッセージは、どちらも同じ __STARTDELIVERTIME ユーザープロパティを使用します。唯一の違いは、タイムスタンプ値の計算方法です。

以下のプレースホルダーを実際の値に置き換えてください:

プレースホルダー説明
<your-group-id>ApsaraMQ for RocketMQ コンソールで作成したグループ IDGID_example
<your-access-point>コンソールから取得したインスタンスのエンドポイントhttp://MQ_INST_XXXX.aliyuncs.com:80
<your-topic>ApsaraMQ for RocketMQ コンソールで作成したトピックTopic_example
<your-message-tag>メッセージフィルタリング用のタグTagA
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * 環境変数から認証情報を読み取ります:
     *   ALIBABA_CLOUD_ACCESS_KEY_ID
     *   ALIBABA_CLOUD_ACCESS_KEY_SECRET
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // メッセージトレースを有効にしてプロデューサーを作成します。
        // メッセージトレースを無効にするには、以下を使用します:
        //   new DefaultMQProducer("<your-group-id>", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer(
            "<your-group-id>", getAclRPCHook(), true, null);

        // クラウド上でメッセージトレースを行うために必要です。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // ApsaraMQ for RocketMQ コンソールからインスタンスのエンドポイントを設定します。
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message(
                    "<your-topic>",
                    "<your-message-tag>",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                // --- オプション A:遅延メッセージ ---
                // 今から 3 秒後に配信します。
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // --- オプション B:スケジュールされたメッセージ ---
                // 特定の日時に配信します。
                // 使用するには、以下の行のコメントを解除し、オプション A をコメントアウトします。
                //
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                //     .parse("2025-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 終了する前にプロデューサーをシャットダウンします (任意)。
        producer.shutdown();
    }
}

概要:

  • __STARTDELIVERTIME プロパティは、ミリ秒単位の UNIX タイムスタンプを受け入れます。

  • 遅延メッセージの場合、現在の時刻に目的の遅延時間 (ミリ秒単位) を加算します。

  • スケジュールされたメッセージの場合、ターゲットの日時文字列を yyyy-MM-dd HH:mm:ss フォーマットで解析し、UNIX タイムスタンプに変換します。

  • 指定された時刻が現在の時刻より前の場合、メッセージは即座に配信されます。

スケジュールされたメッセージと遅延メッセージのサブスクライブ

スケジュールされたメッセージと遅延メッセージのサブスクライブは、通常のメッセージのサブスクライブと同じです。ブローカーは指定された配信時間までメッセージを保持するため、コンシューマー側で特別な設定は必要ありません。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
     * 環境変数から認証情報を読み取ります:
     *   ALIBABA_CLOUD_ACCESS_KEY_ID
     *   ALIBABA_CLOUD_ACCESS_KEY_SECRET
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // メッセージトレースを有効にしてコンシューマーを作成します。
        // メッセージトレースを無効にするには、以下を使用します:
        //   new DefaultMQPushConsumer("<your-group-id>", getAclRPCHook(),
        //       new AllocateMessageQueueAveragely());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
            "<your-group-id>", getAclRPCHook(),
            new AllocateMessageQueueAveragely(), true, null);

        // ApsaraMQ for RocketMQ コンソールからインスタンスのエンドポイントを設定します。
        // 値は http://xxxx.mq-internet.aliyuncs.com:80 のフォーマットです。
        consumer.setNamesrvAddr("<your-access-point>");

        // クラウド上でメッセージトレースを行うために必要です。
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // トピックのすべてのタグ (*) をサブスクライブします。
        consumer.subscribe("<your-topic>", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

関連ドキュメント