This article describes how to connect a Go client to Alibaba Cloud IoT Platform over Advanced Message Queuing Protocol (AMQP) and enable the client to receive messages from IoT Platform.
Development environment
Go 1.12.7 is used in this example.
Download the SDK
You can run the following command to import the AMQP SDK for Go:
import "pack.ag/amqp"
For more information about how to use the SDK, see package amqp.
Sample code
For more information about the parameters in the following sample code, see Connect an AMQP client to IoT Platform.
package main
import (
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"pack.ag/amqp"
"time"
)
// For more information about the parameters, see Connect an AMQP client to IoT Platform.
const accessKey = "${YourAccessKey}"
const accessSecret = "${YourAccessSecret}"
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
// iotInstanceId: If you are using a purchased instance, you must specify the instance ID. If you are using a public instance, you can enter an empty string ("").
const iotInstanceId = "${YourIotInstanceId}"
// The endpoint. For more information, see Connect an AMQP client to IoT Platform.
const host = "${YourHost}"
func main() {
address := "amqps://" + host + ":5671"
timestamp := time.Now().Nanosecond() / 1000000
// The structure of the userName parameter. For more information, see Connect an AMQP client to IoT Platform.
userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|",
clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
stringToSign := fmt.Sprintf("authId=%s×tamp=%d", accessKey, timestamp)
hmacKey := hmac.New(sha1.New, []byte(accessSecret))
hmacKey.Write([]byte(stringToSign))
// The structure of the signature and the password parameters. For more information, see Connect an AMQP client to IoT Platform.
password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))
amqpManager := &AmqpManager{
address:address,
userName:userName,
password:password,
}
// If you need to enable or disable the message receiving function, you can create a context by using the Background function.
ctx := context.Background()
amqpManager.startReceiveMessage(ctx)
}
// The function that implements your business logic. The function is a user-defined function that is asynchronously executed. Before you configure this function, we recommend that you consider the consumption of system resources.
func (am *AmqpManager) processMessage(message *amqp.Message) {
fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)
}
type AmqpManager struct {
address string
userName string
password string
client *amqp.Client
session *amqp.Session
receiver *amqp.Receiver
}
func (am *AmqpManager) startReceiveMessage(ctx context.Context) {
childCtx, _ := context.WithCancel(ctx)
err := am.generateReceiverWithRetry(childCtx)
if nil ! = err {
return
}
defer func() {
am.receiver.Close(childCtx)
am.session.Close(childCtx)
am.client.Close()
}()
for {
// Blocks message receiving. If ctx is the new context that is created based on the Background function, message receiving is not blocked.
message, err := am.receiver.Receive(ctx)
if nil == err {
go am.processMessage(message)
message.Accept()
} else {
fmt.Println("amqp receive data error:", err)
// If message receiving is manually disabled, exit the program.
select {
case <- childCtx.Done(): return
default:
}
// If message receiving is not manually disabled, retry the connection.
err := am.generateReceiverWithRetry(childCtx)
if nil ! = err {
return
}
}
}
}
func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
// Retry with exponential backoff, from 10 ms to 20s.
duration := 10 * time.Millisecond
maxDuration := 20000 * time.Millisecond
times := 1
// If exceptions occur, retry with exponential backoff.
for {
select {
case <- ctx.Done(): return amqp.ErrConnClosed
default:
}
err := am.generateReceiver()
if nil ! = err {
time.Sleep(duration)
if duration < maxDuration {
duration *= 2
}
fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
times ++
} else {
fmt.Println("amqp connect init success")
return nil
}
}
}
// The statuses of the connection and session cannot be determined because the packets are unavailable. Retry the connection to obtain the information.
func (am *AmqpManager) generateReceiver() error {
if am.session ! = nil {
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
// If a network disconnection error occurs, the connection is ended and the session establishment fails. Otherwise, the connection is established.
if err == nil {
am.receiver = receiver
return nil
}
}
// Deletes the previous connection.
if am.client ! = nil {
am.client.Close()
}
client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
if err ! = nil {
return err
}
am.client = client
session, err := client.NewSession()
if err ! = nil {
return err
}
am.session = session
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
if err ! = nil {
return err
}
am.receiver = receiver
return nil
}