すべてのプロダクト
Search
ドキュメントセンター

IoT Platform:SDK for Python 3 を使用してクライアントを IoT Platform に接続する

最終更新日:Mar 22, 2025

このトピックでは、SDK for Python 3 を使用して Advanced Message Queuing Protocol(AMQP)クライアントを Alibaba Cloud IoT Platform に接続し、サーバー側サブスクリプション機能を使用して IoT Platform からメッセージを受信する方法について説明します。

前提条件

トピックのメッセージをサブスクライブするコンシューマーグループの ID を取得します。

開発環境

Python 3.0 以降を使用できます。この例では、Python 3.8 を使用します。

SDK のダウンロード

この例では、stomp.py ライブラリと schedule ライブラリを使用します。ライブラリをダウンロードして手順を表示するには、stomp.pyschedule にアクセスしてください。

stomp.py ライブラリと schedule ライブラリをインストールします。詳細については、「パッケージのインストール」をご参照ください。

サンプルコード

このトピックでは、stomp.py 7.0.0 を使用したサンプルコードを提供します。

# encoding=utf-8

import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
import os

def connect_and_subscribe(conn):
    # AccessKey ペアをプロジェクトコードにハードコードすると、プロジェクトコードが漏洩した場合に AccessKey ペアが公開される可能性があります。この場合、アカウント内のリソースは安全ではなくなります。次のサンプルコードは、環境変数から AccessKey ペアを取得する方法の例を示しています。この例は参照用です。
    accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
    accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    consumerGroupId = "${YourConsumerGroupId}"
    # iotInstanceId: IoT Platform インスタンスの ID。
    iotInstanceId = "${YourIotInstanceId}"
    clientId = "${YourClientId}"
    # 署名アルゴリズム。有効な値: hmacmd5、hmacsha1、hmacsha256。
    signMethod = "hmacsha1"
    timestamp = current_time_millis()
    # userName パラメーターの構造。「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
    # バイナリ形式でメッセージを送信する必要がある場合は、userName パラメーターに encode=base64 を指定します。IoT Platform は、これらのメッセージを送信する前に、Base64 アルゴリズムを使用してこれらのメッセージをエンコードします。詳細については、このトピックの「バイナリ形式のメッセージ」セクションを参照してください。
    username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId + "|"
    signContent = "authId=" + accessKey + "&timestamp=" + timestamp
    # 署名を計算します。パスワードの構成方法の詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
    password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
    
    conn.set_listener('', MyListener(conn))
    conn.connect(username, password, wait=True)
    # 接続を確認するために使用される履歴タスクをクリアし、接続を確認するためのタスクを作成します。
    schedule.clear('conn-check')
    schedule.every(1).seconds.do(do_check,conn).tag('conn-check')

class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, frame):
        print('エラーを受信しました "%s"' % frame.body)

    def on_message(self, frame):
        print('メッセージを受信しました "%s"' % frame.body)

    def on_heartbeat_timeout(self):
        print('ハートビートタイムアウト')

    def on_connected(self, headers):
        print("接続に成功しました")
        conn.subscribe(destination='/topic/#', id=1, ack='auto')
        print("サブスクライブに成功しました")

    def on_disconnected(self):
        print('切断されました')
        connect_and_subscribe(self.conn)

def current_time_millis():
    return str(int(round(time.time() * 1000)))

def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest()).decode("utf-8")

# 接続を確認します。接続の確立に失敗した場合、このメソッドは接続を再確立します。
def do_check(conn):
    print('接続を確認中、is_connected: %s', conn.is_connected())
    if (not conn.is_connected()):
        try:
            connect_and_subscribe(conn)
        except Exception as e:
            print('切断されました、', e)

# スケジュールに従って接続ステータスを確認します。
def connection_check_timer():
    while 1:
        schedule.run_pending()
        time.sleep(10)

// エンドポイント。詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。エンドポイントの先頭に amqps:// を付けないでください。
conn = stomp.Connection([('${YourHost}', 61614)], heartbeats=(0,300))
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)

try:
    connect_and_subscribe(conn)
except Exception as e:
    print('接続に失敗しました')
    raise e
    
# 非同期スレッドでスケジュールに従って接続ステータスを確認します。
thread = threading.Thread(target=connection_check_timer)
thread.start()

次の表のパラメーターの説明に基づいて、上記のコードのパラメーターを設定できます。その他のパラメーターの詳細については、「AMQP クライアントを IoT Platform に接続する」をご参照ください。

重要

有効なパラメーター値を指定していることを確認してください。そうしないと、AMQP クライアントは IoT Platform に接続できません。

パラメーター

説明

accessKey

IoT Platform コンソールにログインし、右上隅のプロフィール画像にポインターを移動して、[accesskey 管理] をクリックして、AccessKey ID と AccessKey シークレットを取得します。

: Resource Access Management(RAM)ユーザーを使用する場合は、AliyunIOTFullAccess ポリシーを RAM ユーザーにアタッチする必要があります。このポリシーにより、RAM ユーザーは IoT Platform リソースを管理できます。そうしないと、IoT Platform への接続は失敗します。詳細については、「RAM ユーザーとして IoT Platform にアクセスする」をご参照ください。

accessSecret

consumerGroupId

IoT Platform インスタンスのコンシューマーグループの ID。

コンシューマーグループの ID を表示するには、次の手順を実行します。IoT Platform コンソールにログインし、管理するインスタンスのカードをクリックします。左側のナビゲーションウィンドウで、[メッセージ転送] > [サーバー側サブスクリプション] を選択します。コンシューマーグループの ID は、[コンシューマーグループ] タブに表示されます。

iotInstanceId

IoT Platform インスタンスの ID。IoT Platform コンソール の [概要] タブでインスタンス ID を表示できます。IoT Platform コンソール

  • インスタンス ID が表示されている場合は、このパラメーターをインスタンス ID に設定する必要があります。

  • [概要] タブが表示されていない場合、または インスタンスに ID がない場合は、iotInstanceId = "" の形式でこのパラメーターを空のままにします。

clientId

クライアントの ID。カスタム ID を指定する必要があります。ID は 1 ~ 64 文字の長さでなければなりません。クライアントが実行されているサーバーの UUID、MAC アドレス、IP アドレスなど、一意の識別子をクライアント ID として使用することをお勧めします。

AMQP クライアントが IoT Platform に接続して起動した後、次の手順を実行してクライアントの詳細を表示します。IoT Platform コンソールにログインし、管理する インスタンス のカードをクリックします。左側のナビゲーションウィンドウで、[メッセージ転送] > [サーバー側サブスクリプション] を選択します。[コンシューマーグループ] タブで、管理するコンシューマーグループを見つけ、[アクション] 列の [表示] をクリックします。各クライアントの ID は、[コンシューマーグループステータス] タブに表示されます。クライアント ID を使用して、クライアントを簡単に識別できます。

conn

AMQP クライアントと IoT Platform の間に確立される Transport Layer Security(TLS)接続。

${YourHost} 変数に指定できるエンドポイントの詳細については、「インスタンスのエンドポイントを管理する」をご参照ください。

conn.set_ssl

サンプル結果

  • 次の出力と同様の情報が表示された場合、AMQP クライアントは IoT Platform に接続されており、メッセージを受信できます。成功

  • 次の出力と同様の情報が表示された場合、AMQP クライアントは IoT Platform に接続できません。

    ログに基づいてコードまたはネットワーク環境を確認し、問題を解決してから、コードを再実行できます。

    失败

バイナリ形式のメッセージ

バイナリ形式でメッセージを送信する場合は、Base64 アルゴリズムを使用してメッセージをエンコードします。Base64 アルゴリズムを使用してメッセージをエンコードしないと、STOMP はテキストベースのプロトコルであるため、メッセージが切り捨てられる可能性があります。

次のコードは、userName パラメーターに encode=base64 を指定する方法を示しています。この設定により、IoT Platform はメッセージを送信する前に Base64 アルゴリズムを使用してメッセージをエンコードできます。

username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId \ 
                    + ",encode=base64"+"|"

関連情報

サーバー側サブスクリプション機能に関連するエラーコードの詳細については、「IoT プラットフォーム ログ」トピックの メッセージに関連するエラーコード セクションをご参照ください。