ApsaraMQ for RocketMQ: Send and receive scheduled and delayed messages by using the C++ HTTP SDK

Last Updated: Mar 11, 2026

ApsaraMQ for RocketMQ supports two types of time-deferred messages through the HTTP client SDK for C++:

  • Delayed messages: delivered after a specified delay from the time the message is sent. For example, a 10-second delay means the consumer receives the message 10 seconds after it is sent.

  • Scheduled messages: delivered at a specific point in time. For example, a message scheduled for 14:00 is delivered at 14:00.

Both types use the same API. Set StartDeliverTime to a millisecond-level Unix timestamp that represents the desired delivery time:

  • Delayed message: current time + delay duration. Example: time(NULL) * 1000 + 10 * 1000 delivers after 10 seconds.

  • Scheduled message: the target delivery time converted to a millisecond-level Unix timestamp.
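
The two calculations above can be sketched as small helpers. This is an illustrative sketch, not part of the SDK: the names nowMs, delayedDeliverTimeMs, and scheduledDeliverTimeMs are hypothetical, and the scheduled variant assumes the target time is given in the machine's local time zone. It also assumes std::chrono::system_clock counts from the Unix epoch, which is guaranteed since C++20 and holds in practice on earlier standards.

```cpp
#include <chrono>
#include <cstdint>
#include <ctime>

// Current wall-clock time as a millisecond-level Unix timestamp.
int64_t nowMs() {
    using namespace std::chrono;
    return duration_cast<milliseconds>(
        system_clock::now().time_since_epoch()).count();
}

// Delayed message: current time plus the delay duration.
int64_t delayedDeliverTimeMs(int64_t delaySeconds) {
    return nowMs() + delaySeconds * 1000;
}

// Scheduled message: a local calendar time converted to milliseconds.
// Returns -1 if the calendar time cannot be represented.
int64_t scheduledDeliverTimeMs(int year, int month, int day,
                               int hour, int minute, int second) {
    std::tm local = {};
    local.tm_year = year - 1900;  // tm_year counts from 1900.
    local.tm_mon = month - 1;     // tm_mon is zero-based.
    local.tm_mday = day;
    local.tm_hour = hour;
    local.tm_min = minute;
    local.tm_sec = second;
    local.tm_isdst = -1;          // Let mktime decide whether DST applies.
    std::time_t t = std::mktime(&local);
    if (t == static_cast<std::time_t>(-1)) {
        return -1;
    }
    return static_cast<int64_t>(t) * 1000;
}
```

Either result can be passed to setStartDeliverTime, for example delayedDeliverTimeMs(10) for a 10-second delay. Widening to int64_t before multiplying by 1000 avoids overflow on platforms where time_t is 32 bits wide.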

For more information about these message types, see Scheduled messages and delayed messages.

Prerequisites

Before you begin, make sure that you have:

Send scheduled and delayed messages

The following code sends four delayed messages, each delivered 10 seconds after sending. To send a scheduled message instead, set StartDeliverTime to the target delivery time as a millisecond-level Unix timestamp.

Replace the following placeholders with your actual values:

| Placeholder | Description | Where to find it |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint of the instance | Instance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console |
| ${TOPIC} | Topic name | ApsaraMQ for RocketMQ console |
| ${INSTANCE_ID} | Instance ID. Leave blank if the instance has no namespace | Instance Details page in the ApsaraMQ for RocketMQ console |

#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <string>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // HTTP endpoint of the instance.
            "${HTTP_ENDPOINT}",
            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
            // and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. getenv returns
            // NULL for a variable that is not set.
            // AccessKey ID for authentication.
            getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            // AccessKey secret for authentication.
            getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
            );

    // Topic to send messages to. Create this topic in the ApsaraMQ for RocketMQ console.
    string topic = "${TOPIC}";
    // Instance ID. If the instance has a namespace, specify the instance ID.
    // If not, set this to an empty string.
    string instanceId = "${INSTANCE_ID}";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        // Send four messages in a loop.
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            // Message body.
            TopicMessage pubMsg("Hello, mq!have key!");
            // Custom message property.
            pubMsg.putProperty("a",std::to_string(i));
            // Message key.
            pubMsg.setMessageKey("MessageKey" + std::to_string(i));
            // Deliver after a 10-second delay.
            // StartDeliverTime is a millisecond-level Unix timestamp.
            // For a scheduled message, set this to the target delivery time
            // as a millisecond-level Unix timestamp.
            pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
            producer->publishMessage(pubMsg, pmResp);

            cout << "Publish mq message success. Topic is: " << topic
                << ", msgId is:" << pmResp.getMessageId()
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}

Subscribe to scheduled and delayed messages

Scheduled and delayed messages are consumed the same way as normal messages. The broker holds each message until its delivery time and then makes it available for consumption.
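
The hold-then-release behavior described above can be pictured with a small in-memory model. This is only a conceptual sketch: DelayStore, PendingMessage, and DeliversLater are hypothetical names, and nothing here reflects how the broker is actually implemented. Messages are kept in a min-heap ordered by delivery time and become visible once the clock reaches that time.

```cpp
#include <cstddef>
#include <cstdint>
#include <queue>
#include <string>
#include <vector>

// A message waiting for its delivery time (hypothetical type).
struct PendingMessage {
    int64_t deliverTimeMs;
    std::string body;
};

// Orders the heap so that the earliest delivery time is on top.
struct DeliversLater {
    bool operator()(const PendingMessage& a, const PendingMessage& b) const {
        return a.deliverTimeMs > b.deliverTimeMs;
    }
};

class DelayStore {
public:
    void add(int64_t deliverTimeMs, const std::string& body) {
        PendingMessage m = {deliverTimeMs, body};
        pending_.push(m);
    }

    // Returns the bodies of all messages due at or before nowMs,
    // earliest first, and removes them from the store.
    std::vector<std::string> release(int64_t nowMs) {
        std::vector<std::string> ready;
        while (!pending_.empty() && pending_.top().deliverTimeMs <= nowMs) {
            ready.push_back(pending_.top().body);
            pending_.pop();
        }
        return ready;
    }

    std::size_t held() const { return pending_.size(); }

private:
    std::priority_queue<PendingMessage, std::vector<PendingMessage>,
                        DeliversLater> pending_;
};
```

From the consumer's point of view nothing changes: a message simply does not appear in a consume response until release would have returned it.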

The following code uses long polling to consume messages. In long polling mode, if no message is available, the request stays open on the broker for up to the specified duration. The broker responds as soon as a message arrives, or returns an empty result when the duration elapses.
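
To make the long polling semantics concrete, here is a minimal in-process simulation. It is a sketch under stated assumptions, not the broker's or the SDK's implementation: LongPollQueue, publish, and poll are hypothetical names, and a condition variable stands in for the open HTTP request.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <vector>

class LongPollQueue {
public:
    // Adds a message and wakes one waiting poll call.
    void publish(const std::string& body) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            messages_.push_back(body);
        }
        arrived_.notify_one();
    }

    // Waits up to `timeout` for at least one message, then returns
    // at most maxMessages of them. Returns an empty batch on timeout.
    std::vector<std::string> poll(std::size_t maxMessages,
                                  std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        arrived_.wait_for(lock, timeout,
                          [this] { return !messages_.empty(); });
        std::vector<std::string> batch;
        while (!messages_.empty() && batch.size() < maxMessages) {
            batch.push_back(messages_.front());
            messages_.pop_front();
        }
        return batch;
    }

private:
    std::mutex mutex_;
    std::condition_variable arrived_;
    std::deque<std::string> messages_;
};
```

A poll on an empty queue blocks for roughly the timeout, while a poll on a non-empty queue returns at once. That is the same trade-off the wait-seconds argument of consumeMessage controls: fewer empty round trips in exchange for a request that may stay open.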

Replace the following placeholders with your actual values:

| Placeholder | Description | Where to find it |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint of the instance | Instance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console |
| ${TOPIC} | Topic name | ApsaraMQ for RocketMQ console |
| ${GROUP_ID} | Consumer group ID | ApsaraMQ for RocketMQ console |
| ${INSTANCE_ID} | Instance ID. Leave blank if the instance has no namespace | Instance Details page in the ApsaraMQ for RocketMQ console |

#include <stdlib.h>
#include <iostream>
#include <string>
#include <vector>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // HTTP endpoint of the instance.
            "${HTTP_ENDPOINT}",
            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
            // and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. getenv returns
            // NULL for a variable that is not set.
            // AccessKey ID for authentication.
            getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            // AccessKey secret for authentication.
            getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
            );

    // Topic to consume messages from. Create this topic in the ApsaraMQ for RocketMQ console.
    string topic = "${TOPIC}";
    // Consumer group ID. Create this in the ApsaraMQ for RocketMQ console.
    string groupId = "${GROUP_ID}";
    // Instance ID. If the instance has a namespace, specify the instance ID.
    // If not, set this to an empty string.
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Consume messages in long polling mode.
            // If no message is available, the request is held on the broker
            // until a message arrives or the specified duration elapses.
            consumer->consumeMessage(
                    3, // Maximum messages per request (max: 16).
                    3, // Long polling timeout in seconds (max: 30).
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;

            // Process consumed messages.
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes()
                    << " Properties: " << iter->getPropertiesAsString()
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // ACK the consumed messages.
            // If the broker does not receive an ACK before NextConsumeTime,
            // it redelivers the message. Each consumption attempt generates
            // a new receipt handle with a unique timestamp.
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                // Log failed ACKs. An ACK fails if its receipt handle
                // has already timed out.
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages succeeded!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}
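
The comments in the sample note that a message is redelivered if the broker receives no ACK before NextConsumeTime. A consumer with slow processing may want to check which receipt handles are still worth acknowledging. The following is a minimal sketch: ConsumedMessage and handlesStillAckable are hypothetical names, and it assumes NextConsumeTime is a millisecond-level Unix timestamp. It treats a handle as expired exactly at NextConsumeTime, which is a conservative choice rather than documented broker behavior.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for the two Message fields this sketch needs.
struct ConsumedMessage {
    std::string receiptHandle;
    int64_t nextConsumeTimeMs;  // Assumed: millisecond Unix timestamp.
};

// Returns the receipt handles whose NextConsumeTime is still in the future.
std::vector<std::string> handlesStillAckable(
        const std::vector<ConsumedMessage>& messages, int64_t nowMs) {
    std::vector<std::string> handles;
    for (const ConsumedMessage& m : messages) {
        if (nowMs < m.nextConsumeTimeMs) {
            handles.push_back(m.receiptHandle);
        }
    }
    return handles;
}
```

Handles filtered out here belong to messages that the broker will deliver again with a new receipt handle, so processing should be safe to repeat.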

See also