This topic provides sample code that shows how to send and receive transactional messages over TCP by using the ApsaraMQ for RocketMQ SDK for Java.

ApsaraMQ for RocketMQ provides a distributed transaction processing feature that is similar to X/Open XA. ApsaraMQ for RocketMQ uses transactional messages to ensure transactional consistency.

Note: If you are new to ApsaraMQ for RocketMQ, we recommend that you read Demo project to learn how to build an ApsaraMQ for RocketMQ project before you send and receive messages.

Interaction process

The following figure shows the interaction process of transactional messages.

[Figure: Interaction process of transactional messages]

For more information, see Transactional messages.

Prerequisites

The following operations are performed:

  • The SDK for Java is downloaded. For version information, see Release notes.
  • An environment is set up. For more information, see Prepare the environment.
  • (Optional) Logging settings are configured. For more information, see Logging settings.

Send transactional messages

Note: For the complete sample code, see the ApsaraMQ for RocketMQ code library.
package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. Note: Transactional messages cannot share group IDs with other types of messages. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // An AccessKey ID is used as the identifier for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair.
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // An AccessKey secret is used as the password for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair.
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page that appears, go to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");

        // Before you initialize the producer, you must register a checker to check the status of the local transaction. 
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try{
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("Execute the local transaction and commit the transaction status.");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            }catch (ONSClientException e){
                // The message failed to be sent. Add logic here to resend the message or persist it for a later retry.
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Finished sending transactional messages.");
    }
}
// The local transaction checker. 
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
   
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("The request to check the transaction status of the message. MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}
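
The sample above always returns TransactionStatus.CommitTransaction from the executer. In a real application, the returned status usually depends on the outcome of the local transaction. The following is a minimal sketch of that mapping, not the SDK's own implementation. So that it compiles without the SDK, TxStatus is a stand-in for the SDK's TransactionStatus enum, and BusinessRejectedException and the execute helper are hypothetical names introduced only for illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Stand-in for the SDK's TransactionStatus enum (assumption: used so the sketch compiles without the SDK).
enum TxStatus { CommitTransaction, RollbackTransaction, Unknow }

class LocalTransactionSketch {

    // Hypothetical exception: the business logic rejected the operation and the local transaction was rolled back.
    static class BusinessRejectedException extends Exception {
        BusinessRejectedException(String message) {
            super(message);
        }
    }

    // Runs the local transaction and maps its outcome to the status that is reported to the broker.
    static TxStatus execute(Callable<Void> localTransaction) {
        try {
            localTransaction.call();
            // The local transaction succeeded, so the half message can be delivered.
            return TxStatus.CommitTransaction;
        } catch (BusinessRejectedException e) {
            // The local transaction is known to have been rolled back, so the half message can be discarded.
            return TxStatus.RollbackTransaction;
        } catch (Exception e) {
            // The outcome is uncertain (for example, a timeout), so leave the decision to a later status check.
            return TxStatus.Unknow;
        }
    }

    public static void main(String[] args) {
        System.out.println(execute(() -> null));
        System.out.println(execute(() -> {
            throw new BusinessRejectedException("insufficient balance");
        }));
        System.out.println(execute(() -> {
            throw new IOException("database timeout");
        }));
    }
}
```

With the real SDK, the same decision would sit inside LocalTransactionExecuter.execute and return the corresponding TransactionStatus constant. Returning Unknow defers the decision to the status check described in the next section.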

Mechanism of transaction status check

  • Why must a transaction status check be implemented when transactional messages are sent?

    If a half message is sent but the local transaction returns TransactionStatus.Unknow, or the application exits before it commits a status, the ApsaraMQ for RocketMQ broker cannot determine the status of the half message. The broker therefore periodically sends a request to a producer in the producer cluster to check the status of the half message. After the producer receives the request, it checks the final status of the local transaction that corresponds to the half message and commits that status to the broker.

  • What does the business logic do when the check method is called back?

    The check method for transactional messages must contain the logic that verifies transaction consistency. After a transactional message is sent, ApsaraMQ for RocketMQ calls LocalTransactionChecker to respond to the broker's request for the status of the local transaction. Therefore, the check method must achieve the following objectives:

    1. Check the status (committed or rolled back) of the local transaction that corresponds to the half message.
    2. Commit the status of the local transaction that corresponds to the half message to the broker.
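
The two objectives above can be sketched as a lookup against a record of local transaction outcomes. This is a minimal sketch under stated assumptions, not the SDK's implementation: Status stands in for the SDK's TransactionStatus enum so that the code compiles without the SDK, and LocalState, the in-memory map, and the business key are hypothetical. A real checker would typically query the business database by using an identifier carried in the message.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TransactionCheckSketch {

    // Stand-in for the SDK's TransactionStatus enum (assumption: used so the sketch compiles without the SDK).
    enum Status { CommitTransaction, RollbackTransaction, Unknow }

    // Hypothetical final states of a local transaction.
    enum LocalState { COMMITTED, ROLLED_BACK }

    // Hypothetical record of local transaction outcomes, keyed by a business key such as an order ID.
    private final Map<String, LocalState> records = new ConcurrentHashMap<>();

    // Called by the business code when the local transaction reaches a final state.
    void record(String businessKey, LocalState state) {
        records.put(businessKey, state);
    }

    // Objective 1: look up the state of the local transaction.
    // Objective 2: return the matching status so that it can be committed to the broker.
    Status check(String businessKey) {
        LocalState state = records.get(businessKey);
        if (state == null) {
            // No final state is recorded yet, so report an unknown status and let the broker check again later.
            return Status.Unknow;
        }
        return state == LocalState.COMMITTED ? Status.CommitTransaction : Status.RollbackTransaction;
    }

    public static void main(String[] args) {
        TransactionCheckSketch checker = new TransactionCheckSketch();
        checker.record("order-1", LocalState.COMMITTED);
        checker.record("order-2", LocalState.ROLLED_BACK);

        System.out.println(checker.check("order-1"));
        System.out.println(checker.check("order-2"));
        System.out.println(checker.check("order-3"));
    }
}
```

In LocalTransactionCheckerImpl, logic of this kind would replace the unconditional return of TransactionStatus.CommitTransaction, so that a half message is committed only if its local transaction actually succeeded.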

Subscribe to transactional messages

The method for subscribing to transactional messages is the same as the method for subscribing to normal messages. For more information, see Subscribe to messages.