すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:注文メッセージの送受信

最終更新日:Jul 09, 2024

順序付きメッセージは、FIFO (first in first out) メッセージとも呼ばれ、ApsaraMQ for RocketMQによって提供されるメッセージの一種です。 このタイプのメッセージは、厳密な順序で発行および使用されます。 このトピックでは、Community EditionのTCPクライアントSDK for C ++ を使用して順序付きメッセージを送受信できるサンプルコードを提供します。

順序付けられたメッセージは、次のタイプに分類されます。

  • グローバルに順序付けられたメッセージ: 指定されたトピックのすべてのメッセージは、先入れ先出し (FIFO) の順序で発行および使用されます。

  • パーティション順メッセージ: 指定されたトピックのすべてのメッセージは、シャーディングキーを使用して異なるパーティションに配布されます。 各パーティション内のメッセージは、FIFO順に発行され、消費される。 シャーディングキーは、異なるパーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーは、通常のメッセージのキーとは異なります。

詳細については、「注文メッセージ」をご参照ください。

前提条件

順序付けられたメッセージを送信する

重要

ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。

  1. 次のコードをOrderProducerDemo.cppファイルにコピーします。 このファイルで、対応するパラメーターを設定し、g ++ コマンドを実行してコードをコンパイルし、実行できる実行可能ファイルを生成します。

    g ++ -o order_producer_demo -std=c ++ 11 -lz -lrocketmq OrderProducerDemo.cpp

  2. 次のサンプルコードは、Community EditionのTCPクライアントSDK for C ++ を使用して順序付けられたメッセージを送信する方法の例を示しています。

    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "DefaultMQProducer.h"
    
    using namespace std;
    using namespace rocketmq;
    
    class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
    public:
        MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
            // Specify a custom logic for partitioning. The system calculates the queue to which a message is routed based on the arg parameter that specifies the partition key. In this example, the arg parameter of the INT type is used. 
            int orderId = *static_cast<int *>(arg);
            int index = orderId % mqs.size();
            return mqs[0];
        }
    };
    
    int main() {
        std::cout << "=======Before sending messages=======" << std::endl;
        // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. 
        DefaultMQProducer producer("GID_XXXXXXXX");
        // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. 
        producer.setNamesrvAddr("http://MQ_INST_XXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account, which are used for identity verification. 
        // The user channel. Default value: ALIYUN. 
        producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
    
        // After you configure the required parameters, start the producer. 
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        // The number of retries that the message can perform after it fails to be sent. This ensures the successful delivery of the message. 
        int retryTimes = 1;
        // Use the configured custom parameter arg to calculate the queue to which a message is sent and send the message to the specified queue. The arg parameter specifies the partition ID. For more information about how to select a message queue, see the MessageQueueSelector operation. 
        ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
        for (int i = 0; i < count; ++i) {
            // Before you send the message, specify the topic for which you applied in the ApsaraMQ for RocketMQ console. 
            MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
            try {
                SendResult sendResult = producer.send(msg, pSelector, &i, 1, false);
                std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
                          << "MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException e) {
                std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Send " << count << " messages OK, costs "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
    
        producer.shutdown();
        std::cout << "=======After sending messages=======" << std::endl;
        return 0;
    }

注文したメッセージを消費する

  1. 次のコードをOrderConsumerDemo.cppファイルにコピーします。 このファイルで、対応するパラメーターを設定し、g ++ コマンドを実行してコードをコンパイルし、実行できる実行可能ファイルを生成します。

    g ++ -o order_consumer_demo -std=c ++ 11 -lz -lrocketmq OrderConsumerDemo.cpp

  2. 次のサンプルコードは、商用エディションのTCPクライアントSDK for C ++ を使用して順序付きメッセージを消費する方法の例を示しています。

    
    #include <iostream>
    #include <thread>
    #include "DefaultMQPushConsumer.h"
    
    using namespace rocketmq;
    
    
    class ExampleOrderlyMessageListener : public MessageListenerOrderly {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };
    
    int main(int argc, char *argv[]) {
        std::cout << "=======Before consuming messages=======" << std::endl;
        // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. 
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXXX");
        // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. 
        consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account, which are used for identity verification. 
        // The user channel. Default value: ALIYUN. 
        consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        auto start = std::chrono::system_clock::now();
    
        //register your own listener here to handle the messages received.
        // Use a custom listener function to process the received message and return the result. 
        ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
        consumer->subscribe("YOUR ORDERLY TOPIC", "*");
        consumer->registerMessageListener(messageListener);
    
        //Start this consumer
        // The preparation is complete. You must invoke the startup function to start the consumer. 
        // ********************************************
        // 1. Before you start the consumer, make sure that the subscription is configured. 
        // 2. Make sure that the subscriptions of consumers in the same group are consistent. 
        // *********************************************
        consumer->start();
    
        //Keep main thread running until process finished.
        // Keep the thread running and do not shut down the consumer. 
        std::this_thread::sleep_for(std::chrono::seconds (60 ));
        consumer->shutdown();
        std::cout << "=======After consuming messages======" << std::endl;
        return 0;
    }