This topic describes how to use the AMQP protocol with a Go client to access Alibaba Cloud IoT Platform and receive subscribed messages.

Development environment

In this example, Go 1.12.7 is used for development.

Download the SDK

Add the following import statement to your Go source file to use the AMQP SDK for Go:

import "pack.ag/amqp"

For more information about how to use the SDK, see package amqp.

Sample code

For more information about parameters in the following sample, see AMQP client access instructions.

package main

import (
    "context"
    "crypto/hmac"
    "crypto/sha1"
    "encoding/base64"
    "fmt"
    "pack.ag/amqp"
    "time"
)

// Connection parameters for the sample. Each value is a "${...}" placeholder
// that is used verbatim by main when building the endpoint address and the
// SASL credentials.
const (
	// uid and region are interpolated into the AMQP endpoint host name.
	uid    = "${uid}"
	region = "${regionId}"

	// accessKey is sent as authId in the username and in the string to sign;
	// accessSecret is the HMAC-SHA1 key used to sign that string.
	accessKey    = "${accessKey}"
	accessSecret = "${accessSecret}"

	// consumerGroupId and clientId are embedded in the SASL username.
	consumerGroupId = "${consumerGroupId}"
	clientId        = "${clientId}"
)

func main() {
    address := fmt.Sprintf("amqps://%s.iot-amqp.%s.aliyuncs.com:5671", uid, region)
    timestamp := time.Now().Nanosecond() / 1000000
    username := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,timestamp=%d|",
        clientId, consumerGroupId, accessKey, timestamp)
    stringToSign := fmt.Sprintf("authId=%s&timestamp=%d", accessKey, timestamp)
    hmacKey := hmac.New(sha1.New, []byte(accessSecret))
    hmacKey.Write([]byte(stringToSign))
    password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))

    client, err := amqp.Dial(address, amqp.ConnSASLPlain(username, password))
    if nil ! = err {
        fmt.Println("amqp client create error:", err)
    }
    defer client.Close()
    session, err := client.NewSession()
    if nil ! = err {
        fmt.Println("amqp session create error:", err)
    }

    //Receive messages and create a new context by using the Background function.
    ctx := context.Background()

    receiver, err := session.NewReceiver(
        amqp.LinkSourceAddress("/queue-name"),
        amqp.LinkCredit(20),
    )
    if nil ! = err {
        fmt.Println("amqp receiver create error:", err)
    }

    defer receiver.Close(ctx)

    //Disable the receiver
    defer func() {
        ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
        receiver.Close(ctx)
        cancel()
    }()

  ch := make(chan int)

  go receiveData(receiver, ctx, ch)

  //When messages are being blocked or other services are being run, if you need to interrupt the context, you can send a cancelation signal to the channel ch.
  select {
  case <- ch:
  }

}

func receiveData(receiver *amqp.Receiver, ctx context.Context, ch chan int) {

  for {
    //When messages are being blocked, if ctx is a Background, the context is not interrupted.
    message, err := receiver.Receive(ctx)

    if nil ! = err {
      fmt.Println("amqp receive data error:", err)
      //Exit the program
      ch <- 1
    }

    //Do not perform time-consuming operations here. If you need to process additional workloads, we recommend that you process them asynchronously.
    fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)

    message.Accept()
  }

}