ApsaraMQ for RocketMQ: Send and receive transactional messages

Last Updated: Mar 11, 2026

In distributed systems, a local database operation and a downstream message delivery often must either both succeed or both fail. Normal messages cannot guarantee this atomicity -- a message might be delivered even if the local transaction rolls back. ApsaraMQ for RocketMQ provides a distributed transaction processing feature similar to eXtended Architecture (X/Open XA) to solve this problem. It uses transactional messages to ensure eventual consistency: the broker holds each message in a half-committed state and delivers it only after the producer explicitly commits.

This topic provides Go code examples for sending and receiving transactional messages with the HTTP client SDK.

How it works

A transactional message goes through the following lifecycle:

  1. The producer sends a half message to the broker.

  2. The broker persists the half message and acknowledges receipt.

  3. The producer executes the local transaction.

  4. Based on the local transaction result, the producer commits or rolls back the half message.

  5. If the broker receives neither a commit nor a rollback within the TransCheckImmunityTime interval, it initiates a back-check. The broker repeats this check every 10 seconds for up to 24 hours.

  6. Once committed, the broker delivers the message to consumers.

For more details about the transactional message model, see Transactional messages.
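
The lifecycle above can be sketched as a small state machine. This is a toy in-memory model for illustration only, not the broker's implementation: the `Broker`, `State`, `SendHalf`, `Resolve`, and `Deliverable` names are hypothetical, and the back-check in step 5 is not modeled.

```go
package main

import (
    "errors"
    "fmt"
)

// State is a hypothetical label for a message's position in the lifecycle.
type State int

const (
    Half       State = iota // persisted by the broker, hidden from consumers
    Committed               // visible to consumers
    RolledBack              // discarded, never delivered
)

var (
    ErrUnknownMessage = errors.New("unknown message")
    ErrAlreadyFinal   = errors.New("message already committed or rolled back")
)

// Broker is a toy in-memory stand-in for the real broker.
type Broker struct {
    states map[string]State
}

func NewBroker() *Broker {
    return &Broker{states: make(map[string]State)}
}

// SendHalf records a half message (steps 1 and 2).
func (b *Broker) SendHalf(id string) {
    b.states[id] = Half
}

// Resolve commits or rolls back a half message (step 4).
func (b *Broker) Resolve(id string, commit bool) error {
    state, ok := b.states[id]
    if !ok {
        return ErrUnknownMessage
    }
    if state != Half {
        return ErrAlreadyFinal
    }
    if commit {
        b.states[id] = Committed
    } else {
        b.states[id] = RolledBack
    }
    return nil
}

// Deliverable reports whether consumers may receive the message (step 6).
func (b *Broker) Deliverable(id string) bool {
    state, ok := b.states[id]
    return ok && state == Committed
}

func main() {
    broker := NewBroker()
    broker.SendHalf("order-1")

    // Step 3: run the local transaction; here it is simulated as successful.
    localTxnOK := true

    if err := broker.Resolve("order-1", localTxnOK); err != nil {
        fmt.Println("resolve failed:", err)
        return
    }
    fmt.Println("deliverable:", broker.Deliverable("order-1"))
}
```

The point of the model is that a half message is never deliverable until it is committed, and that a message can be resolved only once.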

Prerequisites

  • The Go HTTP client SDK installed. For details, see Prepare the environment.

  • An instance, a topic, and a consumer group created in the ApsaraMQ for RocketMQ console. For details, see Create resources.

  • Your Alibaba Cloud credentials stored in the following environment variables (to obtain an AccessKey pair, see Create an AccessKey pair):

    • ALIBABA_CLOUD_ACCESS_KEY_ID -- Your AccessKey ID

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET -- Your AccessKey secret

Usage notes

| Constraint | Details |
| --- | --- |
| Topic type exclusivity | Each topic supports only one message type. A topic created for normal messages cannot send or receive transactional messages. |
| TransCheckImmunityTime | Controls the delay before the first back-check. Valid values: 10 to 300 seconds. After the first back-check, the broker re-checks every 10 seconds for up to 24 hours. |
| In-progress transactions | If a local transaction is still running during a back-check, return an unknown status instead of committing or rolling back prematurely. Set TransCheckImmunityTime to a value that covers the expected local transaction duration to reduce unnecessary checks. |

Send transactional messages

Sending a transactional message involves three steps:

  1. Publish a half message -- Send the message to the broker in a half-committed state.

  2. Commit or roll back -- Finalize the transaction based on local business logic.

  3. Handle back-checks -- Respond to broker-initiated status queries for uncommitted half messages.

The following example sends four transactional messages. The first message is committed immediately after publishing. The remaining three are resolved during back-checks based on the message property a:

  • a=1: Commit on the first back-check.

  • a=2: Commit on the second or a later back-check (ConsumedTimes > 1); the first back-check leaves the message unresolved.

  • a=3: Roll back.

Replace the following placeholders with your actual values:

| Placeholder | Description |
| --- | --- |
| ${HTTP_ENDPOINT} | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console |
| ${TOPIC} | Topic name |
| ${INSTANCE_ID} | Instance ID. Set to an empty string if the instance has no namespace. |
| ${GROUP_ID} | Consumer group ID |

package main

import (
    "fmt"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
    "github.com/gogap/errors"
)

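// loopCount counts back-check polling rounds. It is read in main and written in
// ConsumeHalfMsg without synchronization, which is tolerable only in a short demo;
// production code should use sync/atomic or a channel instead.
var loopCount = 0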

// ProcessError logs acknowledgment errors returned by commit or rollback operations.
func ProcessError(err error) {
    // If a commit or rollback is attempted after the receipt handle expires
    // (either TransCheckImmunityTime or the consumeHalfMessage timeout),
    // the operation fails. This example uses a 10-second timeout for consumeHalfMessage.
    if err == nil {
        return
    }
    fmt.Println(err)
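    // Use checked type assertions so an unexpected error type does not panic.
    errCode, ok := err.(errors.ErrCode)
    if !ok {
        return
    }
    errAckItems, ok := errCode.Context()["Detail"].([]mq_http_sdk.ErrAckItem)
    if !ok {
        return
    }
    for _, errAckItem := range errAckItems {
        fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
            errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
    }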
}

// ConsumeHalfMsg processes uncommitted half messages through back-checks.
// Based on the message property "a", it commits, rolls back, or defers the decision.
func ConsumeHalfMsg(mqTransProducer *mq_http_sdk.MQTransProducer) {
    for {
        if loopCount >= 10 {
            return
        }
        loopCount++
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n\tBody: %s\n"+
                            "\tProperties:%s, Key:%s, Timer:%d, Trans:%d\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
                            v.Properties, v.MessageKey, v.StartDeliverTime, v.TransCheckImmunityTime)

                        a, _ := strconv.Atoi(v.Properties["a"])
                        var comRollErr error
                        if a == 1 {
                            // Property a=1: commit the transaction.
                            comRollErr = (*mqTransProducer).Commit(v.ReceiptHandle)
                            fmt.Println("Commit---------->")
                        } else if a == 2 && v.ConsumedTimes > 1 {
                            // Property a=2: commit only after a second back-check.
                            comRollErr = (*mqTransProducer).Commit(v.ReceiptHandle)
                            fmt.Println("Commit---------->")
                        } else if a == 3 {
                            // Property a=3: roll back the transaction.
                            comRollErr = (*mqTransProducer).Rollback(v.ReceiptHandle)
                            fmt.Println("Rollback---------->")
                        } else {
                            // Unknown state: defer to the next back-check.
                            fmt.Println("Unknown---------->")
                        }
                        ProcessError(comRollErr)
                    }
                    endChan <- 1
                }
            case err := <-errChan:
                {
                    if strings.Contains(err.Error(), "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
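                    fmt.Println("Timed out waiting for half messages")
                    // Signal the outer loop, as the consumer example does,
                    // so that it does not wait on endChan indefinitely.
                    endChan <- 1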
                }
            }
        }()

        // Poll for uncommitted half messages using long polling.
        // If no half message is available, the request is held on the broker
        // for up to the specified polling duration before returning.
        (*mqTransProducer).ConsumeHalfMessage(respChan, errChan,
            3, // Max messages per batch (valid range: 1-16)
            3, // Long polling duration in seconds (valid range: 1-30)
        )
        <-endChan
    }
}

func main() {
    // Step 1: Configure the client.
    // Get the HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
    endpoint := "${HTTP_ENDPOINT}"
    // Read credentials from environment variables.
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Specify the topic. The topic must already exist in the console.
    topic := "${TOPIC}"
    // Specify the instance ID. Set to "" if the instance has no namespace.
    instanceId := "${INSTANCE_ID}"
    // Specify the consumer group ID.
    groupId := "${GROUP_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqTransProducer := client.GetTransProducer(instanceId, topic, groupId)

    // Step 2: Start a goroutine to handle back-checks for uncommitted half messages.
    go ConsumeHalfMsg(&mqTransProducer)

    // Step 3: Publish four transactional messages.
    // The first message (i=0) is committed immediately.
    // The remaining three are resolved during back-checks.
    for i := 0; i < 4; i++ {
        msg := mq_http_sdk.PublishMessageRequest{
            MessageBody: "I am transaction msg!",
            Properties:  map[string]string{"a": strconv.Itoa(i)},
        }
        // Set the back-check delay: seconds before the first status check.
        // Valid values: 10-300. After the first check, the broker re-checks every 10s.
        msg.TransCheckImmunityTime = 10

        resp, pubErr := mqTransProducer.PublishMessage(msg)
        if pubErr != nil {
            fmt.Println(pubErr)
            return
        }
        fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, Handle:%s\n",
            resp.MessageId, resp.MessageBodyMD5, resp.ReceiptHandle)
        if i == 0 {
            // Commit the first message immediately using its receipt handle.
            ackErr := mqTransProducer.Commit(resp.ReceiptHandle)
            fmt.Println("Commit---------->")
            ProcessError(ackErr)
        }
    }

    // Wait for the back-check goroutine to complete.
    for loopCount < 10 {
        time.Sleep(time.Duration(1) * time.Second)
    }
}
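
The back-check rule used in ConsumeHalfMsg above can be isolated into a pure function, which makes it easy to test without a broker. This is a sketch under assumptions: the `Action` type and `decide` function are hypothetical names, and ConsumedTimes is assumed to be an int64.

```go
package main

import (
    "fmt"
    "strconv"
)

// Action is a hypothetical label for the outcome of one back-check.
type Action string

const (
    Commit   Action = "commit"
    Rollback Action = "rollback"
    Unknown  Action = "unknown" // leave the half message for the next back-check
)

// decide mirrors the rule in the example: property a=1 commits, a=2 commits
// from the second back-check onward, a=3 rolls back, anything else is unknown.
func decide(a string, consumedTimes int64) Action {
    n, err := strconv.Atoi(a)
    if err != nil {
        return Unknown
    }
    switch {
    case n == 1:
        return Commit
    case n == 2 && consumedTimes > 1:
        return Commit
    case n == 3:
        return Rollback
    default:
        return Unknown
    }
}

func main() {
    fmt.Println(decide("1", 1))
    fmt.Println(decide("2", 1))
    fmt.Println(decide("2", 2))
    fmt.Println(decide("3", 1))
}
```

Keeping the decision separate from the Commit and Rollback calls also gives a single place to return an unknown status while a local transaction is still running.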

Subscribe to transactional messages

After a transactional message is committed, the broker delivers it to consumers like any other message. Subscribe using the standard consumer API.

The following example consumes messages in long polling mode and acknowledges each batch. If the broker does not receive an acknowledgment before NextConsumeTime elapses, it redelivers the message.

package main

import (
    "fmt"
    "os"
    "strings"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
    "github.com/gogap/errors"
)

func main() {
    // Configure the client with the same parameters used for the producer.
    endpoint := "${HTTP_ENDPOINT}"
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    topic := "${TOPIC}"
    instanceId := "${INSTANCE_ID}"
    groupId := "${GROUP_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    // Pass an empty string as the message tag to subscribe to all tags.
    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    var handles []string
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
                    }

                    // Acknowledge messages after processing.
                    // Unacknowledged messages are redelivered after NextConsumeTime.
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        fmt.Println(ackerr)
                        if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
                            for _, errAckItem := range errAckItems {
                                fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                                    errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
                            }
                        } else {
                            fmt.Println("ack err =", ackerr)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    if strings.Contains(err.Error(), "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timed out waiting for messages")
                    endChan <- 1
                }
            }
        }()

        // Consume messages in long polling mode.
        // The default network timeout is 35 seconds.
        mqConsumer.ConsumeMessage(respChan, errChan,
            3, // Max messages per batch (valid range: 1-16)
            3, // Long polling duration in seconds (valid range: 1-30)
        )
        <-endChan
    }
}
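
Because an unacknowledged message is redelivered, a consumer can see the same message more than once, for example when processing succeeds but the acknowledgment fails. A minimal sketch of guarding the business logic by message ID, assuming an in-memory set is acceptable; the `Deduper` type is hypothetical, and a real deployment would more likely use a persistent store.

```go
package main

import (
    "fmt"
    "sync"
)

// Deduper remembers which message IDs have already been processed.
type Deduper struct {
    mu   sync.Mutex
    seen map[string]struct{}
}

func NewDeduper() *Deduper {
    return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime returns true only the first time it is called with a given ID.
func (d *Deduper) FirstTime(messageID string) bool {
    d.mu.Lock()
    defer d.mu.Unlock()
    if _, dup := d.seen[messageID]; dup {
        return false
    }
    d.seen[messageID] = struct{}{}
    return true
}

func main() {
    dedup := NewDeduper()
    // The third entry simulates a redelivery of msg-1.
    for _, id := range []string{"msg-1", "msg-2", "msg-1"} {
        if dedup.FirstTime(id) {
            fmt.Println("process", id)
        } else {
            fmt.Println("skip duplicate", id)
        }
    }
}
```

In the consumer loop above, such a check would sit before the business logic for each message, while the receipt handle is still acknowledged either way.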