All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive transactional messages

Last Updated:Aug 18, 2023

This topic provides the sample code on how to send and receive transactional messages by using the TCP client SDK for .NET. Scheduled messages are supported in the Internet, China (Hangzhou), China (Beijing), China (Shanghai), and China (Shenzhen) regions.

ApsaraMQ for RocketMQ provides a distributed transaction processing feature that is similar to eXtended Architecture (X/Open XA). ApsaraMQ for RocketMQ uses transactional messages to ensure transactional consistency.

Interaction process

The following figure shows the process to use transactional messages.事务消息交互流程

For more information, see Transactional messages.

Prerequisites

  • The SDK for .NET is downloaded. For more information, see Release notes.

  • The environment is prepared. For more information, see Prepare the environment.

  • The resources that you want to specify in the code are created in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.

  • The AccessKey pair of your Alibaba Cloud account is obtained. For more information, see Create an AccessKey pair.

Send transactional messages

Perform the following steps to send a transactional message:

  1. Send a half message and execute a local transaction. Sample code:

     using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Text;
     using System.Runtime.InteropServices;
     using ons;
    
     namespace ons
     {
     public class MyLocalTransactionExecuter : LocalTransactionExecuter
     {
         public MyLocalTransactionExecuter()
         {
         }
    
         ~MyLocalTransactionExecuter()
         {
         }
         public override TransactionStatus execute(Message value)
         {
                 Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                 value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser"));
    
                 // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console. 
                 string msgId = value.getMsgID();
                 // Calculate the message body by using an algorithm such as CRC32 and MD5. 
                 // The message ID and CRC32 ID are used to prevent duplicate messages. 
                 // To prevent duplicate messages, we recommend that you calculate the message body by using the CRC32 or MD5 algorithm. 
    
                 TransactionStatus transactionStatus = TransactionStatus.Unknow;
                 try {
                     boolean isCommit = The execution result of the local transaction;
                     if (isCommit) {
                         // Commit the message if the local transaction is executed. 
                         transactionStatus = TransactionStatus.CommitTransaction;
                     } else {
                         // Roll back the message if the local transaction failed to be executed. 
                         transactionStatus = TransactionStatus.RollbackTransaction;
                     }
                 } catch (Exception e) {
                     // The logic to handle exceptions. 
                 }
                 return transactionStatus;
         }
     }
     class onscsharp
     {
    
         static void Main(string[] args)
         {
             ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
             // The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX");
             // The topic that you created in the ApsaraMQ for RocketMQ console. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "");
             // The message content. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "");
             // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
             // The AccessKey ID that is used for authentication. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    	       // The AccessKey secret that is used for authentication. 
             factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            
             //create transaction producer                
             LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
             TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo, myChecker);
    
             // Before you send the message, call the start() method only once to start the producer. After the producer is started, messages can be concurrently sent in multiple threads. 
             pProducer.start();
    
                 Message msg = new Message(
                 //Message Topic
                 factoryInfo.getPublishTopics(),
                 //Message Tag
                 "TagA",
                 //Message Body
                 factoryInfo.getMessageContent()
             );
    
             // The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible. 
             // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
             // Note: You can send and receive a message even if you do not specify the key. 
             msg.setKey("ORDERID_100");
    
             // Send the message. If no exception is thrown, the message is sent. 
             try
             {
                 LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
                 SendResultONS sendResult = pProducer.send(msg, myExecuter);
             }
             catch(ONSClientException e)
             {
                 Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
             }
    
             // Before you exit the application, destroy the producer. Otherwise, issues such as memory leaks occur. 
             // The producer cannot be started again after it is destroyed. 
             pProducer.shutdown();
         }
     }
     }
  2. Commit the status of the transactional message.

    After the local transaction is executed, the ApsaraMQ for RocketMQ broker must be notified of the transaction status of the current message, regardless of whether the execution is successful. The ApsaraMQ for RocketMQ broker can be notified in one of the following ways:

    • Commit the status after the local transaction is executed.

    • Wait until the ApsaraMQ for RocketMQ broker sends a request to check the transaction status of the message.

    A transaction can be in one of the following states:

    • TransactionStatus.CommitTransaction: The transaction is committed, and the consumer can consume the message.

    • TransactionStatus.RollbackTransaction: The transaction is rolled back, and the message is discarded and cannot be consumed.

    • TransactionStatus.Unknow: The status of the transaction is unknown, and the system is waiting for the broker to query the status of the local transaction that corresponds to the message.

    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        public override TransactionStatus check(Message value)
        {
                Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser"));
                // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console. 
                string msgId = value.getMsgID();
                // Calculate the message body by using an algorithm such as CRC32 and MD5. 
                // The message ID and CRC32 ID are used to prevent duplicate messages. 
                // You do not need to specify the message ID or CRC32 ID if your business is idempotent. Otherwise, specify the message ID or CRC32 ID to ensure idempotence. 
                // To prevent duplicate messages, we recommend that you calculate the message body by using the CRC32 or MD5 algorithm. 
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try {
                    boolean isCommit = The execution result of the local transaction;
                    if (isCommit) {
                        // Commit the message if the local transaction is executed. 
                        transactionStatus = TransactionStatus.CommitTransaction;
                    } else {
                        // Roll back the message if the local transaction failed to be executed. 
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                } catch (Exception e) {
                    //exception handle
                }
                return transactionStatus;
        }
        }

    Mechanism of transaction status check

    • Why must the mechanism of transaction status check be implemented when transactional messages are sent?

      If the half message is sent in Step 1 but TransactionStatus.Unknow is returned or no local transaction of any status is committed because the application exits, the status of the half message is unknown to the ApsaraMQ for RocketMQ broker. Therefore, the broker periodically sends a request to a producer in the producer cluster to check the status of a half message. After the status check request is received, the producer checks and commits the final status of the local transaction that corresponds to the half message.

    • What does the business logic do when the check method is called back?

      The check method for transactional messages must contain the logic that is used to check transaction consistency. After a transactional message is sent, ApsaraMQ for RocketMQ must call the LocalTransactionChecker API operation to respond to the status check request from the broker for the local transaction. Therefore, the method that is used to check transactional messages must achieve the following objectives:

      1. Check the status (committed or rollback) of the local transaction that corresponds to the half message.

      2. Commit the status of the local transaction that corresponds to the half message to the broker.

    • How does the status of a local transaction affect a half message?

      • TransactionStatus.CommitTransaction: The transaction is committed. The message can be consumed by consumers.

      • TransactionStatus.RollbackTransaction: The transaction is rolled back. The message is discarded and cannot be consumed.

      • TransactionStatus.Unknow: The status of the transaction is unknown, and the system is waiting for the broker to query the status of the local transaction that corresponds to the message.

      For more information, see the implementation of MyLocalTransactionChecker.

Subscribe to transactional messages

The sample code for subscribing to transaction messages is the same as that for subscribing to normal messages. For more information, see Subscribe to messages.