edit-icon download-icon

Send and receive ordered messages

Last Updated: Jun 21, 2018

Send ordered messages

Sample code:

  1. #include "ONSFactory.h"
  2. #include "ONSClientException.h"
  3. #include <iostream>
  4. using namespace ons;
  5. int main()
  6. {
  7. //parameter required for producer creation and message sending and receiving
  8. ONSFactoryProperty factoryInfo;
  9. factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "PID_xxxxxxxxxx");//The producerId created on the console
  10. factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"xxxxxxxxxxxx" );// The msg topic created on the console
  11. factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "input msg content");//Message content
  12. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxxxxxxxx");//ONS AccessKey
  13. factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxxxxxxxxxxxxxxxxxx" );// ONS SecretKey
  14. // This example is for the public cloud environment. For other environments, see: https://help.aliyun.com/document_detail/55448.html?spm=a2c4g.11186623.6.579.mu1gZX
  15. factoryInfo.setFactoryProperty(ONSFactoryProperty::ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet")
  16. //Create a producer
  17. OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
  18. //Before sending a message, the start method must be called once to start the producer.
  19. pProducer->start();
  20. Message msg(
  21. //Message Topic
  22. factoryInfo.getPublishTopics(),
  23. //Message tag, which is similar to tag in Gmail, and is used to classify messages. Consumers can then set filtering conditions for messages to be filtered in MQ broker.
  24. "TagA",
  25. //Message body. Any binary data is allowed and MQ does not perform any processing on it. Serialization and deserialization methods need to be negotiated and remain consistent between the producer and the consumer.
  26. factoryInfo.getMessageContent()
  27. );
  28. // The setting represents the key business property of the message, so please keep it globally unique.
  29. // You can query a message and resend it through the MQ console when you cannot receive the message properly.
  30. // Note: Normal sessage sending and receiving will not be affected if message key is not configured.
  31. msg.setKey("ORDERID_100");
  32. // For partitionally ordered messages, sharding keys are used to distinguish different partitions.
  33. // For globally ordered messages, it can be set as any string that is not null.
  34. std::string shardingKey = "abc"; //Messages with the same shardingKey will be sent in order.
  35. try
  36. {
  37. //Send message. If no exceptions are thrown, then the message is sent successfully.
  38. SendResultONS sendResult = pProducer->send(msg, key);
  39. std::cout << "send success" << std::endl;
  40. }
  41. catch(ONSClientException & e)
  42. {
  43. //Handle exceptions
  44. }
  45. // You must destroy the object Producer before exiting the application. Otherwise there will be memory leakage.
  46. pProducer->shutdown();
  47. return 0;
  48. }

Receive ordered messages

Sample code:

  1. #include "ONSFactory.h"
  2. using namespace std;
  3. using namespace ons;
  4. //Create a message consumer instance
  5. //When pushConsumer has pulled the message, it will actively invoke the consumeMessage function of the instance.
  6. class ONSCLIENT_API MyMsgListener : public MessageOrderListener
  7. {
  8. public:
  9. MyMsgListener()
  10. {
  11. }
  12. virtual ~MyMsgListener()
  13. {
  14. }
  15. virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
  16. {
  17. //Consume messages, based on business requirements
  18. return Success; //CONSUME_SUCCESS;
  19. }
  20. };
  21. int main(int argc, char* argv[])
  22. {
  23. //OrderConsumer property required for creation and normal operation
  24. ONSFactoryProperty factoryInfo;
  25. factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "");//The consumerId created on the console
  26. factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"" );// The msg topic created on the console
  27. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "");// MQ AccessKey
  28. factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "");//MQ SecretKey
  29. // This example is for the public cloud environment. For other environments, see [TCP access instruction](~~55448~~).
  30. factoryInfo.setFactoryProperty(ONSFactoryProperty::ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
  31. // Create orderConsumer
  32. OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
  33. MyMsgListener msglistener;
  34. //Specify the topics and tags that the orderConsumer subscribes to
  35. orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
  36. // Register the message listener processing instance. After orderConsumer pulls the message, it calls the consumeMessage method of this class.
  37. //Start orderConsumer
  38. orderConsumer->start();
  39. for(volatile int i = 0; i < 1000000000; ++i) {
  40. //wait
  41. }
  42. //Destroy orderConsumer. You must be destroy the object Consumer before exiting the application, otherwise there will be memory leaks.
  43. orderConsumer->shutdown();
  44. return 0;
  45. }
Thank you! We've received your feedback.