ApsaraMQ for RocketMQ menyediakan pemrosesan transaksi terdistribusi yang mirip dengan arsitektur eXtended (X/Open XA) untuk memastikan konsistensi transaksi. Topik ini menjelaskan cara mengirim dan menerima pesan transaksional menggunakan TCP client SDK untuk Java.
Jika Anda baru mengenal ApsaraMQ for RocketMQ, lihat proyek Demo untuk menyiapkan proyek yang berfungsi sebelum mengirim dan menerima pesan.
Cara kerja pesan transaksional
Diagram berikut menggambarkan interaksi antara produsen, broker, dan transaksi lokal selama pengiriman pesan transaksional.

Untuk informasi lebih lanjut tentang model pesan transaksional, lihat Pesan transaksional.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Mengunduh SDK untuk Java. Untuk informasi versi, lihat Catatan rilis.
Menyiapkan lingkungan pengembangan Anda. Untuk informasi lebih lanjut, lihat Persiapkan lingkungan.
(Opsional) Mengonfigurasi logging. Untuk informasi lebih lanjut, lihat Pengaturan logging.
Kirim pesan transaksional
Pengiriman pesan transaksional memerlukan tiga komponen berikut:
TransactionProduceruntuk mengirim pesan.LocalTransactionExecuteruntuk menjalankan transaksi lokal saat half message dikirim.LocalTransactionCheckeryang dipanggil oleh broker untuk memverifikasi transaksi yang belum terselesaikan.
Untuk kode sumber lengkap, lihat pustaka kode ApsaraMQ for RocketMQ.
package com.aliyun.openservices.tcp.example.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Date;
import java.util.Properties;
public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// ID kelompok konsumen yang dibuat di Konsol ApsaraMQ for RocketMQ.
// ID kelompok konsumen untuk pesan transaksional tidak boleh sama
// dengan ID yang digunakan untuk jenis pesan lainnya.
properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
// Dapatkan kredensial AccessKey dari variabel lingkungan
// untuk menghindari hardcoding informasi sensitif.
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// Titik akhir TCP. Anda dapat menemukan nilai ini di bagian TCP Endpoint
// pada halaman Detail Instans di Konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");
// Daftarkan pemeriksa transaksi sebelum membuat produsen.
// Broker memanggil pemeriksa ini untuk memverifikasi transaksi yang belum terselesaikan.
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
Message msg = new Message("<your-topic>", "TagA", "Hello MQ transaction===".getBytes());
for (int i = 0; i < 3; i++) {
try {
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
// Jalankan logika transaksi lokal Anda di sini.
System.out.println("Jalankan transaksi lokal dan komit status transaksi.");
return TransactionStatus.CommitTransaction;
}
}, null);
assert sendResult != null;
} catch (ONSClientException e) {
// Tangani kegagalan: coba kirim ulang atau simpan pesan untuk diproses nanti.
System.out.println(new Date() + " Gagal mengirim pesan mq! Topik adalah:" + msg.getTopic());
e.printStackTrace();
}
}
System.out.println("Berhasil mengirim pesan transaksi.");
}
}Ganti placeholder berikut dengan nilai aktual Anda:
| Placeholder | Deskripsi | Lokasi pencarian |
|---|---|---|
<your-group-id> | ID kelompok konsumen untuk pesan transaksional | Konsol ApsaraMQ for RocketMQ |
<your-tcp-endpoint> | Titik akhir TCP instans Anda | Instance Details > bagian TCP Endpoint |
<your-topic> | Nama topik | Konsol ApsaraMQ for RocketMQ |
Implementasikan pemeriksa transaksi
Broker memanggil pemeriksa transaksi ketika tidak dapat menentukan status half message. Hal ini terjadi jika:
LocalTransactionExecutermengembalikanTransactionStatus.Unknow.Produsen keluar sebelum mengommit status transaksi.
Dalam kedua kasus tersebut, broker secara berkala mengirim permintaan pemeriksaan status ke produsen dalam kluster produsen. Produsen kemudian memeriksa transaksi lokal dan melaporkan status akhirnya.
// Implementasi pemeriksa transaksi
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("Menerima permintaan pemeriksaan status transaksi. MsgId: " + msg.getMsgID());
// Periksa status transaksi lokal dan kembalikan hasilnya.
// Kembalikan CommitTransaction, RollbackTransaction, atau Unknow.
return TransactionStatus.CommitTransaction;
}
}Metode check Anda harus:
Memeriksa status (committed atau rolled back) dari transaksi lokal yang terkait dengan half message tersebut.
Mengembalikan status transaksi ke broker:
TransactionStatus.CommitTransaction,TransactionStatus.RollbackTransaction, atauTransactionStatus.Unknow.
Berlangganan pesan transaksional
Berlangganan pesan transaksional dilakukan sama seperti berlangganan pesan biasa. Untuk informasi lebih lanjut, lihat Berlangganan pesan.