本文介绍如何快速使用微消息队列MQTT版的Java SDK实现MQTT终端与终端消息的收发。
背景信息
微消息队列MQTT版最简单的使用场景即MQTT终端和终端交互,消息生产者和消费者均分布在终端设备。各终端设备均通过终端SDK微消息队列MQTT版与微消息队列MQTT版服务端连接实现消息收发。

本文以公网环境为例,说明如何使用Java SDK完成消息收发。
接入点说明
终端和云端服务与微消息队列MQTT版通信时,需要在各自的SDK代码中设置微消息队列MQTT版实例的接入点信息,通过接入点和微消息队列MQTT版服务端连接。
- 终端SDK接入点格式
使用终端SDK接入微消息队列MQTT版时,需要填写的接入点格式如下:
- 公网接入点:
MQTT实例ID.mqtt.aliyuncs.com
- VPC 接入点:
MQTT实例ID-internal-vpc.mqtt.aliyuncs.com
终端SDK接入点也可以直接在微消息队列MQTT版控制台实例详情页面的接入点页签中查看。
- 云端SDK接入点格式
使用云端SDK接入微消息队列MQTT版时,需要填写的接入点格式如下:
注意 仅实例内核版本为V3.3.0且实例地域属于中国内地的实例支持云端SDK接入。
- 公网接入点:
MQTT实例ID-server-internet.mqtt.aliyuncs.com
- VPC 接入点:
MQTT实例ID-server-internal.mqtt.aliyuncs.com
终端SDK接入点和云端SDK接入点同时支持公网接入点和VPC 接入点。公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入微消息队列MQTT版。
注意 SDK使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题
微消息队列MQTT版产品方概不负责:
- 终端或云端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
- 终端或云端网络侧对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
调用Java SDK收发消息
- 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client。
- 下载阿里云微消息队列MQTT版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo。
- 解压该Demo工程包至您指定的文件夹。
- 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
<dependencies>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
- 在MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java类中,按代码注释说明填写相应参数,主要涉及您已在创建资源中所创建的MQTT资源信息。然后执行Main函数运行代码实现消息收发。
示例代码如下。
package com.aliyun.openservices.lmq.example.demo;
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
/**
* 您在控制台创建的微消息队列MQTT版的实例ID。
*/
String instanceId = "XXXXX";
/**
* 设置接入点,进入微消息队列MQTT版控制台实例详情页面获取。
*/
String endPoint = "XXXXX.mqtt.aliyuncs.com";
/**
* AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
*/
String accessKey = "XXXXX";
/**
* AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
*/
String secretKey = "XXXXX";
/**
* MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。
* clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在微消息队列MQTT版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。
*/
String clientId = "GID_XXXXX@@@XXXXX";
/**
* 微消息队列MQTT版消息的一级Topic,需要在控制台创建才能使用。
* 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
*/
final String parentTopic = "XXXXX";
/**
* 微消息队列MQTT版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任意字符串。
* 需要注意的是,完整的Topic长度不得超过128个字符。
*/
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
/**
* QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 设置客户端发送超时时间,防止无限阻塞。
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 客户端连接成功后就需要尽快订阅需要的Topic。
*/
System.out.println("connect success");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
* 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
/**
* 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
*/
mqttClient.publish(mq4IotTopic, message);
/**
* 微消息队列MQTT版支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的clientId,则可以直接发送点对点消息。
* 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的Topic格式规范是 {{parentTopic}}/p2p/{{targetClientId}}。
*/
final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}
结果验证
完成消息收发后,您可在微消息队列MQTT版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询。