全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發順序訊息

更新時間:Jul 01, 2024

順序訊息(FIFO訊息)是阿里雲雲訊息佇列 RocketMQ 版提供的一種嚴格按照順序來發布和消費的訊息類型。本文提供使用TCP協議下的社區版C++ SDK收發順序訊息的範例程式碼供您參考。

順序訊息分為兩類:

  • 全域順序:對於指定的一個Topic,所有訊息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和消費。

  • 分區順序:對於指定的一個Topic,所有訊息根據Sharding Key進行區塊分區。同一個分區內的訊息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序訊息中用來區分不同分區的關鍵字段,和普通訊息的Key是完全不同的概念。

更多資訊,請參見順序訊息

前提條件

發送順序訊息

重要

雲訊息佇列 RocketMQ 版服務端判定訊息產生的順序性是參照單一生產者、單一線程並發下訊息發送的時序。如果發送方有多個生產者或者有多個線程並發發送訊息,則此時只能以到達雲訊息佇列 RocketMQ 版服務端的時序作為訊息順序的依據,和業務側的發送順序未必一致。

  1. 將以下代碼拷貝到OrderProducerDemo.cpp檔案中,修改相應的參數後,使用g++命令進行編譯,產生可執行檔,即可直接運行。

    g++ -o order_producer_demo -std=c++11 -lz -lrocketmq OrderProducerDemo.cpp

  2. 發送順序訊息的範例程式碼如下。

    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "DefaultMQProducer.h"
    
    using namespace std;
    using namespace rocketmq;
    
    class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
    public:
        MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
            // 實現自訂分區邏輯,根據業務傳入arg參數即分區鍵,計算路由到哪個隊列,這裡以arg為int型參數為例。
            int orderId = *static_cast<int *>(arg);
            int index = orderId % mqs.size();
            return mqs[0];
        }
    };
    
    int main() {
        std::cout << "=======Before sending messages=======" << std::endl;
        //您在阿里雲訊息佇列RocketMQ控制台上申請的GID。
        DefaultMQProducer producer("GID_XXXXXXXX");
        //設定TCP協議存取點,從阿里雲訊息佇列RocketMQ控制台的執行個體詳情頁面擷取。
        producer.setNamesrvAddr("http://MQ_INST_XXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
    	  //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分別為阿里雲帳號的AccessKey ID和AccessKey Secret,用於身分識別驗證。
        //使用者渠道,預設值為:ALIYUN。
        producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
    
        //請確保參數設定完成之後啟動Producer。
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        //可以設定發送重試的次數,確保發送成功。
        int retryTimes = 1;
        //參考介面MessageQueueSelector,通過設定的自訂參數arg,計算髮送到指定的路由隊列中,此處的arg便是分區ID。
        ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
        for (int i = 0; i < count; ++i) {
            //發送訊息時請設定您在阿里雲訊息佇列RocketMQ控制台上申請的Topic。
            MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
            try {
                SendResult sendResult = producer.send(msg, pSelector, &i, 1, false);
                std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                          << "MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException e) {
                std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Send " << count << " messages OK, costs "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
    
        producer.shutdown();
        std::cout << "=======After sending messages=======" << std::endl;
        return 0;
    }

消費順序訊息

  1. 將以下代碼拷貝到OrderConsumerDemo.cpp檔案中,修改相應的參數後,使用g++命令進行編譯,產生可執行檔,即可直接運行。

    g++ -o order_consumer_demo -std=c++11 -lz -lrocketmq OrderConsumerDemo.cpp

  2. 消費順序訊息的範例程式碼如下。

    
    #include <iostream>
    #include <thread>
    #include "DefaultMQPushConsumer.h"
    
    using namespace rocketmq;
    
    
    class ExampleOrderlyMessageListener : public MessageListenerOrderly {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };
    
    int main(int argc, char *argv[]) {
        std::cout << "=======Before consuming messages=======" << std::endl;
        //您在阿里雲訊息佇列RocketMQ控制台上申請的GID。
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXXX");
        //設定TCP協議存取點,從阿里雲訊息佇列RocketMQ控制台的執行個體詳情頁面擷取。
        consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
    	  //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分別為阿里雲帳號的AccessKey ID和AccessKey Secret,用於身分識別驗證。
        //使用者渠道,預設值為:ALIYUN。
        consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        auto start = std::chrono::system_clock::now();
    
        //register your own listener here to handle the messages received.
        //請註冊自訂偵聽函數用來處理接收到的訊息,並返迴響應的處理結果。
        ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
        consumer->subscribe("YOUR ORDERLY TOPIC", "*");
        consumer->registerMessageListener(messageListener);
    
        //Start this consumer
        //準備工作完成,必須調用啟動函數,才可以正常工作。
        // ********************************************
        // 1.確保訂閱關係的設定在啟動之前完成。
        // 2.確保相同GID下面的消費者的訂閱關係一致。
        // *********************************************
        consumer->start();
    
        //Keep main thread running until process finished.
        //請保持線程常駐,不要執行shutdown操作。
        std::this_thread::sleep_for(std::chrono::seconds (60 ));
        consumer->shutdown();
        std::cout << "=======After consuming messages======" << std::endl;
        return 0;
    }