This topic provides sample code for sending and receiving scheduled and delayed messages by using the TCP client SDK for C++ (Community Edition).
Scheduled vs. delayed messages
ApsaraMQ for RocketMQ supports two types of time-based delivery:
Scheduled messages: Delivered to consumers at a specific point in time. For example, deliver a notification at 9:00 AM tomorrow.
Delayed messages: Delivered to consumers after a specified delay. For example, deliver a reminder 30 minutes from now.
Both types use the __STARTDELIVERTIME message property. The difference is how the timestamp is calculated:
| Type | Timestamp value | Example |
|---|---|---|
| Scheduled | Absolute time in milliseconds (Unix epoch) | 1583568060000 (March 7, 2020, 16:01:00 UTC+8) |
| Delayed | Current time + delay in milliseconds | currentTimeMillis + 10000 (10-second delay) |
For more information, see Scheduled messages and delayed messages.
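The two calculations in the table can be sketched with std::chrono. The helper names below (nowMillis, delayedDeliverTime, scheduledDeliverTime, toPropertyValue) are illustrative assumptions and not part of the SDK. The sketch also assumes that std::chrono::system_clock counts from the Unix epoch, which holds on common platforms.

```cpp
#include <chrono>
#include <cstdint>
#include <string>

// Hypothetical helpers for illustration only; they are not part of the SDK.

// Current time as milliseconds since the Unix epoch.
inline std::int64_t nowMillis() {
    using namespace std::chrono;
    return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}

// Delayed message: current time plus the delay.
inline std::int64_t delayedDeliverTime(std::int64_t currentMillis, std::chrono::milliseconds delay) {
    return currentMillis + delay.count();
}

// Scheduled message: an absolute point in time, converted to milliseconds.
inline std::int64_t scheduledDeliverTime(std::chrono::system_clock::time_point when) {
    using namespace std::chrono;
    return duration_cast<milliseconds>(when.time_since_epoch()).count();
}

// Message properties are strings, so the timestamp is formatted as decimal text.
inline std::string toPropertyValue(std::int64_t deliverMillis) {
    return std::to_string(deliverMillis);
}
```

Either result, passed through toPropertyValue, is the kind of string that would be set as the __STARTDELIVERTIME property. For example, delayedDeliverTime(nowMillis(), std::chrono::seconds(10)) corresponds to the 10-second delay in the table.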
Differences from Apache RocketMQ
| Capability | ApsaraMQ for RocketMQ | Apache RocketMQ |
|---|---|---|
| Scheduled messages | Supported | Not supported |
| Delayed messages | Supported | Supported |
| Time precision | Second-level accuracy | N/A |
| Dedicated API | __STARTDELIVERTIME property | No dedicated interface for scheduled messages |
Apache RocketMQ supports delayed messages but not scheduled messages, and its open-source SDK provides no dedicated interface for scheduled messages. ApsaraMQ for RocketMQ supports both scheduled and delayed messages. Scheduled times and delay periods are accurate to the second, and higher concurrency is supported.
Time-setting rules
The __STARTDELIVERTIME value is a Unix timestamp in milliseconds.
Set an absolute timestamp for scheduled messages, or set currentTimeMillis + delayMillis for delayed messages.
If the timestamp is earlier than the current time, the message is delivered immediately.
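The last rule can be expressed as a small calculation. The function below is a hypothetical helper, not an SDK call. It only models the rule stated above, where a timestamp in the past means no waiting time.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper: the waiting time implied by a delivery timestamp.
// A timestamp at or before the current time yields 0 (immediate delivery).
inline std::int64_t effectiveDelayMillis(std::int64_t currentMillis, std::int64_t deliverMillis) {
    return std::max<std::int64_t>(0, deliverMillis - currentMillis);
}
```

One consequence of the rule: a timestamp given in seconds instead of milliseconds is a date in January 1970 when read as milliseconds, so such a message would be delivered immediately rather than at the intended time.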
Prerequisites
Before you begin, make sure that you have:
The C++ dynamic library installed. For more information, see Install the C++ dynamic library.
An AccessKey pair for your Alibaba Cloud account. For more information, see Create an AccessKey pair.
A topic and a group ID created in the ApsaraMQ for RocketMQ console.
Send scheduled or delayed messages
To schedule delivery, set the __STARTDELIVERTIME property on the message:
    msg.setProperty("__STARTDELIVERTIME", to_string(deliverTimestamp));
Set deliverTimestamp to a millisecond Unix timestamp. Use a 64-bit integer type, because a millisecond timestamp does not fit in 32 bits. For a 10-second delay, calculate it as:
    long long deliverTimestamp = currentTimeMillis + 10000;
Complete producer example
Save the following code as DelayProducerDemo.cpp.
Compile:
    g++ -o delay_producer_demo -std=c++11 -lz -lrocketmq DelayProducerDemo.cpp
Run the compiled binary.
Replace the placeholders in the code with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <group-id> | Group ID from the ApsaraMQ for RocketMQ console | GID_delay_demo |
| <endpoint> | TCP endpoint from the Instance Details page | http://MQ_INST_xxx.mq-internet-access.mq-internet.aliyuncs.com:80 |
| <topic> | Topic from the ApsaraMQ for RocketMQ console | delay_topic |
    #include <chrono>
    #include <cstdlib>
    #include <iostream>
    #include <string>
    #include <thread>
    #include "DefaultMQProducer.h"

    using namespace std;
    using namespace rocketmq;

    int main() {
        std::cout << "=======Before sending messages=======" << std::endl;
        // Group ID from the ApsaraMQ for RocketMQ console.
        DefaultMQProducer producer("<group-id>");
        // TCP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
        producer.setNamesrvAddr("<endpoint>");
        // Authenticate with your AccessKey ID and AccessKey secret from environment variables.
        // Default user channel: ALIYUN.
        producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        // Start the producer.
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        for (int i = 0; i < count; ++i) {
            // Topic from the ApsaraMQ for RocketMQ console.
            MQMessage msg("<topic>", "HiTAG", "Hello,CPP SDK, Delay Message.");
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            // Set the delivery time.
            // - Scheduled message: use an absolute timestamp in milliseconds.
            //   For example, 1583568060000 delivers the message at 2020-03-07 16:01:00 (UTC+8).
            // - Delayed message: add the delay in milliseconds to the current time.
            //   The following example delays delivery by 10 seconds.
            // If the timestamp is earlier than the current time, the message is delivered immediately.
            // A 64-bit type is used because a millisecond timestamp does not fit in 32 bits.
            long long exp = mil.count() + 10000;
            msg.setProperty("__STARTDELIVERTIME", to_string(exp));
            std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl;
            try {
                SendResult sendResult = producer.send(msg);
                std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                          << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException &e) {
                // Catch by reference to avoid copying and slicing the exception.
                std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Send " << count << " messages OK, costs "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
        producer.shutdown();
        std::cout << "=======After sending messages=======" << std::endl;
        return 0;
    }
Receive scheduled or delayed messages
Consuming scheduled and delayed messages works the same as consuming normal messages. Register a MessageListenerConcurrently listener and subscribe to the topic.
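Because the message still carries its __STARTDELIVERTIME property as a string when it reaches the consumer, the listener can compare it with the receive time. The sketch below is a hypothetical helper, not an SDK function. It assumes the property holds decimal milliseconds, as set by the producer in this topic.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical helper: difference between the receive time and the requested
// delivery time, in milliseconds. Returns false if the property value is
// empty or is not a plain decimal number; lagOut is left unchanged in that case.
inline bool deliveryLagMillis(const std::string& startDeliverTime,
                              std::int64_t receivedMillis,
                              std::int64_t& lagOut) {
    if (startDeliverTime.empty()) {
        return false;
    }
    try {
        std::size_t pos = 0;
        long long requested = std::stoll(startDeliverTime, &pos);
        if (pos != startDeliverTime.size()) {
            return false;  // Trailing characters after the number.
        }
        lagOut = receivedMillis - requested;
        return true;
    } catch (const std::exception&) {
        return false;  // Not a number, or out of range.
    }
}
```

Inside a listener, the first argument would be the value returned by getProperty("__STARTDELIVERTIME") and the second the current time in milliseconds. A small positive lag is expected, because delivery happens at or after the requested time.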
Complete consumer example
Save the following code as DelayConsumerDemo.cpp.
Compile:
    g++ -o delay_consumer_demo -std=c++11 -lz -lrocketmq DelayConsumerDemo.cpp
Run the compiled binary.
Replace the placeholders with your actual values (same as the producer example).
    #include <chrono>
    #include <cstdlib>
    #include <iostream>
    #include <thread>
    #include <vector>
    #include "DefaultMQPushConsumer.h"

    using namespace rocketmq;
    using namespace std;

    class ExampleDelayMessageListener : public MessageListenerConcurrently {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                // Print the receive time next to the requested delivery time.
                chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
                chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
                std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:"
                          << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };

    int main(int argc, char *argv[]) {
        std::cout << "=======Before consuming messages=======" << std::endl;
        // Group ID from the ApsaraMQ for RocketMQ console.
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<group-id>");
        // TCP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console.
        consumer->setNamesrvAddr("<endpoint>");
        // Authenticate with your AccessKey ID and AccessKey secret from environment variables.
        // Default user channel: ALIYUN.
        consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        // Subscribe to the topic and register a message listener.
        ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
        consumer->subscribe("<topic>", "*");
        consumer->registerMessageListener(messageListener);
        // Before starting the consumer:
        // 1. Make sure that the subscription is configured.
        // 2. Make sure that consumers in the same group have consistent subscriptions.
        consumer->start();
        // Keep the thread running. Do not shut down the consumer while messages are being received.
        std::this_thread::sleep_for(std::chrono::seconds(600));
        consumer->shutdown();
        std::cout << "=======After consuming messages=======" << std::endl;
        return 0;
    }
Best practices
Store credentials in environment variables. Both examples use the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables instead of hardcoded values.
Keep consumer subscriptions consistent. All consumers within the same group must subscribe to the same topics and tags. Inconsistent subscriptions lead to unpredictable message routing.
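One caveat with reading credentials from the environment: std::getenv returns a null pointer when a variable is not set, and constructing a std::string from a null pointer is undefined behavior. A minimal sketch of a guard follows. The requireEnv name is hypothetical and not part of the SDK.

```cpp
#include <cstdlib>
#include <stdexcept>
#include <string>

// Hypothetical helper: read a required environment variable, or fail early
// with a clear error if it is unset or empty.
inline std::string requireEnv(const char* name) {
    const char* value = std::getenv(name);
    if (value == nullptr || *value == '\0') {
        throw std::runtime_error(std::string("Environment variable not set: ") + name);
    }
    return std::string(value);
}
```

With such a helper, the credential arguments in the examples could be written as requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID") and requireEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), assuming setSessionCredentials accepts string arguments as it does in the examples above.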