Normal messages are messages in ApsaraMQ for RocketMQ that have no special features. They differ from featured messages, such as scheduled messages, delayed messages, ordered messages, and transactional messages. This topic provides sample code that shows how to send and receive normal messages by using the TCP client SDK for C++ of the Community Edition.
Prerequisites
The C++ dynamic library is installed. For more information, see Install the C++ dynamic library.
An AccessKey pair is created for your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Send normal messages
Copy the sample code in this section to a file named ProducerDemo.cpp and replace the placeholder parameters with your own values. Then, run the following g++ command to compile the code into an executable file:
g++ -o producer_demo -std=c++11 ProducerDemo.cpp -lrocketmq -lz
The following sample code shows how to send normal messages by using the TCP client SDK for C++ of the Community Edition:
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Before sending messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console.
    DefaultMQProducer producer("GID_XXXXXXXXX");
    // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console.
    producer.setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
    // They specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account, which are used for identity verification.
    // The third argument is the user channel. Default value: ALIYUN.
    producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");

    // After you configure the required parameters, start the producer.
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // Before you send the message, specify the topic that you created in the ApsaraMQ for RocketMQ console.
        MQMessage msg("YOURTOPIC", "HiTAG", "HelloCPPSDK.");
        try {
            SendResult sendResult = producer.send(msg);
            std::cout << "SendResult:" << sendResult.getSendStatus()
                      << ", Message ID: " << sendResult.getMsgId() << std::endl;
            // Pause for one second between messages.
            this_thread::sleep_for(chrono::seconds(1));
        } catch (const MQException &e) {
            // Catch by const reference to avoid copying and slicing the exception.
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Send " << count << " messages OK, costs "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======After sending messages=======" << std::endl;
    return 0;
}
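The samples in this topic pass the return value of getenv directly to setSessionCredentials. getenv returns a null pointer when a variable is not set, and constructing a std::string from a null pointer is undefined behavior. The following is a minimal sketch of a guard that fails early with a readable message. requireEnv is a hypothetical helper name used only for illustration; it is not part of the SDK.

```cpp
#include <cstdlib>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of the RocketMQ SDK): returns the value of an
// environment variable, or throws if the variable is unset or empty.
std::string requireEnv(const char *name) {
    const char *value = std::getenv(name);
    if (value == nullptr || *value == '\0') {
        throw std::runtime_error(std::string("Missing environment variable: ") + name);
    }
    return std::string(value);
}
```

With such a helper, the credentials call could be written as producer.setSessionCredentials(requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID"), requireEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"), so that a missing variable stops the program before the producer starts.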
Consume normal messages
Copy the sample code in this section to a file named ConsumerDemo.cpp and replace the placeholder parameters with your own values. Then, run the following g++ command to compile the code into an executable file:
g++ -o consumer_demo -std=c++11 ConsumerDemo.cpp -lrocketmq -lz
The following sample code shows how to consume normal messages by using the TCP client SDK for C++ of the Community Edition:
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <thread>
#include <vector>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;

class ExampleMessageListener : public MessageListenerConcurrently {
public:
    // Called by the SDK with each batch of received messages.
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic()
                      << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console.
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXX");
    // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console.
    consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
    // They specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account, which are used for identity verification.
    // The third argument is the user channel. Default value: ALIYUN.
    consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");

    // Use a custom listener to process the received messages and return the consumption result.
    ExampleMessageListener *messageListener = new ExampleMessageListener();
    consumer->subscribe("YOURTOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // The preparation is complete. You must call start() to start the consumer.
    // ********************************************
    // 1. Before you start the consumer, make sure that the subscription is configured.
    // 2. Make sure that the subscriptions of consumers in the same group are consistent.
    // ********************************************
    consumer->start();

    // Keep the main thread running for 60 seconds so that the consumer can receive messages.
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();

    // Release the consumer first, and then the listener that it referenced.
    delete consumer;
    delete messageListener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}
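The consumer sample keeps the main thread alive with a fixed 60-second sleep. For a long-running consumer, one possible alternative is to block until the process receives SIGINT or SIGTERM and only then call shutdown(). The following is a sketch that uses only the C++ standard library. The names g_stopRequested, handleStopSignal, and waitForStopSignal are hypothetical and are not part of the SDK.

```cpp
#include <chrono>
#include <csignal>
#include <thread>

// Hypothetical flag (not part of the RocketMQ SDK). sig_atomic_t is the type
// that the C++ standard allows a signal handler to write to safely.
volatile std::sig_atomic_t g_stopRequested = 0;

// Signal handler: only records that a stop was requested.
extern "C" void handleStopSignal(int) {
    g_stopRequested = 1;
}

// Blocks the calling thread until SIGINT (Ctrl+C) or SIGTERM is received.
void waitForStopSignal() {
    std::signal(SIGINT, handleStopSignal);
    std::signal(SIGTERM, handleStopSignal);
    while (g_stopRequested == 0) {
        // Poll the flag at a short interval instead of sleeping for a fixed total time.
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}
```

In the consumer sample, a call to waitForStopSignal() could take the place of the sleep_for call, followed by consumer->shutdown() as before.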