Pesan normal adalah pesan tanpa fitur khusus yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini berbeda dari pesan berciri khas seperti pesan terjadwal, pesan tertunda, pesan terurut, dan pesan transaksional. Topik ini menyajikan contoh kode untuk mengirim dan menerima pesan normal menggunakan HTTP client SDK untuk Go.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Instal SDK untuk Go. Untuk informasi lebih lanjut, lihat Siapkan Lingkungan.
Buat sumber daya yang ingin Anda gunakan dalam kode melalui konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Peroleh pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan normal
Contoh kode berikut menunjukkan cara mengirim pesan normal menggunakan HTTP client SDK untuk Go:
package main
import (
"fmt"
"time"
"strconv"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
endpoint := "${HTTP_ENDPOINT}"
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// Rahasia AccessKey yang digunakan untuk autentikasi.
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic := "${TOPIC}"
// ID instance tempat topik dimiliki. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// Kirim empat pesan secara siklik.
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", // Isi pesan.
MessageTag: "", // Tag pesan.
Properties: map[string]string{}, // Atribut pesan.
}
// Kunci pesan.
msg.MessageKey = "MessageKey"
// Atribut kustom pesan.
msg.Properties["a"] = strconv.Itoa(i)
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}Berlangganan pesan normal
Contoh kode berikut menunjukkan cara berlangganan pesan normal menggunakan HTTP client SDK untuk Go:
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
endpoint := "${HTTP_ENDPOINT}"
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// Rahasia AccessKey yang digunakan untuk autentikasi.
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
// Setiap topik dapat digunakan untuk mengirim dan menerima pesan jenis tertentu. Misalnya, topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
topic := "${TOPIC}"
// ID instance tempat topik dimiliki. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instanceId := "${INSTANCE_ID}"
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// Logika konsumsi pesan.
var handles []string
fmt.Printf("Konsumsi %d pesan---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
}
// Jika broker gagal menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter NextConsumeTime berakhir, broker akan mengirimkan ulang pesan di partisi kepada konsumen.
// Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// Jika handle pesan kedaluwarsa, broker gagal menerima ACK untuk pesan dari konsumen.
fmt.Println(ackerr)
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
fmt.Println("ack err =", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// Tidak ada pesan yang tersedia untuk dikonsumsi di topik.
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nTidak ada pesan baru, lanjutkan!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout konsumsi pesan ??")
endChan <- 1
}
}
}()
// Konsumsi pesan dalam mode polling panjang. Periode timeout jaringan default adalah 35 detik.
// Dalam mode polling panjang, jika tidak ada pesan di topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan pada broker selama periode waktu yang ditentukan. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, broker segera mengirimkan respons ke konsumen. Dalam contoh ini, nilai ditentukan sebagai 3 detik.
mqConsumer.ConsumeMessage(respChan, errChan,
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3, // Durasi periode polling panjang. Unit: detik. Dalam contoh ini, nilai ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
)
<-endChan
}
}