Ordered messages, also known as first in, first out (FIFO) messages, are published and consumed in strict order. This topic provides C++ sample code that uses the TCP client SDK for C++ of the Community Edition to send and receive ordered messages.
How it works
ApsaraMQ for RocketMQ supports two types of ordered messages:
| Type | Ordering scope | Description |
|---|---|---|
| Globally ordered | Entire topic | All messages in a topic are published and consumed in FIFO order. |
| Partitionally ordered | Per partition | Messages are distributed to partitions by sharding key. Messages within each partition are published and consumed in FIFO order. |
A sharding key is a key field that identifies different partitions for ordered messages. It differs from the key of a normal message.
For more information, see Ordered messages.
Prerequisites
Before you begin, make sure that you have:
The C++ dynamic library installed. For more information, see Install the C++ dynamic library
An AccessKey pair created for your Alibaba Cloud account. For more information, see Create an AccessKey pair
Send ordered messages
The ApsaraMQ for RocketMQ broker determines message order based on the order in which a single producer or thread sends messages. If the sender uses multiple producers or threads to send messages concurrently, the broker orders messages by arrival time, which may differ from the intended business order.
Save the following code as OrderProducerDemo.cpp, then compile it:
g++ -o order_producer_demo -std=c++11 -lz -lrocketmq OrderProducerDemo.cppRun the compiled binary. The code sends 32 ordered messages, routing each to a queue based on the partition key:
#include <iostream> #include <chrono> #include <thread> #include "DefaultMQProducer.h" using namespace std; using namespace rocketmq; // Custom queue selector: routes messages to a queue based on custom // partitioning logic. In this example, the modulo of orderId is used. class ExampleSelectMessageQueueByHash : public MessageQueueSelector { public: MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) { int orderId = *static_cast<int *>(arg); int index = orderId % mqs.size(); return mqs[0]; } }; int main() { std::cout << "=======Before sending messages=======" << std::endl; // Replace with the Group ID that you created in the ApsaraMQ for RocketMQ console. DefaultMQProducer producer("GID_XXXXXXXX"); // Replace with the TCP endpoint from the Instance Details page // in the ApsaraMQ for RocketMQ console. producer.setNamesrvAddr("http://MQ_INST_XXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Set credentials from environment variables. // ALIBABA_CLOUD_ACCESS_KEY_ID: your AccessKey ID // ALIBABA_CLOUD_ACCESS_KEY_SECRET: your AccessKey secret // "ALIYUN": the user channel (default value) producer.setSessionCredentials( getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN" ); producer.start(); auto start = std::chrono::system_clock::now(); int count = 32; int retryTimes = 1; // Create the queue selector for ordered message routing. ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash(); for (int i = 0; i < count; ++i) { // Replace "YOUR ORDERLY TOPIC" with the topic that you created // in the ApsaraMQ for RocketMQ console. MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message."); try { // Send the message to the queue determined by the selector. // The fourth argument (&i) serves as the partition key. SendResult sendResult = producer.send(msg, pSelector, &i, 1, false); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << "MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } auto interval = std::chrono::system_clock::now() - start; std::cout << "Send " << count << " messages OK, costs " << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; producer.shutdown(); std::cout << "=======After sending messages=======" << std::endl; return 0; }
Replace the following placeholders with actual values:
| Placeholder | Description | Example |
|---|---|---|
GID_XXXXXXXX | The Group ID from the ApsaraMQ for RocketMQ console | GID_OrderGroup |
MQ_INST_XXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80 | The TCP endpoint from the Instance Details page | MQ_INST_abc123.mq-internet-access.mq-internet.aliyuncs.com:80 |
YOUR ORDERLY TOPIC | The topic created in the ApsaraMQ for RocketMQ console | OrderTopic |
Consume ordered messages
The consumer uses MessageListenerOrderly to process messages in strict FIFO order within each queue. The following sample code uses the commercial edition of the TCP client SDK for C++.
Save the following code as OrderConsumerDemo.cpp, then compile it:
g++ -o order_consumer_demo -std=c++11 -lz -lrocketmq OrderConsumerDemo.cppRun the compiled binary. The code subscribes to a topic and consumes ordered messages sequentially:
#include <iostream> #include <thread> #include "DefaultMQPushConsumer.h" using namespace rocketmq; // Listener that processes messages in the order they were stored. class ExampleOrderlyMessageListener : public MessageListenerOrderly { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { std::cout << "=======Before consuming messages=======" << std::endl; // Replace with the Group ID from the ApsaraMQ for RocketMQ console. DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXXX"); // Replace with the TCP endpoint from the Instance Details page // in the ApsaraMQ for RocketMQ console. consumer->setNamesrvAddr( "http://MQ_INST_XXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80" ); // Set credentials from environment variables. // ALIBABA_CLOUD_ACCESS_KEY_ID: your AccessKey ID // ALIBABA_CLOUD_ACCESS_KEY_SECRET: your AccessKey secret // "ALIYUN": the user channel (default value) consumer->setSessionCredentials( getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN" ); auto start = std::chrono::system_clock::now(); // Register the orderly message listener. ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener(); // Subscribe to the topic. The "*" expression matches all tags. consumer->subscribe("YOUR ORDERLY TOPIC", "*"); consumer->registerMessageListener(messageListener); // Start the consumer. // Before starting: // - Make sure that the subscription is configured. // - Make sure that all consumers in the same group use consistent subscriptions. consumer->start(); // Keep the main thread alive for 60 seconds to receive messages. std::this_thread::sleep_for(std::chrono::seconds(60)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }
Replace the following placeholders with actual values:
| Placeholder | Description | Example |
|---|---|---|
GID_XXXXXXXXXXX | The Group ID from the ApsaraMQ for RocketMQ console | GID_OrderConsumerGroup |
MQ_INST_XXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80 | The TCP endpoint from the Instance Details page | MQ_INST_abc123.mq-internet-access.mq-internet.aliyuncs.com:80 |
YOUR ORDERLY TOPIC | The topic created in the ApsaraMQ for RocketMQ console | OrderTopic |
What's next
Ordered messages: Learn how global and partitional ordering work in detail.
Explore other message types supported by the TCP client SDK for C++.