Subscription consistency

Last Updated: Jul 12, 2019

In RocketMQ, a group ID identifies a group of consumer instances. Most distributed applications run multiple consumer instances under one group ID. Subscription consistency means that all consumer instances under the same group ID must use exactly the same subscriptions and processing logic. If the subscription relationships are inconsistent, message consumption becomes unpredictable and messages may even be lost.

A RocketMQ subscription relationship consists of topics and tags. Ensuring subscription consistency therefore means ensuring that all instances under the same group ID are consistent in the following two respects:

  • The subscribed topics must be the same.

  • The subscribed tags must be the same.
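
The two rules above can be checked mechanically. The following is a minimal sketch, not part of the RocketMQ SDK: it assumes each instance's subscriptions are described as a map from topic to tag expression, and treats the group as consistent only if every map is identical. The class name SubscriptionCheck and the method isConsistent are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the RocketMQ or ONS SDK.
final class SubscriptionCheck {

    /**
     * Returns true when every instance declares exactly the same
     * topic -> tag expression map as the first instance.
     * Tag expressions are compared as plain strings (an assumption).
     */
    static boolean isConsistent(List<Map<String, String>> instances) {
        if (instances.isEmpty()) {
            return true;
        }
        Map<String, String> reference = instances.get(0);
        for (Map<String, String> subscription : instances) {
            if (!reference.equals(subscription)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> withTagA = new HashMap<>();
        withTagA.put("jodie_test_A", "TagA");

        Map<String, String> withAllTags = new HashMap<>();
        withAllTags.put("jodie_test_A", "*");

        // Same topic and same tag on both instances.
        System.out.println(isConsistent(Arrays.asList(withTagA, withTagA)));    // prints true
        // Same topic but different tags.
        System.out.println(isConsistent(Arrays.asList(withTagA, withAllTags))); // prints false
    }
}
```

A check like this could run in a deployment test against the configuration of each instance, so that a mismatch is caught before the consumers start.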

Examples of the subscription relationship

Correct subscription relationship

As shown in the following figure, multiple group IDs subscribe to multiple topics, and the subscription relationships of different consumer instances using the same group ID are consistent.

[Figure: correct subscription relationship (subconsis2)]

Incorrect subscription relationship

As shown in the following figure, one group ID subscribes to multiple topics, but the subscription relationships of different consumer instances using the group ID are not consistent.

[Figure: incorrect subscription relationship (subinconsis)]

Sample code of subscription

Sample code of incorrect subscription relationship

[Example 1]

In the following example, two instances under the same group ID subscribe to different topics:

Consumer instance 1-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 1-1 subscribes to topic jodie_test_A with all tags ("*").
    consumer.subscribe("jodie_test_A", "*", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

Consumer instance 1-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 1-2 uses the same group ID but subscribes to topic jodie_test_B.
    consumer.subscribe("jodie_test_B", "*", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

[Example 2]

In the following example, instances under the same group ID subscribe to the same topic with different tags: consumer instance 2-1 subscribes to TagA, while consumer instance 2-2 subscribes to all tags (*).

Consumer instance 2-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 2-1 subscribes to topic jodie_test_A with TagA only.
    consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

Consumer instance 2-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 2-2 subscribes to the same topic but with all tags ("*").
    consumer.subscribe("jodie_test_A", "*", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

[Example 3]

In this example, the subscription relationship is incorrect for the following reasons:

  1. The instances under the same group ID subscribe to different numbers of topics: instance 3-1 subscribes to two topics, while instance 3-2 subscribes to one.

  2. The instances subscribe to different tags under the same topic: for jodie_test_A, instance 3-1 subscribes to TagA, while instance 3-2 subscribes to TagB.

Consumer instance 3-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 3-1 subscribes to two topics: jodie_test_A (TagA) and jodie_test_B (TagB).
    consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

Consumer instance 3-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 3-2 subscribes to only one topic, and with TagB instead of TagA.
    consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
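
One way to avoid the mismatches shown in the examples above is to define the subscriptions of a group ID in a single shared place and have every instance register from that definition. The following is a minimal sketch of that idea only. It does not use the SDK: the class SharedSubscriptions and the method subscribeAll are hypothetical, the callback stands in for a call such as consumer.subscribe(topic, tags, listener), and the topic and tag values are assumed for illustration because the source does not state which subscription is intended for GID_jodie_test_3.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical helper for illustration; not part of the RocketMQ or ONS SDK.
final class SharedSubscriptions {

    /** Single definition of topic -> tag expression for one group ID (assumed values). */
    static final Map<String, String> GID_JODIE_TEST_3;

    static {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("jodie_test_A", "TagA");
        m.put("jodie_test_B", "TagB");
        GID_JODIE_TEST_3 = Collections.unmodifiableMap(m);
    }

    /**
     * Passes every shared (topic, tags) pair to the supplied callback.
     * In a real consumer the callback would wrap the SDK's subscribe call.
     */
    static void subscribeAll(BiConsumer<String, String> subscribe) {
        GID_JODIE_TEST_3.forEach(subscribe);
    }

    public static void main(String[] args) {
        // Two simulated instances record what they subscribe to.
        List<String> instance1 = new ArrayList<>();
        List<String> instance2 = new ArrayList<>();
        subscribeAll((topic, tags) -> instance1.add(topic + ":" + tags));
        subscribeAll((topic, tags) -> instance2.add(topic + ":" + tags));

        // Both instances registered from the same definition.
        System.out.println(instance1.equals(instance2)); // prints true
    }
}
```

Because every instance reads the same definition, a change to the topics or tags of a group ID is made once and reaches all instances when they are redeployed together.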