This article describes how to use the SDK for Python 3 to connect a client to IoT Platform and receive messages from a server-side subscription.

Development environment

Python 3.0 or later is used in this example.

Download the SDK for Python

The stomp.py and schedule libraries are used in this example. To download the libraries and view the instructions, visit stomp.py and schedule.

Install the stomp.py and schedule libraries. For more information, see Installing Packages.

Sample code

For more information about the parameters in the following sample code, see Connect an AMQP client to IoT Platform.

# encoding=utf-8

import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading

def connect_and_subscribe(conn):
    """Authenticate `conn` against IoT Platform and (re)arm the connection check.

    Builds the userName/password pair from the placeholder credentials below,
    registers a MyListener on `conn`, connects and waits for the result, then
    replaces any scheduled job tagged 'conn-check' with a fresh job that calls
    do_check(conn) every second.
    """
    access_key = "${YourAccessKeyId}"
    access_secret = "${YourAccessKeySecret}"
    consumer_group_id = "${YourConsumerGroupId}"
    # If you use a purchased instance, you must specify the instance ID.
    # If you use a public instance, you can enter an empty string ("").
    iot_instance_id = "${YourIotInstanceId}"
    client_id = "${YourClientId}"
    # The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256.
    sign_method = "hmacsha1"
    timestamp = current_time_millis()

    # The structure of the userName parameter is described in
    # "Connect an AMQP client to IoT Platform".
    # To receive messages in the binary format, also specify encode=base64 in
    # the userName parameter so that IoT Platform Base64-encodes the messages
    # before it sends them. See the "Messages in the binary format" section.
    auth_params = ",".join([
        "authMode=aksign",
        "signMethod=" + sign_method,
        "timestamp=" + timestamp,
        "authId=" + access_key,
        "iotInstanceId=" + iot_instance_id,
        "consumerGroupId=" + consumer_group_id,
    ])
    username = client_id + "|" + auth_params + "|"

    # The structure of the signature and password parameters is described in
    # "Connect an AMQP client to IoT Platform".
    sign_content = "authId=%s&timestamp=%s" % (access_key, timestamp)
    password = do_sign(
        access_secret.encode("utf-8"),
        sign_content.encode("utf-8"),
    )

    listener = MyListener(conn)
    conn.set_listener('', listener)
    conn.connect(username, password, wait=True)

    # Drop any previous connection-check job before scheduling a new one.
    check_tag = 'conn-check'
    schedule.clear(check_tag)
    schedule.every(1).seconds.do(do_check, conn).tag(check_tag)

class MyListener(stomp.ConnectionListener):
    """Connection listener that logs events, subscribes on connect, and reconnects.

    NOTE(review): the callback signatures (headers, message/body) follow the
    listener API this sample was written against; newer stomp.py releases may
    pass a single frame object instead. Confirm against the installed version.
    """

    def __init__(self, conn):
        # The connection this listener is attached to; used to subscribe
        # and to reconnect.
        self.conn = conn

    def on_error(self, headers, message):
        """Print an error reported on the connection."""
        print('received an error "%s"' % message)

    def on_message(self, headers, message):
        """Print a message received from the subscription."""
        print('received a message "%s"' % message)

    def on_disconnected(self):
        """Log the disconnect and immediately try to reconnect."""
        print('disconnected')
        connect_and_subscribe(self.conn)

    def on_heartbeat_timeout(self):
        """Log that a heartbeat timeout occurred."""
        print('on_heartbeat_timeout')

    def on_connected(self, headers, body):
        """Subscribe to all topics once the connection is established."""
        print("successfully connected")
        # Use the connection this listener was created for rather than the
        # module-level `conn`, so the listener works with any connection.
        self.conn.subscribe(destination='/topic/#', id=1, ack='auto')
        print("successfully subscribe")

def current_time_millis():
    """Return the current Unix time, rounded to whole milliseconds, as a string."""
    now_ms = round(time.time() * 1000)
    return "%d" % now_ms

def do_sign(secret, sign_content, sign_method="hmacsha1"):
    """Return the Base64-encoded HMAC signature of `sign_content`.

    Args:
        secret: The signing key, as bytes.
        sign_content: The content to sign, as bytes.
        sign_method: The signature algorithm name (case-insensitive): one of
            "hmacmd5", "hmacsha1" or "hmacsha256". Defaults to "hmacsha1".
            Pass the same value that is sent as signMethod in the userName.

    Returns:
        The Base64 text of the HMAC digest.

    Raises:
        ValueError: If `sign_method` is not one of the supported names.
    """
    digests = {
        "hmacmd5": hashlib.md5,
        "hmacsha1": hashlib.sha1,
        "hmacsha256": hashlib.sha256,
    }
    try:
        digestmod = digests[sign_method.lower()]
    except KeyError:
        raise ValueError("unsupported signMethod: %r" % (sign_method,)) from None
    m = hmac.new(secret, sign_content, digestmod=digestmod)
    return base64.b64encode(m.digest()).decode("utf-8")

# Checks the connection. If the connection is down, this method tries to re-establish it.
def do_check(conn):
    """Print the connection state and reconnect if `conn` is not connected.

    A failed reconnect attempt is reported and swallowed so that the
    scheduled check keeps running and can retry later.
    """
    connected = conn.is_connected()
    # print() does not interpolate logging-style arguments, so format explicitly.
    print('check connection, is_connected: %s' % connected)
    if not connected:
        try:
            connect_and_subscribe(conn)
        except Exception as e:
            print('disconnected, ', e)

# Drives the scheduler: runs any pending scheduled jobs (the connection check), forever.
def connection_check_timer():
    """Loop forever, running pending `schedule` jobs and then sleeping 10 seconds."""
    pause_seconds = 10
    while True:
        schedule.run_pending()
        time.sleep(pause_seconds)

# The endpoint. For more information, see Connect an AMQP client to IoT Platform. Do not prefix the endpoint with amqps://.
# Defined once so that the connection and its SSL settings always refer to the same host and port.
host_and_port = ('${YourHost}', 61614)
conn = stomp.Connection([host_and_port])
conn.set_ssl(for_hosts=[host_and_port], ssl_version=ssl.PROTOCOL_TLS)

try:
    connect_and_subscribe(conn)
except Exception:
    print('connecting failed')
    # Re-raise the original exception unchanged.
    raise

# Run the scheduled connection check in a separate thread.
thread = threading.Thread(target=connection_check_timer)
thread.start()

Messages in the binary format

If you need to transmit messages in the binary format, you must use the Base64 algorithm to encode these messages because STOMP is a text-based protocol. Otherwise, messages may be truncated.

The following code shows how to specify encode=base64 in the userName parameter. This setting makes IoT Platform encode messages by using the Base64 algorithm before it sends them.

# "encode=base64" is a literal part of the userName string.
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId \
                    + ",encode=base64" + "|"