本文介绍云消息队列 MQTT 版客户端连接服务端时的常见问题。

使用安卓的Demo,返回连接失败报错。

问题描述

  • 资源填写正确,且网络可以连通。
  • AK(AccessKey ID)、SK(AccessKey Secret)填写正确且已授权。

可能原因

接入点格式不正确,正确格式为“tcp://Endpoint:1883”或“tls://Endpoint:8883”。

为什么客户端能连接上,消费端一订阅就马上断连?

可能原因:Topic设置和命名格式不正确。请检查您的父级Topic和子级Topic的设置情况。
父级Topic(Parent Topic)
MQTT协议基于Pub/Sub模型,因此任何消息都属于一个Topic。根据MQTT协议,Topic存在多级,定义第一级Topic为父级Topic,使用云消息队列 MQTT 版前,需先在控制台创建该父级Topic,可以在云消息队列 MQTT 版控制台创建,或者直接在云消息队列 RocketMQ 版的控制台创建。
子级Topic(Subtopic)
MQTT的二级Topic,甚至三级Topic都是父级Topic下的子类。使用时无需在控制台创建,直接在代码中设置即可。命名格式为:父级Topic和各子级Topic间均使用正斜线(/)隔开,<父级Topic名称>/<二级Topic名称>/<三级Topic名称>,例如,SendMessage/demo/producer。需要注意的是云消息队列 MQTT 版限制父级Topic和子级Topic的总长度为64个字符,如果超出长度限制将会导致客户端异常。

为什么控制台显示的订阅关系数比实际使用的高很多?

cleanSession设置为false时,客户端离线后订阅关系也会一直保留,不会被清理。

若您不需要保留离线订阅关系,您可以使用订阅关系清理功能,自动清理离线客户端的订阅关系。

客户端一直断线重连,并且返回“invalid param”错误。

可能原因:消息体超过限制,服务端拒绝连接。云消息队列 MQTT 版中,消息体不能超过64 KB。

业务侧无改动,客户端和服务端连接突然断开。

可能原因:

  • 设备关机、网络不通导致客户端连接断开。
  • 若您使用的是Token鉴权方式,Token到期后连接会断开,重新建立连接时客户端未获取新的Token也会导致连接失败。

客户端心跳时间在哪里设置?

调用MqttConnectOptions类。

说明 在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证

云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENVMQTT_SK_ENV

示例如下:
String clientId = mqttTools.getClientId();
mqttConnectOptions = new MqttConnectOptions();
// 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
// 强烈建议不要把AccessKey ID保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
// 本示例以将AccessKey保存在环境变量为例说明。
mqttConnectOptions.setUserName("Signature|" + System.getenv("MQTT_AK_ENV") + "|" + instanceId);
mqttConnectOptions.setPassword(mqttTools.macSignature(clientId).toCharArray());
mqttConnectOptions.setCleanSession(true);
// 客户端心跳超时时间,默认90秒。
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
mqttConnectOptions.setConnectionTimeout(5000);
mqttLogger.info("MQTT is connect setting={},clientId={}", JSON.toJSONString(mqttConnectOptions), clientId);
mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
mqttClient.setTimeToWait(5000);
mqttClient.setCallback(this.getMqttCallbackExtended());
mqttClient.connect(mqttConnectOptions);

客户端连接时出现“valid owner failed”报错

请保证以下内容是否正确:
  • 实例名称是否填写正确?
  • Group和Topic是否已在控制台提前创建且名称填写正确?
  • Group或Topic是填写的指定实例下的资源?

设备偶尔出现“connection reset by peer”错误。

客户端ID(ClientID)重复时会互踢,导致服务端断开连接,而客户端继续往TCP连接中发送数据,会触发对端Socket发送RST报文。

请确保您设置的客户端ID全局唯一。更多信息,请参见客户端限制

如何配置MQTT开源Java SDK的客户端自动连接

开启SDK客户端自动重连功能

mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);

autoReconnect参数设置为true后,云消息队列 MQTT 版的Java SDK客户端会自动重连。

mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅的Topic。
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
                 * 超时时间请参见使用限制。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));

            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
});

这样客户端的回调中的connectComplete方法被调用时就代表连接成功或重连成功,业务方无需关心连接情况。这时候重新建立的连接与之前的连接已经不是一个,因此之前客户端的内部连接上的消息发送订阅等行为会失效。

此时需要重新订阅Topic,这样业务侧在新的内部连接中可以继续发送订阅消息。

@Override
public void connectComplete(boolean reconnect, String serverURI) {
    /**
    * 客户端连接成功后就需要尽快订阅需要的Topic。
    */
    System.out.println("connect success");
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                final String topicFilter[] = {mq4IotTopic};
                final int[] qos = {qosLevel};
                mqttClient.subscribe(topicFilter, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    });
}

业务侧手动维护客户端连接

mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(false);

autoReconnect参数设置为false时,云消息队列 MQTT 版的客户端断开后不会自动重连。

当连接断开后,业务侧会抓取到异常,需要手动将旧的连接关闭,然后重新建立新的连接。