Ordered messages, also known as first in, first out (FIFO) messages, are a type of message provided by ApsaraMQ for RocketMQ. Messages of this type are published and consumed in a strict order. This topic provides sample code that shows how to use the TCP client SDK for C++ of the Community Edition to send and receive ordered messages.
Ordered messages are classified into the following types:
Globally ordered messages: All messages of a specified topic are published and consumed in first-in-first-out (FIFO) order.
Partitionally ordered messages: All messages of a specified topic are distributed to different partitions by using sharding keys. The messages in each partition are published and consumed in FIFO order. A sharding key is a key field that is used for ordered messages to identify different partitions. A sharding key is different from the key of a normal message.
For more information, see Ordered messages.
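The relationship between a sharding key and a partition can be illustrated with a minimal sketch. The function name partitionForShardingKey is hypothetical and is not part of the SDK, and the use of std::hash is an assumption made for illustration only. The routing that the broker or SDK actually performs may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical helper (not part of the SDK): maps a sharding key to a
// partition index so that equal keys always map to the same partition
// within one process.
std::size_t partitionForShardingKey(const std::string &shardingKey,
                                    std::size_t partitionCount) {
    assert(partitionCount > 0);  // at least one partition is required
    return std::hash<std::string>{}(shardingKey) % partitionCount;
}
```

With this sketch, every message that carries the sharding key "order-1001" is routed to the same partition, so FIFO order holds among those messages. The value of std::hash is implementation-defined, so a fixed hash function is a safer choice if several processes must agree on the mapping.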
Prerequisites
The C++ dynamic library is installed. For more information, see Install the C++ dynamic library.
An AccessKey pair is created for your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Send ordered messages
An ApsaraMQ for RocketMQ broker determines message order from the order in which a single producer or thread sends the messages. If the sender uses multiple producers or threads to send messages concurrently, the message order is determined by the order in which the ApsaraMQ for RocketMQ broker receives the messages. This order may differ from the sending order on the business side.
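If several business threads must share one producer, one possible way to keep a well-defined sending order is to serialize the send calls. The following is a minimal sketch under that assumption. SerializedSender and its SendFn callback are hypothetical names and are not part of the SDK. The callback stands in for a synchronous send call.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical wrapper (not part of the SDK): allows only one synchronous
// send to be in flight at a time, even if several threads call send().
class SerializedSender {
public:
    // Stands in for a synchronous send such as producer.send(...).
    using SendFn = std::function<void(std::uint64_t seq, const std::string &body)>;

    explicit SerializedSender(SendFn sendFn) : sendFn_(std::move(sendFn)) {
        assert(sendFn_);  // a send function is required
    }

    // Assigns the next sequence number and sends while holding the lock, so
    // the sequence order and the send order are the same.
    std::uint64_t send(const std::string &body) {
        std::lock_guard<std::mutex> lock(mutex_);
        const std::uint64_t seq = nextSeq_++;
        sendFn_(seq, body);
        return seq;
    }

private:
    std::mutex mutex_;
    std::uint64_t nextSeq_ = 0;
    SendFn sendFn_;
};
```

The lock trades throughput for a deterministic order. Using a single sending thread for each sharding key is another option that serves the same purpose.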
Copy the following code to the OrderProducerDemo.cpp file and configure the parameters in the file. Then, run the following g++ command to compile the code into an executable file.
g++ -o order_producer_demo -std=c++11 -lz -lrocketmq OrderProducerDemo.cpp
The following sample code shows how to send ordered messages by using the TCP client SDK for C++ of the Community Edition:
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <thread>
#include <vector>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
public:
    MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
        // Specify custom partitioning logic. The queue to which a message is routed is
        // calculated from the arg parameter, which specifies the partition key.
        // In this example, the arg parameter is of the INT type.
        int orderId = *static_cast<int *>(arg);
        int index = orderId % mqs.size();
        // Return the queue at the calculated index so that messages with the
        // same partition key are routed to the same queue.
        return mqs[index];
    }
};

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console.
    DefaultMQProducer producer("GID_XXXXXXXX");
    // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console.
    producer.setNamesrvAddr("http://MQ_INST_XXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
    // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret
    // of your Alibaba Cloud account, which are used for identity verification.
    // The third argument is the user channel. Default value: ALIYUN.
    producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");

    // After you configure the required parameters, start the producer.
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    // The number of times that the SDK retries a message that fails to be sent.
    int retryTimes = 1;
    // The custom arg parameter is used to calculate the queue to which a message is sent.
    // The arg parameter specifies the partition ID.
    // For more information about how to select a message queue, see the MessageQueueSelector operation.
    ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
    for (int i = 0; i < count; ++i) {
        // Before you send the message, specify the topic for which you applied in the ApsaraMQ for RocketMQ console.
        MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
        try {
            SendResult sendResult = producer.send(msg, pSelector, &i, retryTimes, false);
            std::cout << "SendResult:" << sendResult.getSendStatus()
                      << ", Message ID: " << sendResult.getMsgId()
                      << ", MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException &e) {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    delete pSelector;
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
Consume ordered messages
Copy the following code to the OrderConsumerDemo.cpp file and configure the parameters in the file. Then, run the following g++ command to compile the code into an executable file.
g++ -o order_consumer_demo -std=c++11 -lz -lrocketmq OrderConsumerDemo.cpp
The following sample code shows how to consume ordered messages by using the TCP client SDK for C++ of the Community Edition:
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <thread>
#include <vector>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;

class ExampleOrderlyMessageListener : public MessageListenerOrderly {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        // Messages are passed to the listener in the order in which they are stored in the queue.
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console.
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXXX");
    // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console.
    consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
    // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret
    // of your Alibaba Cloud account, which are used for identity verification.
    // The third argument is the user channel. Default value: ALIYUN.
    consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");

    // Register a custom listener to process the received messages and return the consumption result.
    ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
    consumer->subscribe("YOUR ORDERLY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // The preparation is complete. You must invoke the startup function to start the consumer.
    // ********************************************
    // 1. Before you start the consumer, make sure that the subscription is configured.
    // 2. Make sure that the subscriptions of consumers in the same group are consistent.
    // ********************************************
    consumer->start();

    // Keep the main thread running so that the consumer is not shut down before messages arrive.
    std::this_thread::sleep_for(std::chrono::seconds(60));
    consumer->shutdown();
    delete messageListener;
    delete consumer;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}
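To verify on the consumer side that messages with the same sharding key arrive in FIFO order, a listener could track the last sequence number seen for each key. The following is a minimal sketch. OrderChecker is a hypothetical class that is not part of the SDK, and it assumes that the producer attaches an increasing sequence number to each message for a sharding key, which the preceding sample code does not do.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical checker (not part of the SDK): remembers the last sequence
// number seen for each sharding key and reports whether a message is in order.
class OrderChecker {
public:
    // Returns true if seq is greater than the last sequence number recorded
    // for shardingKey. The sequence number is recorded only in that case.
    bool accept(const std::string &shardingKey, std::uint64_t seq) {
        auto it = lastSeq_.find(shardingKey);
        if (it != lastSeq_.end() && seq <= it->second) {
            return false;  // duplicate or out-of-order delivery
        }
        lastSeq_[shardingKey] = seq;
        return true;
    }

private:
    std::unordered_map<std::string, std::uint64_t> lastSeq_;
};
```

A listener could call accept for each message in consumeMessage and log the message whenever the method returns false. Sequence numbers are tracked separately for each key because order is guaranteed only within a partition, not across partitions.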