
Subscription consistency

Last Updated: Sep 17, 2018

In MQ, a consumer ID identifies a group of consumer instances. In most distributed applications, multiple consumer instances run under one consumer ID. Subscription consistency means that all consumer instances under the same consumer ID must use the same subscription logic. If the subscriptions are inconsistent, message consumption becomes unpredictable and messages may even be lost.

Because subscriptions in MQ are defined by topic and tag, all instances under the same consumer ID must be consistent in the following two aspects:

  1. The subscribed topics must be the same.

  2. The subscribed tags must be the same.
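The two rules above can be checked mechanically. The following is a minimal sketch, assuming each consumer instance's subscriptions are modeled as a plain topic-to-tag map; the class `SubscriptionConsistency` and its method `isConsistent` are hypothetical helpers for illustration and are not part of the MQ SDK.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of the MQ SDK) that models the two consistency rules. */
final class SubscriptionConsistency {

    /**
     * Returns true when every instance declares exactly the same
     * topic -> tag mapping. Each map models one consumer instance
     * under the same consumer ID.
     */
    static boolean isConsistent(Collection<Map<String, String>> instances) {
        Map<String, String> reference = null;
        for (Map<String, String> subscriptions : instances) {
            if (reference == null) {
                reference = subscriptions;      // the first instance is the baseline
            } else if (!reference.equals(subscriptions)) {
                return false;                   // a topic or a tag differs
            }
        }
        return true;                            // zero or one instance is trivially consistent
    }

    public static void main(String[] args) {
        Map<String, String> instanceA = Map.of("jodie_test_A", "TagA", "jodie_test_B", "TagB");
        Map<String, String> instanceB = Map.of("jodie_test_A", "TagA", "jodie_test_B", "TagB");
        Map<String, String> instanceC = Map.of("jodie_test_A", "*");

        System.out.println(isConsistent(List.of(instanceA, instanceB))); // true
        System.out.println(isConsistent(List.of(instanceA, instanceC))); // false
    }
}
```

Comparing whole maps covers both rules at once: a missing or extra topic changes the key set, and a different tag changes a value.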

Figure of subscription example

Figure of consistent subscription example

In the following figure, each consumer ID subscribes to multiple topics, and all consumer instances under the same consumer ID subscribe to exactly the same topics and corresponding tags.

[Figure: consistent subscription]

Figure of inconsistent subscription example

In the following figure, a consumer ID subscribes to multiple topics, but the consumer instances under that consumer ID do not subscribe to exactly the same topics and corresponding tags.

[Figure: inconsistent subscription]

Code of subscription examples

Code of inconsistent subscriptions

[Example 1]

In the following example, two instances under the same consumer ID subscribe to different topics.

Consumer instance 1-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 1-1 subscribes to topic jodie_test_A with all tags ("*").
    consumer.subscribe("jodie_test_A", "*", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

Consumer instance 1-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 1-2 uses the same consumer ID but subscribes to a different topic, jodie_test_B.
    consumer.subscribe("jodie_test_B", "*", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

[Example 2]

In the following example, two instances under the same consumer ID subscribe to messages in the same topic, but with different tags. Consumer instance 2-1 subscribes to TagA, while consumer instance 2-2 does not specify a tag.

Consumer instance 2-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 2-1 subscribes to topic jodie_test_A with tag TagA only.
    consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

Consumer instance 2-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 2-2 subscribes to the same topic but with all tags ("*").
    consumer.subscribe("jodie_test_A", "*", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
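To make the mismatch in Example 2 concrete, the sketch below compares which message tags each instance's subscription would accept. It assumes a deliberately simplified matching rule ("*" accepts every tag, any other value accepts only an identical tag); `TagFilterSketch` and `accepts` are hypothetical names and do not reproduce how MQ filters messages internally.

```java
/** Hypothetical, simplified model of tag matching; not the MQ implementation. */
final class TagFilterSketch {

    /** Assumption: "*" accepts every tag; any other value must match the message tag exactly. */
    static boolean accepts(String subscribedTag, String messageTag) {
        return "*".equals(subscribedTag) || subscribedTag.equals(messageTag);
    }

    public static void main(String[] args) {
        String instance21 = "TagA"; // tag subscribed by consumer instance 2-1
        String instance22 = "*";    // tag subscribed by consumer instance 2-2

        for (String messageTag : new String[] {"TagA", "TagB"}) {
            System.out.println(messageTag
                    + ": 2-1=" + accepts(instance21, messageTag)
                    + ", 2-2=" + accepts(instance22, messageTag));
        }
        // Prints:
        // TagA: 2-1=true, 2-2=true
        // TagB: 2-1=false, 2-2=true
    }
}
```

Under this model the two instances disagree about messages tagged TagB, even though they share one consumer ID.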

[Example 3]

In this example, there are two errors:

  1. Instances with the same consumer ID subscribe to a different number of topics.

  2. Instances with the same consumer ID subscribe to the same topic with different tags.

Consumer instance 3-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 3-1 subscribes to two topics: jodie_test_A with TagA ...
    consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    // ... and jodie_test_B with TagB.
    consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });

Consumer instance 3-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    // Instance 3-2 subscribes to only one topic, jodie_test_A, and with TagB instead of TagA.
    consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
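Both errors in Example 3 can be surfaced by comparing the two instances' subscriptions topic by topic. The following is a minimal sketch, assuming each instance's subscriptions are available as a topic-to-tag map; `SubscriptionDiff` and `differences` are hypothetical names for illustration, not SDK APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical helper (not part of the MQ SDK) that reports subscription differences. */
final class SubscriptionDiff {

    /** Lists every topic that is missing on one side or carries a different tag. */
    static List<String> differences(Map<String, String> first, Map<String, String> second) {
        List<String> report = new ArrayList<>();
        // Visit the union of both topic sets in sorted order for a stable report.
        TreeSet<String> topics = new TreeSet<>(first.keySet());
        topics.addAll(second.keySet());
        for (String topic : topics) {
            String tagInFirst = first.get(topic);
            String tagInSecond = second.get(topic);
            if (tagInFirst == null || tagInSecond == null) {
                report.add("topic " + topic + " is subscribed by only one instance");
            } else if (!tagInFirst.equals(tagInSecond)) {
                report.add("topic " + topic + " has different tags: "
                        + tagInFirst + " vs " + tagInSecond);
            }
        }
        return report;
    }

    public static void main(String[] args) {
        // Subscriptions of consumer instances 3-1 and 3-2 from Example 3.
        Map<String, String> instance31 = Map.of("jodie_test_A", "TagA", "jodie_test_B", "TagB");
        Map<String, String> instance32 = Map.of("jodie_test_A", "TagB");

        differences(instance31, instance32).forEach(System.out::println);
        // Prints:
        // topic jodie_test_A has different tags: TagA vs TagB
        // topic jodie_test_B is subscribed by only one instance
    }
}
```

An empty report means the two instances satisfy both consistency rules.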