This topic describes how to connect a Go client to IoT Platform over Advanced Message Queuing Protocol (AMQP) and enable the client to receive messages from IoT Platform.

Prerequisites

The ID of the consumer group that has subscribed to the messages of a topic is obtained.

Prepare the development environment

In this example, Go 1.12.7 is used.

Download the SDK

You can run the following command to import AMQP SDK for Go:

import "pack.ag/amqp"

For more information about how to use the SDK, see package amqp.

Sample code

package main

import (
    "context"
    "crypto/hmac"
    "crypto/sha1"
    "encoding/base64"
    "fmt"
    "pack.ag/amqp"
    "time"
)
// The parameters. For more information, see Connect an AMQP client to IoT Platform. 
const accessKey = "${YourAccessKey}"
const accessSecret = "${YourAccessSecret}"
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
// iotInstanceId: the ID of the instance. 
const iotInstanceId = "${YourIotInstanceId}"
// The endpoint. For more information, see Connect an AMQP client to IoT Platform. 
const host = "${YourHost}"

func main() {
    address := "amqps://" + host + ":5671"
    timestamp := time.Now().Nanosecond() / 1000000
    // The structure of the userName parameter. For more information, see Connect an AMQP client to IoT Platform. 
    userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|", 
        clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
    stringToSign := fmt.Sprintf("authId=%s&timestamp=%d", accessKey, timestamp)
    hmacKey := hmac.New(sha1.New, []byte(accessSecret))
    hmacKey.Write([]byte(stringToSign))
    // Calculate a signature. For more information about how to configure the password parameter, see Connect an AMQP client to IoT Platform. 
    password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))

    amqpManager := &AmqpManager{
        address:address,
        userName:userName,
        password:password,
    }

    // If you need to enable or disable the message receiving feature, you can create a context by using the Background() function. 
    ctx := context.Background()

    amqpManager.startReceiveMessage(ctx)
}

// The function that implements your business logic. The function is a user-defined function that is asynchronously implemented. Before you configure this function, we recommend that you consider the consumption of system resources. 
func (am *AmqpManager) processMessage(message *amqp.Message) {
    fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)
}

type AmqpManager struct {
    address     string
    userName     string
    password     string
    client       *amqp.Client
    session     *amqp.Session
    receiver     *amqp.Receiver
}

func (am *AmqpManager) startReceiveMessage(ctx context.Context)  {

    childCtx, _ := context.WithCancel(ctx)
    err := am.generateReceiverWithRetry(childCtx)
    if nil != err {
        return
    }
    defer func() {
        am.receiver.Close(childCtx)
        am.session.Close(childCtx)
        am.client.Close()
    }()

    for {
        // Block message receiving. If ctx is the new context that is created based on the Background() function, message receiving is not blocked. 
        message, err := am.receiver.Receive(ctx)

        if nil == err {
            go am.processMessage(message)
            message.Accept()
        } else {
            fmt.Println("amqp receive data error:", err)

            // If message receiving is manually disabled, exit the program. 
            select {
            case <- childCtx.Done(): return
            default:
            }

            // If message receiving is not manually disabled, retry the connection. 
            err := am.generateReceiverWithRetry(childCtx)
            if nil != err {
                return
            }
        }
    }
}

func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
    // Retry with exponential backoff, from 10 ms to 20s. 
    duration := 10 * time.Millisecond
    maxDuration := 20000 * time.Millisecond
    times := 1

    // If exceptions occur, retry with exponential backoff. 
    for {
        select {
        case <- ctx.Done(): return amqp.ErrConnClosed
        default:
        }

        err := am.generateReceiver()
        if nil != err {
            time.Sleep(duration)
            if duration < maxDuration {
                duration *= 2
            }
            fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
            times ++
        } else {
            fmt.Println("amqp connect init success")
            return nil
        }
    }
}

// The statuses of the connection and session cannot be determined because the packets are unavailable. Retry the connection to obtain the information. 
func (am *AmqpManager) generateReceiver() error {

    if am.session != nil {
        receiver, err := am.session.NewReceiver(
            amqp.LinkSourceAddress("/queue-name"),
            amqp.LinkCredit(20),
        )
        // If a network disconnection error occurs, the connection is ended and the session fails to be established. Otherwise, the connection is established. 
        if err == nil {
            am.receiver = receiver
            return nil
        }
    }

    // Delete the previous connection. 
    if am.client != nil {
        am.client.Close()
    }

    client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
    if err != nil {
        return err
    }
    am.client = client

    session, err := client.NewSession()
    if err != nil {
        return err
    }
    am.session = session

    receiver, err := am.session.NewReceiver(
        amqp.LinkSourceAddress("/queue-name"),
        amqp.LinkCredit(20),
    )
    if err != nil {
        return err
    }
    am.receiver = receiver

    return nil
}

You can specify the parameters in the preceding code based on the parameter descriptions in the following table. For more information, see Connect an AMQP client to IoT Platform.

Parameter Example Description
accessKey LTAI4GFGQvKuqHJhFa****** Log on to the IoT Platform console, move the pointer over the profile picture, and then click AccessKey Management to obtain the AccessKey ID and AccessKey secret.
Note If you use a RAM user, you must attach the AliyunIOTFullAccess permission policy to the RAM user. This policy allows the RAM user to manage IoT Platform resources. Otherwise, the connection with IoT Platform fails. For more information about how to authorize a RAM user, see RAM user access.
accessSecret iMS8ZhCDdfJbCMeA005sieKe******
consumerGroupId VWhGZ2QnP7kxWpeSSjt****** The ID of the consumer group.

To view the ID of the consumer group, perform the following steps: Log on to the IoT Platform console and click the card of the instance that you want to manange. Choose Rules Engine > Server-side Subscription > Consumer Groups. The ID is displayed on the Consumer Groups tab.

iotInstanceId "" The ID of the instance. You can view the ID of the instance on the Overview page in the IoT Platform console.
  • If you have an ID value, you must specify the ID for this parameter.
  • If no Overview or ID is generated for your instance, specify an empty string (iotInstanceId = "") for the parameter.
clientId 12345 The ID of the client. We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address of the client. The client ID must be 1 to 64 characters in length.

Log on to the IoT Platform console and click the card of the instance that you want to manage. Choose Rules Engine > Server-side Subscription > Consumer Groups. Find the consumer group that you want to manage and click View in the Actions column. The ID of each client is displayed on the Consumer Group Details page. You can use client IDs to efficiently identify clients.

host 198426864******.iot-amqp.cn-shanghai.aliyuncs.com The AMQP endpoint.

For more information about the endpoints that you can specify for the ${YourHost} variable, see View the endpoint of an instance.

Sample results

  • Sample success result: After you run the code, the following log data may be returned. The data indicates that the AMQP client is connected to IoT Platform and can receive messages.Success
  • Sample failure result:

    You can check the code or network environment based on logs, solve the problem, and then run the code again.

    Failed

References

For more information about the error codes that are related to server-side subscription, see Message-related error codes.