全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kirim dan terima pesan transaksional

更新时间:Jul 02, 2025

Topik ini memberikan contoh kode untuk mengirim dan menerima pesan transaksional menggunakan SDK klien TCP untuk C atau C++.

ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang mirip dengan eXtended Architecture (X/Open XA) untuk memastikan konsistensi transaksi di ApsaraMQ for RocketMQ.

Proses interaksi

Gambar berikut menunjukkan proses interaksi pesan transaksional. Transactional messages

Untuk informasi lebih lanjut, lihat Pesan Transaksional.

Prasyarat

Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:

  • SDK untuk C atau C++ telah diunduh. Untuk informasi lebih lanjut, lihat Catatan Rilis.

  • Lingkungan telah disiapkan. Untuk informasi lebih lanjut, lihat Persiapan Lingkungan (V1.x.x).

  • Sumber daya yang ingin ditentukan dalam kode telah dibuat di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Pasangan AccessKey dari akun Alibaba Cloud Anda telah diperoleh. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Kirim pesan transaksional

Ikuti langkah-langkah berikut untuk mengirim pesan transaksional:

  1. Kirim pesan setengah jadi dan jalankan transaksi lokal. Contoh kode:

    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;
    
        class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            MyLocalTransactionExecuter()
            {
            }
    
            ~MyLocalTransactionExecuter()
            {
            }
            virtual TransactionStatus execute(Message &value)
            {
                    // ID pesan. Dua pesan dapat memiliki body pesan yang sama tetapi tidak ID yang sama. Anda tidak dapat memeriksa ID pesan saat ini di konsol ApsaraMQ for RocketMQ.)
                    string msgId = value.getMsgID();
                    // Hitung body pesan menggunakan algoritma seperti CRC32 dan MD5.
                    // ID pesan dan ID CRC32 digunakan untuk mencegah pesan duplikat.
                    // Anda tidak perlu menentukan ID pesan atau ID CRC32 jika bisnis Anda bersifat idempoten. Jika tidak, tentukan ID pesan atau ID CRC32 untuk memastikan idempotensi.
                    // Untuk mencegah pesan duplikat, kami sarankan Anda menghitung body pesan menggunakan algoritma CRC32 atau MD5.
                    TransactionStatus transactionStatus = Unknow;
                    try {
                        boolean isCommit = Hasil eksekusi transaksi lokal;
                        if (isCommit) {
                            // Commit pesan jika transaksi lokal dieksekusi.
                            transactionStatus = CommitTransaction;
                        } else {
                            // Rollback pesan jika transaksi lokal gagal dieksekusi.
                            transactionStatus = RollbackTransaction;
                        }
                    } catch (...) {
                        //penanganan pengecualian
                    }
                    return transactionStatus;
            }
        }
    
        int main(int argc, char* argv[])
        {
            // Parameter yang diperlukan untuk membuat produsen dan mengirim pesan.
            ONSFactoryProperty factoryInfo;
            // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
            // Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
            // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
            // Konten pesan.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
            // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
            // ID AccessKey yang digunakan untuk autentikasi.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    		    // Rahasia AccessKey yang digunakan untuk autentikasi.
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    
            // Buat produsen. ApsaraMQ for RocketMQ tidak melepaskan pChecker. Anda harus melepaskan pChecker sendiri.
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
    
            // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produsen.
            pProducer->start();
    
            Message msg(
                //Topik Pesan
                factoryInfo.getPublishTopics(),
                // Tag pesan. Tag pesan mirip dengan tag Gmail dan digunakan oleh konsumen untuk menyortir dan memfilter pesan di broker ApsaraMQ for RocketMQ.
                "TagA",
                // Body pesan. Anda tidak dapat meninggalkan parameter ini kosong. ApsaraMQ for RocketMQ tidak memproses body pesan. Produsen dan konsumen harus sepakat pada metode yang digunakan untuk serialisasi dan deserialisasi body pesan.
                factoryInfo.getMessageContent()
            );
    
            // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
            // Jika Anda tidak dapat menerima pesan sesuai harapan, Anda dapat menggunakan kunci untuk memeriksa pesan di konsol ApsaraMQ for RocketMQ.
            // Catatan: Anda dapat mengirim dan menerima pesan bahkan jika Anda tidak menentukan kunci.
            msg.setKey("ORDERID_100");
    
            // Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
            try
            {
                // ApsaraMQ for RocketMQ tidak melepaskan pExecuter. Anda harus melepaskan pExecuter sendiri.
                MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
                SendResultONS sendResult = pProducer->send(msg,pExecuter);
            }
            catch(ONSClientException & e)
            {
                // Tentukan logika untuk menangani pengecualian.
            }
            // Sebelum Anda keluar dari aplikasi, hancurkan produsen. Jika tidak, masalah seperti kebocoran memori akan terjadi.
            pProducer->shutdown();
    
            return 0;
    
        }        
  2. Contoh kode berikut menunjukkan cara mengcommit status pesan transaksional:

     class MyLocalTransactionChecker : LocalTransactionChecker
     {
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // ID pesan. Dua pesan dapat memiliki body pesan yang sama tetapi tidak ID yang sama. Anda tidak dapat memeriksa ID pesan saat ini di konsol ApsaraMQ for RocketMQ.
             string msgId = value.getMsgID();
             // Hitung body pesan menggunakan algoritma seperti CRC32 dan MD5.
             // ID pesan dan ID CRC32 digunakan untuk mencegah pesan duplikat.
             // Anda tidak perlu menentukan ID pesan atau ID CRC32 jika bisnis Anda bersifat idempoten. Jika tidak, tentukan ID pesan atau ID CRC32 untuk memastikan idempotensi.
             // Untuk mencegah pesan duplikat, kami sarankan Anda menghitung body pesan menggunakan algoritma CRC32 atau MD5.
             TransactionStatus transactionStatus = Unknow;
             try {
                 boolean isCommit = Hasil eksekusi transaksi lokal;
                 if (isCommit) {
                     // Commit pesan jika transaksi lokal dieksekusi.
                     transactionStatus = CommitTransaction;
                 } else {
                     // Rollback pesan jika transaksi lokal gagal dieksekusi.
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 //kesalahan pengecualian
             }
             return transactionStatus;
         }
     }               

Mekanisme Pemeriksaan Status Transaksi

  • Mengapa mekanisme pemeriksaan status transaksi harus diimplementasikan ketika pesan transaksional dikirim?

    Jika pesan setengah jadi dikirim tetapi TransactionStatus.Unknow dikembalikan atau tidak ada status yang dicommit untuk transaksi lokal karena keluarnya aplikasi, status pesan setengah jadi tidak diketahui oleh broker ApsaraMQ for RocketMQ. Oleh karena itu, broker secara berkala mengirim permintaan ke produsen di kluster produsen untuk memeriksa status pesan setengah jadi. Setelah permintaan pemeriksaan status diterima, produsen memeriksa dan mengcommit status akhir transaksi lokal yang sesuai dengan pesan setengah jadi.

  • Apa yang dilakukan logika bisnis ketika metode pemeriksaan dipanggil balik?

    Metode pemeriksaan untuk pesan transaksional harus mencakup logika yang digunakan untuk memeriksa konsistensi transaksi. Setelah pesan transaksional dikirim, ApsaraMQ for RocketMQ harus memanggil operasi API LocalTransactionCheckerg untuk menanggapi permintaan dari broker untuk status transaksi lokal. Oleh karena itu, metode yang digunakan untuk memeriksa pesan transaksional harus mencapai tujuan berikut:

    1. Periksa status (dicommit atau rollback) transaksi lokal yang sesuai dengan pesan setengah jadi.

    2. Commit status transaksi lokal yang sesuai dengan pesan setengah jadi ke broker.

Berlangganan pesan transaksional

Contoh kode untuk berlangganan pesan transaksional sama dengan berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Berlangganan Pesan.