A headers exchange routes messages to queues based on the headers and binding attributes instead of the routing keys of the messages. This topic describes how to use a headers exchange.
Background information
When you send a message to a headers exchange, you must configure the headers attribute of the message as key-value pairs. After a headers exchange receives a message, the exchange routes the message by matching the headers attributes of the message against the binding attributes of the bound queues.
A special binding attribute x-match is used to determine the matching method. The value of x-match can be set to all or any.
all: A headers exchange routes a message to a queue only if all binding attributes of the queue except for x-match match the headers attributes of the message.
any: A headers exchange routes a message to a queue if one or more binding attributes of the queue except for x-match match the headers attributes of the message.
For more information, see Headers exchanges.
After an exchange is bound, you can query the binding result by queue on the Message Query page of the corresponding instance in the ApsaraMQ for RabbitMQ console. For more information, see Query messages.
Sample code
The following Java sample code shows how to declare a headers exchange, bind a queue to the exchange, and publish a message to the exchange:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
 * Sample that declares a headers exchange, binds a queue to it with
 * {@code x-match=all} binding attributes, and publishes one message whose
 * headers are matched against those binding attributes.
 *
 * <p>Replace the {@code ${...}} placeholders and the endpoint with the values
 * of your own instance before you run the sample.
 */
public class HeadersExchange {
    // The endpoint of the ApsaraMQ for RabbitMQ instance.
    private static final String HOST = "xxx.xxx.aliyuncs.com";
    // The static username and password of the ApsaraMQ for RabbitMQ instance.
    private static final String USER_NAME = "${UserName}";
    private static final String PASSWORD = "${PassWord}";
    // The vhost of the ApsaraMQ for RabbitMQ instance.
    private static final String VHOST = "${VirtualHost}";

    /**
     * Connects to the broker, sets up the exchange, queue, and binding, and
     * publishes a single message with the mandatory flag set.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if the connection cannot be established in time
     * @throws InterruptedException if the thread is interrupted while waiting for a possible return
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost(VHOST);
        // Note: The automatic connection recovery feature takes effect only after the feature is enabled.
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.
        factory.setPort(5672);
        // Specify a timeout period based on the network environment.
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        // Use persistent connections. This way, the client does not need to establish connections every time
        // you use the client to send messages. If connections are frequently established, a large number of
        // network and broker resources may be consumed and protection against SYN flood attacks may be
        // triggered on the broker.
        // try-with-resources closes the connection even if a later call throws, so a failure does not leak
        // the connection.
        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();

            String exchangeName = "${ExchangeName}";
            String exchangeType = "headers";
            String queueName = "${QueueName}";
            // A headers exchange routes on headers instead of the routing key; the same value is
            // passed to both queueBind and basicPublish below.
            String routingKey = "${RoutingKey}";

            // Binding attributes. With x-match=all, every attribute other than x-match must match
            // a header of the message for the message to be routed to the queue.
            Map<String, Object> argument = new HashMap<>();
            argument.put("format", "pdf");
            argument.put("type", "log");
            argument.put("x-match", "all");

            channel.queueDeclare(queueName, true, false, false, null);
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey, argument);

            // The message is published with the mandatory flag set to true (third argument of basicPublish).
            // If the message fails to be routed, it is returned to the client and reported here.
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                         AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("no route, msgId=" + properties.getMessageId());
                }
            });

            // Set the key-value pairs as the headers attribute of the message.
            // 1. While the (type, log) line below stays commented out, only (format, pdf) matches a binding
            //    attribute. Because x-match is all, the message cannot be routed.
            // 2. If you uncomment the (type, log) line, both (format, pdf) and (type, log) match the binding
            //    attributes, and the message can be routed.
            Map<String, Object> headers = new HashMap<>();
            headers.put("format", "pdf");
            //headers.put("type", "log");

            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .messageId(UUID.randomUUID().toString())
                    .headers(headers)
                    .build();
            channel.basicPublish(exchangeName, routingKey, true, props,
                    ("Sent message body").getBytes(StandardCharsets.UTF_8));

            // Keep the connection open for a while so that a returned (unroutable) message can be
            // delivered to the return listener before the connection is closed.
            Thread.sleep(10000);
        }
    }
}