All Products
Search
Document Center

ApsaraMQ for RocketMQ:Kirim dan terima pesan transaksional

Last Updated:Jul 02, 2025

ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang mirip dengan eXtended Architecture (X/Open XA) untuk memastikan konsistensi transaksi di ApsaraMQ for RocketMQ. Topik ini mencakup contoh kode untuk mengirim dan menerima pesan transaksional menggunakan HTTP client SDK untuk C++.

Informasi latar belakang

Gambar berikut mengilustrasikan proses interaksi pesan transaksional.

图片1.png

Untuk informasi lebih lanjut, lihat Pesan Transaksional.

Prasyarat

Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:

  • Instal SDK untuk C++. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.

  • Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Dapatkan pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Kirim pesan transaksional

Contoh kode berikut menunjukkan cara mengirim pesan transaksional menggunakan HTTP client SDK untuk C++:

//#include <iostream>
#include <fstream>
#ifdef _WIN32
#include <windows.h>
#include <process.h>
#else
#include "pthread.h"
#endif
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


const int32_t pubMsgCount = 4;
const int32_t halfCheckCount = 3;

void processCommitRollError(AckMessageResponse& bdmResp, const std::string& messageId) {
    if (bdmResp.isSuccess()) {
        cout << "Commit/Roll Transaksi Berhasil: " << messageId << endl;
        return;
    }
    const std::vector<AckMessageFailedItem>& failedItems =
        bdmResp.getAckMessageFailedItem();
    for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
            iter != failedItems.end(); ++iter)
    {
        cout << "Commit/Roll Transaksi GAGAL: " << iter->errorCode
            << "  " << iter->receiptHandle << endl;
    }
}

#ifdef WIN32
unsigned __stdcall consumeHalfMessageThread(void *arg)
#else
void* consumeHalfMessageThread(void *arg)
#endif
{
    MQTransProducerPtr transProducer = *(MQTransProducerPtr*)(arg);
    int count = 0;
    do {
        std::vector<Message> halfMsgs;
        try {
            // Konsumsi pesan dalam mode polling panjang.
            // Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan pada broker selama periode waktu tertentu. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu tertentu, broker segera mengirimkan respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
            transProducer->consumeHalfMessage(
                    1, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 1. Nilai maksimum yang dapat Anda tentukan adalah 16.
                    3, // Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
                    halfMsgs
                    );
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "Tidak ada pesan setengah untuk dikonsumsi! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Permintaan Gagal: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
        }
        if (halfMsgs.size() == 0) {
            continue;
        }

        cout << "Konsumsi Setengah: " << halfMsgs.size() << " Pesan!" << endl;
        // Proses pesan setengah.
        std::vector<std::string> receiptHandles;
        for (std::vector<Message>::iterator iter = halfMsgs.begin();
                iter != halfMsgs.end(); ++iter)
        {
            cout << "MessageId: " << iter->getMessageId()
                << " PublishTime: " << iter->getPublishTime()
                << " Tag: " << iter->getMessageTag()
                << " Body: " << iter->getMessageBody()
                << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                << " NextConsumeTime: " << iter->getNextConsumeTime()
                << " ConsumedTimes: " << iter->getConsumedTimes()
                << " Properties: " << iter->getPropertiesAsString()
                << " Key: " << iter->getMessageKey() << endl;

            int32_t consumedTimes = iter->getConsumedTimes();
            const std::string propA = iter->getProperty("a");
            const std::string handle = iter->getReceiptHandle();
            AckMessageResponse bdmResp;
            if (propA == "1") {
                cout << "Commit msg.." << endl;
                transProducer->commit(handle, bdmResp);
                count++;
            } else if(propA == "2") {
                if (consumedTimes > 1) {
                    cout << "Commit msg.." << endl;
                    transProducer->commit(handle, bdmResp);
                    count++;
                } else {
                    cout << "Commit Nanti!!!" << endl;
                }
            } else if(propA == "3") {
                cout << "Rollback msg.." << endl;
                transProducer->rollback(handle, bdmResp);
                count++;
            } else {
                transProducer->commit(handle, bdmResp);
                cout << "Pesan Tidak Dikenal.." << endl;
            }
            // Jika pesan transaksional dikomit atau dibatalkan setelah waktu yang ditentukan oleh parameter NextConsumeTime berakhir, commit atau rollback gagal.
            processCommitRollError(bdmResp, iter->getMessageId());
        }

    } while(count < halfCheckCount);

#ifdef WIN32
    return 0;
#else
    return NULL;
#endif
}

int main() {

    MQClient mqClient(
            // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
            "${HTTP_ENDPOINT}",
            // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
	          // ID AccessKey yang digunakan untuk otentikasi.
	          System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
	          // Rahasia AccessKey yang digunakan untuk otentikasi.
	          System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
            );

    // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
    string topic = "${TOPIC}";
    // ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
    // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
    string instanceId = "${INSTANCE_ID}";
    // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
    string groupId = "${GROUP_ID}";

    MQTransProducerPtr transProducer;
    if (instanceId == "") {
        transProducer = mqClient.getTransProducerRef(topic, groupId);
    } else {
        transProducer = mqClient.getTransProducerRef(instanceId, topic, groupId);
    }

    // Klien memerlukan thread atau proses untuk memproses pesan transaksional yang belum diakui.
    // Mulai thread untuk memproses pesan transaksional yang belum diakui.
#ifdef WIN32
    HANDLE thread;
    unsigned int threadId;
    thread = (HANDLE)_beginthreadex(NULL, 0, consumeHalfMessageThread, &transProducer, 0, &threadId);
#else
    pthread_t thread;
    pthread_create(&thread, NULL, consumeHalfMessageThread, static_cast<void *>(&transProducer));
#endif

    try {
        for (int i = 0; i < pubMsgCount; i++)
        {
            PublishMessageResponse pmResp;
            TopicMessage pubMsg("Halo, mq, trans_msg!");
            pubMsg.putProperty("a",std::to_string(i));
            pubMsg.setMessageKey("ImKey");
            pubMsg.setTransCheckImmunityTime(10);
            transProducer->publishMessage(pubMsg, pmResp);
            cout << "Publikasi pesan mq berhasil. Topik:" << topic
                << ", msgId:" << pmResp.getMessageId()
                << ", bodyMD5:" << pmResp.getMessageBodyMD5()
                << ", Handle:" << pmResp.getReceiptHandle() << endl;

            if (i == 0) {
                // Setelah produser mengirim pesan transaksional, broker memperoleh handle dari pesan setengah yang sesuai dengan pesan transaksional dan melakukan commit atau rollback pesan transaksional berdasarkan status handle.
                // Jika pesan transaksional dikomit atau dibatalkan setelah waktu yang ditentukan oleh parameter TransCheckImmunityTime berakhir, commit atau rollback gagal.
                AckMessageResponse bdmResp;
                transProducer->commit(pmResp.getReceiptHandle(), bdmResp);
                processCommitRollError(bdmResp, pmResp.getMessageId());
            }
        }
    } catch (MQServerException& me) {
        cout << "Permintaan Gagal: " + me.GetErrorCode() << ", requestId adalah:" << me.GetRequestId() << endl;
    } catch (MQExceptionBase& mb) {
        cout << "Permintaan Gagal: " + mb.ToString() << endl;
    }

#ifdef WIN32
    WaitForSingleObject(thread, INFINITE);
    CloseHandle(thread);
#else
    pthread_join(thread, NULL);
#endif

    return 0;
}

Berlangganan pesan transaksional

Contoh kode berikut menunjukkan cara berlangganan pesan transaksional menggunakan HTTP client SDK untuk C++:

#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
            "${HTTP_ENDPOINT}",
            // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
	          // ID AccessKey yang digunakan untuk otentikasi.
	          System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
	          // Rahasia AccessKey yang digunakan untuk otentikasi.
	          System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
            );

    // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
    string topic = "${TOPIC}";
    // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
    string groupId = "${GROUP_ID}";
    // ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
    // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Konsumsi pesan dalam mode polling panjang.
            // Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan pada broker selama periode waktu tertentu. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu tertentu, broker segera mengirimkan respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
            consumer->consumeMessage(
                    3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
                    3, // Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
                    messages
            );
            cout << "Konsumsi: " << messages.size() << " Pesan!" << endl;

            // Logika konsumsi pesan.
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes()
                    << " Properties: " << iter->getPropertiesAsString()
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // Dapatkan pengakuan (ACK) dari konsumen.
            // Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter Message.NextConsumeTime berakhir, broker akan mengirimkan pesan untuk dikonsumsi lagi.
            // Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                // Jika handle pesan kedaluwarsa, broker tidak dapat menerima ACK untuk pesan dari konsumen.
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " pesan berhasil!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "Tidak ada pesan untuk dikonsumsi! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Permintaan Gagal: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Permintaan Gagal: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}