ApsaraMQ for RocketMQ: Message filtering

Last Updated: Feb 27, 2026

Message filtering enables the ApsaraMQ for RocketMQ broker to deliver only messages that match specific conditions to consumers. Producers classify messages by setting attributes, and consumers specify filter conditions when subscribing. The broker evaluates each message against these conditions and delivers only matching messages.

If a consumer subscribes to a topic without specifying a filter condition, the consumer receives all messages in the topic, regardless of any attributes set on those messages.

Filtering methods

ApsaraMQ for RocketMQ supports two filtering methods.

| Method | Description | Use when | Instance requirement | Protocol requirement |
| --- | --- | --- | --- | --- |
| Tag-based filtering (default) | Assign a tag to each message on the producer side. Specify which tags to receive on the consumer side. The broker delivers messages whose tag matches the subscription. | Filtering by a single attribute. Only one tag per message. | None | None |
| Attribute-based SQL filtering | Set multiple custom attributes on each message. Define SQL expressions on the consumer side to filter by attribute values. | Filtering by multiple attributes with complex conditions. | Enterprise Platinum Edition only | TCP client SDK only |

Tag-based filtering

A tag is a label that classifies messages within a topic. The producer assigns a tag to each message before sending it. The consumer subscribes to messages based on specific tags.

Example scenario

In an e-commerce transaction system, three types of messages are produced:

  • Order messages

  • Payment messages

  • Logistics messages

These messages are sent to a topic named Trade_Topic. Four downstream systems subscribe to this topic:

  • Payment system: subscribes only to payment messages.

  • Logistics system: subscribes only to logistics messages.

  • Transaction success rate analysis system: subscribes to order and payment messages.

  • Real-time computing system: subscribes to all messages.
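
The scenario above can be pictured with a small, self-contained Java sketch. It does not use the ApsaraMQ for RocketMQ SDK and is not the broker's implementation. It only mimics how a tag expression ("*", a single tag, or tags joined by ||) selects messages. The class name TagExpressionSketch and the tag names Order, Payment, and Logistics are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model only: mimics how a tag expression selects messages. */
final class TagExpressionSketch {

    /** Returns true if a message carrying messageTag matches the subscription expression. */
    static boolean matches(String expression, String messageTag) {
        if ("*".equals(expression)) {
            return true; // "*" subscribes to every message in the topic
        }
        // "A||B" lists the accepted tags; split on the literal "||"
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag -> tag.equals(messageTag));
    }

    public static void main(String[] args) {
        // Hypothetical tag names for the three message types in Trade_Topic
        Map<String, String> subscriptions = new LinkedHashMap<>();
        subscriptions.put("Payment system", "Payment");
        subscriptions.put("Logistics system", "Logistics");
        subscriptions.put("Success rate analysis system", "Order||Payment");
        subscriptions.put("Real-time computing system", "*");

        // Print, for each message type, which systems would receive it
        for (String tag : new String[] {"Order", "Payment", "Logistics"}) {
            subscriptions.forEach((system, expression) ->
                    System.out.printf("%-30s %-10s %b%n", system, tag, matches(expression, tag)));
        }
    }
}
```

With these assumed tags, each downstream system needs only one subscription expression, and the real-time computing system needs no knowledge of the individual tags.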

Figure: Tag-based filtering process

Configure tag-based filtering

Define filtering logic in the ApsaraMQ for RocketMQ client SDK. Set tags on messages before sending, and specify which tags to subscribe to on the consumer side. For SDK details, see Overview.

Send a message with a tag

Specify a tag for each message before sending:

Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());

Subscribe to all messages

Use an asterisk (*) to subscribe to all messages in a topic:

consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});

Subscribe to messages with a specific tag

Specify the tag to receive only matching messages:

consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});

Subscribe to messages with multiple tags

Separate multiple tags with two vertical bars (||):

consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});

Incorrect: multiple subscriptions to the same topic

If a consumer calls consumer.subscribe() multiple times for the same topic with different tags, only the last subscription takes effect. Earlier subscriptions are overridden.

// The consumer receives only messages with TagB. Messages with TagA are NOT received.
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
// This second call for the same topic overrides the TagA subscription above.
consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
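
One way to picture the override is a table that holds a single filter expression per topic. The following self-contained sketch models only that idea. It assumes one expression is kept per topic, which is consistent with the behavior described above but is not a statement about SDK internals. The class SubscriptionTableSketch and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model only: one filter expression is kept per topic. */
final class SubscriptionTableSketch {
    private final Map<String, String> expressionByTopic = new HashMap<>();

    /** A later call for the same topic replaces the earlier expression. */
    void subscribe(String topic, String expression) {
        expressionByTopic.put(topic, expression);
    }

    /** Returns the expression currently in effect for the topic, or null if none. */
    String expressionFor(String topic) {
        return expressionByTopic.get(topic);
    }

    public static void main(String[] args) {
        SubscriptionTableSketch wrong = new SubscriptionTableSketch();
        wrong.subscribe("MQ_TOPIC", "TagA");
        wrong.subscribe("MQ_TOPIC", "TagB"); // replaces TagA
        System.out.println(wrong.expressionFor("MQ_TOPIC"));

        SubscriptionTableSketch right = new SubscriptionTableSketch();
        right.subscribe("MQ_TOPIC", "TagA||TagB"); // one call covering both tags
        System.out.println(right.expressionFor("MQ_TOPIC"));
    }
}
```

To receive both tags, use a single subscription with the combined expression TagA||TagB rather than two separate calls.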

Attribute-based SQL filtering

Attribute-based SQL filtering lets producers set multiple custom attributes on each message. Consumers define SQL expressions to filter messages by those attributes. The broker evaluates each message against the SQL expression and delivers only matching messages.

A tag is a special type of message attribute. Attribute-based SQL filtering is compatible with tag-based filtering. In SQL syntax, reference the tag attribute as TAGS.

Limitations

  • Only Enterprise Platinum Edition instances support attribute-based SQL filtering.

  • Only TCP client SDKs support this method.

  • If the broker does not support attribute-based SQL filtering and a consumer defines an SQL filter expression, either an error is reported when the consumer starts, or the consumer may not receive messages.

Example scenario

In an e-commerce transaction system, messages are classified into order messages and logistics messages. Logistics messages include a region attribute with values of Hangzhou or Shanghai.

  • Order messages

  • Logistics messages

    • Logistics messages with region = Hangzhou

    • Logistics messages with region = Shanghai

These messages are sent to a topic named Trade_Topic. Four downstream systems subscribe:

  • Logistics system 1: subscribes only to logistics messages where region is Hangzhou.

  • Logistics system 2: subscribes to all logistics messages.

  • Order tracking system: subscribes only to order messages.

  • Real-time computing system: subscribes to all messages.
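
As a rough illustration, the four subscriptions above could map to SQL filter expressions such as the ones in this self-contained sketch. It assumes the message type is carried in the tag, with the hypothetical tag values Order and Logistics. It uses only syntax described in this topic (TAGS, IS NOT NULL, =, AND, TRUE). The class name SqlScenarioSketch is also hypothetical. Each string would be passed to MessageSelector.bySql() in the consumer code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: candidate SQL filter expressions for the four downstream systems. */
final class SqlScenarioSketch {

    /** Maps each downstream system to a filter expression (tag values are assumptions). */
    static Map<String, String> expressions() {
        Map<String, String> bySystem = new LinkedHashMap<>();
        bySystem.put("Logistics system 1",
                "TAGS IS NOT NULL AND region IS NOT NULL AND TAGS = 'Logistics' AND region = 'Hangzhou'");
        bySystem.put("Logistics system 2", "TAGS IS NOT NULL AND TAGS = 'Logistics'");
        bySystem.put("Order tracking system", "TAGS IS NOT NULL AND TAGS = 'Order'");
        bySystem.put("Real-time computing system", "TRUE");
        return bySystem;
    }

    public static void main(String[] args) {
        expressions().forEach((system, sql) -> System.out.println(system + " -> " + sql));
    }
}
```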

Figure: Attribute-based SQL filtering process

Configure SQL filtering

Define filtering logic in the ApsaraMQ for RocketMQ client SDK. Set custom message attributes in the producer code and define an SQL filter expression in the consumer code. For SDK details, see Overview.

Attribute key rules

Each custom attribute is a key-value pair. The key must follow these rules:

  • Can contain letters, digits, and underscores (_).

  • Must start with a letter or an underscore (_).

Multiple attributes can be set on each message.
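
The key rules can be checked on the producer side before a message is sent. The following is a minimal sketch, assuming that "letters" means ASCII letters. The class AttributeKeySketch and the method isValidKey are hypothetical helpers, not part of the SDK.

```java
import java.util.regex.Pattern;

/** Illustrative helper: validates a custom attribute key against the rules above. */
final class AttributeKeySketch {
    // First character: letter or underscore. Remaining characters: letters, digits, underscores.
    // Assumption: "letters" is read as ASCII letters only.
    private static final Pattern VALID_KEY = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static boolean isValidKey(String key) {
        return key != null && VALID_KEY.matcher(key).matches();
    }

    public static void main(String[] args) {
        for (String key : new String[] {"region", "_price2", "2price", "order-id"}) {
            System.out.println(key + " -> " + isValidKey(key));
        }
    }
}
```

A check like this could run before each msg.putUserProperties() call so that invalid keys are caught in the producer instead of silently failing to match a filter.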

Producer: set custom attributes

Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// Set custom attribute A to 1.
msg.putUserProperties("A", "1");

Consumer: define a filter expression

Use MessageSelector.bySql() to define an SQL filter expression.

Important

Always include an IS NOT NULL check for each attribute referenced in the filter expression. If a referenced attribute does not exist on a message, a comparison on that attribute evaluates to NULL rather than TRUE or FALSE, and the message is not delivered.

// Subscribe to messages that have a tag and whose attribute A exists and equals 1.
consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});

SQL syntax reference

| Syntax | Description | Example |
| --- | --- | --- |
| IS NULL | Attribute does not exist. | a IS NULL -- Attribute a does not exist. |
| IS NOT NULL | Attribute exists. | a IS NOT NULL -- Attribute a exists. |
| >, >=, <, <= | Numeric comparison. Cannot compare strings. An error is reported at consumer startup if used with strings. Strings convertible to numbers are treated as numeric values. | a IS NOT NULL AND a > 100 -- Attribute a exists and exceeds 100. a IS NOT NULL AND a > 'abc' -- Error. Cannot compare a with the string abc. |
| BETWEEN xxx AND xxx | Numeric range, inclusive. Equivalent to >= xxx AND <= xxx. Cannot compare strings. | a IS NOT NULL AND (a BETWEEN 10 AND 100) -- Attribute a exists and is between 10 and 100, inclusive. |
| NOT BETWEEN xxx AND xxx | Outside a numeric range. Equivalent to < xxx OR > xxx. Cannot compare strings. | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100) -- Attribute a exists and is less than 10 or greater than 100. |
| IN (xxx, xxx) | Value is in a set. Set elements must be strings. | a IS NOT NULL AND (a IN ('abc', 'def')) -- Attribute a exists and equals abc or def. |
| =, <> | Equal to and not equal to. Works with both numbers and strings. | a IS NOT NULL AND (a = 'abc' OR a<>'def') -- Attribute a exists and equals abc or does not equal def. |
| AND, OR | Logical operators. Enclose each condition in parentheses. | a IS NOT NULL AND (a > 100) OR (b IS NULL) -- Attribute a exists and exceeds 100, or attribute b does not exist. |
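
The equivalences stated for BETWEEN, NOT BETWEEN, and IN can be written out as plain Java predicates. This is only a sketch of the semantics described in the table, not the broker's evaluator. The class SqlOperatorSketch and its method names are hypothetical.

```java
import java.util.Set;

/** Illustrative only: the range and set operators expressed as plain Java predicates. */
final class SqlOperatorSketch {

    /** BETWEEN low AND high: inclusive on both ends, same as value >= low AND value <= high. */
    static boolean between(double value, double low, double high) {
        return value >= low && value <= high;
    }

    /** NOT BETWEEN low AND high: same as value < low OR value > high. */
    static boolean notBetween(double value, double low, double high) {
        return value < low || value > high;
    }

    /** IN ('x', 'y'): membership test over string elements. */
    static boolean in(String value, Set<String> candidates) {
        return value != null && candidates.contains(value);
    }

    public static void main(String[] args) {
        System.out.println(between(100, 10, 100));            // upper bound is included
        System.out.println(notBetween(100, 10, 100));         // so NOT BETWEEN excludes it
        System.out.println(in("abc", Set.of("abc", "def")));  // string membership
    }
}
```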

Filter expression error handling

When a filter expression cannot produce a valid result, the broker does not deliver the message. This occurs when:

  • Calculation error: An exception occurs during expression evaluation, such as comparing numeric and non-numeric values.

  • NULL or non-Boolean result: The expression returns NULL or a value that is not Boolean. For example, the consumer filters on an attribute that the producer did not set.

  • Type mismatch: The custom attribute value is a floating-point number but the filter expression uses an integer for comparison.
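
The first two cases above can be sketched as a three-valued evaluation in which only a Boolean TRUE leads to delivery. The following self-contained example models a single "key > threshold" condition over a message's user properties. It is an illustration under stated assumptions, not the broker's implementation: the class FilterOutcomeSketch and its methods are hypothetical, and the floating-point versus integer mismatch case is not modeled.

```java
import java.util.Map;

/** Illustrative only: a message is delivered only when the filter yields Boolean TRUE. */
final class FilterOutcomeSketch {

    /**
     * Evaluates "key > threshold" against a message's user properties.
     * Returns null (standing in for SQL NULL) when the attribute is absent.
     * Throws NumberFormatException when the value cannot be read as a number.
     */
    static Boolean greaterThan(Map<String, String> properties, String key, double threshold) {
        String raw = properties.get(key);
        if (raw == null) {
            return null;
        }
        // Strings convertible to numbers are treated as numeric values
        return Double.parseDouble(raw) > threshold;
    }

    /** NULL results and evaluation errors both mean "do not deliver". */
    static boolean shouldDeliver(Map<String, String> properties, String key, double threshold) {
        try {
            return Boolean.TRUE.equals(greaterThan(properties, key, threshold));
        } catch (NumberFormatException e) {
            return false; // calculation error during evaluation
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldDeliver(Map.of("price", "50"), "price", 30));        // numeric string
        System.out.println(shouldDeliver(Map.of("region", "hangzhou"), "price", 30)); // attribute absent
        System.out.println(shouldDeliver(Map.of("price", "abc"), "price", 30));       // not numeric
    }
}
```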

Sample code

These examples use a single producer message and multiple consumer subscriptions to demonstrate different filtering outcomes.

Send a message with a tag and custom attributes

Producer producer = ONSFactory.createProducer(properties);
// Set the tag to tagA.
Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
// Set custom attribute region to hangzhou.
msg.putUserProperties("region", "hangzhou");
// Set custom attribute price to 50.
msg.putUserProperties("price", "50");
SendResult sendResult = producer.send(msg);

Filter by a custom attribute

Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages where region is hangzhou.
// Messages without the region attribute or with a different value are not delivered.
consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});

Expected result: The message is delivered. It has the region attribute set to hangzhou, which matches the filter condition.

Filter by tag and custom attribute

Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages with tagA and price greater than 30.
consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});

Expected result: The message is delivered. It has tagA and the price attribute is 50, which is greater than 30.

Filter by multiple custom attributes

Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages where region is hangzhou and price is less than 20.
consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});

Expected result: The message is not delivered. The consumer requires price less than 20, but the message has price set to 50.

Subscribe to all messages with SQL

Consumer consumer = ONSFactory.createConsumer(properties);
// Set the SQL expression to TRUE to receive all messages.
consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});

Expected result: All messages in the topic are delivered.

Incorrect: missing attribute and null check

If a producer does not set a custom attribute on a message, and the consumer's filter expression references that attribute without an IS NOT NULL check, the expression evaluates to NULL. The message is not delivered.

Consumer consumer = ONSFactory.createConsumer(properties);
// The product attribute was not set when the message was sent.
// The filter fails and the message is not delivered.
consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});

References