全部产品
Search
文档中心

云消息队列 MQTT 版:MQTT和RocketMQ进行数据互通(跨产品数据流出)

更新时间:Feb 23, 2024

如果您的云端应用需要使用云消息队列 RocketMQ 版产品的某些功能,例如顺序消息特性、事务消息特性等,您可以通过消息流入或流出规则将云消息队列 MQTT 版云消息队列 RocketMQ 版数据进行流转。本文介绍如何将云消息队列 MQTT 版的数据导出至其他阿里云产品。

背景信息

云消息队列 MQTT 版支持云端SDK,云上应用可直接通过云端SDK接入云消息队列 MQTT 版服务端进行消息收发。云端SDK使用,请参见云端开发概述

同时云消息队列 MQTT 版支持和其他云产品进行互通,当前支持的云产品有云消息队列 RocketMQ 版

本文以公网环境中的Java SDK为例说明如何将云消息队列 MQTT 版的数据导出至云消息队列 RocketMQ 版

此场景下可使用多语言的第三方开源SDK来实现消息收发。更多信息,请参见SDK下载

quick_start_data_outflow

重要 云消息队列 RocketMQ 版云消息队列 MQTT 版的Topic不能跨地域使用,因此,本文中所有资源都应在公网地域创建。

网络访问

云消息队列 MQTT 版同时提供了公网接入点VPC 接入点
  • 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
  • VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版
重要 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题云消息队列 MQTT 版产品方概不负责:
  • 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
  • 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
本文以公网接入点为例。云消息队列 MQTT 版云消息队列 RocketMQ 版的应用场景对比和消息属性映射关系请参见以下文档:

使用流程

MQTT客户端消息发送至后端应用的流程如下图所示。

quick_start_mqtt_to_rocketmq

前提条件

  • 安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
  • 下载安装JDK
  • 创建云消息队列 RocketMQ 版实例、Topic和Group ID,具体操作,请参见步骤二:创建资源

步骤一:创建MQTT实例并获取接入点

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择目标地域,然后在页面左上角单击创建实例

  3. 在弹出的付费方式面板中,付费方式固定为包年包月,您无需设置,直接在面板左下角单击确定

  4. 在弹出的商品购买页中,按需选择您需要购买的实例规格,选中微消息队列 MQTT 版(包年包月)服务协议,然后单击立即购买

    云消息队列 MQTT 版支持的实例类型及功能差异,请参见实例类型

  5. 确认订单页面,根据提示完成支付。

  6. 在支付成功页面单击返回控制台
  7. 回到云消息队列 MQTT 版控制台,在左侧导航栏单击实例列表,并将地域切换为您所购买的实例所对应的地域。
  8. 实例列表页面中,单击您所购买实例的名称或在其操作列单击详情,进入实例详情页面。
  9. 实例详情页面单击接入点页签,即可看到实例的接入点信息,本示例以公网接入点为例。
    接入点

步骤二:创建父级Topic

MQTT协议支持多级Topic,父级Topic需在控制台创建,子级Topic无需创建,使用时直接在代码中设置即可。命名格式为:父级Topic和各子级Topic间均使用正斜线(/)隔开,<父级Topic名称>/<二级Topic名称>/<三级Topic名称>,例如,SendMessage/demo/producer。父级Topic和子级Topic总长度不能超过64个字符。Topic详细信息,请参见名词解释

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。

  3. 在左侧导航栏单击Topic 管理,然后在页面左上角,单击创建 Topic
  4. 在创建Topic面板中,输入要创建的Topic名称描述,然后在左下角单击确定

步骤三:创建Group ID

Group ID详细信息,请参见名词解释

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。

  3. 在左侧导航栏单击Group 管理,然后在页面左上角单击创建 Group
  4. 在创建Group面板中,输入Group ID,然后在左下角单击确定

步骤四:创建数据流出规则

规则中填写的参数需与您创建的资源保持一致。

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。

  3. 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则
  4. 创建规则页面完成以下操作。
    1. 配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步
      参数取值示例说明
      规则ID111111规则的全局唯一标识,说明如下:
      • 只能包含字母、数字、短划线(-)和下划线(_),至少包含一个字母或数字。
      • 名称长度限制在3~64字符之间,长于64字符将被自动截取。
      • 创建后无法更新。
      描述migrate from rocketmq对规则的描述。
      状态启用是否启用当前规则,取值说明如下:
      • 启用
      • 停用
      规则类型数据流出创建的规则类型,取值说明如下:
      • 数据流出:用于将云消息队列 MQTT 版的数据导出至其他阿里云产品。详细信息,请参见跨云产品的数据流出
      • 数据流入:用于将其他阿里云产品的数据导入至云消息队列 MQTT 版。详细信息,请参见跨云产品数据流入
      • 上下线通知:用于将获取的云消息队列 MQTT 版客户端上下线事件数据导出至其他阿里云产品。详细信息,请参见MQTT客户端上下线事件数据流出
    2. 配置规则源配置向导页面,配置数据源,然后单击下一步
      参数取值示例说明
      TopicTopicA指定您需导出数据的源Topic,即云消息队列 MQTT 版的Topic。
    3. 配置规则目标配置向导页面,配置数据的流转目标,然后单击创建
      参数取值示例说明
      目标服务类型消息队列 RocketMQ 版指定您需将源Topic的数据转发至的目标云产品。
      说明 当前仅支持云消息队列 RocketMQ 版
      RocketMQ 实例MQ_INST_13801563067*****_BbyOD2jQ指定目标云产品的实例ID,即云消息队列 RocketMQ 版的实例ID。
      说明 仅支持选择和云消息队列 MQTT 版实例为同一地域的云产品实例。
      TopicTopicB指定目标云产品的资源键值,即云消息队列 RocketMQ 版的Topic。源Topic的数据将流转至TopicB。
    您可以在规则管理的规则列表查看到刚创建的数据流出规则。

步骤五:调用Java SDK收发消息

  1. 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client
  2. 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo
  3. 解压该Demo工程包至您指定的文件夹。
  4. 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。

    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
    说明 ons-client的版本信息,请参见版本说明
  5. MQ4IoTSendMessageToRocketMQ.java类中,按代码注释说明填写相应参数,主要涉及步骤一步骤三所创建的MQTT资源以及您在云消息队列 RocketMQ 版创建的相应资源,然后执行Main函数运行代码实现消息收发。

    示例代码如下。

    说明

    在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证

    云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENVMQTT_SK_ENV

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import java.util.Properties;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MQ4IoTSendMessageToRocketMQ {
        public static void main(String[] args) throws Exception {
            /**
             * 初始化云消息队列 RocketMQ 版接收客户端,实际业务中一般部署在服务端应用中。
             */
            Properties properties = new Properties();
            /**
             * 设置云消息队列 RocketMQ 版Group ID,在云消息队列 RocketMQ 版控制台创建。
             */
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
            
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
             * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
             * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
             */
            properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
            /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
          
             /**
             * 设置TCP接入点,该接入点为云消息队列 RocketMQ 版实例的接入点。云消息队列 RocketMQ 版控制台实例详情页面获取。
             */
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
            /**
             * 设置云消息队列 RocketMQ 版的Topic,在云消息队列 RocketMQ 版控制台创建。
             * 云消息队列 RocketMQ 版和微消息队列MQTT配合使用时,RocketMQ客户端仅操作一级Topic。
             */
            final String parentTopic = "XXXXX";
            Consumer consumer = ONSFactory.createConsumer(properties);
            consumer.subscribe(parentTopic, "*", new MessageListener() {
                public Action consume(Message message, ConsumeContext consumeContext) {
                    System.out.println("recv msg:" + message);
                    return Action.CommitMessage;
                }
            });
            consumer.start();
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            /**
             * 初始化云消息队列 MQTT 版发送客户端,实际业务中云消息队列 MQTT 版一般部署在移动终端环境。
             */
    
            /**
             * 您在控制台创建的微消息队列MQTT的实例ID。
             */
            String instanceId = "XXXXX";
             /**
             * 设置接入点,进入微消息队列MQTT版控制台实例详情页面获取。
             */
            String endPoint = "XXXXXX.mqtt.aliyuncs.com";
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
             * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
             * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。
             * clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在云消息队列 MQTT 版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。
             */
            String clientId = "GID_XXXX@@@XXXXX";
           /**
             * 云消息队列 MQTT 版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任何字符串。
             * 需要注意的是,完整的Topic长度不得超过128个字符。
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
             /**
             * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * 设置客户端发送超时时间,防止无限阻塞。
             */
            mqttClient.setTimeToWait(5000);
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * 客户端连接成功后就需要尽快订阅需要的Topic。
                     */
                    System.out.println("connect success");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 *  发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
                 */
                mqttClient.publish(mq4IotTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
    
        }
    
    }

结果验证

完成消息收发后,您可在云消息队列 MQTT 版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询

更多信息