Subscription consistency is required. This means that all consumer instances identified by the same group ID must subscribe to the same topics and tags. If the subscriptions of the consumer instances are inconsistent, errors occur in the message consumption logic and messages may be lost. This topic provides sample code that can help ensure that subscriptions are consistent. This topic also provides the possible causes of inconsistency in subscriptions to help you subscribe to messages in an efficient manner.

Background information

In Message Queue for Apache RocketMQ, a consumer group ID represents a group of consumer instances. For most distributed applications, the same group ID is assigned to multiple consumer instances.

In Message Queue for Apache RocketMQ, subscriptions are defined based on topics and tags. To ensure that subscriptions are consistent, all consumer instances identified by the same group ID must be consistent in the following aspects:

  • The consumer instances must subscribe to the same topics. For example, if Consumer 1 subscribes to Topic A and Topic B, Consumer 2 in the same group must also subscribe to Topic A and Topic B. Consumer 2 cannot subscribe only to Topic A or Topic B, or subscribe to Topic A and Topic C.
  • The consumer instances must subscribe to the same tags in the same topic. The tags must be listed in the same order in the subscriptions. For example, if Consumer 1 subscribes to Tag1||Tag2 in Topic B, Consumer 2 in the same group must also subscribe to Tag1||Tag2 in Topic B. Consumer 2 cannot subscribe only to Tag1 or Tag2, or subscribe to Tag2||Tag1 in Topic B.
The following figure shows an example of consistent subscriptions. Groups of different IDs subscribe to different topics. Each group contains consumer instances C1, C2, and C3. All consumer instances in the same group subscribe to the same topics and tags. Consistent subscriptions
Notice Message Queue for Apache RocketMQ allows you to use the clients in which the TCP-based or HTTP-based SDKs are installed to send and receive messages. Subscription consistency among all consumer instances that belong to the same group must be ensured. The protocol used by the consumer group that subscribes to messages must be the protocol used by the SDK. For example, if you install the TCP-based SDK to send and receive messages, you must use the ID of the group that supports TCP to subscribe to messages. Otherwise, the consumer instances can fail to consume messages.

Consistent subscription 1: one tag in one topic

In the following figure, consumer instances C1, C2, and C3 belong to the same group. These consumer instances subscribe to Tag 1 in Topic A. This scenario meets the requirements of subscription consistency.

Consistent subscription 1Sample code of consistent subscription 1
The subscriptions of C1, C2, and C3 are consistent. Therefore, the code that is written to subscribe to messages for C1, C2, and C3 must be identical. The following sample code is provided:
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "Tag1", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

Consistent subscription 2: multiple tags in one topic

In the following figure, consumer instances C1, C2, and C3 belong to the same group. These consumer instances subscribe to Tag1||Tag2 in Topic B. This way, all consumer instances subscribe to the messages with Tag 1 or Tag 2 in Topic B in the same order. This scenario meets the requirements of subscription consistency.

Consistent subscription 2Sample code of consistent subscription 2
The subscriptions of C1, C2, and C3 are consistent. Therefore, the code that is written to subscribe to messages for C1, C2, and C3 must be identical. The following sample code is provided:
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

Consistent subscription 3: multiple tags in multiple topics

In the following figure, consumer instances C1, C2, and C3 belong to the same group. These consumer instances subscribe to Topic A and Topic B. No tags in Topic A are specified whereas Tag1||Tag2 in Topic B are specified. In this case, the consumer instances subscribe to all messages in Topic A and the messages with Tag 1 or Tag 2 in Topic B in the same order. This scenario meets the requirements of subscription consistency.

Consistent subscription 3Sample code of consistent subscription 3
The subscriptions of C1, C2, and C3 are consistent. Therefore, the code that is written to subscribe to messages for C1, C2, and C3 must be identical. The following sample code is provided:
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });     
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                   

Check subscription consistency

In the Message Queue for Apache RocketMQ console, you can check whether the subscriptions of the consumer instances in a specified group are consistent on the Group Details page. For more information, see View the status of consumers. If the subscriptions are inconsistent, troubleshoot the issue in the code for consumer instances. For more information, see Common issues of subscription inconsistency.

Common issues of subscription inconsistency

For example, you use Message Queue for Apache RocketMQ to send and receive messages. The messages received by consumer instances are not as expected and the subscriptions of the consumer instances are inconsistent in the Message Queue for Apache RocketMQ console. This can be caused by the following issues that occur on your consumer instances:
  • Inconsistent subscription 1: Consumer instances that belong to the same group subscribe to different topics.

    In the following figure, consumer instances C1, C2, and C3 belong to the same group. These consumer instances subscribe to different topics. C1 subscribes to Topic A. C2 subscribes to Topic B. C3 subscribes to Topic C. This scenario does not meet the requirements of subscription consistency.

    Inconsistent subscription 1Sample code of inconsistent subscription 1
    • Consumer instance 1:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "*", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                    
    • Consumer instance 2:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicC", "*", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                    
    • Consumer instance 3:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicB", "*", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                    
  • Inconsistent subscription 2: Consumer instances that belong to the same group subscribe to different tags in the same topic.

    In the following figure, consumer instances C1, C2, and C3 belong to the same group. These consumer instances subscribe to Topic A. However, C1 subscribes to Tag 1 in Topic A whereas C2 and C3 subscribe to Tag 2 in Topic A. The consumer instances subscribe to different tags in the same topic. This scenario does not meet the requirements of subscription consistency.

    Inconsistent subscription 2

    Sample code of inconsistent subscription 2

    • Consumer instance 1:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "Tag1", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                    
    • Consumer instance 2:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "Tag2", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                   
    • Consumer instance 3:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "Tag2", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                   
  • Inconsistent subscription 3: Consumer instances that belong to the same group subscribe to the same topics and tags. However, the tags are specified in different orders.

    In the following figure, consumer instances C1, C2, and C3 belong to the same group. These consumer instances subscribe to Topic A and Topic B. No tags are specified for Topic A. Tag 1 and Tag 2 are specified for Topic B. However, C1 subscribes to Tag1||Tag2 in Topic B whereas C2 and C3 subscribe to Tag2||Tag1 in Topic B. This scenario does not meet the requirements of subscription consistency.

    订阅关系不一致3

    Sample code of inconsistent subscription 3

    • Consumer instance 1:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "*", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });   
          consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                         
    • Consumer instance 2:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "*", new MessageListener() {
             public Action consume(Message message, ConsumeContext context) {
                 System.out.println(message.getMsgID());
                 return Action.CommitMessage;
             }
          }); 
          consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                   
    • Consumer Instances 3:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "*", new MessageListener() {
             public Action consume(Message message, ConsumeContext context) {
                 System.out.println(message.getMsgID());
                 return Action.CommitMessage;
             }
          }); 
          consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });