ApsaraMQ for RocketMQ supports two types of time-deferred messages through the HTTP client SDK for C++:
Delayed messages: delivered after a specified delay from the time the message is sent. For example, a 10-second delay means the consumer receives the message 10 seconds after it is sent.
Scheduled messages: delivered at a specific point in time. For example, a message scheduled for 14:00 is delivered at 14:00.
Both types use the same API. Set StartDeliverTime to a millisecond-level Unix timestamp that represents the desired delivery time:
Delayed message: current time + delay duration. Example: time(NULL) * 1000 + 10 * 1000 delivers after 10 seconds.
Scheduled message: the target delivery time converted to a millisecond-level Unix timestamp.
For more information about these message types, see Scheduled messages and delayed messages.
Prerequisites
Before you begin, make sure that you have:
The C++ HTTP client SDK installed. For more information, see Prepare the environment
An ApsaraMQ for RocketMQ instance, a topic, and a consumer group created in the ApsaraMQ for RocketMQ console
An AccessKey pair for your Alibaba Cloud account. For more information, see Create an AccessKey pair
Send scheduled and delayed messages
The following code sends four delayed messages, each delivered 10 seconds after sending. To send a scheduled message instead, set StartDeliverTime to the target delivery time as a millisecond-level Unix timestamp.
Replace the following placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint of the instance | Instance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console |
| ${TOPIC} | Topic name | ApsaraMQ for RocketMQ console |
| ${INSTANCE_ID} | Instance ID. Leave blank if the instance has no namespace | Instance Details page in the ApsaraMQ for RocketMQ console |
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <string>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;

int main() {
    MQClient mqClient(
        // HTTP endpoint of the instance.
        "${HTTP_ENDPOINT}",
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
        // and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
        // getenv returns NULL for a variable that is not set.
        // AccessKey ID for authentication.
        getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        // AccessKey secret for authentication.
        getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    );

    // Topic to send messages to. Create this topic in the ApsaraMQ for RocketMQ console.
    string topic = "${TOPIC}";
    // Instance ID. If the instance has a namespace, specify the instance ID.
    // If not, set this to an empty string.
    string instanceId = "${INSTANCE_ID}";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        // Send four messages in a loop.
        for (int i = 0; i < 4; i++) {
            PublishMessageResponse pmResp;
            // Message body.
            TopicMessage pubMsg("Hello, mq!have key!");
            // Custom message property.
            pubMsg.putProperty("a", std::to_string(i));
            // Message key.
            pubMsg.setMessageKey("MessageKey" + std::to_string(i));
            // Deliver after a 10-second delay.
            // StartDeliverTime is a millisecond-level Unix timestamp.
            // For a scheduled message, set this to the target delivery time
            // as a millisecond-level Unix timestamp.
            pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
            producer->publishMessage(pubMsg, pmResp);
            cout << "Publish mq message success. Topic is: " << topic
                 << ", msgId is:" << pmResp.getMessageId()
                 << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }
    return 0;
}
Subscribe to scheduled and delayed messages
Scheduled and delayed messages are consumed the same way as normal messages. The broker holds each message until its delivery time and then makes it available for consumption.
The following code uses long polling to consume messages. In long polling mode, if no message is available, the request stays open on the broker for the specified duration. The broker responds immediately when a message arrives.
Replace the following placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint of the instance | Instance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console |
| ${TOPIC} | Topic name | ApsaraMQ for RocketMQ console |
| ${GROUP_ID} | Consumer group ID | ApsaraMQ for RocketMQ console |
| ${INSTANCE_ID} | Instance ID. Leave blank if the instance has no namespace | Instance Details page in the ApsaraMQ for RocketMQ console |
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;

int main() {
    MQClient mqClient(
        // HTTP endpoint of the instance.
        "${HTTP_ENDPOINT}",
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
        // and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
        // getenv returns NULL for a variable that is not set.
        // AccessKey ID for authentication.
        getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        // AccessKey secret for authentication.
        getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    );

    // Topic to consume messages from. Create this topic in the ApsaraMQ for RocketMQ console.
    string topic = "${TOPIC}";
    // Consumer group ID. Create this in the ApsaraMQ for RocketMQ console.
    string groupId = "${GROUP_ID}";
    // Instance ID. If the instance has a namespace, specify the instance ID.
    // If not, set this to an empty string.
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Consume messages in long polling mode.
            // If no message is available, the request is held on the broker
            // for the specified duration or until a message arrives.
            consumer->consumeMessage(
                3, // Maximum messages per request (max: 16).
                3, // Long polling timeout in seconds (max: 30).
                messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;

            // Process consumed messages.
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                 iter != messages.end(); ++iter) {
                cout << "MessageId: " << iter->getMessageId()
                     << " PublishTime: " << iter->getPublishTime()
                     << " Tag: " << iter->getMessageTag()
                     << " Body: " << iter->getMessageBody()
                     << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                     << " NextConsumeTime: " << iter->getNextConsumeTime()
                     << " ConsumedTimes: " << iter->getConsumedTimes()
                     << " Properties: " << iter->getPropertiesAsString()
                     << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // ACK the consumed messages.
            // If the broker does not receive an ACK before NextConsumeTime,
            // it redelivers the message. Each consumption attempt generates
            // a new receipt handle with a unique timestamp.
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                // Log failed ACKs. An ACK fails if its receipt handle
                // has already timed out.
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                     iter != failedItems.end(); ++iter) {
                    cout << "AckFailedItem: " << iter->errorCode
                         << " " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages succeeded!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
            // Wait 2 seconds before retrying.
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
            // Wait 2 seconds before retrying.
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }
    } while (true);
}
See also
Scheduled messages and delayed messages -- Concepts and limits for scheduled and delayed messages
Prepare the environment -- Set up the C++ HTTP client SDK