このトピックでは、Link SDK for CのAPI操作を呼び出して、MQTT (Message Queuing Telemetry Transport) を介してデバイスをIoT Platformに接続し、メッセージングを有効にする方法について説明します。 この例では、サンプルコードファイルが使用されます。
背景情報
MQTT接続の詳細については、「概要」をご参照ください。
IoT Platform Link SDK for Cを使用して非クラウドゲートウェイデバイスに接続する場合、という名前のサンプルコードファイル。/mqtt_basic_demo.cが使用されます。 IoT Platform Link SDK for Cを使用してクラウドゲートウェイデバイスを接続する場合、という名前のサンプルコードファイル。/mqtt_userdefine_demo.cが使用されます。
ステップ1: クライアントの初期化
ヘッダーファイルの追加
を含む# 「aiot_state_api.h」を含める # 「aiot_sysdep_api.h」を含む # 「aiot_mqtt_api.h」基になる依存関係を追加し、ログ出力機能を設定します。
aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile); aiot_state_set_logcb(demo_state_logcb);aiot_mqtt_init操作を呼び出してMQTTクライアントインスタンスを作成し、デフォルトパラメーターを初期化します。
mqtt_handle = aiot_mqtt_init(); if (mqtt_handle == NULL) { printf("aiot_mqtt_init failed\n"); return -1; }
ステップ2: 機能の設定
aiot_mqtt_setopt操作を呼び出して、次の項目を設定します。
操作のパラメーターの詳細については、「aiot_mqtt_option_t」をご参照ください。
接続パラメータを設定します。
非クラウドゲートウェイデバイス
サンプルコード:
/* TODO: 次のパラメーターの値をデバイス検証情報に置き換えます。 */ char * product_key = "a18wP ******"; char * device_name = "LightSwitch"; char * device_secret = "uwMTmVAMnGGHaAkqmeDY6cHxxB ******"; /* TODO: 次のパラメーターの値をデバイスのエンドポイントに置き換えます。 */ char * mqtt_host = "iot-06z00ax1o ****** .mqtt.iothub.aliyuncs.com"; ... ... /* 以下のパラメーターはオプションです。 パラメーターにはLink SDK for Cのデフォルト値を使用できます。 */ /* MQTTブローカーのエンドポイントを指定します。 */ aiot_mqtt_setopt(mqtt_handle、AIOT_MQTTOPT_HOST、(void *)url); /* MQTTブローカーのポートを指定します。 */ aiot_mqtt_setopt(mqtt_handle、AIOT_MQTTOPT_PORT、(void *)&port); /* デバイスが属するプロダクトのProductKeyを指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key); /* デバイスのDeviceNameを指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name); /* デバイスのDeviceSecretを指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret); /* 接続のセキュリティ資格情報を指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);パラメーター:
パラメーター
説明
mqtt_host
デバイスを接続するエンドポイント。
以前のバージョンのパブリックインスタンスの
エンドポイント: エンドポイントは ${YourProductKey}.iot-as-mqtt.${YourRegionId} 形式です。
新バージョンと旧バージョンのパブリックインスタンス、Enterprise Editionインスタンス、およびエンドポイントの詳細については、「インスタンスのエンドポイントの表示」をご参照ください。
product_key
デバイス証明書情報。 詳細については、「デバイス検証情報の取得」をご参照ください。
この例では、デバイス固有証明書検証方法が使用されます。
device_name
device_secret
MQTTキープアライブメカニズムの説明:
重要キープアライブ期間中、デバイスはping要求を含む少なくとも1つのメッセージを送信する必要があります。
タイマーは、IoT PlatformがCONNECTメッセージへの応答としてCONNACKメッセージを送信するときに開始されます。 IoT PlatformがPUBLISH、SUBSCRIBE、PING、またはPUBACKメッセージを受信すると、タイマーはリセットされます。 IoT Platformは、30秒ごとにMQTT接続のハートビートをチェックします。 ハートビートチェックの待機時間は、デバイスがIoT Platformに接続してから次のハートビートチェックが実行されるまでの期間です。 最大タイムアウト期間は、次の式を使用して計算されます。
心拍間隔 × 1.5 + 心拍チェックの待ち時間。 サーバが最大タイムアウト期間内にデバイスからメッセージを受信しない場合、サーバはデバイスから切断する。
Link SDK for Cは、キープアライブメカニズムを提供します。 次の表に、デバイス接続のカスタムハートビート関連の値を指定するために設定できるパラメーターを示します。 それ以外の場合は、デフォルト値が使用されます。
パラメーター
デフォルト値
説明
AIOT_MQTTOPT_HEARTBEAT_MAX_LOST
2
失われる可能性のある心拍の最大数。 制限を超えると、システムは新しい接続を確立します。
AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS
25,000
システムによって確立される切断と新しい接続の間隔。 有効な値: 1000〜1200000。 単位:ミリ秒。
AIOT_MQTTOPT_KEEPALIVE_SEC
1,200
キープアライブ期間。 ハートビートが失われた後、システムは指定された期間内に新しい接続を確立できます。 有効値: 30〜1200。 単位は秒です。 300より大きい値を指定することを推奨します。
クラウドゲートウェイ
サンプルコード:
/* TODO: mqtt_host、port、およびuser_ca_certパラメーターの値をデバイスに関する情報に置き換えます。 * / char * username = "LightSwitch"; char * password = "*******"; char * client_id = "client_********"; char * mqtt_host = "iot-****** .igw.iothub.aliyuncs.com"; char * product_key = "*******"; uint16_t port = 1883; const char * user_ca_cert = \ { "----- 認証を開始 -----\r\n" \ "MIIC4jCCAco ************************************* MQswCQYDVQQGEwJD\r\n" \ 「TjETMBEGA1U ************************************* IENBMB4XDTIyMTIw\r\n」 \ 「MjE0NDk1Mlo ************************************* Q04xEzARBgNVBAoM\r\n」 \ 「CkFsaXl1biB ************************************* KoZIhvcNAQEBBQAD\r\n」 \ "ggEPADCCAQo ************************************* 2VaEUrnXNoO40w71\r\n" \ "i3l4Alchs1M ************************************* VxRGEtybsIH8CYO\r\n" \ "kyzGgOKbx7M ************************************* 49l3opCIfg9LOwjF\r\n" \ 「R6x ZY6yGdv ************************************* CDxW2mILl VwYd9s\r\n」 \ 「2udrJ7riJ5i *************************************/P9s 2UaBX89TTUd\r\n」 \ 「lYWKe3tHRg + ************************************* BgkqhkiG9w0BAQsF\r\n」 \ 「AAOCAQEAhr8 ************************************* odRVrUaVBBLguSAH\r\n」 \ 「OZmtwUy0ZUf ************************************* aJ27G4prMD2DMoby\r\n」 \ 「uTbXKOPYCvT ************************************* 5smSmfXIrBrbGrG6\r\n」 \ 「0CL1Y8DTNlF ************************************* TjixfpGS3C/Ogu0H\r\n」 \ 「uMMbA4RCFhF ************************************* X0VprxMrDarUJAa1\r\n」 \ "tJ ************************ Iw =\ r\n" \ "----- END CERTIFICATE -----\r\n" \ }; ... ... /* MQTTブローカーのIPアドレスを指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)mqtt_host); /* MQTTブローカーのポートを指定します。 */ aiot_mqtt_setopt(mqtt_handle、AIOT_MQTTOPT_PORT、(void *)&port); /* デバイスへのアクセスに使用するユーザー名を指定します。* / aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERNAME, (void *)username); /* 対応するパスワードを指定します。 */ aiot_mqtt_setopt(mqtt_handle、AIOT_MQTTOPT_PASSWORD、(void *) パスワード); /* デバイスのクライアントIDを指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_CLIENTID, (void *)client_id); /* デバイスのProductKeyを指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key); /* デバイスのDeviceNameを指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)username); /* ネットワーク接続のセキュリティ資格情報を設定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred); /* 受信したMQTTメッセージのデフォルトのコールバック関数を指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler); /* MQTTイベントコールバック関数を指定します。 */ aiot_mqtt_setopt(mqtt_handle、AIOT_MQTTOPT_EVENT_HANDLER、(void *)demo_mqtt_event_handler); /* スラッシュ (/) で始まるトピックの検証を無効にします。 */ uint8_t topic_check = 0; aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_TOPIC_HEADER_CHECK, (void *)&topic_check);パラメーター:
パラメーター
説明
product_key
デバイスが属するプロダクトのProductKey。
username
デバイス検証情報。 詳細については、「クラウドゲートウェイデバイスの検証情報」をご参照ください。
password
client_id
クライアントのID。 クライアントIDの長さは1 ~ 64文字である必要があります。 デバイスのMACアドレスまたはシリアル番号 (SN) をクライアントIDとして使用することを推奨します。
mqtt_host
MQTTクラウドゲートウェイデバイスの接続に使用されるエンドポイントとポート番号。 デフォルトのポート番号は1883です。 上記の情報を取得する方法の詳細については、「MQTTクラウドゲートウェイ製品の作成」をご参照ください。
port
user_ca_cert
root-ca.crtデバイスのルート証明書の内容。スラッシュ (/)
始まるトピックの検証を無効にするには、AIOT_MQTTOPT_TOPIC_HEADER_CHECKを指定します。/* スラッシュ (/) で始まるトピックの検証を無効にします。 */ uint8_t topic_check = 0; aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_TOPIC_HEADER_CHECK, (void *)&topic_check);IoT Platformは、トピックを使用してMQTT経由でクラウドゲートウェイデバイスと通信できます。 通信のトピックは、標準のMQTTプロトコルで定義されているトピック仕様に準拠している必要があります。 トピックはスラッシュ (
/) で始める必要はありません。MQTTクラウドゲートウェイデバイスがIoT Platformと通信する方法の詳細については、「メッセージング」をご参照ください。
コールバックを設定してステータスを監視し、メッセージを受信します。
コールバックを設定してステータスを監視します。
サンプルコード:
int main(int argc, char * argv[]) { ... ... /* MQTTメッセージを受信するデフォルトのコールバックを指定します。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler); /* MQTTイベントを処理するコールバックを指定します。 */ aiot_mqtt_setopt(mqtt_handle、AIOT_MQTTOPT_EVENT_HANDLER、(void *)demo_mqtt_event_handler); ... ... }パラメーター:
パラメーター
例
説明
AIOT_MQTTOPT_RECV_HANDLER
demo_mqtt_default_recv_handler
メッセージが受信されると、コールバックが呼び出されて必要な操作が実行されます。
AIOT_MQTTOPT_EVENT_HANDLER
demo_mqtt_event_handler
デバイス接続のステータスが変更されると、コールバックが呼び出されて必要な操作が実行されます。
コールバックを定義してステータスを監視します。
重要イベントを処理するための時間のかかるロジックを定義しないでください。 これにより、パケットの受信に使用されるスレッドがブロックされなくなります。
接続ステータスの変更には、ネットワーク例外、自動再接続、および切断が含まれます。
接続ステータスの変更を処理するには、ビジネス要件に基づいて
TODOセクションのコードを変更します。
/* MQTTイベントを処理するコールバック。 接続が作成、回復、または閉じられた場合、コールバックが呼び出されます。 イベント定義の詳細については、core/aiot_mqtt_api.hをご参照ください。 * / void demo_mqtt_event_handler(void * handle, const aiot_mqtt_event_t * event, void * userdata) { switch (event->type) { /* aiot_mqtt_connect操作を呼び出して、MQTTブローカーへの接続を確立します。 */ ケースAIOT_MQTTEVT_CONNECT: { printf("AIOT_MQTTEVT_CONNECT\n"); /* TODO: SDKとMQTTブローカー間の接続が確立された後、処理ロジックを定義します。 スレッドをブロックする可能性のある時間のかかる関数を呼び出さないでください。 */ } break; /* ネットワーク例外により切断エラーが発生した場合、SDKはMQTTブローカーとの再接続要求を自動的に開始します。 */ ケースAIOT_MQTTEVT_RECONNECT: { printf("AIOT_MQTTEVT_RECONNECT\n"); /* TODO: SDKとMQTTブローカーの間に新しい接続が確立された後、処理ロジックを定義します。 スレッドをブロックする可能性のある時間のかかる関数を呼び出さないでください。 */ } break; /* ネットワーク例外により切断エラーが発生しました。 基になる読み取りまたは書き込み操作が失敗します。 ハートビート応答はMQTTブローカーから取得されません。 */ ケースAIOT_MQTTEVT_DISCONNECT: { char * cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? (「ネットワーク切断」): (「ハートビート切断」); printf("AIOT_MQTTEVT_DISCONNECT: % s\n" 、原因); /* TODO: SDKがMQTTブローカーから受動的に切断された後、処理ロジックを定義します。 スレッドをブロックする可能性のある時間のかかる関数を呼び出さないでください。 */ } break; default: { } } }メッセージを受信するコールバックを定義します。
重要イベントを処理するための時間のかかるロジックを定義しないでください。 これにより、パケットの受信に使用されるスレッドがブロックされなくなります。
受信したメッセージを処理するには、ビジネス要件に基づいて
TODOセクションのコードを変更します。
/* MQTTメッセージを処理するためのデフォルトのコールバック。 SDKがMQTTブローカーからメッセージを受信し、コールバックを設定しない場合、次の操作が呼び出されます。 * / void demo_mqtt_default_recv_handler(void * handle, const aiot_mqtt_recv_t * packet, void * userdata) { switch (packet->type) { case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: { printf("ハートビート応答 \n"); /* TODO: MQTTブローカーからのハートビート応答を処理するロジックを定義します。 ほとんどの場合、ロジックは必要ありません。 */ } break; ケースAIOT_MQTTRECV_SUB_ACK: { printf("suback、res: -0x % 04X、packet id: % d、max qos: % d\n" 、 -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos); /* TODO: サブスクリプションリクエストに対するMQTTブローカーの応答を処理するロジックを定義します。 ほとんどの場合、ロジックは必要ありません。 */ */ } break; ケースAIOT_MQTTRECV_PUB: { printf("pub, qos: % d, topic: %.* s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic); printf("pub, payload: %.* s\n", packet->data.pub.payload_len, packet->data.pub.payload); /* TODO: MQTTブローカーによって送信されるビジネスメッセージを処理するロジックを定義します。 */ } break; ケースAIOT_MQTTRECV_PUB_ACK: { printf("puback, packet id: % d\n", packet->data.pub_ack.packet_id); /* TODO: QoS 1メッセージに対するMQTTブローカーの応答を処理するロジックを定義します。 ほとんどの場合、ロジックは必要ありません。 */ } break; default: { } } }
ステップ3: 接続を確立する
aiot_mqtt_connect操作を呼び出して、IoT Platformに接続および認証リクエストを送信します。 パラメーターの指定方法については、「接続パラメーターの設定」をご参照ください。
/* IoT PlatformへのMQTT接続を確立します。 */
res = aiot_mqtt_connect(mqtt_handle);
if (res < STATE_SUCCESS) {
/* MQTT接続の確立に失敗した場合、MQTTインスタンスのリソースを解放します。 */
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_connect failed: -0x % 04X\n", -res);
printf (「デモ \r\nのmqtt_host、produt_key、device_name、device_secretなどの変数を確認してください」);
return -1;
}ステップ4: keep-aliveスレッドを有効にする
aiot_mqtt_process操作を呼び出して、ハートビートメッセージをMQTTブローカーに送信し、応答が生成されていないQoS 1メッセージを再送信します。 このようにして、永続的な接続が可能になる。
キープアライブスレッドを有効にします。
res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_process_thread failed: % d\n", res); return -1; }キープアライブスレッドを管理する機能を設定します。
void * demo_mqtt_process_thread(void * args) { int32_t res = STATE_SUCCESS; while (g_mqtt_process_thread_running) { res = aiot_mqtt_process(args); if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } 睡眠 (1); } NULLを返します。}
ステップ5: スレッドがメッセージを受信できるようにする
ブローカーからMQTTメッセージを受信するには、aiot_mqtt_recv操作を呼び出します。 必要な操作は、コールバックを使用してメッセージを受信することによって実行されます。 切断と自動再接続が発生した場合、必要な操作はコールバックを使用してイベントを処理します。
スレッドがメッセージを受信できるようにします。
res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_recv_thread failed: % d\n", res); return -1; }スレッドを管理する関数を設定します。
void * demo_mqtt_recv_thread(void * args) { int32_t res = STATE_SUCCESS; while (g_mqtt_recv_thread_running) { res = aiot_mqtt_recv(args); if (res < STATE_SUCCESS) { if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } 睡眠 (1); } } NULLを返します。}
ステップ6: トピックを購読する
aiot_mqtt_sub操作を呼び出して、指定したトピックをサブスクライブします。
サンプルコード:
{ char * sub_topic = "/a18wP ******/LightSwitch/user/get"; res = aiot_mqtt_sub(mqtt_handle, sub_topic, NULL, 1, NULL); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x % 04X\n", -res); return -1; } }説明サンプルコードを設定したら、コードの両側にある注釈記号を削除します。
パラメーター:
パラメーター
例
説明
sub_topic
/a18wP ******/LightSwitch /ユーザー /取得
デバイスがサブスクライブ権限を持つトピック。
a1oGs ******はデバイスのProductKeyです。LightSwitchは、デバイスのDeviceNameです。
この例では、デフォルトのカスタムトピックが使用されます。
デバイスは、このトピックを使用してIoT Platformからメッセージを受信できます。
詳細については、「トピック」をご参照ください。
ステップ7: メッセージを送信する
aiot_mqtt_pub操作を呼び出して、指定したトピックにメッセージを送信します。
サンプルコード:
{ char * pub_topic = "/a18wP ******/LightSwitch/user/update"; char * pub_payload = "{\" id\":\" 1\",\" version\":\" 1.0\",\" params\":{\" LightSwitch\":0}"; res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x % 04X\n", -res); return -1; } }説明サンプルコードを設定したら、コードの両側にある注釈記号を削除します。
パラメーター:
パラメーター
例
説明
pub_topic
/a18wP ******/LightSwitch /ユーザー /アップデート
デバイスが発行権限を持つトピック。
a1oGs ******はデバイスのProductKeyです。LightSwitchは、デバイスのDeviceNameです。
デバイスは、このトピックを使用してIoT Platformにメッセージを送信します。
詳細については、「トピック」をご参照ください。
pub_payload
{\"id\":\"1\",\"version\" \"1.0\",\"params\" :{\ "LightSwitch\":0}}
デバイスがIoT Platformに送信するメッセージの内容。
この例では、カスタムトピックが使用されています。 カスタムメッセージ形式を指定できます。
詳細については、「データ形式」をご参照ください。
デバイスとIoT Platform間のMQTT接続が確立されたら、メッセージ数がしきい値を超えないようにします。
通信制限の詳細については、「制限」をご参照ください。
メッセージ数がしきい値を超えた場合は、IoT Platformコンソールにログインして、蓄積されたメッセージを表示します。 詳細については、「コンシューマーグループの表示と監視」をご参照ください。
ステップ8: IoT Platformからデバイスを切断する
MQTT接続は、永続的に接続されたままのデバイスに適用されます。 IoT Platformからデバイスを手動で切断できます。
この例では、メインスレッドを使用してパラメータを設定し、接続を確立します。 接続が確立されたら、メインスレッドを一時停止できます。
aiot_mqtt_disconnect操作を呼び出して、デバイスをIoT Platformから切断します。
res = aiot_mqtt_disconnect(mqtt_handle);
if (res < STATE_SUCCESS) {
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_disconnect failed: -0x % 04X\n", -res);
return -1;
}ステップ9: プログラムを終了する
aiot_mqtt_deinit操作を呼び出して、MQTTクライアントインスタンスを削除し、リソースをリリースします。
res = aiot_mqtt_deinit(&mqtt_handle);
if (res < STATE_SUCCESS) {
printf("aiot_mqtt_deinit failed: -0x % 04X\n" 、-res);
return -1;
}