Message Queue for Apache RocketMQ provides ordered messages, which are also known as first in, first out (FIFO) messages. Ordered messages are published and consumed in a strict order. This topic provides sample code that shows how to use the HTTP client SDK for C# to send and subscribe to ordered messages.

Background information

Ordered messages are classified into the following types:
  • Globally ordered message: All messages in a specified topic are published and consumed in FIFO order.
  • Partitionally ordered message: The messages in a specified topic are distributed to different partitions based on their shard keys. The messages in each partition are published and consumed in FIFO order. A shard key is a field that is set on an ordered message to identify the partition to which the message is distributed. The shard key is different from the key of a normal message.

For more information, see Ordered messages.
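
The following sketch illustrates how partitional ordering behaves. It does not use the SDK. The ShardKeyOrderingSketch class and its Partition method are hypothetical names, and the in-memory dictionary of queues is only an assumed model of the broker: the way the broker actually maps shard keys to partitions is not described in this topic.

```csharp
using System;
using System.Collections.Generic;

public static class ShardKeyOrderingSketch
{
    // Hypothetical in-memory model: one FIFO queue per shard key.
    // The real broker decides how shard keys map to partitions.
    public static Dictionary<string, Queue<string>> Partition(
        IEnumerable<KeyValuePair<string, string>> keyedMessages)
    {
        var partitions = new Dictionary<string, Queue<string>>();
        foreach (var entry in keyedMessages)
        {
            if (!partitions.TryGetValue(entry.Key, out var queue))
            {
                queue = new Queue<string>();
                partitions[entry.Key] = queue;
            }
            // Messages that share a shard key keep their send order.
            queue.Enqueue(entry.Value);
        }
        return partitions;
    }

    public static void Main()
    {
        // Eight messages that alternate between the shard keys "0" and "1".
        var messages = new List<KeyValuePair<string, string>>();
        for (int i = 0; i < 8; i++)
        {
            messages.Add(new KeyValuePair<string, string>((i % 2).ToString(), "msg-" + i));
        }

        foreach (var pair in Partition(messages))
        {
            Console.WriteLine("shard key " + pair.Key + ": " + string.Join(", ", pair.Value));
        }
    }
}
```

In this model, the queue for shard key 0 holds msg-0, msg-2, msg-4, and msg-6 in that order, and the queue for shard key 1 holds the odd-numbered messages in send order. No order is implied between the two queues, which is the difference between partitional and global ordering.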

Prerequisites

Before you begin, complete the following operations:

  • Install the SDK for C#. For more information, see Prepare the environment.
  • Create the resources that you want to specify in the code. For example, create the instance, topic, and group in the Message Queue for Apache RocketMQ console before you run the code. For more information, see Create resources.

Send ordered messages

The following sample code shows how to send ordered messages:

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class OrderProducerSample
    {
        // The HTTP endpoint. To obtain the HTTP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instance Details page, scroll to the Basic Information section and view the HTTP endpoint on the Endpoints tab. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID that you created in the Resource Access Management (RAM) console. The AccessKey ID is used for identity authentication. 
        private const string _accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret that you created in the RAM console. The AccessKey secret is used for identity authentication. 
        private const string _secretAccessKey = "${SECRET_KEY}";
        // The topic of the message. The topic is created in the Message Queue for Apache RocketMQ console. 
        private const string _topicName = "${TOPIC}";
        // The ID of the instance to which the topic belongs. The instance is created in the Message Queue for Apache RocketMQ console. 
        // If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instance ID to null or an empty string. You can check whether your instance has a namespace on the Instance Details page in the Message Queue for Apache RocketMQ console. 
        private const string _instanceId = "${INSTANCE_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                // Send eight messages in a loop. 
                for (int i = 0; i < 8; i++)
                {
                    // The content and tag of the message. 
                    TopicMessage sendMsg = new TopicMessage("hello mq", "tag");
                    // The custom attributes of the message. 
                    sendMsg.PutProperty("a", i.ToString());
                    // The shard key that is used to distribute the ordered message to a specific partition. A shard key is different from a message key. In this example, the messages alternate between the shard keys 0 and 1. 
                    sendMsg.ShardingKey = (i % 2).ToString();
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publish message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }
    }
}

Subscribe to ordered messages

The following sample code shows how to subscribe to ordered messages:

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    public class OrderConsumerSample
    {
        // The HTTP endpoint. To obtain the HTTP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. The endpoint is displayed on the Endpoints tab of the Instance Details page. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID that you created in the RAM console. The AccessKey ID is used for identity authentication. 
        private const string _accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret that you created in the RAM console. The AccessKey secret is used for identity authentication. 
        private const string _secretAccessKey = "${SECRET_KEY}";
        // The topic of the message. The topic is created in the Message Queue for Apache RocketMQ console. 
        private const string _topicName = "${TOPIC}";
        // The ID of the group that you created in the Message Queue for Apache RocketMQ console. 
        private const string _groupId = "${GROUP_ID}";
        // The ID of the instance to which the topic belongs. The instance is created in the Message Queue for Apache RocketMQ console. 
        // If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instance ID to null or an empty string. You can check whether your instance has a namespace on the Instance Details page in the Message Queue for Apache RocketMQ console. 
        private const string _instanceId = "${INSTANCE_ID}";
        
        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // Consume messages in a loop in the current thread. We recommend that you use multiple threads to concurrently consume messages. 
            while (true)
            {
                try
                {
                    // Consume messages in long polling mode. The consumer may pull partitionally ordered messages from multiple partitions. The consumer consumes messages from the same partition in the order in which the messages are sent. 
                    // A consumer pulls partitionally ordered messages from a partition. If the broker does not receive the acknowledgment (ACK) for a message, the consumer consumes the message again. 
                    // The consumer can consume the next batch of messages from a partition only after all messages that are pulled from the partition in the previous batch are acknowledged when they are consumed. 
                    // In long polling mode, if no message is available for consumption in the topic, requests are suspended on the broker for a specified period of time. If a message becomes available for consumption during this time period, the broker immediately sends a response to the consumer. In this example, the time period is set to 3 seconds. 
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessageOrderly(
                            3, // The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value of this parameter is 16. 
                            3 // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                        );
                    }
                    catch (MessageNotExistException exp1)
                    {
                        // No message is available for consumption in the topic. Start the next long polling cycle. 
                        Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + exp1.RequestId);
                        continue;
                    }
                    catch (Exception exp1)
                    {
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
                    // Specify the message consumption logic. 
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // If the broker does not receive an acknowledgment (ACK) from the consumer before the period of time specified by the Message.nextConsumeTime parameter elapses, the message is consumed again. 
                    // A unique timestamp is specified for the handle of a message each time the message is consumed. 
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (AckMessageException ackExp)
                    {
                        // The broker may fail to receive an ACK for a message from the consumer if the handle of the message times out. 
                        // Exceptions of other types are not caught here. They are handled by the outer catch block. 
                        Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
                        foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                        {
                            Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}
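
The consumer sample runs its loop in a single thread and recommends that you use multiple threads. The following sketch shows one possible way to start several named worker threads that each run one iteration of consumption logic repeatedly. It does not use the SDK. The ConsumerWorkersSketch class, the Start method, and the consumeOnce delegate are hypothetical names. The sketch also assumes that the logic you pass in is safe to call from multiple threads, for example because each worker uses its own consumer object. This topic does not state whether a consumer object can be shared across threads.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ConsumerWorkersSketch
{
    // Starts `count` named background threads. Each thread calls `consumeOnce`
    // repeatedly until cancellation is requested on `token`.
    public static List<Thread> Start(int count, Action consumeOnce, CancellationToken token)
    {
        var workers = new List<Thread>();
        for (int i = 0; i < count; i++)
        {
            var worker = new Thread(() =>
            {
                while (!token.IsCancellationRequested)
                {
                    try
                    {
                        consumeOnce();
                    }
                    catch (Exception ex)
                    {
                        // Log the error and back off before the next attempt.
                        Console.WriteLine(Thread.CurrentThread.Name + " " + ex);
                        Thread.Sleep(2000);
                    }
                }
            });
            // A thread name makes log lines that print Thread.CurrentThread.Name readable.
            worker.Name = "consumer-" + i;
            worker.IsBackground = true;
            worker.Start();
            workers.Add(worker);
        }
        return workers;
    }

    public static void Main()
    {
        var cts = new CancellationTokenSource();
        int iterations = 0;

        // Placeholder logic. In a real program, this would pull, process, and acknowledge one batch.
        var workers = Start(2, () =>
        {
            Interlocked.Increment(ref iterations);
            Thread.Sleep(100);
        }, cts.Token);

        Thread.Sleep(500);
        cts.Cancel();
        foreach (var worker in workers)
        {
            worker.Join();
        }
        Console.WriteLine("iterations: " + iterations);
    }
}
```

Naming the threads matters for the consumer sample because it prints Thread.CurrentThread.Name in its log lines, and that property is null for a thread that has no name.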