本文提供使用TCP協議下的社區版C++ SDK來收發定時和延時訊息的範例程式碼供您參考。
背景資訊
- 定時訊息:Producer將訊息發送到雲訊息佇列 RocketMQ 版服務端,但並不期望立馬投遞這條訊息,而是延遲到在目前時間點之後的某一個時間投遞到Consumer進行消費,該訊息即定時訊息。
- 延時訊息:Producer將訊息發送到雲訊息佇列 RocketMQ 版服務端,但並不期望立馬投遞這條訊息,而是延遲一定時間後才投遞到Consumer進行消費,該訊息即延時訊息。
更多資訊,請參見定時和延時訊息。
重要
社區版的Apache RocketMQ和阿里雲雲訊息佇列 RocketMQ 版配置方式不同,實現效果也有所差異。社區版的Apache RocketMQ支援延時訊息,但不支援定時訊息,因此沒有專門的定時訊息介面。阿里雲雲訊息佇列 RocketMQ 版不僅同時支援配置延時訊息和定時訊息,並且定時和延時時間可以精確到秒級、擁有更高的並發性。建議您優先使用雲上定時延時的方式,使用方法,請參考以下步驟。
前提條件
擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey。
發送定時訊息
將以下代碼拷貝到DelayProducerDemo.cpp檔案中,修改相應的參數後,使用g++命令進行編譯,產生可執行檔,即可直接運行。
g++ -o delay_producer_demo -std=c++11 -lz -lrocketmq DelayProducerDemo.cpp發送定時訊息的範例程式碼如下。
#include <iostream> #include <chrono> #include <thread> #include "DefaultMQProducer.h" using namespace std; using namespace rocketmq; int main() { std::cout << "=======Before sending messages=======" << std::endl; //您在阿里雲訊息佇列RocketMQ版上申請的GID。 DefaultMQProducer producer("GID_XXXXXXXXXXX"); //設定TCP協議存取點,從阿里雲訊息佇列RocketMQ版控制台的執行個體詳情頁面擷取。 producer.setNamesrvAddr("http://MQ_INST_XXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。 //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分別為阿里雲帳號的AccessKey ID和AccessKey Secret,用於身分識別驗證。 //使用者渠道,預設值為:ALIYUN。 producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); //請確保參數設定完成之後啟動Producer。 producer.start(); auto start = std::chrono::system_clock::now(); int count = 32; for (int i = 0; i < count; ++i) { //發送訊息時請設定您在阿里雲訊息佇列RocketMQ版控制台上申請的Topic。 MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message."); chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch(); chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d); //定時延時訊息,單位毫秒(ms),在指定時間戳記(目前時間之後)進行投遞,例如2020-03-07 16:21:00投遞。 //如果被設定成目前時間戳之前的某個時刻,訊息將立刻投遞給消費者。 //設定需要延時或者定時的時間,例如目前時間延遲10秒後投遞。 long exp = mil.count() + 10000; msg.setProperty("__STARTDELIVERTIME", to_string(exp)); std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl; try { SendResult sendResult = producer.send(msg); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } auto interval = std::chrono::system_clock::now() - start; std::cout << "Send " << count << " messages OK, costs " << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; producer.shutdown(); std::cout << "=======After sending messages=======" << std::endl; return 0; }
消費定時訊息
將以下代碼拷貝到DelayConsumerDemo.cpp檔案中,修改相應的參數後,使用g++命令進行編譯,產生可執行檔,即可直接運行。
g++ -o delay_consumer_demo -std=c++11 -lz -lrocketmq DelayConsumerDemo.cpp消費定時訊息的範例程式碼如下。
#include <iostream> #include <thread> #include <chrono> #include "DefaultMQPushConsumer.h" using namespace rocketmq; using namespace std; class ExampleDelayMessageListener : public MessageListenerConcurrently { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch(); chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d); std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { std::cout << "=======Before consuming messages=======" << std::endl; //您在阿里雲訊息佇列RocketMQ版控制台上申請的GID。 DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXX"); //設定TCP協議存取點,從阿里雲訊息佇列RocketMQ版控制台的執行個體詳情頁面擷取。 consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。 //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分別為阿里雲帳號的AccessKey ID和AccessKey Secret,用於身分識別驗證。 //使用者渠道,預設值為:ALIYUN。 consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); auto start = std::chrono::system_clock::now(); //register your own listener here to handle the messages received. //請註冊自訂偵聽函數用來處理接收到的訊息,並返迴響應的處理結果。 ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener(); consumer->subscribe("YOUR DELAY TOPIC", "*"); consumer->registerMessageListener(messageListener); //Start this consumer //準備工作完成,必須調用啟動函數,才可以正常工作。 // ******************************************** // 1.確保訂閱關係的設定在啟動之前完成。 // 2.確保相同GID下面的消費者的訂閱關係一致。 // ********************************************* consumer->start(); //Keep main thread running until process finished. //請保持線程常駐,不要執行shutdown操作。 std::this_thread::sleep_for(std::chrono::seconds(600)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }