This topic provides sample code based on which you can use the TCP client SDK for C++ of the Community Edition to send and receive transactional messages.
ApsaraMQ for RocketMQ provides a distributed transaction processing feature that is similar to the eXtended Architecture (XA) and Open XA distributed transaction processing modes. This feature helps ensure data consistency in ApsaraMQ for RocketMQ.
Interaction process
The following figure shows the interaction process of transactional messages.

For more information, see Transactional messages.
Prerequisites
The C++ dynamic library is installed. For more information, see Install the C++ dynamic library.
An AccessKey pair is created for your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Send transactional messages
Copy the following code to the TransProducerDemo.cpp file. In the file, modify the corresponding parameters, run the g++ command to compile the code, and then generate an executable file that you can run.
g++ -o trans_producer_demo -std=c++11 -lz -lrocketmq TransProducerDemo.cppThe following sample code provides an example on how to send transactional messages by using the TCP client SDK for C++ of the Community Edition:
#include <iostream> #include <chrono> #include <thread> #include "TransactionMQProducer.h" #include "MQClientException.h" #include "TransactionListener.h" using namespace std; using namespace rocketmq; class ExampleTransactionListener : public TransactionListener { public: LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) { // Execute the local transaction. If the local transaction is executed, COMMIT_MESSAGE is returned. If the local transaction fails to be executed, ROLLBACK_MESSAGE is returned. If the execution status of the local transaction is unknown, UNKNOWN is returned. // If UNKNOWN is returned, the scheduled task to query the status of the local transaction is triggered. std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:" << msg.getBody() << std::endl; return UNKNOWN; } LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) { // Query the execution status of the local transaction. If the local transaction is executed, COMMIT_MESSAGE is returned. If the local transaction fails to be executed, ROLLBACK_MESSAGE is returned. If the execution status of the local transaction is unknown, UNKNOWN is returned. // If UNKNOWN is returned, wait until the next scheduled task to query the status of the local transaction is triggered. std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:" << msg.getMsgId() << std::endl; return COMMIT_MESSAGE; } }; int main() { std::cout << "=======Before sending messages=======" << std::endl; // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. TransactionMQProducer producer("GID_XXXXXXXXXXXXXXXX"); // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. producer.setNamesrvAddr("http://MQ_XXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account, which are used for identity verification. // The user channel. Default value: ALIYUN. producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); // The transaction listener. ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener(); producer.setTransactionListener(exampleTransactionListener); // After you configure the required parameters, start the producer. producer.start(); auto start = std::chrono::system_clock::now(); int count = 3; for (int i = 0; i < count; ++i) { // The topic for which you applied in the ApsaraMQ for RocketMQ console. MQMessage msg("YOUR TRANSACTION TOPIC", "HiTAG", "Hello,CPP SDK, Transaction Message."); try { SendResult sendResult = producer.sendMessageInTransaction(msg, &i); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } auto interval = std::chrono::system_clock::now() - start; std::cout << "Send " << count << " messages OK, costs " << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; std::cout << "Wait for local transaction check..... " << std::endl; for (int i = 0; i < 6; ++i) { this_thread::sleep_for(chrono::seconds(10)); std::cout << "Running "<< i*10 + 10 << " Seconds......"<< std::endl; } producer.shutdown(); std::cout << "=======After sending messages=======" << std::endl; return 0; }
Consume transactional messages
Copy the following code to the ConsumerDemo.cpp file. In the file, modify the corresponding parameters, run the g++ command to compile the code, and then generate an executable file that you can run.
g++ -o consumer_demo -std=c++11 -lz -lrocketmq ConsumerDemo.cppThe following sample code provides an example on how to consume transactional messages by using the TCP client SDK for C++ of the Community Edition:
#include <iostream> #include <thread> #include "DefaultMQPushConsumer.h" using namespace rocketmq; class ExampleMessageListener : public MessageListenerConcurrently { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { std::cout << "=======Before consuming messages=======" << std::endl; // The ID of the group for which you applied in the ApsaraMQ for RocketMQ console. DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXX"); // The TCP endpoint that you obtained from the Instance Details page in the ApsaraMQ for RocketMQ console. consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account, which are used for identity verification. // The user channel. Default value: ALIYUN. consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); auto start = std::chrono::system_clock::now(); // Use a custom listener function to process the received message and return the result. ExampleMessageListener *messageListener = new ExampleMessageListener(); consumer->subscribe("YOURTOPIC", "*"); consumer->registerMessageListener(messageListener); // The preparation is complete. You must invoke the startup function to start the consumer. // ******************************************** // 1. Before you start the consumer, make sure that the subscription is configured. // 2. Make sure that the subscriptions of consumers in the same group are consistent. // ********************************************* consumer->start(); // Keep the main thread running until the process is shut down. std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }