ApsaraMQ for RocketMQ delivers ordered messages in strict first-in-first-out (FIFO) sequence. Use ordered messages when your application requires messages to be processed in the exact order they were sent, such as processing trade orders by submission time or synchronizing database changes in sequence.
This topic provides sample code for sending and receiving ordered messages by using the HTTP client SDK for Java.
How it works
Ordered messages fall into two categories:
Globally ordered messages: All messages in a topic follow a single FIFO sequence. This provides the strongest ordering guarantee but limits throughput to a single partition.
Partitionally ordered messages: Messages are distributed across partitions by sharding key. Within each partition, messages follow FIFO order. Different partitions are consumed independently, enabling higher throughput while preserving order where it matters.
A sharding key identifies which partition a message belongs to. Messages with the same sharding key are always delivered to the same partition and consumed in order. A sharding key is different from a message key.
An ApsaraMQ for RocketMQ broker determines the order in which messages are generated based on the order in which the sender uses a single producer or thread to send messages. If the sender uses multiple producers or threads to concurrently send messages, the message order is determined by the order in which the messages are received by the ApsaraMQ for RocketMQ broker. This order may be different from the sending order on the business side.
Production order and consumption order
End-to-end FIFO delivery depends on two independent factors:
| Production order | Consumption order | Result |
|---|---|---|
| Single producer, serial sends | Ordered consumption (consumeMessageOrderly) | FIFO guaranteed per sharding key |
| Multiple producers or threads | Ordered consumption | Order determined by broker arrival, not business intent |
| Single producer, serial sends | Concurrent consumption | Consumption order not guaranteed |
For strict ordering, both conditions must hold: send from a single producer thread and consume with consumeMessageOrderly.
Prerequisites
Before you start, make sure that the following operations are performed:
Install the SDK for Java. For more information, see Set up your Java environment.
Create the resources that you want to specify in the code in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.
Obtain the AccessKey pair of your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Send ordered messages
The following example sends eight ordered messages across two partitions by using sharding keys.
Replace the placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
<your-http-endpoint> | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://xxx.mqrest.cn-hangzhou.aliyuncs.com |
<your-topic> | Topic name | OrderTopic |
<your-instance-id> | Instance ID. Set to null or "" if the instance has no namespace | MQ_INST_xxx |
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class OrderProducer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
"<your-http-endpoint>",
// Read credentials from environment variables to avoid hardcoding secrets
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
final String topic = "<your-topic>";
// Set to null or "" if the instance has no namespace
final String instanceId = "<your-instance-id>";
// Get the producer for the specified topic
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// Send 8 messages, alternating between 2 sharding keys (partitions)
for (int i = 0; i < 8; i++) {
TopicMessage pubMsg = new TopicMessage(
"hello mq!".getBytes(), // Message body
"A" // Message tag for filtering
);
// Sharding key determines the partition. Messages with the same
// sharding key are delivered to the same partition in FIFO order.
pubMsg.setShardingKey(String.valueOf(i % 2));
pubMsg.getProperties().put("a", String.valueOf(i));
// Publish synchronously. No exception means the send succeeded.
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
System.out.println(new Date() + " Send mq message success. Topic is:" + topic
+ ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// Handle send failure: retry or persist the message for later delivery
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}Each topic supports only one message type. A topic created for ordered messages cannot send or receive normal messages.
Receive ordered messages
The following example consumes ordered messages by using consumeMessageOrderly, which uses long polling and guarantees partition-level FIFO order.
Replace the placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
<your-http-endpoint> | HTTP endpoint from the Instance Details page | http://xxx.mqrest.cn-hangzhou.aliyuncs.com |
<your-topic> | Topic name | OrderTopic |
<your-group-id> | Consumer group ID | GID_Order |
<your-instance-id> | Instance ID. Set to null or "" if the instance has no namespace | MQ_INST_xxx |
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
"<your-http-endpoint>",
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
final String topic = "<your-topic>";
final String groupId = "<your-group-id>";
// Set to null or "" if the instance has no namespace
final String instanceId = "<your-instance-id>";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// Continuously poll for messages
do {
List<Message> messages = null;
try {
// consumeMessageOrderly guarantees partition-level FIFO order.
// The consumer must ACK all messages in a batch before the broker
// delivers the next batch from the same partition.
messages = consumer.consumeMessageOrderly(
3, // Max messages per batch (up to 16)
3 // Long polling timeout in seconds (up to 30)
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}
// Process messages
System.out.println("Receive " + messages.size() + " messages:");
for (Message message : messages) {
System.out.println(message);
System.out.println("ShardingKey: " + message.getShardingKey()
+ ", a:" + message.getProperties().get("a"));
}
// ACK consumed messages. If the broker does not receive an ACK before
// the retry timeout, it redelivers the message.
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:"
+ errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle : errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle
+ ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}How ordered consumption works
When you call consumeMessageOrderly, the broker enforces these behaviors:
Partition-level locking: The consumer may pull messages from multiple partitions, but within each partition, messages arrive in send order.
Batch acknowledgement gating: The consumer must ACK all messages in the current batch before receiving the next batch from the same partition. If the broker does not receive an ACK before the retry timeout, it redelivers the unacknowledged message.
Long polling: If no messages are available, the broker holds the request for the specified polling duration (up to 30 seconds) and responds immediately when a message arrives.
Usage notes
Single-type topics: Each topic supports only one message type. Do not mix ordered messages with normal messages on the same topic.
Single producer for strict order: Send all messages that require relative ordering from a single producer on a single thread. Using multiple producers or concurrent threads breaks the ordering guarantee.
Sharding key design: Choose a sharding key that groups related messages without creating hot partitions. For example, use an order ID to keep all events for one order in sequence, rather than a customer ID that might concentrate too many messages on a single partition.
Prompt acknowledgement: Delayed acknowledgement blocks subsequent message delivery for the entire partition. Process and ACK each batch as quickly as possible.
Receipt handle expiration: Each message receipt handle has a unique timestamp. If the handle expires before you ACK it, the ACK fails and the broker redelivers the message.
See also
Ordered messages: Ordering guarantees, ordering types, and detailed concepts
Create resources: Set up instances, topics, and consumer groups in the console