Topik ini memberikan contoh kode untuk mengirim dan menerima pesan transaksional menggunakan TCP client SDK untuk Java.
ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang serupa dengan eXtended Architecture (X/Open XA) untuk memastikan konsistensi transaksi di ApsaraMQ for RocketMQ.
Jika Anda adalah pengguna baru ApsaraMQ for RocketMQ, disarankan untuk merujuk ke Proyek Demo untuk membangun proyek sebelum menggunakan layanan ini dalam mengirim dan menerima pesan.
Proses interaksi
Gambar berikut mengilustrasikan proses interaksi pesan transaksional.

Untuk informasi lebih lanjut, lihat Pesan Transaksional.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
SDK untuk Java telah diunduh. Untuk detail catatan rilis SDK untuk Java, lihat Catatan Rilis.
Lingkungan telah disiapkan. Untuk informasi lebih lanjut, lihat Siapkan Lingkungan.
(Opsional) Pengaturan logging telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Pengaturan Logging.
Kirim pesan transaksional
Untuk contoh kode lengkap, lihat ApsaraMQ for RocketMQ Repositori Kode.
package com.aliyun.openservices.tcp.example.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Date;
import java.util.Properties;
public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. Catatan: ID grup konsumen yang digunakan untuk pesan transaksional tidak boleh sama dengan ID grup konsumen yang digunakan untuk jenis pesan lainnya.
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// AccessKey ID yang digunakan untuk autentikasi.
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// Rahasia AccessKey yang digunakan untuk autentikasi.
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
// Sebelum Anda menginisialisasi produser, Anda harus mendaftarkan checker untuk memeriksa status transaksi lokal.
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());
for (int i = 0; i < 3; i++) {
try{
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("Jalankan transaksi lokal dan komit status transaksi.");
return TransactionStatus.CommitTransaction;
}
}, null);
assert sendResult != null;
}catch (ONSClientException e){
// Tentukan logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim lagi.
System.out.println(new Date() + " Pengiriman pesan mq gagal! Topik adalah:" + msg.getTopic());
e.printStackTrace();
}
}
System.out.println("Pengiriman pesan transaksi berhasil.");
}
}
// Pemeriksa transaksi lokal.
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("Permintaan untuk memeriksa status transaksi pesan. MsgId: " + msg.getMsgID());
return TransactionStatus.CommitTransaction;
}
}Mekanisme pemeriksaan status transaksi
Mengapa mekanisme pemeriksaan status transaksi perlu diimplementasikan saat mengirim pesan transaksional?
Jika pesan setengah dikirim tetapi
TransactionStatus.Unknowdikembalikan atau tidak ada status yang dikomit untuk transaksi lokal karena keluarnya aplikasi, broker ApsaraMQ for RocketMQ tidak mengetahui status pesan tersebut. Oleh karena itu, broker secara berkala mengirim permintaan ke produser dalam kluster untuk memeriksa status pesan setengah. Setelah menerima permintaan pemeriksaan, produser memverifikasi dan mengkomit status akhir transaksi lokal yang sesuai dengan pesan setengah.Apa yang dilakukan logika bisnis saat metode pemeriksaan dipanggil balik?
Metode pemeriksaan untuk pesan transaksional harus mencakup logika untuk memeriksa konsistensi transaksi. Setelah pesan transaksional dikirim, ApsaraMQ for RocketMQ memanggil operasi API
LocalTransactionCheckerguntuk menanggapi permintaan broker terkait status transaksi lokal. Metode ini bertujuan untuk mencapai hal-hal berikut:Memeriksa status (dikomit atau rollback) transaksi lokal yang sesuai dengan pesan setengah.
Mengkomit status transaksi lokal yang sesuai dengan pesan setengah ke broker.
Berlangganan pesan transaksional
Contoh kode untuk berlangganan pesan transaksional sama dengan berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Berlangganan Pesan.