メッセージを指定された期間内にコンシュームする必要がある場合は、ApsaraMQ for RabbitMQ が提供する遅延メッセージ機能を使用できます。ApsaraMQ for RabbitMQ は、遅延メッセージ機能をネイティブでサポートしています。オープンソースの RabbitMQ と比較して、ApsaraMQ for RabbitMQ は、より使いやすい遅延メッセージ機能を提供します。
遅延メッセージとは
遅延メッセージとは、プロデューサーによって送信された後、特定の期間内にコンシュームされるメッセージです。
一般的なシナリオ
遅延メッセージは、以下のシナリオで使用できます。
メッセージの生成とコンシュームの間に時間枠が必要な場合。たとえば、e コマースプラットフォームで注文が作成されると、プロデューサーは ApsaraMQ for RabbitMQ ブローカーに遅延メッセージを送信します。このメッセージは、必要な期間内に注文が支払われたかどうかをコンシューマーに確認するために使用されます。ApsaraMQ for RabbitMQ ブローカーは、ブローカーがメッセージを受信してから 30 分後にメッセージをコンシューマーに配信します。コンシューマーは、メッセージを受信した後、支払いが完了したかどうかを確認します。支払いが完了していない場合、注文はクローズされます。それ以外の場合、コンシューマーはメッセージを無視します。
スケジュールされたタスクをトリガーするためにメッセージが使用される場合。たとえば、遅延メッセージを使用して、ユーザーに通知を送信するスケジュールされたタスクをトリガーできます。
遅延期間を指定するためのルール
メッセージの遅延期間の値は、負でない整数である必要があります。単位:ミリ秒。
遅延メッセージに指定した遅延期間が、メッセージに許可されている最大遅延期間よりも長い場合、メッセージは通常のメッセージとして処理され、すぐにコンシューマーに配信されます。最大遅延期間は、インスタンスタイプによって異なります。詳細については、クラスターをご参照ください。
遅延メッセージの有効期限(TTL)を指定した場合、実際のメッセージ TTL は次の式を使用して計算されます。
Actual message TTL = min {Message-level TTL or queue-level TTL} + Delay period。詳細については、メッセージ TTLをご参照ください。
遅延メッセージソリューションの比較
ApsaraMQ for RabbitMQ を使用すると、コードを変更することなく、オープンソースの RabbitMQ が提供するすべての遅延メッセージソリューションを実装できます。次の表は、ApsaraMQ for RabbitMQ とオープンソースの RabbitMQ の遅延メッセージソリューションを比較したものです。
ソリューション | オープンソース RabbitMQ | ApsaraMQ for RabbitMQ |
デッドレター交換 + キューレベル TTL | サポート | サポート |
デッドレター交換 + メッセージレベル TTL | サポート | サポート |
サポート | サポート | |
サポートなし | サポート |
オープンソース遅延メッセージプラグイン
オープンソース RabbitMQ と連携するために、ApsaraMQ for RabbitMQ はオープンソースの遅延メッセージプラグインと互換性があります。プラグインをインストールすることなく、プラグインを使用して遅延メッセージを送受信できます。オープンソースの遅延メッセージプラグインを使用するには、次の操作を実行します。
x-delayed-messageタイプの交換を宣言し、交換の x-delayed-type 拡張引数を設定してルーティングルールを指定します。サンプルコード:Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("ExchangeName", "x-delayed-message", true, false, args);次の表は、前のコードのパラメーターについて説明しています。
パラメーター
説明
x-delayed-type
交換タイプ。このパラメーターは、ルーティングルールを指定するために使用されます。有効な値:
direct
fanout
topic
headers
x-jms-topic
ExchangeName
交換名。
説明宣言された交換が ApsaraMQ for RabbitMQ コンソールで作成されていることを確認してください。詳細については、交換の作成をご参照ください。
x-delayed-message
遅延メッセージをルーティングするために使用される交換のタイプ。
遅延メッセージを送信します。メッセージの headers 属性にキーと値のペアを追加し、前の手順で宣言された交換にメッセージが送信されるように指定します。キーと値のペアのキーは x-delay で、値はミリ秒単位の数値です。サンプルコード:
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 5000);// メッセージの遅延期間を 5,000 ミリ秒に指定します。 AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("ExchangeName", "", props.build(), messageBodyBytes);
ネイティブ遅延メッセージソリューション
ApsaraMQ for RabbitMQ では、メッセージの delay 属性を設定してメッセージを遅延させることができます。次のセクションでは、ApsaraMQ for RabbitMQ でのネイティブ遅延メッセージのデータ転送プロセスについて説明します。
プロデューサーは、delay 属性が設定されたメッセージを交換に送信します。
交換は、メッセージをキューにルーティングします。
コンシューマーは、delay 属性で指定された期間が経過した後でのみ、キューからメッセージをコンシュームできます。

ネイティブ遅延メッセージソリューションのベストプラクティス
プロデューサークライアント
ApsaraMQ for RabbitMQ のネイティブ遅延メッセージソリューションは使いやすくなっています。プロデューサークライアントから送信されるメッセージの delay 属性を設定するだけで済みます。
次のサンプルコードは、Java で遅延メッセージを送信する方法の例を示しています。
Map<String, Object> headers = new HashMap<>(); headers.put("delay", "5000");// メッセージの遅延期間を 5,000 ミリ秒に指定します。 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();他のプログラミング言語のサンプルコードについては、AMQP デモをご参照ください。
コンシューマークライアント
指定された遅延期間に達した後、遅延メッセージをコンシューマーにすぐに配信できるようにするために、メッセージのコンシュームには、プルモードの
basic.getメソッドではなく、プッシュモードのbasic.consumeメソッドを使用することをお勧めします。ApsaraMQ for RabbitMQ は、メッセージを分散モードで保存します。プルモードのbasic.getメソッドを使用してメッセージを取得する場合、メッセージが保存されているノードにすぐに到達できない可能性があります。
FAQ
実際の遅延期間が指定された遅延値よりも長いのはなぜですか?
コンシューマークライアントは、basic.get メソッドをプルモードで使用してメッセージをコンシュームします。ApsaraMQ for RabbitMQ のメッセージはクラスターに保存されます。プルモードの basic.get メソッドを使用して ApsaraMQ for RabbitMQ ブローカーにメッセージがルーティングされると、コンシューマーは指定された遅延期間の直後に他のブローカーからメッセージをプルできない場合があります。