您可通过配置云消息队列 MQTT 版的客户端上下线通知规则,将获取的MQTT客户端上下线事件数据导出至其他阿里云产品。该方法为异步上下线通知。本文介绍客户端上下线通知的原理、应用场景、使用限制以及云消息队列 MQTT 版与其他阿里云产品的资源映射关系。
基本原理

该方式属于异步感知客户端的状态,且感知到的是上下线事件,而非在线状态,云端应用需要根据事件发生的时间序列分析出客户端的状态。
异步上下线通知因为采用消息解耦,状态判断更加复杂,且误判可能性更大,但该方法可以基于事件分析多个客户端的运行状态轨迹。
应用场景
客户端上下线通知主要的应用场景为业务应用需要在客户端上线或者下线时触发一些预定义的动作。
例如,客户端在线状态聚合。此场景中,MQTT客户端产生上下线等状态变更,云消息队列 MQTT 版会根据您配置的客户端状态通知规则,将状态变更封装后转发到云消息队列 RocketMQ 版消息的方式来实现客户端状态数据的聚合和统计。
使用限制
当前仅支持将云消息队列 RocketMQ 版的数据导入云消息队列 MQTT 版。
资源映射方式
同一个云消息队列 MQTT 版Group ID下的所有客户端的状态变更通知都会转发到您配置的同一个其他阿里云产品的资源里。
MQTT资源 | 其他阿里云产品 | 其他阿里云产品资源 | 数据包定义 |
---|---|---|---|
MQTT Group ID | 云消息队列 RocketMQ 版 | 云消息队列 RocketMQ 版的Topic | MQTT和RocketMQ的消息结构映射 |
操作流程
如上文所述,如果使用异步上下线通知的方式,您需创建客户端上下线事件通知规则,将上下线通知的消息导出至后端云产品中。下文以使用云消息队列 RocketMQ 版后端云产品为例进行说明。
- 创建上下线通知规则。
您需关注哪些Group ID分组的设备,就在云消息队列 MQTT 版控制台创建规则时,选定相应的Group ID。创建规则的详细步骤,请参见创建上下线通知规则。
- 业务应用订阅该类通知消息。
通过步骤1中创建的规则,即可收到关注的客户端的上下线事件。云消息队列 RocketMQ 版的接收程序,请参见订阅消息。示例代码详细信息,请参见MQTTClientStatusNoticeProcessDemo.java。
事件类型放在云消息队列 RocketMQ 版的Tag中,代表上线或下线。数据格式如下:
MQ Tag:connect/disconnect/tcpclean
其中:
- connect事件代表客户端上线动作。
- disconnect事件代表客户端主动断开连接。按照MQTT协议,客户端主动断开TCP连接之前应该发送disconnect 报文,MQTT服务器在收到disconnect 报文后触发该类型消息。如果某些客户端SDK没有按照协议发送disconnect 报文,MQTT服务器相应无法收到该消息。
- tcpclean事件代表实际的TCP连接断开。无论客户端是否显示发送过disconnect 报文,只要当前TCP连接断开就会触发tcpclean事件。
说明tcpclean消息代表客户端网络层连接的真实断开。对应的,disconnect消息仅仅代表客户端是主动发送了下线报文。受限于客户端的实现,有时候客户端异常退出会导致disconnect消息并没有正常发送。因此判断客户端下线请使用tcpclean事件。
数据内容为JSON类型,相关的Key说明如下:
- clientId代表具体设备。
- time代表本次事件的时间。
- eventType代表事件类型,供客户端区分事件类型。
- channelId代表每个TCP连接的唯一标识。
- clientIp代表客户端使用的公网出口IP地址。
示例如下:
clientId:GID_XXX@@@YYYYY time:1212121212 eventType:connect/disconnect/tcpclean channelId:2b9b1281046046faafe5e0b458e4XXXX clientIp:192.168.XX.XX:133XX
判断客户端当前是否在线不能仅仅根据收到的最后一条消息的状态,而需要结合上下线消息的前后关联来判断。
具体判断规则如下:
- 同一个clientId的客户端,产生上下线事件的先后顺序以时间为准,基本原则为时间戳越大则越新。
- 同一个clientId的客户端,可能存在多次闪断,因此,当收到下线消息时,一定要根据channelId字段判断是否是当前的TCP连接。简而言之,下线消息只能覆盖channelId相同的下线消息,如果下线消息的channelId不一样,尽管time较新,也不能覆盖。一个channelId代表一个TCP连接,只会存在一个connect事件和一个close事件。
package com.aliyun.openservices.lmq.example.demo; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Map; import java.util.Properties; import java.util.Set; public class MQTTClientStatusNoticeProcessDemo { public static void main(String[] args) { /** * 初始化消息队列RocketMQ版接收客户端,实际业务中一般部署在服务端应用中。 */ Properties properties = new Properties(); /** * 设置RocketMQ客户端的Group ID,注意此处的groupId和MQTT实例中的groupId是两个概念,请按照各自产品的说明申请填写。 */ properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX"); /** * 账号AccessKey ID,从账号系统控制台获取。 */ properties.put(PropertyKeyConst.AccessKey, "XXXX"); /** * 账号AccessKey Secret,从账号系统控制台获取,仅在Signature鉴权模式下需要设置。 */ properties.put(PropertyKeyConst.SecretKey, "XXXX"); /** * 设置TCP接入域名。 */ properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX"); /** * 使用RocketMQ消费端来处理MQTT客户端的上下线通知时,订阅的Topic为上下线通知Topic,请遵循控制台文档提前创建。 */ final String parentTopic = "GID_XXXX_MQTT"; /** * 客户端状态数据,实际生产环境中建议使用数据库或者Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本Demo以单机内存版实现做演示。 */ MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl(); Consumer consumer = ONSFactory.createConsumer(properties); /** * 此处仅处理客户端是否在线,因此只需要关注connect事件和tcpclean事件即可。 */ consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore)); consumer.start(); String clientId = "GID_XXXXxXX@@@XXXXX"; while (true) { System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 处理上下线通知的逻辑。 * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者Redis等外部共享存储来维护。 * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。 */ static class MqttClientStatusNoticeListener implements MessageListener { private MqttClientStatusStore mqttClientStatusStore; public MqttClientStatusNoticeListener( MqttClientStatusStore mqttClientStatusStore) { this.mqttClientStatusStore = mqttClientStatusStore; } @Override public Action consume(Message message, ConsumeContext context) { try { JSONObject msgBody = JSON.parseObject(new String(message.getBody())); System.out.println(msgBody); String eventType = msgBody.getString("eventType"); String clientId = msgBody.getString("clientId"); String channelId = msgBody.getString("channelId"); ClientStatusEvent event = new ClientStatusEvent(); event.setChannelId(channelId); event.setClientIp(msgBody.getString("clientIp")); event.setEventType(eventType); event.setTime(msgBody.getLong("time")); /** * 首先存储新的事件。 */ mqttClientStatusStore.addEvent(clientId, channelId, eventType, event); /** * 读取当前channel的事件列表。 */ Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId); if (events == null || events.isEmpty()) { return Action.CommitMessage; } /** * 如果事件列表里上线和下线事件都已经收到,则当前channel已经掉线,可以清理掉这个channel的数据。 */ boolean findOnlineEvent = false; boolean findOfflineEvent = false; for (ClientStatusEvent clientStatusEvent : events) { if (clientStatusEvent.isOnlineEvent()) { findOnlineEvent = true; } else { findOfflineEvent = true; } } if (findOnlineEvent && findOfflineEvent) { mqttClientStatusStore.deleteEvent(clientId, channelId); } return Action.CommitMessage; } catch (Throwable e) { e.printStackTrace(); } return Action.ReconsumeLater; } } /** * 根据状态表判断一个clientId是否有活跃的TCP连接。 * 1.如果没有channel表,则一定不在线。 * 2.如果channel表非空,检查一下channel数据中是否仅包含上线事件,如果有则代表有活跃连接在线。 * 如果全部的channel都有掉线断开事件则一定是不在线。 * * @param clientId * @param mqttClientStatusStore * @return */ public static boolean checkClientOnline(String clientId, MqttClientStatusStore mqttClientStatusStore) { Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId); if (channelMap == null) { return false; } for (Set<ClientStatusEvent> events : channelMap.values()) { boolean findOnlineEvent = false; boolean findOfflineEvent = false; for (ClientStatusEvent event : events) { if (event.isOnlineEvent()) { findOnlineEvent = true; } else { findOfflineEvent = true; } } if (findOnlineEvent & !findOfflineEvent) { return true; } } return false; } }
更多信息
如需了解控制台上的操作,请参见上下线通知规则管理。