Subscription consistency means that all consumer instances under the same group ID must have identical subscriptions and identical processing logic. If the subscriptions are inconsistent, message consumption becomes unpredictable and messages may even be lost. This topic provides sample code of inconsistent subscriptions to help you recognize and avoid them.

Background

In Message Queue for Apache RocketMQ, a group ID represents a group of consumer instances. In most distributed applications, multiple consumer instances share the same group ID.

A subscription in Message Queue for Apache RocketMQ consists of topics and tags. Ensuring subscription consistency therefore means ensuring that all instances under the same group ID are consistent in the following two aspects:

  • The subscribed topics must be the same.
  • The subscribed tags must be the same.
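
The two rules above can be sketched as a small check. This is only an illustration: SubscriptionCheck and isConsistent are hypothetical names that are not part of the Message Queue for Apache RocketMQ SDK, and each consumer instance is modeled as a plain map from topic name to the tag expression it passes to subscribe(). The sketch assumes Java 9 or later for Map.of and List.of.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the SDK.
class SubscriptionCheck {

    // Each map models one consumer instance: topic -> tag expression.
    // The group is treated as consistent only if every instance has
    // exactly the same topics, each with exactly the same tag expression.
    static boolean isConsistent(List<Map<String, String>> instances) {
        for (Map<String, String> instance : instances) {
            if (!instance.equals(instances.get(0))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> instance1 = Map.of("jodie_test_A", "TagA");
        Map<String, String> instance2 = Map.of("jodie_test_A", "TagA");
        Map<String, String> instance3 = Map.of("jodie_test_A", "*");

        // Same topic and same tag on both instances.
        System.out.println(isConsistent(List.of(instance1, instance2))); // true
        // Same topic, but the tag expressions differ.
        System.out.println(isConsistent(List.of(instance1, instance3))); // false
    }
}
```

The comparison here is a strict string comparison of tag expressions, which is a simplifying assumption of this sketch rather than a statement about how the service evaluates subscriptions.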

Correct subscription

Multiple groups subscribe to multiple topics, and the subscriptions of different consumer instances using the same group ID are consistent.

Correct message subscriptions

Incorrect subscriptions

One group subscribes to multiple topics, but the subscriptions of different consumer instances using the group ID are inconsistent.

Incorrect subscription

Sample code of incorrect subscription (1)

In the following example, the topics that two instances under the same group ID subscribe to are different:

  • Consumer instance 1-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 1-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_B", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    

Sample code of incorrect subscription (2)

In the following example, two instances under the same group ID subscribe to the same topic but with different tags: consumer instance 2-1 subscribes only to TagA, while consumer instance 2-2 uses the wildcard * and therefore subscribes to all tags.

  • Consumer instance 2-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 2-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                   

Sample code of incorrect subscription (3)

The following example contains two errors:

  • The numbers of subscribed topics differ: consumer instance 3-1 subscribes to jodie_test_A and jodie_test_B, while consumer instance 3-2 subscribes only to jodie_test_A.
  • The tags subscribed for the same topic differ: for jodie_test_A, consumer instance 3-1 subscribes to TagA, while consumer instance 3-2 subscribes to TagB.

  • Consumer instance 3-1:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer instance 3-2:
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
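
To make the differences in sample (3) easier to see, the subscriptions of the two instances can be compared topic by topic. The following is a minimal sketch under stated assumptions: SubscriptionDiff and diff are hypothetical names that do not exist in the SDK, each instance is modeled as a map from topic name to tag expression, and Java 9 or later is assumed for Map.of.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical diagnostic helper for illustration only; not part of the SDK.
class SubscriptionDiff {

    // Compares two instances' subscriptions (topic -> tag expression) and
    // returns one line per difference, ordered by topic name.
    static List<String> diff(Map<String, String> a, Map<String, String> b) {
        // Union of all topics seen on either instance, sorted for stable output.
        TreeSet<String> topics = new TreeSet<>(a.keySet());
        topics.addAll(b.keySet());

        List<String> problems = new ArrayList<>();
        for (String topic : topics) {
            String tagA = a.get(topic);
            String tagB = b.get(topic);
            if (tagA == null || tagB == null) {
                problems.add("topic " + topic + " is subscribed by only one instance");
            } else if (!tagA.equals(tagB)) {
                problems.add("topic " + topic + " has different tags: " + tagA + " vs " + tagB);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Subscriptions of consumer instances 3-1 and 3-2 from the sample above.
        Map<String, String> instance31 = Map.of("jodie_test_A", "TagA", "jodie_test_B", "TagB");
        Map<String, String> instance32 = Map.of("jodie_test_A", "TagB");
        diff(instance31, instance32).forEach(System.out::println);
    }
}
```

For the two instances in sample (3), the sketch reports a tag mismatch on jodie_test_A and a topic, jodie_test_B, that only one instance subscribes to. An empty result would indicate that the two modeled subscriptions match.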