This topic provides the sample code on how to send and subscribe to transactional messages by using Message Queue for Apache RocketMQ SDK for .NET over TCP. This feature is supported in the Internet, China (Hangzhou), China (Beijing), China (Shanghai), and China (Shenzhen) regions.

Message Queue for Apache RocketMQ provides a distributed transaction processing feature similar to X/Open XA to ensure transaction consistency in Message Queue for Apache RocketMQ.

Interaction process

The following figure shows the interaction process of transactional messages. Interaction process of transactional messages

For more information, see Transactional message.

Prerequisites

  • Message Queue for Apache RocketMQ SDK for .NET is downloaded. For more information, see Release notes.
  • An environment is configured. For more information, see Prepare the environment.

Send transactional messages

Perform the following steps to send a transactional message:
  1. Send a half message and execute a local transaction. The following code provides an example:
     using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Text;
     using System.Runtime.InteropServices;
     using ons;
    
     namespace ons
     {
     public class MyLocalTransactionExecuter : LocalTransactionExecuter
     {
         public MyLocalTransactionExecuter()
         {
         }
    
         ~MyLocalTransactionExecuter()
         {
         }
         public override TransactionStatus execute(Message value)
         {
                 Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                 value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
    
                 // The ID of the message. Two messages may have the same message body but cannot have the same ID. The current message is a half message. Therefore, its message ID cannot be queried in the Message Queue for Apache RocketMQ console. 
                 string msgId = value.getMsgID();
                 // Calculate the message body by using an algorithm such as CRC32 and MD5. 
                 // The message ID and CRC32 ID are used to prevent duplicate messages. 
                 // To prevent duplicate messages, calculate the message body by using the CRC32 or MD5 algorithm. 
    
                 TransactionStatus transactionStatus = TransactionStatus.Unknow;
                 try {
                     boolean isCommit = Execution result of the local transaction;
                     if (isCommit) {
                         // Commit the message if the local transaction succeeds. 
                         transactionStatus = TransactionStatus.CommitTransaction;
                     } else {
                         // Roll back the message if the local transaction fails. 
                         transactionStatus = TransactionStatus.RollbackTransaction;
                     }
                 } catch (Exception e) {
                     // Specify the logic for handling errors. 
                 }
                 return transactionStatus;
         }
     }
     class onscsharp
     {
    
         static void Main(string[] args)
         {
             ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
             factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX"); // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page that appears, go to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, ""); // The topic that you create in the Message Queue for Apache RocketMQ console. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, ""); // The message content. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, ""); // The AccessKey ID you create in the RAM console for identity authentication. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, ""); // The AccessKey secret you create in the RAM console for identity authentication. 
    
             // Create a transaction producer.       
             LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
             TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker);
    
             // Before you use the producer to send a message, call the start() method once to start the producer. After the producer is started, messages can be concurrently sent in multiple threads. 
             pProducer.start();
    
                 Message msg = new Message(
                 //Message Topic
                 factoryInfo.getPublishTopics(),
                 //Message Tag
                 "TagA",
                 //Message Body
                 factoryInfo.getMessageContent()
             );
    
             // The message key, which must be globally unique. 
             // The key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
             // Note: Messages can be sent and received even if you do not specify the message key. 
             msg.setKey("ORDERID_100");
    
             // Send the message. If no error occurs, the message is sent. 
             try
             {
                 LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
                 SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
             }
             catch(ONSClientException e)
             {
                 Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
             }
    
             // Before you exit your application, shut down the producer. Otherwise, memory leaks may occur. 
             // The producer cannot be started again after it is shut down. 
             pProducer.shutdown();
         }
     }
     }
  2. Commit the status of the transactional message.
    After the local transaction is executed, the Message Queue for Apache RocketMQ broker must be notified of the transaction status of the current message, no matter whether the execution is successful or fails. The Message Queue for Apache RocketMQ broker can be notified in one of the following ways:
    • Commit the status after the local transaction is executed.
    • Wait until the Message Queue for Apache RocketMQ broker sends a request to check the transaction status of the message.
    A transaction can be in one of the following states:
    • TransactionStatus.CommitTransaction: The transaction is committed, and the consumer can consume the message.
    • TransactionStatus.RollbackTransaction: The transaction is rolled back, and the message is discarded and cannot be consumed.
    • TransactionStatus.Unknow: The transaction is in an unknown state, and the Message Queue for Apache RocketMQ broker is expected to query the status of the local transaction that corresponds to the message from the message producer.
    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        public override TransactionStatus check(Message value)
        {
                Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
                // The ID of the message. Two messages may have the same message body but cannot have the same ID. The current message is a half message. Therefore, its message ID cannot be queried in the Message Queue for Apache RocketMQ console. 
                string msgId = value.getMsgID();
                // Calculate the message body by using an algorithm such as CRC32 and MD5. 
                // The message ID and CRC32 ID are used to prevent duplicate messages. 
                // You do not need to specify the message ID or CRC32 ID if your business service itself achieves idempotence. Otherwise, specify the message ID or CRC32 ID to ensure idempotence. 
                // To prevent duplicate messages, calculate the message body by using the CRC32 or MD5 algorithm. 
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try {
                    boolean isCommit = Execution result of the local transaction;
                    if (isCommit) {
                        // Commit the message if the local transaction succeeds. 
                        transactionStatus = TransactionStatus.CommitTransaction;
                    } else {
                        // Roll back the message if the local transaction fails. 
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                } catch (Exception e) {
                    //exception handle
                }
                return transactionStatus;
        }
        }

    Mechanism of transaction status check

    • Why must the mechanism of transaction status check be implemented when transactional messages are sent?

      If the half message is sent in Step 1 but TransactionStatus.Unknow is returned or no local transaction of any status is committed because the application exits, the status of the half message is unknown to the Message Queue for Apache RocketMQ broker. Therefore, the broker periodically sends a request to a producer in the producer cluster to check the status of a half message. After the status check request is received, the producer checks and commits the final state of the local transaction that corresponds to the half message.

    • What does the business logic do when the check method is called back?
      The check method for transactional messages must contain the logic of transaction consistency check. After a transactional message is sent, Message Queue for Apache RocketMQ must call LocalTransactionChecker to respond to the request of the broker for the local transaction status. Therefore, the check method for transactional messages must achieve the following objectives:
      1. Check the status (committed or rollback) of the local transaction that corresponds to the half message.

      2. Commit the status of the local transaction that corresponds to the half message to the broker.

    • How do different states of a local transaction affect a half message?
      • TransactionStatus.CommitTransaction: The transaction is committed. The consumer can consume the message.
      • TransactionStatus.RollbackTransaction: The transaction is rolled back. The message is discarded and cannot be consumed.
      • TransactionStatus.Unknow: The transaction is in an unknown state. The Message Queue for Apache RocketMQ broker is expected to query the status of the local transaction that corresponds to the message from the message producer.

      For more information, see the implementation of MyLocalTransactionChecker.

Subscribe to transactional messages

The mode for subscribing to transactional messages is the same as that for subscribing to standard messages. For more information, see Subscribe to messages.