edit-icon download-icon

Subscribe to messages

Last Updated: Jun 21, 2018

This topic describes how to subscribe to messages using C/C++ SDK.

Note:

  • Make sure the subscriptions of all consumer instances under the same consumer ID are consistent. For detailed information, see Subscription consistency.For information on TCP access point domain names, see TCP access instruction.

Subscription Methods

MQ supports the following two subscription methods:

  • Clustering Subscription: All the consumers with the same Consumer ID evenly share message consumption. For example, if there are 9 messages under a certain topic, and 3 consumer instances with same consumer ID, then in clustering consumption mode, each instance consumes 3 messages respectively.

    1. // Clustering subscription settings (default method)
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • Broadcasting subscription: All the consumers with the same consumer ID consume each message once. For example, if there are 9 messages under a certain topic, and 3 consumer instances with the same consumer ID, then each instance consumes all the 9 messages respectively.

    1. ## Broadcasting subscription settings
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);

Sample code

  1. #include "ONSFactory.h"
  2. using namespace ons;
  3. // MyMsgListener: Create a message consumer instance
  4. // When pushConsumer has pulled the message, it will actively invoke the consumer function of the instance.
  5. class MyMsgListener : public MessageListener
  6. {
  7. public:
  8. MyMsgListener()
  9. {
  10. }
  11. virtual ~MyMsgListener()
  12. {
  13. }
  14. virtual Action consume(Message &message, ConsumeContext &context)
  15. {
  16. //Customize the details for processing messages
  17. return CommitMessage; //CONSUME_SUCCESS;
  18. }
  19. };
  20. int main(int argc, char* argv[])
  21. {
  22. // Mandatory parameter required for the creation and operation of pushConsumer
  23. ONSFactoryProperty factoryInfo;
  24. factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");//The consumer ID you have created in the MQ console
  25. factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//The msg topic you have created on the MQ console
  26. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  27. factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");//Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  28. // Clustering subscription method (default)
  29. // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  30. // Broadcasting subscription method
  31. // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
  32. //create pushConsumer
  33. PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
  34. //Specify the topic and tag of the message subscribed by pushConsumer, and register message callback function
  35. MyMsgListener msglistener;
  36. pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
  37. //start pushConsumer
  38. pushConsumer->start();
  39. //Note: Shutdown can only be invoked until no message is to be received; when shutdown is invoked, the consumer will quit, and cannot receive any messages.
  40. //Destroy pushConsumer. The object Consumer must be destroyed before exiting the application. Otherwise there will be memory leakage.
  41. pushConsumer->shutdown();
  42. return 0;
  43. }
Thank you! We've received your feedback.