ApsaraMQ for RocketMQ: Send and receive ordered messages

Last Updated: Aug 21, 2023

Ordered messages are a type of message provided by ApsaraMQ for RocketMQ. Ordered messages are consumed in strict first-in-first-out (FIFO) order. This topic provides sample code on how to send and subscribe to ordered messages by using the TCP client SDK for .NET.

Classification of ordered messages

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages in the specified topic are published and consumed in strict FIFO order.

  • Partitionally ordered messages: All messages in the specified topic are distributed to different partitions by using sharding keys. The messages in each partition are consumed in strict FIFO order. A sharding key is a key field that is used for ordered messages to identify different partitions. Sharding keys are different from the keys of normal messages.

For more information, see Ordered messages.
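The effect of a sharding key can be illustrated with a small, self-contained simulation. This is only a conceptual sketch: the `PartitionFor` hash (FNV-1a), the `Route` helper, and the partition count of 4 are assumptions made for illustration and do not describe how the ApsaraMQ for RocketMQ broker actually assigns partitions. The sketch shows that messages with the same sharding key land in the same partition and keep their relative order there.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class ShardingKeySketch
{
    // Hypothetical stable hash (FNV-1a). The broker's real mapping is not described here.
    public static int PartitionFor(string shardingKey, int partitionCount)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(shardingKey))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return (int)(hash % (uint)partitionCount);
        }
    }

    // Appends each message to the FIFO queue of the partition chosen by its sharding key.
    public static Dictionary<int, Queue<string>> Route(
        IEnumerable<KeyValuePair<string, string>> messages, int partitionCount)
    {
        var partitions = new Dictionary<int, Queue<string>>();
        foreach (KeyValuePair<string, string> message in messages)
        {
            int partition = PartitionFor(message.Key, partitionCount);
            Queue<string> queue;
            if (!partitions.TryGetValue(partition, out queue))
            {
                queue = new Queue<string>();
                partitions[partition] = queue;
            }
            queue.Enqueue(message.Key + ":" + message.Value);
        }
        return partitions;
    }

    public static void Main()
    {
        var messages = new List<KeyValuePair<string, string>>
        {
            new KeyValuePair<string, string>("order-1", "created"),
            new KeyValuePair<string, string>("order-2", "created"),
            new KeyValuePair<string, string>("order-1", "paid"),
            new KeyValuePair<string, string>("order-2", "paid"),
            new KeyValuePair<string, string>("order-1", "shipped"),
        };

        foreach (KeyValuePair<int, Queue<string>> entry in Route(messages, 4))
        {
            Console.WriteLine("partition {0}: {1}", entry.Key, string.Join(", ", entry.Value));
        }
    }
}
```

In this sketch, order is guaranteed only among messages that share a sharding key. Messages with different keys may be interleaved in any way across partitions.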

Prerequisites

  • The SDK for .NET is downloaded. For more information, see Release notes.

  • The environment is prepared. For more information, see Prepare the environment.

  • The resources that you want to specify in the code are created in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.

  • The AccessKey pair of your Alibaba Cloud account is obtained. For more information, see Create an AccessKey pair.

Send ordered messages

Important

An ApsaraMQ for RocketMQ broker determines the message order from the order in which a single producer or thread sends the messages. If the sender uses multiple producers or threads to send messages concurrently, the message order is determined by the order in which the broker receives the messages. This order may differ from the sending order on the business side.
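One way to respect this constraint when several business threads produce messages is to funnel all sends through a single dedicated thread. The following is a minimal sketch of that pattern and not part of the SDK: `SingleThreadSender` is a hypothetical helper, and the `send` delegate stands in for a call such as `producer.send(msg, shardingKey)`. The order that reaches the broker is the order in which items were added to the queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: serializes all sends onto one worker thread.
public sealed class SingleThreadSender : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly Thread _worker;

    // 'send' is an assumed stand-in for the real send call, for example producer.send(msg, shardingKey).
    public SingleThreadSender(Action<string> send)
    {
        _worker = new Thread(() =>
        {
            // The default BlockingCollection is FIFO, so items are sent in enqueue order.
            foreach (string body in _queue.GetConsumingEnumerable())
            {
                send(body);
            }
        });
        _worker.Start();
    }

    public void Enqueue(string body)
    {
        _queue.Add(body);
    }

    // Stops accepting new items, waits until all queued items are sent, then releases resources.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Join();
        _queue.Dispose();
    }
}

public static class SingleThreadSenderDemo
{
    public static void Main()
    {
        var sent = new List<string>();
        using (var sender = new SingleThreadSender(sent.Add))
        {
            for (int i = 0; i < 5; i++)
            {
                sender.Enqueue("step-" + i);
            }
        }
        Console.WriteLine(string.Join(",", sent));
    }
}
```

If several threads call `Enqueue` concurrently, their relative order is fixed at the moment each item enters the queue, so any ordering that matters to the business logic still has to be established before that point.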

For information about the sample code, see the ApsaraMQ for RocketMQ code repository.

The following sample code provides an example of how to send ordered messages by using the TCP client SDK for .NET:

using System;
using ons;

public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account. You can obtain the account information in the Alibaba Cloud Management Console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
        // The AccessKey ID that is used for authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // The AccessKey secret that is used for authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create the producer instance. 
        // Note: Producer instances are thread-safe and can be used to send messages to different topics. In most cases, each thread requires only one producer instance.
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);

        // Start the producer instance. 
        producer.start();

        // Create the message. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        string shardingKey = "App-Test";
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg, shardingKey);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure {0}", ex.ToString());
            }
        }

        // Before you exit your thread, terminate the producer instance. 
        producer.shutdown();

    }
}

Subscribe to ordered messages

The following sample code provides an example of how to subscribe to ordered messages by using the TCP client SDK for .NET:

using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{

    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {

        }

        ~MyMsgOrderListener()
        {
        }

        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    class OrderConsumerExampleForEx
    {
        static void Main(string[] args)
        {
            // Configure your account. You can obtain the account information in the Alibaba Cloud Management Console. 
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
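            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
            // The AccessKey ID that is used for authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // The AccessKey secret that is used for authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));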
            // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
            // The topic that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
            // The log path. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

            // Create the consumer instance. 
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);

            // Subscribe to the topic. 
            consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgOrderListener());

            // Start the consumer instance. 
            consumer.start();

            // Put the main thread to sleep for a period of time. 
            Thread.Sleep(30000);

            // If the consumer instance is no longer required, terminate the instance. 
            consumer.shutdown();
        }
    }
}
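
The sample above sleeps for 30 seconds and then shuts the consumer down, which suits a demo. A long-running consumer would more likely stay alive until it is told to stop. The following is a minimal sketch of that idea using only standard .NET types: `ConsumerHost` and `RunUntilSignaled` are hypothetical names, and the `start` and `shutdown` delegates stand in for `consumer.start()` and `consumer.shutdown()`.

```csharp
using System;
using System.Threading;

public static class ConsumerHost
{
    // 'start' and 'shutdown' are assumed stand-ins for consumer.start() and consumer.shutdown().
    public static void RunUntilSignaled(Action start, Action shutdown, WaitHandle stopSignal)
    {
        start();
        try
        {
            // Block the calling thread until a stop is requested.
            stopSignal.WaitOne();
        }
        finally
        {
            // Always release the consumer, even if waiting fails.
            shutdown();
        }
    }

    public static void Main()
    {
        using (var stop = new ManualResetEvent(false))
        {
            Console.CancelKeyPress += (sender, e) =>
            {
                e.Cancel = true; // Keep the process alive so that shutdown can run.
                stop.Set();
            };

            RunUntilSignaled(
                () => Console.WriteLine("consumer started"),
                () => Console.WriteLine("consumer shut down"),
                stop);
        }
    }
}
```

Running this program keeps it waiting until Ctrl+C is pressed, at which point the shutdown delegate runs before the process exits.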