Subscription consistency means that all consumer instances with the same group ID must be exactly the same in terms of the group ID, subscribed topics, and tags of the topics. If the subscriptions are inconsistent, errors occur in the message consumption logic and messages may be lost. This topic provides sample code of consistent subscriptions and sample code of inconsistent subscriptions to help you smoothly subscribe to messages.

Background information

In Message Queue for Apache RocketMQ, a group ID represents a consumer instance group. For most distributed applications, multiple consumer instances are mounted on the same group ID.

Subscriptions in Message Queue for Apache RocketMQ consist of topics and tags. Therefore, all consumer instances with the same group ID must be consistent in the following two aspects to ensure subscription consistency:

  • The topics to which the consumer instances subscribe must be consistent.
  • The tags of the subscribed topics must be consistent in terms of the number of tags and the order of tags.

Diagram of consistent subscription examples

Multiple group IDs subscribe to multiple topics, and the subscriptions of different consumer instances with the same group ID are consistent.

Consistent subscriptions

Sample code of consistent subscriptions

In the following example, the topics to which the instances with the same group ID subscribe are consistent, and the tags of the topics are also consistent.

  • Consumer instance 1-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "Tag1||2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 1-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "Tag1||2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("jodie_test_A", "Tag1||2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });      

Diagram of inconsistent subscription examples

One group ID subscribes to multiple topics, but the subscriptions of different consumer instances with the group ID are inconsistent.

Inconsistent subscriptions

Sample code of inconsistent subscriptions (1)

In the following sample, two consumer instances with the same group ID subscribe to different topics.

  • Consumer instance 1-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 1-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_B ", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    

Sample code of inconsistent subscriptions (2)

In the following example, two consumer instances with the same group ID subscribe to the same topic but subscribe to different numbers of tags of the topic. Consumer instance 2-1 has subscribed to Tag A, whereas consumer instance 2-2 has not specified a tag.

  • Consumer instance 2-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 2-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                   

Sample code of inconsistent subscriptions (3)

In the following example, two consumer instances with the same group ID subscribe to the same topic and the same number of tags of the topic. However, the tags are specified in different orders for these two consumer instances. Consumer instance 3-1 and consumer instance 3-2 subscribe to the same topic and the same number of tags of the topic. However, the tags are specified in different orders for these two consumer instances.

  • Consumer instance 3-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagA||B", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                 
  • Consumer instance 3-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagB||A", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                   

Sample code of inconsistent subscriptions (4)

In the following example, two consumer instances with the same group ID subscribe to different numbers of topics. Both consumer instances subscribe to one same topic but subscribe to different tags of the topic.

  • Consumer instance 4-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_4");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 4-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_4");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });