ApsaraMQ for RocketMQ delivers ordered messages in strict first-in-first-out (FIFO) order. This topic provides PHP sample code for sending and receiving ordered messages through the HTTP client SDK.
Ordering types
ApsaraMQ for RocketMQ supports two ordering scopes:
| Type | Ordering scope | How it works |
|---|---|---|
| Globally ordered | Entire topic | All messages in the topic are sent and consumed in FIFO order |
| Partitionally ordered | Per partition | Messages are distributed to partitions by sharding key. Messages within each partition are consumed in FIFO order |
A sharding key identifies which partition a message belongs to. It is different from a message key.
For more information, see Ordered messages.
Ordering guarantees
Ordered delivery depends on both the producer and the consumer behaving correctly.
The broker determines message order based on the sequence in which a single producer or thread sends messages. If multiple producers or threads send messages concurrently, the order is determined by the order in which the messages arrive at the broker, which may differ from the intended business order.
Consumer side:
The consumer may pull ordered messages from multiple partitions in a single batch. Messages from each partition retain their send order.
The consumer must acknowledge (ACK) all messages from a partition before receiving the next batch from that partition.
If the broker does not receive an ACK before the timeout specified by
getNextConsumeTime(), it redelivers the message.
Prerequisites
Before you begin, make sure that you have:
The PHP SDK for ApsaraMQ for RocketMQ installed. For details, see Prepare the environment
An ApsaraMQ for RocketMQ instance, topic, and consumer group created in the ApsaraMQ for RocketMQ console
An AccessKey pair for your Alibaba Cloud account. For details, see Create an AccessKey pair
Send ordered messages
The following code sends four ordered messages and distributes them across two partitions by using setShardingKey(). Messages that share the same sharding key are delivered in FIFO order within that partition.
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ProducerTest
{
private $client;
private $producer;
public function __construct()
{
$this->client = new MQClient(
// HTTP endpoint. Find this in the HTTP Endpoint section on the Instance Details
// page in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
// Get AccessKey credentials from environment variables.
getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
);
// Topic created in the ApsaraMQ for RocketMQ console.
$topic = "${TOPIC}";
// Instance ID. If the instance has no namespace, set this to null or "".
// Check the Instance Details page for namespace information.
$instanceId = "${INSTANCE_ID}";
$this->producer = $this->client->getProducer($instanceId, $topic);
}
public function run()
{
try
{
for ($i = 1; $i <= 4; $i++)
{
$publishMessage = new TopicMessage(
"hello mq! " // Message body
);
// Set a custom message property.
$publishMessage->putProperty("a", $i);
// Set the sharding key to distribute messages across partitions.
// Messages with the same sharding key are delivered in FIFO order.
$publishMessage->setShardingKey($i % 2);
$result = $this->producer->publishMessage($publishMessage);
print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
}
} catch (\Exception $e) {
print_r($e->getMessage() . "\n");
}
}
}
$instance = new ProducerTest();
$instance->run();
?>Replace the following placeholders with your actual values:
| Placeholder | Description |
|---|---|
${HTTP_ENDPOINT} | HTTP endpoint from the Instance Details page |
${TOPIC} | Topic name |
${INSTANCE_ID} | Instance ID (null or "" if no namespace) |
Receive ordered messages
The following code consumes ordered messages by using consumeMessageOrderly(), which maintains FIFO order within each partition. The consumer uses long polling: if no message is available, the request is held on the broker until a message arrives or the polling timeout expires.
<?php
require "vendor/autoload.php";
use MQ\MQClient;
class ConsumerTest
{
private $client;
private $consumer;
public function __construct()
{
$this->client = new MQClient(
// HTTP endpoint. Find this in the HTTP Endpoint section on the Instance Details
// page in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
// Get AccessKey credentials from environment variables.
getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
);
// Topic created in the ApsaraMQ for RocketMQ console.
$topic = "${TOPIC}";
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
$groupId = "${GROUP_ID}";
// Instance ID. If the instance has no namespace, set this to null or "".
// Check the Instance Details page for namespace information.
$instanceId = "${INSTANCE_ID}";
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
public function ackMessages($receiptHandles)
{
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\AckMessageException) {
// Handle ACK failures. This occurs when a receipt handle has expired.
printf("Ack Error, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
}
public function run()
{
// Poll for messages continuously. For production use, run multiple threads
// to consume messages concurrently across partitions.
while (True) {
try {
// consumeMessageOrderly() ensures FIFO order within each partition.
// If the broker does not receive an ACK for a message, it redelivers
// that message before sending subsequent messages from the same partition.
$messages = $this->consumer->consumeMessageOrderly(
3, // Max messages per batch (up to 16)
3 // Long polling timeout in seconds (up to 30)
);
} catch (\MQ\Exception\MessageResolveException $e) {
// Handle messages that cannot be parsed due to invalid characters.
$messages = $e->getPartialResult()->getMessages();
$failMessages = $e->getPartialResult()->getFailResolveMessages();
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MsgID %s\n", $message->getMessageId());
}
foreach ($failMessages as $failMessage) {
$receiptHandles[] = $failMessage->getReceiptHandle();
printf("Fail To Resolve Message. MsgID %s\n", $failMessage->getMessageId());
}
$this->ackMessages($receiptHandles);
continue;
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\MessageNotExistException) {
// No messages available. Long polling continues automatically.
printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId());
continue;
}
print_r($e->getMessage() . "\n");
sleep(3);
continue;
}
print "======>consume finish, messages:\n";
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,ShardingKey:%s\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getShardingKey());
print_r($message->getProperties());
}
// ACK all messages. If the broker does not receive an ACK before
// getNextConsumeTime(), it redelivers the message. Each redelivery
// assigns a new receipt handle with an updated timestamp.
print_r($receiptHandles);
$this->ackMessages($receiptHandles);
print "=======>ack finish\n";
}
}
}
$instance = new ConsumerTest();
$instance->run();
?>In addition to the placeholders in the Send ordered messages section, replace the following:
| Placeholder | Description |
|---|---|
${GROUP_ID} | Consumer group ID |
consumeMessageOrderly() parameters
| Parameter | Description | Range |
|---|---|---|
numOfMessages | Maximum number of messages to consume per batch | Up to 16 |
waitSeconds | Long polling duration in seconds. If no message is available, the request is held on the broker until a message arrives or the timeout expires | Up to 30 |
Error handling
The consumer handles three error scenarios:
| Exception | Cause | Action |
|---|---|---|
MessageResolveException | Invalid characters in the message body prevent parsing | ACK both parseable and unparseable messages |
MessageNotExistException | No messages available in the topic | Continue long polling automatically |
AckMessageException | Receipt handle expired before the ACK was sent | Log the error and retry on next delivery |