All Products
Search
Document Center

ApsaraMQ for RocketMQ:Kirim dan terima pesan transaksional

Last Updated:Mar 12, 2026

ApsaraMQ for RocketMQ menyediakan pemrosesan transaksi terdistribusi yang mirip dengan arsitektur eXtended (X/Open XA) untuk memastikan konsistensi transaksi. Topik ini menjelaskan cara mengirim dan menerima pesan transaksional menggunakan TCP client SDK untuk Java.

Catatan

Jika Anda baru mengenal ApsaraMQ for RocketMQ, lihat proyek Demo untuk menyiapkan proyek yang berfungsi sebelum mengirim dan menerima pesan.

Cara kerja pesan transaksional

Diagram berikut menggambarkan interaksi antara produsen, broker, dan transaksi lokal selama pengiriman pesan transaksional.

Transactional message interaction process

Untuk informasi lebih lanjut tentang model pesan transaksional, lihat Pesan transaksional.

Prasyarat

Sebelum memulai, pastikan Anda telah:

Kirim pesan transaksional

Pengiriman pesan transaksional memerlukan tiga komponen berikut:

  • TransactionProducer untuk mengirim pesan.

  • LocalTransactionExecuter untuk menjalankan transaksi lokal saat half message dikirim.

  • LocalTransactionChecker yang dipanggil oleh broker untuk memverifikasi transaksi yang belum terselesaikan.

Untuk kode sumber lengkap, lihat pustaka kode ApsaraMQ for RocketMQ.

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // ID kelompok konsumen yang dibuat di Konsol ApsaraMQ for RocketMQ.
        // ID kelompok konsumen untuk pesan transaksional tidak boleh sama
        // dengan ID yang digunakan untuk jenis pesan lainnya.
        properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
        // Dapatkan kredensial AccessKey dari variabel lingkungan
        // untuk menghindari hardcoding informasi sensitif.
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Titik akhir TCP. Anda dapat menemukan nilai ini di bagian TCP Endpoint
        // pada halaman Detail Instans di Konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");

        // Daftarkan pemeriksa transaksi sebelum membuat produsen.
        // Broker memanggil pemeriksa ini untuk memverifikasi transaksi yang belum terselesaikan.
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("<your-topic>", "TagA", "Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try {
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        // Jalankan logika transaksi lokal Anda di sini.
                        System.out.println("Jalankan transaksi lokal dan komit status transaksi.");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            } catch (ONSClientException e) {
                // Tangani kegagalan: coba kirim ulang atau simpan pesan untuk diproses nanti.
                System.out.println(new Date() + " Gagal mengirim pesan mq! Topik adalah:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Berhasil mengirim pesan transaksi.");
    }
}

Ganti placeholder berikut dengan nilai aktual Anda:

PlaceholderDeskripsiLokasi pencarian
<your-group-id>ID kelompok konsumen untuk pesan transaksionalKonsol ApsaraMQ for RocketMQ
<your-tcp-endpoint>Titik akhir TCP instans AndaInstance Details > bagian TCP Endpoint
<your-topic>Nama topikKonsol ApsaraMQ for RocketMQ

Implementasikan pemeriksa transaksi

Broker memanggil pemeriksa transaksi ketika tidak dapat menentukan status half message. Hal ini terjadi jika:

  • LocalTransactionExecuter mengembalikan TransactionStatus.Unknow.

  • Produsen keluar sebelum mengommit status transaksi.

Dalam kedua kasus tersebut, broker secara berkala mengirim permintaan pemeriksaan status ke produsen dalam kluster produsen. Produsen kemudian memeriksa transaksi lokal dan melaporkan status akhirnya.

// Implementasi pemeriksa transaksi
class LocalTransactionCheckerImpl implements LocalTransactionChecker {

    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("Menerima permintaan pemeriksaan status transaksi. MsgId: " + msg.getMsgID());
        // Periksa status transaksi lokal dan kembalikan hasilnya.
        // Kembalikan CommitTransaction, RollbackTransaction, atau Unknow.
        return TransactionStatus.CommitTransaction;
    }
}

Metode check Anda harus:

  1. Memeriksa status (committed atau rolled back) dari transaksi lokal yang terkait dengan half message tersebut.

  2. Mengembalikan status transaksi ke broker: TransactionStatus.CommitTransaction, TransactionStatus.RollbackTransaction, atau TransactionStatus.Unknow.

Berlangganan pesan transaksional

Berlangganan pesan transaksional dilakukan sama seperti berlangganan pesan biasa. Untuk informasi lebih lanjut, lihat Berlangganan pesan.