ApsaraMQ for RocketMQ uses transactional messages to ensure transactional consistency across distributed services. A transactional message ties message delivery to the outcome of a local business operation: consumers receive the message only after the local transaction commits.
The following Python sample code uses the HTTP client SDK to demonstrate three operations: sending transactional messages, handling transaction status checks, and consuming committed messages.
How it works
Transactional messages follow a two-phase commit protocol similar to X/Open XA.
The producer, broker, and consumer interact as follows:
The producer sends a half message to the broker. A half message is persisted on the broker but marked as temporarily undeliverable to consumers.
The broker acknowledges the half message and returns a receipt handle.
The producer runs the local transaction (for example, a database write).
Based on the local transaction result, the producer sends a second-phase confirmation to either commit the message (deliver it to consumers) or roll it back (discard it).
If the broker does not receive the second-phase confirmation -- due to a network failure or producer restart -- it initiates a transaction status check by querying the producer.
The producer re-evaluates the local transaction state and responds with commit, rollback, or unknown.
The broker repeats the check every 10 seconds for up to 24 hours.
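The steps above can be sketched as a small in-memory model. This is an illustrative sketch only, not the SDK: `TxState`, `ToyBroker`, and `run_transaction` are hypothetical names, and the status check is reduced to a `pending()` query that a real broker would drive on its own schedule.

```python
from enum import Enum


class TxState(Enum):
    HALF = "half"              # persisted, hidden from consumers
    COMMITTED = "commit"       # visible to consumers
    ROLLED_BACK = "rollback"   # discarded


class ToyBroker:
    """Hypothetical in-memory stand-in for the broker; not an SDK class."""

    def __init__(self):
        self._messages = {}    # receipt handle -> [body, state]
        self._next_handle = 0

    def send_half(self, body):
        """Phase 1: store the message as a half message and return a handle."""
        self._next_handle += 1
        handle = "handle-%d" % self._next_handle
        self._messages[handle] = [body, TxState.HALF]
        return handle

    def resolve(self, handle, decision):
        """Phase 2: commit or roll back a message that is still a half message."""
        entry = self._messages[handle]
        if entry[1] is TxState.HALF and decision is not TxState.HALF:
            entry[1] = decision

    def pending(self):
        """Handles the broker would ask about in a transaction status check."""
        return [h for h, (_, state) in self._messages.items()
                if state is TxState.HALF]

    def deliverable(self):
        """Bodies that consumers are allowed to receive."""
        return [body for body, state in self._messages.values()
                if state is TxState.COMMITTED]


def run_transaction(broker, body, local_tx):
    """Send a half message, run the local transaction, then confirm."""
    handle = broker.send_half(body)
    try:
        succeeded = local_tx()
    except Exception:
        succeeded = False
    decision = TxState.COMMITTED if succeeded else TxState.ROLLED_BACK
    broker.resolve(handle, decision)
    return handle
```

A half message that is never resolved, for example because the producer restarted before the second phase, stays in `pending()` until a later status check commits it or rolls it back.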

For more information, see Transactional messages.
Prerequisites
Before you begin, make sure that you have:
The Python HTTP client SDK installed -- see Prepare the environment
An ApsaraMQ for RocketMQ instance, a topic, and a consumer group -- see Create resources
An AccessKey pair for your Alibaba Cloud account -- see Create an AccessKey pair
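The sample code in this topic reads the AccessKey pair from the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET. As a minimal sketch, a check like the following could run before the client is created so that a missing variable produces a clear message instead of a `KeyError`. The helper name `missing_credentials` is hypothetical and not part of the SDK.

```python
import os

# Variable names taken from the sample code in this topic.
REQUIRED_VARS = (
    "ALIBABA_CLOUD_ACCESS_KEY_ID",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET",
)


def missing_credentials(environ=None):
    """Return the names of required variables that are unset or empty.

    Hypothetical helper: pass a dict to test it, or omit the argument
    to inspect the real process environment.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_credentials()
    if missing:
        print("Set these environment variables first: %s" % ", ".join(missing))
```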
Key concepts
| Term | Description |
|---|---|
| Half message | A message sent to the broker but marked as temporarily undeliverable. It becomes visible to consumers only after the producer commits the transaction. |
| Transaction status check | When the broker does not receive a commit or rollback confirmation, it proactively queries the producer for the transaction outcome. |
| TransCheckImmunityTime | The delay (in seconds) before the broker initiates the first status check after a half message is sent. Valid values: 10 to 300. |
| Receipt handle | A unique token returned when a half message is published and assigned anew each time a message is consumed. Used to commit, roll back, or acknowledge messages. |
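To make the timing concrete, the following sketch computes when status checks would occur for a given TransCheckImmunityTime, using only the values stated in this topic (range 10 to 300 seconds, a 10-second retry interval, a 24-hour retry window). The function name `check_offsets` is hypothetical, and measuring the 24-hour window from the first check is an assumption.

```python
CHECK_INTERVAL_SECONDS = 10            # retry interval stated in this topic
MAX_CHECK_WINDOW_SECONDS = 24 * 3600   # retry window stated in this topic


def check_offsets(immunity_seconds, count):
    """Return the first `count` status-check times, in seconds after the send."""
    if not 10 <= immunity_seconds <= 300:
        raise ValueError(
            "TransCheckImmunityTime must be between 10 and 300 seconds")
    offsets = []
    offset = immunity_seconds
    # Assumption: the 24-hour window is measured from the first check.
    last_allowed = immunity_seconds + MAX_CHECK_WINDOW_SECONDS
    while len(offsets) < count and offset <= last_allowed:
        offsets.append(offset)
        offset += CHECK_INTERVAL_SECONDS
    return offsets
```

For example, `check_offsets(10, 3)` returns `[10, 20, 30]`: the first check arrives 10 seconds after the half message is sent, and later checks follow at 10-second intervals.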
Send transactional messages
Sending a transactional message involves three parts:
Initialize the client and publish half messages -- The producer sends messages that the broker holds as half messages.
Handle transaction status checks -- A background thread polls for unresolved half messages and commits or rolls back each one based on the local transaction result.
Commit immediately after publishing -- Optionally, commit a half message right after publishing it, using the receipt handle from the publish response.
The following code demonstrates all three parts. Replace the placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-http-endpoint> | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://xxx.mqrest.cn-hangzhou.aliyuncs.com |
| <your-topic> | Topic created in the ApsaraMQ for RocketMQ console | TransTopic |
| <your-group-id> | Consumer group ID created in the ApsaraMQ for RocketMQ console | GID_Trans |
| <your-instance-id> | Instance ID. If the instance has a namespace, specify the ID. If not, use an empty string "". Check the Instance Details page for the namespace setting. | MQ_INST_xxx |
#!/usr/bin/env python
# coding=utf8
import sys
import os
import time
import threading
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
# ---------------------------------------------------------------------------
# 1. Initialize the client
# ---------------------------------------------------------------------------
# Get the HTTP endpoint from the Instance Details page in the
# ApsaraMQ for RocketMQ console.
mq_client = MQClient(
    "<your-http-endpoint>",
    # Retrieve credentials from environment variables to avoid
    # hardcoding sensitive information.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"
# Number of transactional messages to send in this example.
msg_count = 4
# ---------------------------------------------------------------------------
# 2. Define error handling for commit/rollback failures
# ---------------------------------------------------------------------------
def process_trans_error(exp):
    """Handle commit or rollback failures.
    A failure occurs when the operation takes longer than the
    TransCheckImmunityTime value or the consume_half_message timeout
    (10 seconds in this example).
    """
    print("\nCommit/Roll Transaction Message Fail! Exception:%s" % exp)
    if exp.sub_errors:
        for sub_error in exp.sub_errors:
            print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" %
                  (sub_error["ReceiptHandle"], sub_error["ErrorCode"],
                   sub_error["ErrorMessage"]))
# ---------------------------------------------------------------------------
# 3. Set up the transaction status checker (background thread)
# ---------------------------------------------------------------------------
# The broker queries this checker when it has not received a commit or
# rollback for a half message. Run it in a separate thread so it can
# process checks while the producer continues sending messages.
class TransactionCheckThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.count = 0
        # Create a separate client for the checker thread.
        self.mq_client = MQClient(
            "<your-http-endpoint>",
            os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        self.trans_producer = self.mq_client.get_trans_producer(
            instance_id, topic_name, group_id
        )

    def run(self):
        while True:
            if self.count == 3:
                break
            try:
                # Poll for half messages awaiting a transaction decision.
                # Arguments: batch size = 1, wait timeout = 3 seconds.
                half_msgs = self.trans_producer.consume_half_message(1, 3)
                for half_msg in half_msgs:
                    print("Receive Half Message, MessageId: %s\n"
                          "MessageBodyMD5: %s\n"
                          "MessageTag: %s\n"
                          "ConsumedTimes: %s\n"
                          "PublishTime: %s\n"
                          "Body: %s\n"
                          "NextConsumeTime: %s\n"
                          "ReceiptHandle: %s\n"
                          "Properties: %s" %
                          (half_msg.message_id,
                           half_msg.message_body_md5,
                           half_msg.message_tag,
                           half_msg.consumed_times,
                           half_msg.publish_time,
                           half_msg.message_body,
                           half_msg.next_consume_time,
                           half_msg.receipt_handle,
                           half_msg.properties))
                    # Decide whether to commit or roll back based on local
                    # transaction results. In this example, the decision
                    # depends on the custom property "a":
                    #   a == 1 -> commit immediately
                    #   a == 2 -> commit on the second check (simulates
                    #             delayed local transaction completion)
                    #   a == 3 -> rollback (local transaction failed)
                    #   other  -> unknown (check again later)
                    a = int(half_msg.get_property("a"))
                    try:
                        if a == 1:
                            self.trans_producer.commit(half_msg.receipt_handle)
                            self.count += 1
                            print("------>commit")
                        elif a == 2 and half_msg.consumed_times > 1:
                            self.trans_producer.commit(half_msg.receipt_handle)
                            self.count += 1
                            print("------>commit")
                        elif a == 3:
                            self.trans_producer.rollback(half_msg.receipt_handle)
                            self.count += 1
                            print("------>rollback")
                        else:
                            print("------>unknown")
                    except MQExceptionBase as rec_commit_roll_e:
                        process_trans_error(rec_commit_roll_e)
            except MQExceptionBase as half_e:
                if half_e.type == "MessageNotExist":
                    print("No half message! RequestId: %s" % half_e.req_id)
                    continue
                print("Consume half message Fail! Exception:%s\n" % half_e)
                break
# ---------------------------------------------------------------------------
# 4. Start the checker thread and send transactional messages
# ---------------------------------------------------------------------------
check_thread = TransactionCheckThread()
# Mark the checker as a daemon thread so it does not block interpreter exit.
check_thread.daemon = True
check_thread.start()
try:
    trans_producer = mq_client.get_trans_producer(
        instance_id, topic_name, group_id
    )
    for i in range(msg_count):
        msg = TopicMessage(
            "I am test message %s." % i,  # Message body
            "tagA"                        # Message tag
        )
        # Set a custom property used by the checker to simulate
        # different transaction outcomes.
        msg.put_property("a", i)
        msg.set_message_key("MessageKey")
        # TransCheckImmunityTime: delay (in seconds) before the first
        # transaction status check. Valid values: 10 to 300.
        # After the first check, the broker retries every 10 seconds
        # for up to 24 hours.
        msg.set_trans_check_immunity_time(10)
        re_msg = trans_producer.publish_message(msg)
        print("Publish Transaction Message Succeed. MessageID:%s, "
              "BodyMD5:%s, Handle:%s" %
              (re_msg.message_id, re_msg.message_body_md5,
               re_msg.receipt_handle))
        time.sleep(1)
        # For the first message (i == 0), commit immediately using
        # the receipt handle returned by publish_message.
        if i == 0:
            try:
                trans_producer.commit(re_msg.receipt_handle)
            except MQExceptionBase as pub_commit_roll_e:
                process_trans_error(pub_commit_roll_e)
except MQExceptionBase as pub_e:
    if pub_e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % pub_e)
# Wait for the checker thread to finish processing.
while True:
    if not check_thread.is_alive():
        break
    time.sleep(1)
What the example does
The producer sends four transactional messages with the custom property a set to 0, 1, 2, and 3. Each value triggers a different transaction outcome:
| Property a | Checker behavior | Outcome |
|---|---|---|
| 0 | The producer commits immediately after publishing (bypasses the checker). | Message delivered to consumers. |
| 1 | The checker commits on the first status check. | Message delivered to consumers. |
| 2 | The checker returns unknown on the first check and commits on the second. | Message delivered after a delay. |
| 3 | The checker rolls back. | Message discarded, never delivered. |
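The checker's decision logic in the example can be isolated as a pure function, which makes the mapping in the table easy to verify without a broker. This is a sketch that mirrors the branching in the sample; the function name `decide` and its string return values are hypothetical and not part of the SDK.

```python
def decide(a, consumed_times):
    """Map the custom property `a` and the delivery count to a decision.

    Mirrors the branches of the checker thread in the sample code:
    the result is "commit", "rollback", or "unknown".
    """
    if a == 1:
        return "commit"
    if a == 2 and consumed_times > 1:
        # Simulates a local transaction that finishes before the second check.
        return "commit"
    if a == 3:
        return "rollback"
    # Covers a == 2 on the first check and any other value.
    return "unknown"
```

A message with a set to 0 never reaches this logic in the example, because the producer commits it right after publishing.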
Subscribe to transactional messages
Committed transactional messages are consumed the same way as normal messages. The consumer receives only messages whose transactions have been committed.
The following code uses long polling to fetch messages in batches and acknowledge them.
import os
import time
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Initialize the consumer client.
mq_client = MQClient(
    "<your-http-endpoint>",
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# Long polling timeout in seconds. Maximum: 30.
wait_seconds = 3
# Maximum messages per batch. Maximum: 16.
batch = 3
print("Consume Message From Topic: %s, Consumer: %s, WaitSeconds: %s\n" %
      (topic_name, group_id, wait_seconds))
while True:
    try:
        # Long polling: if no message is available, the request is held
        # on the broker for up to wait_seconds before returning.
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\n"
                  "MessageBodyMD5: %s\n"
                  "MessageTag: %s\n"
                  "ConsumedTimes: %s\n"
                  "PublishTime: %s\n"
                  "Body: %s\n"
                  "NextConsumeTime: %s\n"
                  "ReceiptHandle: %s\n"
                  "Properties: %s\n" %
                  (msg.message_id, msg.message_body_md5,
                   msg.message_tag, msg.consumed_times,
                   msg.publish_time, msg.message_body,
                   msg.next_consume_time, msg.receipt_handle,
                   msg.properties))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue
        print("Consume Message Fail! Exception:%s\n" % e)
        time.sleep(2)
        continue
    # Acknowledge consumed messages. If the broker does not receive an
    # ACK before msg.next_consume_time, it redelivers the message.
    # Each delivery assigns a new receipt handle.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" %
                      (sub_error["ReceiptHandle"],
                       sub_error["ErrorCode"],
                       sub_error["ErrorMessage"]))
Usage notes
TransCheckImmunityTime range: Valid values are 10 to 300 seconds.
Status check retry: After the first check, the broker queries the producer every 10 seconds for up to 24 hours.
Separate client for the checker thread: The transaction status checker must use its own MQClient instance. Sharing a client between the producer thread and the checker thread causes concurrency issues.
ACK timeout: If the consumer does not acknowledge a message before the next_consume_time deadline, the broker redelivers it.
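Because a message that is not acknowledged in time is redelivered, a consumer may see the same message more than once. One common way to handle this is to make processing idempotent. The following is a minimal sketch under two assumptions: the message ID stays the same across redeliveries (only the receipt handle changes), and an in-memory set is enough for illustration. `IdempotentHandler` is a hypothetical helper, not part of the SDK.

```python
class IdempotentHandler:
    """Hypothetical helper that runs a callback at most once per message ID."""

    def __init__(self, callback):
        self._callback = callback
        # In-memory only; a real service would persist processed IDs.
        self._seen = set()

    def handle(self, message_id, body):
        """Return True if the callback ran, False for a duplicate delivery."""
        if message_id in self._seen:
            return False
        self._callback(body)
        # Record the ID only after the callback succeeds, so a failed
        # attempt is processed again when the message is redelivered.
        self._seen.add(message_id)
        return True
```

In the consumer loop, a call such as `handler.handle(msg.message_id, msg.message_body)` could replace the direct processing step, while the acknowledgment logic stays unchanged.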
What's next
Transactional messages -- Understand the transactional message model and lifecycle.
Prepare the environment -- Install and configure the Python HTTP client SDK.
Create resources -- Set up instances, topics, and consumer groups.