This topic provides the sample code for sending and subscribing to transactional messages through the Java SDK over TCP.

Message Queue for Apache RocketMQ provides a distributed transaction processing function similar to X/Open XA to ensure transaction consistency in Message Queue for Apache RocketMQ.

Note For new users, we recommend that you read the Demo project to learn how to build a Message Queue for Apache RocketMQ project before sending and subscribing to a message.

Interaction process

The following figure shows the interaction process of transactional messages.

Process

For more information, see Transactional messages.

Prerequisites

You have completed the following operations:

Send transactional messages

Note For more information about the sample code, see Message Queue for Apache RocketMQ code library.
Perform the following steps to send a transactional message:
  1. Send a half message and execute a local transaction. The code is as follows:

    package com.alibaba.webx.TryHsf.app1;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    public class TransactionProducerClient {
     private final static Logger log = ClientLogger.getLog(); // Your own logger set for troubleshooting.
    
     public static void main(String[] args) throws InterruptedException {
         final BusinessService businessService = new BusinessService(); // The local business.
         Properties properties = new Properties();
            // The group ID that you created in the console. Note: Transactional messages cannot share group IDs with other types of messages.
         properties.put(PropertyKeyConst.GROUP_ID, "XXX");
            // The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
         properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
         properties.put(PropertyKeyConst.SecretKey, "XXX");
            // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
         properties.put(PropertyKeyConst.NAMESRV_ADDR,
           "XXX");
    
         TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
                 new LocalTransactionCheckerImpl());
         producer.start();
         Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
         try {
                 SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
                     @Override
                     public TransactionStatus execute(Message msg, Object arg) {
                         // The message ID. Two messages may have the same message body but different message IDs. The current message ID cannot be retrieved in the console.
                         String msgId = msg.getMsgID();
                         // Calculate the message body by using CRC32 or other algorithms, such as MD5.
                         long crc32Id = HashUtil.crc32Code(msg.getBody());
                         // The message ID and CRC32 ID are used to prevent duplicated messages.
                         // You do not need to specify the message ID or CRC32 ID if the business itself achieves idempotence. Otherwise, set the message ID or CRC32 ID to ensure idempotence.
                         // To prevent duplicated messages, calculate the message body by using the CRC32 or MD5 algorithm.
                         Object businessServiceArgs = new Object();
                         TransactionStatus transactionStatus = TransactionStatus.Unknow;
                         try {
                             boolean isCommit =
                                 businessService.execbusinessService(businessServiceArgs);
                             if (isCommit) {
                                 // Commit the message if the local transaction succeeds.
                                 transactionStatus = TransactionStatus.CommitTransaction;
                             } else {
                                 // Roll back the message if the local transaction fails.
                                 transactionStatus = TransactionStatus.RollbackTransaction;
                             }
                         } catch (Exception e) {
                             log.error("Message Id:{}", msgId, e);
                         }
                         System.out.println(msg.getMsgID());
                         log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
                         return transactionStatus;
                     }
                 }, null);
             }
             catch (Exception e) {
                    // The message failed to be sent and must be resent. The system can resend the message or store message data persistently.
                 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                 e.printStackTrace();
             }
         // Use the demo example to prevent the process from exiting, which is not required in actual use.
         TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
     }
    }                        
  2. Commit the status of the transactional message

    After the execution of a local transaction, which can be successful or failed, the broker must be notified of the transaction status of the current message. The following notification modes are supported:

    • Commit the status after executing the local transaction.
    • Wait until the broker requests to check the transaction status of the message.

    A transaction can be in one of the following states:

    • TransactionStatus.CommitTransaction: The transaction is submitted, and the consumer can consume the message.
    • TransactionStatus.RollbackTransaction: The transaction is rolled back, and the message is discarded and cannot be consumed.
    • TransactionStatus.Unknow: The transaction is in an unknown state, and the Message Queue for Apache RocketMQ broker is expected to query the status of the local transaction that corresponds to the message from the message sender.
    public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
       private final static Logger log = ClientLogger.getLog();
       final  BusinessService businessService = new BusinessService();
    
       @Override
       public TransactionStatus check(Message msg) {
           // The message ID. Two messages may have the same message body but different message IDs. The current message is a half message, and therefore its message ID cannot be retrieved in the console.
           String msgId = msg.getMsgID();
           // Calculate the message body by using CRC32 or other algorithms, such as MD5.
           long crc32Id = HashUtil.crc32Code(msg.getBody());
           // The message ID and CRC32 ID are used to prevent duplicated messages.
           // You do not need to specify the message ID or CRC32 ID if the business itself achieves idempotence. Otherwise, set the message ID or CRC32 ID to ensure idempotence.
           // To prevent duplicated messages, calculate the message body by using the CRC32 or MD5 algorithm.
           // The parameter object of the business, which is an example in this case. Set the object based on the actual situation of your business.
           Object businessServiceArgs = new Object();
           TransactionStatus transactionStatus = TransactionStatus.Unknow;
           try {
               boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
               if (isCommit) {
                   // Commit the message if the local transaction succeeds.
                   transactionStatus = TransactionStatus.CommitTransaction;
               } else {
                   // Roll back the message if the local transaction fails.
                   transactionStatus = TransactionStatus.RollbackTransaction;
               }
           } catch (Exception e) {
               log.error("Message Id:{}", msgId, e);
           }
           log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
           return transactionStatus;
       }
     }                        

    Tool class

    import java.util.zip.CRC32;
    public class HashUtil {
        public static long crc32Code(byte[] bytes) {
            CRC32 crc32 = new CRC32();
            crc32.update(bytes);
            return crc32.getValue();
        }
    }                      

Transaction check mechanism

  • Why must the transaction check mechanism be implemented when transactional messages are sent?

    If the half message is sent in step 1 but TransactionStatus.Unknow is returned or no local transaction of any status is committed because the application exits, the status of the half message is unknown to the Message Queue for Apache RocketMQ broker. Therefore, the broker periodically requests the sender to check and report the status of the half message.

  • What does the business logic do when the check method is called back?

    The check method for transactional messages needs to contain the logic of transaction consistency check. After a transactional message is sent, Message Queue for Apache RocketMQ needs to call LocalTransactionChecker to respond to the request of the broker for the local transaction status. Therefore, the check method for transactional messages needs to complete the following tasks:

    1. Check the status of the local transaction corresponding to the half message (committed or rollback).
    2. Commit the status of the local transaction corresponding to the half message to the broker.

Subscribe to transactional messages

The mode for subscribing to transactional messages is the same as that for subscribing to normal messages. For more information, see Subscribe to messages.