ApsaraMQ for RocketMQ uses transactional messages to ensure eventual consistency across distributed transactions, similar to the eXtended Architecture (X/Open XA) protocol. The following Node.js sample code demonstrates how to send and receive transactional messages through the HTTP client SDK.
How transactional messages work
A transactional message goes through a two-phase commit process that ties a local transaction to message delivery. The following steps describe the complete interaction between the producer, the broker, and the consumer:
The producer sends a half message to the broker. The half message is stored but remains invisible to consumers.
The broker persists the half message and returns an acknowledgment (ACK) with a receipt handle.
The producer executes the local transaction (for example, updating a database).
Based on the local transaction result, the producer commits or rolls back the half message using the receipt handle.
If the producer commits the half message, the broker marks it as deliverable. Consumers can now receive it.
If the producer rolls back the half message, the broker discards it. Consumers never see it.
If the broker does not receive a commit or rollback decision (for example, because of a network failure or an application restart), it initiates a transaction check after the TransCheckImmunityTime interval elapses.
The producer receives the check request and re-evaluates the local transaction status.
The producer sends the commit or rollback decision to the broker. If the decision is still unknown, the broker retries every 10 seconds for up to 24 hours.

For more information, see Transactional messages.
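The lifecycle described above can be pictured as a small state machine. The following in-memory sketch is a toy model only: it is not the broker's implementation and not part of the SDK, and the class name HalfMessageStore and its methods are hypothetical names chosen for illustration.

```javascript
// Toy model of the half-message lifecycle. HalfMessageStore is a hypothetical
// name used for illustration; it is not part of the HTTP client SDK.
class HalfMessageStore {
  constructor() {
    this.messages = new Map(); // receiptHandle -> { body, state }
    this.nextId = 0;
  }

  // Phase 1: store the half message and return a receipt handle.
  publishHalf(body) {
    const receiptHandle = `handle-${this.nextId++}`;
    this.messages.set(receiptHandle, { body, state: 'half' });
    return receiptHandle;
  }

  // Phase 2, commit: the message becomes deliverable.
  commit(receiptHandle) {
    this.requireHalf(receiptHandle).state = 'committed';
  }

  // Phase 2, rollback: the message is discarded.
  rollback(receiptHandle) {
    this.requireHalf(receiptHandle);
    this.messages.delete(receiptHandle);
  }

  // Only unresolved half messages can be committed or rolled back.
  requireHalf(receiptHandle) {
    const message = this.messages.get(receiptHandle);
    if (!message || message.state !== 'half') {
      throw new Error(`No unresolved half message for ${receiptHandle}`);
    }
    return message;
  }

  // Consumers see committed messages only.
  deliverable() {
    return [...this.messages.values()]
      .filter((message) => message.state === 'committed')
      .map((message) => message.body);
  }

  // A transaction check targets messages that are still unresolved.
  pendingHandles() {
    return [...this.messages.entries()]
      .filter(([, message]) => message.state === 'half')
      .map(([receiptHandle]) => receiptHandle);
  }
}

const store = new HalfMessageStore();
const paid = store.publishHalf('order-1 paid');
const failed = store.publishHalf('order-2 paid');
store.publishHalf('order-3 paid'); // left unresolved on purpose
store.commit(paid);
store.rollback(failed);
console.log(store.deliverable());
console.log(store.pendingHandles());
```

In this model, the third message stays invisible to consumers until a later check resolves it, which is the situation the transaction check exists to handle.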
HTTP SDK considerations
The HTTP client SDK uses a pull-based model for transaction checks. Unlike native RocketMQ SDKs where the broker pushes check requests to the producer, the HTTP SDK requires the producer to poll for uncommitted half messages with consumeHalfMessage. Run this polling loop in a dedicated process or thread to ensure reliable transaction resolution.
Prerequisites
Install the Node.js SDK. For more information, see Prepare the environment.
Create the required resources in the ApsaraMQ for RocketMQ console: an instance, a topic, and a consumer group. For more information, see Create resources.
Get an AccessKey pair for your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Key parameters
Before running the sample code, replace the following placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint of your instance | Instance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console |
| ${TOPIC} | Topic name | Created in the ApsaraMQ for RocketMQ console |
| ${GROUP_ID} | Consumer group ID | Created in the ApsaraMQ for RocketMQ console |
| ${INSTANCE_ID} | Instance ID (set to null or "" if the instance has no namespace) | Instance Details page in the ApsaraMQ for RocketMQ console |
AccessKey credentials are read from environment variables:
ALIBABA_CLOUD_ACCESS_KEY_ID
ALIBABA_CLOUD_ACCESS_KEY_SECRET
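An unset environment variable reaches the client as undefined, so a startup check can surface the problem before the first request is sent. The helper below is a minimal sketch; loadCredentials is a hypothetical name and not an SDK function.

```javascript
// Hypothetical helper (not part of the SDK): read the AccessKey pair from an
// environment object and fail fast if either variable is missing or empty.
function loadCredentials(env = process.env) {
  const names = ['ALIBABA_CLOUD_ACCESS_KEY_ID', 'ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
  const missing = names.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  return {
    accessKeyId: env.ALIBABA_CLOUD_ACCESS_KEY_ID,
    accessKeySecret: env.ALIBABA_CLOUD_ACCESS_KEY_SECRET,
  };
}
```

The returned values could then be passed to the MQClient constructor in place of the direct process.env lookups.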
Send transactional messages
The producer code has two parts: sending transactional messages and polling for uncommitted half messages.
Transaction producer API
| Method | Purpose |
|---|---|
| client.getTransProducer(instanceId, topic, groupId) | Create a transaction producer |
| mqTransProducer.publishMessage(body, tag, msgProps) | Send a half message |
| mqTransProducer.commit(receiptHandle) | Commit a half message |
| mqTransProducer.rollback(receiptHandle) | Roll back a half message |
| mqTransProducer.consumeHalfMessage(numOfMessages, waitSeconds) | Poll for uncommitted half messages |
TransCheckImmunityTime
TransCheckImmunityTime controls how long the broker waits before the first transaction check after a half message is sent.
Unit: seconds
Valid values: 10 to 300
Behavior after the first check: If the half message is still uncommitted, the broker checks every 10 seconds for up to 24 hours.
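The timing rules above amount to simple arithmetic: the first check comes after TransCheckImmunityTime, and each later check follows 10 seconds after the previous one. The sketch below computes that nominal schedule. The function names are hypothetical, the integer requirement is an assumption, and the 24-hour cutoff is not modeled. Actual check times are decided by the broker and may differ.

```javascript
// Hypothetical helpers for illustration; the limits come from the text above.
const MIN_IMMUNITY_SECONDS = 10;
const MAX_IMMUNITY_SECONDS = 300;
const RECHECK_INTERVAL_SECONDS = 10;

// Reject values outside the documented 10-300 second range.
// Requiring an integer is an assumption, not a documented rule.
function validateImmunityTime(seconds) {
  if (!Number.isInteger(seconds) ||
      seconds < MIN_IMMUNITY_SECONDS ||
      seconds > MAX_IMMUNITY_SECONDS) {
    throw new RangeError(
      `TransCheckImmunityTime must be an integer from ${MIN_IMMUNITY_SECONDS} to ${MAX_IMMUNITY_SECONDS}`
    );
  }
  return seconds;
}

// Nominal offset, in seconds after publishing, of the nth check (n starts at 1).
function checkOffsetSeconds(immunitySeconds, n) {
  validateImmunityTime(immunitySeconds);
  if (!Number.isInteger(n) || n < 1) {
    throw new RangeError('n must be a positive integer');
  }
  return immunitySeconds + (n - 1) * RECHECK_INTERVAL_SECONDS;
}
```

For example, with TransCheckImmunityTime set to 30, the third check would nominally fall 50 seconds after the half message is sent.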
Sample code
The following example sends four transactional messages. The first message is committed immediately after publishing. The remaining three are resolved during the half-message polling loop based on their custom property a:
a=1: committed on the first check
a=2: committed after the second check (ConsumedTimes > 1)
a=3: rolled back
const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');
// HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey credentials from environment variables.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
const client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topic created in the ApsaraMQ for RocketMQ console.
const topic = "${TOPIC}";
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
const groupId = "${GROUP_ID}";
// Instance ID. Set to null or "" if the instance has no namespace.
const instanceId = "${INSTANCE_ID}";
const mqTransProducer = client.getTransProducer(instanceId, topic, groupId);
// Log the outcome of a commit or rollback request.
function processTransResult(res, msgId) {
  if (!res) {
    return;
  }
  if (res.code != 204) {
    // Commit or rollback failed. The receipt handle may have expired
    // if TransCheckImmunityTime or the consumeHalfMessage timeout elapsed.
    console.log("Commit/Rollback Message Fail:");
    res.body.forEach((error) => {
      console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
    });
  } else {
    console.log("Commit/Rollback Message suc!!! %s", msgId);
  }
}
let halfMessageCount = 0;
let halfMessageConsumeCount = 0;
(async function() {
  try {
    // Send four transactional messages.
    for (let i = 0; i < 4; i++) {
      const msgProps = new MessageProperties();
      // Custom property used by the half-message polling loop
      // to decide whether to commit or roll back.
      msgProps.putProperty("a", i);
      msgProps.messageKey("MessageKey");
      // First transaction check delay: 10 seconds. Valid values: 10-300.
      // After the first check, the broker rechecks every 10 seconds for up to 24 hours.
      msgProps.transCheckImmunityTime(10);
      const res = await mqTransProducer.publishMessage("hello mq.", "tagA", msgProps);
      console.log("Publish message: MessageID:%s,BodyMD5:%s,Handle:%s", res.body.MessageId, res.body.MessageBodyMD5, res.body.ReceiptHandle);
      if (i === 0) {
        // Commit the first message immediately using its receipt handle.
        const msgId = res.body.MessageId;
        const commitRes = await mqTransProducer.commit(res.body.ReceiptHandle);
        console.log("Commit msg when publish, %s", msgId);
        processTransResult(commitRes, msgId);
      }
    }
  } catch (e) {
    // Add retry or persistence logic for failed messages.
    console.log(e);
  }
})();
// Run a separate loop to poll and resolve uncommitted half messages.
(async function() {
  // Poll for half messages, similar to consuming normal messages.
  while (halfMessageCount < 3 && halfMessageConsumeCount < 15) {
    try {
      halfMessageConsumeCount++;
      const res = await mqTransProducer.consumeHalfMessage(3, 3);
      if (res.code == 200) {
        console.log("Consume Messages, requestId:%s", res.requestId);
        // Use for...of instead of forEach so that each commit or rollback is
        // awaited before the loop condition is evaluated again.
        for (const message of res.body) {
          const props = message.Properties || {};
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
            ",Props:%j,MessageKey:%s,Prop-A:%s",
            message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
            message.MessageBody, props, message.MessageKey, props.a);
          const propA = props.a ? parseInt(props.a, 10) : 0;
          let opResp;
          if (propA == 1 || (propA == 2 && message.ConsumedTimes > 1)) {
            opResp = await mqTransProducer.commit(message.ReceiptHandle);
            console.log("Commit msg when check half, %s", message.MessageId);
            halfMessageCount++;
          } else if (propA == 3) {
            opResp = await mqTransProducer.rollback(message.ReceiptHandle);
            console.log("Rollback msg when check half, %s", message.MessageId);
            halfMessageCount++;
          }
          processTransResult(opResp, message.MessageId);
        }
      }
    } catch (e) {
      if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
        // No half messages available. Long polling continues.
        console.log("Consume Transaction Half msg: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();
How the sample code works
The code runs two concurrent async functions:
Send loop: Publishes four half messages with tag tagA and a custom property a set to the loop index (0-3). The first message (a=0) is committed immediately after publishing.
Half-message polling loop: Polls for uncommitted half messages and resolves them based on the a property, as listed in the following table. The loop exits after resolving 3 half messages or after 15 polling attempts.
| Property a value | Action | Condition |
|---|---|---|
| 0 | Already committed during send | N/A |
| 1 | Commit | First check |
| 2 | Commit | After retry (ConsumedTimes > 1) |
| 3 | Rollback | First check |
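The branching used by the polling loop can be isolated into a pure function, which makes the rule easy to test without a broker. This is a sketch of the sample's demo logic only; decideAction is a hypothetical name, and a real application would query the local transaction status instead of reading a message property.

```javascript
// Hypothetical helper mirroring the sample's branching; not an SDK function.
// Returns 'commit', 'rollback', or 'wait' (leave the half message unresolved).
function decideAction(properties, consumedTimes) {
  const a = properties && properties.a ? parseInt(properties.a, 10) : 0;
  if (a === 1 || (a === 2 && consumedTimes > 1)) {
    return 'commit';
  }
  if (a === 3) {
    return 'rollback';
  }
  // a=2 on its first check, or any other value: decide on a later check.
  return 'wait';
}

console.log(decideAction({ a: '1' }, 1));
console.log(decideAction({ a: '2' }, 1));
console.log(decideAction({ a: '2' }, 2));
console.log(decideAction({ a: '3' }, 1));
```

Returning 'wait' corresponds to taking no action on the message, so it is offered again by a later consumeHalfMessage call.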
Response codes
| Code | Meaning |
|---|---|
| 200 | Messages retrieved |
| 204 | Commit, rollback, or ACK succeeded |
| Other than 204 (for commit, rollback, or ACK) | Operation failed (the receipt handle may have expired) |
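When a commit, rollback, or ACK request fails, the sample code reads the failed receipt handles from the response body. The helper below sketches that extraction. failedHandles is a hypothetical name, and the body shape (an array of entries with ReceiptHandle, ErrorCode, and ErrorMessage) is assumed from how the sample code reads it.

```javascript
// Hypothetical helper: list the receipt handles that a commit, rollback, or
// ACK response reports as failed. Assumes the failure body is an array of
// { ReceiptHandle, ErrorCode, ErrorMessage } entries, as in the sample code.
function failedHandles(res) {
  if (!res || res.code === 204) {
    return []; // No response to inspect, or the operation succeeded.
  }
  if (!Array.isArray(res.body)) {
    return []; // Unexpected shape: nothing to extract.
  }
  return res.body.map((error) => error.ReceiptHandle);
}
```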
Receive transactional messages
Consumers receive committed transactional messages the same way as normal messages. Long polling avoids busy-waiting by suspending the request until a message arrives or the timeout elapses.
Consumer API
| Method | Purpose |
|---|---|
| client.getConsumer(instanceId, topic, groupId) | Create a consumer |
| consumer.consumeMessage(numOfMessages, waitSeconds) | Pull messages using long polling |
| consumer.ackMessage(handles) | Acknowledge consumed messages |
Parameters
| Parameter | Description | Maximum value |
|---|---|---|
| numOfMessages | Maximum messages to retrieve per request | 16 |
| waitSeconds | Long polling duration in seconds | 30 |
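If these values come from configuration, clamping them before the call keeps requests within the documented limits. The following is a minimal sketch; clampPollOptions is a hypothetical name, and the lower bound of 1 is an assumption because the table documents only the maximums.

```javascript
const MAX_MESSAGES = 16;     // Documented maximum for numOfMessages.
const MAX_WAIT_SECONDS = 30; // Documented maximum for waitSeconds.

// Hypothetical helper: clamp requested values into range. The lower bound of 1
// is an assumption; only the maximums are documented above.
function clampPollOptions(numOfMessages, waitSeconds) {
  const clamp = (value, max) => Math.min(Math.max(Math.trunc(value), 1), max);
  return {
    numOfMessages: clamp(numOfMessages, MAX_MESSAGES),
    waitSeconds: clamp(waitSeconds, MAX_WAIT_SECONDS),
  };
}
```

The result could be passed on as consumer.consumeMessage(options.numOfMessages, options.waitSeconds).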
Sample code
const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');
// HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey credentials from environment variables.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
const client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topic created in the ApsaraMQ for RocketMQ console.
const topic = "${TOPIC}";
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
const groupId = "${GROUP_ID}";
// Instance ID. Set to null or "" if the instance has no namespace.
const instanceId = "${INSTANCE_ID}";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function() {
  // Consume messages in a loop.
  while (true) {
    try {
      // Long polling: if no message is available, the request suspends on the broker
      // until a message arrives or the timeout elapses.
      const res = await consumer.consumeMessage(
        3, // Max messages per request (up to 16)
        3  // Long polling timeout in seconds (up to 30)
      );
      if (res.code == 200) {
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          const props = message.Properties || {};
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
            ",Props:%j,MessageKey:%s,Prop-A:%s",
            message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
            message.MessageBody, props, message.MessageKey, props.a);
          return message.ReceiptHandle;
        });
        // ACK all processed messages. If the broker does not receive an ACK
        // before NextConsumeTime, it redelivers the message with a new receipt handle.
        const ackRes = await consumer.ackMessage(handles);
        if (ackRes.code != 204) {
          // ACK failed. The receipt handle may have expired.
          console.log("Ack Message Fail:");
          const failHandles = ackRes.body.map((error) => {
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle) => {
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          console.log("Ack Message suc, RequestId:%s\n\t", ackRes.requestId, handles.join(','));
        }
      }
    } catch (e) {
      // Guard e.Code: errors such as network failures may not carry a Code field.
      if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
        // No messages available. Long polling continues.
        console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();
How the consumer works
The consumer runs an infinite loop that:
Pulls up to 3 messages per request with a 3-second long polling timeout.
Processes each message and collects the receipt handles.
Sends an ACK for all processed messages.
If the consumer does not ACK a message before NextConsumeTime, the broker redelivers it. Each redelivery generates a new receipt handle.
When no messages are available, the MessageNotExist error code is returned. This is normal in long polling mode -- the loop continues waiting.
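Because other failures, such as network errors, may not carry a Code field at all, it helps to classify errors defensively before treating one as the normal "no message" case. The helper below is a small sketch; isNoMessageError is a hypothetical name, and it assumes that the error exposes the code as a string property named Code, as the sample code does.

```javascript
// Hypothetical helper: true only when the error carries a string Code that
// contains "MessageNotExist". Errors without a Code are treated as real
// failures rather than as an empty queue.
function isNoMessageError(e) {
  return Boolean(e) && typeof e.Code === 'string' && e.Code.includes('MessageNotExist');
}

console.log(isNoMessageError({ Code: 'MessageNotExist' }));
console.log(isNoMessageError(new Error('socket hang up')));
```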
Usage notes
Commit or roll back promptly. Half messages that remain unresolved consume broker storage. Avoid returning an unknown status from your local transaction check unless the transaction is genuinely still in progress.
Handle receipt handle expiration. Receipt handles have a limited validity period. If a commit or rollback fails with a non-204 response, the handle may have expired. The broker re-triggers a transaction check automatically.
Run the half-message polling loop reliably. In production, run the consumeHalfMessage loop as a dedicated long-running process to ensure unresolved half messages are handled in a timely manner.
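One way to structure such a dedicated process is sketched below. It assumes a producer object that exposes consumeHalfMessage, commit, and rollback as in the sample code. The function name runHalfMessagePoller and the callbacks resolveStatus and shouldStop are hypothetical and would be supplied by the application, for example by querying the local database and by checking a shutdown flag.

```javascript
// Sketch of a long-running half-message poller. `producer` is assumed to expose
// consumeHalfMessage, commit, and rollback as in the sample code. The callbacks
// `resolveStatus` and `shouldStop` are hypothetical and supplied by the caller.
async function runHalfMessagePoller(producer, resolveStatus, shouldStop) {
  let resolved = 0;
  while (!shouldStop()) {
    let res;
    try {
      res = await producer.consumeHalfMessage(3, 3);
    } catch (e) {
      // "MessageNotExist" only means that no half message is pending.
      if (e && typeof e.Code === 'string' && e.Code.includes('MessageNotExist')) {
        continue;
      }
      throw e; // Any other error is left to the caller or a process supervisor.
    }
    if (res.code !== 200) {
      continue;
    }
    for (const message of res.body) {
      // resolveStatus returns 'commit' or 'rollback'; any other value means
      // the local transaction status is still unknown.
      const status = await resolveStatus(message);
      if (status === 'commit') {
        await producer.commit(message.ReceiptHandle);
        resolved++;
      } else if (status === 'rollback') {
        await producer.rollback(message.ReceiptHandle);
        resolved++;
      }
      // Unknown status: leave the half message for a later check.
    }
  }
  return resolved;
}
```

Passing the producer and callbacks in as arguments keeps the loop testable with a stub, and lets the process stop cleanly by making shouldStop return true on a shutdown signal.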
What's next
Learn how transactional messages work: Transactional messages
Set up your development environment: Prepare the environment
Create messaging resources: Create resources