The following PHP code examples show how to send and consume scheduled and delayed messages with the ApsaraMQ for RocketMQ HTTP client SDK.
How scheduled and delayed messages work
Scheduled and delayed messages defer delivery instead of arriving immediately after publishing.
Delayed message: Delivered after a specified delay period. Example: deliver 30 seconds after sending.
Scheduled message: Delivered at a specified point in time. Example: deliver at 2024-06-15 09:00:00.
Both types share the same API. Call setStartDeliverTime with a millisecond-level Unix timestamp that represents when the broker should deliver the message.
For a delayed message, calculate the timestamp as
current time + delay duration.For a scheduled message, convert the target delivery time directly to a millisecond-level Unix timestamp.
Common use cases:
Order timeout: Cancel an unpaid order after 30 minutes by publishing a delayed message when the order is created.
Retry with backoff: Re-process a failed task after a delay instead of retrying immediately.
Timed notifications: Trigger a reminder at a specific time, such as 15 minutes before a scheduled event.
For more information, see Scheduled messages and delayed messages.
Prerequisites
Before you begin, make sure that you have:
Created resources in the ApsaraMQ for RocketMQ console: an instance, a topic, and a consumer group
Created an AccessKey pair and stored the AccessKey ID and AccessKey secret as environment variables
Send scheduled or delayed messages
The following code sends four delayed messages, each delivered 10 seconds after publishing. To send a scheduled message instead, pass the target delivery time as a millisecond-level Unix timestamp to setStartDeliverTime.
Replace the placeholders before running the code:
| Placeholder | Description | Example |
|---|---|---|
<HTTP_ENDPOINT> | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://xxxx.mq-http.cn-hangzhou.aliyuncs.com |
<TOPIC> | Topic name created in the console | scheduled-msg-topic |
<INSTANCE_ID> | Instance ID. If the instance has no namespace, set to null or "" | MQ_INST_xxxx |
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ProducerTest
{
private $client;
private $producer;
public function __construct()
{
$this->client = new MQClient(
// HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console
"<HTTP_ENDPOINT>",
// AccessKey ID for authentication
getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// AccessKey secret for authentication
getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
);
$topic = "<TOPIC>";
// Instance ID. If the instance has no namespace, set to null or "".
$instanceId = "<INSTANCE_ID>";
$this->producer = $this->client->getProducer($instanceId, $topic);
}
public function run()
{
try
{
for ($i = 1; $i <= 4; $i++)
{
$publishMessage = new TopicMessage(
"hello mq! " // Message body
);
// Custom message property
$publishMessage->putProperty("a", $i);
// Message key for tracing
$publishMessage->setMessageKey("MessageKey");
// Deliver the message 10 seconds from now.
// For a scheduled message, set this to the target delivery time
// as a millisecond-level Unix timestamp.
$publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
$result = $this->producer->publishMessage($publishMessage);
print "Send mq message success. msgId is:" . $result->getMessageId()
. ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
}
} catch (\Exception $e) {
print_r($e->getMessage() . "\n");
}
}
}
$instance = new ProducerTest();
$instance->run();
?>Key points:
setStartDeliverTimeaccepts a millisecond-level Unix timestamp. In PHP,time()returns seconds, so multiply by 1000 to convert.For a 10-second delay:
time() * 1000 + 10 * 1000.For delivery at a specific time, convert that time to a Unix timestamp in milliseconds. For example, to deliver at
2024-06-15 09:00:00 UTC:strtotime('2024-06-15 09:00:00') * 1000.
Consume scheduled or delayed messages
Scheduled and delayed messages are consumed the same way as normal messages. After the delivery time arrives, messages become available through long polling.
Replace the placeholders before running the code:
| Placeholder | Description | Example |
|---|---|---|
<HTTP_ENDPOINT> | HTTP endpoint from the Instance Details page | http://xxxx.mq-http.cn-hangzhou.aliyuncs.com |
<TOPIC> | Topic name created in the console | scheduled-msg-topic |
<GROUP_ID> | Consumer group ID created in the console | GID_scheduled_consumer |
<INSTANCE_ID> | Instance ID. If the instance has no namespace, set to null or "" | MQ_INST_xxxx |
<?php
use MQ\MQClient;
require "vendor/autoload.php";
class ConsumerTest
{
private $client;
private $consumer;
public function __construct()
{
$this->client = new MQClient(
// HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console
"<HTTP_ENDPOINT>",
// AccessKey ID for authentication
getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// AccessKey secret for authentication
getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
);
$topic = "<TOPIC>";
// Consumer group ID created in the ApsaraMQ for RocketMQ console
$groupId = "<GROUP_ID>";
// Instance ID. If the instance has no namespace, set to null or "".
$instanceId = "<INSTANCE_ID>";
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
public function ackMessages($receiptHandles)
{
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\AckMessageException) {
// ACK may fail if the receipt handle has expired
printf("Ack Error, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
$errorItem->getReceiptHandle(),
$errorItem->getErrorCode(),
$errorItem->getErrorMessage());
}
}
}
}
public function run()
{
// Consume messages in a loop. For production use, run multiple threads
// to consume messages concurrently.
while (True) {
try {
// Long polling: if no message is available, the request waits
// on the broker until a message arrives or the polling period ends.
$messages = $this->consumer->consumeMessage(
3, // Max messages per request (up to 16)
3 // Long polling timeout in seconds (up to 30)
);
} catch (\MQ\Exception\MessageResolveException $e) {
// Thrown when some messages contain invalid characters and cannot be parsed
$messages = $e->getPartialResult()->getMessages();
$failMessages = $e->getPartialResult()->getFailResolveMessages();
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MsgID %s\n", $message->getMessageId());
}
foreach ($failMessages as $failMessage) {
$receiptHandles[] = $failMessage->getReceiptHandle();
printf("Fail To Resolve Message. MsgID %s\n", $failMessage->getMessageId());
}
$this->ackMessages($receiptHandles);
continue;
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\MessageNotExistException) {
// No messages available. Long polling continues on the next iteration.
printf("No message, continue long polling! RequestId:%s\n", $e->getRequestId());
continue;
}
print_r($e->getMessage() . "\n");
sleep(3);
continue;
}
print "consume finish, messages:\n";
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d, MessageKey:%s\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(),
$message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
}
// If the broker does not receive an ACK before the time specified
// in getNextConsumeTime(), the message is redelivered.
// Each delivery generates a new receipt handle.
print_r($receiptHandles);
$this->ackMessages($receiptHandles);
print "ack finish\n";
}
}
}
$instance = new ConsumerTest();
$instance->run();
?>What to know about message consumption
Long polling: When no message is available, the broker holds the consumer's request until a message arrives or the polling timeout expires. This reduces unnecessary network requests.
ACK (consumption acknowledgment): After processing a message, the consumer sends an ACK with the receipt handle. If the broker receives no ACK before
getNextConsumeTime(), it redelivers the message.Receipt handle expiry: Each delivery assigns a unique receipt handle with its own expiry time. An expired handle causes ACK to fail, triggering redelivery.
Parse failures: If a message body contains invalid characters, the SDK throws
MessageResolveException. CallgetPartialResult()to retrieve successfully parsed messages and handle the failures separately.