All Products
Search
Document Center

ApsaraMQ for RocketMQ:Kirim dan terima pesan transaksional

Last Updated:Jul 02, 2025

ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang mirip dengan eXtended Architecture (X/Open XA) untuk memastikan konsistensi transaksi dalam ApsaraMQ for RocketMQ. Topik ini mencakup contoh kode untuk mengirim dan menerima pesan transaksional menggunakan SDK klien HTTP untuk C#.

Informasi latar belakang

Gambar berikut mengilustrasikan proses interaksi dari pesan transaksional.

图片1.png

Untuk informasi lebih lanjut, lihat Pesan Transaksional.

Prasyarat

Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:

  • Instal SDK untuk C#. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.

  • Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut meliputi instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Dapatkan pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Kirim pesan transaksional

Contoh kode berikut menunjukkan cara mengirim pesan transaksional menggunakan SDK klien HTTP untuk C#:

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class TransProducerSample
    {
        // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. 
        // ID AccessKey yang digunakan untuk otentikasi. 
        private const string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
        // Rahasia AccessKey yang digunakan untuk otentikasi. 
        private const string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. 
        private const string _topicName = "${TOPIC}";
        // ID instance tempat topik tersebut termasuk. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ. 
        // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        private const string _instanceId = "${INSTANCE_ID}";
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. 
        private const string _groupId = "${GROUP_ID}";

        private static readonly MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        private static readonly MQTransProducer transProducer = _client.GetTransProdcuer(_instanceId, _topicName, _groupId);

        static void ProcessAckError(Exception exception)
        {
            // Jika pesan transaksional dikomit atau dibatalkan setelah waktu yang ditentukan oleh parameter TransCheckImmunityTime untuk penanganan pesan transaksional berakhir atau setelah periode timeout yang ditentukan untuk penanganan consumeHalfMessage berakhir, komit atau pembatalan gagal. Dalam contoh ini, periode timeout ditentukan sebagai 10 detik untuk penanganan consumeHalfMessage. 
            if (exception is AckMessageException)
            {
                AckMessageException ackExp = (AckMessageException)exception;
                Console.WriteLine("Pengakuan pesan gagal, RequestId:" + ackExp.RequestId);
                foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                {
                    Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",Kode Kesalahan:" + errorItem.ErrorCode + ",Pesan Kesalahan:" + errorItem.ErrorMessage);
                }
            }
        }

        static void ConsumeHalfMessage()
        {
            int count = 0;
            while (true)
            {
                if (count == 3)
                    break;
                try
                {
                    // Periksa pesan setengah. Ini mirip dengan mengonsumsi pesan normal. 
                    List<Message> messages = null;
                    try
                    {
                        messages = transProducer.ConsumeHalfMessage(3, 3);
                    } catch (Exception exp1) {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name + " Tidak ada pesan setengah, " + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                        continue;
                    // Logika konsumsi pesan. 
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        int a = int.Parse(message.GetProperty("a"));
                        uint consumeTimes = message.ConsumedTimes;
                        try {
                            if (a == 1) {
                                // Konfirmasi untuk mengomitm pesan transaksional. 
                                transProducer.Commit(message.ReceiptHandle);
                                count++;
                                Console.WriteLine("Id:" + message.Id + ", commit");
                            } else if (a == 2 && consumeTimes > 1) {
                                // Konfirmasi untuk mengomitm pesan transaksional. 
                                transProducer.Commit(message.ReceiptHandle);
                                count++;
                                Console.WriteLine("Id:" + message.Id + ", commit");
                            } else if (a == 3) {
                                // Konfirmasi untuk membatalkan pesan transaksional. 
                                transProducer.Rollback(message.ReceiptHandle);
                                count++;
                                Console.WriteLine("Id:" + message.Id + ", rollback");
                            } else {
                                // Periksa status lain kali. 
                                Console.WriteLine("Id:" + message.Id + ", tidak diketahui");
                            }
                        } catch (Exception ackError) {
                            ProcessAckError(ackError);
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }

        static void Main(string[] args)
        {
            // Klien memerlukan thread atau proses untuk memproses pesan transaksional yang belum diakui. 
            // Mulai thread untuk memproses pesan transaksional yang belum diakui. 
            Thread consumeHalfThread = new Thread(ConsumeHalfMessage);
            consumeHalfThread.Start();

            try
            {
                // Kirim empat pesan transaksional secara siklik. Di antara empat pesan tersebut, komit satu pesan setelah pesan dikirim, dan proses tiga pesan lainnya berdasarkan kondisi yang ditentukan. 
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg = new TopicMessage("trans_msg");
                    sendMsg.MessageTag = "a";
                    sendMsg.MessageKey = "MessageKey";
                    sendMsg.PutProperty("a", i.ToString());
                    // Interval waktu antara waktu pengiriman pesan transaksional dan waktu mulai pemeriksaan pertama status transaksi lokal. Interval waktu ini menentukan waktu relatif saat status diperiksa pertama kali. Unit: detik. Nilai valid: 10 hingga 300. 
                    // Jika pesan tidak dikomit atau dibatalkan setelah pemeriksaan balik pertama status transaksi lokal, broker memulai permintaan pemeriksaan balik status transaksi lokal setiap 10 detik selama 24 jam. 
                    sendMsg.TransCheckImmunityTime = 10;

                    TopicMessage result = transProducer.PublishMessage(sendMsg);
                    Console.WriteLine("publis pesan berhasil:" + result);
                    try {
                        if (!string.IsNullOrEmpty(result.ReceiptHandle) && i == 0)
                        {
                            // Setelah produser mengirim pesan transaksional, broker mendapatkan handle dari pesan setengah yang sesuai dengan pesan transaksional, dan dapat langsung mengomitm atau membatalkan pesan setengah tersebut. 
                            transProducer.Commit(result.ReceiptHandle);
                            Console.WriteLine("Id:" + result.Id + ", commit");
                        }
                    } catch (Exception ackError) {
                        ProcessAckError(ackError);
                    }
                }
            } catch (Exception ex) {
                Console.Write(ex);
            }

            consumeHalfThread.Join();
        }
    }
}

Berlangganan pesan transaksional

Contoh kode berikut menunjukkan cara berlangganan pesan transaksional:

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    public class ConsumerSample
    {
        // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. 
        // ID AccessKey yang digunakan untuk otentikasi. 
        private const string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
        // Rahasia AccessKey yang digunakan untuk otentikasi. 
        private const string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. 
        private const string _topicName = "${TOPIC}";
        // ID instance tempat topik tersebut termasuk. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ. 
        // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        private const string _instanceId = "${INSTANCE_ID}";
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. 
        private const string _groupId = "${GROUP_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // Konsumsi pesan secara siklik di thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan. 
            while (true)
            {
                try
                {
                    // Konsumsi pesan dalam mode polling panjang. 
                    // Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu tertentu. Jika pesan tersedia untuk dikonsumsi dalam periode waktu tertentu, respons segera dikirim ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik. 
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai terbesar yang dapat Anda tentukan adalah 16. 
                            3 // Durasi periode polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30. 
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name + " Tidak ada pesan baru, " + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Menerima Pesan:");
                    // Logika konsumsi pesan. 
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Properti a adalah:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // Jika broker gagal menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter Message.nextConsumeTime berakhir, broker mengirimkan pesan untuk dikonsumsi lagi. 
                    // Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi. 
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Pengakuan pesan berhasil:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        // Jika handle pesan kedaluwarsa, broker gagal menerima ACK untuk pesan dari konsumen. 
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp = (AckMessageException)exp2;
                            Console.WriteLine("Pengakuan pesan gagal, RequestId:" + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",Kode Kesalahan:" + errorItem.ErrorCode + ",Pesan Kesalahan:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}