Topik ini menyediakan contoh kode yang dapat digunakan untuk mengirim dan menerima pesan transaksional menggunakan TCP client SDK untuk C++ dari Community Edition.
ApsaraMQ for RocketMQ menawarkan fitur pemrosesan transaksi terdistribusi yang serupa dengan mode eXtended Architecture (XA) dan Open XA. Fitur ini memastikan konsistensi data di ApsaraMQ for RocketMQ.
Proses interaksi
Gambar berikut menggambarkan proses interaksi pesan transaksional.

Untuk informasi lebih lanjut, lihat Pesan Transaksional.
Prasyarat
Pustaka dinamis C++ telah diinstal. Untuk informasi lebih lanjut, lihat Instal Pustaka Dinamis C++.
Sepasang AccessKey telah dibuat untuk akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Sepasang AccessKey.
Kirim pesan transaksional
Salin kode berikut ke file TransProducerDemo.cpp. Modifikasi parameter yang sesuai, jalankan perintah g++ untuk mengompilasi kode, lalu hasilkan file eksekusi yang dapat dijalankan.
g++ -o trans_producer_demo -std=c++11 -lz -lrocketmq TransProducerDemo.cppKode sampel berikut menunjukkan cara mengirim pesan transaksional menggunakan TCP client SDK untuk C++ dari Community Edition:
#include <iostream> #include <chrono> #include <thread> #include "TransactionMQProducer.h" #include "MQClientException.h" #include "TransactionListener.h" using namespace std; using namespace rocketmq; class ExampleTransactionListener : public TransactionListener { public: LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) { // Jalankan transaksi lokal. Jika transaksi lokal berhasil dieksekusi, COMMIT_MESSAGE dikembalikan. Jika transaksi lokal gagal dieksekusi, ROLLBACK_MESSAGE dikembalikan. Jika status eksekusi transaksi lokal tidak diketahui, UNKNOWN dikembalikan. // Jika UNKNOWN dikembalikan, tugas terjadwal untuk memeriksa status transaksi lokal dipicu. std::cout << "Jalankan Transaksi Lokal, Pesan Diterima Topik:" << msg.getTopic() << ", MsgId:" << msg.getBody() << std::endl; return UNKNOWN; } LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) { // Periksa status eksekusi transaksi lokal. Jika transaksi lokal berhasil dieksekusi, COMMIT_MESSAGE dikembalikan. Jika transaksi lokal gagal dieksekusi, ROLLBACK_MESSAGE dikembalikan. Jika status eksekusi transaksi lokal tidak diketahui, UNKNOWN dikembalikan. // Jika UNKNOWN dikembalikan, tunggu hingga tugas terjadwal berikutnya untuk memeriksa status transaksi lokal dipicu. std::cout << "Periksa Transaksi Lokal, Pesan Diterima Topik:" << msg.getTopic() << ", MsgId:" << msg.getMsgId() << std::endl; return COMMIT_MESSAGE; } }; int main() { std::cout << "=======Sebelum mengirim pesan=======" << std::endl; // ID grup yang Anda ajukan di konsol ApsaraMQ for RocketMQ. TransactionMQProducer producer("GID_XXXXXXXXXXXXXXXX"); // Titik akhir TCP yang Anda peroleh dari halaman Detail Instance di konsol ApsaraMQ for RocketMQ. producer.setNamesrvAddr("http://MQ_XXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. // ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET menentukan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda, yang digunakan untuk verifikasi identitas. // Saluran pengguna. Nilai default: ALIYUN. producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); // Pendengar transaksi. ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener(); producer.setTransactionListener(exampleTransactionListener); // Setelah Anda mengonfigurasi parameter yang diperlukan, mulailah produser. producer.start(); auto start = std::chrono::system_clock::now(); int count = 3; for (int i = 0; i < count; ++i) { // Topik yang Anda ajukan di konsol ApsaraMQ for RocketMQ. MQMessage msg("YOUR TRANSACTION TOPIC", "HiTAG", "Hello,CPP SDK, Transaction Message."); try { SendResult sendResult = producer.sendMessageInTransaction(msg, &i); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } auto interval = std::chrono::system_clock::now() - start; std::cout << "Kirim " << count << " pesan OK, biaya " << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; std::cout << "Tunggu pemeriksaan transaksi lokal..... " << std::endl; for (int i = 0; i < 6; ++i) { this_thread::sleep_for(chrono::seconds(10)); std::cout << "Berjalan "<< i*10 + 10 << " Detik......"<< std::endl; } producer.shutdown(); std::cout << "=======Setelah mengirim pesan=======" << std::endl; return 0; }
Konsumsi pesan transaksional
Salin kode berikut ke file ConsumerDemo.cpp. Modifikasi parameter yang sesuai, jalankan perintah g++ untuk mengompilasi kode, lalu hasilkan file eksekusi yang dapat dijalankan.
g++ -o consumer_demo -std=c++11 -lz -lrocketmq ConsumerDemo.cppKode sampel berikut menunjukkan cara mengonsumsi pesan transaksional menggunakan TCP client SDK untuk C++ dari Community Edition:
#include <iostream> #include <thread> #include "DefaultMQPushConsumer.h" using namespace rocketmq; class ExampleMessageListener : public MessageListenerConcurrently { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { std::cout << "Pesan Diterima Topik:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl; // ID grup yang Anda ajukan di konsol ApsaraMQ for RocketMQ. DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXX"); // Titik akhir TCP yang Anda peroleh dari halaman Detail Instance di konsol ApsaraMQ for RocketMQ. consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. // ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET menentukan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud Anda, yang digunakan untuk verifikasi identitas. // Saluran pengguna. Nilai default: ALIYUN. consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); auto start = std::chrono::system_clock::now(); // Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan mengembalikan hasilnya. ExampleMessageListener *messageListener = new ExampleMessageListener(); consumer->subscribe("YOURTOPIC", "*"); consumer->registerMessageListener(messageListener); // Persiapan selesai. Anda harus memanggil fungsi startup untuk memulai konsumen. // ******************************************** // 1. Sebelum Anda memulai konsumen, pastikan langganan telah dikonfigurasi. // 2. Pastikan langganan konsumen dalam grup yang sama konsisten. // ********************************************* consumer->start(); // Pertahankan thread utama tetap berjalan hingga proses dimatikan. std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000)); consumer->shutdown(); std::cout << "=======Setelah mengonsumsi pesan======" << std::endl; return 0; }