All Products
Search
Document Center

Subscribe to messages

Last Updated: Mar 29, 2019

This topic describes how to subscribe to messages by using the Java SDK of RocketMQ.

Note:

  • Maintain consistent subscription for all consumer instances with the same group ID. For more information, see Subscription consistency.

Subscription modes

RocketMQ supports the following two message subscription modes:

  • Clustering subscription: All the consumers identified by the same group ID equally share messages. For example, a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes three messages.

    1. // The configuration of clustering subscription (default mode).
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • Broadcasting subscription: All the consumers identified by the same group ID consume every message once. For example, a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes nine messages.

    1. // The configuration of broadcasting subscription.
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

Sample code

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Consumer;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public class ConsumerTest {
  10. public static void main(String[] args) {
  11. Properties properties = new Properties();
  12. // The group ID you created in the console.
  13. properties.put(PropertyKeyConst.GROUP_ID, "XXX");
  14. // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
  15. properties.put(PropertyKeyConst.AccessKey, "XXX");
  16. // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
  17. properties.put(PropertyKeyConst.SecretKey, "XXX");
  18. // Set the TCP endpoint: Go to the **Instances** page in the RocketMQ console, and view the endpoint in the **Endpoint Information** area.
  19. properties.put(PropertyKeyConst.NAMESRV_ADDR,
  20. "XXX ");
  21. // Clustering subscription (default)
  22. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  23. // Broadcasting subscription
  24. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
  25. Consumer consumer = ONSFactory.createConsumer(properties);
  26. consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Subscribe to multiple tags.
  27. public Action consume(Message message, ConsumeContext context) {
  28. System.out.println("Receive: " + message);
  29. return Action.CommitMessage;
  30. }
  31. });
  32. // Subscribe to another topic.
  33. consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Subscribe to all tags.
  34. public Action consume(Message message, ConsumeContext context) {
  35. System.out.println("Receive: " + message);
  36. return Action.CommitMessage;
  37. }
  38. });
  39. consumer.start();
  40. System.out.println("Consumer Started");
  41. }
  42. }

Note:

  • In broadcasting consumption mode, you cannot set a message accumulation alarm or query the message accumulation condition in the console. Therefore, you can create multiple group IDs to enable message broadcasting. For more information, see Use the clustering consumption mode to simulate the broadcasting consumption mode in Clustering and broadcasting consumption.

  • For more information about the best practices for throttling on RocketMQ consumer clients, see RocketMQ client traffic control design.