Subscribe to messages

Last Updated: Mar 29, 2019

This topic describes how to subscribe to messages by using the C/C++ SDK of RocketMQ.

Note:

  • All consumer instances that share a group ID must keep the same subscription, that is, the same topics and tags. For more information, see Subscription consistency.
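
To make the consistency rule concrete, the following is a minimal sketch that compares the subscriptions of several instances before they are deployed. It is not part of the SDK: the `Subscription` alias, `subscriptionsConsistent`, and the topic and tag names are hypothetical and only model a subscription as a map from topic to tag expression.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical model (not an SDK type): topic name -> tag expression,
// that is, the values one instance would pass to subscribe().
using Subscription = std::map<std::string, std::string>;

// Returns true when every instance in the group declares exactly the same
// topics with exactly the same tag expressions. The comparison is textual,
// so "TagA||TagB" and "TagB||TagA" are treated as different.
bool subscriptionsConsistent(const std::vector<Subscription>& group)
{
    for (std::size_t i = 1; i < group.size(); ++i) {
        if (group[i] != group[0]) {
            return false;
        }
    }
    return true;
}

// Usage example: three identical instances pass, one deviating tag fails.
void subscriptionCheckExample()
{
    Subscription a;
    a["TopicA"] = "TagA";
    Subscription b;
    b["TopicA"] = "TagB";  // same topic, different tag

    std::vector<Subscription> consistentGroup = {a, a, a};
    std::vector<Subscription> mixedGroup = {a, a, b};

    assert(subscriptionsConsistent(consistentGroup));
    assert(!subscriptionsConsistent(mixedGroup));
}
```

A check like this could run in a deployment test, so that a mismatched tag is caught before instances with the same group ID go live.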

Subscription modes

RocketMQ supports the following two message subscription modes:

  • Clustering subscription:

In this mode, the consumers identified by the same group ID share the messages evenly, so each message is consumed by one instance in the group. For example, if a topic contains nine messages and the group contains three consumer instances, each instance consumes three messages.

```
// Set the subscription mode to clustering. Clustering subscription is the default when this parameter is not configured.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);
```
  • Broadcasting subscription:

In this mode, each consumer identified by the same group ID consumes every message once. For example, if a topic contains nine messages and the group contains three consumer instances, each instance consumes all nine messages.

```
// Set the subscription mode to broadcasting.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);
```
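
The nine-message examples for the two modes can be reproduced with a small, idealized model. This sketch does not use the SDK: `DeliveryMode` and `messagesPerInstance` are hypothetical names, and the round-robin assignment is an assumption made for illustration. The real split in clustering mode is decided by the service's load balancing and may not be exactly even.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical enum for this sketch; these are not the SDK constants.
enum class DeliveryMode { Clustering, Broadcasting };

// Returns how many messages each consumer instance processes in an
// idealized model of the two subscription modes.
std::vector<int> messagesPerInstance(DeliveryMode mode, int messageCount, int instanceCount)
{
    assert(messageCount >= 0 && instanceCount > 0);
    std::vector<int> counts(static_cast<std::size_t>(instanceCount), 0);
    for (int msg = 0; msg < messageCount; ++msg) {
        if (mode == DeliveryMode::Broadcasting) {
            // Every instance in the group receives the message.
            for (int& count : counts) {
                ++count;
            }
        } else {
            // Exactly one instance receives the message (round-robin assumption).
            ++counts[static_cast<std::size_t>(msg % instanceCount)];
        }
    }
    return counts;
}
```

With nine messages and three instances, the model yields three messages per instance in clustering mode and nine per instance in broadcasting mode, which matches the examples above.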

Sample code

    #include "ONSFactory.h"
    using namespace ons;

    // MyMsgListener: the message listener class that processes received messages.
    // After pushConsumer receives a message, it calls the consume function of this instance.
    class MyMsgListener : public MessageListener
    {
    public:
        MyMsgListener()
        {
        }
        virtual ~MyMsgListener()
        {
        }
        virtual Action consume(Message &message, ConsumeContext &context)
        {
            // Implement your message processing logic here.
            return CommitMessage; // CONSUME_SUCCESS
        }
    };

    int main(int argc, char* argv[])
    {
        // Required parameters for creating and running pushConsumer.
        ONSFactoryProperty factoryInfo;
        factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX"); // The group ID you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // The TCP endpoint. Go to the Instances page in the RocketMQ console and view the endpoint in the Endpoint Information area.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "XXX"); // The message topic you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX"); // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX"); // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.

        // Set the subscription mode to clustering, which is the default subscription mode.
        // factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);
        // Set the subscription mode to broadcasting.
        // factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);

        // Create pushConsumer.
        PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

        // Specify the topic and tag to which pushConsumer subscribes, and register the message listener.
        MyMsgListener msglistener;
        pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*", &msglistener);

        // Start pushConsumer.
        pushConsumer->start();

        // In a real application, keep the process running here until the consumer should stop.

        // Note: Call shutdown only after the consumer no longer needs to receive messages. After shutdown is called, the consumer exits and cannot receive any more messages.
        // Destroy pushConsumer. You must destroy the consumer before the application exits. Otherwise, memory leaks may occur.
        pushConsumer->shutdown();
        return 0;
    }
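
The note in the sample says that shutdown should be called only once the consumer no longer needs to receive messages, so a long-running application has to keep the main thread waiting between start and shutdown. One possible way to do that, sketched here with the C++ standard library only, is a small latch. `ShutdownLatch` is a hypothetical helper, not part of the SDK.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not an SDK class): lets the main thread block
// until another thread decides that the consumer should stop.
class ShutdownLatch
{
public:
    // Requests shutdown. Safe to call from any thread, more than once.
    void release()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            released_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until release() has been called. Returns immediately if it
    // was already called.
    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return released_; });
    }

    // Reports whether shutdown has been requested.
    bool released() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return released_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    bool released_ = false;
};

// Usage example: once release() has been called, wait() returns immediately.
void shutdownLatchExample()
{
    ShutdownLatch latch;
    latch.release();
    latch.wait();
    assert(latch.released());
}
```

In the sample, a call to `wait()` would sit between `pushConsumer->start()` and `pushConsumer->shutdown()`, with `release()` called from another thread when the application decides to stop.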