全部产品
Search
文档中心

云消息队列 MQTT 版:MQTT和RocketMQ进行数据互通(客户端上下线通知)

更新时间:Sep 05, 2023

MQTT客户端的上下线事件将会触发MQTT服务端生成一条通知消息,云消息队列 MQTT 版支持将该条消息数据导出至其他阿里云产品,并使用MQTT的Java SDK实现MQTT客户端与后端应用收发消息。本文以当前仅支持的云消息队列 RocketMQ 版数据互通为例进行说明。

背景信息

云消息队列 MQTT 版支持云端SDK,云上应用可直接通过云端SDK接入云消息队列 MQTT 版服务端进行消息收发并获取客户端的上下线状态。更多信息,请参见获取MQTT客户端在线状态

同时云消息队列 MQTT 版支持和其他云产品进行互通,当前支持的云产品有云消息队列 RocketMQ 版。如果您的云端应用需要使用云消息队列 RocketMQ 版产品的某些功能,例如顺序消息特性、事务消息特性等,您可以通过消息流入或流出规则将云消息队列 MQTT 版云消息队列 RocketMQ 版数据进行流转。

本文以公网环境中的Java SDK为例说明云消息队列 MQTT 版如何将MQTT客户端上下线事件通知的消息发送至后端应用。

此场景下可使用多语言的第三方开源SDK来实现消息收发。更多信息,请参见SDK下载

quick_start_client_stats_notify
重要 云消息队列 RocketMQ 版云消息队列 MQTT 版的Topic不能跨地域使用,因此,本文中所有资源都应在公网地域创建。

网络访问

云消息队列 MQTT 版同时提供了公网接入点VPC 接入点
  • 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
  • VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版
重要 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题云消息队列 MQTT 版产品方概不负责:
  • 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
  • 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
本文以公网接入点为例。云消息队列 MQTT 版云消息队列 RocketMQ 版的应用场景对比和消息属性映射关系请参见以下文档:

使用流程

MQTT客户端上下线通知的消息收发流程如下图所示。

quick_start_client_status_notify_process

前提条件

  • 安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
  • 下载安装JDK
  • 创建云消息队列 RocketMQ 版实例、Topic和Group ID,具体操作,请参见步骤二:创建资源

步骤一:创建MQTT实例并获取接入点

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表
  2. 在顶部菜单栏选择目标地域,然后在页面左上角单击创建实例
  3. 在弹出的付费方式面板中,付费方式固定为包年包月,您无需设置,直接在面板左下角单击确定
  4. 在弹出的实例规格面板中,按需选择您需要购买的实例规格,选中微消息队列 MQTT 版(包年包月)服务协议,然后单击立即购买
  5. 在订单支付面板,根据提示完成支付。
  6. 在支付成功页面单击返回控制台
  7. 回到云消息队列 MQTT 版控制台,在左侧导航栏单击实例列表,并将地域切换为您所购买的实例所对应的地域。
  8. 实例列表页面中,单击您所购买实例的名称或在其操作列单击详情,进入实例详情页面。
  9. 实例详情页面单击接入点页签,即可看到实例的接入点信息,本示例以公网接入点为例。
    接入点

步骤二:创建父级Topic

MQTT协议支持多级Topic,父级Topic需在控制台创建,子级Topic无需创建,使用时直接在代码中设置即可。命名格式为:父级Topic和各子级Topic间均使用正斜线(/)隔开,<父级Topic名称>/<二级Topic名称>/<三级Topic名称>,例如,SendMessage/demo/producer。父级Topic和子级Topic总长度不能超过64个字符。Topic详细信息,请参见名词解释

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表
  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
  3. 在左侧导航栏单击Topic 管理,然后在页面左上角,单击创建 Topic
  4. 在创建Topic面板中,输入要创建的Topic名称描述,然后在左下角单击确定

步骤三:创建Group ID

Group ID详细信息,请参见名词解释

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表
  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
  3. 在左侧导航栏单击Group 管理,然后在页面左上角单击创建 Group
  4. 在创建Group面板中,输入Group ID,然后在左下角单击确定

步骤四:创建客户端上下线通知规则

规则中填写的参数需与您创建的资源保持一致。

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表
  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
  3. 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则
  4. 创建规则页面完成以下操作。
    1. 配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步
      参数取值示例说明
      规则ID111111规则的全局唯一标识,说明如下:
      • 只能包含字母、数字、短划线(-)和下划线(_),至少包含一个字母或数字。
      • 名称长度限制在3~64字符之间,长于64字符将被自动截取。
      • 创建后无法更新。
      描述migrate from rocketmq对规则的描述。
      状态启用是否启用当前规则,取值说明如下:
      • 启用
      • 停用
      规则类型上下线通知创建的规则类型,取值说明如下:
      • 数据流出:用于将云消息队列 MQTT 版的数据导出至其他阿里云产品。详细信息,请参见跨云产品的数据流出
      • 数据流入:用于将其他阿里云产品的数据导入至云消息队列 MQTT 版。详细信息,请参见跨云产品数据流入
      • 上下线通知:用于将获取的云消息队列 MQTT 版客户端上下线事件数据导出至其他阿里云产品。详细信息,请参见MQTT客户端上下线事件数据流出
    2. 配置规则源配置向导页面,配置数据源,然后单击下一步
      参数取值示例说明
      Group IDGID_Client_Status指定需导出数据的设备组。Group ID的详细信息,请参见名词解释
    3. 配置规则目标配置向导页面,配置数据的流转目标,然后单击创建
      参数取值示例说明
      目标服务类型消息队列 RocketMQ 版指定您需将云消息队列 MQTT 版客户端上下线通知流转至哪个目标云产品。
      说明 当前仅支持云消息队列 RocketMQ 版
      RocketMQ 实例MQ_INST_13801563067*****_BbyOD2jQ指定目标云产品的实例ID,即云消息队列 RocketMQ 版的实例ID。
      说明 仅支持选择和云消息队列 MQTT 版实例为同一地域的云产品实例。
      TopicTopicB指定目标云产品的资源键值,即云消息队列 RocketMQ 版的Topic。云消息队列 MQTT 版客户端上下线通知信息将流转至TopicB。
    您可以在规则管理的规则列表查看到刚创建的上下线通知规则。

步骤五:调用Java SDK收发消息

  1. 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client
  2. 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo
  3. 解压该Demo工程包至您指定的文件夹。
  4. 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。

    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
    说明 ons-client的版本信息,请参见版本说明
  5. MQTTClientStatusNoticeProcessDemo.java类中,按代码注释说明填写相应参数,主要涉及步骤一步骤三所创建MQTT资源以及您在云消息队列 RocketMQ 版创建的相应资源,然后执行Main函数运行代码实现消息收发。

    示例代码如下。

    说明 在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证

    云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENVMQTT_SK_ENV

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    
    public class MQTTClientStatusNoticeProcessDemo {
        public static void main(String[] args) {
            /**
             * 初始化消息队列RocketMQ版接收客户端,实际业务中一般部署在服务端应用中。
             */
            Properties properties = new Properties();
            /**
             * 设置消息队列RocketMQ版Group ID,在消息队列RocketMQ版控制台创建。
             */
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
             * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
             * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
             */
            properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
            /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
            /**
             * 设置TCP接入点,该接入点为消息队列RocketMQ版实例的接入点。进入消息队列RocketMQ版控制台实例详情页面获取。
             */
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
            /**
             * 使用消息队列RocketMQ版消费端来处理MQTT客户端的上下线通知时,订阅的Topic为上下线通知Topic。
             */
            final String parentTopic = "GID_XXXX_MQTT";
            /**
             * 客户端状态数据,实际生产环境中建议使用数据库或者Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本示例以单机内存版实现为例。
             */
            MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
            Consumer consumer = ONSFactory.createConsumer(properties);
            /**
             *  此处仅处理客户端是否在线,因此只需要关注connect事件和tcpclean事件即可。
             */
            consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
            consumer.start();
            String clientId = "GID_XXXXX@@@XXXXX";
            while (true) {
                System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 处理上下线通知的逻辑。
         * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者Redis等外部共享存储来维护。
         * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。
         */
        static class MqttClientStatusNoticeListener implements MessageListener {
            private MqttClientStatusStore mqttClientStatusStore;
    
            public MqttClientStatusNoticeListener(
                MqttClientStatusStore mqttClientStatusStore) {
                this.mqttClientStatusStore = mqttClientStatusStore;
            }
    
            @Override
            public Action consume(Message message, ConsumeContext context) {
                try {
                    JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                    System.out.println(msgBody);
                    String eventType = msgBody.getString("eventType");
                    String clientId = msgBody.getString("clientId");
                    String channelId = msgBody.getString("channelId");
                    ClientStatusEvent event = new ClientStatusEvent();
                    event.setChannelId(channelId);
                    event.setClientIp(msgBody.getString("clientIp"));
                    event.setEventType(eventType);
                    event.setTime(msgBody.getLong("time"));
                    /**
                     * 首先存储新的事件。
                     */
                    mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                    /**
                     * 读取当前channel的事件列表。
                     */
                    Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                    if (events == null || events.isEmpty()) {
                        return Action.CommitMessage;
                    }
                    /**
                     * 如果事件列表里上线和下线事件都已经收到,则当前channel已经掉线,可以清理掉这个channel的数据。
                     */
                    boolean findOnlineEvent = false;
                    boolean findOfflineEvent = false;
                    for (ClientStatusEvent clientStatusEvent : events) {
                        if (clientStatusEvent.isOnlineEvent()) {
                            findOnlineEvent = true;
                        } else {
                            findOfflineEvent = true;
                        }
                    }
                    if (findOnlineEvent && findOfflineEvent) {
                        mqttClientStatusStore.deleteEvent(clientId, channelId);
                    }
                    return Action.CommitMessage;
                } catch (Throwable e) {
                    e.printStackTrace();
                }
                return Action.ReconsumeLater;
            }
        }
    
        /**
         * 根据状态表判断一个clientId是否有活跃的TCP连接。
         * 1.如果没有channel表,则客户端一定不在线。
         * 2.如果channel表非空,检查一下channel数据中是否仅包含上线事件,如果有则代表有活跃连接,客户端在线。
         * 如果全部的channel都有掉线断开事件则客户端一定不在线。
         *
         * @param clientId
         * @param mqttClientStatusStore
         * @return
         */
        public static boolean checkClientOnline(String clientId,
            MqttClientStatusStore mqttClientStatusStore) {
            Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
            if (channelMap == null) {
                return false;
            }
            for (Set<ClientStatusEvent> events : channelMap.values()) {
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent event : events) {
                    if (event.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent & !findOfflineEvent) {
                    return true;
                }
            }
            return false;
        }
    
    }

结果验证

完成消息收发后,您可在云消息队列 MQTT 版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询

更多信息