This topic describes how Message Queue for Apache RocketMQ consumers use tags to filter messages on the Message Queue for Apache RocketMQ broker, so that each consumer receives only the types of messages it is concerned with.

A tag is a label that classifies messages into different types under a topic. A Message Queue for Apache RocketMQ producer assigns a tag to each message when sending it. A consumer then subscribes to messages by specifying the tags it wants to receive.

Example

The following example is based on an e-commerce transaction scenario. Between the time a customer places an order and the time the customer receives the product, a series of messages is generated, including:

  • Order message
  • Payment message
  • Logistics message

These messages are sent to the Trade_Topic topic and subscribed to by different systems, including:

  • Payment system, which only needs to subscribe to payment messages.
  • Logistics system, which only needs to subscribe to logistics messages.
  • Transaction success rate analysis system, which needs to subscribe to order messages and payment messages.
  • Real-time computing system, which needs to subscribe to all the messages related to the transaction.

The following figure shows the filtering process (figure: filtermessage).
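
The routing described above can be sketched as a small local simulation. This is only an illustration of the matching rule, not the broker's implementation: the tag names (TagOrder, TagPay, TagLogistics), the system names, and the class and method names are all hypothetical, and the sketch does not call the Message Queue for Apache RocketMQ client at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local simulation only; real filtering is performed by the broker.
class TagFilterSketch {

    // Returns true if a message carrying `tag` passes the subscription expression.
    // "*" accepts every tag; otherwise the expression is a "||"-separated tag list.
    static boolean matches(String expression, String tag) {
        if ("*".equals(expression.trim())) {
            return true;
        }
        for (String candidate : expression.split("\\|\\|")) {
            if (candidate.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical system names mapped to hypothetical tag expressions on Trade_Topic.
    static Map<String, String> subscriptions() {
        Map<String, String> table = new LinkedHashMap<>();
        table.put("payment", "TagPay");
        table.put("logistics", "TagLogistics");
        table.put("successRateAnalysis", "TagOrder||TagPay");
        table.put("realTimeComputing", "*");
        return table;
    }

    // Lists the systems that would receive a message with the given tag.
    static List<String> receivers(String tag) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> entry : subscriptions().entrySet()) {
            if (matches(entry.getValue(), tag)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (String tag : Arrays.asList("TagOrder", "TagPay", "TagLogistics")) {
            System.out.println(tag + " -> " + receivers(tag));
        }
    }
}
```

Under these assumed names, an order message reaches only the analysis and real-time computing systems, while a payment message also reaches the payment system.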

Sample code

  • Send messages

    A tag must be specified for each message before it is sent:

        // Arguments: topic, tag, message body.
        Message msg = new Message("MQ_TOPIC", "TagA", "Hello MQ".getBytes());
  • Subscribe to all tags

    If a consumer needs to subscribe to all types of messages under a topic, use an asterisk (*) to represent all tags:

        consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Subscribe to a single tag

    If a consumer needs to subscribe to only one type of message under a topic, specify the corresponding tag:

        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Subscribe to multiple tags

    If a consumer needs to subscribe to multiple types of messages under a topic, separate the tags with two vertical bars (||):

        consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Error example

    If a consumer subscribes to the same topic multiple times with different tags, only the tags in the last subscription take effect:

        // Incorrect: with this code, the consumer receives only messages with TagB under MQ_TOPIC and no messages with TagA.
        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
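
The overwrite behavior in the error example can be illustrated with a small stand-alone model. It assumes that a consumer keeps a single tag expression per topic, so a second subscription to the same topic replaces the first. That storage model, the class name, and the method names are assumptions made for illustration and are not the client's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone model; it does not use the Message Queue for Apache RocketMQ client.
class SubscriptionOverwriteSketch {

    // Assumption: one tag expression is stored per topic.
    private final Map<String, String> expressionByTopic = new HashMap<>();

    // A later call for the same topic replaces the earlier expression.
    void subscribe(String topic, String expression) {
        expressionByTopic.put(topic, expression);
    }

    // Returns true if a message with `tag` under `topic` would be delivered.
    boolean receives(String topic, String tag) {
        String expression = expressionByTopic.get(topic);
        if (expression == null) {
            return false;
        }
        if ("*".equals(expression.trim())) {
            return true;
        }
        for (String candidate : expression.split("\\|\\|")) {
            if (candidate.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Two separate subscriptions: only TagB remains in effect.
        SubscriptionOverwriteSketch wrong = new SubscriptionOverwriteSketch();
        wrong.subscribe("MQ_TOPIC", "TagA");
        wrong.subscribe("MQ_TOPIC", "TagB");
        System.out.println(wrong.receives("MQ_TOPIC", "TagA")); // prints false
        System.out.println(wrong.receives("MQ_TOPIC", "TagB")); // prints true

        // One subscription with a combined expression: both tags are delivered.
        SubscriptionOverwriteSketch right = new SubscriptionOverwriteSketch();
        right.subscribe("MQ_TOPIC", "TagA||TagB");
        System.out.println(right.receives("MQ_TOPIC", "TagA")); // prints true
        System.out.println(right.receives("MQ_TOPIC", "TagB")); // prints true
    }
}
```

To receive both TagA and TagB messages in practice, subscribe once with the combined expression "TagA||TagB", as in the multiple-tag example.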

References