Topik ini menjelaskan cara menggunakan SDK untuk Go guna menghubungkan klien Advanced Message Queuing Protocol (AMQP) ke Alibaba Cloud IoT Platform dan menerima pesan dari IoT Platform melalui fitur langganan sisi server.
Prasyarat
ID grup konsumen yang berlangganan pesan dari topik telah diperoleh.
Anda dapat menggunakan grup konsumen default bernama DEFAULT_GROUP atau membuat grup konsumen di konsol IoT Platform. Untuk informasi lebih lanjut, lihat Kelola Grup Konsumen.
Anda dapat menggunakan grup konsumen untuk berlangganan pesan dari topik. Untuk informasi lebih lanjut, lihat Konfigurasikan Langganan Sisi Server AMQP.
Lingkungan pengembangan
Contoh ini menggunakan Go versi 1.12.7.
Unduh SDK
Jalankan perintah berikut untuk mengimpor SDK AMQP untuk Go:
import "pack.ag/amqp"Untuk informasi lebih lanjut tentang penggunaan SDK, lihat paket amqp.
Kode contoh
package main
import (
"os"
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"pack.ag/amqp"
"time"
)
// Untuk informasi lebih lanjut tentang parameter, lihat topik "Menghubungkan klien AMQP ke IoT Platform".
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
// iotInstanceId: ID instance IoT Platform.
const iotInstanceId = "${YourIotInstanceId}"
// Titik akhir. Untuk informasi lebih lanjut, lihat topik "Menghubungkan klien AMQP ke IoT Platform".
const host = "${YourHost}"
func main() {
// Jika Anda melakukan hard-code pasangan AccessKey dalam kode proyek, pasangan AccessKey tersebut mungkin terungkap jika kode proyek bocor. Dalam hal ini, sumber daya dalam akun Anda menjadi tidak aman. Contoh kode berikut memberikan contoh tentang cara memperoleh pasangan AccessKey dari variabel lingkungan. Contoh ini hanya untuk referensi.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
accessSecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
address := "amqps://" + host + ":5671"
timestamp := time.Now().Nanosecond() / 1000000
// Struktur parameter userName. Untuk informasi lebih lanjut, lihat topik "Menghubungkan klien AMQP ke IoT Platform".
userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|",
clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
stringToSign := fmt.Sprintf("authId=%s×tamp=%d", accessKey, timestamp)
hmacKey := hmac.New(sha1.New, []byte(accessSecret))
hmacKey.Write([]byte(stringToSign))
// Hitung tanda tangan. Untuk informasi lebih lanjut tentang cara membangun kata sandi, lihat topik "Menghubungkan klien AMQP ke IoT Platform".
password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))
amqpManager := &AmqpManager{
address:address,
userName:userName,
password:password,
}
// Jika Anda perlu mengaktifkan atau menonaktifkan fitur penerimaan pesan, Anda dapat membuat konteks dengan menggunakan fungsi Background().
ctx := context.Background()
amqpManager.startReceiveMessage(ctx)
}
// Fungsi yang mengimplementasikan logika bisnis Anda. Fungsi ini adalah fungsi yang ditentukan pengguna yang dipanggil secara asinkron. Sebelum Anda mengonfigurasi fungsi ini, kami sarankan Anda mempertimbangkan konsumsi sumber daya sistem.
func (am *AmqpManager) processMessage(message *amqp.Message) {
fmt.Println("data diterima:", string(message.GetData()), " properti:", message.ApplicationProperties)
}
type AmqpManager struct {
address string
userName string
password string
client *amqp.Client
session *amqp.Session
receiver *amqp.Receiver
}
func (am *AmqpManager) startReceiveMessage(ctx context.Context) {
childCtx, _ := context.WithCancel(ctx)
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
defer func() {
am.receiver.Close(childCtx)
am.session.Close(childCtx)
am.client.Close()
}()
for {
// Blokir penerimaan pesan. Jika ctx adalah konteks baru yang dibuat berdasarkan fungsi Background(), penerimaan pesan tidak diblokir.
message, err := am.receiver.Receive(ctx)
if nil == err {
go am.processMessage(message)
message.Accept()
} else {
fmt.Println("amqp menerima kesalahan data:", err)
// Jika penerimaan pesan dinonaktifkan secara manual, keluar dari program.
select {
case <- childCtx.Done(): return
default:
}
// Jika penerimaan pesan tidak dinonaktifkan secara manual, buat kembali koneksi.
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
}
}
}
func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
// Coba ulang dengan backoff eksponensial, dari 10 ms hingga 20 detik.
duration := 10 * time.Millisecond
maxDuration := 20000 * time.Millisecond
times := 1
// Jika pengecualian terjadi, coba ulang dengan backoff eksponensial.
for {
select {
case <- ctx.Done(): return amqp.ErrConnClosed
default:
}
err := am.generateReceiver()
if nil != err {
time.Sleep(duration)
if duration < maxDuration {
duration *= 2
}
fmt.Println("amqp mencoba ulang,times:", times, ",durasi:", duration)
times ++
} else {
fmt.Println("amqp berhasil tersambung")
return nil
}
}
}
// Status koneksi dan sesi tidak dapat ditentukan karena paket tidak tersedia. Buat kembali koneksi untuk mendapatkan informasi.
func (am *AmqpManager) generateReceiver() error {
if am.session != nil {
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
// Jika terjadi kesalahan pemutusan jaringan, koneksi ditutup dan sesi gagal dibuat. Jika tidak, koneksi berhasil dibuat.
if err == nil {
am.receiver = receiver
return nil
}
}
// Hapus koneksi sebelumnya.
if am.client != nil {
am.client.Close()
}
client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
if err != nil {
return err
}
am.client = client
session, err := client.NewSession()
if err != nil {
return err
}
am.session = session
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
if err != nil {
return err
}
am.receiver = receiver
return nil
}Konfigurasikan parameter dalam kode sebelumnya sesuai dengan deskripsi parameter pada tabel berikut. Untuk informasi lebih lanjut tentang parameter lainnya, lihat Menghubungkan Klien AMQP ke IoT Platform.
Pastikan Anda menentukan nilai parameter yang valid. Jika tidak, klien AMQP gagal terhubung ke IoT Platform.
Parameter | Deskripsi |
accessKey | Masuk ke konsol IoT Platform, arahkan pointer ke foto profil di sudut kanan atas, lalu klik AccessKey Management untuk mendapatkan ID AccessKey dan rahasia AccessKey. Catatan Jika Anda menggunakan Pengguna Resource Access Management (RAM), Anda harus melampirkan kebijakan AliyunIOTFullAccess ke pengguna RAM. Kebijakan ini memungkinkan pengguna RAM mengelola sumber daya IoT Platform. Jika tidak, koneksi ke IoT Platform gagal. Untuk informasi lebih lanjut, lihat Akses IoT Platform sebagai pengguna RAM. |
accessSecret | |
consumerGroupId | ID grup konsumen dari instance IoT Platform. Untuk melihat ID grup konsumen, lakukan langkah-langkah berikut: Masuk ke konsol IoT Platform dan klik kartu instance yang ingin Anda kelola. Di panel navigasi di sebelah kiri, pilih . ID grup konsumen ditampilkan pada tab Consumer Groups. |
iotInstanceId | ID instance IoT Platform. Anda dapat melihat ID instance pada tab Ikhtisar di konsol IoT Platform.
|
clientId | ID klien. Anda harus menentukan ID kustom. ID harus memiliki panjang 1 hingga 64 karakter. Kami sarankan Anda menggunakan pengenal unik sebagai ID klien, seperti UUID, alamat MAC, atau alamat IP server tempat klien berjalan. Setelah klien AMQP terhubung ke IoT Platform dan dimulai, lakukan langkah-langkah berikut untuk melihat detail klien: Masuk ke konsol IoT Platform dan klik kartu instance yang ingin Anda kelola. Di panel navigasi di sebelah kiri, pilih . Pada tab Consumer Groups, temukan grup konsumen yang ingin Anda kelola dan klik View di kolom Tindakan. ID setiap klien ditampilkan pada tab Consumer Group Status. Anda dapat menggunakan ID klien untuk mengidentifikasi klien dengan mudah. |
host | Titik akhir yang digunakan klien AMQP untuk terhubung ke IoT Platform. Untuk informasi lebih lanjut tentang titik akhir yang dapat Anda tentukan untuk variabel |
Hasil contoh
Jika informasi serupa dengan output berikut ditampilkan, klien AMQP berhasil terhubung ke IoT Platform dan dapat menerima pesan.

Jika informasi serupa dengan output berikut ditampilkan, klien AMQP gagal terhubung ke IoT Platform.
Periksa kode atau lingkungan jaringan berdasarkan log, perbaiki masalah, lalu jalankan kode lagi.

Referensi
Untuk informasi lebih lanjut tentang kode kesalahan terkait fitur langganan sisi server, lihat bagian Kode Kesalahan Terkait Pesan dari topik "Log IoT Platform".