All Products
Search
Document Center

IoT Platform:Connect a client to IoT Platform by using the SDK for Python 3

Last Updated:Oct 22, 2023

This topic describes how to use the SDK for Python 3 to connect an Advanced Message Queuing Protocol (AMQP) client to Alibaba Cloud IoT Platform and receive messages from IoT Platform by using the server-side subscription feature.

Prerequisites

The ID of the consumer group that subscribes to the messages of a topic is obtained.

Development environment

You can use Python 3.0 or later. In this example, Python 3.8 is used.

Download the SDK

In this example, the stomp.py and schedule libraries are used. To download the libraries and view the instructions, visit stomp.py and schedule.

Install the stomp.py and schedule libraries. For more information, see Installing Packages.

Sample code

This topic provides the sample code in which stomp.py 7.0.0 is used.

# encoding=utf-8

import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
import os

def connect_and_subscribe(conn):
    # If you hard-code the AccessKey pair in the project code, the AccessKey pair may be disclosed if the project code is leaked. In this case, the resources within your account become insecure. The following sample code provides an example on how to obtain the AccessKey pair from environment variables. This example is for reference only.
    accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
    accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    consumerGroupId = "${YourConsumerGroupId}"
    # iotInstanceId: The ID of the IoT Platform instance. 
    iotInstanceId = "${YourIotInstanceId}"
    clientId = "${YourClientId}"
    # The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256. 
    signMethod = "hmacsha1"
    timestamp = current_time_millis()
    # The structure of the userName parameter. For more information, see the "Connect an AMQP client to IoT Platform" topic. 
    # If you need to transmit messages in the binary format, specify encode=base64 in the userName parameter. Before IoT Platform sends these messages, IoT Platform encodes these messages by using the Base64 algorithm. For more information, see the "Messages in the binary format" section of this topic. 
    username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId + "|"
    signContent = "authId=" + accessKey + "&timestamp=" + timestamp
    # Calculate a signature. For more information about how to construct the password, see the "Connect an AMQP client to IoT Platform" topic. 
    password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
    
    conn.set_listener('', MyListener(conn))
    conn.connect(username, password, wait=True)
    # Clear historical tasks that are used to check connections and create tasks to check connections.
    schedule.clear('conn-check')
    schedule.every(1).seconds.do(do_check,conn).tag('conn-check')

class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

    def on_heartbeat_timeout(self):
        print('on_heartbeat_timeout')

    def on_connected(self, headers):
        print("successfully connected")
        conn.subscribe(destination='/topic/#', id=1, ack='auto')
        print("successfully subscribe")

    def on_disconnected(self):
        print('disconnected')
        connect_and_subscribe(self.conn)

def current_time_millis():
    return str(int(round(time.time() * 1000)))

def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest()).decode("utf-8")

# Check the connection. If the connection fails to be established, this method re-establishes a connection.
def do_check(conn):
    print('check connection, is_connected: %s', conn.is_connected())
    if (not conn.is_connected()):
        try:
            connect_and_subscribe(conn)
        except Exception as e:
            print('disconnected, ', e)

# Check the connection status as scheduled.
def connection_check_timer():
    while 1:
        schedule.run_pending()
        time.sleep(10)

// The endpoint. For more information, see the "Connect an AMQP client to IoT Platform" topic. Do not prefix the endpoint with amqps://.
conn = stomp.Connection([('${YourHost}', 61614)], heartbeats=(0,300))
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)

try:
    connect_and_subscribe(conn)
except Exception as e:
    print('connecting failed')
    raise e
    
# Check the connection status as scheduled in an asynchronous thread.
thread = threading.Thread(target=connection_check_timer)
thread.start()

You can configure the parameters in the preceding code based on the parameter description in the following table. For more information about other parameters, see Connect an AMQP client to IoT Platform.

Important

Make sure that you specify valid parameter values. Otherwise, the AMQP client fails to connect to IoT Platform.

Parameter

Description

accessKey

Log on to the IoT Platform console, move the pointer over the profile picture in the upper-right corner, and then click AccessKey Management to obtain the AccessKey ID and AccessKey secret.

Note: If you use a Resource Access Management (RAM) user, you must attach the AliyunIOTFullAccess policy to the RAM user. This policy allows the RAM user to manage IoT Platform resources. Otherwise, the connection to IoT Platform fails. For more information, see Access IoT Platform as a RAM user.

accessSecret

consumerGroupId

The ID of the consumer group of the IoT Platform instance.

To view the ID of the consumer group, perform the following steps: Log on to the IoT Platform console and click the card of the instance that you want to manage. In the left-side navigation pane, choose Message Forwarding > Server-side Subscription. The ID of the consumer group is displayed on the Consumer Groups tab.

iotInstanceId

The ID of the IoT Platform instance. You can view the instance ID on the Overview tab in the IoT Platform console.

  • If the instance ID is displayed, you must set this parameter to the instance ID.

  • If the Overview tab is not displayed or your instance does not have an ID, leave this parameter empty in the format of iotInstanceId = "".

clientId

The ID of the client. You must specify a custom ID. The ID must be 1 to 64 characters in length. We recommend that you use a unique identifier as the client ID, such as the UUID, MAC address, or IP address of the server on which the client runs.

After the AMQP client is connected to IoT Platform and started, perform the following steps to view the details of the client: Log on to the IoT Platform console and click the card of the instance that you want to manage. In the left-side navigation pane, choose Message Forwarding > Server-side Subscription. On the Consumer Groups tab, find the consumer group that you want to manage and click View in the Actions column. The ID of each client is displayed on the Consumer Group Status tab. You can use client IDs to identify clients with ease.

conn

The Transport Layer Security (TLS) connection that is established between the AMQP client and IoT Platform.

For more information about the endpoint that you can specify for the ${YourHost} variable, see Manage the endpoint of an instance.

conn.set_ssl

Sample results

  • If information similar to the following output is displayed, the AMQP client is connected to IoT Platform and can receive messages.成功

  • If information similar to the following output is displayed, the AMQP client fails to connect to IoT Platform.

    You can check the code or network environment based on logs, resolve the issue, and then run the code again.

    失败

Messages in the binary format

If you want to transmit messages in the binary format, use the Base64 algorithm to encode the messages. If you do not use the Base64 algorithm to encode the messages, the messages may be truncated because STOMP is a text-based protocol.

The following code shows how to specify encode=base64 in the userName parameter. This setting enables IoT Platform to encode messages by using the Base64 algorithm before IoT Platform send the messages.

username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId \ 
                    + ",encode=base64"+"|"

References

For more information about the error codes that are related to the server-side subscription feature, see the Error codes that are related to messages section of the "IoT Platform logs" topic.