ApsaraMQ for RocketMQ: Send and receive transactional messages

Last Updated: Mar 11, 2026

ApsaraMQ for RocketMQ uses transactional messages to ensure transactional consistency across distributed services. A transactional message ties message delivery to the outcome of a local business operation: consumers receive the message only after the local transaction commits.

The following Python sample code uses the HTTP client SDK to demonstrate three operations: sending transactional messages, handling transaction status checks, and consuming committed messages.

How it works

Transactional messages follow a two-phase commit protocol similar to X/Open XA.

The producer, broker, and consumer interact as follows:

  1. The producer sends a half message to the broker. A half message is persisted on the broker but marked as temporarily undeliverable to consumers.

  2. The broker acknowledges the half message and returns a receipt handle.

  3. The producer runs the local transaction (for example, a database write).

  4. Based on the local transaction result, the producer sends a second-phase confirmation to either commit the message (deliver it to consumers) or roll it back (discard it).

  5. If the broker does not receive the second-phase confirmation (for example, because of a network failure or a producer restart), it initiates a transaction status check by querying the producer.

  6. The producer re-evaluates the local transaction state and responds with commit, rollback, or unknown.

  7. The broker repeats the check every 10 seconds for up to 24 hours.
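
The steps above can be sketched with a small in-memory model. This is only an illustration of the flow: `ToyBroker`, `TxState`, and their methods are hypothetical names invented for this sketch and are not part of the ApsaraMQ for RocketMQ SDK. The sketch also ignores timing, so the status check is triggered by hand.

```python
import itertools
from enum import Enum


class TxState(Enum):
    COMMIT = "commit"
    ROLLBACK = "rollback"
    UNKNOWN = "unknown"


class ToyBroker:
    """In-memory stand-in for a broker (hypothetical, not the real API)."""

    def __init__(self):
        self._handles = itertools.count(1)
        self.half = {}       # receipt handle -> body, hidden from consumers
        self.delivered = []  # bodies that consumers can see

    def send_half(self, body):
        # Steps 1-2: persist the half message and return a receipt handle.
        handle = next(self._handles)
        self.half[handle] = body
        return handle

    def resolve(self, handle, state):
        # Step 4: apply the second-phase confirmation.
        if state is TxState.UNKNOWN:
            return  # keep the half message for a later status check
        body = self.half.pop(handle)
        if state is TxState.COMMIT:
            self.delivered.append(body)

    def check_pending(self, checker):
        # Steps 5-6: query the producer about each unresolved half message.
        for handle, body in list(self.half.items()):
            self.resolve(handle, checker(body))


broker = ToyBroker()
h_ok = broker.send_half("order-1")    # local transaction succeeds
h_fail = broker.send_half("order-2")  # local transaction fails
broker.send_half("order-3")           # confirmation is lost in transit

broker.resolve(h_ok, TxState.COMMIT)
broker.resolve(h_fail, TxState.ROLLBACK)
print(broker.delivered)  # ['order-1']

# The status check recovers the message whose confirmation was lost.
broker.check_pending(lambda body: TxState.COMMIT)
print(broker.delivered)  # ['order-1', 'order-3']
```

Note that `order-2` never reaches `delivered`, and `order-3` becomes visible only after the status check resolves it.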

Transactional message interaction flow

For more information, see Transactional messages.

Prerequisites

Before you begin, make sure that you have:

Key concepts

| Term | Description |
| --- | --- |
| Half message | A message sent to the broker but marked as temporarily undeliverable. It becomes visible to consumers only after the producer commits the transaction. |
| Transaction status check | When the broker does not receive a commit or rollback confirmation, it proactively queries the producer for the transaction outcome. |
| TransCheckImmunityTime | The delay (in seconds) before the broker initiates the first status check after a half message is sent. Valid values: 10 to 300. |
| Receipt handle | A unique token returned each time a half message is published or a message is consumed. Used to commit, roll back, or acknowledge messages. |

Send transactional messages

Sending a transactional message involves three parts:

  1. Initialize the client and publish half messages -- The producer sends messages that the broker holds as half messages.

  2. Handle transaction status checks -- A background thread polls for unresolved half messages and commits or rolls back each one based on the local transaction result.

  3. Commit immediately after publishing -- Optionally, commit a half message right after publishing it, using the receipt handle from the publish response.

The following code demonstrates all three parts. Replace the placeholders with your actual values:

| Placeholder | Description | Example |
| --- | --- | --- |
| <your-http-endpoint> | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://xxx.mqrest.cn-hangzhou.aliyuncs.com |
| <your-topic> | Topic created in the ApsaraMQ for RocketMQ console | TransTopic |
| <your-group-id> | Consumer group ID created in the ApsaraMQ for RocketMQ console | GID_Trans |
| <your-instance-id> | Instance ID. If the instance has a namespace, specify the ID. If not, use an empty string "". Check the Instance Details page for the namespace setting. | MQ_INST_xxx |

#!/usr/bin/env python
# coding=utf8
import sys
import os
import time
import threading

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *

# ---------------------------------------------------------------------------
# 1. Initialize the client
# ---------------------------------------------------------------------------
# Get the HTTP endpoint from the Instance Details page in the
# ApsaraMQ for RocketMQ console.
mq_client = MQClient(
    "<your-http-endpoint>",
    # Retrieve credentials from environment variables to avoid
    # hardcoding sensitive information.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)

topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"

# Number of transactional messages to send in this example.
msg_count = 4


# ---------------------------------------------------------------------------
# 2. Define error handling for commit/rollback failures
# ---------------------------------------------------------------------------
def process_trans_error(exp):
    """Handle commit or rollback failures.

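    A failure occurs when the commit or rollback is sent after the
    receipt handle has expired: later than TransCheckImmunityTime for a
    handle returned by publish_message, or later than 10 seconds (in
    this example) for a handle returned by consume_half_message.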
    """
    print("\nCommit/Roll Transaction Message Fail! Exception:%s" % exp)
    if exp.sub_errors:
        for sub_error in exp.sub_errors:
            print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" %
                  (sub_error["ReceiptHandle"], sub_error["ErrorCode"],
                   sub_error["ErrorMessage"]))


# ---------------------------------------------------------------------------
# 3. Set up the transaction status checker (background thread)
# ---------------------------------------------------------------------------
# The broker queries this checker when it has not received a commit or
# rollback for a half message. Run it in a separate thread so it can
# process checks while the producer continues sending messages.
class TransactionCheckThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.count = 0
        # Create a separate client for the checker thread.
        self.mq_client = MQClient(
            "<your-http-endpoint>",
            os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        self.trans_producer = self.mq_client.get_trans_producer(
            instance_id, topic_name, group_id
        )

    def run(self):
        while True:
            if self.count == 3:
                break
            try:
                # Poll for half messages awaiting a transaction decision.
                # Arguments: batch size = 1, wait timeout = 3 seconds.
                half_msgs = self.trans_producer.consume_half_message(1, 3)
                for half_msg in half_msgs:
                    print("Receive Half Message, MessageId: %s\n"
                          "MessageBodyMD5: %s\n"
                          "MessageTag: %s\n"
                          "ConsumedTimes: %s\n"
                          "PublishTime: %s\n"
                          "Body: %s\n"
                          "NextConsumeTime: %s\n"
                          "ReceiptHandle: %s\n"
                          "Properties: %s" %
                          (half_msg.message_id,
                           half_msg.message_body_md5,
                           half_msg.message_tag,
                           half_msg.consumed_times,
                           half_msg.publish_time,
                           half_msg.message_body,
                           half_msg.next_consume_time,
                           half_msg.receipt_handle,
                           half_msg.properties))

                    # Decide whether to commit or roll back based on local
                    # transaction results. This runs once per half message,
                    # inside the for loop. In this example, the decision
                    # depends on the custom property "a":
                    #   a == 1 -> commit immediately
                    #   a == 2 -> commit on the second check (simulates
                    #             delayed local transaction completion)
                    #   a == 3 -> rollback (local transaction failed)
                    #   other  -> unknown (check again later)
                    a = int(half_msg.get_property("a"))
                    try:
                        if a == 1:
                            self.trans_producer.commit(
                                half_msg.receipt_handle)
                            self.count += 1
                            print("------>commit")
                        elif a == 2 and half_msg.consumed_times > 1:
                            self.trans_producer.commit(
                                half_msg.receipt_handle)
                            self.count += 1
                            print("------>commit")
                        elif a == 3:
                            self.trans_producer.rollback(
                                half_msg.receipt_handle)
                            self.count += 1
                            print("------>rollback")
                        else:
                            print("------>unknown")
                    except MQExceptionBase as rec_commit_roll_e:
                        process_trans_error(rec_commit_roll_e)
            except MQExceptionBase as half_e:
                if half_e.type == "MessageNotExist":
                    print("No half message! RequestId: %s" % half_e.req_id)
                    continue
                print("Consume half message Fail! Exception:%s\n" % half_e)
                break


# ---------------------------------------------------------------------------
# 4. Start the checker thread and send transactional messages
# ---------------------------------------------------------------------------
check_thread = TransactionCheckThread()
# Thread.setDaemon() is deprecated since Python 3.10; set the attribute.
check_thread.daemon = True
check_thread.start()

try:
    trans_producer = mq_client.get_trans_producer(
        instance_id, topic_name, group_id
    )
    for i in range(msg_count):
        msg = TopicMessage(
            "I am test message %s." % i,   # Message body
            "tagA"                          # Message tag
        )
        # Set a custom property used by the checker to simulate
        # different transaction outcomes.
        msg.put_property("a", i)
        msg.set_message_key("MessageKey")

        # TransCheckImmunityTime: delay (in seconds) before the first
        # transaction status check. Valid values: 10 to 300.
        # After the first check, the broker retries every 10 seconds
        # for up to 24 hours.
        msg.set_trans_check_immunity_time(10)

        re_msg = trans_producer.publish_message(msg)
        print("Publish Transaction Message Succeed. MessageID:%s, "
              "BodyMD5:%s, Handle:%s" %
              (re_msg.message_id, re_msg.message_body_md5,
               re_msg.receipt_handle))
        time.sleep(1)

        # For the first message (i == 0), commit immediately using
        # the receipt handle returned by publish_message.
        if i == 0:
            try:
                trans_producer.commit(re_msg.receipt_handle)
            except MQExceptionBase as pub_commit_roll_e:
                process_trans_error(pub_commit_roll_e)

except MQExceptionBase as pub_e:
    if pub_e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % pub_e)

# Wait for the checker thread to finish processing.
while True:
    if not check_thread.is_alive():
        break
    time.sleep(1)

What the example does

The producer sends four transactional messages with the custom property a set to 0, 1, 2, and 3. Each value triggers a different transaction outcome:

| Property a | Checker behavior | Outcome |
| --- | --- | --- |
| 0 | The producer commits immediately after publishing (bypasses the checker). | Message delivered to consumers. |
| 1 | The checker commits on the first status check. | Message delivered to consumers. |
| 2 | The checker returns unknown on the first check and commits on the second. | Message delivered after a delay. |
| 3 | The checker rolls back. | Message discarded, never delivered. |
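
The checker's branching can be isolated as a pure function, which makes the outcomes above easy to verify without a broker. This is a minimal sketch: `decide` is a hypothetical helper written for illustration, not an SDK function, and it mirrors only the conditions used in the sample's checker thread.

```python
def decide(a, consumed_times):
    """Return the checker's response for custom property `a`.

    Hypothetical helper that mirrors the sample's checker branching.
    `consumed_times` is how often the half message has been handed to
    the checker, including the current check.
    """
    if a == 1:
        return "commit"
    if a == 2 and consumed_times > 1:
        return "commit"
    if a == 3:
        return "rollback"
    return "unknown"


for a, times in [(1, 1), (2, 1), (2, 2), (3, 1)]:
    print(a, times, decide(a, times))
```

In a real application, the condition would come from the local transaction state (for example, a lookup in the business database) rather than from a message property.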

Subscribe to transactional messages

Committed transactional messages are consumed the same way as normal messages. The consumer receives only messages whose transactions have been committed.

The following code uses long polling to fetch messages in batches and acknowledge them.

import os
import time

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Initialize the consumer client.
mq_client = MQClient(
    "<your-http-endpoint>",
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)

topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# Long polling timeout in seconds. Maximum: 30.
wait_seconds = 3
# Maximum messages per batch. Maximum: 16.
batch = 3

print("Consume Message From Topic: %s, Consumer: %s, WaitSeconds: %s\n" %
      (topic_name, group_id, wait_seconds))

while True:
    try:
        # Long polling: if no message is available, the request is held
        # on the broker for up to wait_seconds before returning.
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\n"
                  "MessageBodyMD5: %s\n"
                  "MessageTag: %s\n"
                  "ConsumedTimes: %s\n"
                  "PublishTime: %s\n"
                  "Body: %s\n"
                  "NextConsumeTime: %s\n"
                  "ReceiptHandle: %s\n"
                  "Properties: %s\n" %
                  (msg.message_id, msg.message_body_md5,
                   msg.message_tag, msg.consumed_times,
                   msg.publish_time, msg.message_body,
                   msg.next_consume_time, msg.receipt_handle,
                   msg.properties))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue
        print("Consume Message Fail! Exception:%s\n" % e)
        time.sleep(2)
        continue

    # Acknowledge consumed messages. If the broker does not receive an
    # ACK before msg.next_consume_time, it redelivers the message.
    # Each delivery assigns a new receipt handle.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" %
                      (sub_error["ReceiptHandle"],
                       sub_error["ErrorCode"],
                       sub_error["ErrorMessage"]))
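
The sample above acknowledges every message it receives. If processing can fail, one possible variation is to acknowledge only the messages that were handled successfully and let the broker redeliver the rest. The sketch below assumes that approach; `handles_to_ack` and `handler` are hypothetical names, and `SimpleNamespace` objects stand in for SDK messages so the snippet runs without a broker.

```python
from types import SimpleNamespace


def handles_to_ack(messages, handler):
    """Run `handler` on each message; return handles of the successes."""
    acked = []
    for msg in messages:
        try:
            handler(msg)
        except Exception as exc:
            # Leave the message unacknowledged so that it is redelivered.
            print("Processing failed for %s: %s" % (msg.message_id, exc))
        else:
            acked.append(msg.receipt_handle)
    return acked


def handler(msg):
    # Stand-in for business logic; fails on one specific body.
    if msg.message_body == "bad":
        raise ValueError("cannot process")


batch = [
    SimpleNamespace(message_id="m1", message_body="ok", receipt_handle="h1"),
    SimpleNamespace(message_id="m2", message_body="bad", receipt_handle="h2"),
]
to_ack = handles_to_ack(batch, handler)
print(to_ack)  # ['h1']
```

In the consumer loop, the returned list would be passed to `consumer.ack_message(...)` in place of the full list of receipt handles. Because unacknowledged messages are redelivered, the handler should tolerate seeing the same message more than once.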

Usage notes

  • TransCheckImmunityTime range: Valid values are 10 to 300 seconds.

  • Status check retry: After the first check, the broker queries the producer every 10 seconds for up to 24 hours.

  • Separate client for the checker thread: The transaction status checker must use its own MQClient instance. Sharing a client between the producer thread and the checker thread causes concurrency issues.

  • ACK timeout: If the consumer does not acknowledge a message before the next_consume_time deadline, the broker redelivers it.
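
As a rough worked example of the timing in the first two notes, the following sketch computes when status checks would occur for a given TransCheckImmunityTime. It assumes that offsets are measured from the moment the half message is sent, that checks fire at exact 10-second intervals, and that the 24-hour window starts at the first check; actual broker timing may differ. `check_offsets` and `MAX_CHECKS` are hypothetical names.

```python
CHECK_INTERVAL = 10           # seconds between status checks
CHECK_WINDOW = 24 * 60 * 60   # retry window in seconds (24 hours)

# Upper bound on the number of checks under the assumptions above.
MAX_CHECKS = CHECK_WINDOW // CHECK_INTERVAL


def check_offsets(immunity_time, count):
    """Return offsets (seconds after sending) of the first `count` checks."""
    if not 10 <= immunity_time <= 300:
        raise ValueError("TransCheckImmunityTime must be 10 to 300 seconds")
    return [immunity_time + CHECK_INTERVAL * n for n in range(count)]


print(check_offsets(10, 4))   # [10, 20, 30, 40]
print(check_offsets(300, 3))  # [300, 310, 320]
print(MAX_CHECKS)             # 8640
```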
