Ordered messages in ApsaraMQ for RocketMQ guarantee strict first-in-first-out (FIFO) delivery within a topic or partition. Use ordered messages when processing sequence affects your business logic, such as transaction matching where the first bid at a given price must win, or database change synchronization where insert, update, and delete operations must replay in order.
The following C++ samples show how to send and receive ordered messages through the HTTP client SDK.
How ordering works
ApsaraMQ for RocketMQ supports two ordering scopes:
Globally ordered messages -- All messages in a topic are delivered in FIFO order. Use this scope when every message must be processed sequentially.
Partitionally ordered messages -- Messages are distributed across partitions by sharding key. Within each partition, messages are delivered in FIFO order. Use this scope for higher throughput when only related messages need ordering, such as grouping by order ID or user ID.
A sharding key determines which partition receives a message. Messages with the same sharding key always go to the same partition and are consumed in send order. A sharding key is different from a message key.
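As a rough illustration of why messages with the same sharding key always reach the same partition, the sketch below maps a key to a partition index with a hash. The function name partitionFor and the use of std::hash are assumptions made for illustration only; the broker's actual routing algorithm is internal and is not described in this topic.

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical helper: maps a sharding key to one of `partitionCount` partitions.
// This is NOT the broker's real routing logic. It only shows the principle that
// a deterministic function of the key always selects the same partition.
// `partitionCount` must be greater than zero.
std::size_t partitionFor(const std::string& shardingKey, std::size_t partitionCount) {
    return std::hash<std::string>{}(shardingKey) % partitionCount;
}
```

Calling partitionFor("order-1001", 8) twice returns the same index, so every message for that order would be queued behind the previous one in a single partition.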
For more information, see Ordered messages.
Production order
The broker determines message order based on the sequence in which a single producer or thread sends messages. To preserve ordering:
Send related messages from a single producer on a single thread.
If multiple producers or threads send concurrently, the broker orders messages by arrival time, which may differ from the intended business sequence.
Consumption order
The consumer pulls partitionally ordered messages from one or more partitions and processes each partition's messages in send order. The ordering guarantee depends on proper acknowledgment:
The consumer must acknowledge all messages in a batch before pulling the next batch from the same partition.
If the broker does not receive an acknowledgment (ACK) for a message before the timeout, it re-delivers that message.
Each re-delivery generates a new receipt handle with a unique timestamp.
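The batch gating rule described above can be pictured with a small in-memory model. The class PartitionModel is hypothetical and is not part of the SDK. It models only the rule that the next batch is withheld until every message of the current batch is acknowledged, and it does not model timeouts or re-delivery.

```cpp
#include <cstddef>
#include <deque>
#include <set>
#include <string>
#include <vector>

// Hypothetical in-memory model of a single partition (not an SDK class).
// Message strings are assumed to be unique within the model.
class PartitionModel {
public:
    explicit PartitionModel(const std::vector<std::string>& messages)
        : pending_(messages.begin(), messages.end()) {}

    // Returns up to `maxCount` messages in FIFO order, or an empty batch
    // while any message of the previous batch is still unacknowledged.
    std::vector<std::string> pull(std::size_t maxCount) {
        std::vector<std::string> batch;
        if (!inFlight_.empty()) {
            return batch;  // gated: the previous batch is not fully acknowledged
        }
        while (!pending_.empty() && batch.size() < maxCount) {
            batch.push_back(pending_.front());
            inFlight_.insert(pending_.front());
            pending_.pop_front();
        }
        return batch;
    }

    // Marks one in-flight message as acknowledged.
    void ack(const std::string& message) { inFlight_.erase(message); }

private:
    std::deque<std::string> pending_;  // messages not yet delivered
    std::set<std::string> inFlight_;   // delivered but not yet acknowledged
};
```

In this model, pulling two of three messages and acknowledging only the first leaves the next pull empty; the third message becomes available only after the second acknowledgment arrives.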
Prerequisites
Before you begin, make sure that you have:
The C++ HTTP client SDK installed. See Prepare the environment
An ApsaraMQ for RocketMQ instance, topic, and consumer group created in the console. See Create resources
An Alibaba Cloud AccessKey pair. See Create an AccessKey pair
Send ordered messages
The broker sequences messages by arrival order from a single producer or thread. If multiple producers or threads send concurrently, the broker-side order may differ from the business-side send order.
Replace the following placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint of your instance | Instance Details page > HTTP Endpoint in the ApsaraMQ for RocketMQ console |
| ${TOPIC} | Topic name | The topic you created in the console |
| ${INSTANCE_ID} | Instance ID | Instance Details page in the console. Set to an empty string if the instance has no namespace |
#include <cstdlib>
#include <iostream>
#include <fstream>
#include <string>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. getenv returns a null pointer if a variable is not set.
// The AccessKey ID that is used for authentication.
getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// The AccessKey secret that is used for authentication.
getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console.
string topic = "${TOPIC}";
// The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
// If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set instanceId to an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
string instanceId = "${INSTANCE_ID}";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
// Send eight messages in a loop.
for (int i = 0; i < 8; i++)
{
PublishMessageResponse pmResp;
// The message content.
TopicMessage pubMsg("Hello, mq!order msg!");
// The sharding key that is used to distribute ordered messages to a specific partition. Sharding keys can be used to identify partitions. A sharding key is different from a message key.
pubMsg.setShardingKey(std::to_string(i % 2));
// The custom attributes of the message.
pubMsg.putProperty("a",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
cout << "Publish mq message success. Topic is: " << topic
<< ", msgId is:" << pmResp.getMessageId()
<< ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
return -2;
}
return 0;
}
This sample sends eight messages using two sharding keys (0 and 1 via i % 2). Messages with the same sharding key are delivered to the same partition in send order. The putProperty call attaches a custom attribute to each message for tracking.
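To make the grouping in the sample above concrete, the following sketch computes which message indexes share a sharding key when the key is i % 2. The function name groupByShardingKey is hypothetical and exists only for this illustration; it does not call the SDK.

```cpp
#include <map>
#include <string>
#include <vector>

// Groups message indexes 0..count-1 by the sharding key that the send sample
// assigns to them (std::to_string(i % 2)). Within each group, indexes stay in
// send order, which is the order a consumer is expected to observe per key.
std::map<std::string, std::vector<int>> groupByShardingKey(int count) {
    std::map<std::string, std::vector<int>> groups;
    for (int i = 0; i < count; i++) {
        groups[std::to_string(i % 2)].push_back(i);
    }
    return groups;
}
```

For eight messages, key "0" collects indexes 0, 2, 4, 6 and key "1" collects 1, 3, 5, 7. Ordering is guaranteed within each group, not between the two groups.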
Receive ordered messages
Call consumeMessageOrderly instead of the standard consume method to maintain partition-level ordering. In long polling mode, the broker holds the request open for the specified duration if no message is available and responds immediately when a message arrives.
In addition to the placeholders listed above, replace the following:
| Placeholder | Description | Where to find it |
|---|---|---|
| ${GROUP_ID} | Consumer group ID | The consumer group you created in the ApsaraMQ for RocketMQ console |
#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. getenv returns a null pointer if a variable is not set.
// The AccessKey ID that is used for authentication.
getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// The AccessKey secret that is used for authentication.
getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// The topic from which messages are consumed. You must create the topic in the ApsaraMQ for RocketMQ console.
string topic = "${TOPIC}";
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
string groupId = "${GROUP_ID}";
// The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
// If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set instanceId to an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
string instanceId = "${INSTANCE_ID}";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// Consume messages in long polling mode. The consumer may pull partitionally ordered messages from multiple partitions. The consumer consumes messages from the same partition in the order in which the messages are sent.
// Assume that a consumer pulls partitionally ordered messages from one partition. If the broker does not receive an acknowledgment (ACK) for a message from the consumer, the broker delivers that message to the consumer again.
// The consumer can consume the next batch of messages from a partition only after all messages that are pulled from the partition in the previous batch are acknowledged as consumed.
// In long polling mode, if no message in the topic is available for consumption, the request is suspended on the broker for the specified period of time. If a message becomes available for consumption within the specified period of time, the broker immediately sends a response to the consumer. In this example, the value is specified as 3 seconds.
consumer->consumeMessageOrderly(
3, // The maximum number of messages that can be consumed at a time. In this example, the value is specified as 3. The maximum value that you can specify is 16.
3, // The duration of a long polling cycle. Unit: seconds. In this example, the value is specified as 3. The maximum value that you can specify is 30.
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;
// The message consumption logic.
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " ShardingKey: " << iter->getShardingKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// Acknowledge the consumed messages by sending their receipt handles to the broker.
// If the broker fails to receive an ACK for a message from the consumer before the period of time that is specified by the Message.NextConsumeTime parameter elapses, the broker delivers the message for consumption again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// If the handle of a message times out, the broker cannot receive an ACK for the message from the consumer.
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages successfully!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}
consumeMessageOrderly parameters
| Parameter | Description | Range |
|---|---|---|
| Batch size | Maximum number of messages per pull | Maximum: 16 |
| Long polling duration | Seconds the broker holds the request when no message is available | Maximum: 30 |
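One way to keep caller-supplied values inside the limits in the table is to clamp them before the call. The sketch below is a hypothetical helper, not an SDK function. The upper bounds of 16 and 30 come from the table above; the lower bound of 1 is an assumption and should be checked against the SDK reference.

```cpp
#include <algorithm>

// Hypothetical holder for the two numeric arguments of consumeMessageOrderly.
struct PollArgs {
    int batchSize;    // maximum number of messages per pull
    int waitSeconds;  // long polling duration in seconds
};

// Clamps requested values to the documented maxima (16 messages, 30 seconds).
// The minimum of 1 for both values is an assumption, not taken from this topic.
PollArgs clampPollArgs(int batchSize, int waitSeconds) {
    PollArgs args;
    args.batchSize = std::min(std::max(batchSize, 1), 16);
    args.waitSeconds = std::min(std::max(waitSeconds, 1), 30);
    return args;
}
```

The clamped values could then be passed as the first two arguments of consumeMessageOrderly, for example args.batchSize and args.waitSeconds.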
Acknowledgment behavior
After processing each batch, call ackMessage with the receipt handles of all processed messages.
Batch gating: The consumer cannot pull the next batch from a partition until every message in the current batch is acknowledged.
Re-delivery on timeout: If the broker does not receive an ACK before NextConsumeTime, it re-delivers the message with a new receipt handle that contains a unique timestamp.
MessageNotExist handling: This error code indicates no message is available. The sample handles it by continuing the polling loop without a delay.
Error recovery: On other exceptions, the sample waits 2 seconds before retrying to avoid tight error loops.
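The retry policy of the receive sample can be isolated into a small helper, which also removes the need for the platform-specific Sleep and usleep calls. The names retryDelay and pauseBeforeRetry are hypothetical; the sketch assumes a C++11 compiler and mirrors only the behavior described above.

```cpp
#include <chrono>
#include <string>
#include <thread>

// Hypothetical helper that mirrors the sample's policy: no delay when the
// broker reports "MessageNotExist" (nothing to consume yet), and a 2-second
// pause for any other error code.
std::chrono::milliseconds retryDelay(const std::string& errorCode) {
    if (errorCode == "MessageNotExist") {
        return std::chrono::milliseconds(0);
    }
    return std::chrono::milliseconds(2000);
}

// Portable pause built on the standard library instead of Sleep/usleep.
void pauseBeforeRetry(const std::string& errorCode) {
    std::this_thread::sleep_for(retryDelay(errorCode));
}
```

Inside the catch block for MQServerException, a call such as pauseBeforeRetry(me.GetErrorCode()) would cover both branches of the sample's error handling.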
Usage notes
Single producer per ordering scope. Send all messages that require relative ordering from a single producer on a single thread. Multi-producer or multi-thread sends may break the intended order.
Design sharding keys at the right granularity. Use business-meaningful keys such as order IDs or user IDs. Avoid putting too many messages under a single sharding key, which overloads one partition and limits throughput.
Acknowledge promptly. Delayed acknowledgment blocks consumption of subsequent messages in the same partition, increasing end-to-end latency.
Handle failures without breaking order. If message processing fails, let the message time out and re-deliver rather than acknowledging it and skipping ahead, which would break the ordering guarantee.
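The last note can be sketched as a helper that processes a batch in order and stops at the first failure, so that only the messages before the failure are acknowledged. The struct ConsumedMessage and the function handlesToAck are hypothetical stand-ins rather than SDK types. Whether the broker accepts a partial acknowledgment of a batch in exactly this way is an assumption; the sketch is deliberately conservative and stops for the whole batch, not per sharding key.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for the fields of a consumed message used here.
struct ConsumedMessage {
    std::string body;
    std::string receiptHandle;
};

// Processes messages in delivery order and returns the receipt handles that
// are safe to acknowledge. Processing stops at the first failure: the failed
// message and everything after it stay unacknowledged and rely on re-delivery,
// so no later message is handled ahead of an earlier one.
std::vector<std::string> handlesToAck(
    const std::vector<ConsumedMessage>& batch,
    const std::function<bool(const ConsumedMessage&)>& process) {
    std::vector<std::string> handles;
    for (const ConsumedMessage& msg : batch) {
        if (!process(msg)) {
            break;  // do not skip ahead past a failed message
        }
        handles.push_back(msg.receiptHandle);
    }
    return handles;
}
```

If the returned vector is empty, the acknowledgment call would be skipped for that round and the whole batch left to time out.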