順序付きメッセージは、FIFO (first in first out) メッセージとも呼ばれ、ApsaraMQ for RocketMQによって提供されるメッセージの一種です。 このタイプのメッセージは、厳密な順序で発行および使用されます。 このトピックでは、Community EditionのTCPクライアントSDK for C ++ を使用して順序付きメッセージを送受信できるサンプルコードを提供します。
順序付けられたメッセージは、次のタイプに分類されます。
グローバルに順序付けられたメッセージ: 指定されたトピックのすべてのメッセージは、先入れ先出し (FIFO) の順序で発行および使用されます。
パーティション順メッセージ: 指定されたトピックのすべてのメッセージは、シャーディングキーを使用して異なるパーティションに配布されます。 各パーティション内のメッセージは、FIFO順に発行され、消費される。 シャーディングキーは、異なるパーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーは、通常のメッセージのキーとは異なります。
詳細については、「注文メッセージ」をご参照ください。
前提条件
C ++ ダイナミックライブラリがインストールされています。 詳細については、「C ++ ダイナミックライブラリのインストール」をご参照ください。
Alibaba Cloudアカウント用にAccessKeyペアが作成されます。 詳細については、「AccessKey の作成」をご参照ください。
順序付けられたメッセージを送信する
ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。
次のコードをOrderProducerDemo.cppファイルにコピーします。 このファイルで、対応するパラメーターを設定し、g ++ コマンドを実行してコードをコンパイルし、実行できる実行可能ファイルを生成します。
g ++ -o order_producer_demo -std=c ++ 11 -lz -lrocketmq OrderProducerDemo.cpp次のサンプルコードは、Community EditionのTCPクライアントSDK for C ++ を使用して順序付けられたメッセージを送信する方法の例を示しています。
#include <iostream> #include <chrono> #include <thread> #include "DefaultMQProducer.h" using namespace std; using namespace rocketmq; class ExampleSelectMessageQueueByHash : public MessageQueueSelector { public: MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) { // Specify a custom logic for partitioning. The system calculates the queue to which a message is routed based on the arg parameter that specifies the partition key. In this example, the arg parameter of the INT type is used. int orderId = *static_cast<int *>(arg); int index = orderId % mqs.size(); return mqs[0]; } }; int main() { std::cout << "=======Before sending messages=======" << std::endl; // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. DefaultMQProducer producer("GID_XXXXXXXX"); // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. producer.setNamesrvAddr("http://MQ_INST_XXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account, which are used for identity verification. // The user channel. Default value: ALIYUN. producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); // After you configure the required parameters, start the producer. producer.start(); auto start = std::chrono::system_clock::now(); int count = 32; // The number of retries that the message can perform after it fails to be sent. This ensures the successful delivery of the message. int retryTimes = 1; // Use the configured custom parameter arg to calculate the queue to which a message is sent and send the message to the specified queue. The arg parameter specifies the partition ID. For more information about how to select a message queue, see the MessageQueueSelector operation. ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash(); for (int i = 0; i < count; ++i) { // Before you send the message, specify the topic for which you applied in the ApsaraMQ for RocketMQ console. MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message."); try { SendResult sendResult = producer.send(msg, pSelector, &i, 1, false); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << "MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } auto interval = std::chrono::system_clock::now() - start; std::cout << "Send " << count << " messages OK, costs " << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; producer.shutdown(); std::cout << "=======After sending messages=======" << std::endl; return 0; }
注文したメッセージを消費する
次のコードをOrderConsumerDemo.cppファイルにコピーします。 このファイルで、対応するパラメーターを設定し、g ++ コマンドを実行してコードをコンパイルし、実行できる実行可能ファイルを生成します。
g ++ -o order_consumer_demo -std=c ++ 11 -lz -lrocketmq OrderConsumerDemo.cpp次のサンプルコードは、商用エディションのTCPクライアントSDK for C ++ を使用して順序付きメッセージを消費する方法の例を示しています。
#include <iostream> #include <thread> #include "DefaultMQPushConsumer.h" using namespace rocketmq; class ExampleOrderlyMessageListener : public MessageListenerOrderly { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { std::cout << "=======Before consuming messages=======" << std::endl; // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXXX"); // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account, which are used for identity verification. // The user channel. Default value: ALIYUN. consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); auto start = std::chrono::system_clock::now(); //register your own listener here to handle the messages received. // Use a custom listener function to process the received message and return the result. ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener(); consumer->subscribe("YOUR ORDERLY TOPIC", "*"); consumer->registerMessageListener(messageListener); //Start this consumer // The preparation is complete. You must invoke the startup function to start the consumer. // ******************************************** // 1. Before you start the consumer, make sure that the subscription is configured. // 2. Make sure that the subscriptions of consumers in the same group are consistent. // ********************************************* consumer->start(); //Keep main thread running until process finished. // Keep the thread running and do not shut down the consumer. std::this_thread::sleep_for(std::chrono::seconds (60 )); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }