This article describes how to connect a Go client to Alibaba Cloud IoT Platform over AMQP and enable the client to receive messages from IoT Platform.

Development environment

Go 1.12.7 is used in this example.

Download the SDK

To use the AMQP SDK for Go, install the package by running `go get pack.ag/amqp`, and then import it in your code:

import "pack.ag/amqp"

For more information about how to use the SDK, visit package amqp.

Sample code

For more information about the parameters in the following sample code, see Connect an AMQP client to IoT Platform.

package main

import (
    "context"
    "crypto/hmac"
    "crypto/sha1"
    "encoding/base64"
    "fmt"
    "pack.ag/amqp"
    "time"
)
// Connection parameters. Replace every ${...} placeholder with your own value.
// For details about each parameter, see the "Connect an AMQP client to IoT Platform" topic.
const (
    // Credentials used to sign the connection request.
    accessKey    = "${accessKey}"
    accessSecret = "${YourAccessSecret}"

    // ID of the consumer group that this client joins.
    consumerGroupId = "${YourConsumerGroupId}"

    // Instance ID. Required for a purchased instance; for a public instance,
    // use an empty string "".
    iotInstanceId = "${YourIotInstanceId}"

    // Client ID. The console shows this value in the Client ID column on the
    // Consumer Group Status tab of a server-side subscription consumer group.
    // Use a unique identifier such as a UUID, MAC address, or IP address.
    clientId = "${YourClientId}"

    // AMQP endpoint. For details about how to build it, see the
    // "Connect an AMQP client to IoT Platform" topic.
    host = "${YourHost}"
)

func main() {
    address := "amqps://" + host + ":5671"
    timestamp := time.Now().Nanosecond() / 1000000
    userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|", 
        clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
    stringToSign := fmt.Sprintf("authId=%s&timestamp=%d", accessKey, timestamp)
    hmacKey := hmac.New(sha1.New, []byte(accessSecret))
    hmacKey.Write([]byte(stringToSign))
    password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))

    amqpManager := &AmqpManager{
        address:address,
        userName:userName,
        password:password,
    }

    // If you want to enable or disable the message receiving function, you can create a new context by using the Background function.
    ctx := context.Background()

    amqpManager.startReceiveMessage(ctx)
}

// processMessage is the user-defined business logic applied to one received
// message; this sample only prints the payload and the application properties.
// startReceiveMessage runs it in a separate goroutine for every message, so
// keep its consumption of system resources in mind when you replace the body.
func (am *AmqpManager) processMessage(message *amqp.Message) {
    payload := string(message.GetData())
    fmt.Println("data received:", payload, " properties:", message.ApplicationProperties)
}

// AmqpManager bundles the connection settings with the AMQP objects that are
// created from them.
type AmqpManager struct {
    // Connection settings, filled in by the caller.
    address  string // URL passed to amqp.Dial
    userName string // user name for SASL PLAIN authentication
    password string // password for SASL PLAIN authentication

    // AMQP objects, assigned by generateReceiver.
    client   *amqp.Client
    session  *amqp.Session
    receiver *amqp.Receiver
}

func (am *AmqpManager) startReceiveMessage(ctx context.Context)  {

    childCtx, _ := context.WithCancel(ctx)
    err := am.generateReceiverWithRetry(childCtx)
    if nil ! = err {
        return
    }
    defer func() {
        am.receiver.Close(childCtx)
        am.session.Close(childCtx)
        am.client.Close()
    }()

    for {
        //Block message receiving. If ctx is the new context that was created based on the Background function, message receiving is not blocked.
        message, err := am.receiver.Receive(ctx)

        if nil == err {
            go am.processMessage(message)
            message.Accept()
        } else {
            fmt.Println("amqp receive data error:", err)

            // If message receiving is manually disabled, exit the program.
            select {
            case <- childCtx.Done(): return
            default:
            }

            // If message receiving is not manually disabled, retry the connection.
            err := am.generateReceiverWithRetry(childCtx)
            if nil ! = err {
                return
            }
        }
    }
}

func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
    // Retry with exponential backoff, from 10 ms to 20s.
    duration := 10 * time.Millisecond
    maxDuration := 20000 * time.Millisecond
    times := 1

    // If exceptions occur, retry with exponential backoff.
    for {
        select {
        case <- ctx.Done(): return amqp.ErrConnClosed
        default:
        }

        err := am.generateReceiver()
        if nil ! = err {
            time.Sleep(duration)
            if duration < maxDuration {
                duration *= 2
            }
            fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
            times ++
        } else {
            fmt.Println("amqp connect init success")
            return nil
        }
    }
}

// The status of the connection and session cannot be determined because the packets are unavailable. Retry the connection to obtain the information.
func (am *AmqpManager) generateReceiver() error {

    if am.session ! = nil {
        receiver, err := am.session.NewReceiver(
            amqp.LinkSourceAddress("/queue-name"),
            amqp.LinkCredit(20),
        )
        // If a network disconnection error occurs, the connection is ended and the session establishment fails. Otherwise, the connection is established.
        if err == nil {
            am.receiver = receiver
            return nil
        }
    }

    // Deletes the previous connection.
    if am.client ! = nil {
        am.client.Close()
    }

    client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
    if err ! = nil {
        return err
    }
    am.client = client

    session, err := client.NewSession()
    if err ! = nil {
        return err
    }
    am.session = session

    receiver, err := am.session.NewReceiver(
        amqp.LinkSourceAddress("/queue-name"),
        amqp.LinkCredit(20),
    )
    if err ! = nil {
        return err
    }
    am.receiver = receiver

    return nil
}