全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Pesan transaksional

更新时间:Jul 02, 2025

Pesan transaksional adalah jenis pesan unggulan yang disediakan oleh ApsaraMQ for RocketMQ. Dokumen ini menjelaskan skenario, mekanisme kerja, batasan, metode penggunaan, serta catatan penting terkait pesan transaksional.

Skenario

Transaksi terdistribusi

Dalam sistem terdistribusi, logika bisnis inti sering kali memerlukan pemrosesan simultan oleh beberapa sistem hilir. Masalah utama dalam transaksi terdistribusi adalah memastikan konsistensi hasil antara sistem bisnis inti dan sistem hilir.

Kebutuhan Pesan Transaksional

Sebagai contoh dalam skenario e-commerce, ketika pengguna melakukan pemesanan, perubahan juga dipicu di sistem hilir seperti logistik, poin kredit, dan keranjang belanja. Berikut adalah cabang-cabang transaksi yang terlibat:

  • Sistem pesanan: Status pesanan berubah dari belum dibayar menjadi telah dibayar.

  • Sistem logistik: Menambahkan catatan pengiriman dan membuat entri logistik.

  • Sistem poin: Memperbarui poin kredit pengguna.

  • Sistem keranjang belanja: Membersihkan item dan memperbarui catatan pengguna.

Solusi transaksi berbasis XA tradisional: performa buruk

Solusi umum untuk memastikan konsistensi hasil di antara cabang-cabang transaksi adalah menggunakan sistem transaksi terdistribusi berbasis protokol eXtended Architecture (XA). Solusi ini mengenkapsulasi perubahan ke dalam transaksi besar yang mencakup empat cabang transaksi independen. Meskipun solusi ini menjamin konsistensi hasil, sejumlah besar sumber daya harus dikunci selama pemrosesan, menyebabkan konkurensi rendah dan performa buruk. Jumlah sumber daya yang dikunci meningkat seiring dengan jumlah cabang transaksi.

Solusi berbasis pesan normal: konsistensi hasil buruk

Solusi alternatif yang lebih sederhana menganggap perubahan di sistem pesanan sebagai transaksi lokal dan perubahan di sistem hilir sebagai tugas hilir. Cabang-cabang transaksi dianggap sebagai transaksi pesan normal dan tabel pesanan. Solusi ini memproses pesan secara asinkron untuk mempersingkat waktu pemrosesan dan meningkatkan konkurensi sistem.

Solusi Pesan Normal

Namun, solusi ini dapat menghasilkan ketidaksesuaian antara transaksi inti dan cabang-cabang transaksi. Contohnya:

  • Pesan dikirim, tetapi pesanan belum selesai. Dalam hal ini, seluruh transaksi harus dibatalkan.

  • Pesanan selesai, tetapi pesan tidak dikirim. Dalam hal ini, pesan harus dikirim ulang untuk dikonsumsi.

  • Kesalahan timeout tidak dapat dideteksi secara andal, sehingga sulit untuk menentukan apakah pesanan perlu dibatalkan atau perubahan pesanan perlu dikomit.

Solusi pesan transaksional ApsaraMQ for RocketMQ: konsistensi hasil

Solusi pesan normal tidak dapat menjamin konsistensi hasil karena kurangnya kemampuan commit, rollback, dan koordinasi terpadu seperti pada transaksi database standalone.

Solusi pesan transaksional yang disediakan oleh ApsaraMQ for RocketMQ mendukung commit dua fase. Solusi ini menggabungkan commit dua fase dengan transaksi lokal untuk memastikan konsistensi global hasil commit.

Pesan Transaksional

Solusi pesan transaksional ApsaraMQ for RocketMQ menawarkan performa tinggi, skalabilitas tinggi, dan kemudahan pengembangan bisnis. Untuk informasi lebih lanjut tentang mekanisme kerja dan alur pemrosesan pesan transaksional, lihat Mekanisme Kerja.

Mekanisme kerja

Apa itu pesan transaksional?

Pesan transaksional adalah jenis pesan unggulan yang disediakan oleh ApsaraMQ for RocketMQ untuk memastikan konsistensi akhir antara produksi pesan dan transaksi lokal.

Alur pemrosesan

Gambar berikut mengilustrasikan proses penggunaan pesan transaksional.Pesan Transaksional

  1. Produsen mengirim pesan ke broker ApsaraMQ for RocketMQ.

  2. Broker ApsaraMQ for RocketMQ menyimpan pesan secara persisten dan mengembalikan acknowledgment (ACK) kepada produsen. Pada tahap ini, pesan ditandai sebagai "tidak siap untuk dikirim". Pesan dalam status ini disebut pesan setengah.

  3. Produsen mengeksekusi transaksi lokal.

  4. Produsen mengirimkan hasil eksekusi transaksi lokal ke broker. Hasil eksekusi dapat berupa Commit atau Rollback. Berikut adalah logika pemrosesan setelah broker menerima hasil eksekusi:

    • Jika hasil eksekusi adalah Commit, broker menandai pesan setengah sebagai "siap untuk dikirim" dan mengirimkan pesan ke konsumen.

    • Jika hasil eksekusi adalah Rollback, broker membatalkan transaksi dan tidak mengirimkan pesan setengah ke konsumen.

  5. Jika broker tidak menerima hasil eksekusi atau status pesan setengah tidak diketahui karena pemutusan jaringan atau restart produsen, broker menunggu periode waktu tertentu dan mengirim permintaan ke produsen di kluster produsen untuk memeriksa status pesan setengah.

    Catatan

    Untuk informasi tentang interval antara dua permintaan status dan jumlah maksimum permintaan, lihat Batasan Parameter.

  6. Setelah produsen menerima permintaan untuk memeriksa status pesan setengah, produsen memeriksa hasil eksekusi transaksi lokal yang sesuai dengan pesan setengah.

  7. Produsen mengirimkan hasil eksekusi ke broker berdasarkan status transaksi lokal yang diperiksa. Kemudian, broker melakukan operasi di Langkah 4 untuk memproses pesan setengah.

Siklus hidup pesan transaksional

Pesan Transaksional

  • Inisialisasi

    Pesan setengah diproduksi dan diinisialisasi oleh produsen dan siap untuk dikirim ke broker.

  • Transaksi yang akan dikomit

    Pesan setengah dikirim ke broker. Tidak seperti pesan normal, pesan setengah tidak disimpan secara persisten oleh broker. Sebaliknya, pesan setengah disimpan di sistem penyimpanan transaksi dan tidak dikomit sampai hasil eksekusi transaksi lokal dikembalikan. Pada fase ini, pesan tidak terlihat bagi konsumen hilir.

  • Pembatalan Pesan

    Jika hasil eksekusi transaksi lokal adalah Rollback, broker membatalkan pesan setengah dan mengakhiri alur kerja.

  • Dikomit untuk Dikonsumsi

    Jika hasil eksekusi transaksi lokal adalah Commit, broker menyimpan pesan setengah di sistem penyimpanan. Pesan menjadi terlihat dan siap untuk dikonsumsi oleh konsumen hilir.

  • Sedang Dikonsumsi

    Pesan diperoleh oleh konsumen dan diproses berdasarkan logika konsumsi lokal yang ditentukan oleh konsumen.

    Selama proses ini, broker menunggu konsumen mengembalikan hasil konsumsi. Jika tidak ada respons yang diterima dari konsumen dalam periode waktu tertentu, ApsaraMQ for RocketMQ melakukan upaya ulang pada pesan. Untuk informasi lebih lanjut, lihat Percobaan Konsumsi Ulang.

  • Komit Hasil Konsumsi

    Konsumen menyelesaikan konsumsi dan mengirimkan hasil konsumsi ke broker. Broker menandai apakah pesan telah dikonsumsi.

    Secara default, ApsaraMQ for RocketMQ menyimpan semua pesan. Ketika hasil konsumsi dikomit, pesan ditandai sebagai telah dikonsumsi alih-alih langsung dihapus. Pesan hanya dihapus jika masa retensi habis atau sistem kehabisan ruang penyimpanan. Sebelum pesan dihapus, konsumen dapat mengonsumsi ulang pesan tersebut.

  • Penghapusan Pesan

    Jika masa retensi pesan habis atau ruang penyimpanan tidak mencukupi, ApsaraMQ for RocketMQ menghapus pesan yang tersimpan paling awal dari file fisik secara bergulir. Untuk informasi lebih lanjut, lihat Penyimpanan dan Pembersihan Pesan.

Batasan

Konsistensi jenis pesan

Pesan transaksional hanya dapat digunakan di topik yang MessageType-nya diatur ke Transaction.

Konsumsi berpusat pada transaksi

Pesan transaksional yang disediakan oleh ApsaraMQ for RocketMQ hanya memastikan konsistensi hasil antara transaksi inti lokal dan cabang-cabang transaksi hilir. Sistem bisnis hilir harus memastikan bahwa pesan diproses dengan benar. Kami merekomendasikan agar konsumen melakukan percobaan konsumsi ulang dengan tepat untuk memastikan pemrosesan pesan yang berhasil. Untuk informasi lebih lanjut, lihat Percobaan Konsumsi Ulang.

Visibilitas status perantara

Pesan transaksional yang disediakan oleh ApsaraMQ for RocketMQ hanya memastikan konsistensi hasil. Ketidaksesuaian status antara cabang-cabang transaksi hilir dan transaksi hulu ada sebelum pesan dikirim ke konsumen. Oleh karena itu, pesan transaksional cocok hanya untuk skenario transaksi di mana eksekusi asinkron dapat digunakan.

Mekanisme timeout transaksi

Mekanisme timeout digunakan dalam siklus hidup pesan transaksional yang disediakan oleh ApsaraMQ for RocketMQ. Setelah broker menerima pesan setengah, pesan tersebut secara default dibatalkan jika broker tidak dapat menentukan hasil eksekusi pesan transaksi. Untuk informasi lebih lanjut, lihat Batasan Parameter.

Multiple SendReceipts tidak didukung

Hanya satu SendReceipt yang diizinkan untuk pesan transaksional dalam satu transaksi.

Contoh kode

Pengiriman pesan transaksional berbeda dari pengiriman pesan normal dalam aspek-aspek berikut:

  • Sebelum mengirim pesan transaksional, Anda harus mengaktifkan pemeriksa transaksi dan mengaitkannya dengan eksekusi transaksi lokal.

  • Untuk memastikan konsistensi transaksi, Anda harus mengonfigurasi pemeriksa transaksi dan mengikat topik tempat Anda ingin mengirim pesan saat membangun produsen. Jika pengecualian terjadi pada topik yang terikat, pemeriksa transaksi bawaan dapat digunakan untuk memulihkan status.

Contoh kode berikut memberikan ilustrasi cara menggunakan pesan transaksional dalam Java.

Untuk informasi tentang kode contoh lengkap untuk perpesanan, lihat Apache RocketMQ 5.x SDKs (Direkomendasikan).

Contoh kode

import java.time.Duration;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.shaded.com.google.common.base.Strings;

public class ProducerTransactionMessageExample {
    /**
     // Demo ini digunakan untuk mensimulasikan layanan query tabel pesanan untuk memeriksa apakah transaksi pesanan telah dikirim. 
     */
    private static boolean checkOrderById(String orderId) {
        return true;
    }

    /**
     // Demo ini digunakan untuk mensimulasikan hasil eksekusi transaksi lokal. 
     */
    private static boolean doLocalTransaction() {
        return true;
    }

    public static void main(String[] args) throws ClientException {
        /**
         * Titik akhir dari instance. Anda dapat melihat titik akhir di tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
         * Jika klien instance ApsaraMQ for RocketMQ ditempatkan di instance Elastic Compute Service (ECS) dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, kami sarankan Anda menentukan titik akhir virtual private cloud (VPC). 
         * Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance. 
         */
        String endpoints = "xxx-hangzhou.rmq.aliyuncs.com:8080";
        // Nama topik ke mana pesan dikirim. Sebelum Anda menggunakan topik untuk menerima pesan, Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. Jika tidak, kesalahan akan dikembalikan. 
        String topic = "topic1";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * Jika Anda mengakses instance menggunakan titik akhir publik, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Intelligent Authentication halaman Access Control yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. 
         * Jika klien instance ApsaraMQ for RocketMQ ditempatkan di instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis memperoleh nama pengguna dan kata sandi berdasarkan informasi VPC. 
         * Jika instance adalah instance serverless, Anda harus menentukan nama pengguna dan kata sandi instance, terlepas dari apakah Anda mengakses instance melalui Internet atau di VPC. 
         */
        builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        builder.setRequestTimeout(Duration.ofMillis(5000));
        ClientConfiguration configuration = builder.build();
        
        MessageBuilder messageBuilder = new MessageBuilderImpl();

        // Bangun produsen transaksi. Produsen harus membangun pemeriksa transaksi untuk memeriksa status perantara dari pesan setengah yang luar biasa. 
        Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * Pemeriksa transaksi memeriksa apakah transaksi lokal dikomit atau dibatalkan dengan benar berdasarkan ID bisnis. Dalam contoh ini, ID pesanan digunakan. 
                 * Jika pesanan ini ditemukan di tabel pesanan, tindakan penyisipan pesanan dikomit dengan benar oleh transaksi lokal. Jika tidak ada pesanan yang ditemukan di tabel pesanan, transaksi lokal dibatalkan. 
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // Terjadi kesalahan pada pesan. Rollback dikembalikan. 
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            }).setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        // Buat cabang transaksi. 
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            // Jika cabang transaksi gagal dibuat, transaksi dihentikan. 
            return;
        }
        Message message = messageBuilder.setTopic(topic)
            // Kunci pesan. Anda dapat menggunakan kata kunci untuk menemukan pesan secara akurat. 
            .setKeys("messageKey1")
            // Tag pesan. Konsumen dapat menggunakan tag untuk memfilter pesan. 
            .setTag("messageTag")
            // ID unik yang terkait dengan transaksi lokal. ID ini digunakan untuk memverifikasi query status transaksi lokal. 
            .addProperty("OrderId", "xxx")
            // Tubuh pesan. 
            .setBody("messageBody".getBytes())
            .build();
        // Kirim pesan setengah.
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            // Jika pesan setengah gagal dikirim, transaksi dihentikan dan pesan dibatalkan. 
            return;
        }
        /**
         * Eksekusi transaksi lokal dan tentukan hasil eksekusi. 
         * 1. Jika hasilnya adalah Commit, kirimkan pesan. 
         * 2. Jika hasilnya adalah Rollback, batalkan pesan. 
         * 3. Jika pengecualian tidak diketahui terjadi, tidak ada tindakan yang dilakukan sampai respons diperoleh dari query status transaksi lokal. 
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // Anda dapat menentukan apakah akan mencoba ulang pesan berdasarkan persyaratan bisnis Anda. Jika Anda tidak ingin mencoba ulang pesan, Anda dapat memeriksa status transaksi lokal untuk mengirim status transaksi. 
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // Kami sarankan Anda mencatat informasi pengecualian. Dengan cara ini, jika pengecualian terjadi selama pembatalan pesan, Anda dapat memeriksa status transaksi lokal untuk mengirim status transaksi tanpa mencoba ulang pesan. 
                e.printStackTrace();
            }
        }
    }
}

Catatan penggunaan

Cegah timeout yang disebabkan oleh transaksi dengan hasil tidak diketahui

ApsaraMQ for RocketMQ memungkinkan Anda memulai permintaan untuk memeriksa status transaksi lokal jika pengecualian terjadi dalam fase commit transaksi untuk memastikan konsistensi transaksional. Namun, produsen harus mencegah transaksi lokal mengembalikan hasil yang tidak diketahui. Sejumlah besar pemeriksaan transaksi dapat memburuknya performa sistem dan menyebabkan penundaan dalam pemrosesan transaksi.

Tangani transaksi yang sedang berlangsung dengan benar

Selama pemeriksaan status transaksi lokal, jangan kembalikan Rollback atau Commit untuk transaksi yang sedang berlangsung. Sebagai gantinya, pertahankan status Tidak Diketahui untuk transaksi.

Dalam kebanyakan kasus, transaksi sedang berlangsung karena eksekusi transaksi lambat dan pemeriksaan status transaksi lokal terlalu cepat. Solusi berikut disediakan:

  • Tentukan nilai yang lebih besar untuk waktu melakukan pemeriksaan pertama status transaksi lokal. Ini dapat menyebabkan penundaan besar untuk transaksi yang bergantung pada hasil query.

  • Buat program dengan benar mengidentifikasi transaksi yang sedang berlangsung.