Ordered messages, also known as first-in-first-out (FIFO) messages, are provided by Message Queue for Apache RocketMQ. Ordered messages are published and consumed in a strict order. This topic provides sample code to show you how to send and subscribe to ordered messages by using the TCP client SDK for .NET.

Classification of ordered messages

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages of a specified topic are published and consumed in first-in-first-out (FIFO) order.
  • Partitionally ordered messages: All messages of a specified topic are distributed to different partitions by using Sharding Keys. The messages in each partition are published and consumed in FIFO order. A Sharding Key is a key field that is used for ordered messages to identify different partitions. The Sharding Key is different from the key of a normal message.

For more information, see Ordered messages 2.0.

Prerequisites

Send ordered messages

Notice The Message Queue for Apache RocketMQ broker determines the order in which messages are generated based on the order in which the sender sends them by using a single producer in a single thread. If the sender uses multiple producers or multiple threads to concurrently send messages, the message order is determined based on the order in which the messages are received by the Message Queue for Apache RocketMQ broker. This order may be different from the sending order on the business side.

For information about the sample code, see Message Queue for Apache RocketMQ code library.

The following sample code shows how to send ordered messages:

using System;
using ons;

/// <summary>
/// Sample program that sends ordered (FIFO) messages through an <c>OrderProducer</c>.
/// </summary>
public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    /// <summary>
    /// Configures the client, starts an order producer, sends the same message 32 times
    /// with a fixed sharding key, and then shuts the producer down.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // Configure your account. You can obtain the following settings in the console.
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The ID of the group that you created in the Message Queue for Apache RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the Message Queue for Apache RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the
        // Message Queue for Apache RocketMQ console. In the left-side navigation pane, click
        // Instances. On the Instances page, click the name of your instance. On the Instance
        // Details page, scroll to the Basic Information section and view the TCP endpoint on
        // the Endpoints tab.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // Specify the log path.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance.
        // Note: A producer instance is thread-safe and can be used to send messages to
        // different topics. In most cases, each thread requires only one producer instance.
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);

        // Start the producer instance.
        producer.start();

        // From here on the producer is running, so shut it down on every exit path,
        // including an unexpected exception while the message is being built.
        try
        {
            // Create a message. The same message object is sent on every iteration.
            Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
            // All sends below share this one sharding key.
            string shardingKey = "App-Test";
            for (int i = 0; i < 32; i++)
            {
                try
                {
                    SendResultONS sendResult = producer.send(msg, shardingKey);
                    Console.WriteLine("send success {0}", sendResult.getMessageId());
                }
                catch (Exception ex)
                {
                    // A failed send is reported and the loop moves on to the next attempt.
                    Console.WriteLine("send failure{0}", ex.ToString());
                }
            }
        }
        finally
        {
            // Before you exit your thread, shut down the producer instance.
            producer.shutdown();
        }
    }
}

Subscribe to ordered messages

The following sample code shows how to subscribe to ordered messages:

using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{

    /// <summary>
    /// Order listener that prints the body of every message it receives to the console.
    /// </summary>
    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {
        }

        // No finalizer is declared here: this class owns no unmanaged state of its own,
        // and an empty finalizer only adds finalization overhead.

        /// <summary>
        /// Writes the message body to the console and reports the message as consumed.
        /// </summary>
        /// <param name="value">The received message.</param>
        /// <param name="context">The consume context supplied by the SDK; not used.</param>
        /// <returns>Always <c>ons.OrderAction.Success</c>.</returns>
        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            // NOTE(review): the body string is re-encoded with Encoding.Default and then
            // decoded as UTF-8. Presumably the SDK returns UTF-8 bytes that were decoded
            // with the system default code page - confirm against the SDK before changing.
            byte[] rawBody = Encoding.Default.GetBytes(value.getBody());
            string bodyText = Encoding.UTF8.GetString(rawBody);
            Console.WriteLine(bodyText);
            return ons.OrderAction.Success;
        }
    }

    /// <summary>
    /// Sample program that subscribes to ordered (FIFO) messages through an <c>OrderConsumer</c>.
    /// </summary>
    class OrderConsumerExampleForEx
    {
        /// <summary>
        /// Configures the client, subscribes a <c>MyMsgOrderListener</c> to the topic,
        /// runs the consumer for 30 seconds, and then shuts it down.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            // Configure your account. You can obtain the following settings in the console.
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
            // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
            // The ID of the group that you created in the Message Queue for Apache RocketMQ console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
            // The topic that you created in the Message Queue for Apache RocketMQ console.
            // The PublishTopics property is used here only to carry the topic name, which is
            // read back through getPublishTopics() when subscribing.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the
            // Message Queue for Apache RocketMQ console. In the left-side navigation pane,
            // click Instances. On the Instances page, click the name of your instance. On the
            // Instance Details page, scroll to the Basic Information section and view the TCP
            // endpoint on the Endpoints tab.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
            // Specify the log path.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

            // Create a consumer instance.
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);

            // Subscribe to a topic. The listener is held in a local so that it can be kept
            // alive explicitly until the consumer has been shut down (see GC.KeepAlive below).
            MyMsgOrderListener listener = new MyMsgOrderListener();
            consumer.subscribe(factoryInfo.getPublishTopics(), "*", listener);

            // Start the consumer instance.
            consumer.start();

            try
            {
                // Enable the main thread to sleep for a period of time.
                Thread.Sleep(30000);
            }
            finally
            {
                // Shut down the consumer instance when the consumer instance is no longer used.
                consumer.shutdown();

                // NOTE(review): assumes the consumer may not hold a managed reference to the
                // listener - confirm against the SDK. Keeping it reachable until after
                // shutdown prevents it from being collected while callbacks can still arrive.
                GC.KeepAlive(listener);
            }
        }
    }
}