edit-icon download-icon

Subscribe to messages

Last Updated: Jun 19, 2018

This topic introduces how to subscribe to messages using MQ Java SDK.

Note:

Subscription Methods

MQ supports the following two subscription methods:

  • Clustering Subscription: All the consumers with the same Consumer ID evenly share message consumption. For example, if there are 9 messages under a certain topic, and 3 consumer instances with same consumer ID, then in clustering consumption mode, each instance consumes 3 messages respectively.

    1. // Clustering subscription settings (default method)
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • Broadcasting subscription: All the consumers with the same consumer ID consume each message once. For example, if there are 9 messages under a certain topic, and 3 consumer instances with the same consumer ID, then each instance consumes all the 9 messages respectively.

    1. ## Broadcasting subscription settings
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

Sample code

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Consumer;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public class ConsumerTest {
  10. public static void main(String[] args) {
  11. Properties properties = new Properties();
  12. // The Consumer ID you have created on the console
  13. properties.put(PropertyKeyConst.ConsumerId, "XXX");
  14. // AccessKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  15. properties.put(PropertyKeyConst.AccessKey, "XXX");
  16. // SecretKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  17. properties.put(PropertyKeyConst.SecretKey, "XXX");
  18. // Set a TCP access domain name (the following uses public cloud production environment as an example)
  19. properties.put(PropertyKeyConst.ONSAddr,
  20. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  21. // Clustering subscription method (default)
  22. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  23. // Broadcasting subscription method
  24. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
  25. Consumer consumer = ONSFactory.createConsumer(properties);
  26. consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Subscribe to multiple tags
  27. public Action consume(Message message, ConsumeContext context) {
  28. System.out.println("Receive: " + message);
  29. return Action.CommitMessage;
  30. }
  31. });
  32. // Subscribe to another topic
  33. consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Subscribe to all tags
  34. public Action consume(Message message, ConsumeContext context) {
  35. System.out.println("Receive: " + message);
  36. return Action.CommitMessage;
  37. }
  38. });
  39. consumer.start();
  40. System.out.println("Consumer Started");
  41. }
  42. }

Note:

  • In broadcasting consumption mode, you cannot set message accumulation alarms, or query message accumulation in the console. Alternatively, you can create multiple consumer IDs to achieve the effect of broadcasting mode. For detailed information, see the section “Use clustering mode to simulate broadcasting mode” in Clustering and broadcasting consumption.
  • For best practices on MQ traffic control, see Traffic control solution for MQ clients.
Thank you! We've received your feedback.