ApsaraMQ for RocketMQ: Send and receive transactional messages

Last Updated: Aug 18, 2023

This topic provides sample code that shows how to send and receive transactional messages by using the TCP client SDK for C or C++.

ApsaraMQ for RocketMQ provides a distributed transaction processing feature that is similar to X/Open XA. ApsaraMQ for RocketMQ uses transactional messages to ensure transactional consistency.

Interaction process

The following figure shows the interaction process of transactional messages.

Figure: Transactional messages

For more information, see Transactional messages.
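The core of the interaction can be illustrated with a toy model. The following is a minimal sketch, not part of the SDK: `ToyBroker`, `TxStatus`, and all method names are hypothetical stand-ins. It assumes the usual behavior of transactional messages, in which a half message stays hidden from consumers until the producer commits it, is discarded on rollback, and remains pending when the status is unknown.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Stand-in for the SDK's transaction status values (hypothetical; defined here
// so that the sketch compiles without the SDK headers).
enum class TxStatus { Commit, Rollback, Unknow };

// Toy in-memory broker, for illustration only.
class ToyBroker {
public:
    // The producer sends a half message. It is stored but not yet visible to consumers.
    int sendHalf(const std::string &body) {
        int id = nextId_++;
        half_[id] = body;
        return id;
    }

    // The producer reports the outcome of its local transaction.
    void resolve(int id, TxStatus status) {
        auto it = half_.find(id);
        if (it == half_.end()) return;            // Unknown ID or already resolved.
        if (status == TxStatus::Commit) {
            delivered_.push_back(it->second);     // Now visible to consumers.
            half_.erase(it);
        } else if (status == TxStatus::Rollback) {
            half_.erase(it);                      // Discarded, never delivered.
        }
        // Unknow: the half message stays pending until a later status check.
    }

    std::size_t pendingCount() const { return half_.size(); }
    const std::vector<std::string> &delivered() const { return delivered_; }

private:
    int nextId_ = 1;
    std::map<int, std::string> half_;
    std::vector<std::string> delivered_;
};
```

If three half messages are sent and resolved with `Commit`, `Rollback`, and `Unknow`, only the first one reaches `delivered()`, and the third one is still counted by `pendingCount()`.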

Prerequisites

Before you start, make sure that the following operations are performed:

Send transactional messages

Perform the following steps to send a transactional message:

  1. Send a half message and execute a local transaction. Sample code:

    #include <cstdlib>
    #include <string>
    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace std;
    using namespace ons;
    
        class MyLocalTransactionExecuter : public LocalTransactionExecuter
        {
        public:
            MyLocalTransactionExecuter()
            {
            }
    
            ~MyLocalTransactionExecuter()
            {
            }
            virtual TransactionStatus execute(Message &value)
            {
                    // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console.
                    string msgId = value.getMsgID();
                    // Compute a checksum of the message body by using an algorithm such as CRC32 or MD5.
                    // The message ID and the checksum can be used to detect duplicate messages.
                    // If your business logic is idempotent, you do not need them. Otherwise, use the message ID or the checksum to ensure idempotence.
                    TransactionStatus transactionStatus = Unknow;
                    try {
                        // Placeholder: replace true with the execution result of your local transaction.
                        bool isCommit = true;
                        if (isCommit) {
                            // Commit the message if the local transaction succeeded.
                            transactionStatus = CommitTransaction;
                        } else {
                            // Roll back the message if the local transaction failed.
                            transactionStatus = RollbackTransaction;
                        }
                    } catch (...) {
                        // Handle the exception. The status remains Unknow, so the broker checks the transaction status later.
                    }
                    return transactionStatus;
            }
        };
    
        int main(int argc, char* argv[])
        {
            // The parameters that are required to create the producer and send messages. 
            ONSFactoryProperty factoryInfo;
            // The ID of the group that you created in the ApsaraMQ for RocketMQ console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
            // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
            // The topic that you created in the ApsaraMQ for RocketMQ console. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
            // The message content. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
            // The AccessKey ID that is used for authentication. 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // The AccessKey secret that is used for authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    
            // Create the producer. ApsaraMQ for RocketMQ does not release pChecker. You must release pChecker yourself.
            // MyLocalTransactionChecker is defined in step 2 and must be declared before main().
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            TransactionProducer *pProducer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo, pChecker);
    
            // Before you send the message, call the start() method only once to start the producer.
            pProducer->start();
    
            Message msg(
                // The message topic.
                factoryInfo.getPublishTopics(),
                // The message tag. A message tag is similar to a Gmail tag and is used by consumers to sort and filter messages in the ApsaraMQ for RocketMQ broker.        
                "TagA",
                // The message body. You cannot leave this parameter empty. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. 
                factoryInfo.getMessageContent()
            );
    
            // The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible. 
            // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
            // Note: You can send and receive a message even if you do not specify the key. 
            msg.setKey("ORDERID_100");
    
            // Send the message. If no exception is thrown, the message is sent.
            try
            {
                // ApsaraMQ for RocketMQ does not release pExecuter. You must release pExecuter yourself.
                MyLocalTransactionExecuter *pExecuter = new MyLocalTransactionExecuter();
                SendResultONS sendResult = pProducer->send(msg, pExecuter);
            }
            catch(ONSClientException & e)
            {
                // Specify the logic to handle exceptions. 
            }
            // Before you exit the application, destroy the producer. Otherwise, issues such as memory leaks occur. 
            pProducer->shutdown();
    
            return 0;
    
        }        
  2. Commit the status of the transactional message. Sample code:

     class MyLocalTransactionChecker : public LocalTransactionChecker
     {
     public:
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // The message ID. Two messages can have the same message body but not the same ID. You cannot query the ID of the current message in the ApsaraMQ for RocketMQ console.
             string msgId = value.getMsgID();
             // Compute a checksum of the message body by using an algorithm such as CRC32 or MD5.
             // The message ID and the checksum can be used to detect duplicate messages.
             // If your business logic is idempotent, you do not need them. Otherwise, use the message ID or the checksum to ensure idempotence.
             TransactionStatus transactionStatus = Unknow;
             try {
                 // Placeholder: replace true with the execution result of your local transaction.
                 bool isCommit = true;
                 if (isCommit) {
                     // Commit the message if the local transaction succeeded.
                     transactionStatus = CommitTransaction;
                 } else {
                     // Roll back the message if the local transaction failed.
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 // Handle the exception. The status remains Unknow, so the broker checks the transaction status again later.
             }
             return transactionStatus;
         }
     };
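The comments in the sample code suggest using the message ID together with a CRC32 or MD5 checksum of the message body to detect duplicates. The following is a minimal sketch of that idea under stated assumptions: `crc32Of` and `DedupFilter` are hypothetical helpers that are not part of the SDK, the checksum is the common reflected CRC-32 (polynomial 0xEDB88320), and seen messages are kept in memory only. A production system would likely persist this state.

```cpp
#include <cstdint>
#include <set>
#include <string>
#include <utility>

// Bitwise CRC-32 over the message body (reflected polynomial 0xEDB88320).
std::uint32_t crc32Of(const std::string &data) {
    std::uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : data) {
        crc ^= byte;
        for (int bit = 0; bit < 8; ++bit) {
            // Shift right by one bit; XOR with the polynomial if the dropped bit was set.
            crc = (crc & 1u) ? ((crc >> 1) ^ 0xEDB88320u) : (crc >> 1);
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// Remembers (message ID, body checksum) pairs that were already processed.
class DedupFilter {
public:
    // Returns true the first time a pair is seen and false for every repeat.
    bool firstSeen(const std::string &msgId, const std::string &body) {
        return seen_.insert(std::make_pair(msgId, crc32Of(body))).second;
    }

private:
    std::set<std::pair<std::string, std::uint32_t>> seen_;
};
```

In `execute` or `check`, a call such as `filter.firstSeen(value.getMsgID(), body)` could guard non-idempotent business logic, where `body` is the message body as a string. Keying on the pair is a design choice of this sketch. Keying on the message ID alone or on the checksum alone are equally plausible, depending on what counts as a duplicate for your business.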

Mechanism of transaction status check

  • Why must the mechanism of transaction status check be implemented when transactional messages are sent?

    If the half message is sent but TransactionStatus.Unknow is returned, or if no status is committed for the local transaction because the application exits, the status of the half message is unknown to the ApsaraMQ for RocketMQ broker. Therefore, the broker periodically sends a request to a producer in the producer cluster to check the status of the half message. After the producer receives the status check request, it checks and commits the final status of the local transaction that corresponds to the half message.

  • What does the business logic do when the check method is called back?

    The check method for transactional messages must contain the logic for checking transaction consistency. After a transactional message is sent, ApsaraMQ for RocketMQ must call LocalTransactionChecker to respond to the request from the broker for the local transaction status. Therefore, the check method must achieve the following objectives:

    1. Check the status (committed or rolled back) of the local transaction that corresponds to the half message.
    2. Commit the status of the local transaction that corresponds to the half message to the broker.
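The two objectives above can be sketched as a lookup followed by a status mapping. This is a hedged illustration, not the SDK's implementation: `LocalTxnStore`, `LocalState`, `CheckStatus`, and `checkLocalTransaction` are hypothetical names, and the sketch assumes that the application records the outcome of each local transaction under a business key such as the message key.

```cpp
#include <map>
#include <string>

// Stand-in for the status values that a check method returns (hypothetical).
enum class CheckStatus { Commit, Rollback, Unknow };

// Outcome of a local transaction as recorded by the application (hypothetical).
enum class LocalState { InProgress, Committed, RolledBack };

// Hypothetical record of local transaction outcomes, keyed by a business key such as an order ID.
using LocalTxnStore = std::map<std::string, LocalState>;

// Objective 1: look up the local transaction.
// Objective 2: map its state to the status that is reported to the broker.
CheckStatus checkLocalTransaction(const LocalTxnStore &store, const std::string &businessKey) {
    auto it = store.find(businessKey);
    if (it == store.end()) {
        // No record yet: report Unknow so that the broker asks again later.
        return CheckStatus::Unknow;
    }
    switch (it->second) {
        case LocalState::Committed:  return CheckStatus::Commit;
        case LocalState::RolledBack: return CheckStatus::Rollback;
        case LocalState::InProgress: return CheckStatus::Unknow;
    }
    return CheckStatus::Unknow;  // Not reached; keeps compilers from warning.
}
```

Returning `Unknow` for a missing record is a design choice of this sketch. If a missing record can only mean that the local transaction never ran, rolling back may be more appropriate.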

Subscribe to transactional messages

The sample code for subscribing to transactional messages is the same as that for subscribing to normal messages. For more information, see Subscribe to messages.