ApsaraMQ for RocketMQ provides distributed transaction processing similar to eXtended Architecture (X/Open XA), ensuring transaction consistency across distributed systems. This topic describes how to send and consume transactional messages by using the HTTP client SDK for Java.
How transactional messages work

A transactional message goes through the following stages:
The producer sends a half message to the broker. A half message is a transactional message that has not yet been committed or rolled back.
The producer runs the local transaction.
Based on the local transaction result, the producer commits or rolls back the half message.
If the broker does not receive a commit or rollback within a specified period, it initiates a transaction status check to query the local transaction result.
Once committed, the message becomes available for consumers.
For more information, see Transactional messages.
Prerequisites
Before you begin, make sure that you have:
The Java HTTP client SDK installed. For more information, see Prepare the environment.
An ApsaraMQ for RocketMQ instance, topic, and consumer group created in the ApsaraMQ for RocketMQ console
An Alibaba Cloud AccessKey pair stored in the
ALIBABA_CLOUD_ACCESS_KEY_IDandALIBABA_CLOUD_ACCESS_KEY_SECRETenvironment variables
Send transactional messages
The following code sends transactional messages and handles uncommitted half messages by using the HTTP client SDK for Java. It demonstrates four transaction outcomes: immediate commit, deferred commit, conditional commit after retry, and rollback.
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQTransProducer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.List;
public class TransProducer {
static void processCommitRollError(Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Commit/Roll transaction error, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
}
}
public static void main(String[] args) throws Throwable {
MQClient mqClient = new MQClient(
// HTTP endpoint. Find this on the Instance Details page > Endpoints tab in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
// AccessKey ID and secret from environment variables.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// Topic for transactional messages. Create the topic in the ApsaraMQ for RocketMQ console.
// A topic supports only one message type. A topic for normal messages cannot send transactional messages.
final String topic = "${TOPIC}";
// Instance ID. If the instance has a namespace, specify the ID; otherwise, set to null or "".
// Check the namespace on the Instance Details page in the ApsaraMQ for RocketMQ console.
final String instanceId = "${INSTANCE_ID}";
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
final String groupId = "${GROUP_ID}";
final MQTransProducer mqTransProducer = mqClient.getTransProducer(instanceId, topic, groupId);
for (int i = 0; i < 4; i++) {
TopicMessage topicMessage = new TopicMessage();
topicMessage.setMessageBody("trans_msg");
topicMessage.setMessageTag("a");
topicMessage.setMessageKey(String.valueOf(System.currentTimeMillis()));
// Delay (in seconds) before the first transaction status check. Valid values: 10 to 300.
// After the first check, the broker rechecks every 10 seconds for up to 24 hours.
topicMessage.setTransCheckImmunityTime(10);
topicMessage.getProperties().put("a", String.valueOf(i));
TopicMessage pubResultMsg = null;
pubResultMsg = mqTransProducer.publishMessage(topicMessage);
System.out.println("Send---->msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()
+ ", Handle: " + pubResultMsg.getReceiptHandle()
);
if (pubResultMsg != null && pubResultMsg.getReceiptHandle() != null) {
if (i == 0) {
// Commit the first message immediately using its receipt handle.
// The broker tracks half messages by receipt handle for commit/rollback.
try {
mqTransProducer.commit(pubResultMsg.getReceiptHandle());
System.out.println(String.format("MessageId:%s, commit", pubResultMsg.getMessageId()));
} catch (Throwable e) {
// Commit/rollback fails if the receipt handle has expired (TransCheckImmunityTime elapsed).
if (e instanceof AckMessageException) {
processCommitRollError(e);
continue;
}
}
}
}
}
// Start a separate thread to poll and resolve uncommitted half messages.
Thread t = new Thread(new Runnable() {
public void run() {
int count = 0;
while(true) {
try {
if (count == 3) {
break;
}
List<Message> messages = mqTransProducer.consumeHalfMessage(3, 3);
if (messages == null) {
System.out.println("No Half message!");
continue;
}
System.out.println(String.format("Half---->MessageId:%s,Properties:%s,Body:%s,Latency:%d",
messages.get(0).getMessageId(),
messages.get(0).getProperties(),
messages.get(0).getMessageBodyString(),
System.currentTimeMillis() - messages.get(0).getPublishTime()));
for (Message message : messages) {
try {
if (Integer.valueOf(message.getProperties().get("a")) == 1) {
// Commit the transactional message.
mqTransProducer.commit(message.getReceiptHandle());
count++;
System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
} else if (Integer.valueOf(message.getProperties().get("a")) == 2
&& message.getConsumedTimes() > 1) {
// Commit the transactional message.
mqTransProducer.commit(message.getReceiptHandle());
count++;
System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
} else if (Integer.valueOf(message.getProperties().get("a")) == 3) {
// Roll back the transactional message.
mqTransProducer.rollback(message.getReceiptHandle());
count++;
System.out.println(String.format("MessageId:%s, rollback", message.getMessageId()));
} else {
// Check the status next time.
System.out.println(String.format("MessageId:%s, unknown", message.getMessageId()));
}
} catch (Throwable e) {
// Commit/rollback fails if the receipt handle has expired
// (TransCheckImmunityTime or consumeHalfMessage timeout elapsed).
processCommitRollError(e);
}
}
} catch (Throwable e) {
System.out.println(e.getMessage());
}
}
}
});
t.start();
t.join();
mqClient.close();
}
}Replace the following placeholders with your actual values:
| Placeholder | Description | Where to find |
|---|---|---|
${HTTP_ENDPOINT} | HTTP endpoint of your instance | Instance Details page > Endpoints tab > HTTP Endpoint section |
${TOPIC} | Topic name | Topics page in the ApsaraMQ for RocketMQ console |
${INSTANCE_ID} | Instance ID. Set to null or "" if the instance has no namespace | Instance Details page |
${GROUP_ID} | Consumer group ID | Groups page in the ApsaraMQ for RocketMQ console |
Key parameters
| Parameter | Description | Constraints |
|---|---|---|
TransCheckImmunityTime | Delay before the first transaction status check, in seconds | 10 to 300 |
consumeHalfMessage(batchSize, waitSeconds) | Polls uncommitted half messages | In this example, batchSize is set to 3 and waitSeconds is set to 3 |
Transaction outcomes in this example
The sample code demonstrates four transaction scenarios based on the message property a:
Message (a value) | Outcome | Description |
|---|---|---|
| 0 | Immediate commit | Committed right after sending |
| 1 | Deferred commit | Committed during half message processing |
| 2 | Conditional commit | Committed only after at least one retry (consumedTimes > 1) |
| 3 | Rollback | Rolled back during half message processing |
If a half message is not committed or rolled back after the first status check, the broker rechecks every 10 seconds for up to 24 hours.
Subscribe to transactional messages
Transactional messages are consumed the same way as normal messages. The following code uses long polling to consume messages in batches and acknowledge them by using the HTTP client SDK for Java.
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// HTTP endpoint. To obtain the HTTP endpoint, log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the HTTP endpoint on the Endpoints tab.
"${HTTP_ENDPOINT}",
// The AccessKey ID that is used for authentication.
"${ACCESS_KEY}",
// The AccessKey secret that is used for authentication.
"${SECRET_KEY}"
);
// Topic for transactional messages. Create the topic in the ApsaraMQ for RocketMQ console.
// A topic supports only one message type. A topic for normal messages cannot consume transactional messages.
final String topic = "${TOPIC}";
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
final String groupId = "${GROUP_ID}";
// Instance ID. If the instance has a namespace, specify the ID; otherwise, set to null or "".
// Check the namespace on the Instance Details page in the ApsaraMQ for RocketMQ console.
final String instanceId = "${INSTANCE_ID}";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// Consume messages in a loop. For production use, consume messages with multiple threads for higher throughput.
do {
List<Message> messages = null;
try {
// Long polling: if no message is available, the request is held at the broker
// for the specified duration (3 seconds here) before returning.
messages = consumer.consumeMessage(
3,// The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The largest value you can set is 16.
3// The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The largest value you can set is 30.
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// No messages available for consumption.
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}
// Process the consumed messages.
for (Message message : messages) {
System.out.println("Receive message: " + message);
}
// Acknowledge consumed messages. If the broker does not receive an ACK
// before the delivery retry interval elapses, the message is delivered again.
// Each delivery attempt generates a new receipt handle.
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// ACK may fail if the receipt handle has expired.
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}Consumer parameters
| Parameter | Description | Valid values |
|---|---|---|
consumeMessage batch size | Maximum number of messages returned per request | The largest value you can set is 16 |
consumeMessage polling duration | How long the broker holds the request when no messages are available, in seconds | The largest value you can set is 30 |
Related topics
Transactional messages: concepts, lifecycle, and interaction flow
Create resources: set up instances, topics, and consumer groups
Prepare the environment: install the HTTP client SDK for Java