ApsaraMQ for RocketMQ: Send and receive transactional messages

Last Updated: Jan 08, 2024

This topic provides sample code on how to send and receive transactional messages by using the TCP client SDK for Java.

ApsaraMQ for RocketMQ provides a distributed transaction processing feature similar to X/Open eXtended Architecture (XA) to ensure transaction consistency.

Note

If you are a new user of ApsaraMQ for RocketMQ, we recommend that you refer to the Demo project to build a project before you use ApsaraMQ for RocketMQ to send and receive messages.

Interaction process

The following figure shows the interaction process of transactional messages.

[Figure: interaction process of transactional messages]

For more information, see Transactional messages.

Prerequisites

Before you start, make sure that the following operations are performed:

  • The SDK for Java is downloaded. For information about the release notes of the SDK for Java, see Release notes.

  • An environment is set up. For more information, see Prepare the environment.

  • (Optional) Logging settings are configured. For more information, see Logging settings.

Send transactional messages

For information about the sample code, see ApsaraMQ for RocketMQ code library.

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. Note: The group ID that is used for transactional messages cannot be the same as the group ID that is used for other types of messages.
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");

        // Before you initialize the producer, you must register a checker to check the status of the local transaction. 
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try{
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("Execute the local transaction and commit the transaction status.");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            }catch (ONSClientException e){
                // Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

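        // This line is reached even if some sends failed, so it reports completion rather than success.
        System.out.println("Finished sending transactional messages.");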
    }
}
// The local transaction checker. 
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
   
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("The request to check the transaction status of the message. MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}

Mechanism of transaction status check

  • Why must the mechanism of transaction status check be implemented when transactional messages are sent?

    If a half message is sent but TransactionStatus.Unknow is returned or no status is committed for the local transaction due to application exit, the status of the half message is unknown to the ApsaraMQ for RocketMQ broker. Therefore, the broker periodically sends a request to a producer in the producer cluster to check the status of a half message. After the status check request is received, the producer checks and commits the final status of the local transaction that corresponds to the half message.

  • What does the business logic do when the check method is called back?

    The check method for transactional messages must contain the logic that is used to check transaction consistency. After a transactional message is sent, the broker may request the local transaction status, and the producer responds through its implementation of the LocalTransactionChecker interface. Therefore, the check method must achieve the following objectives:

    1. Check the status (committed or rolled back) of the local transaction that corresponds to the half message.

    2. Commit the status of the local transaction that corresponds to the half message to the broker.
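
The two objectives above can be sketched as follows. This is a minimal illustration, not the SDK's implementation. To stay runnable without the SDK on the classpath, it declares a local stand-in enum named Status that reuses the constant names of TransactionStatus (CommitTransaction, RollbackTransaction, Unknow). The class name TransactionCheckSketch, the LocalState enum, the business ID key, and the in-memory map are all assumptions made for this example. A real application would typically look up the outcome in its own database.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TransactionCheckSketch {

    // Stand-in for the SDK's TransactionStatus enum (same constant names),
    // declared locally so that this sketch compiles without the SDK.
    enum Status { CommitTransaction, RollbackTransaction, Unknow }

    // Hypothetical record of how the local transaction ended.
    enum LocalState { COMMITTED, ROLLED_BACK }

    // Hypothetical in-memory store keyed by a business ID.
    private final Map<String, LocalState> outcomes = new ConcurrentHashMap<>();

    // Plays the role of LocalTransactionExecuter.execute: run the local work,
    // record its outcome, and report the matching status.
    Status execute(String businessId, Runnable localWork) {
        try {
            localWork.run();
            outcomes.put(businessId, LocalState.COMMITTED);
            return Status.CommitTransaction;
        } catch (RuntimeException e) {
            outcomes.put(businessId, LocalState.ROLLED_BACK);
            return Status.RollbackTransaction;
        }
    }

    // Plays the role of LocalTransactionChecker.check: look up the recorded
    // outcome (objective 1) and return it as the status to commit (objective 2).
    Status check(String businessId) {
        LocalState state = outcomes.get(businessId);
        if (state == null) {
            // No outcome recorded yet. Returning Unknow is a choice made in this
            // sketch so that the broker asks again later.
            return Status.Unknow;
        }
        return state == LocalState.COMMITTED
                ? Status.CommitTransaction
                : Status.RollbackTransaction;
    }

    public static void main(String[] args) {
        TransactionCheckSketch sketch = new TransactionCheckSketch();
        sketch.execute("order-1", () -> { });
        sketch.execute("order-2", () -> { throw new IllegalStateException("local failure"); });
        System.out.println(sketch.check("order-1"));
        System.out.println(sketch.check("order-2"));
        System.out.println(sketch.check("order-3"));
    }
}
```

With the SDK, the same lookup would go in the check method of a LocalTransactionChecker implementation, with the business ID taken from the message (for example, from its key or body). How the ID is carried is left to the application.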

Subscribe to transactional messages

The sample code for subscribing to transactional messages is the same as that for subscribing to normal messages. For more information, see Subscribe to messages.