All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive transactional messages

Last Updated:Mar 11, 2026

ApsaraMQ for RocketMQ uses transactional messages to ensure eventual consistency across distributed transactions, similar to the eXtended Architecture (X/Open XA) protocol. The following Node.js sample code demonstrates how to send and receive transactional messages through the HTTP client SDK.

How transactional messages work

A transactional message goes through a two-phase commit process that ties a local transaction to message delivery. The following steps describe the complete interaction between the producer, the broker, and the consumer:

  1. The producer sends a half message to the broker. The half message is stored but remains invisible to consumers.

  2. The broker persists the half message and returns an acknowledgment (ACK) with a receipt handle.

  3. The producer executes the local transaction (for example, updating a database).

  4. Based on the local transaction result, the producer commits or rolls back the half message using the receipt handle.

  5. If the producer commits the half message, the broker marks it as deliverable. Consumers can now receive it.

  6. If the producer rolls back the half message, the broker discards it. Consumers never see it.

  7. If the broker does not receive a commit or rollback decision (due to a network failure or application restart), it initiates a transaction check after the TransCheckImmunityTime interval elapses.

  8. The producer receives the check request and re-evaluates the local transaction status.

  9. The producer sends the commit or rollback decision to the broker. If the decision is still unknown, the broker retries every 10 seconds for up to 24 hours.

图片1.png

For more information, see Transactional messages.

HTTP SDK considerations

The HTTP client SDK uses a pull-based model for transaction checks. Unlike native RocketMQ SDKs where the broker pushes check requests to the producer, the HTTP SDK requires the producer to poll for uncommitted half messages with consumeHalfMessage. Run this polling loop in a dedicated process or thread to ensure reliable transaction resolution.

Prerequisites

  • Install the Node.js SDK. For more information, see Prepare the environment.

  • Create the required resources in the ApsaraMQ for RocketMQ console: an instance, a topic, and a consumer group. For more information, see Create resources.

  • Get an AccessKey pair for your Alibaba Cloud account. For more information, see Create an AccessKey pair.

Key parameters

Before running the sample code, replace the following placeholders with your actual values:

PlaceholderDescriptionWhere to find it
${HTTP_ENDPOINT}HTTP endpoint of your instanceInstance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console
${TOPIC}Topic nameCreated in the ApsaraMQ for RocketMQ console
${GROUP_ID}Consumer group IDCreated in the ApsaraMQ for RocketMQ console
${INSTANCE_ID}Instance ID (set to null or "" if the instance has no namespace)Instance Details page in the ApsaraMQ for RocketMQ console

AccessKey credentials are read from environment variables:

  • ALIBABA_CLOUD_ACCESS_KEY_ID

  • ALIBABA_CLOUD_ACCESS_KEY_SECRET

Send transactional messages

The producer code has two parts: sending transactional messages and polling for uncommitted half messages.

Transaction producer API

MethodPurpose
client.getTransProducer(instanceId, topic, groupId)Create a transaction producer
mqTransProducer.publishMessage(body, tag, msgProps)Send a half message
mqTransProducer.commit(receiptHandle)Commit a half message
mqTransProducer.rollback(receiptHandle)Roll back a half message
mqTransProducer.consumeHalfMessage(numOfMessages, waitSeconds)Poll for uncommitted half messages

TransCheckImmunityTime

TransCheckImmunityTime controls how long the broker waits before the first transaction check after a half message is sent.

  • Unit: seconds

  • Valid values: 10 to 300

  • Behavior after the first check: If the half message is still uncommitted, the broker checks every 10 seconds for up to 24 hours.

Sample code

The following example sends four transactional messages. The first message is committed immediately after publishing. The remaining three are resolved during the half-message polling loop based on their custom property a:

  • a=1: committed on the first check

  • a=2: committed after the second check (ConsumedTimes > 1)

  • a=3: rolled back

const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

// HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey credentials from environment variables.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// Topic created in the ApsaraMQ for RocketMQ console.
const topic = "${TOPIC}";
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
const groupId = "${GROUP_ID}";
// Instance ID. Set to null or "" if the instance has no namespace.
const instanceId = "${INSTANCE_ID}";

const mqTransProducer = client.getTransProducer(instanceId, topic, groupId);

async function processTransResult(res, msgId) {
    if (!res) {
        return;
    }
    if (res.code != 204) {
      // Commit or rollback failed. The receipt handle may have expired
      // if TransCheckImmunityTime or the consumeHalfMessage timeout elapsed.
      console.log("Commit/Rollback Message Fail:");
      const failHandles = res.body.map((error) => {
        console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
        return error.ReceiptHandle;
      });
    } else {
      console.log("Commit/Rollback Message suc!!! %s", msgId);
    }
}

var halfMessageCount = 0;
var halfMessageConsumeCount = 0;

(async function(){
  try {
    // Send four transactional messages.
    for(var i = 0; i < 4; i++) {
      let res;
      msgProps = new MessageProperties();
      // Custom property used by the half-message polling loop
      // to decide whether to commit or roll back.
      msgProps.putProperty("a", i);
      msgProps.messageKey("MessageKey");
      // First transaction check delay: 10 seconds. Valid values: 10-300.
      // After the first check, the broker rechecks every 10 seconds for up to 24 hours.
      msgProps.transCheckImmunityTime(10);
      res = await mqTransProducer.publishMessage("hello mq.", "tagA", msgProps);
      console.log("Publish message: MessageID:%s,BodyMD5:%s,Handle:%s", res.body.MessageId, res.body.MessageBodyMD5, res.body.ReceiptHandle);
      if (res && i == 0) {
          // Commit the first message immediately using its receipt handle.
          const msgId = res.body.MessageId;
          res = await mqTransProducer.commit(res.body.ReceiptHandle);
          console.log("Commit msg when publish, %s", msgId);
        processTransResult(res, msgId);
      }
    }
  } catch(e) {
    // Add retry or persistence logic for failed messages.
    console.log(e)
  }
})();

// Run a separate loop to poll and resolve uncommitted half messages.
(async function() {
  // Poll for half messages, similar to consuming normal messages.
  while(halfMessageCount < 3 && halfMessageConsumeCount < 15) {
    try {
        halfMessageConsumeCount++;
      res = await mqTransProducer.consumeHalfMessage(3, 3);
      if (res.code == 200) {
        console.log("Consume Messages, requestId:%s", res.requestId);
        res.body.forEach(async (message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
            ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);

          var propA = message.Properties && message.Properties.a ? parseInt(message.Properties.a) : 0;
                  var opResp;
                  if (propA == 1 || (propA == 2 && message.ConsumedTimes > 1)) {
                      opResp = await mqTransProducer.commit(message.ReceiptHandle);
                      console.log("Commit msg when check half, %s", message.MessageId);
                      halfMessageCount++;
                  } else if (propA == 3) {
                      opResp = await mqTransProducer.rollback(message.ReceiptHandle);
                      console.log("Rollback msg when check half, %s", message.MessageId);
                      halfMessageCount++;
                  }
                  processTransResult(opResp, message.MessageId);
        });
      }
    } catch(e) {
      if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
        // No half messages available. Long polling continues.
        console.log("Consume Transaction Half msg: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();

How the sample code works

The code runs two concurrent async functions:

  1. Send loop: Publishes four half messages with tag tagA and a custom property a set to the loop index (0-3). The first message (a=0) is committed immediately after publishing.

  2. Half-message polling loop: Polls for uncommitted half messages and resolves them based on the a property: The loop exits after resolving 3 half messages or after 15 polling attempts.

    Property a valueActionCondition
    0Already committed during sendN/A
    1CommitFirst check
    2CommitAfter retry (ConsumedTimes > 1)
    3RollbackFirst check

Response codes

CodeMeaning
200Messages retrieved
204Commit, rollback, or ACK succeeded
Non-204Operation failed (the receipt handle may have expired)

Receive transactional messages

Consumers receive committed transactional messages the same way as normal messages. Long polling avoids busy-waiting by suspending the request until a message arrives or the timeout elapses.

Consumer API

MethodPurpose
client.getConsumer(instanceId, topic, groupId)Create a consumer
consumer.consumeMessage(numOfMessages, waitSeconds)Pull messages using long polling
consumer.ackMessage(handles)Acknowledge consumed messages

Parameters

ParameterDescriptionMaximum value
numOfMessagesMaximum messages to retrieve per request16
waitSecondsLong polling duration in seconds30

Sample code

const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

// HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey credentials from environment variables.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// Topic created in the ApsaraMQ for RocketMQ console.
const topic = "${TOPIC}";
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
const groupId = "${GROUP_ID}";
// Instance ID. Set to null or "" if the instance has no namespace.
const instanceId = "${INSTANCE_ID}";

const consumer = client.getConsumer(instanceId, topic, groupId);

(async function(){
  // Consume messages in a loop.
  while(true) {
    try {
      // Long polling: if no message is available, the request suspends on the broker
      // until a message arrives or the timeout elapses.
      res = await consumer.consumeMessage(
          3, // Max messages per request (up to 16)
          3  // Long polling timeout in seconds (up to 30)
          );

      if (res.code == 200) {
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
            ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
          return message.ReceiptHandle;
        });

        // ACK all processed messages. If the broker does not receive an ACK
        // before NextConsumeTime, it redelivers the message with a new receipt handle.
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          // ACK failed. The receipt handle may have expired.
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      if (e.Code.indexOf("MessageNotExist") > -1) {
        // No messages available. Long polling continues.
        console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();

How the consumer works

The consumer runs an infinite loop that:

  1. Pulls up to 3 messages per request with a 3-second long polling timeout.

  2. Processes each message and collects the receipt handles.

  3. Sends an ACK for all processed messages.

If the consumer does not ACK a message before NextConsumeTime, the broker redelivers it. Each redelivery generates a new receipt handle.

When no messages are available, the MessageNotExist error code is returned. This is normal in long polling mode -- the loop continues waiting.

Usage notes

  • Commit or roll back promptly. Half messages that remain unresolved consume broker storage. Avoid returning an unknown status from your local transaction check unless the transaction is genuinely still in progress.

  • Handle receipt handle expiration. Receipt handles have a limited validity period. If a commit or rollback fails with a non-204 response, the handle may have expired. The broker re-triggers a transaction check automatically.

  • Run the half-message polling loop reliably. In production, run the consumeHalfMessage loop as a dedicated long-running process to ensure unresolved half messages are handled in a timely manner.

What's next