Ordered messages, also known as first-in-first-out (FIFO) messages, are a type of message provided by Message Queue for Apache RocketMQ, which are delivered and consumed in a strict order. This topic provides the sample code for sending and subscribing to ordered messages through the .NET SDK over TCP.

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages of the specified topic are delivered and consumed strictly in FIFO order.
  • Partitionally ordered messages: All messages of the specified topic are partitioned by the shard key. Messages in one shard are published and consumed strictly in FIFO order. A shard key is a key field that is used in ordered messages to distinguish different shards. It is completely different from the key used in normal messages.

For more information, see Ordered messages.

Prerequisites

Send ordered messages

Note: For more information about the sample code, see Message Queue for Apache RocketMQ code library.

The sample code for sending ordered messages is as follows:

using System;
using ons;

/// <summary>
/// Sample program that sends a batch of ordered (FIFO) messages over TCP.
/// Every message is sent with the same sharding key.
/// </summary>
public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    /// <summary>
    /// Configures the client, starts an order producer, sends the same message
    /// 32 times with one sharding key, and shuts the producer down.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // Configure your account according to the settings in the console.
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create the producer instance.
        // Note: Producer instances are thread-safe and can be used to send messages of different topics.
        // Each thread needs only one producer instance.
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);

        // Start the producer instance.
        producer.start();

        try
        {
            // Create the message object. The same object is reused for every send below.
            Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
            // All sends use one sharding key.
            string shardingKey = "App-Test";
            for (int i = 0; i < 32; i++)
            {
                try
                {
                    SendResultONS sendResult = producer.send(msg, shardingKey);
                    Console.WriteLine("send success {0}", sendResult.getMessageId());
                }
                catch (Exception ex)
                {
                    // A failed send is reported and the loop continues with the next send.
                    Console.WriteLine("send failure{0}", ex.ToString());
                }
            }
        }
        finally
        {
            // Shut down the producer instance before the thread exits, even if
            // something outside the per-send catch above throws.
            producer.shutdown();
        }
    }
}

Subscribe to ordered messages

The sample code for subscribing to ordered messages is as follows:

using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{

    /// <summary>
    /// Order-message listener that prints each received message body to the console
    /// and reports the message as successfully consumed.
    /// </summary>
    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {
        }

        // No finalizer is declared here: this class owns nothing that needs finalizing,
        // and an empty finalizer only adds finalization-queue overhead.

        /// <summary>
        /// Writes the message body to the console.
        /// </summary>
        /// <param name="value">The received message.</param>
        /// <param name="context">The consume context; not used.</param>
        /// <returns>Always <c>ons.OrderAction.Success</c>.</returns>
        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            // Take the body's bytes under the system default encoding and decode them as UTF-8.
            // NOTE(review): presumably the SDK hands back UTF-8 bytes marshaled as a
            // default-encoded string, which this round-trip undoes — confirm against the SDK.
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    /// <summary>
    /// Sample program that subscribes to ordered (FIFO) messages over TCP and
    /// consumes them for 30 seconds before shutting down.
    /// </summary>
    class OrderConsumerExampleForEx
    {
        /// <summary>
        /// Configures the client, subscribes a <c>MyMsgOrderListener</c> to the topic,
        /// starts the consumer, waits 30 seconds, and shuts the consumer down.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            // Configure your account according to the settings in the console.
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
            // The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
            // The group ID you created in the console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
            // The topic you created in the console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
            // The log path.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

            // Create the consumer instance.
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);

            // Subscribe to the topic with all tags ("*").
            // The listener is held in a local so that it can be kept reachable until shutdown.
            MyMsgOrderListener listener = new MyMsgOrderListener();
            consumer.subscribe(factoryInfo.getPublishTopics(), "*", listener);

            // Start the consumer instance.
            consumer.start();

            try
            {
                // Keep the main thread alive for 30 seconds while the listener is invoked.
                Thread.Sleep(30000);
            }
            finally
            {
                // Shut down the consumer instance when it is no longer used,
                // even if the wait above is interrupted.
                consumer.shutdown();

                // NOTE(review): keeps the listener reachable until the consumer has stopped,
                // in case the SDK does not hold a managed reference to it — confirm.
                GC.KeepAlive(listener);
            }
        }
    }
}