Subscription consistency means that all consumer instances that share a group ID subscribe to the same topics with the same tags. If the subscriptions are inconsistent, message consumption becomes unpredictable and messages may be lost. This topic provides sample code for consistent and inconsistent subscriptions to help you configure subscriptions correctly.

Background information

In Message Queue for Apache RocketMQ, a group ID identifies a group of consumer instances. In most distributed applications, multiple consumer instances run under the same group ID.

Subscriptions in Message Queue for Apache RocketMQ consist of topics and tags. Therefore, all consumer instances under the same group ID must be consistent in the following two aspects to ensure subscription consistency:

  • The subscribed topics must be the same.
  • The tags of subscribed topics must be the same.
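The two conditions above can be expressed as a simple comparison. The following is a minimal sketch, assuming that each instance's subscriptions are modeled as a map from topic to tag expression. The class SubscriptionConsistencyCheck and its method are hypothetical helpers for illustration and are not part of the Message Queue for Apache RocketMQ SDK. The sketch compares tag expressions as plain strings, which is a conservative assumption: two expressions that list the same tags in a different order are reported as inconsistent.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the SDK. */
final class SubscriptionConsistencyCheck {

    /**
     * Each map models one consumer instance: topic -> tag expression.
     * Returns true only if every instance has exactly the same topics
     * and the same tag expression for each topic.
     */
    static boolean isConsistent(List<Map<String, String>> instances) {
        for (Map<String, String> subscriptions : instances) {
            // Map.equals compares both the key set (topics) and the values (tags).
            if (!subscriptions.equals(instances.get(0))) {
                return false;
            }
        }
        return true;
    }
}
```

A check of this kind could run in a test or at deployment time, before the instances are started, so that a mismatch is detected before any message is consumed.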

Diagram of consistent subscription examples

Multiple group IDs subscribe to multiple topics, and the subscriptions of different consumer instances under the same group ID are consistent.

Consistent subscriptions

Sample code of consistent subscriptions

In the following example, the topics to which the instances under the same group ID subscribe are consistent, and the tags of the topics are also consistent.

  • Consumer instance 1-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "Tag1||2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 1-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "Tag1||2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
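One way to keep the instances of a group consistent is to define the group's subscriptions in a single place and apply that definition to every instance. The following is a minimal sketch under that assumption. SharedSubscriptions and applyTo are hypothetical names and are not part of the SDK. The topic and tag values are copied from the example above.

```java
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical helper for illustration; not part of the SDK. */
final class SharedSubscriptions {

    // Single source of truth for the group: topic -> tag expression.
    static final Map<String, String> GROUP_SUBSCRIPTIONS =
            Map.of("jodie_test_A", "Tag1||2");

    /**
     * Calls the given subscribe action once per (topic, tag expression) pair,
     * so every consumer instance registers exactly the same subscriptions.
     */
    static void applyTo(BiConsumer<String, String> subscribe) {
        GROUP_SUBSCRIPTIONS.forEach(subscribe);
    }
}
```

With the consumer from the example above, a call such as SharedSubscriptions.applyTo((topic, tags) -> consumer.subscribe(topic, tags, listener)) would register the same subscriptions on every instance, where listener stands for your MessageListener implementation.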

Diagram of inconsistent subscription examples

One group ID subscribes to multiple topics, but the subscriptions of different consumer instances under the group ID are inconsistent.

Inconsistent subscriptions

Sample code of inconsistent subscriptions (1)

In the following example, the two instances under the same group ID subscribe to different topics: consumer instance 1-1 subscribes to jodie_test_A, whereas consumer instance 1-2 subscribes to jodie_test_B.

  • Consumer instance 1-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 1-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_B", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    

Sample code of inconsistent subscriptions (2)

In the following example, the instances under the same group ID subscribe to the same topic but with inconsistent tags. Consumer instance 2-1 subscribes only to TagA, whereas consumer instance 2-2 uses the wildcard * and therefore subscribes to all tags.

  • Consumer instance 2-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 2-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                   
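To see why these two tag expressions are not equivalent, it can help to model how a tag expression selects messages. The following is a simplified sketch, assuming that * accepts every tag and that || separates alternative tags, as in the expressions used in this topic. TagExpressionSketch and accepts are hypothetical names. This is not how the SDK or the broker implements filtering.

```java
/** Hypothetical, simplified model of tag filtering; not the SDK implementation. */
final class TagExpressionSketch {

    /**
     * Returns true if a message with the given tag would be selected
     * by the given tag expression under the assumptions stated above.
     */
    static boolean accepts(String expression, String messageTag) {
        // The wildcard selects every message, regardless of its tag.
        if (expression.trim().equals("*")) {
            return true;
        }
        if (messageTag == null) {
            return false;
        }
        // "TagA||TagB" selects messages tagged TagA or TagB.
        for (String tag : expression.split("\\|\\|")) {
            if (tag.trim().equals(messageTag)) {
                return true;
            }
        }
        return false;
    }
}
```

Under this model, a message tagged TagB is accepted by consumer instance 2-2 but filtered out by consumer instance 2-1, so the two instances under GID_jodie_test_2 disagree about which messages the group consumes.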

Sample code of inconsistent subscriptions (3)

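In the following example, both the subscribed topics and the tags are inconsistent across the instances under the same group ID. Consumer instance 3-1 subscribes to jodie_test_A with TagA and to jodie_test_B with TagB, whereas consumer instance 3-2 subscribes only to jodie_test_A with TagB.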

  • Consumer instance 3-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 3-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });