In distributed systems, a local database operation and a downstream message delivery must either both succeed or both fail. Standard messages cannot guarantee this atomicity -- a message might be delivered even if the local transaction rolls back. ApsaraMQ for RocketMQ provides a distributed transaction processing feature similar to eXtended Architecture (X/Open XA) to solve this problem. It uses transactional messages to ensure eventual consistency: the broker holds each message in a half-committed state and delivers it only after the producer explicitly commits.
This topic provides Go code examples for sending and receiving transactional messages with the HTTP client SDK.
How it works
A transactional message goes through the following lifecycle:

The producer sends a half message to the broker.
The broker persists the half message and acknowledges receipt.
The producer executes the local transaction.
Based on the local transaction result, the producer commits or rolls back the half message.
If the broker receives neither a commit nor a rollback within the
TransCheckImmunityTimeinterval, it initiates a back-check. The broker repeats this check every 10 seconds for up to 24 hours.Once committed, the broker delivers the message to consumers.
For more details about the transactional message model, see Transactional messages.
Prerequisites
The Go HTTP client SDK installed. For details, see Prepare the environment.
An instance, a topic, and a consumer group created in the ApsaraMQ for RocketMQ console. For details, see Create resources.
The following environment variables configured with your Alibaba Cloud credentials: For details on obtaining an AccessKey pair, see Create an AccessKey pair.
ALIBABA_CLOUD_ACCESS_KEY_ID-- Your AccessKey IDALIBABA_CLOUD_ACCESS_KEY_SECRET-- Your AccessKey secret
Usage notes
| Constraint | Details |
|---|---|
| Topic type exclusivity | Each topic supports only one message type. A topic created for normal messages cannot send or receive transactional messages. |
TransCheckImmunityTime | Controls the delay before the first back-check. Valid values: 10 to 300 seconds. After the first back-check, the broker re-checks every 10 seconds for up to 24 hours. |
| In-progress transactions | If a local transaction is still running during a back-check, return an unknown status instead of committing or rolling back prematurely. Set TransCheckImmunityTime to a value that covers the expected local transaction duration to reduce unnecessary checks. |
Send transactional messages
Sending a transactional message involves three steps:
Publish a half message -- Send the message to the broker in a half-committed state.
Commit or roll back -- Finalize the transaction based on local business logic.
Handle back-checks -- Respond to broker-initiated status queries for uncommitted half messages.
The following example sends four transactional messages. The first message is committed immediately after publishing. The remaining three are resolved during back-checks based on the message property a:
a=1: Commit on the first back-check.a=2: Commit after the second back-check (ConsumedTimes > 1).a=3: Roll back.
Replace the following placeholders with your actual values:
| Placeholder | Description |
|---|---|
${HTTP_ENDPOINT} | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console |
${TOPIC} | Topic name |
${INSTANCE_ID} | Instance ID. Set to an empty string if the instance has no namespace. |
${GROUP_ID} | Consumer group ID |
package main
import (
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/aliyunmq/mq-http-go-sdk"
"github.com/gogap/errors"
)
var loopCount = 0
// ProcessError logs acknowledgment errors returned by commit or rollback operations.
func ProcessError(err error) {
// If a commit or rollback is attempted after the receipt handle expires
// (either TransCheckImmunityTime or the consumeHalfMessage timeout),
// the operation fails. This example uses a 10-second timeout for consumeHalfMessage.
if err == nil {
return
}
fmt.Println(err)
for _, errAckItem := range err.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
}
// ConsumeHalfMsg processes uncommitted half messages through back-checks.
// Based on the message property "a", it commits, rolls back, or defers the decision.
func ConsumeHalfMsg(mqTransProducer *mq_http_sdk.MQTransProducer) {
for {
if loopCount >= 10 {
return
}
loopCount++
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n\tBody: %s\n"+
"\tProperties:%s, Key:%s, Timer:%d, Trans:%d\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
v.Properties, v.MessageKey, v.StartDeliverTime, v.TransCheckImmunityTime)
a, _ := strconv.Atoi(v.Properties["a"])
var comRollErr error
if a == 1 {
// Property a=1: commit the transaction.
comRollErr = (*mqTransProducer).Commit(v.ReceiptHandle)
fmt.Println("Commit---------->")
} else if a == 2 && v.ConsumedTimes > 1 {
// Property a=2: commit only after a second back-check.
comRollErr = (*mqTransProducer).Commit(v.ReceiptHandle)
fmt.Println("Commit---------->")
} else if a == 3 {
// Property a=3: roll back the transaction.
comRollErr = (*mqTransProducer).Rollback(v.ReceiptHandle)
fmt.Println("Rollback---------->")
} else {
// Unknown state: defer to the next back-check.
fmt.Println("Unknown---------->")
}
ProcessError(comRollErr)
}
endChan <- 1
}
case err := <-errChan:
{
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
return
}
}
}()
// Poll for uncommitted half messages using long polling.
// If no half message is available, the request is held on the broker
// for up to the specified polling duration before returning.
(*mqTransProducer).ConsumeHalfMessage(respChan, errChan,
3, // Max messages per batch (valid range: 1-16)
3, // Long polling duration in seconds (valid range: 1-30)
)
<-endChan
}
}
func main() {
// Step 1: Configure the client.
// Get the HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// Read credentials from environment variables.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Specify the topic. The topic must already exist in the console.
topic := "${TOPIC}"
// Specify the instance ID. Set to "" if the instance has no namespace.
instanceId := "${INSTANCE_ID}"
// Specify the consumer group ID.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqTransProducer := client.GetTransProducer(instanceId, topic, groupId)
// Step 2: Start a goroutine to handle back-checks for uncommitted half messages.
go ConsumeHalfMsg(&mqTransProducer)
// Step 3: Publish four transactional messages.
// The first message (i=0) is committed immediately.
// The remaining three are resolved during back-checks.
for i := 0; i < 4; i++ {
msg := mq_http_sdk.PublishMessageRequest{
MessageBody: "I am transaction msg!",
Properties: map[string]string{"a": strconv.Itoa(i)},
}
// Set the back-check delay: seconds before the first status check.
// Valid values: 10-300. After the first check, the broker re-checks every 10s.
msg.TransCheckImmunityTime = 10
resp, pubErr := mqTransProducer.PublishMessage(msg)
if pubErr != nil {
fmt.Println(pubErr)
return
}
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, Handle:%s\n",
resp.MessageId, resp.MessageBodyMD5, resp.ReceiptHandle)
if i == 0 {
// Commit the first message immediately using its receipt handle.
ackErr := mqTransProducer.Commit(resp.ReceiptHandle)
fmt.Println("Commit---------->")
ProcessError(ackErr)
}
}
// Wait for the back-check goroutine to complete.
for loopCount < 10 {
time.Sleep(time.Duration(1) * time.Second)
}
}Subscribe to transactional messages
After a transactional message is committed, the broker delivers it to consumers like any other message. Subscribe using the standard consumer API.
The following example consumes messages in long polling mode and acknowledges each batch. If the broker does not receive an acknowledgment before NextConsumeTime elapses, it redelivers the message.
package main
import (
"fmt"
"os"
"strings"
"time"
"github.com/aliyunmq/mq-http-go-sdk"
"github.com/gogap/errors"
)
func main() {
// Configure the client with the same parameters used for the producer.
endpoint := "${HTTP_ENDPOINT}"
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
topic := "${TOPIC}"
instanceId := "${INSTANCE_ID}"
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
// Pass an empty string as the message tag to subscribe to all tags.
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
}
// Acknowledge messages after processing.
// Unacknowledged messages are redelivered after NextConsumeTime.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
fmt.Println(ackerr)
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
fmt.Println("ack err =", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// Consume messages in long polling mode.
// The default network timeout is 35 seconds.
mqConsumer.ConsumeMessage(respChan, errChan,
3, // Max messages per batch (valid range: 1-16)
3, // Long polling duration in seconds (valid range: 1-30)
)
<-endChan
}
}Related topics
Transactional messages: The transactional message model, lifecycle, and design rationale.
Send and receive normal messages: Basic message publishing and consumption.
Send and receive scheduled messages: Delayed message delivery to a specified time.