ApsaraMQ for RocketMQ provides a distributed transaction processing feature that is similar to the eXtended Architecture (X/Open XA) protocol. ApsaraMQ for RocketMQ uses transactional messages to ensure transactional consistency.
The following sample code demonstrates how to send and receive transactional messages with the TCP client SDK for .NET.
Transactional messages are supported in the Internet, China (Hangzhou), China (Beijing), China (Shanghai), and China (Shenzhen) regions.
How transactional messages work
A transactional message goes through three phases: half message delivery, local transaction execution, and final commit or rollback.
The producer sends a half message to the ApsaraMQ for RocketMQ broker. A half message is persisted by the broker but not yet delivered to consumers.
The broker acknowledges the half message. The producer then executes the local transaction.
Based on the local transaction result, the producer sends a commit or rollback status to the broker:
Commit: The broker marks the half message as deliverable. Consumers can now receive it.
Rollback: The broker discards the half message. Consumers never see it.
If the broker does not receive a commit or rollback -- for example, the producer crashes or returns an unknown status -- it periodically sends a transaction status check request to a producer in the producer cluster. The producer checks the local transaction state and reports the final status.
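The flow above can be modeled without the SDK. The following is a minimal sketch of the half message lifecycle, not the broker's actual implementation: `ToyBroker`, `TxStatus`, and `HalfMessageState` are hypothetical names invented for illustration, and the state is held in memory only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types: they model the protocol only and do not use the ons SDK.
public enum TxStatus { Commit, Rollback, Unknown }

public enum HalfMessageState { Pending, Deliverable, Discarded }

public class ToyBroker
{
    private readonly Dictionary<string, HalfMessageState> _messages =
        new Dictionary<string, HalfMessageState>();

    // Phase 1: persist the half message. Consumers cannot see it yet.
    public void StoreHalfMessage(string msgId)
    {
        _messages[msgId] = HalfMessageState.Pending;
    }

    // Phase 3: apply the status that the producer reports.
    public void Resolve(string msgId, TxStatus status)
    {
        if (_messages[msgId] != HalfMessageState.Pending)
        {
            return; // The message already has a final state.
        }
        if (status == TxStatus.Commit)
        {
            _messages[msgId] = HalfMessageState.Deliverable;
        }
        else if (status == TxStatus.Rollback)
        {
            _messages[msgId] = HalfMessageState.Discarded;
        }
        // Unknown: leave the message pending so that a later check can resolve it.
    }

    // Status check: ask the producer-side checker about every pending message.
    public void CheckPending(Func<string, TxStatus> checker)
    {
        // Iterate over a copy of the keys because Resolve updates the dictionary.
        foreach (string msgId in new List<string>(_messages.Keys))
        {
            if (_messages[msgId] == HalfMessageState.Pending)
            {
                Resolve(msgId, checker(msgId));
            }
        }
    }

    public HalfMessageState StateOf(string msgId)
    {
        return _messages[msgId];
    }
}
```

In this model, a message that is stored with `StoreHalfMessage` and resolved with `TxStatus.Unknown` stays `Pending`. It becomes `Deliverable` or `Discarded` only after `CheckPending` runs with a checker that returns a final status.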

For more information, see Transactional messages.
Prerequisites
The SDK for .NET is downloaded. For more information, see Release notes.
The environment is prepared. For more information, see Prepare the .NET SDK environment.
The resources that you want to specify in the code are created in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.
The AccessKey pair of your Alibaba Cloud account is obtained. For more information, see Create an AccessKey pair.
Send transactional messages
Sending a transactional message requires two components:
| Component | Class | Purpose |
|---|---|---|
| Transaction executor | LocalTransactionExecuter | Runs the local transaction after the half message is sent successfully and returns an initial status. |
| Transaction checker | LocalTransactionChecker | Queries the local transaction state and reports it to the broker when the broker has not received a final commit or rollback. |
Step 1: Implement the transaction executor
The transaction executor runs immediately after the half message is sent. It executes the local transaction logic and returns one of three statuses:
| Status | Effect |
|---|---|
| TransactionStatus.CommitTransaction | The transaction is committed. The consumer can receive the message. |
| TransactionStatus.RollbackTransaction | The transaction is rolled back. The message is discarded. |
| TransactionStatus.Unknow | The status is unknown. The broker will later invoke the transaction checker to resolve it. |
TransactionStatus.Unknow is the actual enum spelling in the SDK (without the trailing *n*).
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;

namespace ons
{
    public class MyLocalTransactionExecuter : LocalTransactionExecuter
    {
        public MyLocalTransactionExecuter()
        {
        }

        ~MyLocalTransactionExecuter()
        {
        }

        public override TransactionStatus execute(Message value)
        {
            Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser"));

            // Two messages can have the same message body but different IDs.
            // You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console.
            // Use the message ID or a CRC32/MD5 hash of the message body
            // to implement idempotent processing and prevent duplicate messages.
            string msgId = value.getMsgID();

            // Default to Unknow so that an exception leaves the decision to the transaction checker.
            TransactionStatus transactionStatus = TransactionStatus.Unknow;
            try
            {
                // Placeholder: replace "true" with the execution result of your local transaction.
                bool isCommit = true;
                if (isCommit)
                {
                    // Local transaction succeeded -- commit the message.
                    transactionStatus = TransactionStatus.CommitTransaction;
                }
                else
                {
                    // Local transaction failed -- roll back the message.
                    transactionStatus = TransactionStatus.RollbackTransaction;
                }
            }
            catch (Exception e)
            {
                // Handle exceptions from the local transaction.
                // Returning Unknow triggers the transaction checker later.
                Console.WriteLine("local transaction failed: {0}", e.Message);
            }
            return transactionStatus;
        }
    }
}
Step 2: Implement the transaction checker
The transaction checker handles status-check requests from the broker. If the broker does not receive a commit or rollback for a half message -- for example, the application exits or the executor returns TransactionStatus.Unknow -- it periodically queries a producer instance in the producer cluster.
The check method must:
Determine whether the local transaction corresponding to the half message was committed or rolled back.
Return the appropriate TransactionStatus to the broker.
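To answer a status check, the checker needs a record of how each local transaction ended. The following is a minimal sketch of such a record, keyed by an MD5 hash of the message body. `LocalTransactionLog` and its methods are hypothetical names that are not part of the ons SDK, and the in-memory dictionary is an assumption made for brevity: it does not survive a producer restart, so a real application would more likely keep the outcome in durable storage.

```csharp
using System;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not part of the ons SDK.
public static class LocalTransactionLog
{
    // Maps a business key to the outcome: true = committed, false = rolled back.
    private static readonly ConcurrentDictionary<string, bool> Outcomes =
        new ConcurrentDictionary<string, bool>();

    // Derives a stable key from the message body with MD5.
    // MD5 is used here for deduplication only, not for security.
    public static string KeyFor(string body)
    {
        using (MD5 md5 = MD5.Create())
        {
            byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(body));
            return BitConverter.ToString(hash).Replace("-", "").ToLowerInvariant();
        }
    }

    // Records the outcome once. A repeated call for the same key keeps
    // the first result and returns false, which makes the call idempotent.
    public static bool Record(string key, bool committed)
    {
        return Outcomes.TryAdd(key, committed);
    }

    // Returns null when no outcome is recorded yet.
    public static bool? Lookup(string key)
    {
        bool committed;
        return Outcomes.TryGetValue(key, out committed) ? committed : (bool?)null;
    }
}
```

Under these assumptions, the executor would call `Record` after the local transaction finishes, and the checker would call `Lookup` with the same key: `true` maps to TransactionStatus.CommitTransaction, `false` to TransactionStatus.RollbackTransaction, and `null` to TransactionStatus.Unknow.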
public class MyLocalTransactionChecker : LocalTransactionChecker
{
    public MyLocalTransactionChecker()
    {
    }

    ~MyLocalTransactionChecker()
    {
    }

    public override TransactionStatus check(Message value)
    {
        Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
            value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser"));

        // Two messages can have the same message body but different IDs.
        // You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console.
        // Use the message ID or a CRC32/MD5 hash of the message body
        // to look up the local transaction result.
        // If your business logic is idempotent, the deduplication check is optional.
        string msgId = value.getMsgID();

        // Default to Unknow so that an exception keeps the half message pending.
        TransactionStatus transactionStatus = TransactionStatus.Unknow;
        try
        {
            // Placeholder: replace "true" with the recorded result of your local transaction.
            bool isCommit = true;
            if (isCommit)
            {
                // Local transaction succeeded -- commit the message.
                transactionStatus = TransactionStatus.CommitTransaction;
            }
            else
            {
                // Local transaction failed -- roll back the message.
                transactionStatus = TransactionStatus.RollbackTransaction;
            }
        }
        catch (Exception e)
        {
            // Handle exceptions. Returning Unknow keeps the half message
            // pending and the broker will check again later.
            Console.WriteLine("transaction status check failed: {0}", e.Message);
        }
        return transactionStatus;
    }
}
Step 3: Send a half message and run the transaction
Create a TransactionProducer with the transaction checker, then send a half message to trigger the transaction executor.
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <tcp-endpoint> | TCP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://MQ_INST_xxx.mq-internet-access.mq-internet.aliyuncs.com:80 |
| <topic-name> | Topic created in the ApsaraMQ for RocketMQ console | TransactionTopic |
| <message-body> | Content of the message | Hello RocketMQ |
class onscsharp
{
    static void Main(string[] args)
    {
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // TCP endpoint. Find this value in the TCP Endpoint section
        // of the Instance Details page in the ApsaraMQ for RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "<tcp-endpoint>");
        // Topic created in the ApsaraMQ for RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "<topic-name>");
        // Message content.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "<message-body>");
        // Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
        // and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));

        // Create the transaction producer with the transaction checker.
        LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
        TransactionProducer pProducer = ONSFactory.getInstance().createTransactionProducer(factoryInfo, myChecker);

        // Call start() once before sending messages.
        // After the producer starts, you can send messages concurrently from multiple threads.
        pProducer.start();

        Message msg = new Message(
            factoryInfo.getPublishTopics(),  // Topic
            "TagA",                          // Tag for message filtering
            factoryInfo.getMessageContent()  // Message body
        );

        // Set a message key for tracing. Use a business identifier
        // (for example, an order ID) that is globally unique when possible.
        // You can use this key to look up the message in the ApsaraMQ for RocketMQ console.
        msg.setKey("ORDERID_100");

        // Send the half message and run the local transaction.
        try
        {
            LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
            SendResultONS sendResult = pProducer.send(msg, myExecuter);
        }
        catch (ONSClientException e)
        {
            Console.WriteLine("\nexception of sendmsg:{0}", e.what());
        }

        // Shut down the producer before the application exits to release resources.
        // A producer cannot be restarted after shutdown.
        pProducer.shutdown();
    }
}
Subscribe to transactional messages
Consumers handle transactional messages the same way as normal messages. No special subscription logic is required.
For the subscription sample code, see Subscribe to messages.