This topic provides the sample code for sending and subscribing to transactional messages through the C/C++ SDK over TCP. The currently supported regions include the Internet Region, China (Hangzhou), China (Beijing), China (Shanghai), and China (Shenzhen).

Message Queue for Apache RocketMQ provides a distributed transaction processing feature similar to X/Open XA to ensure transaction consistency.

Interaction process

The following figure shows the interaction process of transactional messages.

[Figure: Transactional messages]

For more information, see Transactional messages.

Prerequisites

Send transactional messages

Note: For more information about the sample code, see Message Queue for Apache RocketMQ code library.
Perform the following steps to send a transactional message:
  1. Send a half message and execute a local transaction. The sample code is as follows:
    #include <string>
    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;
    
        // Executes the local transaction after the half message is sent.
        class MyLocalTransactionExecuter : public LocalTransactionExecuter
        {
        public:
            MyLocalTransactionExecuter()
            {
            }
    
            virtual ~MyLocalTransactionExecuter()
            {
            }
    
            virtual TransactionStatus execute(Message &value)
            {
                // The message ID. Two messages may have the same message body but different message IDs. The current message ID cannot be retrieved in the console.
                std::string msgId = value.getMsgID();
                // To prevent duplicated messages, you can compute a digest of the message body by using CRC32 or another algorithm, such as MD5.
                // The message ID and the digest are used to detect duplicates.
                // If your business logic is already idempotent, you need neither. Otherwise, track the message ID or the digest to ensure idempotence.
                TransactionStatus transactionStatus = Unknow;
                try {
                    // Placeholder: replace with the actual execution result of your local transaction.
                    bool isCommit = true;
                    if (isCommit) {
                        // Commit the message if the local transaction succeeds.
                        transactionStatus = CommitTransaction;
                    } else {
                        // Roll back the message if the local transaction fails.
                        transactionStatus = RollbackTransaction;
                    }
                } catch (...) {
                    // Handle exceptions. The status remains Unknow.
                }
                return transactionStatus;
            }
        };
    
        int main(int argc, char* argv[])
        {
            // The information that is required to create a producer and send messages.
            ONSFactoryProperty factoryInfo;
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX"); // The group ID that you created in the Message Queue for Apache RocketMQ console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "XXX"); // The topic that you created in the console.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX"); // The message content.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxxxxxxxx"); // The AccessKey ID that is used for identity authentication.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxxxxxxxxxxxxxxxxxxx"); // The AccessKey secret that is used for identity authentication.
    
            // Create a producer. MyLocalTransactionChecker is defined in Step 2.
            // Message Queue for Apache RocketMQ does not release pChecker. Your application must release it.
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            TransactionProducer *pProducer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo, pChecker);
    
            // Before sending a message, call the start method once to start the producer.
            pProducer->start();
    
            Message msg(
                // The message topic.
                factoryInfo.getPublishTopics(),
                // The message tag, which is similar to a Gmail tag. It is used to classify messages so that the consumer can filter messages on the Message Queue for Apache RocketMQ broker based on the specified criteria.
                "TagA",
                // The message body, which cannot be empty. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on consistent serialization and deserialization methods.
                factoryInfo.getMessageContent()
            );
    
            // The message key, which should be globally unique.
            // A unique key enables you to query a message and resend it in the console if the message is not received.
            // Note: Messages can still be sent and received even if this attribute is not set.
            msg.setKey("ORDERID_100");
    
            // Message Queue for Apache RocketMQ does not release pExecuter. Your application must release it.
            MyLocalTransactionExecuter *pExecuter = new MyLocalTransactionExecuter();
    
            // The message is sent successfully if no exception occurs.
            try
            {
                SendResultONS sendResult = pProducer->send(msg, pExecuter);
            }
            catch (ONSClientException &e)
            {
                // Customize exception handling details.
            }
            // Shut down the producer before exiting the application. Otherwise, memory leaks may occur.
            pProducer->shutdown();
    
            // Release the objects that Message Queue for Apache RocketMQ does not release.
            delete pExecuter;
            delete pChecker;
    
            return 0;
        }
  2. Commit the status of the transactional message.

    After the local transaction is executed, whether it succeeds or fails, the broker must be notified of the transaction status of the current message. The broker can be notified in either of the following ways:

    • Commit the status after executing the local transaction.
    • Wait until the broker requests to check the transaction status of the message.

    A transaction can be in one of the following states:

    • TransactionStatus.CommitTransaction: The transaction is committed, and the consumer can consume the message.
    • TransactionStatus.RollbackTransaction: The transaction is rolled back. The message is discarded and cannot be consumed.
    • TransactionStatus.Unknow: The transaction is in an unknown state. The Message Queue for Apache RocketMQ broker is expected to query the message sender for the status of the local transaction that corresponds to the message.

    The sample code for the transaction checker is as follows:
       // Answers the broker's requests to check the status of a local transaction.
       class MyLocalTransactionChecker : public LocalTransactionChecker
       {
       public:
           MyLocalTransactionChecker()
           {
           }
      
           virtual ~MyLocalTransactionChecker()
           {
           }
      
           virtual TransactionStatus check(Message &value)
           {
               // The message ID. Two messages may have the same message body but different message IDs. The current message ID cannot be retrieved in the console.
               std::string msgId = value.getMsgID();
               // To prevent duplicated messages, you can compute a digest of the message body by using CRC32 or another algorithm, such as MD5.
               // The message ID and the digest are used to detect duplicates.
               // If your business logic is already idempotent, you need neither. Otherwise, track the message ID or the digest to ensure idempotence.
               TransactionStatus transactionStatus = Unknow;
               try {
                   // Placeholder: replace with the actual execution result of your local transaction.
                   bool isCommit = true;
                   if (isCommit) {
                       // Commit the message if the local transaction succeeded.
                       transactionStatus = CommitTransaction;
                   } else {
                       // Roll back the message if the local transaction failed.
                       transactionStatus = RollbackTransaction;
                   }
               } catch (...) {
                   // Handle exceptions. The status remains Unknow.
               }
               return transactionStatus;
           }
       };
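
The comments in the sample code mention computing a CRC32 digest of the message body to detect duplicated messages. The following is a minimal sketch of that idea in standard C++, independent of the SDK. The names crc32OfBody and BodyDeduplicator are hypothetical and are not part of the Message Queue for Apache RocketMQ SDK. A 32-bit digest can collide for different bodies, so treat this as an illustration and not as a complete idempotence solution.

```cpp
#include <cstdint>
#include <string>
#include <unordered_set>

// Bitwise CRC-32 (reflected polynomial 0xEDB88320), computed over the message body.
// Hypothetical helper for illustration; not an SDK function.
inline std::uint32_t crc32OfBody(const std::string &body)
{
    std::uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : body) {
        crc ^= byte;
        for (int bit = 0; bit < 8; ++bit) {
            // Shift right by one bit and apply the polynomial when the low bit was set.
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// Hypothetical in-memory record of the bodies that were already processed.
class BodyDeduplicator
{
public:
    // Returns true the first time a body digest is seen and false for a repeat.
    bool markIfNew(const std::string &body)
    {
        return seen_.insert(crc32OfBody(body)).second;
    }

private:
    std::unordered_set<std::uint32_t> seen_;
};
```

In an execute or check method, the business logic could call markIfNew on the message body and skip the local transaction when the call returns false. A production system would more likely persist the digests, or the message IDs, instead of keeping them in memory.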

Transaction check mechanism

  • Why must the transaction check mechanism be implemented when transactional messages are sent?

    If the half message is sent in Step 1 but TransactionStatus.Unknow is returned, or if the application exits before it commits any status for the local transaction, the Message Queue for Apache RocketMQ broker does not know the status of the half message. Therefore, the broker periodically requests the sender to check and report the status of the half message.

  • What does the business logic do when the check method is called back?

    The check method for transactional messages must contain the logic that checks transaction consistency. After a transactional message is sent, Message Queue for Apache RocketMQ calls LocalTransactionChecker to respond to the broker's request for the local transaction status. Therefore, the check method must complete the following tasks:
    1. Check the status of the local transaction that corresponds to the half message (committed or rolled back).
    2. Report the status of the local transaction that corresponds to the half message to the broker.
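
The two tasks above can be sketched as a lookup followed by a mapping. The following example is self-contained and does not use the SDK: SketchTransactionStatus stands in for the SDK's TransactionStatus, and LocalTransactionLog and LocalOutcome are hypothetical names for wherever your application records the outcome of each local transaction, keyed here by the message key.

```cpp
#include <map>
#include <string>

// Stand-in for the SDK's TransactionStatus so that the sketch compiles on its own.
enum SketchTransactionStatus { SketchCommit, SketchRollback, SketchUnknown };

// Hypothetical outcome of a local transaction as recorded by the application.
enum LocalOutcome { LocalCommitted, LocalRolledBack };

// Hypothetical in-memory log of local transaction outcomes, keyed by message key.
class LocalTransactionLog
{
public:
    // Called by the business logic when a local transaction finishes.
    void record(const std::string &key, LocalOutcome outcome)
    {
        outcomes_[key] = outcome;
    }

    // Task 1: look up the local transaction that corresponds to the half message.
    // Task 2: map its outcome to the status that is reported to the broker.
    SketchTransactionStatus check(const std::string &key) const
    {
        std::map<std::string, LocalOutcome>::const_iterator it = outcomes_.find(key);
        if (it == outcomes_.end()) {
            // No outcome is recorded yet, so the status is still unknown.
            return SketchUnknown;
        }
        return it->second == LocalCommitted ? SketchCommit : SketchRollback;
    }

private:
    std::map<std::string, LocalOutcome> outcomes_;
};
```

In a real checker, the body of the check method would perform the same lookup against your own data store and return CommitTransaction, RollbackTransaction, or Unknow accordingly.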

Subscribe to transactional messages

For instructions and sample code for subscribing to transactional messages, see Subscribe to messages.