ApsaraMQ for RocketMQ provides distributed transaction processing, similar to eXtended Architecture (X/Open XA), to ensure transaction consistency. This topic provides C++ sample code for sending and subscribing to transactional messages with the HTTP client SDK.
Background information
The following figure shows the interaction process of transactional messages.

For more information, see Transactional messages.
Prerequisites
Before you start, make sure that the following operations are performed:
Install the SDK for C++. For more information, see Prepare the environment.
Create the resources that you want to specify in the code in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.
Obtain the AccessKey pair of your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Placeholder reference
Replace the following placeholders in the sample code with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint from the HTTP Endpoint section on the Instance Details page in the ApsaraMQ for RocketMQ console | http://1234567890123456.mqrest.cn-hangzhou.aliyuncs.com |
| ${TOPIC} | Topic name | TransactionTopic |
| ${INSTANCE_ID} | Instance ID. If the instance does not have a namespace, set this to an empty string. Check the Instance Details page for namespace information. | MQ_INST_1234567890_BXaaaa |
| ${GROUP_ID} | Consumer group ID | GID_transaction_test |
Send transactional messages
Both the message publishing and the half-message transaction check run in the same process. The producer spawns a background thread to consume half messages and commit or roll them back based on local transaction logic.
#include <cstdint>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#ifdef _WIN32
#include <windows.h>
#include <process.h>
#else
#include "pthread.h"
#endif
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
// Number of transactional messages that main() publishes.
const int32_t pubMsgCount = 4;
// The status-check thread exits once it has issued this many commit/rollback
// requests for half messages whose property "a" is "1", "2", or "3".
const int32_t halfCheckCount = 3;
// Prints the outcome of a commit or rollback request.
//
// bdmResp:   response filled in by the commit/rollback call.
// messageId: ID of the message the request was issued for; printed on success.
//
// On failure, one line is printed per failed item with its error code and
// receipt handle.
void processCommitRollError(AckMessageResponse& bdmResp, const std::string& messageId) {
    if (!bdmResp.isSuccess()) {
        const std::vector<AckMessageFailedItem>& failures =
            bdmResp.getAckMessageFailedItem();
        for (const AckMessageFailedItem& failure : failures) {
            cout << "Commit/Roll Transaction ERROR: " << failure.errorCode
                 << " " << failure.receiptHandle << endl;
        }
        return;
    }
    cout << "Commit/Roll Transaction Suc: " << messageId << endl;
}
#ifdef WIN32
unsigned __stdcall consumeHalfMessageThread(void *arg)
#else
void* consumeHalfMessageThread(void *arg)
#endif
{
MQTransProducerPtr transProducer = *(MQTransProducerPtr*)(arg);
int count = 0;
do {
std::vector<Message> halfMsgs;
try {
// Long polling: if no half message is available, the request hangs
// on the broker for the specified duration before returning.
transProducer->consumeHalfMessage(
1, // Max messages per request (max: 16)
3, // Long polling timeout in seconds (max: 30)
halfMsgs
);
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No half message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
}
if (halfMsgs.size() == 0) {
continue;
}
cout << "Consume Half: " << halfMsgs.size() << " Messages!" << endl;
// Process each half message and decide: commit, rollback, or defer.
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = halfMsgs.begin();
iter != halfMsgs.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
int32_t consumedTimes = iter->getConsumedTimes();
const std::string propA = iter->getProperty("a");
const std::string handle = iter->getReceiptHandle();
AckMessageResponse bdmResp;
if (propA == "1") {
cout << "Commit msg.." << endl;
transProducer->commit(handle, bdmResp);
count++;
} else if(propA == "2") {
if (consumedTimes > 1) {
cout << "Commit msg.." << endl;
transProducer->commit(handle, bdmResp);
count++;
} else {
cout << "Commit Later!!!" << endl;
}
} else if(propA == "3") {
cout << "Rollback msg.." << endl;
transProducer->rollback(handle, bdmResp);
count++;
} else {
transProducer->commit(handle, bdmResp);
cout << "Unkown msg.." << endl;
}
// Commit or rollback must complete before NextConsumeTime.
// Otherwise, the operation fails.
processCommitRollError(bdmResp, iter->getMessageId());
}
} while(count < halfCheckCount);
#ifdef WIN32
return 0;
#else
return NULL;
#endif
}
// Returns the value of the environment variable `name`, or an empty string
// when it is not set. std::getenv returns a null pointer for an unset
// variable, and building a std::string from a null pointer is undefined
// behavior, so the null case is handled explicitly.
static std::string readEnv(const char* name) {
    const char* value = std::getenv(name);
    return value ? std::string(value) : std::string();
}

// Publishes pubMsgCount transactional messages and runs the half-message
// status check on a background thread in the same process.
// Returns 0 on normal completion, 1 if credentials are missing or the
// background thread cannot be started.
int main() {
    // Obtain AccessKey credentials from environment variables.
    // ALIBABA_CLOUD_ACCESS_KEY_ID: AccessKey ID for authentication
    // ALIBABA_CLOUD_ACCESS_KEY_SECRET: AccessKey secret for authentication
    const std::string accessKeyId = readEnv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    const std::string accessKeySecret = readEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    if (accessKeyId.empty() || accessKeySecret.empty()) {
        cout << "ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET must be set." << endl;
        return 1;
    }
    MQClient mqClient(
        // HTTP endpoint. Find this in the HTTP Endpoint section
        // of the Instance Details page in the ApsaraMQ for RocketMQ console.
        "${HTTP_ENDPOINT}",
        accessKeyId,
        accessKeySecret
    );
    // Topic for publishing messages.
    // Create this topic in the ApsaraMQ for RocketMQ console.
    std::string topic = "${TOPIC}";
    // Instance ID. If the instance has no namespace, set this to an empty string.
    // Check the Instance Details page for namespace information.
    std::string instanceId = "${INSTANCE_ID}";
    // Consumer group ID. Create this in the ApsaraMQ for RocketMQ console.
    std::string groupId = "${GROUP_ID}";
    MQTransProducerPtr transProducer;
    if (instanceId == "") {
        transProducer = mqClient.getTransProducerRef(topic, groupId);
    } else {
        transProducer = mqClient.getTransProducerRef(instanceId, topic, groupId);
    }
    // Start a background thread to handle transaction status checks
    // (consume and resolve unacknowledged half messages). The thread receives
    // the address of transProducer, which outlives the thread because main()
    // joins it before returning.
#ifdef _WIN32
    unsigned int threadId;
    HANDLE thread = reinterpret_cast<HANDLE>(
        _beginthreadex(NULL, 0, consumeHalfMessageThread, &transProducer, 0, &threadId));
    if (thread == NULL) {
        cout << "Failed to start the half message check thread." << endl;
        return 1;
    }
#else
    pthread_t thread;
    if (pthread_create(&thread, NULL, consumeHalfMessageThread,
                       static_cast<void *>(&transProducer)) != 0) {
        cout << "Failed to start the half message check thread." << endl;
        return 1;
    }
#endif
    try {
        for (int i = 0; i < pubMsgCount; i++) {
            PublishMessageResponse pmResp;
            TopicMessage pubMsg("Hello, mq, trans_msg!");
            // Property "a" drives the decision made by the status-check thread.
            pubMsg.putProperty("a", std::to_string(i));
            pubMsg.setMessageKey("ImKey");
            pubMsg.setTransCheckImmunityTime(10);
            transProducer->publishMessage(pubMsg, pmResp);
            cout << "Publish mq message success. Topic:" << topic
                 << ", msgId:" << pmResp.getMessageId()
                 << ", bodyMD5:" << pmResp.getMessageBodyMD5()
                 << ", Handle:" << pmResp.getReceiptHandle() << endl;
            if (i == 0) {
                // Immediately commit the first message using its receipt handle.
                // Commit or rollback must complete before TransCheckImmunityTime
                // elapses; otherwise, the operation fails.
                AckMessageResponse bdmResp;
                transProducer->commit(pmResp.getReceiptHandle(), bdmResp);
                processCommitRollError(bdmResp, pmResp.getMessageId());
            }
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
    }
    // Wait for the status-check thread to finish before exiting.
#ifdef _WIN32
    WaitForSingleObject(thread, INFINITE);
    CloseHandle(thread);
#else
    pthread_join(thread, NULL);
#endif
    return 0;
}
Transaction check logic
The background thread resolves each half message based on the custom property a:
| Property a value | Action | Description |
|---|---|---|
| "1" | Commit immediately | Local transaction succeeded on first check. |
| "2" | Commit on retry | Defers on the first check; commits when consumedTimes > 1. |
| "3" | Rollback | Local transaction failed; discard the message. |
| Other | Commit (default) | Handles unexpected property values. |
Both commit and rollback must complete before the NextConsumeTime deadline. If the deadline passes, the operation fails and the broker re-delivers the half message for another check.
Subscribe to transactional messages
After a transactional message is committed, consumers receive it through standard message consumption. No special handling is required on the subscriber side.
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;

// Returns the value of the environment variable `name`, or an empty string
// when it is not set. std::getenv returns a null pointer for an unset
// variable, and building a std::string from a null pointer is undefined
// behavior, so the null case is handled explicitly.
static std::string readEnv(const char* name) {
    const char* value = std::getenv(name);
    return value ? std::string(value) : std::string();
}

// Sleeps for two seconds before the next request after a failed one.
static void pauseBeforeRetry() {
#ifdef _WIN32
    Sleep(2000);
#else
    usleep(2000 * 1000);
#endif
}

// Consumes messages from the topic in an endless long-polling loop and
// acknowledges every batch. Returns 1 only if credentials are missing.
int main() {
    // The AccessKey ID and AccessKey secret for authentication.
    // Read from environment variables to avoid hardcoding credentials.
    const std::string accessKeyId = readEnv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    const std::string accessKeySecret = readEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    if (accessKeyId.empty() || accessKeySecret.empty()) {
        cout << "ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET must be set." << endl;
        return 1;
    }
    MQClient mqClient(
        // The HTTP endpoint, found in the HTTP Endpoint section
        // of the Instance Details page in the ApsaraMQ for RocketMQ console.
        "${HTTP_ENDPOINT}",
        accessKeyId,
        accessKeySecret
    );
    // The topic to consume messages from. Create the topic in the ApsaraMQ for RocketMQ console.
    std::string topic = "${TOPIC}";
    // The consumer group ID. Create the consumer group in the ApsaraMQ for RocketMQ console.
    std::string groupId = "${GROUP_ID}";
    // The instance ID. If the instance has no namespace, set this to an empty string.
    // Check the namespace on the Instance Details page in the console.
    std::string instanceId = "${INSTANCE_ID}";
    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }
    do {
        try {
            std::vector<Message> messages;
            // Consume messages in long polling mode.
            // If no messages are available, the broker holds the request for the specified
            // polling duration and responds immediately when a message arrives.
            consumer->consumeMessage(
                3, // Batch size: max messages per request. Maximum allowed: 16.
                3, // Polling duration in seconds. Maximum allowed: 30.
                messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;
            if (messages.empty()) {
                // Nothing to acknowledge; skip the ACK request.
                continue;
            }
            // Process each message and collect receipt handles for acknowledgment.
            std::vector<std::string> receiptHandles;
            receiptHandles.reserve(messages.size());
            for (Message& msg : messages) {
                cout << "MessageId: " << msg.getMessageId()
                     << " PublishTime: " << msg.getPublishTime()
                     << " Tag: " << msg.getMessageTag()
                     << " Body: " << msg.getMessageBody()
                     << " FirstConsumeTime: " << msg.getFirstConsumeTime()
                     << " NextConsumeTime: " << msg.getNextConsumeTime()
                     << " ConsumedTimes: " << msg.getConsumedTimes()
                     << " Properties: " << msg.getPropertiesAsString()
                     << " Key: " << msg.getMessageKey() << endl;
                receiptHandles.push_back(msg.getReceiptHandle());
            }
            // Send an ACK to the broker. If the broker does not receive an ACK before
            // NextConsumeTime, it redelivers the message. Each receipt handle carries
            // a unique timestamp that expires after the NextConsumeTime window.
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                // Log failed ACK items. A handle that has expired causes ACK failure,
                // and the broker redelivers the corresponding message.
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (const AckMessageFailedItem& failed : failedItems) {
                    cout << "AckFailedItem: " << failed.errorCode
                         << " " << failed.receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
            pauseBeforeRetry();
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
            pauseBeforeRetry();
        }
    } while (true);
}
What's next
Transactional messages -- Learn about the transactional message lifecycle, states, and retry mechanisms.