すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for MQTT:ApsaraMQ for MQTT から ApsaraMQ for RocketMQ にクライアントステータス通知を送信する

最終更新日:Jan 15, 2025

ApsaraMQ for MQTT では、クライアントステータス通知を ApsaraMQ for RocketMQ に送信するためのルールを作成できます。このトピックでは、クライアントステータス通知ルールを作成する方法について説明します。

背景情報

実際のビジネスシナリオでは、ApsaraMQ for MQTT ブローカーは、クライアントのステータス変更に関連するデータを収集および分析し、クライアントのステータスに基づいてメッセージをプッシュする必要があります。ApsaraMQ for MQTT では、非同期ステータス通知を使用してクライアントのステータスを取得できます。 ApsaraMQ for MQTT クライアントがオンラインまたはオフラインになると、ApsaraMQ for MQTT ブローカーで通知が生成されます。このような通知を取得するには、次の方法を使用できます。

  • クラウド SDK を使用して ApsaraMQ for MQTT ブローカーに接続します。詳細については、「ApsaraMQ for MQTT クライアントのステータスを取得する」をご参照ください。

  • クライアントステータス通知ルールを作成します。クライアントステータス通知ルールを作成すると、クライアントステータス通知が ApsaraMQ for RocketMQ に送信されます。ApsaraMQ for RocketMQ のメッセージをサブスクライブして、クライアントのステータスを取得できます。

このトピックでは、クライアントのステータスを取得するためにクライアントステータス通知ルールが作成されます。

quick_start_client_stats_notify

ネットワークアクセス

ApsaraMQ for MQTT は、インターネットアクセスポイントVPC アクセスポイントを提供します。

  • インターネットアクセスポイントは、インターネット経由で ApsaraMQ for MQTT にアクセスするために使用される IP アドレスです。ほとんどの場合、パブリックエンドポイントは IoT およびモバイルインターネットのシナリオで使用されます。

  • VPC アクセスポイントは、プライベート仮想クラウド (VPC) 内で ApsaraMQ for MQTT にアクセスするために使用される IP アドレスです。ほとんどの場合、VPC エンドポイントはクラウドアプリケーションによって ApsaraMQ for MQTT に接続するために使用されます。

重要

エンドポイントを使用してクライアントを ApsaraMQ for MQTT に接続する場合は、IP アドレスではなくドメイン名を使用してください。IP アドレスは動的に変化するためです。ApsaraMQ for MQTT 技術チームは、次のシナリオにおける障害および直接的または間接的な損失について責任を負いません。

  • IP アドレスを使用してクライアントに ApsaraMQ for MQTT にアクセスします。 ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、元の IP アドレスは無効になります。

  • クライアントが実行されているネットワークで、IP アドレスのファイアウォールポリシーが設定されています。 ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、ファイアウォールポリシーのために新しい IP アドレスがブロックされます。

前提条件

  • 統合開発環境 (IDE) がインストールされています。詳細については、「IDE」をご参照ください。 IntelliJ IDEA または Eclipse を使用できます。この例では、IntelliJ IDEA を使用しています。

  • Java 8 または 11 がインストールされています。詳細については、「Java のダウンロード」をご参照ください。

  • ApsaraMQ for MQTT インスタンスが作成され、インスタンスにトピックとグループが作成されています。詳細については、「リソースを作成する」をご参照ください。

  • ApsaraMQ for RocketMQ インスタンスが作成され、インスタンスにトピックとグループが作成されています。詳細については、「手順 2: リソースを作成する」をご参照ください。

重要
  • ApsaraMQ for MQTT のクライアントステータス通知ルールを使用して、ApsaraMQ for RocketMQ 4.x インスタンスにのみクライアントステータス通知を送信できます。

  • リージョンをまたがって ApsaraMQ for MQTT のクライアントステータス通知ルールを使用することはできません。クライアントステータス通知ルールを作成する場合は、すべての ApsaraMQ for MQTT リソースと ApsaraMQ for RocketMQ リソースが同じリージョンにあることを確認してください。

1. クライアントステータス通知ルールを作成する

  1. ApsaraMQ for MQTT コンソール にログインします。左側のナビゲーションペインで、インスタンスリスト をクリックします。

  2. 上部のナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。[インスタンス] ページで、インスタンス名をクリックして インスタンスの詳細 ページに移動します。

  3. 左側のナビゲーションペインで、ルール管理 をクリックします。[ルール] ページの左上隅にある ルールの作成 をクリックします。

  4. ルールの作成 ウィザードで、次の手順を実行します。

    1. 基本情報の構成 手順で、ルール ID を入力し、[ルールの種類] パラメーターで [クライアントステータス通知] を選択します。

      image

    2. ルールソースの構成 手順で、ApsaraMQ for MQTT インスタンスで作成されたグループを選択します。

      image

    3. ルールデスティネーションの構成 手順で、作成済みの ApsaraMQ for RocketMQ インスタンスと、そのインスタンスで作成済みのトピックを選択します。

      image

2. テストコードを準備する

コードを使用してクライアントステータスを変更し、クライアントステータス通知を処理する必要があります。このトピックでは、Java のサンプルコードが提供されています。

2.1 サンプルコードをダウンロードする

  1. mqtt-java-demo デモプロジェクトをダウンロードし、デモプロジェクトパッケージをオンプレミスマシンのフォルダーに解凍します。

  2. 解凍したデモプロジェクトで、lmq-java-demo フォルダーを見つけ、そのフォルダーを IntelliJ IDEA にインポートし、pom.xml ファイルに次の依存関係が含まれているかどうかを確認します。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. アクセス認証情報を構成します。

    • AccessKey ペアを取得します。 AccessKey ペアの取得方法については、「AccessKey を作成する」をご参照ください。

    • 環境変数を構成します。 ApsaraMQ for MQTT へのアクセスに使用される AccessKey ID の環境変数名は MQTT_AK_ENV で、ApsaraMQ for MQTT へのアクセスに使用される AccessKey シークレットの環境変数名は MQTT_SK_ENV です。環境変数の構成方法については、「アクセス認証情報を構成する」をご参照ください。

2.2 クライアントステータスを変更するためのコードを構成する

MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java クラスで、コード内のコメントに基づいて ApsaraMQ for MQTT リソースに関連するパラメーターを構成します。

テスト中は、ステータス変更のみをシミュレートする必要があります。したがって、メッセージ送信に関連するコードを削除できます。サンプルコード:

クライアントステータス変更のサンプルコード

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
    public static void main(String[] args) throws Exception {
        /**
         * ApsaraMQ for MQTT インスタンスの ID です。インスタンスを購入した後、ApsaraMQ for MQTT コンソールで ID を取得できます。
         */
        String instanceId = "XXXXX";
        /**
         * ApsaraMQ for MQTT インスタンスのエンドポイントです。インスタンスを購入して構成した後、ApsaraMQ for MQTT コンソールでエンドポイントを取得できます。インスタンスに接続するには、割り当てられたドメイン名を使用する必要があります。 IP アドレスは使用しないでください。そうしないと、クライアントオブジェクトで例外が発生する可能性があります。
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * AccessKey ID です。これは、Resource Access Management (RAM) コンソールで取得できます。
         * Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。セキュリティリスクを防ぐため、RAM ユーザーを使用して API 操作を呼び出したり、日常の O&M を実行することをお勧めします。
         * AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウント内のすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。
         * この例では、AccessKey ID と AccessKey シークレットは環境変数に格納されています。サンプルコードを実行する前に、MQTT_AK_ENV および MQTT_SK_ENV 環境変数が構成されていることを確認してください。
         * 例:export MQTT_AK_ENV=<access_key_id>
         *      export MQTT_SK_ENV=<access_key_secret>
         * <access_key_id> を AccessKey ID に、<access_key_secret> を AccessKey シークレットに置き換えます。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * AccessKey シークレットです。これは、RAM コンソールで取得できます。このパラメーターは、署名認証モードを使用する場合にのみ必要です。
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * システムが ApsaraMQ for MQTT クライアントに割り当てるグローバルに一意な ID です。クライアント ID は、TCP 接続に基づいて変化する必要があります。複数の TCP 接続が同じクライアント ID を使用すると、例外が発生し、接続が予期せず閉じられます。
         * clientId パラメーターの値は、GroupID@@@DeviceId 形式です。 GroupID は、ApsaraMQ for MQTT コンソールで作成したグループの ID を指定し、DeviceId はデバイスのカスタム ID を指定します。 clientId パラメーターの値は、最大 64 文字の長さにすることができます。
         */
        String clientId = "GID_XXXXX@@@XXXXX";

        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * ApsaraMQ for MQTT クライアントオブジェクトで使用されるプロトコルとポートは一致する必要があります。詳細については、https://www.alibabacloud.com/help/doc-detail/44866.htm?spm=a2c4g.11186623.6.552.25302386RcuYFB を参照してください。
         * Secure Sockets Layer (SSL) 暗号化を使用する場合は、ssl://endpoint:8883 をプロトコルとポートとして指定します。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * ApsaraMQ for MQTT クライアントが応答を待つタイムアウト期間です。タイムアウト期間により、ApsaraMQ for MQTT クライアントが無期限に応答を待つことがなくなります。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        Thread.sleep(Long.MAX_VALUE);
    }
}

2.3 クライアントステータス通知を処理するためのコードを構成する

クライアントステータス通知が ApsaraMQ for RocketMQ に送信されると、対応するメッセージをサブスクライブし、ビジネス要件に基づいて通知を処理できます。

MQTTClientStatusNoticeProcessDemo.java クラスで、コード内のコメントに基づいて ApsaraMQ for RocketMQ リソースに関連するパラメーターを構成します。

クライアントステータス通知を処理するためのサンプルコード

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * レシーバーとして ApsaraMQ for RocketMQ クライアントを初期化します。ほとんどのビジネスシナリオでは、レシーバーはバックエンドアプリケーションにデプロイされます。
         */
        Properties properties = new Properties();
        /**
         * ApsaraMQ for RocketMQ コンソールで作成したグループ ID です。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey ID です。
         * Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。セキュリティリスクを防ぐため、RAM ユーザーを使用して API 操作を呼び出したり、日常の O&M を実行することをお勧めします。
         * AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウント内のすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。
         * この例では、AccessKey ID と AccessKey シークレットは環境変数に格納されています。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey シークレットです。このパラメーターは、署名認証モードを使用する場合にのみ必要です。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * ApsaraMQ for RocketMQ インスタンスにアクセスするために使用される TCP エンドポイントです。 TCP エンドポイントは、ApsaraMQ for RocketMQ コンソールの [インスタンスの詳細] ページで取得できます。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * ApsaraMQ for MQTT からのクライアントステータス通知の処理に使用する ApsaraMQ for RocketMQ トピックです。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * クライアントステータスデータです。本番環境では、データベースや Redis システムなどの外部永続ストレージシステムを使用してステータスデータを格納し、アプリケーションの再起動時にステータスデータが失われないようにすることをお勧めします。この例では、ステータスデータはオンプレミスマシンに格納されます。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         * この例では、クライアントがオンラインかどうか

3. 結果の確認

  1. MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java クラスのメイン関数を実行して、クライアントがオンラインになることをシミュレートし、次のいずれかの方法を使用して、クライアントのステータスとメッセージの送信ステータスを確認します。

    説明

    メイン関数の実行を停止して、クライアントがオフラインになることをシミュレートできます。

    • クライアントステータスの確認ApsaraMQ for MQTT コンソールのインスタンス詳細ページに移動し、左側のナビゲーションペインで デバイスステータスのクエリ をクリックします。[デバイスステータスクエリ] ページで、デバイス ID を使用してクライアントがオンラインになっているかどうかを確認します。

      image

    • メッセージが送信されたかどうかの確認ApsaraMQ for RocketMQ コンソールのインスタンス詳細ページに移動し、左側のナビゲーションペインで [メッセージクエリ] をクリックします。[メッセージクエリ] ページで、トピックに基づいてメッセージが送信されたかどうかを確認します。

      image

  2. MQTTClientStatusNoticeProcessDemo.java クラスのメイン関数を実行します。メッセージが受信されると、次の図に示すように、ClientStatus の値が false から true に変わります。

    image

参考資料