遅延メッセージを利用することで、メッセージがコンシューム可能になるタイミングを制御できます。ApsaraMQ for RocketMQ のブローカーは、指定された配信時間まで遅延メッセージを保持し、その後サブスクライバーにリリースします。遅延メッセージは、遅延キューに似ています。
一般的な利用シーン:
注文タイムアウト処理:キャンセルロジックをトリガーする遅延メッセージを送信することで、30 分後に未払いの注文をキャンセルします。
バックオフ付きリトライ:上流依存関係が一時的に利用できない場合、一定の遅延後にリトライをスケジュールします。
遅延通知:設定可能な待機期間の後に、リマインダーやフォローアップ通知を送信します。
概念と制約については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。
前提条件
開始する前に、以下のものが揃っていることを確認してください:
Java 用 TCP クライアント SDK がインストールされています。詳細については、「環境を準備する」をご参照ください。
ApsaraMQ for RocketMQ インスタンス、トピック、およびコンシューマーグループがコンソールで作成されています。詳細については、「リソースの作成」をご参照ください。
お使いの Alibaba Cloud アカウントの AccessKey ペアです。詳細については、「AccessKey ペアの作成」をご参照ください。
(オプション) ログ記録が設定済みです。詳細については、「ログ記録の設定」をご参照ください。
遅延メッセージの仕組み
プロデューサーはメッセージを作成し、
setStartDeliverTime()を使用して配信時間を設定します。ブローカーはメッセージを受信し、指定された配信時間まで保持します。
配信時間になると、ブローカーはメッセージをトピックにリリースし、コンシューマーが利用できるようにします。
| パラメーター | 詳細 |
|---|---|
| 時間フォーマット | ミリ秒単位の絶対タイムスタンプ (UNIX エポック) |
| 最小値 | 現在時刻より後である必要があります |
| 最大遅延時間 | 現在時刻から 40 日後 |
遅延メッセージの送信
通常メッセージの送信との唯一の違いは、msg.setStartDeliverTime(delayTime) というメソッド呼び出しが 1 つある点です。これにより、ブローカーがメッセージを配信する絶対タイムスタンプ (ミリ秒単位) が設定されます。
その他の例については、「ApsaraMQ for RocketMQ コードライブラリ」をご参照ください。
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 環境変数から AccessKey ペアを取得します。
// ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET が設定されていることを確認してください。
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// TCP エンドポイントを指定します。
// この値は、ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページの [TCP エンドポイント] セクションにあります。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");
Producer producer = ONSFactory.createProducer(properties);
// メッセージを送信する前に、一度 start() を呼び出します。
producer.start();
Message msg = new Message(
"<your-topic>", // コンソールで作成したトピック
"DelayMessageTag", // コンシューマー側でのフィルタリング用のタグ。Gmail のタグに似ています
"Hello MQ".getBytes() // メッセージ本文 (バイナリ形式)。
// ApsaraMQ for RocketMQ はメッセージ本文を処理しません。
// プロデューサーとコンシューマーはシリアル化について合意する必要があります。
);
// オプション:メッセージトレース用にビジネスキーを設定します。
// キーは可能な限りグローバルに一意である必要があります。このキーを使用して、ApsaraMQ for RocketMQ コンソールでメッセージのクエリや再送信ができます。
msg.setKey("ORDERID_100");
try {
// 配信時間を今から 3 秒後に設定します。
// 値はミリ秒単位の絶対タイムスタンプです。
// 指定できる最大遅延時間は 40 日です。
long delayTime = System.currentTimeMillis() + 3000;
msg.setStartDeliverTime(delayTime);
// 同期送信モードでメッセージを送信します。
// 例外がスローされなければ、メッセージは送信されます。
SendResult sendResult = producer.send(msg);
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:"
+ msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 送信失敗の処理:ログ記録、リトライ、またはメッセージの永続化。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// アプリケーションの終了時にプロデューサーをシャットダウンします。
// 高スループットのシナリオでは、プロデューサーを実行し続け、再利用してください。
producer.shutdown();
}
}以下のプレースホルダーを実際の値に置き換えてください:
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-tcp-endpoint> | [インスタンス詳細] ページの TCP エンドポイント | http://MQ_INST_xxxxx.mq-internet-access.mq-internet.aliyuncs.com:80 |
<your-topic> | コンソールで作成したトピック名 | DelayTopic |
ApsaraMQ for RocketMQ を初めて使用する場合は、「デモプロジェクト」をご参照のうえ、メッセージを送受信する前に動作するプロジェクトをセットアップしてください。
遅延メッセージのサブスクライブ
遅延メッセージのサブスクライブは、通常メッセージのサブスクライブと同じように機能します。特別なコンシューマー設定は不要です。遅延ロジックは完全にプロデューサー側にあります。
コンシューマーのサンプルコードについては、「メッセージのサブスクライブ」をご参照ください。
次のステップ
スケジュールされたメッセージと遅延メッセージ:遅延メッセージとスケジュールされたメッセージ配信の背後にある概念と制約を理解します。