This topic provides Java sample code for sending and consuming scheduled or delayed messages through the ApsaraMQ for RocketMQ HTTP client SDK.
Scheduled vs. delayed messages
Scheduled messages and delayed messages are the same feature at the API level. Both use the setStartDeliverTime method to defer delivery. The only difference is how you calculate the timestamp:
| Type | How to set the delivery time | Example |
|---|---|---|
| Delayed message | Current time + offset | System.currentTimeMillis() + 10 * 1000 (10-second delay) |
| Scheduled message | Absolute Unix timestamp in milliseconds | 1654770600000 (2022-06-09 18:30:00) |
The broker holds each message until its specified delivery time, then releases it for consumption. This is a per-message setting, so each message can have its own delivery time.
For more information, see Scheduled messages and delayed messages.
Prerequisites
Before you begin, make sure that you have:
Created an instance, a topic, and a consumer group in the ApsaraMQ for RocketMQ console
Created an AccessKey pair for your Alibaba Cloud account
Each topic supports only one message type. Do not reuse a topic configured for normal messages to send scheduled or delayed messages.
Send scheduled or delayed messages
All messages in this example use setStartDeliverTime to defer delivery by 10 seconds. To send a scheduled message, replace the relative offset with an absolute Unix timestamp in milliseconds (for example, 1654770600000 for 2022-06-09 18:30:00).
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class Producer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// HTTP endpoint. Find this on the Instance Details page, in the
// Endpoints tab under Basic Information.
"<your-http-endpoint>",
// Load credentials from environment variables to avoid hardcoding secrets.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
final String topic = "<your-topic>";
// If the instance has a namespace, specify the instance ID.
// If it does not have a namespace, set this to null or "".
final String instanceId = "<your-instance-id>";
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
for (int i = 0; i < 4; i++) {
TopicMessage pubMsg = new TopicMessage(
"hello mq!".getBytes(), // Message body
"A" // Message tag
);
pubMsg.getProperties().put("a", String.valueOf(i)); // Custom property
pubMsg.setMessageKey("MessageKey");
// Defer delivery by 10 seconds.
// For a scheduled message, use an absolute timestamp instead,
// e.g., pubMsg.setStartDeliverTime(1654770600000L);
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
System.out.println(new Date() + " Send mq message success."
+ " Topic is:" + topic
+ ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}Replace the following placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
<your-http-endpoint> | HTTP endpoint | Instance Details page > Endpoints tab > HTTP Endpoint |
<your-topic> | Topic name | ApsaraMQ for RocketMQ console > Topics |
<your-instance-id> | Instance ID, or null if the instance has no namespace | Instance Details page > Basic Information |
Consume scheduled or delayed messages
Consume scheduled and delayed messages the same way as normal messages. The broker holds them until the delivery time arrives, then releases them for consumption.
The following example uses long polling to consume messages and send acknowledgments (ACKs) back to the broker.
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// HTTP endpoint. Find this on the Instance Details page, in the
// Endpoints tab under Basic Information.
"<your-http-endpoint>",
// The AccessKey ID for authentication.
"${ACCESS_KEY}",
// The AccessKey secret for authentication.
"${SECRET_KEY}"
);
final String topic = "<your-topic>";
final String groupId = "<your-group-id>";
// If the instance has a namespace, specify the instance ID.
// If it does not have a namespace, set this to null or "".
final String instanceId = "<your-instance-id>";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// Consume messages in a loop. For production use, run multiple threads
// to consume messages concurrently.
do {
List<Message> messages = null;
try {
// Long polling: wait up to 3 seconds for new messages.
// - First parameter: max messages per batch (max: 16).
// - Second parameter: long polling timeout in seconds (max: 30).
messages = consumer.consumeMessage(3, 3);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName()
+ ": no new message, continue!");
continue;
}
// Process messages
for (Message message : messages) {
System.out.println("Receive message: " + message);
}
// ACK consumed messages. If the broker does not receive an ACK
// before the retry interval elapses, the message is redelivered.
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:"
+ errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle : errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle
+ ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}Replace the following placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
<your-http-endpoint> | HTTP endpoint | Instance Details page > Endpoints tab > HTTP Endpoint |
${ACCESS_KEY} | AccessKey ID for authentication | See Create an AccessKey pair |
${SECRET_KEY} | AccessKey secret for authentication | See Create an AccessKey pair |
<your-topic> | Topic name | ApsaraMQ for RocketMQ console > Topics |
<your-group-id> | Consumer group ID | ApsaraMQ for RocketMQ console > Groups |
<your-instance-id> | Instance ID, or null if the instance has no namespace | Instance Details page > Basic Information |
What's next
Scheduled messages and delayed messages: Understand the lifecycle, delivery rules, and limits of scheduled and delayed messages.
Send and receive normal messages: Send messages that are delivered immediately without any delay.