Ordered messages, also known as first-in-first-out (FIFO) messages, are a type of message provided by Message Queue for Apache RocketMQ that is delivered and consumed in strict order. This topic provides sample code for sending and subscribing to ordered messages by using the Java SDK over TCP.

Prerequisites

You have completed the following operations:

Background information

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages of the specified topic are delivered and consumed strictly in FIFO order.
  • Partitionally ordered messages: All messages of the specified topic are partitioned by the shard key. Messages in one shard are published and consumed strictly in FIFO order. A shard key is a key field that is used in ordered messages to distinguish different shards. It is completely different from the key used in normal messages.

For more information, see Ordered messages.
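
The relationship between a shard key and per-shard FIFO order can be sketched with a small, SDK-independent model. The class name `ShardKeyDemo`, the hash-modulo mapping, and the shard count are assumptions made for illustration only; this topic does not describe how the broker actually maps shard keys to shards.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only. The hash-modulo mapping is an assumption,
// not the documented behavior of the Message Queue for Apache RocketMQ broker.
final class ShardKeyDemo {

    // Maps a shard key to a shard index in the range [0, shardCount).
    static int shardFor(String shardKey, int shardCount) {
        return Math.floorMod(shardKey.hashCode(), shardCount);
    }

    // Routes {shardKey, body} pairs to shards. Messages that share a shard key
    // always land in the same shard and keep their arrival order there.
    static Map<Integer, List<String>> route(List<String[]> keyedMessages, int shardCount) {
        Map<Integer, List<String>> shards = new LinkedHashMap<>();
        for (String[] keyAndBody : keyedMessages) {
            int shard = shardFor(keyAndBody[0], shardCount);
            shards.computeIfAbsent(shard, k -> new ArrayList<>()).add(keyAndBody[1]);
        }
        return shards;
    }

    public static void main(String[] args) {
        List<String[]> messages = new ArrayList<>();
        messages.add(new String[] {"order_1", "order_1: created"});
        messages.add(new String[] {"order_2", "order_2: created"});
        messages.add(new String[] {"order_1", "order_1: paid"});
        messages.add(new String[] {"order_2", "order_2: paid"});
        // Each shard lists its messages in the order in which they were sent.
        System.out.println(route(messages, 4));
    }
}
```

In this model, a globally ordered topic corresponds to a shard count of 1: every shard key maps to the same shard, so all messages share one FIFO sequence.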

Note For new users, we recommend that you read Demo project (TCP) to learn how to build a Message Queue for Apache RocketMQ project before sending and subscribing to messages.

Send ordered messages

For more information about the sample code, see Message Queue for Apache RocketMQ code library.

Globally ordered messages and partitionally ordered messages are sent in the same way. The sample code is as follows:

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Date;
import java.util.Properties;


public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The group ID you created in the console.
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "XXX");
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // Before sending a message, call the start method once to start the producer.
        producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // The topic of the message.
                    "Order_global_topic",
                    // The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on the specified criteria.
                    "TagA",
                    // The message body in any binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must negotiate consistent serialization and deserialization methods.
                    "send order global msg".getBytes()
            );
            // The message key. Set it to a business identifier that is as close to globally unique as possible.
            // A unique key lets you query the message in the console and resend it if the consumer does not receive it.
            // Note: Messages can still be sent and received even if this attribute is not set.
            msg.setKey(orderId);
            // The key field that identifies the shard of partitionally ordered messages. This shard key is different from the key of normal messages.
            // This field can be set to any non-empty string for globally ordered messages.
            String shardingKey = orderId;
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                // The message sending result, which is successful if no exception occurs.
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // The message failed to be sent and must be resent. The system can resend the message or store message data persistently.
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // Destroy the producer object before exiting the application.
        // Note: You can choose not to destroy the producer object.
        producer.shutdown();
    }

}
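
The comment in the catch block above notes that a failed message must be resent. One way to do this is a bounded retry around the send call, sketched below. The class `SendRetry` and its method `sendWithRetry` are hypothetical helpers, not part of the SDK. The sketch takes a `Callable` so that it compiles without the SDK; in the producer sample, the callable would wrap `producer.send(msg, shardingKey)`.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of the Message Queue for Apache RocketMQ SDK.
final class SendRetry {

    // Calls `send` until it returns without an exception or maxAttempts is reached.
    // Waits backoffMillis * attempt between attempts and rethrows the last failure.
    static <T> T sendWithRetry(Callable<T> send, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw last;
    }
}
```

Because the helper retries synchronously, the loop does not move on to the next message until the current one is sent or the attempts run out, so the sending order on the producer side stays unchanged. If all attempts fail, the caller still has to decide whether to stop or to store the message for later handling.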

Subscribe to ordered messages

Globally ordered messages and partitionally ordered messages are subscribed to in the same way. The sample code is as follows:

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;


public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The group ID you created in the console.
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "XXX");
        // The delay time in milliseconds before the retry upon an ordered message consumption failure. Value range: 10 to 30000.
        properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
        // The maximum number of retries upon a message consumption failure.
        properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");

        // Create the ordered consumer. Call the start method once, after subscribing, to start the consumer (see the end of this method).
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                // The topic of the message. It must be the topic that the producer sends messages to.
                "Order_global_topic",
                // Subscribe to message tags under the specified topic.
                // 1. * indicates the subscription to all messages.
                // 2. TagA || TagB || TagC indicates the subscription to messages with TagA, TagB, or TagC.
                "*",
                new MessageOrderListener() {
                    /**
                     * 1. Return OrderAction.Suspend if a message fails to be consumed or an exception occurs during message processing.<br>
                     * 2. Return OrderAction.Success if a message is successfully processed.
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        consumer.start();
    }
}
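
The tag expression passed to subscribe accepts either `*` or a list of tags separated by `||`, as the comments in the sample describe. The sketch below models those two rules locally to make the semantics concrete. `TagFilter` is a hypothetical class written for this illustration. In practice, filtering happens on the broker, and this model covers only the two forms named above.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical local model of the tag expression rules; not SDK code.
final class TagFilter {

    // Returns true if a message with the given tag would match the subscription expression.
    static boolean matches(String expression, String tag) {
        String expr = expression.trim();
        // "*" subscribes to all messages of the topic.
        if (expr.equals("*")) {
            return true;
        }
        if (tag == null) {
            return false;
        }
        // "TagA || TagB || TagC" subscribes to messages with any of the listed tags.
        Set<String> tags = Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return tags.contains(tag);
    }
}
```

For example, `TagFilter.matches("TagA || TagB", "TagB")` returns true, and `TagFilter.matches("TagA || TagB", "TagC")` returns false.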