All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive ordered messages

Last Updated:Mar 11, 2026

When your application requires sequential processing, such as order status updates, transaction matchmaking, or incremental data synchronization, use ordered messages. ApsaraMQ for RocketMQ delivers and consumes ordered messages in strict first-in, first-out (FIFO) order, so downstream systems always process events in the sequence they occurred.

This topic demonstrates how to send and consume ordered messages with the Apache RocketMQ TCP client SDK for Java.

How ordering works

ApsaraMQ for RocketMQ supports two ordering scopes:

Ordering scopeBehaviorUse when
Globally orderedAll messages in a topic follow a single FIFO sequence.Strict total ordering is required and throughput is not a concern.
Partitionally orderedMessages are distributed across partitions by sharding key. FIFO order is maintained within each partition; partitions are independent.You need both ordering and parallelism, such as ordering per order ID or per user ID.

Sharding keys

A sharding key identifies which partition a message belongs to. Messages with the same sharding key are always delivered in order. Choose sharding keys that match your business grouping -- for example, use order IDs so that all status updates for a single order are processed sequentially, while updates for different orders can be processed in parallel.

Avoid using a single sharding key for all messages. This funnels everything into one partition and eliminates parallelism.

Note

A sharding key is different from a normal message key. Set the sharding key through the __SHARDINGKEY user property, or MessageConst.PROPERTY_SHARDING_KEY in SDK 5.x and later.

For details about the ordered message model, see Ordered messages.

Prerequisites

Before you begin, make sure that you have:

Send ordered messages

Important

The broker determines message order based on the sequence in which a single producer or thread sends messages. If multiple producers or threads send messages concurrently, the order is determined by the broker's receive sequence, which may differ from the intended business order.

Replace the following placeholders with your actual values:

PlaceholderDescriptionExample
<YOUR_GROUP_ID>Producer group IDGID_order_producer
<YOUR_ENDPOINT>Endpoint from the ApsaraMQ for RocketMQ consolehttp://MQ_INST_XXXX.aliyuncs.com:80
<YOUR_TOPIC>Topic name for ordered messagesorder_topic
<YOUR_TAG>Message tag for filteringTagA
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOrderProducer {
    private static RPCHook getAclRPCHook() {
        // Retrieve AccessKey ID and AccessKey secret from environment variables.
        // Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
        // before running this example.
        return new AclClientRPCHook(new SessionCredentials(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a producer with message trace enabled.
        // To disable message trace, use:
        // DefaultMQProducer producer = new DefaultMQProducer("<YOUR_GROUP_ID>", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer("<YOUR_GROUP_ID>", getAclRPCHook(), true, null);

        // Required for message trace on Alibaba Cloud.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Set the endpoint obtained from the ApsaraMQ for RocketMQ console.
        producer.setNamesrvAddr("<YOUR_ENDPOINT>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("<YOUR_TOPIC>",
                    "<YOUR_TAG>",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                // Set the sharding key. Messages with the same sharding key
                // are delivered to the same partition in FIFO order.
                // For SDK 5.x and later, use:
                // msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                msg.putUserProperty("__SHARDINGKEY", orderId + "");

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // Route messages with the same orderId to the same queue.
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

Consume ordered messages

Replace <YOUR_GROUP_ID>, <YOUR_ENDPOINT>, and <YOUR_TOPIC> with the same values used in the producer.

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        // Retrieve AccessKey ID and AccessKey secret from environment variables.
        // Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
        // before running this example.
        return new AclClientRPCHook(new SessionCredentials(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a push consumer with message trace enabled.
        // To disable message trace, use:
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<YOUR_GROUP_ID>", getAclRPCHook(), null);
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<YOUR_GROUP_ID>",
            getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);

        // Required for message trace on Alibaba Cloud.
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // Set the endpoint obtained from the ApsaraMQ for RocketMQ console.
        consumer.setNamesrvAddr("<YOUR_ENDPOINT>");

        // Subscribe to ordered messages. Use "*" to receive all tags,
        // or specify a tag expression to filter messages.
        consumer.subscribe("<YOUR_TOPIC>", "*");

        // Start consuming from the earliest available offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Register an orderly message listener to preserve FIFO order.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n",
                    Thread.currentThread().getName(), msgs);

                // Return SUCCESS after processing.
                // To suspend and retry on failure, return:
                // ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Best practices

Sending

  • Use a single producer thread for strict ordering. If multiple producers or threads send messages concurrently, the broker cannot guarantee the intended business order. When strict ordering matters, send all related messages from a single thread.

  • Choose fine-grained sharding keys. Use business identifiers like order IDs or user IDs as sharding keys so that different groups of messages can be processed in parallel. Avoid using a single sharding key for all messages -- this funnels everything into one partition and eliminates parallelism.

Consuming

  • Handle consumption failures correctly. On failure, return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead of SUCCESS to suspend and retry, preserving FIFO order within the partition.

What's next