All Products
Search
Document Center

ApsaraMQ for RocketMQ:Kirim dan terima pesan transaksional

Last Updated:Jul 02, 2025

ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang mirip dengan eXtended Architecture (X/Open XA) untuk memastikan konsistensi transaksi di ApsaraMQ for RocketMQ. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan transaksional menggunakan SDK Klien HTTP untuk PHP.

Informasi latar belakang

Gambar berikut menunjukkan proses interaksi pesan transaksional.

图片1.png

Untuk informasi lebih lanjut, lihat Pesan Transaksional.

Prasyarat

Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:

  • Instal SDK untuk PHP. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.

  • Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Dapatkan pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Kirim pesan transaksional

Contoh kode berikut menunjukkan cara mengirim pesan transaksional menggunakan SDK Klien HTTP untuk PHP:

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $transProducer;
    private $count;
    private $popMsgCount;

    public function __construct()
    {
        $this->client = new MQClient(
            // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
            "${HTTP_ENDPOINT}",
            // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. 
	          // ID AccessKey yang digunakan untuk otentikasi. 
	          getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
	          // Rahasia AccessKey yang digunakan untuk otentikasi. 
	          getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        );

        // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. 
        $topic = "${TOPIC}";        
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. 
        $groupId = "${GROUP_ID}";
        // ID instance tempat topik berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ. 
        // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        $instanceId = "${INSTANCE_ID}";

        $this->transProducer = $this->client->getTransProducer($instanceId,$topic, $groupId);
        $this->count = 0;
        $this->popMsgCount = 0;
    }

    function processAckError($e) {
        if ($e instanceof MQ\Exception\AckMessageException) {
            // Jika pesan transaksional dikomit atau dibatalkan setelah waktu yang ditentukan oleh parameter TransCheckImmunityTime untuk handle pesan transaksional berakhir atau waktu yang ditentukan oleh parameter NextConsumeTime untuk handle consumeHalfMessage berakhir, komit atau pembatalan gagal. 
            printf("Kesalahan Commit/Rollback, RequestId:%s\n", $e->getRequestId());
            foreach ($e->getAckMessageErrorItems() as $errorItem) {
                printf("\tReceiptHandle:%s, KodeKesalahan:%s, PesanKesalahan:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
            }
        } else {
            print_r($e);
        }
    }

    function consumeHalfMsg() {
        while($this->count < 3 && $this->popMsgCount < 15) {
            $this->popMsgCount++;

            try {
                $messages = $this->transProducer->consumeHalfMessage(4, 3);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    print "tidak ada pesan transaksi setengah\n";
                    continue;
                }
                print_r($e->getMessage() . "\n");
                sleep(3);
                continue;
            }

            foreach ($messages as $message) {
                printf("ID:%s TAG:%s BODY:%s \nWaktuPublikasi:%d, WaktuKonsumsiPertama:%d\nJumlahKonsumsi:%d, WaktuKonsumsiBerikutnya:%d\nPropA:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getProperty("a"));
                print_r($message->getProperties());
                $propA = $message->getProperty("a");
                $consumeTimes = $message->getConsumedTimes();
                try {
                    if ($propA == "1") {
                        print "\n commit pesan transaksi: " . $message->getMessageId() . "\n";
                        $this->transProducer->commit($message->getReceiptHandle());
                        $this->count++;
                    } else if ($propA == "2" && $consumeTimes > 1) {
                        print "\n commit pesan transaksi: " . $message->getMessageId() . "\n";
                        $this->transProducer->commit($message->getReceiptHandle());
                        $this->count++;
                    } else if ($propA == "3") {
                        print "\n rollback pesan transaksi: " . $message->getMessageId() . "\n";
                        $this->transProducer->rollback($message->getReceiptHandle());
                        $this->count++;
                    } else {
                        print "\n pesan transaksi tidak dikenal: " . $message->getMessageId() . "\n";
                    }
                } catch (\Exception $e) {
                    $this->processAckError($e);
                }
            }
        }
    }

    public function run()
    {   
        // Kirim empat pesan transaksional secara siklik. 
        for ($i = 0; $i < 4; $i++) {
            $pubMsg = new TopicMessage("hello,mq");
            // Atribut kustom pesan. 
            $pubMsg->putProperty("a", $i);
            // Kunci pesan. 
            $pubMsg->setMessageKey("MessageKey");
            // Interval waktu antara waktu pengiriman pesan transaksional dan waktu mulai pemeriksaan pertama status transaksi lokal. Interval waktu ini menentukan waktu relatif saat status pertama kali diperiksa. Unit: detik. Nilai valid: 10 hingga 300. 
            // Jika pesan tidak dikomit atau dibatalkan setelah pemeriksaan status transaksi pertama dilakukan, broker memulai permintaan untuk memeriksa status transaksi lokal pada interval 10 detik dalam 24 jam berikutnya. 
            $pubMsg->setTransCheckImmunityTime(10);
            $topicMessage = $this->transProducer->publishMessage($pubMsg);

            print "\npublish -> \n\t" . $topicMessage->getMessageId() . " " . $topicMessage->getReceiptHandle() . "\n";

            if ($i == 0) {
                try {
                    // Setelah produser mengirim pesan transaksional, broker memperoleh handle dari pesan setengah yang sesuai dengan pesan transaksional dan mengkomit atau membatalkan pesan transaksional berdasarkan status handle. 
                    $this->transProducer->commit($topicMessage->getReceiptHandle());
                    print "\n commit pesan transaksi saat publish: " . $topicMessage->getMessageId() . "\n";
                } catch (\Exception $e) {
                    // Jika pesan transaksional dikomit atau dibatalkan setelah waktu yang ditentukan oleh parameter TransCheckImmunityTime berakhir, komit atau pembatalan gagal. 
                    $this->processAckError($e);
                }
            }
        }

        // Klien memerlukan thread atau proses untuk memproses pesan transaksional yang belum diakui. 
        // Proses pesan transaksional yang belum diakui. 
        $this->consumeHalfMsg();
    }
}


$instance = new ProducerTest();
$instance->run();

?>

Berlangganan pesan transaksional

Contoh kode berikut menunjukkan cara berlangganan pesan transaksional menggunakan SDK Klien HTTP untuk PHP:

<?php

use MQ\MQClient;

require "vendor/autoload.php";

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
            "${HTTP_ENDPOINT}",
            // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. 
	          // ID AccessKey yang digunakan untuk otentikasi. 
	          getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
	          // Rahasia AccessKey yang digunakan untuk otentikasi. 
	          getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        );

        // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. 
        $topic = "${TOPIC}";
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. 
        $groupId = "${GROUP_ID}";
        // ID instance tempat topik berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ. 
        // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        $instanceId = "${INSTANCE_ID}";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function ackMessages($receiptHandles)
    {
        try {
            $this->consumer->ackMessage($receiptHandles);
        } catch (\Exception $e) {
            if ($e instanceof MQ\Exception\AckMessageException) {
                // Jika handle pesan kedaluwarsa, broker tidak dapat menerima pengakuan (ACK) untuk pesan dari konsumen. 
                printf("Kesalahan Ack, RequestId:%s\n", $e->getRequestId());
                foreach ($e->getAckMessageErrorItems() as $errorItem) {
                    printf("\tReceiptHandle:%s, KodeKesalahan:%s, PesanKesalahan:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
                }
            }
        }
    }

    public function run()
    {
        // Konsumsi pesan secara siklik di thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan. 
        while (True) {
            try {
                // Konsumsi pesan dalam mode polling panjang. 
                // Jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu tertentu. Periode waktu ini dikenal sebagai periode polling panjang. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu tersebut, broker segera mengirim respons ke konsumen. 
                $messages = $this->consumer->consumeMessage(
                    3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16. 
                    3 // Durasi periode polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30. 
                );
            } catch (\MQ\Exception\MessageResolveException $e) {
                // Jika pesan tidak dapat diuraikan karena karakter tidak valid dalam badan pesan, pengecualian ini dilemparkan. 
                // Pesan yang dapat diuraikan seperti yang diharapkan. 
                $messages = $e->getPartialResult()->getMessages();
                // Pesan yang tidak dapat diuraikan seperti yang diharapkan. 
                $failMessages = $e->getPartialResult()->getFailResolveMessages();

                $receiptHandles = array();
                foreach ($messages as $message) {
                    // Logika konsumsi pesan. 
                    $receiptHandles[] = $message->getReceiptHandle();
                    printf("MsgID %s\n", $message->getMessageId());
                }
                foreach ($failMessages as $failMessage) {
                    // Tangani pesan yang tidak dapat diuraikan karena karakter tidak valid dalam badan pesan. 
                    $receiptHandles[] = $failMessage->getReceiptHandle();
                    printf("Gagal Mengurai Pesan. MsgID %s\n", $failMessage->getMessageId());
                }
                $this->ackMessages($receiptHandles);
                continue;
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    // Jika tidak ada pesan yang tersedia untuk dikonsumsi, dan mode polling panjang terus berlaku. 
                    printf("Tidak ada pesan, lanjutkan polling panjang!RequestId:%s\n", $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "konsumsi selesai, pesan:\n";

            // Logika konsumsi pesan. 
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
              
                printf("MessageID:%s TAG:%s BODY:%s \nWaktuPublikasi:%d, WaktuKonsumsiPertama:%d, \nJumlahKonsumsi:%d, WaktuKonsumsiBerikutnya:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // Jika broker tidak menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan dalam $message->getNextConsumeTime() berakhir, broker mengirimkan pesan ke konsumen lagi. 
            // Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi. 
            print_r($receiptHandles);
            $this->ackMessages($receiptHandles);
            print "ack selesai\n";
        }

    }
}

$instance = new ConsumerTest();
$instance->run();


?>