
ApsaraMQ for RocketMQ: Send and receive ordered messages

Last Updated: Mar 11, 2026

ApsaraMQ for RocketMQ delivers and consumes ordered messages in strict first-in, first-out (FIFO) order. The following sample code demonstrates how to send and consume ordered messages with the HTTP client SDK for Go.

Ordering types

Ordered messages fall into two categories:

Type | Behavior | Use when
Globally ordered | All messages in a topic follow a single FIFO sequence. | Every message must be processed in exact send order, for example, sequential command execution.
Partitionally ordered | Messages are distributed across partitions by sharding key. Each partition maintains its own FIFO order. | Only messages that share a logical grouping, for example, the same order ID or user ID, need strict ordering.

A sharding key identifies which partition a message belongs to. It is different from a message key: the message key is a business identifier used for tracing, while the sharding key controls ordering.
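
To make the distinction concrete, the following sketch models a message that carries both keys and groups messages by sharding key only. The `message` type and the `groupByShardingKey` function are hypothetical names used for illustration and are not part of the SDK. The sketch assumes that the order ID is used as the sharding key.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a published message.
type message struct {
	MessageKey  string // business identifier used for tracing
	ShardingKey string // selects the ordered sequence (partition)
	Body        string
}

// groupByShardingKey splits messages into one sequence per sharding key
// and preserves the send order within each sequence.
func groupByShardingKey(msgs []message) map[string][]message {
	groups := make(map[string][]message)
	for _, m := range msgs {
		groups[m.ShardingKey] = append(groups[m.ShardingKey], m)
	}
	return groups
}

func main() {
	msgs := []message{
		{MessageKey: "trace-1", ShardingKey: "order-1001", Body: "created"},
		{MessageKey: "trace-2", ShardingKey: "order-1002", Body: "created"},
		{MessageKey: "trace-3", ShardingKey: "order-1001", Body: "paid"},
		{MessageKey: "trace-4", ShardingKey: "order-1001", Body: "shipped"},
	}
	// The message key differs for every message, but ordering follows
	// the sharding key alone.
	for _, m := range groupByShardingKey(msgs)["order-1001"] {
		fmt.Println(m.MessageKey, m.Body)
	}
}
```

In this model, the three messages for order-1001 stay in send order even though a message for another order is sent between them.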

For more information, see Ordered messages.

How ordering works

Ordering depends on coordination between the producer and consumer.

Sending side: Use a single producer and a single thread to send messages. The broker preserves the exact sequence in which it receives messages from that producer.

Consuming side: Call ConsumeMessageOrderly to pull messages partition by partition. Within each partition, the next batch is delivered only after every message in the current batch is acknowledged. If the broker does not receive an acknowledgment before the NextConsumeTime deadline, it redelivers the unacknowledged message.
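
The acknowledgment gate on the consuming side can be pictured with a small in-memory model. This is a sketch only: the `partition` type and its `pull` and `ack` methods are hypothetical and do not reflect the broker implementation, and redelivery after the NextConsumeTime deadline is left out to keep the model short.

```go
package main

import "fmt"

// partition is a hypothetical in-memory model of one ordered partition.
type partition struct {
	queue    []string        // messages waiting in send order
	inFlight map[string]bool // delivered but not yet acknowledged
}

func newPartition(msgs ...string) *partition {
	return &partition{queue: msgs, inFlight: make(map[string]bool)}
}

// pull returns up to limit messages. While any message from the previous
// batch is unacknowledged, the gate is closed and nothing new is returned.
func (p *partition) pull(limit int) []string {
	if len(p.inFlight) > 0 {
		return nil
	}
	if limit > len(p.queue) {
		limit = len(p.queue)
	}
	batch := p.queue[:limit]
	p.queue = p.queue[limit:]
	for _, m := range batch {
		p.inFlight[m] = true
	}
	return batch
}

// ack marks one delivered message as acknowledged.
func (p *partition) ack(msg string) {
	delete(p.inFlight, msg)
}

func main() {
	p := newPartition("m0", "m1", "m2", "m3")
	first := p.pull(2)
	fmt.Println("first batch:", first)
	p.ack(first[0])
	fmt.Println("after one ack:", p.pull(2)) // gate closed: one message still unacknowledged
	p.ack(first[1])
	fmt.Println("after both acks:", p.pull(2)) // gate open: next batch is delivered
}
```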

Prerequisites

Complete the following setup before running the sample code:

Send ordered messages

Important

The broker determines message order by the sequence in which a single producer or thread sends messages. If multiple producers or threads send concurrently, arrival order at the broker may differ from the intended business order. To guarantee ordering, send from a single producer using a single thread.
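
In Go, one way to follow this rule is to route every send through a single goroutine. The sketch below assumes a hypothetical `sendFunc` callback that stands in for a publish call such as PublishMessage; `runSender` is also an illustrative name, not an SDK function.

```go
package main

import "fmt"

// sendFunc is a hypothetical stand-in for a publish call.
type sendFunc func(body string) error

// runSender drains the queue in one goroutine, so messages are published
// strictly in the order in which they were enqueued. It reports the first
// send error, or nil after the queue is closed and drained.
func runSender(queue <-chan string, send sendFunc, done chan<- error) {
	for body := range queue {
		if err := send(body); err != nil {
			done <- err
			return
		}
	}
	done <- nil
}

func main() {
	queue := make(chan string, 8)
	done := make(chan error, 1)
	var sent []string

	go runSender(queue, func(body string) error {
		sent = append(sent, body) // a real sender would publish here
		return nil
	}, done)

	// Enqueue from a single goroutine to fix the intended business order.
	for i := 0; i < 4; i++ {
		queue <- fmt.Sprintf("step-%d", i)
	}
	close(queue)

	if err := <-done; err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println(sent)
}
```

The order is fixed at the moment a message is enqueued, so the code that enqueues messages for one sharding key should itself run in a single goroutine.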

The following code sends eight partitionally ordered messages. The sharding key is the string form of i % 2, so the messages are distributed across two partitions: messages with even indexes use sharding key "0" and go to one partition, and messages with odd indexes use sharding key "1" and go to another.

Replace the placeholders before running the code:

Placeholder | Description | Example
${HTTP_ENDPOINT} | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://xxx.mqrest.cn-hangzhou.aliyuncs.com
${TOPIC} | Topic created in the console | OrderedTestTopic
${INSTANCE_ID} | Instance ID. If the instance has a namespace, specify the ID. If not, set this to an empty string. Check the Instance Details page for namespace information. | MQ_INST_xxx

package main

import (
    "fmt"
    "os"
    "strconv"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
    endpoint := "${HTTP_ENDPOINT}"
    // Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
    // environment variables before running this code.
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Topic created in the ApsaraMQ for RocketMQ console.
    topic := "${TOPIC}"
    // Instance ID. If the instance has a namespace, specify the ID.
    // If not, set this to an empty string.
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // Send 8 messages across 2 partitions.
    for i := 0; i < 8; i++ {
        msg := mq_http_sdk.PublishMessageRequest{
            MessageBody: "hello mq!",
            MessageTag:  "",
            Properties:  map[string]string{},
        }
        msg.MessageKey = "MessageKey"
        msg.Properties["a"] = strconv.Itoa(i)
        // Sharding key determines the partition.
        // Messages with the same sharding key are delivered in FIFO order.
        msg.ShardingKey = strconv.Itoa(i % 2)
        ret, err := mqProducer.PublishMessage(msg)

        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}
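
To show how the eight messages in this sample split across the two sharding keys, the following sketch reproduces only the key calculation, without the SDK. The helper names `shardingKeyFor` and `sequences` are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// shardingKeyFor mirrors the key choice in the sample: the index modulo 2,
// converted to a string.
func shardingKeyFor(i int) string {
	return strconv.Itoa(i % 2)
}

// sequences lists, for each sharding key, the message indexes in send order.
func sequences(n int) map[string][]int {
	out := make(map[string][]int)
	for i := 0; i < n; i++ {
		key := shardingKeyFor(i)
		out[key] = append(out[key], i)
	}
	return out
}

func main() {
	seq := sequences(8)
	fmt.Println("sharding key 0:", seq["0"])
	fmt.Println("sharding key 1:", seq["1"])
}
```

Sharding key 0 carries indexes 0, 2, 4, and 6, and sharding key 1 carries indexes 1, 3, 5, and 7. FIFO order is expected within each of these two sequences, not across them.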

Consume ordered messages

This code consumes ordered messages using long polling. The consumer pulls messages partition by partition and acknowledges each batch before requesting the next.

Replace the placeholders with your actual values. In addition to the placeholders listed in the Send ordered messages section, specify the following:

Placeholder | Description | Example
${GROUP_ID} | Consumer group created in the ApsaraMQ for RocketMQ console | GID_OrderedTest

package main

import (
    "fmt"
    "os"
    "strings"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
    "github.com/gogap/errors"
)

func main() {
    // HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
    endpoint := "${HTTP_ENDPOINT}"
    // Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
    // environment variables before running this code.
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Topic created in the ApsaraMQ for RocketMQ console.
    topic := "${TOPIC}"
    // Instance ID. If the instance has a namespace, specify the ID.
    // If not, set this to an empty string.
    instanceId := "${INSTANCE_ID}"
    // Consumer group created in the ApsaraMQ for RocketMQ console.
    groupId := "${GROUP_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    var handles []string
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n"+
                            "\tShardingKey: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties, v.ShardingKey)
                    }

                    // Acknowledge all messages in this batch.
                    // If the broker does not receive an ACK before NextConsumeTime,
                    // it redelivers the message.
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        fmt.Println(ackerr)
                        // Use checked type assertions so that an unexpected error type
                        // does not cause a panic.
                        var errAckItems []mq_http_sdk.ErrAckItem
                        if errCode, ok := ackerr.(errors.ErrCode); ok {
                            errAckItems, _ = errCode.Context()["Detail"].([]mq_http_sdk.ErrAckItem)
                        }
                        // Print the per-handle failure details when they are available.
                        for _, errAckItem := range errAckItems {
                            fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                                errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    if strings.Contains(err.Error(), "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timed out waiting for a consume response")
                    endChan <- 1
                }
            }
        }()

        // The consumer pulls partitionally ordered messages partition by partition.
        // Within each partition, the next batch is delivered only after all messages
        // in the current batch are acknowledged.
        // Long polling: if no message is available, the request is held on the broker
        // for up to the specified polling duration. The network timeout is 35 seconds.
        mqConsumer.ConsumeMessageOrderly(respChan, errChan,
            3, // Max messages per batch (up to 16).
            3, // Long polling duration in seconds (up to 30).
        )
        <-endChan
    }
}

Key consumption behaviors

Behavior | Detail
Partition-level ordering | The consumer may pull from multiple partitions simultaneously, but messages within each partition are always delivered in send order.
Batch acknowledgment gate | The next batch from a partition is not delivered until every message in the previous batch is acknowledged.
Automatic redelivery | If the broker receives no acknowledgment before NextConsumeTime, it redelivers the message. Each redelivery assigns a new receipt handle with a unique timestamp.
Long polling | When no messages are available, the broker holds the request for up to the specified polling duration (3 seconds in this example) before returning an empty response. The network timeout is 35 seconds.
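
To check partition-level ordering on the consumer side, one option is to compare the sequence numbers observed for each sharding key. The sketch below assumes that the producer stores an increasing counter in the "a" property, as the send sample does. The `received` type and the `firstOutOfOrder` function are hypothetical helpers, not SDK APIs. A repeated sequence number is tolerated because redelivery can return the same message again.

```go
package main

import (
	"fmt"
	"strconv"
)

// received is a hypothetical record of one consumed message.
type received struct {
	ShardingKey string
	Seq         string // value of the "a" property set by the producer
}

// firstOutOfOrder returns the index of the first message whose sequence
// number is lower than an earlier one with the same sharding key,
// or -1 if every sharding key was observed in order.
func firstOutOfOrder(msgs []received) (int, error) {
	last := make(map[string]int)
	for i, m := range msgs {
		n, err := strconv.Atoi(m.Seq)
		if err != nil {
			return -1, fmt.Errorf("message %d: invalid sequence %q: %w", i, m.Seq, err)
		}
		if prev, seen := last[m.ShardingKey]; seen && n < prev {
			return i, nil
		}
		last[m.ShardingKey] = n
	}
	return -1, nil
}

func main() {
	msgs := []received{
		{"0", "0"}, {"1", "1"}, {"1", "3"}, {"0", "2"}, {"0", "2"}, {"0", "4"},
	}
	idx, err := firstOutOfOrder(msgs)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("first out-of-order index:", idx)
}
```

Messages from the two sharding keys may interleave freely in this check. Only the order within each key is compared.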
