All Products
Search
Document Center

ApsaraMQ for RocketMQ:Kirim dan terima pesan transaksional

Last Updated:Jul 02, 2025

ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang mirip dengan eXtended Architecture (X/Open XA). ApsaraMQ for RocketMQ menggunakan pesan transaksional untuk memastikan konsistensi akhir dari transaksi terdistribusi. Topik ini memberikan contoh kode tentang cara mengirim dan menerima pesan transaksional menggunakan SDK klien HTTP untuk Go.

Informasi latar belakang

Gambar berikut menunjukkan proses interaksi pesan transaksional.

图片1.png

Untuk informasi lebih lanjut, lihat Pesan Transaksional.

Prasyarat

Sebelum memulai, pastikan operasi berikut telah dilakukan:

  • Instal SDK untuk Go. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.

  • Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Dapatkan pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Kirim pesan transaksional

Contoh kode berikut menunjukkan cara mengirim pesan transaksional menggunakan SDK klien HTTP untuk Go:

package main

import (
    "fmt"
    "github.com/gogap/errors"
    "strconv"
    "strings"
    "time"
    "os"

    "github.com/aliyunmq/mq-http-go-sdk"
)

var loopCount = 0

func ProcessError(err error) {
    // Periode timeout. Jika pesan transaksional dikomit atau dibatalkan setelah waktu yang ditentukan oleh parameter TransCheckImmunityTime untuk penanganan pesan transaksional berakhir atau setelah periode timeout yang ditentukan untuk penanganan consumeHalfMessage berakhir, komit atau pembatalan gagal. Dalam contoh ini, periode timeout ditentukan sebagai 10 detik untuk penanganan consumeHalfMessage. 
    if err == nil {
        return
    }
    fmt.Println(err)
    for _, errAckItem := range err.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
        fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
            errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
    }
}

func ConsumeHalfMsg(mqTransProducer *mq_http_sdk.MQTransProducer) {
    for {
        if loopCount >= 10 {
            return
        }
        loopCount++
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    // Logika konsumsi pesan. 
                    var handles []string
                    fmt.Printf("Konsumsi %d pesan---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n\tBody: %s\n"+
                            "\tProperties:%s, Key:%s, Timer:%d, Trans:%d\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
                            v.Properties, v.MessageKey, v.StartDeliverTime, v.TransCheckImmunityTime)

                        a, _ := strconv.Atoi(v.Properties["a"])
                        var comRollErr error
                        if a == 1 {
                            // Konfirmasi untuk mengirim pesan transaksional. 
                            comRollErr = (*mqTransProducer).Commit(v.ReceiptHandle)
                            fmt.Println("Commit---------->")
                        } else if a == 2 && v.ConsumedTimes > 1 {
                            // Konfirmasi untuk mengirim pesan transaksional. 
                            comRollErr = (*mqTransProducer).Commit(v.ReceiptHandle)
                            fmt.Println("Commit---------->")
                        } else if a == 3 {
                            // Konfirmasi untuk membatalkan pesan transaksional. 
                            comRollErr = (*mqTransProducer).Rollback(v.ReceiptHandle)
                            fmt.Println("Rollback---------->")
                        } else {
                            // Periksa status lain kali. 
                            fmt.Println("Tidak diketahui---------->")
                        }
                        ProcessError(comRollErr)
                    }
                    endChan <- 1
                }
            case err := <-errChan:
                {
                    // Tidak ada pesan yang tersedia untuk dikonsumsi di topik. 
                    if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
                        fmt.Println("\nTidak ada pesan baru, lanjutkan!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timeout konsumsi pesan ??")
                    return
                }
            }
        }()

        // Periksa status pesan setengah dalam mode polling panjang. 
        // Dalam mode polling panjang, jika tidak ada pesan di topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan pada broker selama periode waktu tertentu. Jika pesan tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, respons segera dikirim ke konsumen. Dalam contoh ini, periode waktu ditentukan sebagai 3 detik. 
        (*mqTransProducer).ConsumeHalfMessage(respChan, errChan,
            3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16. 
            3, // Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilai diatur menjadi 3. Nilai terbesar yang dapat Anda atur adalah 30. 
        )
        <-endChan
    }
}

func main() {
    // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
    endpoint := "${HTTP_ENDPOINT}"
    // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. 
    // ID AccessKey yang digunakan untuk otentikasi. 
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    // Rahasia AccessKey yang digunakan untuk otentikasi. 
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.    
    // Setiap topik dapat digunakan untuk mengirim dan menerima pesan jenis tertentu. Misalnya, topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain. 
    topic := "${TOPIC}"
    // ID instance tempat topik dimiliki. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ. 
    // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
    instanceId := "${INSTANCE_ID}"
    // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    groupId := "${GROUP_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqTransProducer := client.GetTransProducer(instanceId, topic, groupId)

    // Produser memerlukan thread atau proses untuk memproses pesan transaksional yang belum diakui. 
    // Mulai goroutine untuk memproses pesan transaksional yang belum diakui. 
    go ConsumeHalfMsg(&mqTransProducer)

    // Kirim empat pesan transaksional. Komit satu pesan setelah pesan dikirim. Periksa status pesan setengah yang sesuai dengan tiga pesan transaksional lainnya setelah ketiga pesan dikirim. 
    for i := 0; i < 4; i++ {
        msg := mq_http_sdk.PublishMessageRequest{
            MessageBody:"Saya adalah pesan transaksi!",
            Properties: map[string]string{"a":strconv.Itoa(i)},
        }
        // Interval waktu antara waktu pengiriman pesan transaksional dan waktu mulai pemeriksaan balik pertama untuk status transaksi lokal. Interval waktu adalah interval waktu relatif. Unit: detik. Nilai valid: 10 hingga 300. 
        // Jika pesan tidak dikomit atau dibatalkan setelah pemeriksaan balik pertama untuk status transaksi lokal, broker memulai permintaan pemeriksaan balik untuk status transaksi lokal setiap 10 detik dalam 24 jam. 
        msg.TransCheckImmunityTime = 10

        resp, pubErr := mqTransProducer.PublishMessage(msg)
        if pubErr != nil {
            fmt.Println(pubErr)
            return
        }
        fmt.Printf("Publikasikan ---->\n\tMessageId:%s, BodyMD5:%s, Handle:%s\n",
            resp.MessageId, resp.MessageBodyMD5, resp.ReceiptHandle)
        if i == 0 {
            // Setelah produser mengirim pesan transaksional, broker mendapatkan handle pesan setengah yang sesuai dengan pesan transaksional dan mengkomit atau membatalkan pesan transaksional berdasarkan status handle. 
            ackErr := mqTransProducer.Commit(resp.ReceiptHandle)
            fmt.Println("Commit---------->")
            ProcessError(ackErr)
        }
    }

    for ; loopCount < 10 ; {
        time.Sleep(time.Duration(1) * time.Second)
    }
}

Berlangganan pesan transaksional

Contoh kode berikut menunjukkan cara berlangganan pesan transaksional menggunakan SDK klien HTTP untuk Go:

package main

import (
    "fmt"
    "github.com/gogap/errors"
    "strings"
    "time"
    "os"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
    endpoint := "${HTTP_ENDPOINT}"
    // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. 
    // ID AccessKey yang digunakan untuk otentikasi. 
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    // Rahasia AccessKey yang digunakan untuk otentikasi. 
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. 
    // Setiap topik dapat digunakan untuk mengirim dan menerima pesan jenis tertentu. Misalnya, topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain. 
    topic := "${TOPIC}"
    // ID instance tempat topik dimiliki. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ. 
    // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
    instanceId := "${INSTANCE_ID}"
    // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    groupId := "${GROUP_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    // Logika konsumsi pesan. 
                    var handles []string
                    fmt.Printf("Konsumsi %d pesan---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
                    }

                    // Jika broker gagal menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter NextConsumeTime berakhir, broker mengirimkan ulang pesan di partisi ke konsumen. 
                    // Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi. 
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        // Jika handle pesan habis waktu, broker gagal menerima ACK untuk pesan dari konsumen. 
                        fmt.Println(ackerr)
                        if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
                           for _, errAckItem := range errAckItems {
                             fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                               errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
                           }
                        } else {
                           fmt.Println("ack err =", ackerr)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    // Tidak ada pesan yang tersedia untuk dikonsumsi di topik. 
                    if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
                        fmt.Println("\nTidak ada pesan baru, lanjutkan!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timeout konsumsi pesan ??")
                    endChan <- 1
                }
            }
        }()

        // Konsumsi pesan dalam mode polling panjang. Periode timeout jaringan default adalah 35 detik. 
        // Dalam mode polling panjang, jika tidak ada pesan di topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan pada broker selama periode waktu tertentu. Jika pesan tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, broker segera mengirim respons ke konsumen. Dalam contoh ini, nilai ditentukan sebagai 3 detik. 
        mqConsumer.ConsumeMessage(respChan, errChan,
            3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16. 
            3, // Durasi periode polling panjang. Unit: detik. Dalam contoh ini, nilai ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30. 
        )
        <-endChan
    }
}