ApsaraMQ for RocketMQ 5.x instances support RocketMQ 1.x and 2.x SDK for .NET. The following examples demonstrate how to send and receive normal, ordered, scheduled/delayed, and transactional messages.
The latest RocketMQ 5.x SDKs are fully compatible with 5.x brokers and offer more features. For more information, see Release notes.
Alibaba Cloud maintains RocketMQ 3.x, 4.x, and TCP client SDKs for existing workloads only.
Prerequisites
Before you run the sample code, prepare your environment. For more information, see Prepare the environment.
Common configuration
All examples configure ONSFactoryProperty with the following parameters. Replace the placeholders with your actual values.
| Placeholder | Description | Example |
|---|---|---|
<your-instance-username> | Instance username from the Intelligent Authentication tab on the Access Control page | -- |
<your-instance-password> | Instance password from the Intelligent Authentication tab on the Access Control page | -- |
<your-group-id> | Group ID created in the ApsaraMQ for RocketMQ console | GID_example |
<your-topic> | Topic created in the ApsaraMQ for RocketMQ console | T_example_topic_name |
<your-endpoint> | Endpoint from the ApsaraMQ for RocketMQ console (domain and port only -- no http:// or https:// prefix, no resolved IP address) | rmq-cn-XXXX.rmq.aliyuncs.com:8080 |
Use the instance username and password as AccessKey and SecretKey, not your Alibaba Cloud account AccessKey pair.
Do not specify the instance ID when you access a 5.x instance with the 1.x/2.x .NET SDK. Specifying the instance ID causes the connection to fail.
If the client runs on an ECS instance inside a VPC, the broker obtains credentials automatically. Skip the AccessKey and SecretKey configuration.
For serverless instances accessed over the internet, always specify the username and password. If you enable authentication-free in VPCs for the serverless instance and connect from within a VPC, skip the credentials.
Normal messages
Send normal messages
using System;
using ons;
public class ProducerExampleForEx
{
public ProducerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// Instance username and password from the Intelligent Authentication tab
// of the Access Control page in the ApsaraMQ for RocketMQ console.
// Do not use your Alibaba Cloud account AccessKey pair.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "<your-instance-username>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "<your-instance-password>");
// Do not specify the instance ID for 5.x instances.
// Group ID created in the console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "<your-group-id>");
// Topic created in the console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<your-topic>");
// Endpoint from the console. Use domain:port only -- no http:// prefix, no resolved IP.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "<your-endpoint>");
// Log path.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Create a producer. Producers are thread-safe; in most cases, each thread requires only one instance.
Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);
// Start the producer.
producer.start();
// Create and send messages.
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
msg.setKey(Guid.NewGuid().ToString());
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// Shut down the producer before exiting the thread.
producer.shutdown();
}
}Subscribe to normal messages
using System;
using System.Threading;
using System.Text;
using ons;
// Callback invoked when a message arrives from the broker.
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// Instance credentials from the Intelligent Authentication tab
// of the Access Control page. Do not use Alibaba Cloud account keys.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "<your-instance-username>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "<your-instance-password>");
// Do not specify the instance ID for 5.x instances.
// Consumer group ID created in the console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "<your-group-id>");
// Topic created in the console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<your-topic>");
// TCP endpoint from the Instance Details page in the console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "<your-endpoint>");
// Log path.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Consumption mode: clustering (default) or broadcasting.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// Create a push consumer.
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// Subscribe to the topic with a wildcard tag filter.
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// Start the consumer.
consumer.start();
// Demo only. In production, keep the process running.
Thread.Sleep(300000);
// Shut down the consumer before exiting the process.
consumer.shutdown();
}
}Ordered messages
Ordered messages guarantee FIFO delivery for messages that share the same sharding key.
Send ordered messages
The key difference from normal messages: use OrderProducer and pass a shardingKey to send(). Messages with the same sharding key are delivered in order.
using System;
using ons;
public class OrderProducerExampleForEx
{
public OrderProducerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// Instance credentials.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "<your-instance-username>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "<your-instance-password>");
// Do not specify the instance ID for 5.x instances.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "<your-group-id>");
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<your-topic>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "<your-endpoint>");
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Create an order producer (not a regular Producer).
OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);
producer.start();
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
// Messages with the same sharding key are delivered in order.
string shardingKey = "App-Test";
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg, shardingKey);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
producer.shutdown();
}
}Subscribe to ordered messages
Use OrderConsumer and implement MessageOrderListener. Return OrderAction.Success on success or OrderAction.Suspend to retry.
using System;
using System.Text;
using System.Threading;
using ons;
namespace demo
{
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.OrderAction.Success;
}
}
class OrderConsumerExampleForEx
{
static void Main(string[] args)
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// Instance credentials.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "<your-instance-username>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "<your-instance-password>");
// Do not specify the instance ID for 5.x instances.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "<your-group-id>");
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<your-topic>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "<your-endpoint>");
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Create an order consumer (not a regular PushConsumer).
OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);
consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());
consumer.start();
// Demo only. In production, keep the process running.
Thread.Sleep(30000);
consumer.shutdown();
}
}
}Scheduled and delayed messages
Scheduled and delayed messages are delivered after a specified timestamp. Set the delivery time with setStartDeliverTime().
Send scheduled or delayed messages
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;
namespace ons
{
class onscsharp
{
static void Main(string[] args)
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "<your-group-id>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "<your-endpoint>");
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<your-topic>");
factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "<your-message-content>");
// Instance credentials.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "<your-instance-username>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "<your-instance-password>");
// Do not specify the instance ID for 5.x instances.
Producer pProducer = ONSFactory.getInstance().createProducer(factoryInfo);
// Call start() once before sending messages.
pProducer.start();
Message msg = new Message(
factoryInfo.getPublishTopics(),
"TagA",
factoryInfo.getMessageContent()
);
// Optional: set a business key for message tracing in the console.
msg.setKey("ORDERID_100");
// Set the delivery time in milliseconds. The message is delivered after this timestamp.
// In this example, the message is delivered 3 seconds from now.
long deliverTime = System.currentTimeMillis() + 3000;
msg.setStartDeliverTime(deliverTime);
try
{
SendResultONS sendResult = pProducer.send(msg);
}
catch(ONSClientException e)
{
// Handle the send failure.
}
// Shut down the producer before exiting. Skipping shutdown may cause memory leaks.
pProducer.shutdown();
}
}
}Subscribe to scheduled or delayed messages
Subscribe to scheduled and delayed messages the same way as normal messages. For more information, see Subscribe to normal messages.
Transactional messages
Transactional messages use a two-phase commit: send a half message with a local transaction executor, then confirm the transaction status with a checker. The broker calls the checker if it does not receive a commit or rollback within the timeout.
Send transactional messages
Step 1: Send a half message and run the local transaction
Implement LocalTransactionExecuter to run your business logic after the half message is sent. Return CommitTransaction, RollbackTransaction, or Unknow based on the outcome.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;
namespace ons
{
public class MyLocalTransactionExecuter : LocalTransactionExecuter
{
public MyLocalTransactionExecuter()
{
}
~MyLocalTransactionExecuter()
{
}
public override TransactionStatus execute(Message value)
{
Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
// Use the message ID and a CRC32/MD5 hash of the body to deduplicate messages.
string msgId = value.getMsgID();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
boolean isCommit = Execution result of the local transaction;
if (isCommit) {
// Commit the message if the local transaction succeeds.
transactionStatus = TransactionStatus.CommitTransaction;
} else {
// Roll back the message if the local transaction fails.
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
// Handle the exception.
}
return transactionStatus;
}
}
class onscsharp
{
static void Main(string[] args)
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "<your-endpoint>");
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<your-topic>");
factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "<your-message-content>");
// Instance credentials.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "<your-instance-username>");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "<your-instance-password>");
// Do not specify the instance ID for 5.x instances.
// Create a transaction producer with a transaction checker.
LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker);
// Call start() once. After startup, send messages concurrently from multiple threads.
pProducer.start();
Message msg = new Message(
factoryInfo.getPublishTopics(),
"TagA",
factoryInfo.getMessageContent()
);
// Optional: set a business key for message tracing in the console.
msg.setKey("ORDERID_100");
try
{
LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
}
catch(ONSClientException e)
{
Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
}
// Shut down the producer before exiting. The producer cannot be restarted after shutdown.
pProducer.shutdown();
}
}
}Step 2: Implement the transaction status checker
The broker invokes LocalTransactionChecker.check() to verify the transaction status when it has not received a commit or rollback. Implement idempotent logic to handle repeated check calls.
public class MyLocalTransactionChecker : LocalTransactionChecker
{
public MyLocalTransactionChecker()
{
}
~MyLocalTransactionChecker()
{
}
public override TransactionStatus check(Message value)
{
Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
// Use the message ID and a CRC32/MD5 hash of the body to deduplicate.
string msgId = value.getMsgID();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
boolean isCommit = Execution result of the local transaction;
if (isCommit) {
// Commit the message if the local transaction succeeded.
transactionStatus = TransactionStatus.CommitTransaction;
} else {
// Roll back the message if the local transaction failed.
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
// Handle the exception.
}
return transactionStatus;
}
}Subscribe to transactional messages
Subscribe to transactional messages the same way as normal messages. For more information, see Subscribe to normal messages.