すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:スケジュールされたメッセージと遅延メッセージの送受信

最終更新日:Jul 09, 2024

このトピックでは、Community EditionのTCPクライアントSDK for C ++ を使用して、スケジュールされたメッセージと遅延されたメッセージを送受信するためのサンプルコードを提供します。

背景情報

  • スケジュールされたメッセージ: スケジュールされたメッセージは、ApsaraMQ for RocketMQブローカーによって指定された時点でコンシューマに配信されるメッセージです。

  • 遅延メッセージ: 遅延メッセージは、ApsaraMQ for RocketMQブローカーによって指定された期間後にコンシューマに配信されるメッセージです。

詳細については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。

重要

スケジュールされたメッセージと遅延メッセージの設定方法と結果は、Apache RocketMQとApsaraMQ for RocketMQの間で異なります。 Apache RocketMQは遅延メッセージをサポートしますが、スケジュールメッセージはサポートしません。 したがって、スケジュールメッセージに専用のインターフェイスは使用できません。 ApsaraMQ for RocketMQは、遅延メッセージとスケジュール済みメッセージをサポートしています。 これにより、秒単位の正確なスケジュール時間と遅延期間を設定でき、同時実行性が高くなります。 クラウド上でスケジュールメッセージと遅延メッセージを送受信することを推奨します。 詳細は、以下のセクションをご参照ください。

前提条件

スケジュールされたメッセージの送信

  1. 次のコードをDelayProducerDemo.cppファイルにコピーします。 ファイルで、対応するパラメーターを変更し、g ++ コマンドを実行してコードをコンパイルし、実行可能な実行可能ファイルを生成します。

    g ++ -o delay_producer_demo -std=c ++ 11 -lz -lrocketmq DelayProducerDemo.cpp

  2. 次のサンプルコードは、Community EditionのTCPクライアントSDK for C ++ を使用してスケジュールされたメッセージを送信する方法の例を示しています。

    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "DefaultMQProducer.h"
    
    using namespace std;
    using namespace rocketmq;
    
    int main() {
        std::cout << "=======Before sending messages=======" << std::endl;
        // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. 
        DefaultMQProducer producer("GID_XXXXXXXXXXX");
        // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. 
        producer.setNamesrvAddr("http://MQ_INST_XXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account. They are used for authentication. 
        // The user channel. Default value: ALIYUN. 
        producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
    
        // After you configure the required parameters, start the producer. 
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        for (int i = 0; i < count; ++i) {
            // The topic for which you applied in the ApsaraMQ for RocketMQ console. 
            MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message.");
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            // The timestamp that indicates the time after which the scheduled message is delivered. Unit: milliseconds. For example, you can specify a timestamp to deliver the scheduled message at 16:21:00 on March 7, 2020. 
            // If you specify a timestamp that is earlier than the current point in time, the message is immediately delivered to consumers. 
            // The delay period for the delay message. For example, if you want to deliver the message 10 seconds after the current point in time, specify the value as 10000. 
            long exp = mil.count() + 10000;
            msg.setProperty("__STARTDELIVERTIME", to_string(exp));
            std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl;
            try {
                SendResult sendResult = producer.send(msg);
                std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                          << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException e) {
                std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Send " << count << " messages OK, costs "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
    
        producer.shutdown();
        std::cout << "=======After sending messages=======" << std::endl;
        return 0;
    }

スケジュールされたメッセージを消費する

  1. 次のコードをDelayConsumerDemo.cppファイルにコピーします。 ファイルで、対応するパラメーターを変更し、g ++ コマンドを実行してコードをコンパイルし、実行可能な実行可能ファイルを生成します。

    g ++ -o delay_consumer_demo -std=c ++ 11 -lz -lrocketmq DelayConsumerDemo.cpp

  2. 次のコードは、Community EditionのTCPクライアントSDK for C ++ を使用してスケジュールされたメッセージを消費する方法の例を示しています。

    
    #include <iostream>
    #include <thread>
    #include <chrono>
    #include "DefaultMQPushConsumer.h"
    
    using namespace rocketmq;
    using namespace std;
    
    class ExampleDelayMessageListener : public MessageListenerConcurrently {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
                chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
                std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:"
                          << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };
    
    int main(int argc, char *argv[]) {
        std::cout << "=======Before consuming messages=======" << std::endl;
        // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. 
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXX");
        // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. 
        consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account. They are used for authentication. 
        // The user channel. Default value: ALIYUN. 
        consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        auto start = std::chrono::system_clock::now();
    
        // Register your own listener here to handle the messages received.
        // Use a custom listener function to process the received messages and return the processing results. 
        ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
        consumer->subscribe("YOUR DELAY TOPIC", "*");
        consumer->registerMessageListener(messageListener);
    
        //Start this consumer
        // The preparation is complete. You must invoke the startup function to start the consumer. 
        // ********************************************
        // 1. Before you start the consumer, make sure that the subscription is configured. 
        // 2. Make sure that the subscriptions of consumers in the same group are consistent. 
        // *********************************************
        consumer->start();
    
        //Keep main thread running until process finished.
        // Keep the thread running and do not shut down the consumer. 
        std::this_thread::sleep_for(std::chrono::seconds(600));
        consumer->shutdown();
        std::cout << "=======After consuming messages======" << std::endl;
        return 0;
    }