Pesan terurut adalah jenis pesan yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini diterbitkan dan dikonsumsi dalam urutan pertama masuk, pertama keluar (FIFO) yang ketat. Topik ini menyajikan contoh kode untuk mengirim dan menerima pesan terurut menggunakan HTTP client SDK untuk Go.
Informasi latar belakang
Pesan terurut dibagi menjadi beberapa jenis berikut:
Pesan terurut global: Jika pesan dalam topik termasuk jenis ini, pesan tersebut diterbitkan dan dikonsumsi dalam urutan FIFO.
Pesan terurut berpartisi: Jika pesan dalam topik termasuk jenis ini, pesan tersebut didistribusikan ke partisi yang berbeda menggunakan kunci sharding. Pesan dalam setiap partisi dikonsumsi dalam urutan FIFO. Kunci sharding adalah bidang kunci yang digunakan untuk pesan terurut guna mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan.
Untuk informasi lebih lanjut, lihat Pesan Terurut.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Instal SDK untuk Go. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Peroleh pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan terurut
Broker ApsaraMQ for RocketMQ menentukan urutan pembuatan pesan berdasarkan urutan pengiriman menggunakan satu produser atau thread untuk mengirim pesan. Jika pengirim menggunakan beberapa produser atau thread untuk mengirim pesan secara bersamaan, urutan pesan ditentukan oleh urutan penerimaan pesan oleh broker ApsaraMQ for RocketMQ. Urutan ini mungkin berbeda dari urutan pengiriman di sisi bisnis.
Kode sampel berikut memberikan contoh cara mengirim pesan terurut menggunakan HTTP client SDK untuk Go:
package main
import (
"fmt"
"time"
"strconv"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
endpoint := "${HTTP_ENDPOINT}"
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// Rahasia AccessKey yang digunakan untuk autentikasi.
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic := "${TOPIC}"
// ID instance tempat topik dimiliki. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// Kirim delapan pesan secara siklik.
for i := 0; i < 8; i++ {
msg := mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", // Isi pesan.
MessageTag: "", // Tag pesan.
Properties: map[string]string{}, // Atribut pesan.
}
// Kunci pesan.
msg.MessageKey = "MessageKey"
// Atribut kustom pesan.
msg.Properties["a"] = strconv.Itoa(i)
// Kunci sharding yang digunakan untuk mendistribusikan pesan terurut ke partisi tertentu. Kunci sharding dapat digunakan untuk mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan.
msg.ShardingKey = strconv.Itoa(i % 2)
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}Berlangganan pesan terurut
Kode sampel berikut memberikan contoh cara berlangganan pesan terurut menggunakan HTTP client SDK untuk Go:
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
endpoint := "${HTTP_ENDPOINT}"
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// Rahasia AccessKey yang digunakan untuk autentikasi.
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic := "${TOPIC}"
// ID instance tempat topik dimiliki. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instanceId := "${INSTANCE_ID}"
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// Logika konsumsi pesan.
var handles []string
fmt.Printf("Konsumsi %d pesan---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n"+
"\tShardingKey: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties, v.ShardingKey)
}
// Jika broker gagal menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter NextConsumeTime berakhir, broker akan mengirimkan pesan di partisi ke konsumen lagi.
// Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// Jika handle pesan kedaluwarsa, broker gagal menerima ACK untuk pesan dari konsumen.
fmt.Println(ackerr)
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
fmt.Println("ack err =", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// Tidak ada pesan yang tersedia untuk dikonsumsi di topik.
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nTidak ada pesan baru, lanjutkan!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout konsumsi pesan ??")
endChan <- 1
}
}
}()
// Konsumen mungkin menarik pesan terurut berpartisi dari beberapa partisi. Konsumen mengonsumsi pesan dalam setiap partisi sesuai dengan urutan pengiriman pesan.
// Anggaplah konsumen menarik pesan terurut berpartisi dari satu partisi. Jika broker tidak menerima ACK untuk pesan dari konsumen, broker akan mengirimkan pesan ke konsumen lagi.
// Konsumen hanya dapat mengonsumsi batch pesan berikutnya dari partisi setelah semua pesan yang ditarik dari partisi dalam batch sebelumnya diakui telah dikonsumsi.
// Konsumsi pesan dalam mode polling panjang. Dalam mode polling panjang, periode timeout jaringan adalah 35 detik.
// Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi di topik, permintaan ditangguhkan di broker selama periode waktu yang ditentukan. Jika pesan tersedia untuk dikonsumsi selama periode waktu yang ditentukan, broker segera mengirimkan respons ke konsumen. Dalam contoh ini, nilai ditentukan sebagai 3 detik.
mqConsumer.ConsumeMessageOrderly(respChan, errChan,
3, // Jumlah maksimum pesan yang dapat dikonsumsi pada satu waktu. Dalam contoh ini, nilai ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3, // Durasi periode polling panjang. Unit: detik. Dalam contoh ini, nilai ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
)
<-endChan
}
}