このトピックでは、SDK for Python 2.7 を使用して Advanced Message Queuing Protocol(AMQP)クライアントを Alibaba Cloud IoT Platform に接続し、サーバー側サブスクリプション機能を使用して IoT Platform からメッセージを受信する方法について説明します。
前提条件
トピックのメッセージをサブスクライブするコンシューマーグループの ID を取得します。
DEFAULT_GROUP という名前のデフォルトのコンシューマーグループを使用するか、IoT Platform コンソールでコンシューマーグループを作成できます。詳細については、「コンシューマーグループを管理する」をご参照ください。
コンシューマーグループを使用して、トピックのメッセージをサブスクライブできます。詳細については、「AMQP サーバー側サブスクリプションを設定する」をご参照ください。
開発環境
この例では、Python 2.7 を使用します。
SDK のダウンロード
Apache Qpid Proton 0.29.0 ライブラリを使用することをお勧めします。このライブラリは Python API をカプセル化しています。ライブラリをダウンロードして手順を表示するには、Qpid Proton 0.29.0 にアクセスしてください。
Qpid Proton をインストールします。詳細については、「Qpid Proton のインストール」をご参照ください。
Qpid Proton をインストールした後、次の Python コマンドを実行して、SSL ライブラリが使用可能かどうかを確認します。
import proton;print('%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')サンプルコード
# encoding=utf-8
import sys
import logging
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
import hashlib
import hmac
import base64
import os
reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler(sys.stdout)
def current_time_millis():
return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest())
class AmqpClient(MessagingHandler):
def __init__(self):
super(AmqpClient, self).__init__()
def on_start(self, event):
# エンドポイント。「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
url = "amqps://${YourHost}:5671"
# AccessKey ペアをプロジェクトコードにハードコードすると、プロジェクトコードが漏洩した場合に AccessKey ペアが公開される可能性があります。この場合、アカウント内のリソースは安全ではなくなります。次のサンプルコードは、環境変数から AccessKey ペアを取得する方法の例を示しています。この例は参照用です。
accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
consumerGroupId = "${YourConsumerGroupId}"
clientId = "${YourClientId}"
# iotInstanceId: IoT Platform インスタンスの ID。
iotInstanceId = "${YourIotInstanceId}"
# 署名アルゴリズム。有効な値: hmacmd5、hmacsha1、hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName パラメータの構造。「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 署名を計算します。パスワードの構成方法の詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)
self.receiver = event.container.create_receiver(conn)
# 接続が確立されると、次の関数が呼び出されます。
def on_connection_opened(self, event):
logger.info("接続が確立されました。remoteUrl: %s", event.connection.hostname)
# 接続が閉じられると、次の関数が呼び出されます。
def on_connection_closed(self, event):
logger.info("接続が閉じられました: %s", self)
# リモートサーバーがエラーのために接続を閉じると、次の関数が呼び出されます。
def on_connection_error(self, event):
logger.info("接続エラー")
# 認証エラーやソケットエラーなどの AMQP 接続エラーが発生すると、次の関数が呼び出されます。
def on_transport_error(self, event):
if event.transport.condition:
if event.transport.condition.info:
logger.error("%s: %s: %s" % (
event.transport.condition.name, event.transport.condition.description,
event.transport.condition.info))
else:
logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
else:
logging.error("Unspecified transport error")
# メッセージを受信すると、次の関数が呼び出されます。
def on_message(self, event):
message = event.message
content = message.body.decode('utf-8')
topic = message.properties.get("topic")
message_id = message.properties.get("messageId")
print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))
event.receiver.flow(1)
Container(AmqpClient()).run()次の表のパラメータの説明に基づいて、上記のコードのパラメータを設定できます。その他のパラメータの詳細については、「AMQP クライアントを IoT Platform に接続する」をご参照ください。
有効なパラメータ値を指定してください。そうでない場合、AMQP クライアントは IoT Platform に接続できません。
パラメータ | 説明 |
url | AMQP クライアントが IoT Platform に接続するために使用するエンドポイント。形式:
|
accessKey | IoT Platform コンソールにログインし、右上隅のプロファイル画像にポインタを移動して、[accesskey 管理] をクリックして、AccessKey ID と AccessKey シークレットを取得します。 説明 Resource Access Management(RAM)ユーザーを使用する場合は、AliyunIOTFullAccess ポリシーを RAM ユーザーにアタッチする必要があります。このポリシーにより、RAM ユーザーは IoT Platform リソースを管理できます。そうでない場合、IoT Platform への接続は失敗します。詳細については、「RAM ユーザーとして IoT Platform にアクセスする」をご参照ください。 |
accessSecret | |
consumerGroupId | IoT Platform インスタンスのコンシューマーグループの ID。 コンシューマーグループの ID を表示するには、次の手順を実行します。IoT Platform コンソールにログインし、管理するインスタンスのカードをクリックします。左側のナビゲーションウィンドウで、 を選択します。コンシューマーグループの ID は、[コンシューマーグループ] タブに表示されます。 |
iotInstanceId | IoT Platform インスタンスの ID。IoT Platform コンソール の [概要] タブでインスタンス ID を表示できます。IoT Platform コンソール
|
clientId | クライアントの ID。カスタム ID を指定する必要があります。ID は 1 ~ 64 文字の長さでなければなりません。クライアントを実行するサーバーの UUID、MAC アドレス、IP アドレスなど、一意の識別子をクライアント ID として使用することをお勧めします。 AMQP クライアントが IoT Platform に接続して起動した後、次の手順を実行してクライアントの詳細を表示します。IoT Platform コンソールにログインし、管理する インスタンス のカードをクリックします。左側のナビゲーションウィンドウで、 を選択します。[コンシューマーグループ] タブで、管理するコンシューマーグループを見つけて、[アクション] 列の [表示] をクリックします。各クライアントの ID は、[コンシューマーグループステータス] タブに表示されます。クライアント ID を使用すると、クライアントを簡単に識別できます。 |
サンプル結果
次のような出力が表示された場合、AMQP クライアントは IoT Platform に接続されており、メッセージを受信できます。

パラメータ
例
説明
message_id
2**************7
メッセージの ID。
topic
/***********/******/thing/event/property/post
デバイスプロパティを送信するために使用されるトピック。
content
{"deviceType":"CustomCategory","iotId":"qPi***","requestId":"161***","checkFailedData":{},"productKey":"g4***","gmtCreate":1613635594038,"deviceName":"de***","items":{"Temperature":{"value":24,"time":1613635594036},"Humidity":{"value":26,"time":1613635594036}}}
メッセージの内容。
次のような出力が表示された場合、AMQP クライアントは IoT Platform に接続できません。
ログに基づいてコードまたはネットワーク環境を確認し、問題を解決してから、コードを再度実行できます。

関連情報
サーバー側サブスクリプション機能に関連するエラーコードの詳細については、「IoT プラットフォームログ」トピックの メッセージに関連するエラーコード セクションをご参照ください。