Ordered messages in ApsaraMQ for RocketMQ guarantee strict first-in, first-out (FIFO) delivery. Use ordered messages when processing sequence matters to your business logic -- for example, transaction matching where the earliest bid at a given price must win, or database change synchronization where inserts, updates, and deletes must replay in order.
This topic shows how to send and receive ordered messages by using the @aliyunmq/mq-http-sdk package for Node.js.
How ordering works
ApsaraMQ for RocketMQ supports two ordering scopes:
Globally ordered messages -- All messages in a topic are delivered and consumed in a single FIFO sequence.
Partitionally ordered messages -- Messages are distributed across partitions by a sharding key. Within each partition, messages are delivered and consumed in FIFO order. Messages in different partitions may be consumed concurrently.
A sharding key determines which partition a message is routed to. A message key is a separate index used for message lookup. Do not confuse the two.
Ordering depends on two independent guarantees:
| Guarantee | Requirement |
|---|---|
| Production order | Send messages from a single producer on a single thread. If multiple producers or threads send concurrently, the broker records messages in arrival order, which may differ from the intended business order. |
| Consumption order | Acknowledge each batch of messages before consuming the next batch from the same partition. If the broker does not receive an acknowledgment (ACK), it redelivers the unacknowledged message before delivering new ones. |
Prerequisites
Before you begin, make sure that you have:
Created resources in the ApsaraMQ for RocketMQ console: an instance, a topic, and a consumer group
Created an AccessKey pair for your Alibaba Cloud account
Send ordered messages
Order is only guaranteed when a single producer sends messages on a single thread. If multiple producers or threads send concurrently, the broker records messages in arrival order, which may differ from the intended business order.
Replace the following placeholders with your actual values before running the code:
| Placeholder | Description |
|---|---|
${HTTP_ENDPOINT} | HTTP endpoint from the Instance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console |
${TOPIC} | Topic name created in the console |
${INSTANCE_ID} | Instance ID. If the instance has a namespace, specify the ID. If not, set to null or "". Check the Instance Details page for namespace information. |
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
// HTTP endpoint. Get this from Instance Details > HTTP Endpoint in the console.
const endpoint = "${HTTP_ENDPOINT}";
// Load credentials from environment variables.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topic created in the ApsaraMQ for RocketMQ console.
const topic = "${TOPIC}";
// Instance ID. Set to null or "" if the instance does not have a namespace.
const instanceId = "${INSTANCE_ID}";
const producer = client.getProducer(instanceId, topic);
(async function(){
try {
// Send 8 messages. Sharding key i % 2 routes messages to 2 partitions.
for(var i = 0; i < 8; i++) {
msgProps = new MessageProperties();
// Set a custom property.
msgProps.putProperty("a", i);
// Set the sharding key. Messages with the same key go to the same partition.
msgProps.shardingKey(i % 2);
// Publish with body "hello mq." and tag "TagA".
res = await producer.publishMessage("hello mq.", "TagA", msgProps);
console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
}
} catch(e) {
// Handle send failures. Implement retry or persistence logic as needed.
console.log(e)
}
})();Receive ordered messages
To consume messages in order, call consumeMessageOrderly() instead of consumeMessage(). This method ensures that messages from the same partition arrive in send order.
Key behaviors:
A single call may return messages from multiple partitions. Within each partition, order is preserved.
If the broker does not receive an ACK, it redelivers that message before delivering the next one in the same partition.
You must acknowledge all messages in a batch before the broker delivers the next batch from the same partition.
Long polling: if no messages are available, the broker holds the connection open for the specified wait time and responds immediately when a message arrives.
const {
MQClient,
} = require('@aliyunmq/mq-http-sdk');
// HTTP endpoint. Get this from Instance Details > HTTP Endpoint in the console.
const endpoint = "${HTTP_ENDPOINT}";
// Load credentials from environment variables.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topic created in the ApsaraMQ for RocketMQ console.
const topic = "${TOPIC}";
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
const groupId = "GID_http";
// Instance ID. Set to null or "" if the instance does not have a namespace.
const instanceId = "${INSTANCE_ID}";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
while(true) {
try {
// consumeMessageOrderly(numOfMessages, waitSeconds)
// numOfMessages: max messages per call (up to 16)
// waitSeconds: long polling duration in seconds (up to 30)
res = await consumer.consumeMessageOrderly(
3, // Receive up to 3 messages per call.
3 // Wait up to 3 seconds if no messages are available.
);
if (res.code == 200) {
console.log("Consume Messages, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
",Props:%j,ShardingKey:%s,Prop-A:%s,Tag:%s",
message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
message.MessageBody,message.Properties,message.ShardingKey,message.Properties.a);
return message.ReceiptHandle;
});
// Acknowledge consumed messages.
// If NextConsumeTime passes without an ACK, the broker redelivers the message.
res = await consumer.ackMessage(handles);
if (res.code != 204) {
// ACK failed -- typically caused by an expired receipt handle.
console.log("Ack Message Fail:");
const failHandles = res.body.map((error)=>{
console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tSucHandle:%s\n", handle);
}
});
} else {
console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
// No messages available. Long polling continues.
console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})();Best practices
Send from a single producer thread. Multiple producers or threads sending to the same sharding key can break the intended order because the broker records messages by arrival time, not by send time.
Acknowledge every batch before consuming the next. The broker blocks delivery of the next batch for a partition until all messages in the current batch are acknowledged. Skipping or delaying ACKs stalls consumption.