Ordered messages are delivered and consumed in strict first-in, first-out (FIFO) order. This topic provides Java sample code for sending and receiving ordered messages over TCP with the Apache RocketMQ SDK for Java.
How ordered messages work
Ordered messages are partitioned by sharding key. Each sharding key maps to a dedicated partition, and messages within that partition are delivered in the exact order they were sent.
Three rules govern this behavior:
In-order within a partition -- Messages that share the same sharding key are always delivered in FIFO order.
No ordering across partitions -- Messages with different sharding keys may arrive in any order relative to each other.
Sharding key is required -- Every ordered message must include a sharding key. This key is separate from the general message key used for lookups.
Use sharding keys to model your business ordering requirements. For example, set the sharding key to an order ID so that all messages for the same order are processed sequentially, while messages for different orders are processed in parallel.
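The effect of a sharding key can be pictured with a small, self-contained sketch. It is a simplified model rather than the broker's implementation: `ShardingKeySketch` and `partitionByShardingKey` are hypothetical names, and each partition is modeled as an in-memory list. It shows only that messages sharing a sharding key keep their send order, while messages with different keys are tracked independently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only: one in-memory FIFO list per sharding key. */
final class ShardingKeySketch {

    /** Groups message bodies by sharding key, preserving send order within each key. */
    static Map<String, List<String>> partitionByShardingKey(List<String[]> sent) {
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        for (String[] keyAndBody : sent) {
            // keyAndBody[0] is the sharding key, keyAndBody[1] is the message body.
            partitions.computeIfAbsent(keyAndBody[0], key -> new ArrayList<>())
                    .add(keyAndBody[1]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Messages for two orders are sent interleaved.
        List<String[]> sent = Arrays.asList(
                new String[] {"order-1", "created"},
                new String[] {"order-2", "created"},
                new String[] {"order-1", "paid"},
                new String[] {"order-2", "paid"},
                new String[] {"order-1", "shipped"});
        // Each key's list keeps the order in which that key's messages were sent.
        partitionByShardingKey(sent).forEach((key, bodies) ->
                System.out.println(key + " -> " + bodies));
    }
}
```

In this model, a consumer reading the list for `order-1` sees `created`, `paid`, `shipped` in that order, regardless of how the messages for `order-2` were interleaved.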
For more information about ordering semantics and constraints, see Ordered messages.
Prerequisites
Before you begin, make sure that you have:
The SDK for Java 1.2.7 or later. See Release notes.
A prepared environment. See Prepare the environment.
(Optional) Logging settings configured. See Logging settings.
If you are new to ApsaraMQ for RocketMQ, start with the demo project to set up a working project before sending and receiving messages.
Send ordered messages
Ordering guarantee
The broker determines message order from the sequence in which a single producer or a single thread sends the messages. If multiple producers or threads send messages concurrently, the broker orders the messages by arrival time, which may differ from the intended business order.
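One possible way to keep the send sequence under control when several parts of an application produce messages is to funnel every send through one worker thread. The sketch below assumes that approach. `OrderedSendFunnel` is a hypothetical helper, not part of the SDK, and its `Consumer<String>` sender is a stand-in for a real call such as `producer.send(msg, shardingKey)`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: serializes all sends onto a single worker thread. */
final class OrderedSendFunnel implements AutoCloseable {

    // A single-thread executor runs queued tasks one at a time, in submission order.
    private final ExecutorService sendThread = Executors.newSingleThreadExecutor();
    private final Consumer<String> sender;

    OrderedSendFunnel(Consumer<String> sender) {
        this.sender = sender;
    }

    /** Queues a message body; the worker thread sends queued bodies sequentially. */
    void submit(String body) {
        sendThread.execute(() -> sender.accept(body));
    }

    /** Stops accepting new messages and waits for queued sends to finish. */
    @Override
    public void close() {
        sendThread.shutdown();
        try {
            if (!sendThread.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pending sends did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for pending sends", e);
        }
    }

    public static void main(String[] args) {
        // The lambda stands in for a real send call.
        try (OrderedSendFunnel funnel =
                new OrderedSendFunnel(body -> System.out.println("send " + body))) {
            funnel.submit("order-1:created");
            funnel.submit("order-1:paid");
            funnel.submit("order-1:shipped");
        }
    }
}
```

The funnel only preserves the order in which `submit` is called. If several threads call `submit` concurrently, the application still has to establish the intended business order before submitting.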
Sample code
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-group-id> | Group ID created in the ApsaraMQ for RocketMQ console | GID_order_group |
| <your-access-key> | AccessKey ID for authentication | LTAI5tXxx |
| <your-secret-key> | AccessKey secret for authentication | xXxXxXx |
| <your-tcp-endpoint> | TCP endpoint from the TCP Endpoint section on the Instance Details page | http://MQ_INST_xxx.mq-internet-access.mq-internet.aliyuncs.com:80 |
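A forgotten placeholder usually surfaces only as a connection or authentication error at run time. As an optional safeguard, the sketch below checks a `Properties` object for values that still look like `<your-...>` placeholders before a client is created. `PlaceholderCheck` is a hypothetical helper, and the plain-string property names in `main` are illustrative; real code would use the `PropertyKeyConst` constants shown in the sample code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Hypothetical helper: finds property values that are still template placeholders. */
final class PlaceholderCheck {

    // Matches a value that consists entirely of a "<your-...>" placeholder.
    private static final Pattern PLACEHOLDER = Pattern.compile("<your-[^>]+>");

    /** Returns the sorted names of properties whose values are unreplaced placeholders. */
    static List<String> unreplaced(Properties properties) {
        List<String> names = new ArrayList<>();
        // TreeSet gives a deterministic, alphabetical order.
        for (String name : new TreeSet<>(properties.stringPropertyNames())) {
            if (PLACEHOLDER.matcher(properties.getProperty(name)).matches()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Illustrative property names only.
        properties.put("GROUP_ID", "GID_order_group");
        properties.put("AccessKey", "<your-access-key>");
        List<String> missing = unreplaced(properties);
        if (!missing.isEmpty()) {
            System.out.println("Replace placeholders before starting the client: " + missing);
        }
    }
}
```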
For the complete code, see the ApsaraMQ for RocketMQ code repository.
package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Date;
import java.util.Properties;

public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Group ID created in the ApsaraMQ for RocketMQ console.
        properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
        // AccessKey ID for authentication.
        properties.put(PropertyKeyConst.AccessKey, "<your-access-key>");
        // AccessKey secret for authentication.
        properties.put(PropertyKeyConst.SecretKey, "<your-secret-key>");
        // TCP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");

        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // Call start() once before sending any messages.
        producer.start();

        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // Topic for ordered messages.
                    "Order_global_topic",
                    // Tag for filtering on the broker side.
                    "TagA",
                    // Message body in binary format.
                    // Producer and consumer must agree on serialization and deserialization.
                    "send order global msg".getBytes()
            );
            // Message key -- a business identifier, ideally globally unique.
            // Use this key to look up the message in the ApsaraMQ for RocketMQ console.
            msg.setKey(orderId);
            // Sharding key -- determines the partition for ordered delivery.
            // Messages with the same sharding key are delivered in FIFO order.
            String shardingKey = orderId;
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            } catch (Exception e) {
                // Handle failure: retry or persist the message for later resend.
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        // Shut down the producer before exiting.
        // Skip this if the producer sends messages frequently.
        producer.shutdown();
    }
}
Receive ordered messages
Retry behavior
When an ordered message fails to be consumed, the broker blocks all subsequent messages in the same partition until the current message is either successfully processed or reaches the maximum retry count. This blocking behavior preserves FIFO ordering but can delay processing of other messages in the partition.
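The blocking behavior can be illustrated with a small simulation. It is a simplified model under stated assumptions, not the broker's or the SDK's implementation: `PartitionRetrySketch` and `drain` are hypothetical names, a handler that returns `false` or throws stands in for a failed consumption, each message gets one initial attempt plus up to `maxRetries` retries, and the wait between retries is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative model only: FIFO processing of one partition with bounded retries. */
final class PartitionRetrySketch {

    /**
     * Processes one partition in FIFO order and returns a log of events.
     * A message is retried until the handler succeeds or the retries run out;
     * later messages in the partition are not touched until then.
     */
    static List<String> drain(List<String> partition, Predicate<String> handler, int maxRetries) {
        List<String> events = new ArrayList<>();
        for (String message : partition) {
            boolean done = false;
            // One initial attempt plus up to maxRetries retries (an assumption of this sketch).
            for (int attempt = 0; attempt <= maxRetries && !done; attempt++) {
                try {
                    done = handler.test(message);
                } catch (RuntimeException e) {
                    done = false; // An exception counts as a failed attempt.
                }
                events.add((done ? "ok " : "fail ") + message);
            }
            if (!done) {
                // Retries exhausted: skip the message so the partition can move on.
                events.add("skip " + message);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // "m2" always fails, so "m3" is held back until "m2" exhausts its retries.
        List<String> events = drain(Arrays.asList("m1", "m2", "m3"), m -> !m.equals("m2"), 2);
        events.forEach(System.out::println);
    }
}
```

In the run above, `m3` is processed only after `m2` has failed three times (one attempt plus two retries) and been skipped, which is the delay that the blocking behavior introduces for the rest of the partition.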
Two parameters control retry behavior:
| Parameter | Description | Valid values |
|---|---|---|
| SuspendTimeMillis | Wait time in milliseconds between retries for a failed ordered message | 10 to 30,000 |
| MaxReconsumeTimes | Maximum retry attempts before the message is skipped | Integer (for example, 20) |
Set these values based on your tolerance for processing delays. A lower SuspendTimeMillis retries faster but increases broker load. A lower MaxReconsumeTimes unblocks the partition sooner but may skip messages that could have succeeded with more retries.
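To make the trade-off concrete, the following sketch estimates how long a single persistently failing message can hold up its partition through retry waits alone. It assumes that one wait of SuspendTimeMillis precedes each retry and ignores the time spent in the message handler, so treat the result as a rough lower bound. `RetryDelayEstimate` and `totalSuspendMillis` are hypothetical names.

```java
/** Rough estimate only: time spent waiting between retries for one failing message. */
final class RetryDelayEstimate {

    /**
     * Returns suspendTimeMillis multiplied by maxReconsumeTimes, assuming one
     * suspend interval before each retry and excluding handler processing time.
     */
    static long totalSuspendMillis(long suspendTimeMillis, int maxReconsumeTimes) {
        // The valid range for SuspendTimeMillis is 10 to 30,000.
        if (suspendTimeMillis < 10 || suspendTimeMillis > 30_000) {
            throw new IllegalArgumentException("suspendTimeMillis must be between 10 and 30,000");
        }
        if (maxReconsumeTimes < 0) {
            throw new IllegalArgumentException("maxReconsumeTimes must not be negative");
        }
        return Math.multiplyExact(suspendTimeMillis, (long) maxReconsumeTimes);
    }

    public static void main(String[] args) {
        // Values from the consumer sample: 100 ms wait, 20 retries.
        System.out.println(totalSuspendMillis(100, 20) + " ms");
        // Longest allowed wait with the same retry count.
        System.out.println(totalSuspendMillis(30_000, 20) + " ms");
    }
}
```

Under these assumptions, a 100 ms wait with 20 retries adds about 2 seconds of waiting, while a 30,000 ms wait with 20 retries adds about 10 minutes.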
Sample code
The following sample code uses the same placeholders as the producer sample. Replace them with your actual values before running the code.
package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;

public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Consumer group ID created in the ApsaraMQ for RocketMQ console.
        properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
        // AccessKey ID for authentication.
        properties.put(PropertyKeyConst.AccessKey, "<your-access-key>");
        // AccessKey secret for authentication.
        properties.put(PropertyKeyConst.SecretKey, "<your-secret-key>");
        // TCP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");
        // Wait time (ms) before retrying a failed ordered message. Valid values: 10 to 30,000.
        properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
        // Maximum retry attempts for a failed message.
        properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");

        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
        consumer.subscribe(
                // Topic to subscribe to.
                "Order_global_topic",
                // Tag filter expression:
                // "*" -- subscribe to all tags
                // "TagA || TagB || TagC" -- subscribe to TagA, TagB, or TagC only
                "*",
                new MessageOrderListener() {
                    /**
                     * Return OrderAction.Success after processing the message.
                     * Return OrderAction.Suspend if processing fails or throws
                     * an exception -- the broker will retry after SuspendTimeMillis.
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        // Call start() once to begin consuming.
        consumer.start();
    }
}
What to do next
Learn about ordered messages concepts and constraints.
Explore the full ApsaraMQ for RocketMQ code repository for additional examples.