全部产品
Search
文档中心

云消息队列 MQTT 版:快速使用MQTT的Java SDK收发消息(终端和云端消息收发)

更新时间:Jan 18, 2024

本文介绍如何快速使用云消息队列 MQTT 版的Java SDK实现MQTT终端和云端服务的消息收发。

前提条件

背景信息

MQTT终端和云端服务交互流程如下图所示。终端设备和云端服务可分别通过对应的SDK接入云消息队列 MQTT 版,实现终端和云端服务的双向通信。终端云端交互本文以公网环境为例,介绍使用Java SDK实现消息收发。更多消息收发示例代码,请参见终端Demo工程云端Demo工程

接入点说明

终端和云端服务与云消息队列 MQTT 版通信时,需要在各自的SDK代码中设置云消息队列 MQTT 版实例的接入点信息,通过接入点和云消息队列 MQTT 版服务端连接。

  • 终端SDK接入点格式

    使用终端SDK接入云消息队列 MQTT 版时,需要填写的接入点格式如下:

    • 公网接入点MQTT实例ID.mqtt.aliyuncs.com

    • VPC 接入点MQTT实例ID-internal-vpc.mqtt.aliyuncs.com

    终端SDK接入点也可以直接在云消息队列 MQTT 版控制台实例详情页面的接入点页签中查看。

  • 云端SDK接入点格式

    使用云端SDK接入云消息队列 MQTT 版时,需要填写的接入点格式如下:

    重要

    仅实例内核版本为V3.3.0且实例地域属于中国内地的实例支持云端SDK接入。

    • 公网接入点MQTT实例ID-server-internet.mqtt.aliyuncs.com

    • VPC 接入点MQTT实例ID-server-internal.mqtt.aliyuncs.com

说明

MQTT实例ID可在云消息队列 MQTT 版控制台实例详情页面的基础信息区域查看。

终端SDK接入点和云端SDK接入点同时支持公网接入点VPC 接入点公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版

重要

SDK使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题云消息队列 MQTT 版产品方概不负责:

  • 终端或云端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。

  • 终端或云端网络侧对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。

调用终端SDK发送消息

  1. 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client
  2. 下载终端SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo

  3. 解压该Demo工程包至您指定的文件夹。
  4. 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。

    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. MQ4IoTProducerDemo.java类中,按代码注释说明填写相应参数,主要涉及您已在创建资源中所创建的MQTT资源信息。然后执行Main函数运行代码完成消息发送。

    示例代码如下。

    说明

    在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证

    云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENVMQTT_SK_ENV

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    
    public class MQ4IoTProducerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * 您创建的云消息队列 MQTT 版的实例ID。
             */
            String instanceId = "XXXXX";
            /**
             * 设置终端SDK的接入点,进入云消息队列 MQTT 版控制台实例详情页面的接入点页签查看。
             * 接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致客户端异常。
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
             * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
             * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。
             * clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在云消息队列 MQTT 版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * 云消息队列 MQTT 版消息的一级Topic,需要在控制台创建才能使用。
             * 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
             */
            final String parentTopic = "XXXXX";
            /**
             * 云消息队列 MQTT 版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任意字符串。
             * 需要注意的是,完整的Topic长度不得超过128个字符。
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
             /**
             * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
             /**
             * 设置客户端发送超时时间,防止无限阻塞。
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * 客户端连接成功后就需要尽快订阅需要的Topic。
                     */
                    System.out.println("connect success");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                     /**
                     * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                     * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                 /**
                 * 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * 云消息队列 MQTT 版支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的clientId,则可以直接发送点对点消息。
                 * 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的Topic格式规范是 {{parentTopic}}/p2p/{{targetClientId}}。
                 */
                String receiverId = "xxx";
                final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

调用云端SDK接收消息

  1. 下载云消息队列 MQTT 版提供的云端SDK。下载地址为云端SDK版本说明

  2. 下载云端SDK的Demo示例做为您代码开发的参考。下载地址为mqtt-server-sdk-demo

  3. 解压该Demo工程包至您指定的文件夹。
  4. 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。

    <dependencies>
            <dependency>
                <groupId>com.alibaba.mqtt</groupId>
                <artifactId>server-sdk</artifactId>
                <version>1.0.0.Final</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
    </dependencies>
  5. MQTTConsumerDemo.java类中,按代码注释说明填写相应参数,主要涉及您已在创建资源中所创建好的MQTT资源信息。然后执行Main函数运行代码完成消息接收。

    示例代码如下。

    说明

    在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证

    云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENVMQTT_SK_ENV

    package com.aliyun.openservices.lmq.example;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.mqtt.server.ServerConsumer;
    import com.alibaba.mqtt.server.callback.MessageListener;
    import com.alibaba.mqtt.server.config.ChannelConfig;
    import com.alibaba.mqtt.server.config.ConsumerConfig;
    import com.alibaba.mqtt.server.model.MessageProperties;
    
    public class MQTTConsumerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * 设置云端SDK的接入点,请参见接入点说明中的云端SDK接入点格式。
             * 接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致服务端异常。
             */
            String domain = "domain";
    
            /**
             * 使用的协议和端口必须匹配,该参数值固定为5672。
             */
            int port = "port";
    
            /**
             * 您创建的云消息队列 MQTT 版的实例ID。
             */
            String instanceId = "instanceId";
    
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
             * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
             * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
    
            /**
             * 云消息队列 MQTT 版消息的一级Topic,需要在控制台创建才能使用。
             * 由于云端SDK订阅消息一般用于云上应用进行消息汇总和分析等场景,因此,云端SDK订阅消息不支持设置子级Topic。
             * 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
             */
            String firstTopic = "firstTopic";
    
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.setDomain(domain);
            channelConfig.setPort(port);
            channelConfig.setInstanceId(instanceId);
            channelConfig.setAccessKey(accessKey);
            channelConfig.setSecretKey(secretKey);
    
            ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
            serverConsumer.start();
            serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
                @Override
                public void process(String msgId, MessageProperties messageProperties, byte[] payload) {
                    System.out.println("Receive:" + msgId + "," + JSONObject.toJSONString(messageProperties) + "," + new String(payload));
                }
            });
        }
    
    }
    说明

    云端SDK消息发送的示例代码,请参见MQTTProducerDemo.java