ApsaraMQ for RocketMQ: Message filtering

Last Updated: Dec 28, 2023

After a consumer subscribes to a topic, ApsaraMQ for RocketMQ delivers all messages in the topic to the consumer. If you want a consumer to receive only specific messages in the topic, you can configure filtering conditions to filter messages on the ApsaraMQ for RocketMQ broker. This prevents the consumer from receiving a large number of invalid messages.

Scenarios

ApsaraMQ for RocketMQ is a middleware that is based on the publish-subscribe (pub/sub) model and used to integrate upstream and downstream business. In actual business scenarios, messages in a topic are processed by different downstream applications that have different consumption logic. Each application needs to consume only messages that meet its consumption logic.

You can use the message filtering feature provided by ApsaraMQ for RocketMQ to efficiently filter the messages that a consumer requires. This prevents a large number of invalid messages from being delivered to the consumer and reduces the workload in the downstream systems.

The message filtering feature of ApsaraMQ for RocketMQ is designed for filtering messages within a single topic and is suited to scenarios that require fine-grained categorization within the same line of business. To manage messages for different lines of business, we recommend that you use different topics.

Overview

What is message filtering?

The message filtering feature of ApsaraMQ for RocketMQ filters messages based on consumer-configured conditions and sends the messages that meet the conditions to the consumers.

After message attributes and tags are defined for producers and consumers, messages are filtered and matched based on the filtering conditions on the ApsaraMQ for RocketMQ broker. Only messages that meet the filtering conditions are delivered to consumers.

How does message filtering work?

Figure: Message filtering

Message filtering involves the following steps:

  • Producer: When a producer initializes a message, it attaches the attributes and the tag that are used to match the filtering conditions configured by consumers.

  • Consumer: When a consumer initializes and starts to consume messages, it calls the subscription registration operation to tell the ApsaraMQ for RocketMQ broker which messages in a specific topic it wants to receive. This is also known as reporting the filtering conditions.

  • Broker: When a consumer consumes messages, the ApsaraMQ for RocketMQ broker dynamically matches messages based on the expressions of the reported filtering conditions and delivers the messages that meet the filtering conditions to the consumer.
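
The three steps above can be modeled with a small in-memory sketch. This is only an illustration of the division of responsibilities, not broker or SDK code: the class FilterFlowSketch, the nested SketchMessage type, and the group names are hypothetical, and a filtering condition is represented as a plain Java Predicate instead of a real filter expression.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative in-memory model only; none of these types belong to the ApsaraMQ for RocketMQ SDK.
final class FilterFlowSketch {

    // Producer side: a message carries one tag and any number of custom attributes.
    static final class SketchMessage {
        final String tag;
        final Map<String, String> attributes = new HashMap<>();

        SketchMessage(String tag) {
            this.tag = tag;
        }

        SketchMessage attribute(String key, String value) {
            attributes.put(key, value);
            return this;
        }
    }

    // Broker side: the filtering condition reported by each consumer group.
    private final Map<String, Predicate<SketchMessage>> conditions = new LinkedHashMap<>();

    // Consumer side: registering a subscription reports the filtering condition.
    void subscribe(String consumerGroup, Predicate<SketchMessage> condition) {
        conditions.put(consumerGroup, condition);
    }

    // Broker side: return the consumer groups whose condition the message meets.
    List<String> route(SketchMessage message) {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, Predicate<SketchMessage>> entry : conditions.entrySet()) {
            if (entry.getValue().test(message)) {
                receivers.add(entry.getKey());
            }
        }
        return receivers;
    }

    public static void main(String[] args) {
        FilterFlowSketch broker = new FilterFlowSketch();
        broker.subscribe("payment-group", m -> "Payment".equals(m.tag));
        broker.subscribe("audit-group", m -> true);
        System.out.println(broker.route(new SketchMessage("Payment"))); // [payment-group, audit-group]
        System.out.println(broker.route(new SketchMessage("Order")));   // [audit-group]
    }
}
```

The point of the sketch is that matching happens on the broker side, so a consumer group never sees messages that fail its reported condition.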

Classification

ApsaraMQ for RocketMQ supports the tag-based filtering and attribute-based SQL filtering methods. The following table describes the methods.

| Item | Tag-based filtering | Attribute-based SQL filtering |
| --- | --- | --- |
| Target | Message tags. | Message attributes, which include custom attributes and system attributes. Message tags are a type of system attribute. |
| Capability | Exact match. | SQL syntax-based match. |
| Scenario | Filtering scenarios that involve simple and lightweight computing logic. | Filtering scenarios that involve complex computing logic. |

For more information, see Tag-based filtering and Attribute-based SQL filtering.

Subscription consistency

Filter expressions are part of a subscription. Based on the domain model of ApsaraMQ for RocketMQ, all consumers in the same consumer group must use the same filter expressions in their subscriptions. Otherwise, some messages cannot be consumed. For more information, see Subscriptions.
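
A pre-deployment check for this rule might look like the following sketch. It is a hypothetical helper, not part of any SDK: each consumer's subscription is modeled as a map from topic name to filter expression, and two subscriptions count as the same only if those maps are strictly equal. Whether the broker tolerates expressions that differ only in tag order is not covered here, so the sketch takes the stricter reading.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;

// Hypothetical consistency check; not an ApsaraMQ for RocketMQ API.
final class SubscriptionConsistencySketch {

    // Each element is one consumer's subscription: topic name -> filter expression.
    // The group is consistent when every consumer reports exactly the same map.
    static boolean isConsistent(Collection<Map<String, String>> subscriptionsInGroup) {
        return new HashSet<>(subscriptionsInGroup).size() <= 1;
    }

    public static void main(String[] args) {
        Map<String, String> consumerA = Collections.singletonMap("Trade_Topic", "TagA");
        Map<String, String> consumerB = Collections.singletonMap("Trade_Topic", "TagB");
        System.out.println(isConsistent(Arrays.asList(consumerA, consumerA))); // true
        System.out.println(isConsistent(Arrays.asList(consumerA, consumerB))); // false
    }
}
```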

Tag-based filtering

Tag-based filtering is a basic filtering method provided by ApsaraMQ for RocketMQ. In this method, messages are matched based on the tags that are defined by producers. Consumers use the tags to specify the messages that they want to consume.

Sample scenario

The following items describe messages that are produced in an e-commerce transaction scenario:

  • Order messages

  • Payment messages

  • Logistics messages

The messages are sent to the topic named Trade_Topic, which is subscribed to by the following downstream systems:

  • Payment system: subscribes only to payment messages.

  • Logistics system: subscribes only to logistics messages.

  • Transaction success rate analysis system: subscribes to order and payment messages.

  • Real-time computing system: subscribes to all messages.

The following figure shows the filtering effect.

Figure: Tag-based filtering

Tag setting

  • A producer can attach only one tag to each message, and must attach it before the message is sent.

  • Each tag is a string. We recommend that a tag not exceed 128 characters in length.

  • Tags are case-sensitive. For example, TAG A and tag a are different tags.

Filtering rules

Tag-based filtering implements precise filtering based on character strings. The following filtering rules are supported:

  • Single-tag match: You can specify a single tag in the filter expression to receive only messages to which the tag is attached.

  • Multi-tag match: You can specify multiple tags in the filter expression to receive messages to which any one of the tags is attached. Separate multiple tags with two vertical bars (||). For example, Tag1||Tag2||Tag3 specifies that messages to which Tag1, Tag2, or Tag3 is attached are all sent to the consumer.

  • All match: You can use an asterisk (*) as a wildcard character to match all tags. This specifies that all messages in the topic are sent to the consumer.
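
The three rules can be expressed as a short matching function. The following is a minimal sketch of the rules as described above, not the broker's implementation. The class name TagExpressionSketch and the tag names Order, Payment, and Logistics are assumptions chosen to mirror the Trade_Topic sample scenario.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model of the tag-matching rules; not the broker's implementation.
final class TagExpressionSketch {

    // Returns true if a message carrying messageTag satisfies the tag filter expression.
    static boolean matches(String expression, String messageTag) {
        // All match: an asterisk matches every message in the topic.
        if ("*".equals(expression)) {
            return true;
        }
        if (messageTag == null) {
            return false;
        }
        // Single-tag and multi-tag match: exact, case-sensitive comparison
        // against each tag separated by "||".
        for (String candidate : expression.split("\\|\\|")) {
            if (candidate.equals(messageTag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical expressions for the downstream systems in the sample scenario.
        Map<String, String> systems = new LinkedHashMap<>();
        systems.put("Payment system", "Payment");
        systems.put("Logistics system", "Logistics");
        systems.put("Transaction success rate analysis system", "Order||Payment");
        systems.put("Real-time computing system", "*");

        for (String tag : new String[] {"Order", "Payment", "Logistics"}) {
            for (Map.Entry<String, String> system : systems.entrySet()) {
                if (matches(system.getValue(), tag)) {
                    System.out.println(tag + " -> " + system.getKey());
                }
            }
        }
    }
}
```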

Sample code

  • Specify a tag before message sending

    Message message = messageBuilder.setTopic("topic")
                    // The message key. You can use the key to search for the message. 
                    .setKeys("messageKey")
                    // The message tag. Consumers can use the tag to filter messages. 
                    // In this example, the tag of the message is set to TagA. 
                    .setTag("TagA")
                    // The message body. 
                    .setBody("messageBody".getBytes())
                    .build();
                        
  • Subscribe to messages that match a single tag

    String topic = "Your Topic";
    // Subscribe to messages to which TagA is attached. 
    FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);

  • Subscribe to messages that match multiple tags

    String topic = "Your Topic";
    // Subscribe to messages to which TagA, TagB, or TagC is attached. 
    FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);

  • Subscribe to all messages in the topic

    String topic = "Your Topic";
    // Subscribe to all messages. 
    FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);

Attribute-based SQL filtering

Attribute-based SQL filtering is an advanced filtering method provided by ApsaraMQ for RocketMQ. In this method, messages are matched based on the attributes (key-value pairs) that producers specify. When producers send messages, they can specify multiple attributes for each message. Consumers can then reference these attributes in SQL expressions to receive only specific messages.

Note

Tag-based filtering is also a type of attribute-based SQL filtering because tags are a type of system attribute. In SQL syntax, the tag attribute is represented by TAGS.

Sample scenario

The following items describe messages that are generated in an e-commerce transaction scenario. The messages are classified into order messages and logistics messages. A region attribute is specified for the logistics messages. The values of the region attribute are Hangzhou and Shanghai.

  • Order messages

  • Logistics messages

    • Logistics messages whose region attribute value is Hangzhou

    • Logistics messages whose region attribute value is Shanghai

The messages are sent to the topic named Trade_Topic, which is subscribed to by the following downstream systems:

  • Logistics system 1: subscribes only to logistics messages whose region attribute value is Hangzhou.

  • Logistics system 2: subscribes to all logistics messages.

  • Order tracking system: subscribes only to order messages.

  • Real-time computing system: subscribes to all messages.

The following figure shows the filtering effect.

Figure: Attribute-based SQL filtering

Message attribute setting

Before a producer sends messages, the producer can specify custom attributes for each message. Each attribute is a custom key-value pair.

Multiple attributes can be specified for each message.

Filtering rules

You must follow the SQL92 syntax when you write filter expressions. The following table describes the syntax.

| Syntax | Description | Example |
| --- | --- | --- |
| IS NULL | Specifies that an attribute does not exist. | a IS NULL: Attribute a does not exist. |
| IS NOT NULL | Specifies that an attribute exists. | a IS NOT NULL: Attribute a exists. |
| >, >=, <, <= | Compares numeric values. These operators cannot be used to compare strings. If you use them to compare strings, an error is reported when the consumer is started. Note: Strings that can be converted into numeric values are treated as numeric values. | a IS NOT NULL AND a > 100: Attribute a exists and its value is greater than 100. a IS NOT NULL AND a > 'abc': An invalid example. a cannot be compared with 'abc' because 'abc' is a string. |
| BETWEEN xxx AND xxx | Compares numeric values. This syntax cannot be used to compare strings. If you use it to compare strings, an error is reported when the consumer is started. The syntax is equivalent to >= xxx AND <= xxx, which specifies that the value of the attribute is between the two numeric values or equal to one of them. | a IS NOT NULL AND (a BETWEEN 10 AND 100): Attribute a exists and its value is greater than or equal to 10 and less than or equal to 100. |
| NOT BETWEEN xxx AND xxx | Compares numeric values. This syntax cannot be used to compare strings. If you use it to compare strings, an error is reported when the consumer is started. The syntax is equivalent to < xxx OR > xxx, which specifies that the value of the attribute is less than the left-side numeric value or greater than the right-side numeric value. | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): Attribute a exists and its value is less than 10 or greater than 100. |
| IN (xxx, xxx) | Specifies that the value of the attribute is included in a set. The elements in the set can only be strings. | a IS NOT NULL AND (a IN ('abc', 'def')): Attribute a exists and its value is abc or def. |
| =, <> | The equal to operator and the not equal to operator. You can use the operators to compare numeric values and strings. | a IS NOT NULL AND (a = 'abc' OR a <> 'def'): Attribute a exists and its value is abc or is not def. |
| AND, OR | The logical AND operator and the logical OR operator. You can use the operators to combine simple logical expressions. Each logical expression must be enclosed in parentheses. | a IS NOT NULL AND (a > 100) OR (b IS NULL): Either attribute a exists and its value is greater than 100, or attribute b does not exist. |
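
The equivalences stated for BETWEEN, NOT BETWEEN, and IN can be written out in plain Java. This sketch restates the table only. It assumes that the attribute exists and that its value has already been converted to a number where required. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;

// Plain-Java restatement of the range and set operators; names are hypothetical.
final class SqlOperatorSketch {

    // a BETWEEN low AND high  is equivalent to  a >= low AND a <= high (both bounds included).
    static boolean between(double a, double low, double high) {
        return a >= low && a <= high;
    }

    // a NOT BETWEEN low AND high  is equivalent to  a < low OR a > high.
    static boolean notBetween(double a, double low, double high) {
        return a < low || a > high;
    }

    // a IN ('x', 'y'): the set elements are strings and the comparison is exact.
    static boolean in(String a, String... set) {
        return new HashSet<>(Arrays.asList(set)).contains(a);
    }

    public static void main(String[] args) {
        System.out.println(between(10, 10, 100));     // true: the lower bound is included
        System.out.println(notBetween(100, 10, 100)); // false: the upper bound is inside the range
        System.out.println(in("abc", "abc", "def"));  // true
    }
}
```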

In attribute-based SQL filtering, producers define custom message attributes, and consumers separately define SQL filter expressions that reference those attributes. Because the two sides are defined separately, the result of evaluating a filter expression may be uncertain. In such cases, the ApsaraMQ for RocketMQ broker processes messages based on the following logic:

  • Exception handling: If an exception is reported when a filter expression is being calculated, the broker automatically filters out the received messages and does not deliver the messages to the consumer. For example, an exception occurs when numeric values and non-numeric values are compared.

  • Handling of null values: If the calculated result of the filter expression is NULL or the value is not a Boolean value, the broker automatically filters out the received messages and does not deliver the messages to the consumer. A Boolean value represents a truth value, which can be true or false. For example, when a consumer subscribes to a message, the consumer uses an attribute that the producer does not specify as a filter condition. In this case, the calculation result of the filter expression is NULL.

  • Handling of inconsistent numeric values: If the values of a custom message attribute are floating-point numbers, but the attribute values that are used in the filter expression are integers, the broker automatically filters out the received messages and does not deliver the messages to the consumer.
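
The following sketch models the three cases above for one expression shape, an attribute compared against an integer literal with >. It follows the rules as stated in this section and is not the broker's evaluator. The class name, the Outcome enum, and the attribute name price are assumptions made for illustration.

```java
import java.util.Map;

// Illustrative model of how uncertain results are handled; not the broker's evaluator.
final class UncertainResultSketch {

    enum Outcome {
        DELIVERED,              // the expression evaluated to true
        FILTERED_FALSE,         // the expression evaluated to false
        FILTERED_NULL,          // the attribute is not set, so the result is NULL
        FILTERED_EXCEPTION,     // the attribute value is not numeric
        FILTERED_TYPE_MISMATCH  // floating-point attribute value, integer literal
    }

    // Models "<attribute> > <integerLiteral>" for one message's attributes.
    static Outcome evaluateGreaterThan(Map<String, String> attributes, String attribute, long integerLiteral) {
        String raw = attributes.get(attribute);
        if (raw == null) {
            return Outcome.FILTERED_NULL;
        }
        try {
            // Strings that can be converted into numeric values are treated as numeric values.
            return Long.parseLong(raw) > integerLiteral ? Outcome.DELIVERED : Outcome.FILTERED_FALSE;
        } catch (NumberFormatException notAnInteger) {
            // Not an integer; check below whether it is a floating-point number.
        }
        try {
            Double.parseDouble(raw);
            return Outcome.FILTERED_TYPE_MISMATCH;
        } catch (NumberFormatException notNumeric) {
            return Outcome.FILTERED_EXCEPTION;
        }
    }

    public static void main(String[] args) {
        Map<String, String> message = java.util.Collections.singletonMap("price", "120");
        System.out.println(evaluateGreaterThan(message, "price", 100));  // DELIVERED
        System.out.println(evaluateGreaterThan(message, "weight", 100)); // FILTERED_NULL
    }
}
```

In this model, only the DELIVERED outcome results in delivery. Every other outcome means that the message is filtered out for that consumer.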

Sample code

  • Specify a tag and custom attributes before message sending

    Message message = messageBuilder.setTopic("topic")
                    // The message key. You can use the key to search for the message. 
                    .setKeys("messageKey")
                    // The message tag. Consumers can use the tag to filter messages. 
                    // In this example, the message tag is set to messageTag. 
                    .setTag("messageTag")
                    // You can also specify custom attributes for the messages, such as environment, region, and logical branch. 
                    // In this example, the custom attribute is Region and the attribute value is Hangzhou. 
                    .addProperty("Region", "Hangzhou")
                    // The message body. 
                    .setBody("messageBody".getBytes())
                    .build();

  • Subscribe to messages that match a single attribute

    String topic = "topic";
    // Subscribe only to messages whose Region attribute value is Hangzhou. 
    FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);

  • Subscribe to messages that match multiple attributes

    String topic = "topic";
    // Subscribe to messages whose Region attribute value is Hangzhou and price attribute value is greater than 30. 
    FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);

  • Subscribe to all messages in the topic

    String topic = "topic";
    // Subscribe to all messages. 
    FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);

Usage notes

Properly plan topics and specify tags

You can use topics, tags, and attributes to distribute messages. Take note of the following principles when you distribute messages:

  • Message type: Messages of different types, such as ordered messages and normal messages, must be sent to different topics. Do not use tags to distinguish message types.

  • Business domain: Different business domains and departments must use different topics. For example, the topics for logistics messages and payment messages must be different. Logistics messages can be divided into ordinary messages and urgent messages by using tags.

  • Consistent message volume and importance: Messages that differ greatly in volume or in the importance of their business links must be sent to different topics.

FAQ

Why are messages lost when multiple consumers subscribe to different tags in a topic?

Possible cause: The consumers belong to the same consumer group but subscribe to different tags. Consumers in the same consumer group must subscribe to the same tags. Otherwise, the subscriptions are inconsistent and some messages are lost.

How do I calculate the number of messages that are consumed by a consumer when filtering conditions are specified?

The number of messages that a consumer consumes is counted after the filtering conditions are applied. Only messages that meet the filtering conditions are counted.

References

For information about the complete sample code for messaging, see Overview.