全部产品
Search
文档中心

IoT Platform:Menghubungkan klien ke IoT Platform menggunakan SDK untuk Go

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan SDK untuk Go guna menghubungkan klien Advanced Message Queuing Protocol (AMQP) ke Alibaba Cloud IoT Platform dan menerima pesan dari IoT Platform melalui fitur langganan sisi server.

Prasyarat

ID grup konsumen yang berlangganan pesan dari topik telah diperoleh.

  • Anda dapat menggunakan grup konsumen default bernama DEFAULT_GROUP atau membuat grup konsumen di konsol IoT Platform. Untuk informasi lebih lanjut, lihat Kelola Grup Konsumen.

  • Anda dapat menggunakan grup konsumen untuk berlangganan pesan dari topik. Untuk informasi lebih lanjut, lihat Konfigurasikan Langganan Sisi Server AMQP.

Lingkungan pengembangan

Contoh ini menggunakan Go versi 1.12.7.

Unduh SDK

Jalankan perintah berikut untuk mengimpor SDK AMQP untuk Go:

import "pack.ag/amqp"

Untuk informasi lebih lanjut tentang penggunaan SDK, lihat paket amqp.

Kode contoh

package main

import (
	"os"
    "context"
    "crypto/hmac"
    "crypto/sha1"
    "encoding/base64"
    "fmt"
    "pack.ag/amqp"
    "time"
)
// Untuk informasi lebih lanjut tentang parameter, lihat topik "Menghubungkan klien AMQP ke IoT Platform".
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
// iotInstanceId: ID instance IoT Platform.
const iotInstanceId = "${YourIotInstanceId}"
// Titik akhir. Untuk informasi lebih lanjut, lihat topik "Menghubungkan klien AMQP ke IoT Platform".
const host = "${YourHost}"

func main() {
	// Jika Anda melakukan hard-code pasangan AccessKey dalam kode proyek, pasangan AccessKey tersebut mungkin terungkap jika kode proyek bocor. Dalam hal ini, sumber daya dalam akun Anda menjadi tidak aman. Contoh kode berikut memberikan contoh tentang cara memperoleh pasangan AccessKey dari variabel lingkungan. Contoh ini hanya untuk referensi.
	accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
	accessSecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    address := "amqps://" + host + ":5671"
    timestamp := time.Now().Nanosecond() / 1000000
    // Struktur parameter userName. Untuk informasi lebih lanjut, lihat topik "Menghubungkan klien AMQP ke IoT Platform".
    userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|", 
        clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
    stringToSign := fmt.Sprintf("authId=%s&timestamp=%d", accessKey, timestamp)
    hmacKey := hmac.New(sha1.New, []byte(accessSecret))
    hmacKey.Write([]byte(stringToSign))
    // Hitung tanda tangan. Untuk informasi lebih lanjut tentang cara membangun kata sandi, lihat topik "Menghubungkan klien AMQP ke IoT Platform".
    password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))

    amqpManager := &AmqpManager{
        address:address,
        userName:userName,
        password:password,
    }

    // Jika Anda perlu mengaktifkan atau menonaktifkan fitur penerimaan pesan, Anda dapat membuat konteks dengan menggunakan fungsi Background().
    ctx := context.Background()

    amqpManager.startReceiveMessage(ctx)
}

// Fungsi yang mengimplementasikan logika bisnis Anda. Fungsi ini adalah fungsi yang ditentukan pengguna yang dipanggil secara asinkron. Sebelum Anda mengonfigurasi fungsi ini, kami sarankan Anda mempertimbangkan konsumsi sumber daya sistem.
func (am *AmqpManager) processMessage(message *amqp.Message) {
    fmt.Println("data diterima:", string(message.GetData()), " properti:", message.ApplicationProperties)
}

type AmqpManager struct {
    address     string
    userName     string
    password     string
    client       *amqp.Client
    session     *amqp.Session
    receiver     *amqp.Receiver
}

func (am *AmqpManager) startReceiveMessage(ctx context.Context)  {

    childCtx, _ := context.WithCancel(ctx)
    err := am.generateReceiverWithRetry(childCtx)
    if nil != err {
        return
    }
    defer func() {
        am.receiver.Close(childCtx)
        am.session.Close(childCtx)
        am.client.Close()
    }()

    for {
        // Blokir penerimaan pesan. Jika ctx adalah konteks baru yang dibuat berdasarkan fungsi Background(), penerimaan pesan tidak diblokir.
        message, err := am.receiver.Receive(ctx)

        if nil == err {
            go am.processMessage(message)
            message.Accept()
        } else {
            fmt.Println("amqp menerima kesalahan data:", err)

            // Jika penerimaan pesan dinonaktifkan secara manual, keluar dari program.
            select {
            case <- childCtx.Done(): return
            default:
            }

            // Jika penerimaan pesan tidak dinonaktifkan secara manual, buat kembali koneksi.
            err := am.generateReceiverWithRetry(childCtx)
            if nil != err {
                return
            }
        }
    }
}

func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
    // Coba ulang dengan backoff eksponensial, dari 10 ms hingga 20 detik.
    duration := 10 * time.Millisecond
    maxDuration := 20000 * time.Millisecond
    times := 1

    // Jika pengecualian terjadi, coba ulang dengan backoff eksponensial.
    for {
        select {
        case <- ctx.Done(): return amqp.ErrConnClosed
        default:
        }

        err := am.generateReceiver()
        if nil != err {
            time.Sleep(duration)
            if duration < maxDuration {
                duration *= 2
            }
            fmt.Println("amqp mencoba ulang,times:", times, ",durasi:", duration)
            times ++
        } else {
            fmt.Println("amqp berhasil tersambung")
            return nil
        }
    }
}

// Status koneksi dan sesi tidak dapat ditentukan karena paket tidak tersedia. Buat kembali koneksi untuk mendapatkan informasi.
func (am *AmqpManager) generateReceiver() error {

    if am.session != nil {
        receiver, err := am.session.NewReceiver(
            amqp.LinkSourceAddress("/queue-name"),
            amqp.LinkCredit(20),
        )
        // Jika terjadi kesalahan pemutusan jaringan, koneksi ditutup dan sesi gagal dibuat. Jika tidak, koneksi berhasil dibuat.
        if err == nil {
            am.receiver = receiver
            return nil
        }
    }

    // Hapus koneksi sebelumnya.
    if am.client != nil {
        am.client.Close()
    }

    client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
    if err != nil {
        return err
    }
    am.client = client

    session, err := client.NewSession()
    if err != nil {
        return err
    }
    am.session = session

    receiver, err := am.session.NewReceiver(
        amqp.LinkSourceAddress("/queue-name"),
        amqp.LinkCredit(20),
    )
    if err != nil {
        return err
    }
    am.receiver = receiver

    return nil
}

Konfigurasikan parameter dalam kode sebelumnya sesuai dengan deskripsi parameter pada tabel berikut. Untuk informasi lebih lanjut tentang parameter lainnya, lihat Menghubungkan Klien AMQP ke IoT Platform.

Penting

Pastikan Anda menentukan nilai parameter yang valid. Jika tidak, klien AMQP gagal terhubung ke IoT Platform.

Parameter

Deskripsi

accessKey

Masuk ke konsol IoT Platform, arahkan pointer ke foto profil di sudut kanan atas, lalu klik AccessKey Management untuk mendapatkan ID AccessKey dan rahasia AccessKey.

Catatan

Jika Anda menggunakan Pengguna Resource Access Management (RAM), Anda harus melampirkan kebijakan AliyunIOTFullAccess ke pengguna RAM. Kebijakan ini memungkinkan pengguna RAM mengelola sumber daya IoT Platform. Jika tidak, koneksi ke IoT Platform gagal. Untuk informasi lebih lanjut, lihat Akses IoT Platform sebagai pengguna RAM.

accessSecret

consumerGroupId

ID grup konsumen dari instance IoT Platform.

Untuk melihat ID grup konsumen, lakukan langkah-langkah berikut: Masuk ke konsol IoT Platform dan klik kartu instance yang ingin Anda kelola. Di panel navigasi di sebelah kiri, pilih Message Forwarding > Server-side Subscription. ID grup konsumen ditampilkan pada tab Consumer Groups.

iotInstanceId

ID instance IoT Platform. Anda dapat melihat ID instance pada tab Ikhtisar di konsol IoT Platform.

  • Jika ID instance ditampilkan, Anda harus menyetel parameter ini ke ID instance.

  • Jika tab Ikhtisar tidak ditampilkan atau instance Anda tidak memiliki ID, biarkan parameter ini kosong dalam format iotInstanceId = "".

clientId

ID klien. Anda harus menentukan ID kustom. ID harus memiliki panjang 1 hingga 64 karakter. Kami sarankan Anda menggunakan pengenal unik sebagai ID klien, seperti UUID, alamat MAC, atau alamat IP server tempat klien berjalan.

Setelah klien AMQP terhubung ke IoT Platform dan dimulai, lakukan langkah-langkah berikut untuk melihat detail klien: Masuk ke konsol IoT Platform dan klik kartu instance yang ingin Anda kelola. Di panel navigasi di sebelah kiri, pilih Message Forwarding > Server-side Subscription. Pada tab Consumer Groups, temukan grup konsumen yang ingin Anda kelola dan klik View di kolom Tindakan. ID setiap klien ditampilkan pada tab Consumer Group Status. Anda dapat menggunakan ID klien untuk mengidentifikasi klien dengan mudah.

host

Titik akhir yang digunakan klien AMQP untuk terhubung ke IoT Platform.

Untuk informasi lebih lanjut tentang titik akhir yang dapat Anda tentukan untuk variabel ${YourHost}, lihat Kelola titik akhir instance.

Hasil contoh

  • Jika informasi serupa dengan output berikut ditampilkan, klien AMQP berhasil terhubung ke IoT Platform dan dapat menerima pesan.成功

  • Jika informasi serupa dengan output berikut ditampilkan, klien AMQP gagal terhubung ke IoT Platform.

    Periksa kode atau lingkungan jaringan berdasarkan log, perbaiki masalah, lalu jalankan kode lagi.

    失败

Referensi

Untuk informasi lebih lanjut tentang kode kesalahan terkait fitur langganan sisi server, lihat bagian Kode Kesalahan Terkait Pesan dari topik "Log IoT Platform".