All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive scheduled and delayed messages

Last Updated:Mar 10, 2026

ApsaraMQ for RocketMQ supports two types of time-based message delivery: scheduled messages, delivered at a specific point in time, and delayed messages, delivered after a fixed delay. Both use the __STARTDELIVERTIME property to control when the broker delivers the message to consumers.

The Java sample code below demonstrates how to send and subscribe to scheduled and delayed messages with the TCP client SDK (Community Edition).

How it works

When a producer sends a message with the __STARTDELIVERTIME property, the ApsaraMQ for RocketMQ broker holds the message until the specified delivery time. At that point, the broker delivers the message to subscribed consumers like any normal message.

  • Delayed message: Set __STARTDELIVERTIME to System.currentTimeMillis() + delayMillis. The broker delivers the message after the delay elapses.

  • Scheduled message: Set __STARTDELIVERTIME to a future Unix timestamp in milliseconds. The broker delivers the message at that exact time.

If the specified time is in the past, the broker delivers the message immediately.

Scheduled vs. delayed: when to use each

TypeUse caseExample
DelayedTrigger an action after a fixed waitCancel an unpaid order 30 minutes after creation
ScheduledTrigger an action at a specific timeSend a notification at 09:00 every Monday

Differences from Apache RocketMQ

Apache RocketMQ supports delayed messages but does not support scheduled messages, and no dedicated interface exists for scheduled messages.

ApsaraMQ for RocketMQ provides additional capabilities:

  • Both delayed and scheduled message types

  • Second-level precision for delivery time

  • Higher concurrency for time-based message processing

Important

The configuration methods and results differ between Apache RocketMQ and ApsaraMQ for RocketMQ. Use the sample code on this page for ApsaraMQ for RocketMQ on the cloud.

Prerequisites

Before you begin, make sure that you have:

Send scheduled and delayed messages

Both scheduled and delayed messages use the same __STARTDELIVERTIME user property. The only difference is how the timestamp value is calculated.

Replace the following placeholders with your actual values:

PlaceholderDescriptionExample
<your-group-id>Group ID created in the ApsaraMQ for RocketMQ consoleGID_example
<your-access-point>Instance endpoint from the consolehttp://MQ_INST_XXXX.aliyuncs.com:80
<your-topic>Topic created in the ApsaraMQ for RocketMQ consoleTopic_example
<your-message-tag>Message tag for filteringTagA
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * Reads credentials from environment variables:
     *   ALIBABA_CLOUD_ACCESS_KEY_ID
     *   ALIBABA_CLOUD_ACCESS_KEY_SECRET
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a producer with message trace enabled.
        // To disable message trace, use:
        //   new DefaultMQProducer("<your-group-id>", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer(
            "<your-group-id>", getAclRPCHook(), true, null);

        // Required for message trace on the cloud.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Set the instance endpoint from the ApsaraMQ for RocketMQ console.
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message(
                    "<your-topic>",
                    "<your-message-tag>",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                // --- Option A: Delayed message ---
                // Deliver 3 seconds from now.
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // --- Option B: Scheduled message ---
                // Deliver at a specific date and time.
                // Uncomment the following lines and comment out Option A to use.
                //
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                //     .parse("2025-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Shut down the producer before exiting (optional).
        producer.shutdown();
    }
}

Summary:

  • The __STARTDELIVERTIME property accepts a Unix timestamp in milliseconds.

  • For delayed messages, add the desired delay (in milliseconds) to the current time.

  • For scheduled messages, parse the target date-time string in yyyy-MM-dd HH:mm:ss format into a Unix timestamp.

  • If the specified time is earlier than the current time, the message is delivered immediately.

Subscribe to scheduled and delayed messages

Subscribing to scheduled and delayed messages is identical to subscribing to normal messages. The broker holds the message until the specified delivery time, so no special consumer-side configuration is needed.

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
     * Reads credentials from environment variables:
     *   ALIBABA_CLOUD_ACCESS_KEY_ID
     *   ALIBABA_CLOUD_ACCESS_KEY_SECRET
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a consumer with message trace enabled.
        // To disable message trace, use:
        //   new DefaultMQPushConsumer("<your-group-id>", getAclRPCHook(),
        //       new AllocateMessageQueueAveragely());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
            "<your-group-id>", getAclRPCHook(),
            new AllocateMessageQueueAveragely(), true, null);

        // Set the instance endpoint from the ApsaraMQ for RocketMQ console.
        // The value is in the format of http://xxxx.mq-internet.aliyuncs.com:80.
        consumer.setNamesrvAddr("<your-access-point>");

        // Required for message trace on the cloud.
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // Subscribe to all tags (*) on the topic.
        consumer.subscribe("<your-topic>", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

See also