すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for MQTT:クライアントステータスイベントのエクスポート

最終更新日:Mar 11, 2026

ApsaraMQ for MQTT は、設定可能なルールを通じて、クライアントの接続および切断イベントをダウンストリームの Alibaba Cloud サービスにエクスポートします。バックエンドアプリケーションはこれらのイベントを利用して、接続ステータスを追跡したり、自動化ワークフローをトリガーしたり、接続パターンを分析したりします。

重要

ステータスイベント通知は非同期であり、リアルタイムのクライアントステータスを反映するものではありません。クライアントの現在のステータスをリアルタイムで照会するには、「ApsaraMQ for MQTT クライアントのステータス取得」をご参照ください。

仕組み

ApsaraMQ for MQTT クライアントが接続または切断すると、ブローカーは設定されたルールに基づいて、ダウンストリームの Alibaba Cloud サービスにステータス通知をプッシュします。Elastic Compute Service (ECS) インスタンスにデプロイされたバックエンドアプリケーションは、これらの通知をサブスクライブして、クライアントのオンラインおよびオフライン遷移を検出します。

Status event export flow

このプロセスは非同期であるため、単一の通知ではクライアントが現在オンラインであるかどうかは示されません。実際のステータスを判断するには、そのクライアントのイベント通知のタイムラインを分析します。詳細については、「イベントからのクライアントステータスの判断」をご参照ください。

利用シーン

ステータスイベントのエクスポートは、MQTT クライアントが接続または切断したときにバックエンドのアクションをトリガーします。接続アクティビティ分析の典型的なワークフローは次のとおりです。

  1. クライアントの接続ステータスが複数回変更されます。

  2. ブローカーは、設定されたルールに基づいて各変更を通知にカプセル化します。

  3. 通知は ApsaraMQ for RocketMQ のトピックに配信されます。

  4. バックエンドアプリケーションは通知を利用し、クライアントの接続履歴を構築します。

制限事項

項目制限説明
インスタンスあたりのルール数100上限の引き上げをリクエストするには、DingTalk グループ 116015007918 に参加して ApsaraMQ for MQTT のテクニカルサポートにご連絡ください。
ルールの重複排除内部リソースごとに各タイプのルールは 1 つ例えば、各グループは 1 つのクライアントステータス通知ルールをサポートし、各 ApsaraMQ for MQTT トピックは 1 つのデータインバウンドルールと 1 つのデータアウトバウンドルールをサポートします。
リージョン間のルールサポートされていませんデータソースとデータ送信先は同じリージョンに存在する必要があります。
ApsaraMQ for MQTT インスタンスのバージョンカーネル V3.x.x のみApsaraMQ for MQTT コンソールのインスタンスリストまたは [インスタンス詳細] ページでカーネルバージョンを確認してください。
ApsaraMQ for RocketMQ インスタンスのバージョンApsaraMQ for RocketMQ 4.0 のみApsaraMQ for MQTT と ApsaraMQ for RocketMQ 間のデータインバウンドルールおよびデータアウトバウンドルールに適用されます。

リソースマッピング

同じ ApsaraMQ for MQTT グループ内のすべてのクライアントのステータス通知は、同じダウンストリームリソースに転送されます。

ApsaraMQ for MQTT リソースAlibaba Cloud サービスダウンストリームリソースパケット定義
グループApsaraMQ for RocketMQトピックApsaraMQ for MQTT と ApsaraMQ for RocketMQ 間のメッセージ構造のマッピング

ステータスイベントエクスポートの設定

以下の手順では、ダウンストリームサービスとして ApsaraMQ for RocketMQ を使用します。

前提条件

開始する前に、以下を確認してください。

  • カーネルバージョンが V3.x.x の ApsaraMQ for MQTT インスタンス

  • 同じリージョンにある ApsaraMQ for RocketMQ 4.0 インスタンス

  • ApsaraMQ for MQTT インスタンスに少なくとも 1 つのグループが設定されていること

ステップ 1:クライアントステータス通知ルールの作成

ApsaraMQ for MQTT コンソールでルールを作成し、クライアントステータスイベントをエクスポートしたいグループを選択します。詳細な手順については、「クライアントステータス通知ルールの作成」をご参照ください。

ステップ 2:バックエンドアプリケーションでステータス通知をサブスクライブする

ルールが有効になると、ステータスイベントは設定された ApsaraMQ for RocketMQ トピックに配信されます。イベントを受信するには、バックエンドアプリケーションでそのトピックをサブスクライブします。サブスクリプションの詳細については、「メッセージのサブスクライブ」をご参照ください。

イベントタイプ

各ステータス通知は、ApsaraMQ for RocketMQ メッセージとして配信されます。イベントタイプはメッセージタグで指定されます。

タグのフォーマット: connectdisconnect、または tcpclean

タグ意味トリガー条件
connectクライアントが接続しましたクライアントがブローカーとの接続を確立します。
disconnectクライアントが DISCONNECT パケットを送信しましたMQTT プロトコルに従い、クライアントは TCP 接続を閉じる前に DISCONNECT パケットを送信します。クライアント SDK がこのパケットを送信しない場合、ブローカーは disconnect イベントを生成しません。
tcpcleanTCP 接続が閉じられましたクライアントが DISCONNECT パケットを送信したかどうかに関わらず、TCP 接続が閉じられます。
重要

クライアントがオフラインであるかどうかを判断するには、disconnect ではなく tcpclean タグを確認してください。予期せず終了したクライアントは DISCONNECT パケットを送信しない可能性があるため、disconnect イベントが表示されない場合があります。

イベントデータ形式

メッセージ本文は、以下のフィールドを持つ JSON オブジェクトです。

フィールドタイプ説明
clientIdStringApsaraMQ for MQTT のクライアント ID。フォーマット: GID_XXX@@@YYYYY
timeLongイベントが発生したときのタイムスタンプ。
eventTypeStringイベントタイプ: connectdisconnect、または tcpclean
channelIdStringTCP 接続の一意の識別子。
clientIpStringMQTT クライアントのパブリック IP アドレスとポート。

例:

clientId: GID_XXX@@@YYYYY
time:1212121212
eventType:connect/disconnect/tcpclean
channelId:2b9b1281046046faafe5e0b458e4XXXX
clientIp:192.168.XX.XX:133XX

イベントからのクライアントステータスの判断

通知は非同期であるため、単一のイベントからクライアントがオンラインであるかどうかを判断することはできません。各 clientId のイベントの完全なタイムラインを追跡し、以下のルールを適用してください。

  1. 順序付けにはタイムスタンプを使用します。 time の値が大きいほど、より新しいイベントを意味します。同じ clientId を共有するイベント内でのみタイムスタンプを比較してください。

  2. オフラインイベントのスコープを channelId で限定します。channelId は、正確に 1 つの connect イベントと 1 つの close イベント (tcpclean) を持つ 1 つの TCP 接続を表します。tcpclean イベントは、同じ channelId 上の接続のみを無効にし、異なる channelId 上の接続には影響しません。

  3. 一時的な再接続を処理します。 クライアントは複数回切断および再接続することがあり、複数の channelId 値を生成します。tcpclean イベントを受信した場合は、クライアントをオフラインとしてマークする前に、その channelId が追跡している接続と一致することを確認してください。

シナリオ例:

クライアントが接続し、短時間切断してから再接続します。

順序イベントchannelIdクライアントステータス
1connectaaa111オンライン (接続 aaa111 がアクティブ)
2tcpcleanaaa111オフライン (接続 aaa111 がクローズ)
3connectbbb222オンライン (接続 bbb222 がアクティブ)

aaa111tcpcleanbbb222connect の後に受信した場合、bbb222 には対応する tcpclean がないため、クライアントはまだオンラインです。

判断ロジック:

特定の clientId について:
  1. channelId -> イベントのマップを維持します。
  2. "connect" イベントを受信した場合:
     - イベントをその channelId の下に保存します。
  3. "tcpclean" イベントを受信した場合:
     - イベントをその channelId の下に保存します。
     - channelId に "connect" と "tcpclean" の両方のイベントが存在する場合、
       その接続はクローズされています。マップから channelId を削除します。
  4. クライアントがオンラインかどうかを確認します:
     - マップ内のいずれかの channelId に "connect" イベントがあり、
       "tcpclean" イベントがない場合、クライアントはオンラインです。
     - すべての channelId が削除された場合 (または両方のイベントがある場合)、
       クライアントはオフラインです。

サンプルコード (Java)

以下の例では、ApsaraMQ for RocketMQ コンシューマーを使用してクライアントステータス通知を受信し、処理します。コンシューマーは、指定された RocketMQ トピックの connect および tcpclean タグをサブスクライブします。

完全なソースコードについては、「MQTTClientStatusNoticeProcessDemo.java」をご参照ください。

package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        // ステータス通知を受信するための ApsaraMQ for RocketMQ コンシューマーを初期化します。
        Properties properties = new Properties();

        // RocketMQ のコンシューマーグループ ID (ApsaraMQ for MQTT のグループ ID とは異なります)。
        properties.setProperty(PropertyKeyConst.GROUP_ID, "<your-rocketmq-group-id>");

        // AccessKey ペア。RAM コンソールから取得します。
        properties.put(PropertyKeyConst.AccessKey, "<your-access-key>");
        properties.put(PropertyKeyConst.SecretKey, "<your-secret-key>");

        // ApsaraMQ for RocketMQ インスタンスの TCP エンドポイント。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-rocketmq-endpoint>");

        // MQTT クライアントステータス通知を受信する RocketMQ トピック。
        // このトピックは事前に ApsaraMQ for RocketMQ コンソールで作成しておきます。
        final String parentTopic = "GID_XXXX_MQTT";

        // 本番環境では、外部ストア (データベースまたは Redis) を使用して、
        // アプリケーションの再起動後もステータスデータを永続化します。
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();

        Consumer consumer = ONSFactory.createConsumer(properties);

        // "connect" と "tcpclean" イベントのみをサブスクライブします。
        consumer.subscribe(parentTopic, "connect||tcpclean",
            new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();

        // 特定のクライアントのステータスをポーリングします。
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("Client online: " + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 受信したステータス通知を処理し、チャネル-イベントマップを維持します。
     *
     * マルチサーバーデプロイメントでは、クライアントステータスを共有外部ストア
     * (データベースまたは Redis) に保存します。重複メッセージを処理するために、べき等な消費を実装します。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));

                // 新しいイベントを保存します。
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);

                // この接続のイベントリストを読み取ります。
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }

                // この channelId に connect と tcpclean の両方のイベントが存在する場合、
                // 接続はクローズされています。エントリをクリーンアップします。
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * クライアントがアクティブな TCP 接続を持っているかどうかを確認します。
     *
     * ロジック:
     * 1. チャネルマップが空の場合、クライアントはオフラインです。
     * 2. いずれかの channelId に connect イベントのみ (tcpclean なし) がある場合、クライアントはオンラインです。
     * 3. すべての channelId に connect と tcpclean の両方のイベントがある場合、クライアントはオフラインです。
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap =
            mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}

以下のプレースホルダーを実際の値に置き換えてください。

プレースホルダー説明
<your-rocketmq-group-id>ApsaraMQ for RocketMQ のコンシューマーグループ IDGID_status_consumer
<your-access-key>RAM コンソールからの Alibaba Cloud AccessKey IDLTAI5tXxx
<your-secret-key>RAM コンソールからの Alibaba Cloud AccessKey SecretxXxXxXx
<your-rocketmq-endpoint>ApsaraMQ for RocketMQ インスタンスの TCP エンドポイントhttp://MQ_INST_xxx.mq-internet-access.mq-internet.aliyuncs.com:80

参照