本文介绍使用Python3 SDK接入阿里云物联网平台,接收服务端订阅消息的示例。

开发环境

可使用Python 3.0及更高版本。本示例使用了Python 3.8版本。

下载SDK

本示例使用stomp.py和schedule,您可访问stomp.pyschedule查看使用说明。

安装stomp.py和schedule的操作指导,请参见Installing Packages

代码示例

本文提供基于stomp.py的7.0.0版本示例代码。

# encoding=utf-8

import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading

def connect_and_subscribe(conn):
    accessKey = "${YourAccessKeyId}"
    accessSecret = "${YourAccessKeySecret}"
    consumerGroupId = "${YourConsumerGroupId}"
    # iotInstanceId:实例ID。
    iotInstanceId = "${YourIotInstanceId}"
    clientId = "${YourClientId}"
    # 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
    signMethod = "hmacsha1"
    timestamp = current_time_millis()
    # userName组装方法,请参见AMQP客户端接入说明文档。
    # 若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。
    username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId + "|"
    signContent = "authId=" + accessKey + "&timestamp=" + timestamp
    # 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
    password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
    
    conn.set_listener('', MyListener(conn))
    conn.connect(username, password, wait=True)
    # 清除历史连接检查任务,新建连接检查任务
    schedule.clear('conn-check')
    schedule.every(1).seconds.do(do_check,conn).tag('conn-check')

class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

    def on_heartbeat_timeout(self):
        print('on_heartbeat_timeout')

    def on_connected(self, headers):
        print("successfully connected")
        conn.subscribe(destination='/topic/#', id=1, ack='auto')
        print("successfully subscribe")

    def on_disconnected(self):
        print('disconnected')
        connect_and_subscribe(self.conn)

def current_time_millis():
    return str(int(round(time.time() * 1000)))

def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest()).decode("utf-8")

# 检查连接,如果未连接则重新建连
def do_check(conn):
    print('check connection, is_connected: %s', conn.is_connected())
    if (not conn.is_connected()):
        try:
            connect_and_subscribe(conn)
        except Exception as e:
            print('disconnected, ', e)

# 定时任务方法,检查连接状态
def connection_check_timer():
    while 1:
        schedule.run_pending()
        time.sleep(10)

#  接入域名,请参见AMQP客户端接入说明文档。这里直接填入域名,不需要带amqps://前缀
conn = stomp.Connection([('${YourHost}', 61614)])
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)

try:
    connect_and_subscribe(conn)
except Exception as e:
    print('connecting failed')
    raise e
    
# 异步线程运行定时任务,检查连接状态
thread = threading.Thread(target=connection_check_timer)
thread.start()

您需按照如下表格中的参数说明,修改代码中的参数值。更多参数说明,请参见AMQP客户端接入说明

参数 示例 说明
accessKey LTAI4GFGQvKuqHJhFa****** 登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。
说明 如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见 授权RAM用户访问物联网平台
accessSecret iMS8ZhCDdfJbCMeA005sieKe******
consumerGroupId VWhGZ2QnP7kxWpeSSjt****** 消费组ID。

登录物联网平台控制台,在规则引擎 > 服务端订阅 > 消费组列表查看您的消费组ID。

iotInstanceId "" 实例ID。传入空值,即iotInstanceId = ""
clientId 12345 表示客户端ID,建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。长度不可超过64个字符。

登录物联网平台控制台,在规则引擎 > 服务端订阅 > 消费组列表,单击消费组对应的查看消费组详情页将显示该参数,方便您识别区分不同的客户端。

conn stomp.Connection([('iot-***.amqp.iothub.aliyuncs.com', 61614)]) 创建AMQP客户端与物联网平台的TLS连接。

${YourHost}格式为${uid}.iot-amqp.${YourRegionId}.aliyuncs.com,其中:

  • ${uid}:您的阿里云账号ID。可登录物联网平台控制台,将鼠标指针移动到账号头像,查看账号ID
  • ${YourRegionId}:请替换为您的物联网平台设备所在地域代码。地域代码表达方法,请参见地域和可用区
conn.set_ssl for_hosts=[('iot-***.amqp.iothub.aliyuncs.com', 61614)], ssl_version=ssl.PROTOCOL_TLS

运行结果示例

  • 成功:返回类似如下日志信息,表示AMQP客户端已接入物联网平台并成功接收消息。成功
  • 失败:返回类似如下日志信息,表示AMQP客户端连接物联网平台失败。

    您可根据日志提示,检查代码或网络环境,然后修正问题,重新运行代码。

    失败

二进制消息体说明

当您需要传输二进制数据时,由于STOMP协议为文本协议,需要使用base64编码参数,否则消息体可能会被截断。

本示例中,userName需要按以下方法添加encode=base64参数,使服务端将消息体base64编码后再推送。

username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId \ 
                    + ",encode="+base64 +"|"