This topic provides the sample code on how to send and subscribe to transactional messages by using Message Queue for Apache RocketMQ SDK for Java over TCP.

Message Queue for Apache RocketMQ provides a distributed transaction processing feature similar to X/Open XA to ensure transaction consistency in Message Queue for Apache RocketMQ.

Note If you are new to Message Queue for Apache RocketMQ, we recommend that you read Demo project to understand how to build a Message Queue for Apache RocketMQ project before you send and subscribe to messages.

Interaction process

The following figure shows the interaction process of transactional messages.

process

For more information, see Transactional message.

Prerequisites

Before you begin, make sure that the following operations are complete:

  • The SDK for Java is downloaded. For more information about release notes of the SDK for Java, see Release notes.
  • An environment is configured. For more information, see Prepare the environment.
  • Optional. Log configuration is complete. For more information, see Log configuration.

Send transactional messages

Note For more information about the sample code, see Message Queue for Apache RocketMQ code library.
Perform the following steps to send a transactional message:
  1. Send a half message and execute a local transaction. The following code provides an example:
    package com.alibaba.webx.TryHsf.app1;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    public class TransactionProducerClient {
     private final static Logger log = ClientLogger.getLog(); // Configure logging to facilitate troubleshooting. 
    
     public static void main(String[] args) throws InterruptedException {
         final BusinessService businessService = new BusinessService(); // Your local business service. 
         Properties properties = new Properties();
            // The ID of the group that you create in the Message Queue for Apache RocketMQ console. Note: Transactional messages cannot share group IDs with other types of messages. 
         properties.put(PropertyKeyConst.GROUP_ID,"XXX");
            // The AccessKey ID that you create in the RAM console for identity authentication. 
         properties.put(PropertyKeyConst.AccessKey,"XXX");
            // The AccessKey secret that you create in the RAM console for identity authentication. 
         properties.put(PropertyKeyConst.SecretKey,"XXX");
            // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page that appears, go to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
         properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
    
         TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
                 new LocalTransactionCheckerImpl());
         producer.start();
         Message msg = new Message("Topic","TagA","Hello MQ transaction===".getBytes());
         try {
                 SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
                     @Override
                     public TransactionStatus execute(Message msg, Object arg) {
                         // The ID of the message. Two messages may have the same message body but cannot have the same ID. The current message is a half message. Therefore, its message ID cannot be queried in the Message Queue for Apache RocketMQ console. 
                         String msgId = msg.getMsgID();
                         // Calculate the message body by using an algorithm such as CRC32 and MD5. 
                         long crc32Id = HashUtil.crc32Code(msg.getBody());
                         // The message ID and CRC32 ID are used to prevent duplicate messages. 
                         // You do not need to specify the message ID or CRC32 ID if your business service itself achieves idempotence. Otherwise, specify the message ID or CRC32 ID to ensure idempotence. 
                         // To prevent duplicate messages, calculate the message body by using the CRC32 or MD5 algorithm. 
                         Object businessServiceArgs = new Object();
                         TransactionStatus transactionStatus = TransactionStatus.Unknow;
                         try {
                             boolean isCommit =
                                 businessService.execbusinessService(businessServiceArgs);
                             if (isCommit) {
                                 // Commit the message if the local transaction succeeds. 
                                 transactionStatus = TransactionStatus.CommitTransaction;
                             } else {
                                 // Roll back the message if the local transaction fails. 
                                 transactionStatus = TransactionStatus.RollbackTransaction;
                             }
                         } catch (Exception e) {
                             log.error("Message Id:{}", msgId, e);
                         }
                         System.out.println(msg.getMsgID());
                         log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
                         return transactionStatus;
                     }
                 }, null);
             }
             catch (Exception e) {
                    // Specify the logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                 e.printStackTrace();
             }
         // Use the demo to prevent the process from exiting. This is not required in actual use. 
         TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
     }
    }                        
  2. Commit the status of the transactional message.

    After the local transaction is executed, the Message Queue for Apache RocketMQ broker must be notified of the transaction status of the current message, no matter whether the execution is successful or fails. The Message Queue for Apache RocketMQ broker can be notified in one of the following ways:

    • Commit the status after the local transaction is executed.
    • Wait until the Message Queue for Apache RocketMQ broker sends a request to check the transaction status of the message.

    A transaction can be in one of the following states:

    • TransactionStatus.CommitTransaction: The transaction is committed. The consumer can consume the message.
    • TransactionStatus.RollbackTransaction: The transaction is rolled back. The message is discarded and cannot be consumed.
    • TransactionStatus.Unknow: The transaction is in an unknown state. The Message Queue for Apache RocketMQ broker is expected to query the status of the local transaction that corresponds to the message from the message producer.
    public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
       private final static Logger log = ClientLogger.getLog();
       final  BusinessService businessService = new BusinessService();
    
       @Override
       public TransactionStatus check(Message msg) {
           // The ID of the message. Two messages may have the same message body but cannot have the same ID. The current message is a half message. Therefore, its message ID cannot be queried in the Message Queue for Apache RocketMQ console. 
           String msgId = msg.getMsgID();
           // Calculate the message body by using an algorithm such as CRC32 and MD5. 
           long crc32Id = HashUtil.crc32Code(msg.getBody());
           // The message ID and CRC32 ID are used to prevent duplicate messages. 
           // You do not need to specify the message ID or CRC32 ID if your business service itself achieves idempotence. Otherwise, specify the message ID or CRC32 ID to ensure idempotence. 
           // To prevent duplicate messages, calculate the message body by using the CRC32 or MD5 algorithm. 
           // The parameter object of your business service. Specify the object based on your business service. 
           Object businessServiceArgs = new Object();
           TransactionStatus transactionStatus = TransactionStatus.Unknow;
           try {
               boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
               if (isCommit) {
                   // Commit the message if the local transaction succeeds. 
                   transactionStatus = TransactionStatus.CommitTransaction;
               } else {
                   // Roll back the message if the local transaction fails. 
                   transactionStatus = TransactionStatus.RollbackTransaction;
               }
           } catch (Exception e) {
               log.error("Message Id:{}", msgId, e);
           }
           log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
           return transactionStatus;
       }
     }                        

    Utility class

    import java.util.zip.CRC32;
    public class HashUtil {
        public static long crc32Code(byte[] bytes) {
            CRC32 crc32 = new CRC32();
            crc32.update(bytes);
            return crc32.getValue();
        }
    }                      

Mechanism of transaction status check

  • Why must the mechanism of transaction status check be implemented when transactional messages are sent?

    If the half message is sent in Step 1 but TransactionStatus.Unknow is returned or no local transaction of any status is committed because the application exits, the status of the half message is unknown to the Message Queue for Apache RocketMQ broker. Therefore, the broker periodically sends a request to a producer in the producer cluster to check the status of a half message. After the status check request is received, the producer checks and commits the final state of the local transaction that corresponds to the half message.

  • What does the business logic do when the check method is called back?

    The check method for transactional messages must contain the logic of transaction consistency check. After a transactional message is sent, Message Queue for Apache RocketMQ must call LocalTransactionChecker to respond to the request of the broker for the local transaction status. Therefore, the check method for transactional messages must achieve the following objectives:

    1. Check the status (committed or rollback) of the local transaction that corresponds to the half message.
    2. Commit the status of the local transaction that corresponds to the half message to the broker.

Subscribe to transactional messages

The mode for subscribing to transactional messages is the same as that for subscribing to standard messages. For more information, see Subscribe to messages.