本文介绍使用AMQP协议的Go客户端接入阿里云物联网平台,接收服务端订阅消息的示例。
开发环境
本示例的测试环境为Go 1.12.7。
下载SDK
可使用以下命令导入Go语言AMQP SDK。
import "pack.ag/amqp"
SDK使用说明,请参见package amqp。
代码示例
以下Demo中涉及的参数说明,请参见AMQP客户端接入说明。
package main
import (
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"pack.ag/amqp"
"time"
)
//参数说明,请参见AMQP客户端接入说明文档。
const accessKey = "${YourAccessKey}"
const accessSecret = "${YourAccessSecret}"
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
//iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
const iotInstanceId = "${YourIotInstanceId}"
//接入域名,请参见AMQP客户端接入说明文档。
const host = "${YourHost}"
func main() {
address := "amqps://" + host + ":5671"
timestamp := time.Now().Nanosecond() / 1000000
//userName组装方法,请参见AMQP客户端接入说明文档。
userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|",
clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
stringToSign := fmt.Sprintf("authId=%s×tamp=%d", accessKey, timestamp)
hmacKey := hmac.New(sha1.New, []byte(accessSecret))
hmacKey.Write([]byte(stringToSign))
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))
amqpManager := &AmqpManager{
address:address,
userName:userName,
password:password,
}
//如果需要做接受消息通信或者取消操作,从Background衍生context。
ctx := context.Background()
amqpManager.startReceiveMessage(ctx)
}
//业务函数。用户自定义实现,该函数被异步执行,请考虑系统资源消耗情况。
func (am *AmqpManager) processMessage(message *amqp.Message) {
fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)
}
type AmqpManager struct {
address string
userName string
password string
client *amqp.Client
session *amqp.Session
receiver *amqp.Receiver
}
func (am *AmqpManager) startReceiveMessage(ctx context.Context) {
childCtx, _ := context.WithCancel(ctx)
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
defer func() {
am.receiver.Close(childCtx)
am.session.Close(childCtx)
am.client.Close()
}()
for {
//阻塞接受消息,如果ctx是background则不会被打断。
message, err := am.receiver.Receive(ctx)
if nil == err {
go am.processMessage(message)
message.Accept()
} else {
fmt.Println("amqp receive data error:", err)
//如果是主动取消,则退出程序。
select {
case <- childCtx.Done(): return
default:
}
//非主动取消,则重新建立连接。
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
}
}
}
func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
//退避重连,从10ms依次x2,直到20s。
duration := 10 * time.Millisecond
maxDuration := 20000 * time.Millisecond
times := 1
//异常情况,退避重连。
for {
select {
case <- ctx.Done(): return amqp.ErrConnClosed
default:
}
err := am.generateReceiver()
if nil != err {
time.Sleep(duration)
if duration < maxDuration {
duration *= 2
}
fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
times ++
} else {
fmt.Println("amqp connect init success")
return nil
}
}
}
//由于包不可见,无法判断Connection和Session状态,重启连接获取。
func (am *AmqpManager) generateReceiver() error {
if am.session != nil {
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
//如果断网等行为发生,Connection会关闭导致Session建立失败,未关闭连接则建立成功。
if err == nil {
am.receiver = receiver
return nil
}
}
//清理上一个连接。
if am.client != nil {
am.client.Close()
}
client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
if err != nil {
return err
}
am.client = client
session, err := client.NewSession()
if err != nil {
return err
}
am.session = session
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
if err != nil {
return err
}
am.receiver = receiver
return nil
}
您需按照如下表格中的参数说明,修改代码中的参数值。更多参数说明,请参见AMQP客户端接入说明。
参数 | 示例 | 说明 |
---|---|---|
accessKey | LTAI4GFGQvKuqHJhFa****** |
登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。 说明 如果使用RAM用户,您需授予该用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见授权RAM用户访问物联网平台。
|
accessSecret | iMS8ZhCDdfJbCMeA005sieKe****** | |
consumerGroupId | VWhGZ2QnP7kxWpeSSjt****** | 消费组ID。
登录物联网平台控制台,在 查看您的消费组ID。 |
iotInstanceId | "" | 实例ID。传入空值,即iotInstanceId = "" 。 |
clientId | 12345 | 表示客户端ID,建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。长度不可超过64个字符。
登录物联网平台控制台,在查看,消费组详情页将显示该参数,方便您识别区分不同的客户端。 ,单击消费组对应的 |
host | 233***.iot-amqp.cn-shanghai.aliyuncs.com | AMQP客户端接入物联网平台的接入域名。格式为${uid}.iot-amqp.${YourRegionId}.aliyuncs.com ,其中:
|
运行结果示例
- 成功:返回类似如下日志信息,表示AMQP客户端已接入物联网平台并成功接收消息。
- 失败:返回类似如下日志信息,表示AMQP客户端连接物联网平台失败。
您可根据日志提示,检查代码或网络环境,然后修正问题,重新运行代码。