This topic describes how to use the SDK for Python 3 to connect a client to IoT Platform and receive messages from a server-side subscription.

Prerequisites

The ID of the consumer group that has subscribed to the messages of a topic is obtained.

Prepare the development environment

You can use Python 3.0 or later. In this example, Python 3.8 is used.

Download the SDK

In this example, the stomp.py and schedule libraries are used. To download the libraries and view the instructions, see stomp.py and schedule.

Install the stomp.py and schedule libraries. For more information, see Installing Packages.

Sample code

This topic provides the sample code in which stomp.py 7.0.0 is used.

# encoding=utf-8

import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading

def connect_and_subscribe(conn):
    accessKey = "${YourAccessKeyId}"
    accessSecret = "${YourAccessKeySecret}"
    consumerGroupId = "${YourConsumerGroupId}"
    IotInstanceId: the ID of the instance. 
    iotInstanceId = "${YourIotInstanceId}"
    clientId = "${YourClientId}"
    # The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256. 
    signMethod = "hmacsha1"
    timestamp = current_time_millis()
    # The structure of the userName parameter. For more information, see Connect an AMQP client to IoT Platform. 
    # If you need to transmit messages in the binary format, specify encode=base64 in the userName parameter. Before IoT Platform sends these messages, IoT Platform encodes these messages by using the Base64 algorithm. For more information, see the "Messages in the binary format" section of this topic. 
    username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId + "|"
    signContent = "authId=" + accessKey + "&timestamp=" + timestamp
    # The structure of the signature and password parameters. For more information, see Connect an AMQP client to IoT Platform. 
    password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
    
    conn.set_listener('', MyListener(conn))
    conn.connect(username, password, wait=True)
    # Clear historical tasks that are used to check connections and creates tasks to check connections.
    schedule.clear('conn-check')
    schedule.every(1).seconds.do(do_check,conn).tag('conn-check')

class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

    def on_heartbeat_timeout(self):
        print('on_heartbeat_timeout')

    def on_connected(self, headers):
        print("successfully connected")
        conn.subscribe(destination='/topic/#', id=1, ack='auto')
        print("successfully subscribe")

    def on_disconnected(self):
        print('disconnected')
        connect_and_subscribe(self.conn)

def current_time_millis():
    return str(int(round(time.time() * 1000)))

def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest()).decode("utf-8")

# Check the connection. If the connection fails to be established, this method re-establishes a connection.
def do_check(conn):
    print('check connection, is_connected: %s', conn.is_connected())
    if (not conn.is_connected()):
        try:
            connect_and_subscribe(conn)
        except Exception as e:
            print('disconnected, ', e)

# The method that is used to schedule tasks. Check the connection status.
def connection_check_timer():
    while 1:
        schedule.run_pending()
        time.sleep(10)

# The endpoint. For more information, see Connect an AMQP client to IoT Platform. Do not prefix the endpoint with amqps://.
conn = stomp.Connection([('${YourHost}', 61614)], heartbeats=(0,300))
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)

try:
    connect_and_subscribe(conn)
except Exception as e:
    print('connecting failed')
    raise e
    
# If a scheduled task runs in an asynchronous thread, check the connection status of the scheduled task.
thread = threading.Thread(target=connection_check_timer)
thread.start()

You can configure the parameters in the preceding code based on the parameter descriptions in the following table. For more information about the parameters, see Connect an AMQP client to IoT Platform.

Parameter Example Description
accessKey LTAI4GFGQvKuqHJhFa****** Log on to the IoT Platform console, move the pointer over the profile picture, and then click AccessKey Management to obtain the AccessKey ID and AccessKey secret.
Note If you use a RAM user, you must attach the AliyunIOTFullAccess permission policy to the RAM user. This policy allows the RAM user to manage IoT Platform resources. Otherwise, the connection with IoT Platform fails. For more information about how to authorize a RAM user, see RAM user access.
accessSecret iMS8ZhCDdfJbCMeA005sieKe******
consumerGroupId VWhGZ2QnP7kxWpeSSjt****** The ID of the consumer group.

To view the ID of the consumer group, perform the following steps: Log on to the IoT Platform console and click the card of the instance that you want to manange. Choose Rules Engine > Server-side Subscription > Consumer Groups. The ID is displayed on the Consumer Groups tab.

iotInstanceId "" The ID of the instance. You can view the ID of the instance on the Overview page in the IoT Platform console.
  • If you have an ID value, you must specify the ID for this parameter.
  • If no Overview or ID is generated for your instance, specify an empty string (iotInstanceId = "") for the parameter.
clientId 12345 The ID of the client. We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address of the client. The client ID must be 1 to 64 characters in length.

Log on to the IoT Platform console and click the card of the instance that you want to manage. Choose Rules Engine > Server-side Subscription > Consumer Groups. Find the consumer group that you want to manage and click View in the Actions column. The ID of each client is displayed on the Consumer Group Details page. You can use client IDs to efficiently identify clients.

conn stomp.Connection([('198426864******.iot-amqp.cn-shanghai.aliyuncs.com', 61614)]) The Transport Layer Security (TLS) connection that is established between the AMQP client and IoT Platform.

For more information about the endpoints that you can specify for the ${YourHost} variable, see View the endpoint of an instance.

conn.set_ssl for_hosts=[('198426864******.iot-amqp.cn-shanghai.aliyuncs.com', 61614)], ssl_version=ssl.PROTOCOL_TLS

Sample results

  • Sample success result: If the output that is similar to the following log information appears, the AMQP client is connected to IoT Platform and can receive messages.Success
  • Sample failure result: If the output that is similar to the following information appears, the AMQP client fails to connect to IoT Platform.

    You can check the code or network environment based on logs, solve the problem, and then run the code again.

    Failed

Messages in the binary format

If you want to transmit messages in the binary format, use the Base64 algorithm to encode the messages. If you do not use the Base64 algorithm to encode the messages, the messages may be truncated because STOMP is a text-based protocol.

The following code shows how to specify encode=base64 in the userName parameter. This setting enables IoT Platform to encode messages by using the Base64 algorithm before IoT Platform send the messages.

username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId \ 
                    + ",encode=base64"+"|"

References

For more information about the error codes that are related to server-side subscription, see Error codes that are related to messages.