本文介绍如何将云消息队列 MQTT 版的数据导出至其他阿里云产品。本文以当前仅支持的云消息队列 RocketMQ 版数据互通为例进行说明。

前提条件

  • 安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
  • 下载安装JDK

背景信息

本文以公网环境中的Java SDK为例说明如何将云消息队列 MQTT 版的数据导出至云消息队列 RocketMQ 版

此场景下可使用多语言的第三方开源SDK来实现消息收发。更多信息,请参见SDK下载

quick_start_data_outflow

如上图所示,您部署在公网环境的MQTT客户端数据需要传送至公网地域的后端应用,后端应用和MQTT客户端均通过Java语言开发。数据从云消息队列 MQTT 版导出至云消息队列 RocketMQ 版是通过配置数据流出规则实现。两个产品的服务端通过各自产品提供的Java SDK分别与各自的客户端实现消息收发。

重要 云消息队列 RocketMQ 版云消息队列 MQTT 版的Topic不能跨地域使用,因此,本文中所有资源都应在公网地域创建。

网络访问

云消息队列 MQTT 版同时提供了公网接入点VPC 接入点
  • 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
  • VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版
重要 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题云消息队列 MQTT 版产品方概不负责:
  • 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
  • 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
本文以公网接入点为例。云消息队列 MQTT 版云消息队列 RocketMQ 版的应用场景对比和消息属性映射关系请参见以下文档:

使用流程

MQTT客户端消息发送至后端应用的流程如下图所示。

quick_start_mqtt_to_rocketmq

步骤一:创建MQTT实例并获取接入点

  1. 登录云消息队列 MQTT 版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 在弹出的付费方式面板中,付费方式固定为包年包月,您无需设置,直接在面板左下角单击确定
  5. 实例列表页面左上角单击创建实例
  6. 在弹出的实例规格面板中,按需选择您需要购买的实例规格,选中微消息队列 MQTT 版(包年包月)服务协议,然后单击立即购买
  7. 在订单支付面板,根据提示完成支付。
  8. 在支付成功页面单击返回控制台
  9. 回到云消息队列 MQTT 版控制台,在左侧导航栏单击实例列表,并将地域切换为您所购买的实例所对应的地域。
  10. 实例列表页面中,单击您所购买实例的名称或在其操作列单击详情,进入实例详情页面。
  11. 实例详情页面单击接入点页签,即可看到实例的接入点信息,本示例以公网接入点为例。
    接入点

步骤二:创建父级Topic

MQTT协议支持多级Topic,父级Topic需在控制台创建,子级Topic无需创建,使用时直接在代码中设置即可。命名格式为:父级Topic和各子级Topic间均使用正斜线(/)隔开,<父级Topic名称>/<二级Topic名称>/<三级Topic名称>,例如,SendMessage/demo/producer。父级Topic和子级Topic总长度不能超过64个字符。Topic详细信息,请参见名词解释

  1. 登录云消息队列 MQTT 版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 在实例列表中找到目标实例,在其操作列中,选择更多 > Topic 管理
  5. Topic 管理页面左上角,单击创建 Topic
  6. 在创建Topic面板中,输入要创建的Topic名称描述,然后在左下角单击确定
    您可以在Topic 管理页面查看刚创建的Topic。

步骤三:创建Group ID

Group ID详细信息,请参见名词解释

  1. 登录云消息队列 MQTT 版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 在实例列表中找到目标实例,在其操作列中,选择更多 > Group 管理
  5. Group 管理页面的左上角,单击创建 Group
  6. 在创建Group面板中,输入Group ID,然后在左下角单击确定
    您可以在Group 管理页面查看刚创建的Group。

步骤四:创建数据流出规则

规则中填写的参数需与您创建的资源保持一致。

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表
  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
  3. 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则
  4. 创建规则页面完成以下操作。
    1. 配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步
      参数取值示例说明
      规则ID111111规则的全局唯一标识,说明如下:
      • 只能包含字母、数字、短划线(-)和下划线(_),至少包含一个字母或数字。
      • 名称长度限制在3~64字符之间,长于64字符将被自动截取。
      • 创建后无法更新。
      描述migrate from rocketmq对规则的描述。
      状态启用是否启用当前规则,取值说明如下:
      • 启用
      • 停用
      规则类型数据流出创建的规则类型,取值说明如下:
      • 数据流出:用于将云消息队列 MQTT 版的数据导出至其他阿里云产品。详细信息,请参见跨云产品的数据流出
      • 数据流入:用于将其他阿里云产品的数据导入至云消息队列 MQTT 版。详细信息,请参见跨云产品数据流入
      • 上下线通知:用于将获取的云消息队列 MQTT 版客户端上下线事件数据导出至其他阿里云产品。详细信息,请参见MQTT客户端上下线事件数据流出
    2. 配置规则源配置向导页面,配置数据源,然后单击下一步
      参数取值示例说明
      TopicTopicA指定您需导出数据的源Topic,即云消息队列 MQTT 版的Topic。
    3. 配置规则目标配置向导页面,配置数据的流转目标,然后单击创建
      参数取值示例说明
      目标服务类型消息队列 RocketMQ 版指定您需将源Topic的数据转发至的目标云产品。
      说明 当前仅支持云消息队列 RocketMQ 版
      RocketMQ 实例MQ_INST_13801563067*****_BbyOD2jQ指定目标云产品的实例ID,即云消息队列 RocketMQ 版的实例ID。
      说明 仅支持选择和云消息队列 MQTT 版实例为同一地域的云产品实例。
      TopicTopicB指定目标云产品的资源键值,即云消息队列 RocketMQ 版的Topic。源Topic的数据将流转至TopicB。
    您可以在规则管理的规则列表查看到刚创建的数据流出规则。

步骤五:调用Java SDK收发消息

  1. 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client
  2. 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo
  3. 解压该Demo工程包至您指定的文件夹。
  4. 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
    说明 ons-client的版本信息,请参见版本说明
  5. MQ4IoTSendMessageToRocketMQ.java类中,按代码注释说明填写相应参数,主要涉及步骤一步骤三所创建的MQTT资源以及您在云消息队列 RocketMQ 版创建的相应资源,然后执行Main函数运行代码实现消息收发。
    示例代码如下。
    说明 在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证

    云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENVMQTT_SK_ENV

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import java.util.Properties;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    /**
     * 阿里云生产环境中,云消息队列 RocketMQ 版除了公网地域,其他地域的实例不允许在本地使用,必须在对应地域的ECS机器上部署使用。
     */
    public class MQ4IoTSendMessageToRocketMQ {
        public static void main(String[] args) throws Exception {
            /**
             * 初始化云消息队列 RocketMQ 版接收客户端,实际业务中一般部署在服务端应用中。
             */
            Properties properties = new Properties();
            /**
             * 设置云消息队列 RocketMQ 版Group ID,在云消息队列 RocketMQ 版控制台创建。
             */
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
            
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
             * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
             * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
             */
            properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
            /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
          
             /**
             * 设置TCP接入点,该接入点为云消息队列 RocketMQ 版实例的接入点。云消息队列 RocketMQ 版控制台实例详情页面获取。
             */
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
            /**
             * 设置云消息队列 RocketMQ 版的Topic,在云消息队列 RocketMQ 版控制台创建。
             * 云消息队列 RocketMQ 版和微消息队列MQTT配合使用时,RocketMQ客户端仅操作一级Topic。
             */
            final String parentTopic = "XXXXX";
            Consumer consumer = ONSFactory.createConsumer(properties);
            consumer.subscribe(parentTopic, "*", new MessageListener() {
                public Action consume(Message message, ConsumeContext consumeContext) {
                    System.out.println("recv msg:" + message);
                    return Action.CommitMessage;
                }
            });
            consumer.start();
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            /**
             * 初始化云消息队列 MQTT 版发送客户端,实际业务中云消息队列 MQTT 版一般部署在移动终端环境。
             */
    
            /**
             * 您在控制台创建的微消息队列MQTT的实例ID。
             */
            String instanceId = "XXXXX";
             /**
             * 设置接入点,进入微消息队列MQTT版控制台实例详情页面获取。
             */
            String endPoint = "XXXXXX.mqtt.aliyuncs.com";
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
             * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
             * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。
             * clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在云消息队列 MQTT 版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。
             */
            String clientId = "GID_XXXX@@@XXXXX";
           /**
             * 云消息队列 MQTT 版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任何字符串。
             * 需要注意的是,完整的Topic长度不得超过128个字符。
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
             /**
             * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * 设置客户端发送超时时间,防止无限阻塞。
             */
            mqttClient.setTimeToWait(5000);
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * 客户端连接成功后就需要尽快订阅需要的Topic。
                     */
                    System.out.println("connect success");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 *  发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
                 */
                mqttClient.publish(mq4IotTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
    
        }
    
    }

结果验证

完成消息收发后,您可在云消息队列 MQTT 版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询

更多信息