After a consumer subscribes to a topic, the Message Queue for Apache RocketMQ broker delivers all messages in the topic to the consumer. If the consumer concerns with only some messages, you can configure message attributes in the producer and configure filter conditions in the consumer. Then, the Message Queue for Apache RocketMQ broker delivers only the messages that meet the specified filter conditions to the consumer. This topic describes the feature description, scenarios, and limits of the message filtering feature. The topic also describes how to configure message filtering in code and provides sample code.

Feature description

The message filtering feature is implemented by configuring message attributes. Message attributes are used to classify messages. You can configure message attributes to classify the messages that producers send to topics on the Message Queue for Apache RocketMQ broker. You can configure filter conditions in consumers so that the consumers can subscribe to messages with specified attributes in the topics. Then, the broker filters the messages sent from the producers and delivers only the messages that meet the specified conditions to the consumers.

If a consumer subscribes to a topic but no filter condition is configured in the consumer, all messages in the topic are delivered to the consumer no matter whether attributes are configured for the messages when they are sent from the producers.

The following table describes the message filtering methods supported by Message Queue for Apache RocketMQ.
Filtering method Description Scenario Limit on instances Limit on protocols
Filter messages by using tags (the default filtering method)
  • Producer: You can add tags to messages in the producer.
  • Consumer: The consumer subscribes to messages with specified tags.
If the tags of the messages to which the consumer subscribes are the same as the tags specified for messages in the producer, the messages are delivered to the consumer.
This method applies to simple filtering scenarios.

You can add only one tag to a message. This method can be used when you do not need to classify messages into multiple levels and want to filter messages by using tags.

None. None.
Filter messages by using SQL attributes
  • Producer: You can configure custom attributes for messages in the producer.
  • Consumer: You can customize an SQL filter expression in the consumer to filter messages based on their custom attributes.
Messages that conform to the calculation results of the filter expression are delivered to the consumer.
This method applies to complex filtering scenarios.

You can configure multiple custom attributes for a message and filter messages based on the custom attributes. You can use this method in scenarios where you need to classify messages into multiple levels and filter messages from multiple dimensions. In this case, you can customize a filter expression by using SQL syntax. Then, messages that conform to the calculation results of the filter expression are delivered to consumers.

Only Enterprise Platinum Edition instances support this method. Only the commercial edition of the TCP client SDK supports this method.

Filter messages by using tags

A tag is a label that classifies messages in a topic. You can add a tag to a message in aMessage Queue for Apache RocketMQ producer. The consumer subscribes to the message based on the specified tag.

Scenarios

The following figure shows an example in the e-commerce transaction scenario. In the process from placing an order to receiving the product by the customer, a series of messages are generated. The following information provides examples of related messages:
  • Order message
  • Payment message
  • Logistics message
These messages are sent to the Trade_Topic topic and subscribed to by different systems. The following information provides examples of related systems:
  • Payment system: needs to subscribe to only the payment message.
  • Logistics system: needs to subscribe to only the logistics message.
  • Analysis system for the transaction success rate: needs to subscribe to the order message and payment message.
  • Real-time computing system: needs to subscribe to all the messages related to transactions.
The following figure shows the filtering process. filtermessage

Configure message filtering in code

Message Queue for Apache RocketMQ allows you to define code in client SDKs to filter messages by using tags. You must add tags to messages in the producer and specify the tags in the consumer to subscribe to the corresponding messages. For more information about client SDKs, see Overview. The following information shows you how to define code in the producer and consumer:
  • Send messages

    A tag must be specified for each message before the message is sent.

        Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());                
  • Subscribe to all types of messages

    If a consumer needs to subscribe to all types of messages in a topic, use an asterisk (*) to indicate all tags.

        consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Subscribe to messages of a specific type

    If a consumer needs to subscribe to messages of a specific type in a topic, specify the corresponding tag.

        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Subscribe to multiple types of messages

    If a consumer needs to subscribe to multiple types of messages in a topic, separate tags with two vertical bars (||).

        consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Error example

    If a consumer subscribes to messages with specific tags in a topic multiple times, the tags in the last subscription prevail:

        // In the following code, a consumer can receive only messages with TagB in MQ_TOPIC but cannot receive messages with TagA. 
        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

Filter messages by using SQL attributes

Messages are filtered by using SQL attributes in the following process: Configure custom attributes for messages in the producer and define a filter expression by using SQL syntax in the consumer. Message Queue for Apache RocketMQ calculates the filter expression to filter messages that are sent from the producer based on the custom attributes and delivers messages that meet the filter conditions to the consumer.

Note Tags are a special type of message attribute. Therefore, the SQL attribute-based filtering method is compatible with the tag-based filtering method. You can use the tag attribute to filter messages. In SQL syntax, the tag attribute is represented by TAGS.

Limits

When you use SQL attributes to filter messages, take note of the following limits:
  • Only Enterprise Platinum Edition instances allow you to filter messages by using SQL attributes. Standard Edition instances do not support this feature.
  • You can use only TCP clients to filter messages by using SQL attributes. HTTP clients do not support this feature.
  • If the producer does not allow you to filter messages by using SQL attributes but you define a filter expression in the consumer, an error is reported when the consumer is started, or the consumer cannot receive messages.

Scenarios

The following figure shows an example in the e-commerce transaction scenario. In the process from placing an order to receiving the product by the customer, a series of messages are generated. The messages include two types of messages: an order message and a logistics message. A region attribute is configured for the logistics message and the value of the region attribute for the logistics message is Hangzhou or Shanghai. The following information provides examples of related messages:
  • Order message
  • Logistics message
    • Logistics message whose value of the region attribute is Hangzhou
    • Logistics message whose value of the region attribute is Shanghai
These messages are sent to the Trade_Topic topic and subscribed to by different systems. The following information provides examples of related systems:
  • Logistics system: needs to subscribe to the logistics message whose value of the region attribute is Hangzhou.
  • Logistics system 2: needs to subscribe to the logistics message whose value of the region attribute is Hangzhou or Shanghai.
  • Order tracking system: needs to subscribe to only the order message.
  • Real-time computing system: needs to subscribe to all the messages related to transactions.
The following figure shows the filtering process. SQL attribute-based filtering

Configure message filtering in code

Message Queue for Apache RocketMQ allows you to define code in client SDKs to filter messages by using SQL attributes. You must configure custom attributes for messages in the producer and define a filter expression by using SQL syntax in the consumer. For more information about client SDKs, see Overview. The following information shows you how to define code in the producer and consumer:
  • Producer:

    Configure custom attributes for the message.

    Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
    // Configure Custom attribute A and set the value of this attribute to 1. 
    msg.putUserProperties("A", "1");
  • Consumer:

    Define a filter expression by using SQL syntax to filter messages based on the custom attributes.

    Notice To filter messages based a custom attribute, you must first define logic in the filter expression to check whether the attribute exists. If the attribute does not exist, the calculation result of the filter expression is NULL and the message will not be delivered to the consumer.
    // Subscribe to messages where Custom attribute A exists and whose attribute value is 1. 
    consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });
The following table describes different types of SQL syntax that can be used to filter messages.
Syntax Description Example
IS NULL Checks whether an attribute does not exist. a IS NULL: Attribute a does not exist.
IS NOT NULL Checks whether an attribute exists. a IS NOT NULL: Attribute a exists.
  • >
  • >=
  • <
  • <=
Compares numeric values. The syntax cannot be used to compare strings. If it is used to compare strings, an error is reported when the consumer is started.
Note Strings that can be converted into numeric values are also considered numeric values.
  • a IS NOT NULL AND a > 100: Attribute a exists and the value of Attribute a is greater than 100.
  • a IS NOT NULL AND a > 'abc': An error example. Attribute abc is a string. Therefore, you cannot compare a with abc.
BETWEEN xxx AND xxx Compares numeric values. The syntax cannot be used to compare strings. If it is used to compare strings, an error is reported when the consumer is started. The syntax is equivalent to >= xxx AND <= xxx. It means that the value of the attribute is between the two numeric values. a IS NOT NULL AND (a BETWEEN 10 AND 100): Attribute a exists and the value of Attribute a is at least 10 and at most 100.
NOT BETWEEN xxx AND xxx Compares numeric values. The syntax cannot be used to compare strings. If it is used to compare strings, an error is reported when the consumer is started. The syntax is equivalent to < xxx OR > xxx. It means that the value of the attribute is less than the left-side numeric value or greater than the right-side numeric value. a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): Attribute a exists and the value of Attribute a is less than 10 or greater than 100.
IN (xxx, xxx) Indicates that the value of an attribute is included in a set. The elements in the set can only be strings. a IS NOT NULL AND (a IN ('abc', 'def')): Attribute a exists and the value of Attribute a is abc or def.
  • =
  • <>
The equal to operator and the not equal to operator. They can be used to compare numeric values and strings. a IS NOT NULL AND (a = 'abc' OR a<>'def'): Attribute a exists and the value of Attribute a is abc or the value of Attribute a is not def.
  • AND
  • OR
The logical AND operator and the logical OR operator. They can be used to combine simple logical functions, and each logical function must be put in parentheses. a IS NOT NULL AND a > 100) OR (b IS NULL): Attribute a exists and the value of Attribute a is greater than 100 or Attribute b does not exist.
SQL attribute-based filtering is implemented by adding custom attributes in the producer and defining an SQL filter expression in the consumer. This brings uncertainty to the calculation results of the filter expression. The Message Queue for Apache RocketMQ broker processes messages based on the following logic:
  • If an error is reported when the broker calculates the filter expression, the broker filters out the message by default and does not deliver the message to the consumer. For example, this occurs when numeric values and non-numeric values are compared.
  • If the calculation value of the filter expression is NULL or the value is not a Boolean value, the broker filters out the message by default and does not deliver the message to the consumer. A Boolean value represents a truth value, which can be true or false. Assume that you have not added a custom attribute for a message in the producer but this custom attribute is defined as a filter condition in the filter expression of the consumer. In this case, the calculation result of the filter expression is NULL.
  • If the type of the custom attribute of a message is floating-point but the type of attribute value used in the filter expression in the consumer is integer, the broker filters out the message by default and does not deliver the message to the consumer.

Sample code

  • Send messages

    Configure tags and custom attributes for messages.

    Producer producer = ONSFactory.createProducer(properties);
    // Set the value of Tag to tagA. 
    Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
    // Set the value of the custom attribute region to hangzhou. 
    msg.putUserProperties("region", "hangzhou");
    // Set the value of the custom attribute price to 50. 
    msg.putUserProperties("price", "50");
    SendResult sendResult = producer.send(msg);
  • Subscribe to messages based on a single custom attribute.
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Subscribe only to messages whose value of the custom attribute region is hangzhou. If this attribute is not configured for a message or the value of this attribute for the message is not hangzhou, the message will not be delivered to the consumer. 
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    Expected results: The value of the custom attribute region of the message sent in the example meets the filter conditions defined in the consumer, and the message is delivered to the consumer.

  • Subscribe to messages based on both tags and custom attributes.
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Subscribe only to messages where tagA is defined and whose value of the custom attribute price is greater than 30. 
    consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    Expected results: The tag of the message sent in the example and the value of the custom attribute price of the message meet the filter conditions defined in the consumer, and the message is delivered to the consumer.

  • Subscribe to messages based on multiple custom attributes.
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Subscribe only to messages whose value of the custom attribute region is hangzhou and whose value of the custom attribute price is less than 20. 
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    Expected result: The message is not delivered to the consumer because the filter conditions defined in the consumer are not met. The consumer subscribes to messages whose value of the custom attribute price is less than 20, but the value of the custom attribute price set for the message in the producer is 50.

  • Subscribe to all messages in the topic without filtering.
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Set the value of the SQL expression to TRUE to subscribe to all messages in the topic. 
    consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    Expected results: All messages in the topic are delivered to the consumer.

  • Error example

    A specified custom attribute is not configured for a message in the producer and no logic is defined in the filter expression in the consumer to check whether the custom attribute exists. Instead, the custom attribute is directly used as a filter condition in the filter expression in the consumer. In this case, the message is not delivered to the consumer.

    Consumer consumer = ONSFactory.createConsumer(properties);
    // The attribute product is not defined in the producer. The filtering fails and the message will not be delivered to the consumer. 
    consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });               

References

  • Consumer instances that use the same group ID must subscribe to the same topics. For more information, see Subscription consistency.
  • You can use topics and tags to filter messages. If you properly use the topics and tags, the structure of your business can become clearer. For more information, see Best practices of topics and tags.