This article describes how to use the Apache Qpid Proton SDK for Python 2.7 to connect an AMQP client to IoT Platform and receive messages from IoT Platform.

Development environment

Python 2.7 is used in this example.

Download the SDK for Python

We recommend that you use the Apache Qpid Proton 0.29.0 library. Python APIs are included in the library. To download the library and view the instructions, see Qpid Proton 0.29.0.

Install Proton. For more information about how to install Proton, see Installing Qpid Proton.

After you complete the installation, run the following Python statement in a Python interpreter to check whether SSL support is available in Proton:

# Reports whether proton.SSL.present() returns a true value.
import proton; print('SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')

Sample code

For more information about the parameters in the following sample code, see Connect an AMQP client to IoT Platform.

# encoding=utf-8
import sys
import logging
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
import hashlib
import hmac
import base64

# Python 2 idiom: reload(sys) is called first so that sys.setdefaultencoding
# can be used to make UTF-8 the default str/unicode codec for this process.
reload(sys)
sys.setdefaultencoding('utf-8')
# Configure root logging: INFO and above, prefixed with time, logger name and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger used by the handler callbacks below.
logger = logging.getLogger(__name__)
# NOTE(review): this stdout handler is created but not attached to any logger
# in the code shown, so as written it does not affect log output.
console_handler = logging.StreamHandler(sys.stdout)


def current_time_millis():
    """Return the current Unix time, rounded to whole milliseconds, as a decimal string."""
    millis = time.time() * 1000
    return str(int(round(millis)))


# Digest constructors keyed by the signMethod names this sample documents.
_SIGN_DIGESTS = {
    "hmacmd5": hashlib.md5,
    "hmacsha1": hashlib.sha1,
    "hmacsha256": hashlib.sha256,
}


def do_sign(secret, sign_content, sign_method="hmacsha1"):
    """Return the Base64-encoded HMAC of ``sign_content`` keyed with ``secret``.

    Args:
        secret: The signing key, as a byte string.
        sign_content: The content to sign, as a byte string.
        sign_method: One of "hmacmd5", "hmacsha1" or "hmacsha256". Defaults to
            "hmacsha1", which is what the two-argument form always used.

    Returns:
        The Base64 encoding of the raw HMAC digest.

    Raises:
        ValueError: If ``sign_method`` is not one of the supported names.
    """
    digestmod = _SIGN_DIGESTS.get(sign_method)
    if digestmod is None:
        # Fail loudly rather than silently signing with the wrong algorithm.
        raise ValueError("Unsupported sign method: %s" % (sign_method,))
    mac = hmac.new(secret, sign_content, digestmod=digestmod)
    return base64.b64encode(mac.digest())


class AmqpClient(MessagingHandler):
    """Proton messaging handler that connects to IoT Platform and prints received messages.

    Replace every ``${...}`` placeholder in ``on_start`` with real values before
    running the sample.
    """

    def __init__(self):
        super(AmqpClient, self).__init__()

    def on_start(self, event):
        """Build the signed credentials, open the connection and create a receiver on it."""
        # The endpoint. For more information, see Connect an AMQP client to IoT Platform.
        url = "amqps://${YourHost}:5671"
        accessKey = "${YourAccessKeyID}"
        accessSecret = "${YourAccessKeySecret}"
        consumerGroupId = "${YourConsumerGroupId}"
        clientId = "${YourClientId}"
        # iotInstanceId: If you are using a purchased instance, you must specify the instance ID.
        # If you are using a public instance, you can enter an empty string "".
        iotInstanceId = "${YourIotInstanceId}"
        # The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256.
        # NOTE: do_sign() is called below without a method argument, so the digest
        # it applies must match the value announced here.
        signMethod = "hmacsha1"
        timestamp = current_time_millis()
        # The structure of the userName parameter. For more information, see Connect an AMQP client to IoT Platform.
        userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                        + ",timestamp=" + timestamp + ",authId=" + accessKey \
                        + ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"
        # The signed content must carry the same authId and timestamp as userName.
        signContent = "authId=" + accessKey + "&timestamp=" + timestamp
        # The structure of the signature and password parameters. For more information, see Connect an AMQP client to IoT Platform.
        passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
        conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)
        self.receiver = event.container.create_receiver(conn)

    # When the connection is established, the following code is run:
    def on_connection_opened(self, event):
        """Log the remote host once the connection is open."""
        logger.info("Connection established, remoteUrl: %s", event.connection.hostname)

    # When the connection is ended, the following code is run:
    def on_connection_closed(self, event):
        """Log which connection was closed."""
        # Log the remote host (as on_connection_opened does) instead of the handler object.
        logger.info("Connection closed: %s", event.connection.hostname)

    # When the remote server ends the connection due to an error, the following code is run:
    def on_connection_error(self, event):
        """Log the error condition reported by the remote peer."""
        # This is an error path, so log at ERROR level and include the reported condition.
        logger.error("Connection error: %s", event.connection.remote_condition)

    # When an AMQP connection error occurs, such as an authentication error or a socket error, the following code is run:
    def on_transport_error(self, event):
        """Log the transport error condition, with its extra info when present."""
        condition = event.transport.condition
        if not condition:
            # Use the module logger here too, so every record carries the same logger name.
            logger.error("Unspecified transport error")
        elif condition.info:
            logger.error("%s: %s: %s", condition.name, condition.description, condition.info)
        else:
            logger.error("%s: %s", condition.name, condition.description)

    # When a message is received, the following code is run:
    def on_message(self, event):
        """Print the ID, topic and content of a received message, then grant more credit."""
        message = event.message
        body = message.body
        # Decode only raw bytes; a body that already arrived as text is used as is.
        content = body.decode('utf-8') if isinstance(body, bytes) else body
        # properties may be None when the message carries no application properties.
        properties = message.properties or {}
        topic = properties.get("topic")
        message_id = properties.get("messageId")
        print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))
        # Issue credit for one more message on this receiver.
        event.receiver.flow(1)


# Create the handler, hand it to a Proton Container and run the container.
handler = AmqpClient()
Container(handler).run()