Topik ini memberikan contoh kode untuk mengirim dan menerima pesan transaksional menggunakan TCP Client SDK untuk .NET. Fitur pesan terjadwal didukung di wilayah Internet, Tiongkok (Hangzhou), Tiongkok (Beijing), Tiongkok (Shanghai), dan Tiongkok (Shenzhen).
ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang mirip dengan eXtended Architecture (X/Open XA). ApsaraMQ for RocketMQ memastikan konsistensi transaksional melalui penggunaan pesan transaksional.
Proses interaksi
Gambar berikut menunjukkan proses interaksi dalam penggunaan pesan transaksional.
Untuk informasi lebih lanjut, lihat Pesan Transaksional.
Prasyarat
SDK untuk .NET telah diunduh. Untuk informasi lebih lanjut, lihat Catatan Rilis.
Lingkungan telah disiapkan. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Sumber daya seperti instance, topik, dan grup konsumen telah dibuat di konsol ApsaraMQ for RocketMQ. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Pasangan AccessKey dari akun Alibaba Cloud Anda telah diperoleh. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan transaksional
Ikuti langkah-langkah berikut untuk mengirim pesan transaksional:
Kirim pesan setengah dan jalankan transaksi lokal. Contoh kode:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.InteropServices; using ons; namespace ons { public class MyLocalTransactionExecuter : LocalTransactionExecuter { public MyLocalTransactionExecuter() { } ~MyLocalTransactionExecuter() { } public override TransactionStatus execute(Message value) { Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}", value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser")); // ID pesan. Dua pesan dapat memiliki body pesan yang sama tetapi tidak ID yang sama. Anda tidak dapat memeriksa ID pesan saat ini di konsol ApsaraMQ for RocketMQ. string msgId = value.getMsgID(); // Hitung body pesan menggunakan algoritma seperti CRC32 dan MD5. // ID pesan dan ID CRC32 digunakan untuk mencegah pesan duplikat. // Untuk mencegah pesan duplikat, kami sarankan Anda menghitung body pesan menggunakan algoritma CRC32 atau MD5. TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = Hasil eksekusi transaksi lokal; if (isCommit) { // Commit pesan jika transaksi lokal dieksekusi. transactionStatus = TransactionStatus.CommitTransaction; } else { // Rollback pesan jika transaksi lokal gagal dieksekusi. transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { // Logika penanganan pengecualian. } return transactionStatus; } } class onscsharp { static void Main(string[] args) { ONSFactoryProperty factoryInfo = new ONSFactoryProperty(); // Titik akhir TCP. Anda bisa mendapatkan titik akhir di bagian titik akhir TCP halaman Detail Instance di konsol ApsaraMQ for RocketMQ. factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX"); // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ. factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, ""); // Isi pesan. factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, ""); // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. // ID AccessKey yang digunakan untuk otentikasi. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // Rahasia AccessKey yang digunakan untuk otentikasi. factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); //buat produser transaksi LocalTransactionChecker myChecker = new MyLocalTransactionChecker(); TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo, myChecker); // Sebelum mengirim pesan, panggil metode start() hanya sekali untuk memulai produser. Setelah produser dimulai, pesan dapat dikirim secara bersamaan di beberapa thread. pProducer.start(); Message msg = new Message( //Topik Pesan factoryInfo.getPublishTopics(), //Tag Pesan "TagA", //Body Pesan factoryInfo.getMessageContent() ); // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan. // Jika Anda tidak dapat menerima pesan sesuai harapan, Anda dapat menggunakan kunci untuk memeriksa pesan di konsol ApsaraMQ for RocketMQ. // Catatan: Anda dapat mengirim dan menerima pesan meskipun Anda tidak menentukan kunci. msg.setKey("ORDERID_100"); // Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim. try { LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter(); SendResultONS sendResult = pProducer.send(msg, myExecuter); } catch(ONSClientException e) { Console.WriteLine("\npengecualian dari sendmsg:{0}",e.what() ); } // Sebelum keluar dari aplikasi, hancurkan produser. Jika tidak, masalah seperti kebocoran memori akan terjadi. // Produser tidak dapat dimulai lagi setelah dihancurkan. pProducer.shutdown(); } } }Commit status pesan transaksional.
Setelah transaksi lokal dieksekusi, broker ApsaraMQ for RocketMQ harus diberi tahu tentang status transaksi pesan saat ini, baik berhasil maupun gagal. Broker dapat diberi tahu dengan salah satu cara berikut:
Commit status setelah transaksi lokal selesai dieksekusi.
Tunggu hingga broker ApsaraMQ for RocketMQ mengirim permintaan untuk memeriksa status transaksi pesan.
Transaksi dapat berada dalam salah satu status berikut:
TransactionStatus.CommitTransaction: Transaksi dicommit, dan konsumen dapat mengonsumsi pesan.TransactionStatus.RollbackTransaction: Transaksi dirollback, dan pesan dibuang serta tidak dapat dikonsumsi.TransactionStatus.Unknow: Status transaksi tidak diketahui, dan sistem sedang menunggu broker memeriksa status transaksi lokal yang sesuai dengan pesan.
public class MyLocalTransactionChecker : LocalTransactionChecker { public MyLocalTransactionChecker() { } ~MyLocalTransactionChecker() { } public override TransactionStatus check(Message value) { Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}", value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperties("VincentNoUser")); // ID pesan. Dua pesan dapat memiliki body pesan yang sama tetapi tidak ID yang sama. Anda tidak dapat memeriksa ID pesan saat ini di konsol ApsaraMQ for RocketMQ. string msgId = value.getMsgID(); // Hitung body pesan menggunakan algoritma seperti CRC32 dan MD5. // ID pesan dan ID CRC32 digunakan untuk mencegah pesan duplikat. // Anda tidak perlu menentukan ID pesan atau ID CRC32 jika bisnis Anda bersifat idempoten. Jika tidak, tentukan ID pesan atau ID CRC32 untuk memastikan idempotensi. // Untuk mencegah pesan duplikat, kami sarankan Anda menghitung body pesan menggunakan algoritma CRC32 atau MD5. TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = Hasil eksekusi transaksi lokal; if (isCommit) { // Commit pesan jika transaksi lokal dieksekusi. transactionStatus = TransactionStatus.CommitTransaction; } else { // Rollback pesan jika transaksi lokal gagal dieksekusi. transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { //penanganan pengecualian } return transactionStatus; } }Mekanisme Pemeriksaan Status Transaksi
Mengapa mekanisme pemeriksaan status transaksi harus diimplementasikan ketika pesan transaksional dikirim?
Jika pesan setengah dikirim pada Langkah 1 tetapi
TransactionStatus.Unknowdikembalikan atau tidak ada status transaksi lokal yang dicommit karena aplikasi keluar, status pesan setengah tidak diketahui oleh broker ApsaraMQ for RocketMQ. Oleh karena itu, broker secara berkala mengirim permintaan ke produser di kluster produser untuk memeriksa status pesan setengah. Setelah permintaan pemeriksaan status diterima, produser memeriksa dan mencatat status akhir transaksi lokal yang sesuai dengan pesan setengah.Apa yang dilakukan logika bisnis ketika metode pemeriksaan dipanggil balik?
Metode pemeriksaan untuk pesan transaksional harus mencakup logika untuk memeriksa konsistensi transaksi. Setelah pesan transaksional dikirim, ApsaraMQ for RocketMQ harus memanggil operasi API
LocalTransactionCheckeruntuk merespons permintaan pemeriksaan status dari broker untuk transaksi lokal. Metode ini harus mencapai tujuan berikut:Periksa status (dicommit atau dirollback) transaksi lokal yang sesuai dengan pesan setengah.
Catat status transaksi lokal yang sesuai dengan pesan setengah ke broker.
Bagaimana status transaksi lokal memengaruhi pesan setengah?
TransactionStatus.CommitTransaction: Transaksi dicommit. Pesan dapat dikonsumsi oleh konsumen.TransactionStatus.RollbackTransaction: Transaksi dirollback. Pesan dibuang dan tidak dapat dikonsumsi.TransactionStatus.Unknow: Status transaksi tidak diketahui, dan sistem sedang menunggu broker memeriksa status transaksi lokal yang sesuai dengan pesan.
Untuk informasi lebih lanjut, lihat implementasi
MyLocalTransactionChecker.
Berlangganan pesan transaksional
Contoh kode untuk berlangganan pesan transaksional sama dengan berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Berlangganan Pesan.