This topic provides the sample code for sending and subscribing to transactional messages through the .NET SDK over TCP. The currently supported regions include the Internet Region, China (Hangzhou), China (Beijing), China (Shanghai), and China (Shenzhen).

Message Queue for Apache RocketMQ provides a distributed transaction processing function similar to X/Open XA to ensure transaction consistency in Message Queue for Apache RocketMQ.

Interaction process

The following figure shows the interaction process of transactional messages.Transactional message interaction flowchart

For more information, see Transactional messages.

Prerequisites

Send transactional messages

Perform the following steps to send a transactional message:
  1. Send a half message and execute a local transaction. The sample code is as follows:
     using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Text;
     using System.Runtime.InteropServices;
     using ons;
    
     namespace ons
     {
     public class MyLocalTransactionExecuter : LocalTransactionExecuter
     {
         public MyLocalTransactionExecuter()
         {
         }
    
         ~MyLocalTransactionExecuter()
         {
         }
         public override TransactionStatus execute(Message value)
         {
                 Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                 value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
    
                 // The ID of the message. Two messages can have the same message body but different message IDs. Currently, message IDs cannot be retrieved in the console.
                 string msgId = value.getMsgID();
                 // Calculate the message body by using CRC32 or other algorithms, such as MD5.
                 // The message ID and CRC32 ID are used to prevent duplicated messages.
                 // To prevent duplicated messages, calculate the message body by using the CRC32 or MD5 algorithm.
    
                 TransactionStatus transactionStatus = TransactionStatus.Unknow;
                 try {
                     boolean isCommit = Execution result of the local transaction
                     if (isCommit) {
                         // Commit the message if the local transaction succeeds.
                         transactionStatus = TransactionStatus.CommitTransaction;
                     } else {
                         // Roll back the message if the local transaction fails.
                         transactionStatus = TransactionStatus.RollbackTransaction;
                     }
                 } catch (Exception e) {
                     // Handle exceptions.
                 }
                 return transactionStatus;
         }
     }
     class onscsharp
     {
    
         static void Main(string[] args)
         {
             ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
             factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX");//The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
             factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "");//The group ID you created in the console.
             factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "");// The topic you created in the console.
             factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "");//The message body.
             factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "");//The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
              factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "");//The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
    
             //create transaction producer       
             ONSFactory onsfactory = new ONSFactory();
             LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
             TransactionProducer pProducer = onsfactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker);
    
             // Before sending a message, call the start method once to start the producer. After the producer is started, messages can be concurrently sent in multiple threads.
             pProducer.start();
    
                 Message msg = new Message(
                 // The message topic.
                 factoryInfo.getPublishTopics(),
                 // The message tag.
                 "TagA",
                 // The message body.
                 factoryInfo.getMessageContent()
             );
    
             // The message key, which must be globally unique.
             // A unique identifier enables you to query a message and resend it in the console if you fail to receive the message.
             // Note: Messages can still be sent and received even if this attribute is not set.
             msg.setKey("ORDERID_100");
    
             // The message sending result, which is successful if no exception occurs.
             try
             {
                 LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
                 SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
             }
             catch(ONSClientException e)
             {
                 Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
             }
    
             // Destroy the producer object before exiting the application. Otherwise, memory leakage may occur.
             // The producer cannot be started again after shutdown.
             pProducer.shutdown();
         }
     }
     }
  2. Commit the status of the transactional message.
    After the execution of a local transaction, which can be successful or failed, the broker must be notified of the transaction status of the current message. The following notification modes are supported:
    • Commit the status after executing the local transaction.
    • Wait until the broker requests to check the transaction status of the message.
    A transaction can be in one of the following states:
    • TransactionStatus.CommitTransaction: The transaction is submitted, and the consumer can consume the message.
    • TransactionStatus.RollbackTransaction: The transaction is rolled back, and the message is discarded and cannot be consumed.
    • TransactionStatus.Unknow: The transaction is in an unknown state, and the Message Queue for Apache RocketMQ broker is expected to query the status of the local transaction that corresponds to the message from the message sender.
    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        public override TransactionStatus check(Message value)
        {
                Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
                // The ID of the message. Two messages can have the same message body but different message IDs. Currently, message IDs cannot be retrieved in the console.
                string msgId = value.getMsgID();
                // Calculate the message body by using CRC32 or other algorithms, such as MD5.
                // The message ID and CRC32 ID are used to prevent duplicated messages.
                // You do not need to specify the message ID or CRC32 ID if the business itself achieves idempotence. Otherwise, set the message ID or CRC32 ID to ensure idempotence.
                // To prevent duplicated messages, calculate the message body by using the CRC32 or MD5 algorithm.
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try {
                    boolean isCommit = Execution result of the local transaction
                    if (isCommit) {
                        // Submit the message if the local transaction succeeds.
                        transactionStatus = TransactionStatus.CommitTransaction;
                    } else {
                        // Roll back the message if the local transaction fails.
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                } catch (Exception e) {
                    // Handle exceptions.
                }
                return transactionStatus;
        }
        }

    Transaction check mechanism

    • Why must the transaction check mechanism be implemented when transactional messages are sent?

      If the half message is sent in step 1 but TransactionStatus.Unknow is returned or the local transaction commits no status data because the application exits, the status of the half message is unknown to the Message Queue for Apache RocketMQ broker. Therefore, the Message Queue for Apache RocketMQ broker periodically requests the sender to check and report the status of the half message.

    • What does the business logic do when the check method is called back?
      The check method for transactional messages needs to contain the logic of transaction consistency check. After a transactional message is sent, Message Queue for Apache RocketMQ needs to call LocalTransactionChecker to respond to the request of the broker for the local transaction status. Therefore, the check method for transactional messages needs to complete the following tasks:
      1. Check the status of the local transaction corresponding to the half message (committed or rollback).

      2. Commit the status of the local transaction corresponding to the half message to the broker.

    • What is the impact of different local transaction states on the half message?
      • TransactionStatus.CommitTransaction: The transaction is submitted, and the consumer can consume the message.
      • TransactionStatus.RollbackTransaction: The transaction is rolled back, and the message is discarded and cannot be consumed.
      • TransactionStatus.Unknow: The transaction is in an unknown state, and the Message Queue for Apache RocketMQ broker is expected to query the status of the local transaction that corresponds to the message from the message sender.

      For more information about the code, see the implementation of MyLocalTransactionChecker.

Subscribe to transactional messages

For instructions and sample code for subscribing to transactional messages, see Subscribe to messages.