すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for MQTT:MQTT 版 ApsaraMQ クライアントのステータスイベントをエクスポートする

最終更新日:Apr 08, 2025

MQTT 版 ApsaraMQ でクライアントステータス通知のルールを設定して、MQTT 版 ApsaraMQ クライアントのオンラインイベントとオフラインイベントを他の Alibaba Cloud サービスにエクスポートできます。オフラインイベントとオンラインイベントの通知は、クライアントステータスとは非同期です。このトピックでは、クライアントステータス通知の動作メカニズム、シナリオ、および制限について説明します。また、MQTT 版 ApsaraMQ と他の Alibaba Cloud サービス間のリソースマッピングについても説明します。

動作メカニズム

MQTT 版 ApsaraMQ クライアントがオンラインまたはオフラインになると、MQTT 版 ApsaraMQ ブローカーは、設定したルールに基づいて、クライアントステータス通知を Alibaba Cloud サービスにプッシュします。Elastic Compute Service(ECS)インスタンスにデプロイされたバックエンドサービスアプリケーションは、Alibaba Cloud サービスからのクライアントステータス通知をサブスクライブして、MQTT 版 ApsaraMQ クライアントがオンラインまたはオフラインになったときに通知を受信できます。上下线事件流出

このメソッドは非同期プロセスであり、リアルタイムの情報は提供しません。アプリケーションは、イベント通知のタイムラインに基づいてのみ、クライアントの実際のステータスを判断できます。

これは、クライアントの実際のステータスを判断するための複雑で不正確な方法です。ただし、オフラインイベントとオンラインイベントの通知を使用して、複数の MQTT 版 ApsaraMQ クライアントのステータスのトレースを分析できます。

シナリオ

クライアントステータス通知は、MQTT 版 ApsaraMQ クライアントがオンラインまたはオフラインになったときに、サービスアプリケーションの事前定義されたアクションをトリガーするために使用されます。

たとえば、MQTT 版 ApsaraMQ クライアントのステータスが複数回変更された場合、MQTT 版 ApsaraMQ ブローカーは、クライアントステータス通知用に設定されたルールに基づいてステータスの変更を通知にカプセル化し、RocketMQ 版 ApsaraMQ インスタンスに通知を送信します。このようにして、クライアントのすべてのステータスデータを収集して分析できます。

説明

上記の例以外のシナリオでは、API オペレーションを呼び出して、MQTT 版 ApsaraMQ クライアントのステータスをリアルタイムでクエリすることをお勧めします。詳細については、「MQTT 版 ApsaraMQ クライアントのステータスを取得する」をご参照ください。

制限

項目

制限

説明

単一インスタンスに作成されるルールの数

100

インスタンスに 100 を超えるルールを作成する場合は、DingTalkグループ 116015007918 に参加して、MQTT 版 ApsaraMQ テクニカルサポートにお問い合わせください。

ルールの重複排除

各内部リソースに対して、同じタイプのルールを 1 つだけ作成できます。

たとえば、MQTT 版 ApsaraMQ の各グループに対してクライアントステータス通知のルールを 1 つだけ、各トピックに対してデータインバウンドルールとデータアウトバウンドルールを 1 つだけ作成できます。

リージョン

リージョンをまたいでルールを作成することはできません。ルール内のデータソースとデータデスティネーションが属するインスタンスは、同じリージョンに存在する必要があります。

たとえば、データソースが中国(杭州)リージョンの MQTT 版 ApsaraMQ として指定されているデータアウトバウンドルールを作成する場合、データデスティネーションとして中国(杭州)リージョンの RocketMQ 版 ApsaraMQ インスタンスのみを選択できます。

MQTT 版 ApsaraMQ インスタンスバージョン

カーネルバージョンが V3.x.x のインスタンスに対してのみルールを作成できます。

MQTT 版 ApsaraMQ インスタンスのカーネルバージョンは、MQTT 版 ApsaraMQ コンソール のインスタンスリストまたは [インスタンスの詳細] ページで確認できます。

RocketMQ 版 ApsaraMQ インスタンスバージョン

RocketMQ 4.0 版 ApsaraMQ インスタンスのみがサポートされています。

データインバウンドルールまたはデータアウトバウンドルールを使用して MQTT 版 ApsaraMQRocketMQ 版 ApsaraMQ 間でデータを交換する場合、RocketMQ 版 ApsaraMQ 4.0 インスタンスのみを使用できます。

リソースマッピング方法

MQTT 版 ApsaraMQ の同じグループ内のすべてのクライアントのステータス通知は、設定した同じ Alibaba Cloud サービスのリソースに転送されます。

表 1. マッピング

MQTT 版 ApsaraMQ リソース

Alibaba Cloud サービス

Alibaba Cloud サービスのリソース

パケット定義

MQTT 版 ApsaraMQ のグループ

RocketMQ 版 ApsaraMQ

RocketMQ 版 ApsaraMQ のトピック

MQTT 版 ApsaraMQ と RocketMQ 版 ApsaraMQ 間のメッセージ構造マッピング

手順

クライアントステータス通知に非同期メソッドを使用する場合は、通知をバックエンドの Alibaba Cloud サービスにエクスポートする前に、通知のルールを作成する必要があります。次の例では、RocketMQ 版 ApsaraMQ をバックエンドの Alibaba Cloud サービスとして使用して、クライアントステータス通知に非同期メソッドを使用する方法について説明します。

  1. クライアントステータス通知のルールを作成します。

    ルールを作成するときは、MQTT 版 ApsaraMQ コンソールで、クライアントステータスイベントを受信するグループを選択します。詳細については、「クライアントステータス通知のルールを作成する」をご参照ください。

  2. サービスアプリケーションでクライアントステータス通知をサブスクライブします。

    手順 1 で作成したルールが有効になると、サービスアプリケーションは対応する MQTT 版 ApsaraMQ クライアントのステータスイベントを受信できます。RocketMQ 版 ApsaraMQ を使用して通知を受信する方法については、「メッセージをサブスクライブする」をご参照ください。サンプルコードについては、「MQTTClientStatusNoticeProcessDemo.java」をご参照ください。

    イベントタイプは、RocketMQ 版 ApsaraMQ に添付されているタグで指定されます。タグ形式:

    MQ Tag:connect/disconnect/tcpclean

    次の項目では、タグの情報について説明します。

    • connect は、MQTT 版 ApsaraMQ クライアントがオンラインになったことを示します。

    • disconnect は、MQTT 版 ApsaraMQ クライアントが MQTT 版 ApsaraMQ ブローカーとの接続を終了したことを示します。MQTT プロトコルに基づいて、MQTT 版 ApsaraMQ クライアントは、TCP 接続を終了する前に disconnect パケットを送信します。MQTT 版 ApsaraMQ ブローカーは、MQTT 版 ApsaraMQ クライアントから disconnect パケットを受信した後にのみ、クライアントステータス通知をトリガーします。MQTT 版 ApsaraMQ クライアントの SDK が MQTT プロトコルに基づいて disconnect パケットを送信しない場合、MQTT 版 ApsaraMQ ブローカーはメッセージを受信できません。

    • tcpclean は、TCP 接続が閉じられたことを示します。TCP 接続が閉じられた場合、MQTT 版 ApsaraMQ クライアントが disconnect パケットを送信したかどうかに関係なく、タグには tcpclean が含まれます。

    説明

    tcpclean は、MQTT 版 ApsaraMQ クライアントがネットワーク層で切断されていることを示します。disconnect は、MQTT 版 ApsaraMQ クライアントが切断パケットを送信したことを示すだけです。MQTT 版 ApsaraMQ クライアントが切断パケットを送信せずに予期せず終了した場合、disconnect はタグに表示されない場合があります。したがって、MQTT 版 ApsaraMQ クライアントがオフラインかどうかを判断するには、タグに tcpclean が含まれているかどうかを確認します。

    タグのデータは JSON 形式で、次のキーが含まれています。

    • clientId は、特定の MQTT 版 ApsaraMQ クライアントを示します。

    • time は、イベントが発生した時刻を示します。

    • eventType は、MQTT 版 ApsaraMQ クライアントがイベントを区別するために使用するイベントタイプを示します。

    • channelId は、TCP 接続の一意の識別子を示します。

    • clientIp は、MQTT 版 ApsaraMQ クライアントが使用するパブリック IP アドレスを示します。

    例:

    clientId: GID_XXX@@@YYYYY
    time:1212121212
    eventType:connect/disconnect/tcpclean
    channelId:2b9b1281046046faafe5e0b458e4XXXX
    clientIp:192.168.XX.XX:133XX  

    MQTT 版 ApsaraMQ クライアントのステータスを判断するには、受信したクライアントステータス通知のタイムラインを確認します。最後に受信した通知だけに基づいてクライアントステータスを判断することはできません。

    次のルールに基づいてクライアントステータスを判断します。

    • タイムスタンプは、clientId で指定された MQTT 版 ApsaraMQ クライアントが生成するステータスイベントのシーケンスを示します。より新しいイベントのタイムスタンプ値は大きくなります。

    • clientId で指定された MQTT 版 ApsaraMQ クライアントで、一時的な接続エラーが複数回発生する可能性があります。したがって、オフラインイベント通知を受信したときは、channelId 値を確認して、通知が現在の TCP 接続に関連しているかどうかを判断します。オフラインイベント通知は、同じ channelId を持つオフラインイベント通知のみを上書きできます。2 つのオフラインイベント通知の channelId 値が異なる場合、time 値が大きい方の通知は、もう一方の通知を上書きできません。channelId は TCP 接続を識別します。TCP 接続には、connect イベントと close イベントが 1 つずつあります。

Java での非同期ステータス通知のサンプルコード

package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * レシーバーとして RocketMQ 版 ApsaraMQ クライアントを初期化します。ほとんどのビジネスシナリオでは、コードはバックエンドアプリケーションにデプロイされます。
         */
        Properties properties = new Properties();
        /**
         * RocketMQ 版 ApsaraMQ クライアントのグループ ID を指定します。このパラメーターに指定するグループ ID は、MQTT 版 ApsaraMQ インスタンスのグループの ID とは異なることに注意してください。対応するサービスの説明に従ってグループ ID を指定する必要があります。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * Alibaba Cloud アカウントの AccessKey ID。AccessKey ID は、Resource Access Management(RAM)コンソールから取得できます。
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * Alibaba Cloud アカウントの AccessKey シークレット。AccessKey シークレットは、RAM コンソールから取得できます。AccessKey シークレットは、署名認証モードを使用する場合にのみ必要です。
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * TCP エンドポイントを指定します。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * RocketMQ 版 ApsaraMQ コンシューマーを使用して MQTT 版 ApsaraMQ クライアントのステータス通知を処理する場合、コンシューマーはステータス通知を受信するために使用されるトピックをサブスクライブする必要があります。RocketMQ 版 ApsaraMQ コンソールのドキュメントを参照して、事前にトピックを作成する必要があります。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * クライアントステータスデータ。本番環境では、アプリケーションの再起動時にステータスデータが失われないように、データベースや Redis システムなどの外部永続ストレージシステムを使用してステータスデータを格納することをお勧めします。この例では、ステータスデータはオンプレミスコンピューターに格納されます。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         * 次のコードは、クライアントがオンラインかどうかのみを判断します。したがって、connect イベントと tcpclean イベントにのみ注意する必要があります。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * クライアントステータス通知を処理するために使用されるロジック。
     * 実際のデプロイプロセスでは、ステータス通知メッセージを消費するアプリケーションが複数のサーバーにデプロイされる場合があります。したがって、クライアントのステータスデータは、データベースや Redis システムなどの外部共有ストレージに保持できます。
     * ステートマシンがメッセージを繰り返し受信する場合は、メッセージに対して消費冪等処理を実行して、発生する可能性のあるエラーを防ぎます。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 最初に新しいイベントを格納します。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 現在の接続のイベントリストを読み取ります。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * リスト内のすべてのオンラインイベントとオフラインイベントが受信され、現在の接続が閉じられた場合、接続のデータをクリアできます。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * チャネルテーブルに基づいて、MQTT 版 ApsaraMQ クライアントにアクティブな TCP 接続が存在するかどうかを確認します。
     * 1. チャネルテーブルが空の場合、MQTT 版 ApsaraMQ クライアントはオフラインです。
     * 2. チャネルテーブルが空でない場合は、接続でオンラインイベントのみが受信されているかどうかを確認します。はいの場合、アクティブな接続が存在し、MQTT 版 ApsaraMQ クライアントはオンラインです。
     * すべてのチャネルに切断イベントがある場合、クライアントはオフラインです。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}

関連情報

コンソールでの操作については、「クライアントステータス通知のルールを管理する」をご参照ください。