All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive ordered messages

Last Updated:Apr 24, 2025

Ordered messages are a type of message provided by ApsaraMQ for RocketMQ. Ordered messages are published and consumed in strict first-in, first-out (FIFO) order. This topic provides sample code used to send and receive ordered messages over TCP using Apache RocketMQ SDK for Java.

Prerequisites

Make sure that the following operations are performed:

Background information

Partitionally ordered messages in a specific topic are partitioned based on sharding keys. Messages in each partition are consumed in strict FIFO order. A sharding key is a key field that is used for ordered messages to identify different partitions. Sharding keys are different from the keys of normal messages.

For more information, see Ordered messages.

Note

If you are a new user of ApsaraMQ for RocketMQ, we recommend that you refer to the demo project to build a project before you use ApsaraMQ for RocketMQ to send and receive messages.

Send ordered messages

Important

An ApsaraMQ for RocketMQ broker determines the order in which messages are generated based on the order in which the sender uses a single producer or thread to send messages. If the sender uses multiple producers or threads to concurrently send messages, the message order is determined by the order in which the messages are received by the ApsaraMQ for RocketMQ broker. This order may be different from the sending order on the business side.

For information about the detailed sample code, see the ApsaraMQ for RocketMQ code repository.

Sample code:

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Properties;


public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // Before you send the message, call the start() method only once to start the producer. 
        producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // The topic to which the message that you want to send belongs. 
                    "Order_global_topic",
                    // The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker. 
                    "TagA",
                    // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. 
                    "send order global msg".getBytes()
            );
            // The message key. The key is the business-specific attribute of the message and must be globally unique whenever possible. 
            // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send it again. 
            // Note: A message can be sent and received even if you do not specify the message key. 
            msg.setKey(orderId);
            // The key field that is used in the ordered message to identify the partition. The sharding key is different from the key of a normal message. 
            String shardingKey = String.valueOf(orderId);
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                // Send the message. If no exception is thrown, the message is sent. 
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // Specify the logic to resend or persist the message if the message fails to be sent and needs to be re-sent. 
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }

}

Subscribe to ordered messages

Sample code:

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;


public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
          // The timeout period in milliseconds before a retry is performed when the system fails to consume an ordered message. Valid values: 10 to 30,000. 
        properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
        // The maximum number of retries that can be performed on the message if the message fails to be consumed. 
        properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");

        // Before you use the consumer to subscribe to messages, call the start() method only once to start the consumer. 
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                // The topic to which the message to which you want to subscribe belongs. 
                "Order_global_topic",
                // Subscribe to messages that contain the specified tags in the specified topic.
                // 1. An asterisk (*) specifies that the consumer subscribes to all messages. 
                // 2. TagA || TagB || TagC specifies that the consumer subscribes to messages that contain TagA, TagB, or TagC. 
                "*",
                new MessageOrderListener() {
                    /**
                     * 1 If a message fails to be consumed or an exception occurs during message processing, OrderAction.Suspend is returned. 
                     * 2. If a message is processed, OrderAction.Success is returned. 
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        consumer.start();
    }
}