すべてのプロダクト
Search
ドキュメントセンター

IoT Platform:ApsaraMQ for RocketMQを使用したビジネスサーバーへのデバイスデータの転送

最終更新日:Jan 26, 2024

IoT Platformは、デバイスによって送信されたデータをApsaraMQ for RocketMQのトピックに転送します。 その後、ApsaraMQ for RocketMQはデータをビジネスサーバーに転送します。 このトピックでは、データを転送する方法について説明します。

前提条件

  • Alibaba Cloud アカウントが作成済みであること。

  • IoT Platformが有効化されています。

  • ApsaraMQ for RocketMQが有効化されています。

    ApsaraMQ for RocketMQが有効化されていない場合は、 サービスを有効化するためのApsaraMQ for RocketMQの製品ページ

  • 開発環境が準備される。 この例では、以下のコンポーネントで構成されるJava開発環境を使用しています。

背景情報

データ転送プロセス

消息流转MQ

利点

  • MQTT (Message Queuing Telemetry Transport) を介してデバイスをIoT Platformに接続する場合、データ送信リンクはTLS (Transport Layer Security) 暗号化を使用してデータの改ざんを防ぎます。 MQTTの詳細については、「MQTTプロトコル」をご参照ください。

  • ApsaraMQ for RocketMQは、トラフィックの変動とビジネスサーバーの同時ワークロードを減らすためのメッセージバッファとして使用されます。

次のステップ

  1. IoT Platformコンソールにログインし、プロダクトとデバイスを作成します。

    1. [概要] ページで、[すべての環境] をクリックします。 [すべての環境] タブで、管理するインスタンスを見つけ、インスタンスIDまたはインスタンス名をクリックします。

      この例では、中国 (上海) リージョンが選択されています。

    2. 左側のナビゲーションウィンドウで、[デバイス] > [製品] を選択します。 [プロダクト] ページで、[プロダクトの作成] をクリックします。 [製品の作成] ページで、パラメーターを設定し、[OK] をクリックします。

      この例では、Product NameパラメーターはMQ_testに設定され、Node Typeパラメーターは 直接接続デバイス。 他のパラメーターにはデフォルト値を使用します。

    3. [View Product Details] をクリックします。 製品の詳細ページで、[トピックカテゴリ] > [トピックカテゴリ] を選択し、[トピックカテゴリの編集] をクリックして、デバイスデータの送信に使用するトピックカテゴリを作成します。

      この例では、/{YourProductKey}/${YourDeviceName}/user/dataというトピックカテゴリが作成されます。

    4. 左側のナビゲーションウィンドウで、[デバイス] > [デバイス] を選択します。 [デバイス] ページで、[デバイスの追加] をクリックして、MQ_test製品のデバイスを作成します。

      この例では、MQdeviceという名前のデバイスが作成されます。

  2. ApsaraMQ for RocketMQコンソールで、トピックとコンシューマーを作成します。

    1. ApsaraMQ for RocketMQコンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[インスタンス] をクリックします。 [インスタンス] ページで、[インスタンスの作成] をクリックします。 この例では、インスタンスバージョンパラメーターがV4.0に設定され、インスタンスタイプパラメーターがStandard Editionインスタンスに設定されているインスタンスが中国 (上海) リージョンに作成されます。

      詳細は、インスタンスの作成をご参照ください。

      重要
      • ApsaraMQ for RocketMQインスタンスは、IoT Platformインスタンスと同じリージョンに存在する必要があります。

      • ApsaraMQ for RocketMQ V4.xインスタンスのトピックにのみデータストリームを転送できます。

    3. [インスタンス] ページで、データベースを作成するインスタンスを見つけ、インスタンス名をクリックします。

    4. [インスタンスの詳細] ページで、[グループの作成] をクリックします。 次の図に示すように、[グループの作成] パネルでパラメーターを設定し、[OK] をクリックします。

      物联网设备消息

    5. [トピックの作成] をクリックします。 [トピックの作成] パネルで、[メッセージタイプ] パラメーターを [通常のメッセージ] に設定します。

      物联网设备消息

    6. ApsaraMQ for RocketMQコンソールでコンシューマーを作成し、コンシューマーのステータスを表示します。 コンシューマーがオンライン状態であり、グループ内のコンシューマーのサブスクリプションが一貫していることを確認します。

      この例では、TCP SDKを使用してメッセージを送受信します。 TCP SDKを取得して使用する方法の詳細については、「TCPクライアントSDKを使用して通常のメッセージを送信およびサブスクライブする」をご参照ください。

      サンプルJavaコード:

      説明
      com.aliyun.openservices.ons.api.Actionをインポートします。com.aliyun.openservices.ons.api.ConsumeContextをインポートします。com.aliyun.openservices.ons.api.Consumerをインポートします。com.aliyun.openservices.ons.api.Messageをインポートします。com.aliyun.openservices.ons.api.MessageListenerをインポートします。com.aliyun.openservices.ons.api.ONSFactoryをインポートします。com.aliyun.openservices.ons.api.PropertyKeyConstをインポートします。java.util.Propertiesをインポートします。パブリッククラスConsumerTest {
          public static void main(String[] args) {
              Properties properties = new Properties();
              // ApsaraMQ for RocketMQコンソールで作成したグループのID。
              properties.put(PropertyKeyConst.GROUP_ID、"XXX");
              // Alibaba Cloud管理コンソールでID認証用に作成したAccessKey ID。
              properties.put(PropertyKeyConst.AccessKey, "${AccessKey}");
              // Alibaba Cloud管理コンソールでID認証用に作成したAccessKeyシークレット。
              properties.put(PropertyKeyConst.SecretKey, "${SecretKey}");
              // TCPエンドポイント。 TCPエンドポイントを取得するには、IoT Platformコンソールのインスタンス詳細ページの基本情報セクションに移動します。
              properties.put(PropertyKeyConst.NAMESRV_ADDR,
                  "XXX");
              // サブスクリプションのクラスタリング。 デフォルトモードです。
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // 放送サブスクリプション。
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
              Consumer consumer = ONSFactory.createConsumer(properties);
              consumer.subscribe("iotx_test_mq", ", new MessageListener() { // 複数のタグをサブスクライブします。)
                  public Action consume(Message message, ConsumeContext context) {
                      System.out.println("Receive: " + message);
                      return Action.CommitMessage;
                  }
              });
              consumer.start();
              System.out.println("Consumer Started");
          }
      }
  3. [IoT Platformコンソール] で、インスタンスのデータ転送ルールを設定して、デバイスデータをApsaraMQ for RocketMQに転送します。

    1. 左側のナビゲーションウィンドウで、[メッセージ転送] > [データ転送] を選択します。

    2. [データ転送] ページで、[ルールの作成] をクリックします。

      重要

      最新バージョンのデータ転送ページが表示されたら、ページの右上隅にある [前のバージョンに戻る] をクリックします。 以前のバージョンのデータ転送ページが表示されたら、[ルールの作成] をクリックします。

    3. [データ転送ルールの作成] ダイアログボックスで、Rule NameパラメーターをMQ Forwardingに設定し、Data TypeパラメーターをJSONに設定します。 次に、[OK] をクリックします。

    4. ルールの詳細ページで、[SQL文の書き込み] をクリックします。 次の図に示すように、[SQL文の書き込み] ダイアログボックスでパラメーターを設定し、[OK] をクリックします。

      物联网设备消息

    5. [操作の追加] をクリックします。 次の図に示すように、[操作の追加] ダイアログボックスで、データの転送先を指定し、[OK] をクリックします。

      物联网设备消息

    6. [データ転送] ページに移動します。 MQ転送ルールを見つけ、[操作] 列の [開始] をクリックします。

      ルールを有効にすると、IoT PlatformはApsaraMQ for RocketMQトピックにデバイスデータを転送します。

  4. Java SDKを使用してデバイスをシミュレートし、デバイスをIoT Platformに接続してから、デバイスを使用してデータを送信します。

    1. Download Java SDK demoからデモパッケージをダウンロードし、パッケージを解凍します。

    2. IntelliJ IDEAを起動し、デモパッケージからJavaLinkKitDemoという名前のサンプルプロジェクトをインポートします。

    3. device_id.jsonファイル内のMQdeviceのデバイス証明書情報ProductKeyDeviceNameDeviceSecretを指定します。

    4. src\main\java\com.aliyun.alink.devicesdk.de mo\MqttSample.javaファイルのMQTTトピックを、デバイスがデータを送信するトピックに変更します。

      この例では、/{YourProductKey}/${YourDeviceName}/user/dataトピックが使用されています。

      /**
       * パブリッシュ操作の例
       * /
      public void publish() {
              MqttPublishRequest request = new MqttPublishRequest();
              // ビジネスシナリオに基づいてトピックを指定します。
              request.topic = "/" + productKey + "/" + deviceName + "/user/data";
              ......
              ......
      }
    5. src\main\java\com.aliyun.alink.devicesdk.de mo\HelloWorld.javaファイルのMQTTエンドポイントをデバイスのMQTTエンドポイントに変更します。

      この例では、次のMQTTエンドポイントが使用されます。 MQTTエンドポイントの取得方法の詳細については、「インスタンスのエンドポイントの表示」をご参照ください。

      public void init(final DeviceInfoData deviceInfoData) {
              ......
              ......
              /**
               * MQTT初期化のパラメーターを設定します。
               */
              IoTMqttClientConfig config = new IoTMqttClientConfig();
              config.productKey = deviceInfoData.productKey;
              config.de viceName = deviceInfoData.de viceName;
              config.de viceSecret = deviceInfoData.de viceSecret;
              config.channelHost = "iot-06 **** .mqtt.iothub.aliyuncs.com:1883";
              ......
              ......
      }
    6. src\main\java\com.aliyun.alink.devicesdk.de mo\HelloWorld.javaファイルを実行してデバイスを起動します。

    IoT Platformコンソールで、インスタンスの [インスタンスの詳細] ページに移動し、[メンテナンス] > [デバイスログ] を選択します。 デバイスログを表示し、デバイスデータがApsaraMQ for RocketMQに転送されているかどうかを確認できます。物联网设备消息

  5. ApsaraMQ for RocketMQコンソールでデータを表示します。

    1. オンプレミスサーバーでコードを実行し、ApsaraMQ for RocketMQトピックをサブスクライブします。

      物联网设备消息

    2. ApsaraMQ for RocketMQコンソールのインスタンスの詳細ページに移動し、[メッセージクエリ] をクリックします。 [メッセージクエリ] ページで、トピックまたはメッセージIDでメッセージを検索します。

      [詳細] をクリックすると、ApsaraMQ for RocketMQに転送されるメッセージの詳細が表示され、メッセージがダウンロードされます。

      サンプルメッセージ:

      {"deviceName":"MQdevice"}