ApsaraMQ for RocketMQ は、2 種類の時間ベースのメッセージ配信をサポートしています。特定の時点で配信されるスケジュールされたメッセージと、一定の遅延後に配信される遅延メッセージです。どちらも __STARTDELIVERTIME プロパティを使用して、ブローカーがコンシューマーにメッセージを配信するタイミングを制御します。
以下の Java サンプルコードは、TCP クライアント SDK (Community Edition) を使用して、スケジュールされたメッセージと遅延メッセージを送受信し、サブスクライブする方法を示しています。
仕組み
プロデューサーが __STARTDELIVERTIME プロパティを持つメッセージを送信すると、ApsaraMQ for RocketMQ のブローカーは指定された配信時間までメッセージを保持します。その時点になると、ブローカーはサブスクライブしているコンシューマーに、通常のメッセージと同様にメッセージを配信します。
遅延メッセージ:
__STARTDELIVERTIMEをSystem.currentTimeMillis() + delayMillisに設定します。ブローカーは、遅延時間が経過した後にメッセージを配信します。スケジュールされたメッセージ:
__STARTDELIVERTIMEをミリ秒単位の未来の UNIX タイムスタンプに設定します。ブローカーは、その正確な時刻にメッセージを配信します。
指定された時刻が過去の場合、ブローカーはメッセージを即座に配信します。
スケジュールされたメッセージと遅延メッセージの使い分け
| タイプ | ユースケース | 例 |
|---|---|---|
| 遅延 | 一定時間待機した後にアクションをトリガーする | 作成から 30 分後に未払いの注文をキャンセルする |
| スケジュール済み | 特定の時刻にアクションをトリガーする | 毎週月曜日の 09:00 に通知を送信する |
Apache RocketMQ との違い
Apache RocketMQ は遅延メッセージをサポートしていますが、スケジュールされたメッセージはサポートしておらず、スケジュールされたメッセージ専用のインターフェイスも存在しません。
ApsaraMQ for RocketMQ は、以下の追加機能を提供します:
遅延メッセージとスケジュールされたメッセージの両方のタイプ
秒単位の配信時間精度
時間ベースのメッセージ処理におけるより高い同時実行数
Apache RocketMQ と ApsaraMQ for RocketMQ では、設定方法と結果が異なります。クラウド上の ApsaraMQ for RocketMQ については、このページのサンプルコードを使用してください。
前提条件
開始する前に、以下が準備されていることを確認してください:
Java SDK の Community Edition 4.5.2 以降がインストールされていること
Alibaba Cloud アカウント用に作成されたAccessKey ペア
トピックとグループ ID がApsaraMQ for RocketMQ コンソールで作成されていること
スケジュールされたメッセージと遅延メッセージの送信
スケジュールされたメッセージと遅延メッセージは、どちらも同じ __STARTDELIVERTIME ユーザープロパティを使用します。唯一の違いは、タイムスタンプ値の計算方法です。
以下のプレースホルダーを実際の値に置き換えてください:
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-group-id> | ApsaraMQ for RocketMQ コンソールで作成したグループ ID | GID_example |
<your-access-point> | コンソールから取得したインスタンスのエンドポイント | http://MQ_INST_XXXX.aliyuncs.com:80 |
<your-topic> | ApsaraMQ for RocketMQ コンソールで作成したトピック | Topic_example |
<your-message-tag> | メッセージフィルタリング用のタグ | TagA |
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
/**
* 環境変数から認証情報を読み取ります:
* ALIBABA_CLOUD_ACCESS_KEY_ID
* ALIBABA_CLOUD_ACCESS_KEY_SECRET
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
// メッセージトレースを有効にしてプロデューサーを作成します。
// メッセージトレースを無効にするには、以下を使用します:
// new DefaultMQProducer("<your-group-id>", getAclRPCHook());
DefaultMQProducer producer = new DefaultMQProducer(
"<your-group-id>", getAclRPCHook(), true, null);
// クラウド上でメッセージトレースを行うために必要です。
producer.setAccessChannel(AccessChannel.CLOUD);
// ApsaraMQ for RocketMQ コンソールからインスタンスのエンドポイントを設定します。
producer.setNamesrvAddr("<your-access-point>");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message(
"<your-topic>",
"<your-message-tag>",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// --- オプション A:遅延メッセージ ---
// 今から 3 秒後に配信します。
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
// --- オプション B:スケジュールされたメッセージ ---
// 特定の日時に配信します。
// 使用するには、以下の行のコメントを解除し、オプション A をコメントアウトします。
//
// long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// .parse("2025-08-10 18:45:00").getTime();
// msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// 終了する前にプロデューサーをシャットダウンします (任意)。
producer.shutdown();
}
}概要:
__STARTDELIVERTIMEプロパティは、ミリ秒単位の UNIX タイムスタンプを受け入れます。遅延メッセージの場合、現在の時刻に目的の遅延時間 (ミリ秒単位) を加算します。
スケジュールされたメッセージの場合、ターゲットの日時文字列を
yyyy-MM-dd HH:mm:ssフォーマットで解析し、UNIX タイムスタンプに変換します。指定された時刻が現在の時刻より前の場合、メッセージは即座に配信されます。
スケジュールされたメッセージと遅延メッセージのサブスクライブ
スケジュールされたメッセージと遅延メッセージのサブスクライブは、通常のメッセージのサブスクライブと同じです。ブローカーは指定された配信時間までメッセージを保持するため、コンシューマー側で特別な設定は必要ありません。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
/**
* 環境変数から認証情報を読み取ります:
* ALIBABA_CLOUD_ACCESS_KEY_ID
* ALIBABA_CLOUD_ACCESS_KEY_SECRET
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
// メッセージトレースを有効にしてコンシューマーを作成します。
// メッセージトレースを無効にするには、以下を使用します:
// new DefaultMQPushConsumer("<your-group-id>", getAclRPCHook(),
// new AllocateMessageQueueAveragely());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"<your-group-id>", getAclRPCHook(),
new AllocateMessageQueueAveragely(), true, null);
// ApsaraMQ for RocketMQ コンソールからインスタンスのエンドポイントを設定します。
// 値は http://xxxx.mq-internet.aliyuncs.com:80 のフォーマットです。
consumer.setNamesrvAddr("<your-access-point>");
// クラウド上でメッセージトレースを行うために必要です。
consumer.setAccessChannel(AccessChannel.CLOUD);
// トピックのすべてのタグ (*) をサブスクライブします。
consumer.subscribe("<your-topic>", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}関連ドキュメント
スケジュール済みメッセージおよび遅延メッセージ:概念、配信精度、および使用制限
トランザクションメッセージや順序付きメッセージなど、他のメッセージタイプを調べる