This topic provides sample code for sending and receiving transactional messages by using the Message Queue for Apache RocketMQ SDK for Java over TCP.
Message Queue for Apache RocketMQ provides a distributed transaction processing feature that is similar to X/Open XA, and uses transactional messages to ensure transactional consistency.
Interaction process
The following figure shows the interaction process of transactional messages.

For more information, see Transactional messages.
Prerequisites
Before you begin, make sure that the following operations are complete:
- The SDK for Java is downloaded. For release information about the SDK for Java, see Release notes.
- The environment is set up. For more information, see Prepare the environment.
- (Optional) Logging settings are configured. For more information, see Logging settings.
Send transactional messages
package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the group that you created in the Message Queue for Apache RocketMQ console.
        // Note: Transactional messages cannot share group IDs with other types of messages.
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // The AccessKey ID identifies the Alibaba Cloud service user. For more information, see Create an AccessKey pair.
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // The AccessKey Secret verifies the identity of the Alibaba Cloud service user. For more information, see Create an AccessKey pair.
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the
        // Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances.
        // On the Instances page, click the name of your instance. On the Instance Details page that appears,
        // go to the Basic Information section and view the TCP endpoint on the Endpoints tab.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");

        // Before you initialize the producer, you must register a checker to check the status of the local transaction.
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        // Encode the body explicitly as UTF-8 instead of relying on the platform default charset.
        Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes(StandardCharsets.UTF_8));
        for (int i = 0; i < 3; i++) {
            try {
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("Execute the local transaction and commit the transaction status.");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
                // Report success only when the send call returns without an exception.
                System.out.println("Send transaction message success.");
            } catch (ONSClientException e) {
                // The message failed to be sent. Add the logic to resend or persist the message here.
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
    }
}

// The local transaction checker.
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("The request to check the transaction status of the message. MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}
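The executer in the sample above always returns TransactionStatus.CommitTransaction. In an application, the returned status would normally depend on the outcome of the local transaction. The following is a minimal sketch of one possible mapping, not the SDK's prescribed approach. The Status enum is a stand-in that mirrors the three TransactionStatus values so that the sketch compiles without the SDK, and ExecuterSketch, BusinessRejectedException, and runLocalTransaction are hypothetical names introduced for illustration.

```java
import java.util.concurrent.Callable;

public class ExecuterSketch {
    // Stand-in for the SDK's TransactionStatus enum (assumption: declared here
    // only so that the sketch runs without the SDK on the classpath).
    enum Status { CommitTransaction, RollbackTransaction, Unknow }

    // Hypothetical exception that signals a definite business failure.
    static class BusinessRejectedException extends Exception {
        BusinessRejectedException(String message) {
            super(message);
        }
    }

    // Runs the local transaction and maps its outcome to a status.
    static Status runLocalTransaction(Callable<Void> localTx) {
        try {
            localTx.call();
            // The local transaction succeeded: the half message can be delivered.
            return Status.CommitTransaction;
        } catch (BusinessRejectedException e) {
            // The local transaction definitely failed: discard the half message.
            return Status.RollbackTransaction;
        } catch (Exception e) {
            // The outcome is uncertain (for example, a timeout): report Unknow
            // and let the status check decide later.
            return Status.Unknow;
        }
    }

    public static void main(String[] args) {
        System.out.println(runLocalTransaction(() -> null));
        System.out.println(runLocalTransaction(() -> {
            throw new BusinessRejectedException("insufficient balance");
        }));
        System.out.println(runLocalTransaction(() -> {
            throw new IllegalStateException("database timeout");
        }));
    }
}
```

In real code, the body of runLocalTransaction would sit inside LocalTransactionExecuter.execute and return the SDK's TransactionStatus values instead of the stand-in enum.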
Mechanism of transaction status check
- Why must the transaction status check mechanism be implemented when transactional messages are sent?
  If the half message is sent but TransactionStatus.Unknow is returned, or no status is committed for the local transaction because the application exits, the status of the half message is unknown to the Message Queue for Apache RocketMQ broker. Therefore, the broker periodically sends a request to a producer in the producer cluster to check the status of the half message. After the producer receives the status check request, it checks and commits the final status of the local transaction that corresponds to the half message.
- What does the business logic do when the check method is called back?
  The check method for transactional messages must contain the logic of the transaction consistency check. After a transactional message is sent, Message Queue for Apache RocketMQ calls LocalTransactionChecker to respond to the request from the broker for the local transaction status. Therefore, the check method must achieve the following objectives:
  - Check the status (committed or rolled back) of the local transaction that corresponds to the half message.
  - Commit the status of the local transaction that corresponds to the half message to the broker.
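The two objectives of the check method can be sketched as a lookup of the recorded outcome of the local transaction. This is a minimal illustration under stated assumptions, not the SDK's implementation: the Status enum is a stand-in that mirrors the TransactionStatus values so that the sketch runs without the SDK, and CheckerSketch, LocalState, record, and the in-memory map are hypothetical. A real application would more likely read the outcome from its own database, keyed by a business identifier carried in the message.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CheckerSketch {
    // Stand-in for the SDK's TransactionStatus enum (assumption: declared here
    // only so that the sketch runs without the SDK on the classpath).
    enum Status { CommitTransaction, RollbackTransaction, Unknow }

    // Hypothetical record of how a local transaction ended.
    enum LocalState { COMMITTED, ROLLED_BACK }

    // Hypothetical in-memory store of local transaction outcomes, keyed by a
    // business identifier such as an order ID.
    private final Map<String, LocalState> outcomes = new ConcurrentHashMap<>();

    // Called by the business code when the local transaction finishes.
    void record(String businessKey, LocalState state) {
        outcomes.put(businessKey, state);
    }

    // Objective 1: look up the local transaction status.
    // Objective 2: return it so that it can be committed to the broker.
    Status check(String businessKey) {
        LocalState state = outcomes.get(businessKey);
        if (state == null) {
            // No outcome recorded yet: the status is still unknown.
            return Status.Unknow;
        }
        return state == LocalState.COMMITTED
                ? Status.CommitTransaction
                : Status.RollbackTransaction;
    }

    public static void main(String[] args) {
        CheckerSketch checker = new CheckerSketch();
        checker.record("order-1", LocalState.COMMITTED);
        checker.record("order-2", LocalState.ROLLED_BACK);
        System.out.println(checker.check("order-1"));
        System.out.println(checker.check("order-2"));
        System.out.println(checker.check("order-3"));
    }
}
```

With the SDK, the same lookup would sit inside LocalTransactionChecker.check(Message msg) and return TransactionStatus values. How the business identifier is carried in the message is left to the application.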
Subscribe to transactional messages
The method for subscribing to transactional messages is the same as the method for subscribing to normal messages. For more information, see Subscribe to messages.