ApsaraMQ for RocketMQ supports two types of time-based message delivery: scheduled messages, delivered at a specific point in time, and delayed messages, delivered after a fixed delay. Both use the __STARTDELIVERTIME property to control when the broker delivers the message to consumers.
The Java sample code below demonstrates how to send and subscribe to scheduled and delayed messages with the TCP client SDK (Community Edition).
How it works
When a producer sends a message with the __STARTDELIVERTIME property, the ApsaraMQ for RocketMQ broker holds the message until the specified delivery time. At that point, the broker delivers the message to subscribed consumers like any normal message.
Delayed message: Set __STARTDELIVERTIME to System.currentTimeMillis() + delayMillis. The broker delivers the message after the delay elapses.
Scheduled message: Set __STARTDELIVERTIME to a future Unix timestamp in milliseconds. The broker delivers the message at that exact time.
If the specified time is in the past, the broker delivers the message immediately.
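The two calculations above can be sketched without any messaging dependency. This is a minimal illustration, not part of the SDK: the class name DeliveryTimes and its methods are hypothetical, and holdMillis only models the "past timestamps are delivered immediately" rule described above.

```java
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class DeliveryTimes {
    /** Delayed message: current time plus a fixed delay, in epoch milliseconds. */
    static long delayed(long nowMillis, Duration delay) {
        return nowMillis + delay.toMillis();
    }

    /** Scheduled message: an absolute point in time, in epoch milliseconds. */
    static long scheduled(ZonedDateTime deliverAt) {
        return deliverAt.toInstant().toEpochMilli();
    }

    /** Models how long the message would be held; 0 means immediate delivery. */
    static long holdMillis(long nowMillis, long startDeliverTime) {
        return Math.max(0L, startDeliverTime - nowMillis);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long delayedTs = delayed(now, Duration.ofSeconds(3));
        long scheduledTs = scheduled(
                ZonedDateTime.of(2030, 1, 1, 9, 0, 0, 0, ZoneId.of("UTC")));

        // Either value would be passed as the __STARTDELIVERTIME property string.
        System.out.println("delayed:   " + String.valueOf(delayedTs));
        System.out.println("scheduled: " + String.valueOf(scheduledTs));
        System.out.println("hold for a past timestamp: " + holdMillis(now, now - 1000));
    }
}
```

Either result is converted with String.valueOf and set as the __STARTDELIVERTIME user property on the message.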
Scheduled vs. delayed: when to use each
| Type | Use case | Example |
|---|---|---|
| Delayed | Trigger an action after a fixed wait | Cancel an unpaid order 30 minutes after creation |
| Scheduled | Trigger an action at a specific time | Send a notification at 09:00 every Monday |
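As a rough sketch of how the two example use cases in the table translate into timestamps, the following code computes a delivery time for each. The class and method names are hypothetical. Because a message carries a single delivery timestamp, a recurring notification such as "every Monday" would presumably need one message per occurrence; the sketch computes only the next one.

```java
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAdjusters;

public class UseCaseTimestamps {
    /** Delayed: the cancellation check fires 30 minutes after order creation. */
    static long orderCancelTime(Instant orderCreatedAt) {
        return orderCreatedAt.plus(Duration.ofMinutes(30)).toEpochMilli();
    }

    /** Scheduled: 09:00 on the next Monday strictly after the date of 'now'. */
    static long nextMondayAtNine(ZonedDateTime now) {
        return now.with(TemporalAdjusters.next(DayOfWeek.MONDAY))
                  .with(LocalTime.of(9, 0))
                  .toInstant()
                  .toEpochMilli();
    }

    public static void main(String[] args) {
        // The zone is an assumption; use the zone your recipients expect.
        ZonedDateTime now = ZonedDateTime.now(ZoneId.of("UTC"));
        System.out.println("cancel check at: " + orderCancelTime(now.toInstant()));
        System.out.println("notification at: " + nextMondayAtNine(now));
    }
}
```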
Differences from Apache RocketMQ
Apache RocketMQ supports delayed messages but not scheduled messages, and it provides no dedicated interface for scheduled messages.
ApsaraMQ for RocketMQ provides additional capabilities:
Both delayed and scheduled message types
Second-level precision for delivery time
Higher concurrency for time-based message processing
The configuration methods and results differ between Apache RocketMQ and ApsaraMQ for RocketMQ. Use the sample code on this page for ApsaraMQ for RocketMQ on the cloud.
Prerequisites
Before you begin, make sure that you have:
Community Edition of the SDK for Java 4.5.2 or later installed
An AccessKey pair created for your Alibaba Cloud account
A topic and a Group ID created in the ApsaraMQ for RocketMQ console
Send scheduled and delayed messages
Both scheduled and delayed messages use the same __STARTDELIVERTIME user property. The only difference is how the timestamp value is calculated.
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-group-id> | Group ID created in the ApsaraMQ for RocketMQ console | GID_example |
| <your-access-point> | Instance endpoint from the console | http://MQ_INST_XXXX.aliyuncs.com:80 |
| <your-topic> | Topic created in the ApsaraMQ for RocketMQ console | Topic_example |
| <your-message-tag> | Message tag for filtering | TagA |
```java
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * Reads credentials from environment variables:
     *   ALIBABA_CLOUD_ACCESS_KEY_ID
     *   ALIBABA_CLOUD_ACCESS_KEY_SECRET
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a producer with message trace enabled.
        // To disable message trace, use:
        //   new DefaultMQProducer("<your-group-id>", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer(
                "<your-group-id>", getAclRPCHook(), true, null);
        // Required for message trace on the cloud.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // Set the instance endpoint from the ApsaraMQ for RocketMQ console.
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message(
                        "<your-topic>",
                        "<your-message-tag>",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                // --- Option A: Delayed message ---
                // Deliver 3 seconds from now.
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // --- Option B: Scheduled message ---
                // Deliver at a specific date and time.
                // Uncomment the following lines and comment out Option A to use.
                //
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                //         .parse("2025-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Shut down the producer before exiting (optional).
        producer.shutdown();
    }
}
```
Summary:
The __STARTDELIVERTIME property accepts a Unix timestamp in milliseconds.
For delayed messages, add the desired delay (in milliseconds) to the current time.
For scheduled messages, parse the target date-time string in yyyy-MM-dd HH:mm:ss format into a Unix timestamp.
If the specified time is earlier than the current time, the message is delivered immediately.
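One detail worth keeping in mind when parsing a date-time string: SimpleDateFormat interprets the string in the JVM's default time zone unless a zone is set explicitly, so the same string can map to different timestamps on differently configured hosts. The sketch below makes the zone an explicit argument using java.time. The class and method names are hypothetical, and the zones shown are only examples.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class ScheduledTimeParser {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses a wall-clock date-time in the given zone into epoch milliseconds. */
    static long toStartDeliverTime(String dateTime, ZoneId zone) {
        return LocalDateTime.parse(dateTime, FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        String target = "2025-08-10 18:45:00";
        long shanghai = toStartDeliverTime(target, ZoneId.of("Asia/Shanghai"));
        long utc = toStartDeliverTime(target, ZoneId.of("UTC"));

        // Same string, different instants: the zone decides the delivery time.
        System.out.println("Asia/Shanghai: " + shanghai);
        System.out.println("UTC:           " + utc);
        System.out.println("difference in hours: " + (utc - shanghai) / 3_600_000L);
    }
}
```

The returned value can be passed to msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(...)) in the same way as in the producer sample.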
Subscribe to scheduled and delayed messages
Subscribing to scheduled and delayed messages is identical to subscribing to normal messages. The broker holds the message until the specified delivery time, so no special consumer-side configuration is needed.
```java
import java.util.List;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
     * Reads credentials from environment variables:
     *   ALIBABA_CLOUD_ACCESS_KEY_ID
     *   ALIBABA_CLOUD_ACCESS_KEY_SECRET
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a consumer with message trace enabled.
        // To disable message trace, use:
        //   new DefaultMQPushConsumer("<your-group-id>", getAclRPCHook(),
        //           new AllocateMessageQueueAveragely());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "<your-group-id>", getAclRPCHook(),
                new AllocateMessageQueueAveragely(), true, null);
        // Set the instance endpoint from the ApsaraMQ for RocketMQ console.
        // The value is in the format of http://xxxx.mq-internet.aliyuncs.com:80.
        consumer.setNamesrvAddr("<your-access-point>");
        // Required for message trace on the cloud.
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // Subscribe to all tags (*) on the topic.
        consumer.subscribe("<your-topic>", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
```
See also
Scheduled messages and delayed messages: concepts, delivery precision, and usage limits
Explore other message types such as transactional messages and ordered messages