すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:スケジュールされたメッセージの送受信

最終更新日:Mar 12, 2026

一部のビジネスシナリオでは、メッセージをすぐに配信するのではなく、将来の特定の時刻に配信する必要があります。たとえば、E コマースプラットフォームで未払いの注文を 30 分後にキャンセルする必要がある場合や、モニタリングシステムで定期的なヘルスチェックをトリガーする必要がある場合などです。ApsaraMQ for RocketMQ のスケジュールされたメッセージを使用すると、各メッセージに配信タイムスタンプを設定できるため、ブローカーは指定された時刻までメッセージを保持し、コンシューマーに配信します。

このトピックでは、Java 向け TCP クライアント SDK を使用してスケジュールされたメッセージを送受信するためのサンプルコードを提供します。

用語と制約については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。

説明

ApsaraMQ for RocketMQ を初めて使用する場合は、スケジュールされたメッセージを実装する前に、「デモプロジェクト」を参照して、動作するプロジェクトをセットアップしてください。

利用シーン

  • 注文のタイムアウト処理:ユーザーが注文を行う際に、支払期限に設定されたスケジュールされたメッセージを送信します。メッセージが配信された時点で注文が未払いのままである場合、自動的にキャンセルされます。

  • 定期的なタスクのトリガー:将来の配信時刻を指定したメッセージを送信することで、日次ファイルクリーンアップや定期的なデータ同期などの操作をスケジュールします。

スケジュールされたメッセージの仕組み

  1. プロデューサーは msg.setStartDeliverTime(timestamp) を呼び出すことで、メッセージに将来の配信タイムスタンプを設定します。

  2. ブローカーは、指定されたタイムスタンプまでメッセージを保持します。

  3. スケジュールされた時刻になると、ブローカーはサブスクライブしているコンシューマーにメッセージを配信します。

時刻設定のルール:

ルール動作
タイムスタンプのフォーマットミリ秒レベルの UNIX タイムスタンプ。対象の配信時刻をエポックからのミリ秒数に変換します。
過去のタイムスタンプメッセージはすぐに配信されます。

前提条件

開始する前に、以下が準備できていることを確認してください。

スケジュールされたメッセージの送信

次の例では、将来の配信タイムスタンプを持つメッセージを送信します。msg.setStartDeliverTime(timestamp) メソッドは、いつメッセージを配信するかをブローカーに指示します。

プレースホルダーを実際の値に置き換えてください。

プレースホルダー説明
<your-topic>対象のトピック名my-scheduled-topic
<your-tag>コンシューマーがフィルタリングするためのメッセージタグpayment-timeout
<your-message-body>バイト配列としてのメッセージコンテンツ"Hello MQ".getBytes()
<your-message-key>ビジネス固有の一意の識別子ORDERID_100
<your-tcp-endpoint>ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページにある TCP エンドポイント--
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.text.SimpleDateFormat;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 環境変数から AccessKey ペアを読み込みます
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページから TCP エンドポイントを取得します
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");

        Producer producer = ONSFactory.createProducer(properties);
        // メッセージを送信する前に一度だけ start() を呼び出します
        producer.start();

        Message msg = new Message(
                "<your-topic>",            // トピック
                "<your-tag>",              // コンシューマー側でのフィルタリング用のタグ
                "<your-message-body>".getBytes()  // バイト単位のメッセージ本文
        );

        // オプション: コンソールでのメッセージ追跡のためにビジネスキーを設定します
        msg.setKey("<your-message-key>");

        try {
            // 配信タイムスタンプ (エポックからのミリ秒数) を設定します
            // 例: 2026-03-15 10:30:00 に配信
            long deliverTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                    .parse("2026-03-15 10:30:00")
                    .getTime();
            msg.setStartDeliverTime(deliverTime);

            SendResult sendResult = producer.send(msg);
            System.out.println("Message Id:" + sendResult.getMessageId());
        } catch (Exception e) {
            // 送信失敗の処理: メッセージをリトライまたは永続化します
            System.out.println("Send failed. Topic: " + msg.getTopic());
            e.printStackTrace();
        }

        // アプリケーションの終了時にプロデューサーをシャットダウンします。
        // メッセージを頻繁に送信する場合は、送信ごとにシャットダウンするのではなく、
        // プロデューサーを実行し続けます。
        producer.shutdown();
    }
}

キーポイント:

  • setStartDeliverTime(long timestamp) はミリ秒レベルの UNIX タイムスタンプを受け入れます。ブローカーはこの時刻までメッセージを保持します。

  • 送信前に一度 producer.start() を呼び出します。高スループットのシナリオでは、メッセージごとに新しいプロデューサーインスタンスを作成するのではなく、同じプロデューサーインスタンスを再利用してください。

  • msg.setKey() はオプションですが、ApsaraMQ for RocketMQ コンソールでビジネス ID によってメッセージを追跡するために推奨されます。

追加の例については、「ApsaraMQ for RocketMQ コードライブラリ」をご参照ください。

スケジュールされたメッセージのサブスクライブ

スケジュールされたメッセージのサブスクライブは、通常のメッセージと同じ方法で行います。特別なサブスクリプションロジックは必要ありません。スケジュールされたタイムスタンプに達すると、ブローカーがコンシューマーにメッセージを配信します。

サブスクリプションコードについては、「メッセージのサブスクライブ」をご参照ください。

ベストプラクティス

  • 一意のメッセージキーを設定する。メッセージキー (setKey) は、可能な限りグローバルで一意である必要があります。配信に問題が発生した場合に、ApsaraMQ for RocketMQ コンソールでメッセージを検索するために使用します。

  • プロデューサーインスタンスを再利用する。producer.shutdown() は、アプリケーションが終了するときにのみ呼び出してください。プロデューサーを繰り返し作成および削除すると、メモリを浪費します。

関連トピック