一部のビジネスシナリオでは、メッセージをすぐに配信するのではなく、将来の特定の時刻に配信する必要があります。たとえば、E コマースプラットフォームで未払いの注文を 30 分後にキャンセルする必要がある場合や、モニタリングシステムで定期的なヘルスチェックをトリガーする必要がある場合などです。ApsaraMQ for RocketMQ のスケジュールされたメッセージを使用すると、各メッセージに配信タイムスタンプを設定できるため、ブローカーは指定された時刻までメッセージを保持し、コンシューマーに配信します。
このトピックでは、Java 向け TCP クライアント SDK を使用してスケジュールされたメッセージを送受信するためのサンプルコードを提供します。
用語と制約については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。
ApsaraMQ for RocketMQ を初めて使用する場合は、スケジュールされたメッセージを実装する前に、「デモプロジェクト」を参照して、動作するプロジェクトをセットアップしてください。
利用シーン
注文のタイムアウト処理:ユーザーが注文を行う際に、支払期限に設定されたスケジュールされたメッセージを送信します。メッセージが配信された時点で注文が未払いのままである場合、自動的にキャンセルされます。
定期的なタスクのトリガー:将来の配信時刻を指定したメッセージを送信することで、日次ファイルクリーンアップや定期的なデータ同期などの操作をスケジュールします。
スケジュールされたメッセージの仕組み
プロデューサーは
msg.setStartDeliverTime(timestamp)を呼び出すことで、メッセージに将来の配信タイムスタンプを設定します。ブローカーは、指定されたタイムスタンプまでメッセージを保持します。
スケジュールされた時刻になると、ブローカーはサブスクライブしているコンシューマーにメッセージを配信します。
時刻設定のルール:
| ルール | 動作 |
|---|---|
| タイムスタンプのフォーマット | ミリ秒レベルの UNIX タイムスタンプ。対象の配信時刻をエポックからのミリ秒数に変換します。 |
| 過去のタイムスタンプ | メッセージはすぐに配信されます。 |
前提条件
開始する前に、以下が準備できていることを確認してください。
Java SDK がインストール済みです。詳細については、「環境の準備」をご参照ください。
ApsaraMQ for RocketMQ インスタンス、トピック、および使用者グループ(ApsaraMQ for RocketMQ コンソールで作成)
Alibaba Cloud アカウントの AccessKey ペア
(オプション)ログ設定が構成済みです。
スケジュールされたメッセージの送信
次の例では、将来の配信タイムスタンプを持つメッセージを送信します。msg.setStartDeliverTime(timestamp) メソッドは、いつメッセージを配信するかをブローカーに指示します。
プレースホルダーを実際の値に置き換えてください。
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-topic> | 対象のトピック名 | my-scheduled-topic |
<your-tag> | コンシューマーがフィルタリングするためのメッセージタグ | payment-timeout |
<your-message-body> | バイト配列としてのメッセージコンテンツ | "Hello MQ".getBytes() |
<your-message-key> | ビジネス固有の一意の識別子 | ORDERID_100 |
<your-tcp-endpoint> | ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページにある TCP エンドポイント | -- |
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.text.SimpleDateFormat;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 環境変数から AccessKey ペアを読み込みます
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページから TCP エンドポイントを取得します
properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");
Producer producer = ONSFactory.createProducer(properties);
// メッセージを送信する前に一度だけ start() を呼び出します
producer.start();
Message msg = new Message(
"<your-topic>", // トピック
"<your-tag>", // コンシューマー側でのフィルタリング用のタグ
"<your-message-body>".getBytes() // バイト単位のメッセージ本文
);
// オプション: コンソールでのメッセージ追跡のためにビジネスキーを設定します
msg.setKey("<your-message-key>");
try {
// 配信タイムスタンプ (エポックからのミリ秒数) を設定します
// 例: 2026-03-15 10:30:00 に配信
long deliverTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.parse("2026-03-15 10:30:00")
.getTime();
msg.setStartDeliverTime(deliverTime);
SendResult sendResult = producer.send(msg);
System.out.println("Message Id:" + sendResult.getMessageId());
} catch (Exception e) {
// 送信失敗の処理: メッセージをリトライまたは永続化します
System.out.println("Send failed. Topic: " + msg.getTopic());
e.printStackTrace();
}
// アプリケーションの終了時にプロデューサーをシャットダウンします。
// メッセージを頻繁に送信する場合は、送信ごとにシャットダウンするのではなく、
// プロデューサーを実行し続けます。
producer.shutdown();
}
}キーポイント:
setStartDeliverTime(long timestamp)はミリ秒レベルの UNIX タイムスタンプを受け入れます。ブローカーはこの時刻までメッセージを保持します。送信前に一度
producer.start()を呼び出します。高スループットのシナリオでは、メッセージごとに新しいプロデューサーインスタンスを作成するのではなく、同じプロデューサーインスタンスを再利用してください。msg.setKey()はオプションですが、ApsaraMQ for RocketMQ コンソールでビジネス ID によってメッセージを追跡するために推奨されます。
追加の例については、「ApsaraMQ for RocketMQ コードライブラリ」をご参照ください。
スケジュールされたメッセージのサブスクライブ
スケジュールされたメッセージのサブスクライブは、通常のメッセージと同じ方法で行います。特別なサブスクリプションロジックは必要ありません。スケジュールされたタイムスタンプに達すると、ブローカーがコンシューマーにメッセージを配信します。
サブスクリプションコードについては、「メッセージのサブスクライブ」をご参照ください。
ベストプラクティス
一意のメッセージキーを設定する。メッセージキー (
setKey) は、可能な限りグローバルで一意である必要があります。配信に問題が発生した場合に、ApsaraMQ for RocketMQ コンソールでメッセージを検索するために使用します。プロデューサーインスタンスを再利用する。
producer.shutdown()は、アプリケーションが終了するときにのみ呼び出してください。プロデューサーを繰り返し作成および削除すると、メモリを浪費します。
関連トピック
スケジュール済みメッセージおよび遅延メッセージ -- スケジューリング機構と制約
メッセージのサブスクライブ -- メッセージ消費の設定
ApsaraMQ for RocketMQ コードライブラリ -- その他の Java SDK の例
デモプロジェクト -- 完全な動作プロジェクト