Topik ini menyediakan contoh kode berdasarkan SDK Klien TCP untuk Java dari Edisi Komunitas yang dapat digunakan untuk mengirim dan menerima pesan transaksional.
Proses interaksi
Gambar berikut menunjukkan proses interaksi pesan transaksional.

Untuk informasi lebih lanjut, lihat Pesan Transaksional.
Prasyarat
Sebelum memulai, pastikan operasi berikut telah dilakukan:
SDK Edisi Komunitas untuk Java versi 4.5.2 atau lebih baru telah diunduh. Untuk informasi lebih lanjut, kunjungi Halaman Unduhan RocketMQ.
Lingkungan telah disiapkan. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Sepasang AccessKey telah dibuat pada akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Sepasang AccessKey.
Kirim pesan transaksional
Langkah-langkah berikut diperlukan untuk mengirim pesan transaksional:
Kirim pesan setengah dan jalankan transaksi lokal yang sesuai. Contoh kode:
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQTransactionProducer { private static RPCHook getAclRPCHook() { /** * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda. * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. */ return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))); } public static void main(String[] args) throws MQClientException { /** * Buat produser dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser: * TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook()); */ TransactionMQProducer transactionMQProducer = new TransactionMQProducer(null, "YOUR TRANSACTION GROUP ID", getAclRPCHook(), true, null); /** * Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80. */ transactionMQProducer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80"); // Atur parameter AccessChannel ke CLOUD. Jika Anda ingin menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong. transactionMQProducer.setAccessChannel(AccessChannel.CLOUD); transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl()); transactionMQProducer.start(); for (int i = 0; i < 10; i++) { try { Message message = new Message("YOUR TRANSACTION TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() { @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("Mulai menjalankan transaksi lokal: " + msg); return LocalTransactionState.UNKNOW; } }, null); assert sendResult != null; } catch (Exception e) { e.printStackTrace(); } } } }Komit status pesan transaksional.
Setelah transaksi lokal dijalankan, broker ApsaraMQ for RocketMQ harus diberi tahu tentang status transaksi pesan saat ini, terlepas dari apakah eksekusinya berhasil atau tidak. Broker ApsaraMQ for RocketMQ dapat diberi tahu dengan salah satu cara berikut:
Komit status setelah transaksi lokal dijalankan.
Tunggu hingga broker ApsaraMQ for RocketMQ mengirim permintaan untuk memeriksa status transaksi pesan.
Transaksi dapat berada dalam salah satu status berikut:
LocalTransactionState.COMMIT_MESSAGE: Transaksi dikomit. Pesan dapat dikonsumsi oleh konsumen.LocalTransactionState.ROLLBACK_MESSAGE: Transaksi dibatalkan. Pesan dibuang dan tidak dapat dikonsumsi.LocalTransactionState.UNKNOW: Status transaksi tidak diketahui, dan sistem sedang menunggu broker ApsaraMQ for RocketMQ untuk memeriksa status transaksi lokal yang sesuai dengan pesan.
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; /** * Kelas yang digunakan untuk memeriksa status transaksi lokal yang diimplementasikan dengan mengirim pesan transaksional ApsaraMQ for RocketMQ. */ public class LocalTransactionCheckerImpl implements TransactionCheckListener { /** * Pemeriksa transaksi lokal. Untuk informasi lebih lanjut, lihat Pesan transaksional. */ @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("Permintaan untuk memeriksa status transaksi pesan diterima. MsgId: " + msg.getMsgId()); return LocalTransactionState.COMMIT_MESSAGE; } }
Mekanisme Pemeriksaan Status Transaksi
Mengapa mekanisme pemeriksaan status transaksi harus diimplementasikan ketika pesan transaksional dikirim?
Jika pesan setengah dikirim di Langkah 1 tetapi
TransactionStatus.Unknowdikembalikan atau tidak ada status yang dikomit untuk transaksi lokal karena keluarnya aplikasi, status pesan setengah tidak diketahui oleh broker ApsaraMQ for RocketMQ. Dalam hal ini, broker secara berkala meminta pengirim untuk memeriksa dan melaporkan status pesan setengah.Apa yang dilakukan logika bisnis ketika metode pemeriksaan dipanggil kembali?
Metode pemeriksaan untuk pesan transaksional harus mencakup logika yang digunakan untuk memeriksa konsistensi transaksi. Setelah pesan transaksional dikirim, ApsaraMQ for RocketMQ harus memanggil API operasi
LocalTransactionCheckeruntuk menanggapi permintaan pemeriksaan status dari broker untuk transaksi lokal. Oleh karena itu, metode yang digunakan untuk memeriksa pesan transaksional harus mencapai tujuan berikut:Periksa status (dikomit atau dibatalkan) transaksi lokal yang sesuai dengan pesan setengah.
Komit status transaksi lokal yang sesuai dengan pesan setengah ke broker.
Berlangganan pesan transaksional
Metode yang digunakan untuk berlangganan pesan transaksional sama dengan metode yang digunakan untuk berlangganan pesan normal. Contoh kode:
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
/**
* ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda.
* Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* Buat konsumen dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
* Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
// Titik akhir instance ApsaraMQ for RocketMQ.
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
// Atur parameter AccessChannel ke CLOUD. Jika Anda ingin menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong.
consumer.setAccessChannel(AccessChannel.CLOUD);
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
consumer.subscribe("YOUR TOPIC", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Terima Pesan Baru: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}