背景資訊
在實際業務中服務端需要對用戶端的上下線資料進行統計分析,並根據用戶端的線上狀態推送訊息。雲Message QueueTT 版提供非同步上下線事件通知擷取用戶端線上狀態,MQTT用戶端的上下線事件將會觸發MQTT服務端產生一條通知訊息,擷取通知訊息有以下方式:
本文以建立上下線通知規則為例,介紹後端應用如何擷取用戶端線上狀態。

網路訪問
雲Message QueueTT 版同時提供了
公网接入点和
VPC 接入点。
- 公网接入点為本地公網環境訪問的IP地址,一般用於物聯網和移動互連網情境中;
- VPC 接入点為雲上私網訪問的IP地址,一般用於雲端應用接入雲Message QueueTT 版。
重要 用戶端使用存取點串連服務時務必使用網域名稱接入,不得直接使用網域名稱背後的IP地址直接連接,因為IP地址隨時會變化。在以下使用方式中出現的問題
雲Message QueueTT 版產品方概不負責:
- 用戶端不使用網域名稱接入而是使用IP地址接入,產品方更新了網域名稱解析導致原有IP地址失效。
- 用戶端網路對IP地址設定網路防火牆策略,產品方更新了網域名稱解析後新IP地址被您的防火牆策略攔截。
前提條件
- 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
安裝Java 8或11。更多資訊,請參見安裝JDK。
已建立雲Message QueueTT 版執行個體、Topic和Group ID,具體操作,請參見建立資源。
已建立雲訊息佇列 RocketMQ 版執行個體、Topic和Group ID,具體操作,請參見步驟二:建立資源。
1.建立上下線通知規則
登入雲Message QueueTT 版控制台,並在左側導覽列單擊实例列表。
在頂部功能表列選擇目標地區,然後在執行個體列表中單擊執行個體名稱進入实例详情頁面。
在左側導覽列單擊规则管理。
在规则管理頁面,單擊RocketMQ 规则頁簽,然後單擊创建规则。
在创建规则頁面完成以下操作。
配置基本資料。輸入規則ID,選擇上下線通知的規則類型。

配置規則源。選擇已經建立好的雲Message QueueTT 版的Group ID。

配置規則目標。選擇已經建立好的雲訊息佇列 RocketMQ 版的執行個體和Topic。

2.準備測試代碼
用戶端的上下線操作,上下線通知的訊息處理都需要程式碼完成,本文以Java的Demo樣本作為您代碼開發的參考。
2.1下載範例程式碼
下載mqtt-java-demo,並解壓該Demo工程包至您指定的檔案夾。
在解壓的Demo工程中找到lmq-java-demo檔案夾,將此檔案夾匯入IntelliJ IDEA,並確認pom.xml中已包含以下依賴。
<dependencies>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.70</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.5.Final</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
配置訪問憑證。
2.2用戶端上下線代碼
在MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java類中,按代碼注釋說明填寫雲Message QueueTT 版資源的參數。
測試中僅類比上下線的操作不需要發送訊息,可以將其中發送訊息的相關代碼去掉,範例程式碼如下。
用戶端上下線程式碼範例
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
/**
* MQ4IOT 執行個體 ID,購買後控制台擷取
*/
String instanceId = "XXXXX";
/**
* 存取點地址,購買 MQ4IOT 執行個體,且配置完成後即可擷取,存取點地址必須填寫分配的網域名稱,不得使用 IP 位址直接連接,否則可能會導致用戶端異常。
*/
String endPoint = "XXXXX.mqtt.aliyuncs.com";
/**
* 帳號 AccessKey,從帳號系統控制台擷取
* 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
* 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
* 本樣本以把AccessKey ID和AccessKey Secret儲存在環境變數為例說明。運行本程式碼範例之前,請先配置環境變數MQTT_AK_ENV和MQTT_SK_ENV
* 例如:export MQTT_AK_ENV=<access_key_id>
* export MQTT_SK_ENV=<access_key_secret>
* 需要將<access_key_id>替換為已準備好的AccessKey ID,<access_key_secret>替換為AccessKey Secret。
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* 帳號 secretKey,從帳號系統控制台擷取,僅在Signature鑒權模式下需要設定
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* MQ4IOT clientId,由業務系統分配,需要保證每個 tcp 串連都不一樣,保證全域唯一,如果不同的用戶端對象(tcp 串連)使用了相同的 clientId 會導致串連異常斷開。
* clientId 由兩部分組成,格式為 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申請,DeviceId 由業務方自己設定,clientId 總長度不得超過64個字元。
*/
String clientId = "GID_XXXXX@@@XXXXX";
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 用戶端使用的協議和連接埠必須匹配,具體參考文檔 https://www.alibabacloud.com/help/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
* 如果是 SSL 加密則設定ssl://endpoint:8883
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 用戶端設定好發送逾時時間,防止無限阻塞
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
Thread.sleep(Long.MAX_VALUE);
}
}
2.3上下線通知訊息的處理代碼
上下線通知訊息被推送到雲訊息佇列 RocketMQ 版,消費者訂閱訊息後需要根據業務需求進行處理。
在MQTTClientStatusNoticeProcessDemo.java類中按代碼注釋說明填寫雲訊息佇列 RocketMQ 版資源的參數,範例程式碼如下。
上下線通知訊息的處理程式碼範例
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* 初始化訊息佇列RocketMQ版接收用戶端,實際業務中一般部署在服務端應用中。
*/
Properties properties = new Properties();
/**
* 設定訊息佇列RocketMQ版Group ID,在訊息佇列RocketMQ版控制台建立。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* AccessKey ID,阿里雲身分識別驗證,在阿里雲RAM控制台建立。
* 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
* 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
* 本樣本以將AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* AccessKey Secret,阿里雲身分識別驗證,在阿里雲RAM控制台建立。僅在簽名鑒權模式下需要設定。
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* 設定TCP存取點,該存取點為訊息佇列RocketMQ版執行個體的存取點。進入訊息佇列RocketMQ版控制台執行個體詳情頁面擷取。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
/**
* 使用訊息佇列RocketMQ版消費端來處理MQTT用戶端的上下線通知時,訂閱的Topic為上下線通知Topic。
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* 用戶端狀態資料,實際生產環境中建議使用資料庫或者Redis等外部持久化儲存來儲存該資訊,避免應用重啟丟失狀態,本樣本以單機記憶體版實現為例。
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* 此處僅處理用戶端是否線上,因此只需要關注connect事件和tcpclean事件即可。
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 處理上下線通知的邏輯。
* 實際部署過程中,消費上下線通知的應用可能部署多台機器,因此用戶端線上狀態的資料可以使用資料庫或者Redis等外部共用儲存來維護。
* 其次需要單獨做訊息等冪處理,以免重複接收訊息導致狀態機器判斷錯誤。
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* 首先儲存新的事件。
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* 讀取當前channel的事件列表。
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* 如果事件列表裡上線和下線事件都已經收到,則當前channel已經掉線,可以清理掉這個channel的資料。
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* 根據狀態表判斷一個clientId是否有活躍的TCP串連。
* 1.如果沒有channel表,則用戶端一定不線上。
* 2.如果channel表非空,檢查一下channel資料中是否僅包含上線事件,如果有則代表有活躍串連,用戶端線上。
* 如果全部的channel都有掉線斷開事件則用戶端一定不線上。
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}
3.結果驗證
執行MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java類中的Main函數運行代碼類比用戶端上線操作。可以根據下面操作查詢用戶端狀態和訊息推送情況。
說明 可以通過終止Main函數的執行來類比用戶端的離線操作。
執行MQTTClientStatusNoticeProcessDemo.java類中的Main函數運行代碼。此時已經收到上線的事件訊息,ClientStatus狀態也從false變成true,如下圖所示。
