
Subscribe to messages

Last Updated: Oct 09, 2018

This topic describes how to subscribe to messages by using the C/C++ SDK.

Note:

  • Make sure that all consumer instances under the same consumer ID have consistent subscriptions. For details, see Subscription consistency.
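
The consistency requirement above can be pictured with a small standalone check. This is only a sketch: it assumes that "consistent" means every instance subscribes to the same topics with the same tag expressions (the Subscription consistency topic remains the authoritative definition), and the `Subscriptions` type and `subscriptionsConsistent` function are hypothetical names, not part of the SDK.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical model of one consumer instance's subscriptions:
// topic name -> tag expression (for example "*" or "TagA").
using Subscriptions = std::map<std::string, std::string>;

// Returns true if every instance has exactly the same subscriptions
// as the first one. An empty list is treated as consistent.
bool subscriptionsConsistent(const std::vector<Subscriptions>& instances)
{
    for (std::size_t i = 1; i < instances.size(); ++i) {
        if (instances[i] != instances[0]) {
            return false;
        }
    }
    return true;
}
```

Under this assumption, two instances that both subscribe to the same topic with "*" pass the check, while an instance that subscribes to the same topic with a different tag expression fails it.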

Subscription methods

MQ supports the following two subscription methods:

  • Clustering subscription: All consumer instances with the same consumer ID share message consumption evenly. For example, if a topic has 9 messages and 3 consumer instances share the same consumer ID, each instance consumes 3 of the messages in clustering mode.

    // Clustering subscription settings (default method)
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);
  • Broadcasting subscription: Each consumer instance with the same consumer ID consumes every message once. For example, if a topic has 9 messages and 3 consumer instances share the same consumer ID, each instance consumes all 9 messages.

    // Broadcasting subscription settings
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);
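
The difference between the two methods can be illustrated with a small standalone simulation of the 9-message, 3-instance example. This is a sketch only: it does not use the SDK, the `Model` enum and `deliveredCounts` function are hypothetical, and the round-robin assignment in clustering mode is an assumption made to show the even split, not a description of how the broker actually balances load.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the two MessageModel values; not part of the SDK.
enum class Model { Clustering, Broadcasting };

// Returns how many messages each consumer instance receives.
// Clustering: each message goes to exactly one instance (round-robin here,
// which is an illustrative assumption).
// Broadcasting: every instance receives every message.
std::vector<std::size_t> deliveredCounts(Model model,
                                         std::size_t messageCount,
                                         std::size_t instanceCount)
{
    std::vector<std::size_t> counts(instanceCount, 0);
    if (instanceCount == 0) {
        return counts;
    }
    for (std::size_t msg = 0; msg < messageCount; ++msg) {
        if (model == Model::Clustering) {
            ++counts[msg % instanceCount];
        } else {
            for (std::size_t i = 0; i < instanceCount; ++i) {
                ++counts[i];
            }
        }
    }
    return counts;
}
```

With 9 messages and 3 instances, `deliveredCounts(Model::Clustering, 9, 3)` gives 3 messages per instance, and `deliveredCounts(Model::Broadcasting, 9, 3)` gives 9 messages per instance, matching the two examples above.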

Sample code

    #include "ONSFactory.h"
    using namespace ons;
    // MyMsgListener: the message listener class.
    // After pushConsumer pulls a message, it invokes the consume() function of this instance.
    class MyMsgListener : public MessageListener
    {
    public:
        MyMsgListener()
        {
        }
        virtual ~MyMsgListener()
        {
        }
        virtual Action consume(Message &message, ConsumeContext &context)
        {
            // Add your message-processing logic here
            return CommitMessage; // CONSUME_SUCCESS
        }
    };
    int main(int argc, char* argv[])
    {
        // Mandatory parameters for creating and running pushConsumer
        ONSFactoryProperty factoryInfo;
        factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX"); // The consumer ID you created in the MQ console
        factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "XXX"); // The message topic you created in the MQ console
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX"); // AccessKey for Alibaba Cloud identity verification, created in the Alibaba Cloud Management Console
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX"); // SecretKey for Alibaba Cloud identity verification, created in the Alibaba Cloud Management Console
        // Clustering subscription method (default)
        // factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);
        // Broadcasting subscription method
        // factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);
        // Create pushConsumer
        PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
        // Specify the topic and tag that pushConsumer subscribes to, and register the message callback
        MyMsgListener msglistener;
        pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*", &msglistener);
        // Start pushConsumer
        pushConsumer->start();
        // Note: Call shutdown() only when no more messages need to be received. After shutdown() is called, the consumer exits and can no longer receive messages.
        // Destroy pushConsumer. The consumer object must be destroyed before the application exits; otherwise, memory leaks occur.
        pushConsumer->shutdown();
        return 0;
    }
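
Because shutdown must only be invoked once no more messages are to be received, a real application typically keeps the main thread alive between start() and shutdown(). One possible way to do that is sketched below with standard C++ only. The names `g_stopRequested`, `requestStop`, and `waitForStop` are hypothetical helpers introduced for illustration and are not part of the SDK. Polling is used purely for simplicity.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical flag (not part of the SDK): set to true when the
// application decides that no more messages need to be received.
std::atomic<bool> g_stopRequested(false);

// Request that the waiting thread stop, for example from another thread.
void requestStop()
{
    g_stopRequested.store(true);
}

// Block the calling thread until requestStop() has been called,
// checking the flag once per poll interval.
void waitForStop(std::chrono::milliseconds pollInterval = std::chrono::milliseconds(100))
{
    while (!g_stopRequested.load()) {
        std::this_thread::sleep_for(pollInterval);
    }
}
```

Under these assumptions, the sample's main function would call `waitForStop()` after `pushConsumer->start()` and before `pushConsumer->shutdown()`, so that the consumer keeps receiving messages until the application calls `requestStop()`.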