All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive scheduled messages and delayed messages

Last Updated:Dec 13, 2023

This topic provides sample code based on which you can use the TCP client SDK for C++ of the Community Edition to send and receive scheduled messages and delayed messages.

Background information

  • Scheduled messages: Scheduled messages are messages that are delivered by ApsaraMQ for RocketMQ brokers to consumers at a specified point in time.
  • Delayed messages: Delayed messages are messages that are delivered by ApsaraMQ for RocketMQ brokers to consumers after a specified period of time.

For more information, see Scheduled messages and delayed messages.

Important

The configuration methods and results of scheduled messages and delayed messages vary between Apache RocketMQ and ApsaraMQ for RocketMQ. Apache RocketMQ supports delayed messages but not scheduled messages. Therefore, no dedicated interface is available for schedule messages. ApsaraMQ for RocketMQ supports delayed messages and scheduled messages. It allows you to configure scheduled time and delay periods that are accurate to seconds and provides higher concurrency. We recommend that you send and receive scheduled messages and delayed messages on the cloud. For more information, see the following sections.

Prerequisites

Send scheduled messages

  1. Copy the following code to the DelayProducerDemo.cpp file. In the file, modify the corresponding parameters, run the g++ command to compile the code, and then generate an executable file that you can run.

    g++ -o delay_producer_demo -std=c++11 -lz -lrocketmq DelayProducerDemo.cpp

  2. The following sample code provides an example on how to send scheduled messages by using the TCP client SDK for C++ of the Community Edition:

    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "DefaultMQProducer.h"
    
    using namespace std;
    using namespace rocketmq;
    
    int main() {
        std::cout << "=======Before sending messages=======" << std::endl;
        // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. 
        DefaultMQProducer producer("GID_XXXXXXXXXXX");
        // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. 
        producer.setNamesrvAddr("http://MQ_INST_XXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account. They are used for authentication. 
        // The user channel. Default value: ALIYUN. 
        producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
    
        // After you configure the required parameters, start the producer. 
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        for (int i = 0; i < count; ++i) {
            // The topic for which you applied in the ApsaraMQ for RocketMQ console. 
            MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message.");
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            // The timestamp that indicates the time after which the scheduled message is delivered. Unit: milliseconds. For example, you can specify a timestamp to deliver the scheduled message at 16:21:00 on March 7, 2020. 
            // If you specify a timestamp that is earlier than the current point in time, the message is immediately delivered to consumers. 
            // The delay period for the delay message. For example, if you want to deliver the message 10 seconds after the current point in time, specify the value as 10000. 
            long exp = mil.count() + 10000;
            msg.setProperty("__STARTDELIVERTIME", to_string(exp));
            std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl;
            try {
                SendResult sendResult = producer.send(msg);
                std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                          << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException e) {
                std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Send " << count << " messages OK, costs "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
    
        producer.shutdown();
        std::cout << "=======After sending messages=======" << std::endl;
        return 0;
    }

Consume scheduled messages

  1. Copy the following code to the DelayConsumerDemo.cpp file. In the file, modify the corresponding parameters, run the g++ command to compile the code, and then generate an executable file that you can run.

    g++ -o delay_consumer_demo -std=c++11 -lz -lrocketmq DelayConsumerDemo.cpp

  2. The following code provides an example on how to consume scheduled messages by using the TCP client SDK for C++ of the Community Edition:

    
    #include <iostream>
    #include <thread>
    #include <chrono>
    #include "DefaultMQPushConsumer.h"
    
    using namespace rocketmq;
    using namespace std;
    
    class ExampleDelayMessageListener : public MessageListenerConcurrently {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
                chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
                std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:"
                          << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };
    
    int main(int argc, char *argv[]) {
        std::cout << "=======Before consuming messages=======" << std::endl;
        // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. 
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXX");
        // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. 
        consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account. They are used for authentication. 
        // The user channel. Default value: ALIYUN. 
        consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        auto start = std::chrono::system_clock::now();
    
        // Register your own listener here to handle the messages received.
        // Use a custom listener function to process the received messages and return the processing results. 
        ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
        consumer->subscribe("YOUR DELAY TOPIC", "*");
        consumer->registerMessageListener(messageListener);
    
        //Start this consumer
        // The preparation is complete. You must invoke the startup function to start the consumer. 
        // ********************************************
        // 1. Before you start the consumer, make sure that the subscription is configured. 
        // 2. Make sure that the subscriptions of consumers in the same group are consistent. 
        // *********************************************
        consumer->start();
    
        //Keep main thread running until process finished.
        // Keep the thread running and do not shut down the consumer. 
        std::this_thread::sleep_for(std::chrono::seconds(600));
        consumer->shutdown();
        std::cout << "=======After consuming messages======" << std::endl;
        return 0;
    }