すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RabbitMQ:ヘッダー交換の使用方法

最終更新日:Jan 14, 2025

ヘッダー交換は、メッセージのルーティングキーではなく、ヘッダーとバインディング属性に基づいて、メッセージをキューにルーティングします。このトピックでは、ヘッダー交換の使用方法について説明します。

背景情報

  • ヘッダー交換にメッセージを送信する場合、キーと値の形式で headers 属性を設定する必要があります。ヘッダー交換はメッセージを受信すると、メッセージの headers 属性とバインドされたキューのバインディング属性の一致に基づいてメッセージをルーティングします。

    特別なバインディング属性 x-match を使用して、一致方法を決定します。x-match の値は、all または any に設定できます。

    • all: ヘッダー交換は、x-match を除くキューのすべてのバインディング属性がメッセージの headers 属性と一致する場合にのみ、メッセージをキューにルーティングします。

    • any: ヘッダー交換は、x-match を除くキューの1つ以上のバインディング属性がメッセージの headers 属性と一致する場合、メッセージをキューにルーティングします。

    詳細については、「ヘッダー交換」をご参照ください。

  • 交換がバインドされた後、メッセージ検索ApsaraMQ for RabbitMQ コンソールの対応するインスタンスの [メッセージクエリ] ページで、キューごとのバインディング結果をクエリできます。詳細については、「メッセージのクエリ」をご参照ください。

サンプルコード

Java の次のサンプルコードは、ヘッダー交換をバインドする方法の例を示しています。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class HeadersExchange {
    // ApsaraMQ for RabbitMQ インスタンスのエンドポイント。
    private static String host = "xxx.xxx.aliyuncs.com";
    // ApsaraMQ for RabbitMQ インスタンスの静的ユーザー名とパスワード。
    private static String userName = "${UserName}";
    private static String password = "${PassWord}";
    // ApsaraMQ for RabbitMQ インスタンスの vhost。
    private static String vhost = "${VirtualHost}";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        // 注:自動接続回復機能は、機能が有効になっている場合にのみ有効になります。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(vhost);
        // デフォルトポート。暗号化されていない接続にはポート 5672 を使用し、暗号化された接続にはポート 5671 を使用します。
        factory.setPort(5672);
        // ネットワーク環境に基づいてタイムアウト期間を指定します。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        // 永続的な接続を使用します。このようにして、クライアントはメッセージを送信するためにクライアントを使用するたびに接続を確立する必要はありません。接続が頻繁に確立されると、大量のネットワークとブローカーリソースが消費され、ブローカーで SYN フラッド攻撃に対する保護がトリガーされる可能性があります。
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "${ExchangeName}";
        String exchangeType = "headers";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";


        Map<String, Object> argument = new HashMap<>();
        argument.put("format", "pdf");
        argument.put("type", "log");
        argument.put("x-match", "all");

        channel.queueDeclare(queueName, true, false, false, null);
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey, argument);


        // mandatory パラメーターの値が構成ファイルで true で、メッセージのルーティングに失敗した場合、メッセージはクライアントに返されます。
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("no route, msgId=" + properties.getMessageId());
            }
        });

        // キーと値のペアをメッセージの headers 属性として設定します。
        // 1. キーと値のペア(type、log)がアノテーションとしてリストされている場合、キーと値のペア(format、pdf)のみが引数と一致します。コード実行後、メッセージはルーティングできません。
        // 2. キーと値のペア(type、log)がアノテーションとしてリストされていない場合、キーと値のペア(format、pdf)と(type、log)が引数と一致します。コード実行後、メッセージはルーティングできます。
        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        //headers.put("type", "log");

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();
        channel.basicPublish(exchangeName, routingKey, true, props, ("Sent message body").getBytes(StandardCharsets.UTF_8));

        Thread.sleep(10000);
        connection.close();
    }
}