ApsaraMQ for RocketMQ delivers and consumes ordered messages in strict first-in, first-out (FIFO) order. The following sample code demonstrates how to send and consume ordered messages with the HTTP client SDK for Go.
Ordering types
Ordered messages fall into two categories:
| Type | Behavior | Use when |
|---|---|---|
| Globally ordered | All messages in a topic follow a single FIFO sequence. | Every message must be processed in exact send order, for example, sequential command execution. |
| Partitionally ordered | Messages are distributed across partitions by sharding key. Each partition maintains its own FIFO order. | Only messages that share a logical grouping, for example, the same order ID or user ID, need strict ordering. |
A sharding key identifies which partition a message belongs to. It is different from a message key: the message key is a business identifier used for tracing, while the sharding key controls ordering.
For more information, see Ordered messages.
How ordering works
Ordering depends on coordination between the producer and consumer.
Sending side: Use a single producer and a single thread to send messages. The broker preserves the exact sequence in which it receives messages from that producer.
Consuming side: Call ConsumeMessageOrderly to pull messages partition by partition. Within each partition, the next batch is delivered only after every message in the current batch is acknowledged. If the broker does not receive an acknowledgment before the NextConsumeTime deadline, it redelivers the unacknowledged message.
Prerequisites
Complete the following setup before running the sample code:
Create an instance, a topic, and a consumer group in the ApsaraMQ for RocketMQ console
Create an AccessKey pair for your Alibaba Cloud account
Send ordered messages
The broker determines message order by the sequence in which a single producer or thread sends messages. If multiple producers or threads send concurrently, arrival order at the broker may differ from the intended business order. To guarantee ordering, send from a single producer using a single thread.
The following code sends eight partitionally ordered messages. The sharding key i % 2 distributes messages across two partitions, so messages with even indexes go to one partition and odd indexes go to another.
Replace the placeholders before running the code:
| Placeholder | Description | Example |
|---|---|---|
${HTTP_ENDPOINT} | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://xxx.mqrest.cn-hangzhou.aliyuncs.com |
${TOPIC} | Topic created in the console | OrderedTestTopic |
${INSTANCE_ID} | Instance ID. If the instance has a namespace, specify the ID. If not, set this to an empty string. Check the Instance Details page for namespace information. | MQ_INST_xxx |
package main
import (
"fmt"
"time"
"strconv"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
// environment variables before running this code.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Topic created in the ApsaraMQ for RocketMQ console.
topic := "${TOPIC}"
// Instance ID. If the instance has a namespace, specify the ID.
// If not, set this to an empty string.
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// Send 8 messages across 2 partitions.
for i := 0; i < 8; i++ {
msg := mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!",
MessageTag: "",
Properties: map[string]string{},
}
msg.MessageKey = "MessageKey"
msg.Properties["a"] = strconv.Itoa(i)
// Sharding key determines the partition.
// Messages with the same sharding key are delivered in FIFO order.
msg.ShardingKey = strconv.Itoa(i % 2)
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}Consume ordered messages
This code consumes ordered messages using long polling. The consumer pulls messages partition by partition and acknowledges each batch before requesting the next.
Replace the placeholders with your actual values. In addition to the placeholders listed in the Send ordered messages section, specify the following:
| Placeholder | Description | Example |
|---|---|---|
${GROUP_ID} | Consumer group created in the ApsaraMQ for RocketMQ console | GID_OrderedTest |
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
// environment variables before running this code.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Topic created in the ApsaraMQ for RocketMQ console.
topic := "${TOPIC}"
// Instance ID. If the instance has a namespace, specify the ID.
// If not, set this to an empty string.
instanceId := "${INSTANCE_ID}"
// Consumer group created in the ApsaraMQ for RocketMQ console.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n"+
"\tShardingKey: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties, v.ShardingKey)
}
// Acknowledge all messages in this batch.
// If the broker does not receive an ACK before NextConsumeTime,
// it redelivers the message.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
fmt.Println(ackerr)
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
fmt.Println("ack err =", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// The consumer pulls partitionally ordered messages partition by partition.
// Within each partition, the next batch is delivered only after all messages
// in the current batch are acknowledged.
// Long polling: if no message is available, the request is held on the broker
// for up to the specified polling duration. The network timeout is 35 seconds.
mqConsumer.ConsumeMessageOrderly(respChan, errChan,
3, // Max messages per batch (up to 16).
3, // Long polling duration in seconds (up to 30).
)
<-endChan
}
}Key consumption behaviors
| Behavior | Detail |
|---|---|
| Partition-level ordering | The consumer may pull from multiple partitions simultaneously, but messages within each partition are always delivered in send order. |
| Batch acknowledgment gate | The next batch from a partition is not delivered until every message in the previous batch is acknowledged. |
| Automatic redelivery | If the broker receives no acknowledgment before NextConsumeTime, it redelivers the message. Each redelivery assigns a new receipt handle with a unique timestamp. |
| Long polling | When no messages are available, the broker holds the request for up to the specified polling duration (3 seconds in this example) before returning an empty response. The network timeout is 35 seconds. |
What's next
Ordered messages: Ordering types, guarantees, and design considerations
Send and receive other message types using the Go HTTP SDK
Prepare the environment: Set up the Go development environment