ApsaraMQ for RocketMQ: Send and subscribe to ordered messages

Last Updated: Jan 08, 2024

Ordered messages, also known as first-in-first-out (FIFO) messages, are a message type provided by ApsaraMQ for RocketMQ. Ordered messages are published and consumed in a strict order. This topic provides sample code that shows how to send and subscribe to ordered messages by using the TCP client SDK for Java.

Prerequisites

The following operations are complete:

  • The SDK for Java 1.2.7 or later is downloaded. For information about the release notes of the SDK for Java, see Release notes.

  • An environment is set up. For more information, see Prepare the environment.

  • Optional. Logging settings are configured. For more information, see Logging settings.

Background information

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages of a specified topic are published and consumed in first-in-first-out (FIFO) order.

  • Partitionally ordered messages: All messages of a specified topic are distributed to different partitions by using sharding keys. The messages in each partition are published and consumed in FIFO order. A sharding key is a key field that is used for ordered messages to identify different partitions. A sharding key is different from the key of a normal message.

For more information, see Ordered messages.
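
The effect of a sharding key can be pictured with a small, self-contained sketch. It assumes a hash-modulo mapping from sharding key to partition, which is only an illustration: this topic does not describe how the broker actually selects a partition, and the class and method names (ShardingKeySketch, partitionFor, route) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: the broker's real partition-selection logic is not described in this topic.
final class ShardingKeySketch {

    // Assumed mapping: hash the sharding key and reduce it modulo the partition count.
    static int partitionFor(String shardingKey, int partitionCount) {
        return Math.floorMod(shardingKey.hashCode(), partitionCount);
    }

    // Routes {shardingKey, body} pairs to partitions and keeps arrival order inside each partition.
    static Map<Integer, List<String>> route(List<String[]> keyedMessages, int partitionCount) {
        Map<Integer, List<String>> partitions = new LinkedHashMap<>();
        for (String[] keyed : keyedMessages) {
            int partition = partitionFor(keyed[0], partitionCount);
            partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(keyed[1]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String[]> messages = new ArrayList<>();
        messages.add(new String[] {"biz_1", "created"});
        messages.add(new String[] {"biz_2", "created"});
        messages.add(new String[] {"biz_1", "paid"});
        messages.add(new String[] {"biz_1", "shipped"});
        // Messages that share a sharding key always land in the same partition, in sending order.
        System.out.println(route(messages, 4));
    }
}
```

Under this assumption, all messages for biz_1 stay in one partition and keep their relative order, while messages for other keys may be handled in parallel in other partitions.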

Note

The first time you use ApsaraMQ for RocketMQ, we recommend that you refer to Demo project to build an ApsaraMQ for RocketMQ project before you send and subscribe to messages.

Send ordered messages

Important

An ApsaraMQ for RocketMQ broker can preserve the sending order only if the sender uses a single producer on a single thread. In that case, the broker orders messages in the sequence in which that producer sends them. If the sender uses multiple producers or threads to concurrently send messages, the message order is the order in which the broker receives the messages. This order may be different from the sending order on the business side.
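
When several business threads produce ordered messages, one possible way to respect this constraint is to funnel every send through one dedicated thread. The following is a minimal sketch that uses only the JDK. The send parameter is a hypothetical stand-in for a call such as producer.send(msg, shardingKey), and the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SingleThreadSendSketch {

    // Hands every body to `send` on one dedicated thread, in list order.
    // A single-thread executor runs its tasks sequentially in submission order.
    static void sendInOrder(List<String> bodies, Consumer<String> send) {
        ExecutorService sender = Executors.newSingleThreadExecutor();
        try {
            for (String body : bodies) {
                sender.execute(() -> send.accept(body));
            }
        } finally {
            // Stop accepting new tasks; tasks that are already queued still run.
            sender.shutdown();
        }
        try {
            if (!sender.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Sends did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for sends", e);
        }
    }

    public static void main(String[] args) {
        // In a real application, the lambda would call the ordered producer instead of printing.
        sendInOrder(Arrays.asList("created", "paid", "shipped"),
                body -> System.out.println("sending " + body));
    }
}
```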

For information about the sample code, see ApsaraMQ for RocketMQ code library.

Globally ordered messages and partitionally ordered messages are published in the same manner. The following sample code is provided:

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Date;
import java.util.Properties;


public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // An AccessKey ID is used as the identifier for identity authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // An AccessKey secret is used as the password for identity authentication. 
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // Before you send a message, call the start() method only once to start the producer. 
        producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // The topic to which the message belongs. 
                    "Order_global_topic",
                    // The message tag. A message tag is similar to a Gmail tag and is used to sort and filter messages on the ApsaraMQ for RocketMQ broker. 
                    "TagA",
                    // The message body. A message body is data in binary format. ApsaraMQ for RocketMQ does not process the message body. The producer and the consumer must agree on the serialization and deserialization methods. 
                    "send order global msg".getBytes()
            );
            // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. 
            // A key helps you query and resend a message in the ApsaraMQ for RocketMQ console if the message fails to be received. 
            // Note: You can send and receive messages even if you do not specify the message key. 
            msg.setKey(orderId);
            // The sharding key, which is the key field used in ordered messages to identify different partitions. A sharding key is different from the key of a normal message. 
            // For globally ordered messages, this field can be set to any non-empty string. 
            String shardingKey = orderId;
            try {
                // Send the message. If no exception is thrown, the message is sent. 
                SendResult sendResult = producer.send(msg, shardingKey);
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // Specify the logic to resend or persist the message if the message fails to be sent and needs to be re-sent. 
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // Before you exit the application, shut down the producer. 
        // Note: Shutting down a producer releases memory. If you send messages frequently, you do not need to shut down the producer. 
        producer.shutdown();
    }

}
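
The catch block in the preceding sample leaves the resend logic to you. One possible shape for it is a bounded retry with a growing delay, sketched below with only the JDK. The Supplier is a hypothetical stand-in for a call such as producer.send(msg, shardingKey), and the name sendWithRetry, the attempt limit, and the backoff values are assumptions rather than SDK features. For ordered messages, it is reasonable to finish retrying a failed message before sending the next one so that the intended sequence is not disturbed.

```java
import java.util.function.Supplier;

final class SendRetrySketch {

    // Calls `send` up to maxAttempts times and returns the first successful result.
    // Waits backoffMillis * attempt between attempts; rethrows the last failure if all attempts fail.
    static <T> T sendWithRetry(Supplier<T> send, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.get();
            } catch (RuntimeException e) {
                lastFailure = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis * attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to resend", ie);
                }
            }
        }
        // All attempts failed: the caller can persist the message for a later resend.
        throw lastFailure;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails twice before it succeeds.
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```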

Subscribe to ordered messages

Globally ordered messages and partitionally ordered messages are subscribed to in the same manner. The following sample code is provided:

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;


public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // An AccessKey ID is used as the identifier for identity authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // An AccessKey secret is used as the password for identity authentication.
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
        // The timeout period in milliseconds before a retry is performed when the system fails to consume an ordered message. Value range: 10 to 30,000. 
        properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
        // The maximum number of retries for the message if the message fails to be consumed. 
        properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");

        // Create the ordered consumer. 
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                // The topic to which the message belongs. 
                "Order_global_topic",
                // Subscribe to messages that contain specified tags in the specified topic.
                // 1. An asterisk (*) specifies that the consumer subscribes to all messages. 
                // 2. TagA || TagB || TagC specifies that the consumer subscribes to messages that contain TagA, TagB, or TagC. 
                "*",
                new MessageOrderListener() {
                    /**
                     * 1. If a message fails to be consumed or an exception occurs during message processing, return OrderAction.Suspend. 
                     * 2. If a message is processed, return OrderAction.Success. 
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        // Call the start() method only once to start the consumer and begin receiving messages. 
        consumer.start();
    }
}
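
The listener in the preceding sample always returns OrderAction.Success. The comment on the listener also describes returning OrderAction.Suspend when processing fails, which the following self-contained sketch illustrates. The Action enum is a local stand-in for the SDK's OrderAction so that the example runs without the SDK, and consumeSafely and its businessLogic parameter are hypothetical names.

```java
import java.util.function.Consumer;

final class OrderedConsumeSketch {

    // Local stand-in for the SDK's OrderAction (Success / Suspend); not the SDK type itself.
    enum Action { SUCCESS, SUSPEND }

    // Runs the business logic and maps the outcome to an action, as a consume() body might.
    static Action consumeSafely(String body, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(body);
            return Action.SUCCESS;
        } catch (RuntimeException e) {
            // Processing failed: ask for the message to be retried instead of acknowledging it.
            System.err.println("Consume failed, suspending: " + e.getMessage());
            return Action.SUSPEND;
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeSafely("order created", b -> System.out.println("handled " + b)));
        System.out.println(consumeSafely("bad payload", b -> {
            throw new IllegalArgumentException("cannot parse " + b);
        }));
    }
}
```

In the real listener, the same try/catch would wrap the body of consume(Message, ConsumeOrderContext) and return OrderAction.Success or OrderAction.Suspend. Retries are then governed by the SuspendTimeMillis and MaxReconsumeTimes settings shown in the sample.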