All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive transactional messages

Last Updated:Mar 11, 2026

ApsaraMQ for RocketMQ provides distributed transaction processing similar to eXtended Architecture (X/Open XA), ensuring transaction consistency across distributed systems. This topic describes how to send and consume transactional messages by using the HTTP client SDK for Java.

How transactional messages work

Transactional message interaction flow

A transactional message goes through the following stages:

  1. The producer sends a half message to the broker. A half message is a transactional message that has not yet been committed or rolled back.

  2. The producer runs the local transaction.

  3. Based on the local transaction result, the producer commits or rolls back the half message.

  4. If the broker does not receive a commit or rollback within a specified period, it initiates a transaction status check to query the local transaction result.

  5. Once committed, the message becomes available for consumers.

For more information, see Transactional messages.

Prerequisites

Before you begin, make sure that you have:

Send transactional messages

The following code sends transactional messages and handles uncommitted half messages by using the HTTP client SDK for Java. It demonstrates four transaction outcomes: immediate commit, deferred commit, conditional commit after retry, and rollback.

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQTransProducer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.List;

public class TransProducer {


    static void processCommitRollError(Throwable e) {
        if (e instanceof AckMessageException) {
            AckMessageException errors = (AckMessageException) e;
            System.out.println("Commit/Roll transaction error, requestId is:" + errors.getRequestId() + ", fail handles:");
            if (errors.getErrorMessages() != null) {
                for (String errorHandle :errors.getErrorMessages().keySet()) {
                    System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                            + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                }
            }
        }
    }

    public static void main(String[] args) throws Throwable {
        MQClient mqClient = new MQClient(
                // HTTP endpoint. Find this on the Instance Details page > Endpoints tab in the ApsaraMQ for RocketMQ console.
                "${HTTP_ENDPOINT}",
                // AccessKey ID and secret from environment variables.
	              System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
	              System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        );

        // Topic for transactional messages. Create the topic in the ApsaraMQ for RocketMQ console.
        // A topic supports only one message type. A topic for normal messages cannot send transactional messages.
        final String topic = "${TOPIC}";
        // Instance ID. If the instance has a namespace, specify the ID; otherwise, set to null or "".
        // Check the namespace on the Instance Details page in the ApsaraMQ for RocketMQ console.
        final String instanceId = "${INSTANCE_ID}";
        // Consumer group ID created in the ApsaraMQ for RocketMQ console.
        final String groupId = "${GROUP_ID}";

        final MQTransProducer mqTransProducer = mqClient.getTransProducer(instanceId, topic, groupId);

        for (int i = 0; i < 4; i++) {
            TopicMessage topicMessage = new TopicMessage();
            topicMessage.setMessageBody("trans_msg");
            topicMessage.setMessageTag("a");
            topicMessage.setMessageKey(String.valueOf(System.currentTimeMillis()));
            // Delay (in seconds) before the first transaction status check. Valid values: 10 to 300.
            // After the first check, the broker rechecks every 10 seconds for up to 24 hours.
            topicMessage.setTransCheckImmunityTime(10);
            topicMessage.getProperties().put("a", String.valueOf(i));

            TopicMessage pubResultMsg = null;
            pubResultMsg = mqTransProducer.publishMessage(topicMessage);
            System.out.println("Send---->msgId is: " + pubResultMsg.getMessageId()
                    + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()
                    + ", Handle: " + pubResultMsg.getReceiptHandle()
            );
            if (pubResultMsg != null && pubResultMsg.getReceiptHandle() != null) {
                if (i == 0) {
                    // Commit the first message immediately using its receipt handle.
                    // The broker tracks half messages by receipt handle for commit/rollback.
                    try {
                        mqTransProducer.commit(pubResultMsg.getReceiptHandle());
                        System.out.println(String.format("MessageId:%s, commit", pubResultMsg.getMessageId()));
                    } catch (Throwable e) {
                        // Commit/rollback fails if the receipt handle has expired (TransCheckImmunityTime elapsed).
                        if (e instanceof AckMessageException) {
                            processCommitRollError(e);
                            continue;
                        }
                    }
                }
            }
        }

        // Start a separate thread to poll and resolve uncommitted half messages.
        Thread t = new Thread(new Runnable() {
            public void run() {
                int count = 0;
                while(true) {
                    try {
                        if (count == 3) {
                            break;
                        }
                        List<Message> messages = mqTransProducer.consumeHalfMessage(3, 3);
                        if (messages == null) {
                            System.out.println("No Half message!");
                            continue;
                        }
                        System.out.println(String.format("Half---->MessageId:%s,Properties:%s,Body:%s,Latency:%d",
                                messages.get(0).getMessageId(),
                                messages.get(0).getProperties(),
                                messages.get(0).getMessageBodyString(),
                                System.currentTimeMillis() - messages.get(0).getPublishTime()));

                        for (Message message : messages) {
                            try {
                                if (Integer.valueOf(message.getProperties().get("a")) == 1) {
                                    // Commit the transactional message.
                                    mqTransProducer.commit(message.getReceiptHandle());
                                    count++;
                                    System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
                                } else if (Integer.valueOf(message.getProperties().get("a")) == 2
                                        && message.getConsumedTimes() > 1) {
                                    // Commit the transactional message.
                                    mqTransProducer.commit(message.getReceiptHandle());
                                    count++;
                                    System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
                                } else if (Integer.valueOf(message.getProperties().get("a")) == 3) {
                                    // Roll back the transactional message.
                                    mqTransProducer.rollback(message.getReceiptHandle());
                                    count++;
                                    System.out.println(String.format("MessageId:%s, rollback", message.getMessageId()));
                                } else {
                                    // Check the status next time.
                                    System.out.println(String.format("MessageId:%s, unknown", message.getMessageId()));
                                }
                            } catch (Throwable e) {
                                // Commit/rollback fails if the receipt handle has expired
                                // (TransCheckImmunityTime or consumeHalfMessage timeout elapsed).
                                processCommitRollError(e);
                            }
                        }
                    } catch (Throwable e) {
                        System.out.println(e.getMessage());
                    }
                }
            }
        });

        t.start();

        t.join();

        mqClient.close();
    }

}

Replace the following placeholders with your actual values:

PlaceholderDescriptionWhere to find
${HTTP_ENDPOINT}HTTP endpoint of your instanceInstance Details page > Endpoints tab > HTTP Endpoint section
${TOPIC}Topic nameTopics page in the ApsaraMQ for RocketMQ console
${INSTANCE_ID}Instance ID. Set to null or "" if the instance has no namespaceInstance Details page
${GROUP_ID}Consumer group IDGroups page in the ApsaraMQ for RocketMQ console

Key parameters

ParameterDescriptionConstraints
TransCheckImmunityTimeDelay before the first transaction status check, in seconds10 to 300
consumeHalfMessage(batchSize, waitSeconds)Polls uncommitted half messagesIn this example, batchSize is set to 3 and waitSeconds is set to 3

Transaction outcomes in this example

The sample code demonstrates four transaction scenarios based on the message property a:

Message (a value)OutcomeDescription
0Immediate commitCommitted right after sending
1Deferred commitCommitted during half message processing
2Conditional commitCommitted only after at least one retry (consumedTimes > 1)
3RollbackRolled back during half message processing
If a half message is not committed or rolled back after the first status check, the broker rechecks every 10 seconds for up to 24 hours.

Subscribe to transactional messages

Transactional messages are consumed the same way as normal messages. The following code uses long polling to consume messages in batches and acknowledge them by using the HTTP client SDK for Java.

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;

import java.util.ArrayList;
import java.util.List;

public class Consumer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // HTTP endpoint. To obtain the HTTP endpoint, log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the HTTP endpoint on the Endpoints tab.
                "${HTTP_ENDPOINT}",
                // The AccessKey ID that is used for authentication.
                "${ACCESS_KEY}",
                // The AccessKey secret that is used for authentication.
                "${SECRET_KEY}"
        );

        // Topic for transactional messages. Create the topic in the ApsaraMQ for RocketMQ console.
        // A topic supports only one message type. A topic for normal messages cannot consume transactional messages.
        final String topic = "${TOPIC}";
        // Consumer group ID created in the ApsaraMQ for RocketMQ console.
        final String groupId = "${GROUP_ID}";
        // Instance ID. If the instance has a namespace, specify the ID; otherwise, set to null or "".
        // Check the namespace on the Instance Details page in the ApsaraMQ for RocketMQ console.
        final String instanceId = "${INSTANCE_ID}";

        final MQConsumer consumer;
        if (instanceId != null && instanceId != "") {
            consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
        } else {
            consumer = mqClient.getConsumer(topic, groupId);
        }

        // Consume messages in a loop. For production use, consume messages with multiple threads for higher throughput.
        do {
            List<Message> messages = null;

            try {
                // Long polling: if no message is available, the request is held at the broker
                // for the specified duration (3 seconds here) before returning.
                messages = consumer.consumeMessage(
                        3,// The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The largest value you can set is 16.
                        3// The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The largest value you can set is 30.
                );
            } catch (Throwable e) {
                e.printStackTrace();
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            // No messages available for consumption.
            if (messages == null || messages.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
                continue;
            }

            // Process the consumed messages.
            for (Message message : messages) {
                System.out.println("Receive message: " + message);
            }

            // Acknowledge consumed messages. If the broker does not receive an ACK
            // before the delivery retry interval elapses, the message is delivered again.
            // Each delivery attempt generates a new receipt handle.
            {
                List<String> handles = new ArrayList<String>();
                for (Message message : messages) {
                    handles.add(message.getReceiptHandle());
                }

                try {
                    consumer.ackMessage(handles);
                } catch (Throwable e) {
                    // ACK may fail if the receipt handle has expired.
                    if (e instanceof AckMessageException) {
                        AckMessageException errors = (AckMessageException) e;
                        System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                        if (errors.getErrorMessages() != null) {
                            for (String errorHandle :errors.getErrorMessages().keySet()) {
                                System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                        + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                            }
                        }
                        continue;
                    }
                    e.printStackTrace();
                }
            }
        } while (true);
    }
}

Consumer parameters

ParameterDescriptionValid values
consumeMessage batch sizeMaximum number of messages returned per requestThe largest value you can set is 16
consumeMessage polling durationHow long the broker holds the request when no messages are available, in secondsThe largest value you can set is 30

Related topics