使用阿里云物联网平台可实现伪内网穿透,对无公网IP的树莓派服务器进行远程控制。本文以实现基于树莓派服务器远程控制为例,介绍伪内网穿透的实现流程,并提供开发代码示例。
背景信息
假如您在公司或家里使用树莓派搭建一个服务器,用于执行一些简单的任务,如启动某个脚本,开始下载文件等。但是,如果树莓派没有公网IP,您不在公司或家里的情况下,您就无法控制该服务器。如果使用其他内网穿透工具,也会经常出现断线的情况。为解决以上问题,您可以使用阿里云物联网平台的RRPC(同步远程过程调用)功能结合JSch库来实现对树莓派服务器的远程控制。
实现远程控制的流程
通过物联网平台远程控制树莓派服务器的流程:
- 在电脑上调用物联网平台RRPC接口发送SSH指令。
- 物联网平台接收到指令后,通过MQTT协议将SSH指令下发给树莓派服务器。
- 服务器执行SSH指令。
- 服务器将SSH指令执行结果封装成RRPC响应,通过MQTT协议上报到物联网平台。
- 物联网平台将RRPC响应回复给电脑。
说明 RRPC调用的超时限制为5秒。服务器5秒内未收到设备回复,会返回超时错误。如果您发送的指令操作耗时较长,可忽略该超时错误信息。
下载SDK和Demo
实现物联网平台远程控制树莓派,您需先进行服务端SDK和设备端SDK开发。
以下章节中,将介绍服务端SDK和设备端SDK的开发示例。
说明 本文示例中提供的代码仅支持一些简单的Linux命令,如uname、touch、mv等,不支持文件编辑等复杂的指令,需要您自行实现。
设备端SDK开发
下载、安装设备端SDK和下载SDK Demo后,您需添加项目依赖和增加以下Java文件。
项目可以导出成JAR包在树莓派上运行。
- 在pom.xml文件中,添加依赖。
<!-- 设备端SDK -->
<dependency>
<groupId>com.aliyun.alink.linksdk</groupId>
<artifactId>iot-linkkit-java</artifactId>
<version>1.1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
<scope>compile</scope>
</dependency>
<!-- SSH客户端 -->
<!-- https://mvnrepository.com/artifact/com.jcraft/jsch -->
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
- 增加SSHShell.java文件,用于执行SSH指令。
public class SSHShell {
private String host;
private String username;
private String password;
private int port;
private Vector<String> stdout;
public SSHShell(final String ipAddress, final String username, final String password, final int port) {
this.host = ipAddress;
this.username = username;
this.password = password;
this.port = port;
this.stdout = new Vector<String>();
}
public int execute(final String command) {
System.out.println("ssh command: " + command);
int returnCode = 0;
JSch jsch = new JSch();
SSHUserInfo userInfo = new SSHUserInfo();
try {
Session session = jsch.getSession(username, host, port);
session.setPassword(password);
session.setUserInfo(userInfo);
session.connect();
Channel channel = session.openChannel("exec");
((ChannelExec) channel).setCommand(command);
channel.setInputStream(null);
BufferedReader input = new BufferedReader(new InputStreamReader(channel.getInputStream()));
channel.connect();
String line = null;
while ((line = input.readLine()) != null) {
stdout.add(line);
}
input.close();
if (channel.isClosed()) {
returnCode = channel.getExitStatus();
}
channel.disconnect();
session.disconnect();
} catch (JSchException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
return returnCode;
}
public Vector<String> getStdout() {
return stdout;
}
}
- 增加SSHUserInfo.java文件,用于验证SSH账号密码。
public class SSHUserInfo implements UserInfo {
@Override
public String getPassphrase() {
return null;
}
@Override
public String getPassword() {
return null;
}
@Override
public boolean promptPassphrase(final String arg0) {
return false;
}
@Override
public boolean promptPassword(final String arg0) {
return false;
}
@Override
public boolean promptYesNo(final String arg0) {
if (arg0.contains("The authenticity of host")) {
return true;
}
return false;
}
@Override
public void showMessage(final String arg0) {
}
}
- 增加Device.java文件,用于创建MQTT连接。
public class Device {
/**
* 建立连接
*
* @param productKey 产品key
* @param deviceName 设备名称
* @param deviceSecret 设备密钥
* @throws InterruptedException
*/
public static void connect(String productKey, String deviceName, String deviceSecret) throws InterruptedException {
// 初始化参数
LinkKitInitParams params = new LinkKitInitParams();
// 设置 Mqtt 初始化参数
IoTMqttClientConfig config = new IoTMqttClientConfig();
config.productKey = productKey;
config.deviceName = deviceName;
config.deviceSecret = deviceSecret;
params.mqttClientConfig = config;
// 设置初始化设备证书信息,传入:
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.productKey = productKey;
deviceInfo.deviceName = deviceName;
deviceInfo.deviceSecret = deviceSecret;
params.deviceInfo = deviceInfo;
// 初始化
LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
public void onError(AError aError) {
System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
+ aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
}
public void onInitDone(InitResult initResult) {
System.out.println("init success !!");
}
});
// 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
Thread.sleep(2000);
}
/**
* 发布消息
*
* @param topic 发送消息的topic
* @param payload 发送的消息内容
*/
public static void publish(String topic, String payload) {
MqttPublishRequest request = new MqttPublishRequest();
request.topic = topic;
request.payloadObj = payload;
request.qos = 0;
LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
@Override
public void onResponse(ARequest aRequest, AResponse aResponse) {
}
@Override
public void onFailure(ARequest aRequest, AError aError) {
}
});
}
/**
* 订阅消息
*
* @param topic 订阅消息的topic
*/
public static void subscribe(String topic) {
MqttSubscribeRequest request = new MqttSubscribeRequest();
request.topic = topic;
request.isSubscribe = true;
LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
@Override
public void onSuccess() {
}
@Override
public void onFailure(AError aError) {
}
});
}
/**
* 取消订阅
*
* @param topic 取消订阅消息的topic
*/
public static void unsubscribe(String topic) {
MqttSubscribeRequest request = new MqttSubscribeRequest();
request.topic = topic;
request.isSubscribe = false;
LinkKit.getInstance().getMqttClient().unsubscribe(request, new IConnectUnscribeListener() {
@Override
public void onSuccess() {
}
@Override
public void onFailure(AError aError) {
}
});
}
/**
* 断开连接
*/
public static void disconnect() {
// 反初始化
LinkKit.getInstance().deinit();
}
}
- 增加SSHDevice.java文件。SSHDevice.java包含main方法,用于接收RRPC指令,调用
SSHShell
执行SSH指令,返回RRPC响应。SSHDevice.java文件中,需要填写设备证书信息(ProductKey、DeviceName和DeviceSecret)和SSH账号密码。 public class SSHDevice {
// ===================需要填写的参数开始===========================
// 产品productKey
private static String productKey = "";
//
private static String deviceName = "";
// 设备密钥deviceSecret
private static String deviceSecret = "";
// 消息通信的topic,无需创建和定义,直接使用即可
private static String rrpcTopic = "/sys/" + productKey + "/" + deviceName + "/rrpc/request/+";
// ssh 要访问的域名或IP
private static String host = "127.0.0.1";
// ssh 用户名
private static String username = "";
// ssh 密码
private static String password = "";
// ssh 端口号
private static int port = 22;
// ===================需要填写的参数结束===========================
public static void main(String[] args) throws InterruptedException {
// 下行数据监听
registerNotifyListener();
// 建立连接
Device.connect(productKey, deviceName, deviceSecret);
// 订阅topic
Device.subscribe(rrpcTopic);
}
public static void registerNotifyListener() {
LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
@Override
public boolean shouldHandle(String connectId, String topic) {
// 只处理特定topic的消息
if (topic.contains("/rrpc/request/")) {
return true;
} else {
return false;
}
}
@Override
public void onNotify(String connectId, String topic, AMessage aMessage) {
// 接收rrpc请求并回复rrpc响应
try {
// 执行远程命令
String payload = new String((byte[]) aMessage.getData(), "UTF-8");
SSHShell sshExecutor = new SSHShell(host, username, password, port);
sshExecutor.execute(payload);
// 获取命令回显
StringBuffer sb = new StringBuffer();
Vector<String> stdout = sshExecutor.getStdout();
for (String str : stdout) {
sb.append(str);
sb.append("\n");
}
// 回复回显到服务端
String response = topic.replace("/request/", "/response/");
Device.publish(response, sb.toString());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void onConnectStateChange(String connectId, ConnectState connectState) {
}
});
}
}
服务端SDK开发
下载、安装服务端SDK和下载SDK Demo后,您需添加项目依赖和增加以下Java文件。
- 在pom.xml文件中,添加依赖。
<!-- 服务端SDK -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-iot</artifactId>
<version>6.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>3.5.1</version>
</dependency>
<!-- commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.8</version>
</dependency>
- 增加OpenApiClient.java文件,用于调用物联网平台开放接口。
public class OpenApiClient {
private static DefaultAcsClient client = null;
public static DefaultAcsClient getClient(String accessKeyID, String accessKeySecret) {
if (client != null) {
return client;
}
try {
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKeyID, accessKeySecret);
DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
client = new DefaultAcsClient(profile);
} catch (Exception e) {
System.out.println("create OpenAPI Client failed !! exception:" + e.getMessage());
}
return client;
}
}
- 增加SSHCommandSender.java文件。SSHCommandSender.java包含main方法,用于发送SSH指令和接收SSH指令响应。SSHCommandSender.java中,需要填写您的账号AccessKey信息、设备证书信息(ProductKey和DeviceName)、以及SSH指令。
public class SSHCommandSender {
// ===================需要填写的参数开始===========================
// 用户账号AccessKey
private static String accessKeyID = "";
// 用户账号AccesseKeySecret
private static String accessKeySecret = "";
// 产品Key
private static String productKey = "";
// 设备名称deviceName
private static String deviceName = "";
// ===================需要填写的参数结束===========================
public static void main(String[] args) throws ServerException, ClientException, UnsupportedEncodingException {
// Linux 远程命令
String payload = "uname -a";
// 构建RRPC请求
RRpcRequest request = new RRpcRequest();
request.setProductKey(productKey);
request.setDeviceName(deviceName);
request.setRequestBase64Byte(Base64.encodeBase64String(payload.getBytes()));
request.setTimeout(5000);
// 获取服务端请求客户端
DefaultAcsClient client = OpenApiClient.getClient(accessKeyID, accessKeySecret);
// 发起RRPC请求
RRpcResponse response = (RRpcResponse) client.getAcsResponse(request);
// RRPC响应处理
// response.getSuccess()仅表明RRPC请求发送成功,不代表设备接收成功和响应成功
// 需要根据RrpcCode来判定,参考文档https://www.alibabacloud.com/help/doc-detail/69797.htm
if (response != null && "SUCCESS".equals(response.getRrpcCode())) {
// 回显
System.out.println(new String(Base64.decodeBase64(response.getPayloadBase64Byte()), "UTF-8"));
} else {
// 回显失败,打印rrpc code
System.out.println(response.getRrpcCode());
}
}
}