Ordered messages in ApsaraMQ for RocketMQ are consumed in strict first-in-first-out (FIFO) order. This topic provides sample code for sending and receiving ordered messages with the TCP client SDK for .NET.
Ordering modes
ApsaraMQ for RocketMQ supports two ordering modes:
Globally ordered messages: All messages in a topic are published and consumed in strict FIFO order.
Partitionally ordered messages: Messages in a topic are distributed across partitions by sharding key. Within each partition, messages are consumed in strict FIFO order.
A sharding key is a key field that routes ordered messages to a specific partition. It is different from the keys of normal messages.
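The routing idea can be illustrated with a minimal sketch. The broker's actual routing algorithm is not described in this topic, so the rule below (a stable FNV-1a hash of the key, taken modulo the partition count) is purely an assumption, and `ShardingKeySketch` and `PartitionFor` are hypothetical names. The sketch only shows the property that matters for ordering: the same sharding key always maps to the same partition.

```csharp
using System;
using System.Text;

public static class ShardingKeySketch
{
    // Hypothetical routing rule for illustration only: a stable FNV-1a hash
    // of the sharding key, reduced modulo the number of partitions.
    // The real broker may route differently.
    public static int PartitionFor(string shardingKey, int partitionCount)
    {
        if (shardingKey == null) throw new ArgumentNullException(nameof(shardingKey));
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(shardingKey))
            {
                hash ^= b;
                hash *= 16777619;
            }
            // The result is always in the range [0, partitionCount).
            return (int)(hash % (uint)partitionCount);
        }
    }
}
```

Under this assumed rule, every call to `ShardingKeySketch.PartitionFor("order-1001", 8)` returns the same index, so all messages for that order would land in one partition, while messages with other keys may land elsewhere.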
The ApsaraMQ for RocketMQ broker determines message order based on the sequence in which a single producer or thread sends messages. If multiple producers or threads send messages concurrently, the broker orders messages by arrival time, which may differ from the intended business order. To guarantee ordering, send all related messages from a single producer or thread.
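One way to keep all related sends on a single thread is to funnel them through a queue that one dedicated thread drains. The sketch below is not part of the SDK: `SingleThreadSender` is a hypothetical helper, and the `send` delegate stands in for the real send call. It only preserves the order in which messages are enqueued, so related messages should still be enqueued from one place in business order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: funnels every send through one dedicated thread.
public sealed class SingleThreadSender : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly Thread _worker;

    // 'send' is a stand-in for the real send call (an assumption of this sketch).
    public SingleThreadSender(Action<string> send)
    {
        if (send == null) throw new ArgumentNullException(nameof(send));
        _worker = new Thread(() =>
        {
            // The default BlockingCollection is FIFO, so items are sent
            // in the order in which they were enqueued.
            foreach (string body in _queue.GetConsumingEnumerable())
            {
                send(body);
            }
        });
        _worker.Start();
    }

    // Queues a message body for sending.
    public void Enqueue(string body)
    {
        _queue.Add(body);
    }

    // Stops accepting messages and waits until queued messages are sent.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Join();
        _queue.Dispose();
    }
}
```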
For more information, see Ordered messages.
Prerequisites
Before you begin, make sure that you have:
Downloaded the SDK for .NET. For more information, see Release notes
Prepared the environment. For more information, see Prepare the environment
Created instances, topics, and consumer groups in the ApsaraMQ for RocketMQ console. For more information, see Create resources
Obtained an AccessKey pair for your Alibaba Cloud account. For more information, see Create an AccessKey pair
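The producer sample in this topic reads the AccessKey pair from the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET. A small check at startup can make a missing variable fail fast with a clear message. This is a minimal sketch; `CredentialCheck` and `RequireEnv` are hypothetical names, not part of the SDK.

```csharp
using System;

public static class CredentialCheck
{
    // Returns the value of a required environment variable,
    // or throws if the variable is unset or empty.
    public static string RequireEnv(string name)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrEmpty(value))
        {
            throw new InvalidOperationException(
                "Environment variable " + name + " is not set.");
        }
        return value;
    }
}
```

For example, `CredentialCheck.RequireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")` could be called before the producer properties are configured.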
Send ordered messages
The following sample code sends ordered messages using the TCP client SDK for .NET. All messages share the same sharding key, so they are routed to the same partition and consumed in send order.
For the complete code repository, see ApsaraMQ for RocketMQ code repository.
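The sample in this section logs a failed send and moves on to the next message. When later messages must not overtake a failed one, a possible alternative is to retry the same message and stop if it still cannot be sent. The sketch below shows that pattern under stated assumptions: `OrderedSendSketch` and `SendInOrder` are hypothetical names, and the `send` delegate stands in for the real send call. A retried send may produce a duplicate if the first attempt actually reached the broker, so consumers may need to tolerate duplicates.

```csharp
using System;
using System.Collections.Generic;

public static class OrderedSendSketch
{
    // Sends the bodies in order through 'send'. Each body is tried up to
    // maxAttempts times. Sending stops at the first body that still fails,
    // so later messages never overtake it. Returns the number of bodies sent.
    public static int SendInOrder(IEnumerable<string> bodies, Action<string> send, int maxAttempts)
    {
        if (bodies == null) throw new ArgumentNullException(nameof(bodies));
        if (send == null) throw new ArgumentNullException(nameof(send));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        int sent = 0;
        foreach (string body in bodies)
        {
            bool ok = false;
            for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++)
            {
                try
                {
                    send(body);
                    ok = true;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("send attempt {0} failed: {1}", attempt, ex.Message);
                }
            }
            if (!ok)
            {
                break; // Do not send later messages ahead of the failed one.
            }
            sent++;
        }
        return sent;
    }
}
```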
Replace the following placeholders in the sample code with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| `<your-group-id>` | Producer group ID created in the console | GID_example |
| `<your-topic>` | Topic created in the console | T_example_topic_name |
| `<your-tcp-endpoint>` | TCP endpoint from the Instance Details page in the console | NameSrv_Addr |
| `<your-log-path>` | Local path for SDK logs | C://log |
| `<your-sharding-key>` | Sharding key that determines partition routing | App-Test |
```csharp
using System;
using ons;

public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    static void Main(string[] args)
    {
        // Configure the producer properties.
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
        // and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
        // AccessKey ID for authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey,
            Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // AccessKey secret for authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey,
            Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Producer group ID created in the ApsaraMQ for RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "<your-group-id>");
        // Topic created in the ApsaraMQ for RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<your-topic>");
        // TCP endpoint from the Instance Details page in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "<your-tcp-endpoint>");
        // Local path for SDK logs.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "<your-log-path>");

        // Create a producer instance.
        // Producer instances are thread-safe and can send messages to different topics.
        // In most cases, one producer instance per thread is sufficient.
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);
        // Start the producer.
        producer.start();

        // Create a message with topic, tag, and body.
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        // Define the sharding key. Messages with the same sharding key
        // are sent to the same partition and consumed in order.
        string shardingKey = "<your-sharding-key>";
        for (int i = 0; i < 32; i++)
        {
            try
            {
                SendResultONS sendResult = producer.send(msg, shardingKey);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure {0}", ex.ToString());
            }
        }

        // Shut down the producer before exiting the thread.
        producer.shutdown();
    }
}
```

Receive ordered messages
The following sample code receives ordered messages using the TCP client SDK for .NET. The consumer processes messages in FIFO order within each partition.
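The consumer sample in this section keeps the main thread alive with a fixed `Thread.Sleep(30000)` before it shuts down. For a consumer that should run until an operator stops it, one alternative is to block until Ctrl+C is pressed. The sketch below uses only standard .NET APIs; `KeepAliveSketch` and `WaitForCtrlC` are hypothetical names, not part of the SDK.

```csharp
using System;
using System.Threading;

public static class KeepAliveSketch
{
    // Blocks until Ctrl+C is pressed or the timeout elapses.
    // Returns true if Ctrl+C ended the wait, false on timeout.
    // Pass Timeout.InfiniteTimeSpan to wait without a time limit.
    public static bool WaitForCtrlC(TimeSpan timeout)
    {
        using (var stop = new ManualResetEventSlim(false))
        {
            ConsoleCancelEventHandler handler = (sender, e) =>
            {
                // Cancel the default termination so that the caller
                // can run its own shutdown code after the wait returns.
                e.Cancel = true;
                stop.Set();
            };
            Console.CancelKeyPress += handler;
            try
            {
                return stop.Wait(timeout);
            }
            finally
            {
                Console.CancelKeyPress -= handler;
            }
        }
    }
}
```

In the consumer sample, a call such as `KeepAliveSketch.WaitForCtrlC(Timeout.InfiniteTimeSpan)` could take the place of the fixed sleep, followed by the existing shutdown call.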
Replace the following placeholders in the sample code with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| `<your-group-id>` | Consumer group ID created in the console | GID_example |
| `<your-topic>` | Topic created in the console | T_example_topic_name |
| `<your-tcp-endpoint>` | TCP endpoint from the Instance Details page in the console | NameSrv_Addr |
| `<your-log-path>` | Local path for SDK logs | C://log |
```csharp
using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{
    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {
        }

        ~MyMsgOrderListener()
        {
        }

        // Called for each message, in order within a partition.
        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    class OrderConsumerExampleForEx
    {
        static void Main(string[] args)
        {
            // Configure the consumer properties.
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
            // and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
            // AccessKey ID for authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey,
                Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // AccessKey secret for authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey,
                Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // Consumer group ID created in the ApsaraMQ for RocketMQ console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "<your-group-id>");
            // Topic created in the ApsaraMQ for RocketMQ console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<your-topic>");
            // TCP endpoint from the Instance Details page in the console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "<your-tcp-endpoint>");
            // Local path for SDK logs.
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "<your-log-path>");

            // Create a consumer instance.
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);
            // Subscribe to the topic with a wildcard tag filter.
            consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgOrderListener());
            // Start the consumer.
            consumer.start();

            // Keep the main thread alive for 30 seconds to continue receiving messages.
            Thread.Sleep(30000);

            // Shut down the consumer when it is no longer needed.
            consumer.shutdown();
        }
    }
}
```

What to do next
Learn more about ordered message concepts: Ordered messages
Browse additional examples in the ApsaraMQ for RocketMQ code repository