All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and consume scheduled or delayed messages

Last Updated:Mar 10, 2026

This topic provides Java sample code for sending and consuming scheduled or delayed messages through the ApsaraMQ for RocketMQ HTTP client SDK.

Scheduled vs. delayed messages

Scheduled messages and delayed messages are the same feature at the API level. Both use the setStartDeliverTime method to defer delivery. The only difference is how you calculate the timestamp:

TypeHow to set the delivery timeExample
Delayed messageCurrent time + offsetSystem.currentTimeMillis() + 10 * 1000 (10-second delay)
Scheduled messageAbsolute Unix timestamp in milliseconds1654770600000 (2022-06-09 18:30:00)

The broker holds each message until its specified delivery time, then releases it for consumption. This is a per-message setting, so each message can have its own delivery time.

For more information, see Scheduled messages and delayed messages.

Prerequisites

Before you begin, make sure that you have:

Important

Each topic supports only one message type. Do not reuse a topic configured for normal messages to send scheduled or delayed messages.

Send scheduled or delayed messages

All messages in this example use setStartDeliverTime to defer delivery by 10 seconds. To send a scheduled message, replace the relative offset with an absolute Unix timestamp in milliseconds (for example, 1654770600000 for 2022-06-09 18:30:00).

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // HTTP endpoint. Find this on the Instance Details page, in the
                // Endpoints tab under Basic Information.
                "<your-http-endpoint>",
                // Load credentials from environment variables to avoid hardcoding secrets.
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        );

        final String topic = "<your-topic>";
        // If the instance has a namespace, specify the instance ID.
        // If it does not have a namespace, set this to null or "".
        final String instanceId = "<your-instance-id>";

        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg = new TopicMessage(
                        "hello mq!".getBytes(),   // Message body
                        "A"                        // Message tag
                );
                pubMsg.getProperties().put("a", String.valueOf(i));  // Custom property
                pubMsg.setMessageKey("MessageKey");

                // Defer delivery by 10 seconds.
                // For a scheduled message, use an absolute timestamp instead,
                // e.g., pubMsg.setStartDeliverTime(1654770600000L);
                pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);

                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                System.out.println(new Date() + " Send mq message success."
                        + " Topic is:" + topic
                        + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }
}

Replace the following placeholders with your actual values:

PlaceholderDescriptionWhere to find it
<your-http-endpoint>HTTP endpointInstance Details page > Endpoints tab > HTTP Endpoint
<your-topic>Topic nameApsaraMQ for RocketMQ console > Topics
<your-instance-id>Instance ID, or null if the instance has no namespaceInstance Details page > Basic Information

Consume scheduled or delayed messages

Consume scheduled and delayed messages the same way as normal messages. The broker holds them until the delivery time arrives, then releases them for consumption.

The following example uses long polling to consume messages and send acknowledgments (ACKs) back to the broker.

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;

import java.util.ArrayList;
import java.util.List;

public class Consumer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // HTTP endpoint. Find this on the Instance Details page, in the
                // Endpoints tab under Basic Information.
                "<your-http-endpoint>",
                // The AccessKey ID for authentication.
                "${ACCESS_KEY}",
                // The AccessKey secret for authentication.
                "${SECRET_KEY}"
        );

        final String topic = "<your-topic>";
        final String groupId = "<your-group-id>";
        // If the instance has a namespace, specify the instance ID.
        // If it does not have a namespace, set this to null or "".
        final String instanceId = "<your-instance-id>";

        final MQConsumer consumer;
        if (instanceId != null && instanceId != "") {
            consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
        } else {
            consumer = mqClient.getConsumer(topic, groupId);
        }

        // Consume messages in a loop. For production use, run multiple threads
        // to consume messages concurrently.
        do {
            List<Message> messages = null;

            try {
                // Long polling: wait up to 3 seconds for new messages.
                // - First parameter: max messages per batch (max: 16).
                // - Second parameter: long polling timeout in seconds (max: 30).
                messages = consumer.consumeMessage(3, 3);
            } catch (Throwable e) {
                e.printStackTrace();
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }

            if (messages == null || messages.isEmpty()) {
                System.out.println(Thread.currentThread().getName()
                        + ": no new message, continue!");
                continue;
            }

            // Process messages
            for (Message message : messages) {
                System.out.println("Receive message: " + message);
            }

            // ACK consumed messages. If the broker does not receive an ACK
            // before the retry interval elapses, the message is redelivered.
            {
                List<String> handles = new ArrayList<String>();
                for (Message message : messages) {
                    handles.add(message.getReceiptHandle());
                }

                try {
                    consumer.ackMessage(handles);
                } catch (Throwable e) {
                    if (e instanceof AckMessageException) {
                        AckMessageException errors = (AckMessageException) e;
                        System.out.println("Ack message fail, requestId is:"
                                + errors.getRequestId() + ", fail handles:");
                        if (errors.getErrorMessages() != null) {
                            for (String errorHandle : errors.getErrorMessages().keySet()) {
                                System.out.println("Handle:" + errorHandle
                                        + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                        + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                            }
                        }
                        continue;
                    }
                    e.printStackTrace();
                }
            }
        } while (true);
    }
}

Replace the following placeholders with your actual values:

PlaceholderDescriptionWhere to find it
<your-http-endpoint>HTTP endpointInstance Details page > Endpoints tab > HTTP Endpoint
${ACCESS_KEY}AccessKey ID for authenticationSee Create an AccessKey pair
${SECRET_KEY}AccessKey secret for authenticationSee Create an AccessKey pair
<your-topic>Topic nameApsaraMQ for RocketMQ console > Topics
<your-group-id>Consumer group IDApsaraMQ for RocketMQ console > Groups
<your-instance-id>Instance ID, or null if the instance has no namespaceInstance Details page > Basic Information

What's next