ApsaraMQ for RocketMQ: Send and receive ordered messages

Last Updated: Mar 11, 2026

ApsaraMQ for RocketMQ delivers ordered messages in strict first-in-first-out (FIFO) order. This topic provides PHP sample code for sending and receiving ordered messages through the HTTP client SDK.

Ordering types

ApsaraMQ for RocketMQ supports two ordering scopes:

| Type | Ordering scope | How it works |
| --- | --- | --- |
| Globally ordered | Entire topic | All messages in the topic are sent and consumed in FIFO order. |
| Partitionally ordered | Per partition | Messages are distributed to partitions by sharding key. Messages within each partition are consumed in FIFO order. |

A sharding key identifies which partition a message belongs to. It is different from a message key.
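The following is a minimal sketch of partitioned ordering, not the broker's actual routing logic. It assumes, as a simplification, that each sharding key maps to its own queue. The groupByShardingKey() helper and the order-1 and order-2 keys are hypothetical names used only for illustration.

```php
<?php

// Hypothetical helper: groups messages into per-key queues while
// preserving the order in which they were sent.
function groupByShardingKey(array $messages): array
{
    $partitions = [];
    foreach ($messages as $message) {
        // Appending keeps the original send order within each key.
        $partitions[$message['shardingKey']][] = $message['body'];
    }
    return $partitions;
}

// Messages in send order. Events for two orders are interleaved.
$sent = [
    ['shardingKey' => 'order-1', 'body' => 'created'],
    ['shardingKey' => 'order-2', 'body' => 'created'],
    ['shardingKey' => 'order-1', 'body' => 'paid'],
    ['shardingKey' => 'order-2', 'body' => 'cancelled'],
    ['shardingKey' => 'order-1', 'body' => 'shipped'],
];

$partitions = groupByShardingKey($sent);
foreach ($partitions as $key => $bodies) {
    echo $key . ': ' . implode(' -> ', $bodies) . "\n";
}
```

Each key's events come out in the order they were sent, while no order is implied between events of different keys.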

For more information, see Ordered messages.

Ordering guarantees

Ordered delivery depends on both the producer and the consumer behaving correctly.

Producer side:

Important

The broker orders messages by the sequence in which a single producer or thread sends them. If multiple producers or threads send messages concurrently, the broker orders them by arrival time, which may differ from the intended business order.

Consumer side:

  • The consumer may pull ordered messages from multiple partitions in a single batch. Messages from each partition retain their send order.

  • The consumer must acknowledge (ACK) all messages from a partition before receiving the next batch from that partition.

  • If the broker does not receive an ACK before the time returned by getNextConsumeTime(), it redelivers the message.
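One way to respect these rules when a handler fails is to stop acknowledging later messages that share the failed message's sharding key, so the failed message is redelivered first. The sketch below illustrates this with plain arrays instead of SDK message objects. The collectAckableHandles() helper and the array layout are assumptions made for the example, and it treats each sharding key as its own partition.

```php
<?php

/**
 * Hypothetical helper: runs $handler on each message in batch order and
 * returns the receipt handles that are safe to ACK. After the first failure
 * for a sharding key, later messages with that key are skipped so that they
 * are not acknowledged ahead of the failed one.
 */
function collectAckableHandles(array $batch, callable $handler): array
{
    $blockedKeys = [];
    $receiptHandles = [];
    foreach ($batch as $message) {
        $key = $message['shardingKey'];
        if (isset($blockedKeys[$key])) {
            continue; // An earlier message with this key failed.
        }
        if ($handler($message)) {
            $receiptHandles[] = $message['receiptHandle'];
        } else {
            $blockedKeys[$key] = true;
        }
    }
    return $receiptHandles;
}

$batch = [
    ['shardingKey' => 'A', 'receiptHandle' => 'h1', 'body' => 'ok'],
    ['shardingKey' => 'B', 'receiptHandle' => 'h2', 'body' => 'bad'],
    ['shardingKey' => 'A', 'receiptHandle' => 'h3', 'body' => 'ok'],
    ['shardingKey' => 'B', 'receiptHandle' => 'h4', 'body' => 'ok'],
];

// The handler returns true on success and false on failure.
$ackable = collectAckableHandles($batch, function (array $message): bool {
    return $message['body'] !== 'bad';
});
print_r($ackable);
```

Here only h1 and h3 are collected: the failure of h2 also withholds h4, which shares sharding key B.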

Prerequisites

Before you begin, make sure that you have:

Send ordered messages

The following code sends four ordered messages and distributes them across two partitions by using setShardingKey(). Messages that share the same sharding key are delivered in FIFO order within that partition.

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            // HTTP endpoint. Find this in the HTTP Endpoint section on the Instance Details
            // page in the ApsaraMQ for RocketMQ console.
            // Single quotes keep PHP from interpolating the ${...} placeholder.
            '${HTTP_ENDPOINT}',
            // Get AccessKey credentials from environment variables.
            getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
            getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        );

        // Topic created in the ApsaraMQ for RocketMQ console.
        // Single quotes keep PHP from interpolating the ${...} placeholder.
        $topic = '${TOPIC}';
        // Instance ID. If the instance has no namespace, set this to null or "".
        // Check the Instance Details page for namespace information.
        $instanceId = '${INSTANCE_ID}';

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    public function run()
    {
        try
        {
            for ($i = 1; $i <= 4; $i++)
            {
                $publishMessage = new TopicMessage(
                    "hello mq! " // Message body
                );
                // Set a custom message property.
                $publishMessage->putProperty("a", $i);
                // Set the sharding key to distribute messages across partitions.
                // Messages with the same sharding key are delivered in FIFO order.
                $publishMessage->setShardingKey($i % 2);

                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}


$instance = new ProducerTest();
$instance->run();

?>

Replace the following placeholders with your actual values:

| Placeholder | Description |
| --- | --- |
| ${HTTP_ENDPOINT} | HTTP endpoint from the Instance Details page |
| ${TOPIC} | Topic name |
| ${INSTANCE_ID} | Instance ID (null or "" if no namespace) |

Receive ordered messages

The following code consumes ordered messages by using consumeMessageOrderly(), which maintains FIFO order within each partition. The consumer uses long polling: if no message is available, the request is held on the broker until a message arrives or the polling timeout expires.

<?php

require "vendor/autoload.php";

use MQ\MQClient;

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            // HTTP endpoint. Find this in the HTTP Endpoint section on the Instance Details
            // page in the ApsaraMQ for RocketMQ console.
            // Single quotes keep PHP from interpolating the ${...} placeholder.
            '${HTTP_ENDPOINT}',
            // Get AccessKey credentials from environment variables.
            getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
            getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        );

        // Topic created in the ApsaraMQ for RocketMQ console.
        // Single quotes keep PHP from interpolating the ${...} placeholder.
        $topic = '${TOPIC}';
        // Consumer group ID created in the ApsaraMQ for RocketMQ console.
        $groupId = '${GROUP_ID}';
        // Instance ID. If the instance has no namespace, set this to null or "".
        // Check the Instance Details page for namespace information.
        $instanceId = '${INSTANCE_ID}';

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function ackMessages($receiptHandles)
    {
        try {
            $this->consumer->ackMessage($receiptHandles);
        } catch (\Exception $e) {
            if ($e instanceof MQ\Exception\AckMessageException) {
                // Handle ACK failures. This occurs when a receipt handle has expired.
                printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                foreach ($e->getAckMessageErrorItems() as $errorItem) {
                    printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorMessage());
                }
            }
        }
    }

    public function run()
    {
        // Poll for messages continuously. For production use, run multiple threads
        // to consume messages concurrently across partitions.
        while (True) {
            try {
                // consumeMessageOrderly() ensures FIFO order within each partition.
                // If the broker does not receive an ACK for a message, it redelivers
                // that message before sending subsequent messages from the same partition.
                $messages = $this->consumer->consumeMessageOrderly(
                    3, // Max messages per batch (up to 16)
                    3  // Long polling timeout in seconds (up to 30)
                );
            } catch (\MQ\Exception\MessageResolveException $e) {
                // Handle messages that cannot be parsed due to invalid characters.
                $messages = $e->getPartialResult()->getMessages();
                $failMessages = $e->getPartialResult()->getFailResolveMessages();

                $receiptHandles = array();
                foreach ($messages as $message) {
                    $receiptHandles[] = $message->getReceiptHandle();
                    printf("MsgID %s\n", $message->getMessageId());
                }
                foreach ($failMessages as $failMessage) {
                    $receiptHandles[] = $failMessage->getReceiptHandle();
                    printf("Fail To Resolve Message. MsgID %s\n", $failMessage->getMessageId());
                }
                $this->ackMessages($receiptHandles);
                continue;
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    // No messages available. Long polling continues automatically.
                    printf("No message, continue long polling! RequestId:%s\n", $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "======>consume finish, messages:\n";

            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,ShardingKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getShardingKey());
                print_r($message->getProperties());
            }

            // ACK all messages. If the broker does not receive an ACK before
            // getNextConsumeTime(), it redelivers the message. Each redelivery
            // assigns a new receipt handle with an updated timestamp.
            print_r($receiptHandles);
            $this->ackMessages($receiptHandles);
            print "=======>ack finish\n";
        }
    }
}


$instance = new ConsumerTest();
$instance->run();

?>

In addition to the placeholders in the Send ordered messages section, replace the following:

| Placeholder | Description |
| --- | --- |
| ${GROUP_ID} | Consumer group ID |

consumeMessageOrderly() parameters

| Parameter | Description | Range |
| --- | --- | --- |
| numOfMessages | Maximum number of messages to consume per batch | Up to 16 |
| waitSeconds | Long polling duration in seconds. If no message is available, the request is held on the broker until a message arrives or the timeout expires | Up to 30 |
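If these values come from configuration, it can help to clamp them to the documented upper bounds before calling consumeMessageOrderly(). The following is a minimal sketch. The normalizePollingArgs() helper is hypothetical, and the lower bounds it applies (1 message, 0 seconds) are assumptions rather than documented limits.

```php
<?php

// Hypothetical helper: clamps polling arguments to the documented upper
// bounds (16 messages, 30 seconds). The lower bounds (1 message, 0 seconds)
// are assumptions made for this sketch.
function normalizePollingArgs(int $numOfMessages, int $waitSeconds): array
{
    return [
        max(1, min(16, $numOfMessages)),
        max(0, min(30, $waitSeconds)),
    ];
}

// Out-of-range values are pulled back into range.
[$numOfMessages, $waitSeconds] = normalizePollingArgs(50, 45);
echo "numOfMessages=$numOfMessages, waitSeconds=$waitSeconds\n";
```

The two returned values can then be passed to consumeMessageOrderly() in the same order as in the consumer sample.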

Error handling

The consumer handles three error scenarios:

| Exception | Cause | Action |
| --- | --- | --- |
| MessageResolveException | Invalid characters in the message body prevent parsing | ACK both parseable and unparseable messages |
| MessageNotExistException | No messages available in the topic | Continue long polling automatically |
| AckMessageException | Receipt handle expired before the ACK was sent | Log the error and retry on next delivery |
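To reduce AckMessageException errors, a consumer can check how much time remains before a message's next consume time and skip slow work when the window is nearly closed. The sketch below is illustrative only. It assumes that getNextConsumeTime() returns an epoch timestamp in milliseconds, and the ackWindowRemainingMs() helper is a hypothetical name.

```php
<?php

// Hypothetical helper: returns the milliseconds left before the ACK window
// closes, or 0 if it has already closed. Assumes $nextConsumeTimeMs is an
// epoch timestamp in milliseconds (an assumption about getNextConsumeTime()).
function ackWindowRemainingMs(int $nextConsumeTimeMs, int $nowMs): int
{
    return max(0, $nextConsumeTimeMs - $nowMs);
}

// Fixed timestamps keep the example deterministic. In a real consumer,
// $nowMs could come from (int) (microtime(true) * 1000).
$nowMs = 1700000000000;
$remaining = ackWindowRemainingMs($nowMs + 5000, $nowMs);
echo "remaining ms: $remaining\n";
```

A remaining value of 0 suggests that the receipt handle has probably expired and that the message will be redelivered with a new handle.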