全部產品
Search
文件中心

ApsaraMQ for MQTT:MQTT上下線通知規則的實現

更新時間:Aug 08, 2025

雲Message QueueTT 版支援通過建立上下線通知規則,將線上狀態通知訊息推送至雲訊息佇列 RocketMQ 版,以擷取用戶端的線上狀態。本文為您介紹如何通過建立上下線通知規則來擷取用戶端的線上狀態。

背景資訊

在實際業務中服務端需要對用戶端的上下線資料進行統計分析,並根據用戶端的線上狀態推送訊息。雲Message QueueTT 版提供非同步上下線事件通知擷取用戶端線上狀態,MQTT用戶端的上下線事件將會觸發MQTT服務端產生一條通知訊息,擷取通知訊息有以下方式:

  • 通過雲端SDK接入雲Message QueueTT 版服務端擷取用戶端的線上狀態。更多資訊,請參見擷取MQTT用戶端線上狀態

  • 通過建立上下線通知規則,將線上狀態通知訊息推送至雲訊息佇列 RocketMQ 版,然後訂閱雲訊息佇列 RocketMQ 版擷取用戶端的線上狀態。

本文以建立上下線通知規則為例,介紹後端應用如何擷取用戶端線上狀態。

quick_start_client_stats_notify

網路訪問

雲Message QueueTT 版同時提供了公网接入点VPC 接入点
  • 公网接入点為本地公網環境訪問的IP地址,一般用於物聯網和移動互連網情境中;
  • VPC 接入点為雲上私網訪問的IP地址,一般用於雲端應用接入雲Message QueueTT 版
重要 用戶端使用存取點串連服務時務必使用網域名稱接入,不得直接使用網域名稱背後的IP地址直接連接,因為IP地址隨時會變化。在以下使用方式中出現的問題雲Message QueueTT 版產品方概不負責:
  • 用戶端不使用網域名稱接入而是使用IP地址接入,產品方更新了網域名稱解析導致原有IP地址失效。
  • 用戶端網路對IP地址設定網路防火牆策略,產品方更新了網域名稱解析後新IP地址被您的防火牆策略攔截。

前提條件

  • 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
  • 安裝Java 8或11。更多資訊,請參見安裝JDK

  • 已建立雲Message QueueTT 版執行個體、Topic和Group ID,具體操作,請參見建立資源

  • 已建立雲訊息佇列 RocketMQ 版執行個體、Topic和Group ID,具體操作,請參見步驟二:建立資源

重要
  • 雲Message QueueTT 版用戶端上下線通知規則僅支援雲訊息佇列 RocketMQ 版4.x系列執行個體。

  • 雲Message QueueTT 版用戶端上下線通知規則不能跨地區使用,因此,雲Message QueueTT 版雲訊息佇列 RocketMQ 版的資源都必須建立在同一地區。

1.建立上下線通知規則

  1. 登入雲Message QueueTT 版控制台,並在左側導覽列單擊实例列表

  2. 在頂部功能表列選擇目標地區,然後在執行個體列表中單擊執行個體名稱進入实例详情頁面。

  3. 在左側導覽列單擊规则管理

  4. 规则管理頁面,單擊RocketMQ 规则頁簽,然後單擊创建规则

  5. 创建规则頁面完成以下操作。

    1. 配置基本資料。輸入規則ID,選擇上下線通知的規則類型。

      image

    2. 配置規則源。選擇已經建立好的雲Message QueueTT 版的Group ID。

      image

    3. 配置規則目標。選擇已經建立好的雲訊息佇列 RocketMQ 版的執行個體和Topic。

      image

2.準備測試代碼

用戶端的上下線操作,上下線通知的訊息處理都需要程式碼完成,本文以Java的Demo樣本作為您代碼開發的參考。

2.1下載範例程式碼

  1. 下載mqtt-java-demo,並解壓該Demo工程包至您指定的檔案夾。

  2. 在解壓的Demo工程中找到lmq-java-demo檔案夾,將此檔案夾匯入IntelliJ IDEA,並確認pom.xml中已包含以下依賴。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置訪問憑證。

    • 擷取AccessKey資訊。擷取方式,請參見建立AccessKey

    • 配置環境變數。雲Message QueueTT 版的AccessKey ID和AccessKey Secret的環境變數名稱分別為MQTT_AK_ENVMQTT_SK_ENV。關於配置環境變數的方法,請參見配置訪問憑證

2.2用戶端上下線代碼

MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java類中,按代碼注釋說明填寫雲Message QueueTT 版資源的參數。

測試中僅類比上下線的操作不需要發送訊息,可以將其中發送訊息的相關代碼去掉,範例程式碼如下。

用戶端上下線程式碼範例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT 執行個體 ID,購買後控制台擷取
         */
        String instanceId = "XXXXX";
        /**
         * 存取點地址,購買 MQ4IOT 執行個體,且配置完成後即可擷取,存取點地址必須填寫分配的網域名稱,不得使用 IP 位址直接連接,否則可能會導致用戶端異常。
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * 帳號 AccessKey,從帳號系統控制台擷取
         * 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
         * 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
         * 本樣本以把AccessKey ID和AccessKey Secret儲存在環境變數為例說明。運行本程式碼範例之前,請先配置環境變數MQTT_AK_ENV和MQTT_SK_ENV
         * 例如:export MQTT_AK_ENV=<access_key_id>
         *      export MQTT_SK_ENV=<access_key_secret>
         * 需要將<access_key_id>替換為已準備好的AccessKey ID,<access_key_secret>替換為AccessKey Secret。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * 帳號 secretKey,從帳號系統控制台擷取,僅在Signature鑒權模式下需要設定
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQ4IOT clientId,由業務系統分配,需要保證每個 tcp 串連都不一樣,保證全域唯一,如果不同的用戶端對象(tcp 串連)使用了相同的 clientId 會導致串連異常斷開。
         * clientId 由兩部分組成,格式為 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申請,DeviceId 由業務方自己設定,clientId 總長度不得超過64個字元。
         */
        String clientId = "GID_XXXXX@@@XXXXX";

        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 用戶端使用的協議和連接埠必須匹配,具體參考文檔 https://www.alibabacloud.com/help/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密則設定ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 用戶端設定好發送逾時時間,防止無限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        Thread.sleep(Long.MAX_VALUE);
    }
}

2.3上下線通知訊息的處理代碼

上下線通知訊息被推送到雲訊息佇列 RocketMQ 版,消費者訂閱訊息後需要根據業務需求進行處理。

MQTTClientStatusNoticeProcessDemo.java類中按代碼注釋說明填寫雲訊息佇列 RocketMQ 版資源的參數,範例程式碼如下。

上下線通知訊息的處理程式碼範例

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * 初始化訊息佇列RocketMQ版接收用戶端,實際業務中一般部署在服務端應用中。
         */
        Properties properties = new Properties();
        /**
         * 設定訊息佇列RocketMQ版Group ID,在訊息佇列RocketMQ版控制台建立。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
         * 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
         * 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
         * 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * 設定TCP存取點,該存取點為訊息佇列RocketMQ版執行個體的存取點。進入訊息佇列RocketMQ版控制台執行個體詳情頁面擷取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * 使用訊息佇列RocketMQ版消費端來處理MQTT用戶端的上下線通知時,訂閱的Topic為上下線通知Topic。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * 用戶端狀態資料,實際生產環境中建議使用資料庫或者Redis等外部持久化儲存來儲存該資訊,避免應用重啟丟失狀態,本樣本以單機記憶體版實現為例。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         *  此處僅處理用戶端是否線上,因此只需要關注connect事件和tcpclean事件即可。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 處理上下線通知的邏輯。
     * 實際部署過程中,消費上下線通知的應用可能部署多台機器,因此用戶端線上狀態的資料可以使用資料庫或者Redis等外部共用儲存來維護。
     * 其次需要單獨做訊息等冪處理,以免重複接收訊息導致狀態機器判斷錯誤。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 首先儲存新的事件。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 讀取當前channel的事件列表。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * 如果事件列表裡上線和下線事件都已經收到,則當前channel已經掉線,可以清理掉這個channel的資料。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * 根據狀態表判斷一個clientId是否有活躍的TCP串連。
     * 1.如果沒有channel表,則用戶端一定不線上。
     * 2.如果channel表非空,檢查一下channel資料中是否僅包含上線事件,如果有則代表有活躍串連,用戶端線上。
     * 如果全部的channel都有掉線斷開事件則用戶端一定不線上。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }
}

3.結果驗證

  1. 執行MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java類中的Main函數運行代碼類比用戶端上線操作。可以根據下面操作查詢用戶端狀態和訊息推送情況。

    說明

    可以通過終止Main函數的執行來類比用戶端的離線操作。

    • 查詢用戶端狀態。在雲Message QueueTT 版控制台设备状态查询頁面,根據Device ID查詢用戶端此時已是線上狀態,如下圖所示。

      image

    • 查詢事件訊息推送情況。在雲訊息佇列 RocketMQ 版控制台訊息查詢頁面,根據Topic查詢到上線事件訊息已經被推送過來,如下圖所示。

      image

  2. 執行MQTTClientStatusNoticeProcessDemo.java類中的Main函數運行代碼。此時已經收到上線的事件訊息,ClientStatus狀態也從false變成true,如下圖所示。

    image

更多資訊