Ordered messages, also known as first-in-first-out (FIFO) messages, are provided by Message Queue for Apache RocketMQ. Ordered messages are published and consumed in a strict order. This topic provides sample code that shows how to send and subscribe to ordered messages by using the TCP client SDK for Java.

Prerequisites

The following operations are complete:
  • The SDK for Java 1.2.7 or later is downloaded. For information about the release notes of the SDK for Java, see Release notes.
  • An environment is set up. For more information, see Prepare the environment.
  • Optional. Logging settings are configured. For more information, see Logging settings.

Background information

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages of a specified topic are published and consumed in first-in-first-out (FIFO) order.
  • Partitionally ordered messages: All messages of a specified topic are distributed to different partitions by using Sharding Keys. The messages in each partition are published and consumed in FIFO order. A Sharding Key is a key field that is used for ordered messages to identify different partitions. The Sharding Key is different from the key of a normal message.
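
The idea that one Sharding Key always selects the same partition can be illustrated with a minimal sketch. The hash-and-modulo mapping, the partition count, and the names `ShardingKeySketch` and `partitionFor` are assumptions made for illustration only. This topic does not describe how the broker actually routes Sharding Keys.

```java
class ShardingKeySketch {

    // Hypothetical mapping from a Sharding Key to a partition index.
    // The real broker-side routing is not described in this topic.
    static int partitionFor(String shardingKey, int partitionCount) {
        // floorMod keeps the result non-negative even if hashCode() is negative.
        return Math.floorMod(shardingKey.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 4; // assumed partition count
        for (int i = 0; i < 6; i++) {
            String key = "biz_" + i % 3;
            // Messages that share a key always land in the same partition,
            // so FIFO order only needs to hold within that partition.
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Under this assumption, messages with different Sharding Keys can be processed in parallel, while messages that share a Sharding Key keep their relative order.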

For more information, see Ordered messages 2.0.

Note The first time you use Message Queue for Apache RocketMQ, we recommend that you refer to Demo project to build a Message Queue for Apache RocketMQ project before you send and subscribe to messages.

Send ordered messages

Notice The Message Queue for Apache RocketMQ broker determines the order of messages based on the order in which the sender uses a single producer to send messages in a single thread. If the sender uses multiple producers or multiple threads to send messages concurrently, the message order is determined by the order in which the Message Queue for Apache RocketMQ broker receives the messages. This order may differ from the sending order on the business side.
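
One way to keep the sending order deterministic when several parts of an application produce messages is to funnel every send through a single thread. The following sketch is an illustration under assumptions: `SingleThreadSendSketch` and `sendAll` are hypothetical names, and the `received` list stands in for the broker. In a real application, the task body would call `producer.send(msg, shardingKey)` instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadSendSketch {

    // Submits `count` send tasks to one sender thread and returns the order
    // in which the stand-in "broker" list received them.
    static List<Integer> sendAll(int count) {
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        // A single-thread executor runs tasks one at a time in submission order.
        ExecutorService sender = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int seq = i;
            // Placeholder for producer.send(msg, shardingKey).
            sender.execute(() -> received.add(seq));
        }
        sender.shutdown();
        try {
            if (!sender.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("sender did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for sends", e);
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(sendAll(10));
    }
}
```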

For information about the sample code, see Message Queue for Apache RocketMQ code library.

Globally ordered messages and partitionally ordered messages are published in the same manner. The following sample code is provided:

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Date;
import java.util.Properties;


public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the group that you created in the Message Queue for Apache RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // The AccessKey ID that you created in the Resource Access Management (RAM) console for identity authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // The AccessKey secret that you created in the RAM console for identity authentication. 
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // Before you send a message, call the start() method to start the producer. You can call the start() method only once. 
        producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // The topic of the message. 
                    "Order_global_topic",
                    // The message tag, which is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                    "TagA",
                    // The message body in a binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and the consumer must agree on the serialization and deserialization methods. 
                    "send order global msg".getBytes()
            );
            // The key of the message. The key is a business-specific attribute of the message and should be globally unique whenever possible. 
            // A unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
            // Note: Messages can be sent and received even if you do not specify the message key. 
            msg.setKey(orderId);
            // The key field that is used in ordered messages to identify different partitions. The Sharding Key is different from the key of a normal message. 
            // For globally ordered messages, this field can be set to any non-empty string. 
            String shardingKey = orderId;
            try {
                // Send the message. If no exception is thrown, the message is sent. 
                SendResult sendResult = producer.send(msg, shardingKey);
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // Specify the logic to resend or persist the message if the message fails to be sent and needs to be re-sent. 
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // Shut down the producer before you exit the application. 
        // Note: Shutting down the producer is optional. 
        producer.shutdown();
    }

}
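
The catch block in the preceding sample leaves the resend logic to you. A minimal sketch of a bounded, in-thread retry is shown below. The names `SendRetrySketch` and `sendWithRetry` and the attempt limit are hypothetical and are not part of the SDK. Because the retry blocks the sending thread, the next message is not sent until the current one succeeds or the attempts are exhausted, which keeps the sending order intact.

```java
import java.util.concurrent.Callable;

class SendRetrySketch {

    // Calls sendAttempt until it succeeds or maxAttempts is reached.
    // Returns the result of the first successful attempt.
    static <T> T sendWithRetry(Callable<T> sendAttempt, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IllegalStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sendAttempt.call();
            } catch (Exception e) {
                // Remember the failure and try again on the same thread.
                last = new IllegalStateException("send failed on attempt " + attempt, e);
            }
        }
        // All attempts failed: surface the last error so the caller can persist the message.
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for producer.send(msg, shardingKey): fails twice, then succeeds.
        String messageId = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("simulated send failure");
            }
            return "msg-id";
        }, 5);
        System.out.println(messageId + " sent after " + calls[0] + " attempts");
    }
}
```

In the producer loop, the send could then be wrapped as `sendWithRetry(() -> producer.send(msg, shardingKey), 3)`, assuming that three attempts suit your business logic.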

Subscribe to ordered messages

Globally ordered messages and partitionally ordered messages are subscribed to in the same manner. The following sample code is provided:

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;


public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the group that you created in the Message Queue for Apache RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // The AccessKey ID that you created in the RAM console for identity authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // The AccessKey secret that you created in the RAM console for identity authentication. 
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
        // The timeout period in milliseconds before a retry is performed when the system fails to consume an ordered message. Value range: 10 to 30,000. 
        properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
        // The maximum number of retries when the message fails to be consumed. 
        properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");

        // Create the consumer. After you subscribe to the topic, call the start() method only once to start the consumer. 
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                // The topic of the message. 
                "Order_global_topic",
                // Subscribe to messages that contain specified tags in the specified topic.
                // 1. An asterisk (*) specifies that the consumer subscribes to all messages. 
                // 2. TagA || TagB || TagC specifies that the consumer subscribes to messages that contain TagA, TagB, or TagC. 
                "*",
                new MessageOrderListener() {
                    /**
                     * 1. If a message fails to be consumed or an exception occurs during message processing, OrderAction.Suspend is returned. 
                     * 2. If a message is processed, OrderAction.Success is returned. 
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        consumer.start();
    }
}
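
The listener in the preceding sample always returns OrderAction.Success. The rule in its Javadoc comment, which returns Suspend when processing fails, can be sketched as follows. To keep the sketch self-contained, the `Action` enum is a hypothetical stand-in for the SDK's `OrderAction`, and `OrderedConsumeSketch` and `handler` are illustrative names.

```java
import java.util.function.Consumer;

class OrderedConsumeSketch {

    // Hypothetical stand-in for the SDK's OrderAction (Success / Suspend).
    enum Action { SUCCESS, SUSPEND }

    // Runs the business handler and maps the outcome to an action:
    // normal completion -> SUCCESS, any runtime exception -> SUSPEND.
    static Action consume(String body, Consumer<String> handler) {
        try {
            handler.accept(body);
            return Action.SUCCESS;
        } catch (RuntimeException e) {
            // Returning SUSPEND asks for the message to be retried later
            // instead of letting the exception escape the listener.
            return Action.SUSPEND;
        }
    }

    public static void main(String[] args) {
        System.out.println(consume("ok", body -> { }));
        System.out.println(consume("bad", body -> {
            throw new IllegalStateException("simulated processing failure");
        }));
    }
}
```

In a real listener, the same try/catch would wrap the business logic inside `consume(Message message, ConsumeOrderContext context)` and return `OrderAction.Success` or `OrderAction.Suspend`.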