对于需要控制和反馈的场景,物联网平台提供同步服务的能力。本章描述如何实现同步服务调用。
背景信息
同步服务调用是用户服务器调用云端API,通过物联网平台下发服务请求到设备端,设备端在一定时间内完成服务逻辑的执行并返回结果到物联网平台的过程。用户服务器通过同步服务控制设备,并获取到设备返回结果。物联网平台的同步服务调用通过RRPC实现,详情请参见设备服务调用。
使用说明
- 只有设备端在线后,才能调用同步服务。
- 调用同步服务的最大超时时间为8秒,若8秒内物联网平台未收到回复,则返回超时错误。
步骤一:创建服务
步骤二:开发设备端
- 增加POM(project object model)依赖。
<dependency> <groupId>com.aliyun.alink.linksdk</groupId> <artifactId>iot-linkkit-java</artifactId> <version>1.2.0.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.40</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency>
更多信息,请参见远程配置。
- 增加Java类,修改Config.*文件中的参数为您的设备信息。
/* * Copyright © 2019 Alibaba. All rights reserved. */ package com.aliyun.iot.demo.alink; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.alibaba.fastjson.JSONObject; import com.aliyun.alink.dm.api.DeviceInfo; import com.aliyun.alink.dm.api.InitResult; import com.aliyun.alink.linkkit.api.ILinkKitConnectListener; import com.aliyun.alink.linkkit.api.IoTMqttClientConfig; import com.aliyun.alink.linkkit.api.LinkKit; import com.aliyun.alink.linkkit.api.LinkKitInitParams; import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest; import com.aliyun.alink.linksdk.cmp.core.base.AMessage; import com.aliyun.alink.linksdk.cmp.core.base.ARequest; import com.aliyun.alink.linksdk.cmp.core.base.AResponse; import com.aliyun.alink.linksdk.cmp.core.base.ConnectState; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener; import com.aliyun.alink.linksdk.tools.AError; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * 同步服务调用<br> */ public class SyncServiceProcessor { // ===================需要用户填写的参数,开始=========================== // 修改Config.*的参数为您的实际信息 // 站点id,根据实际站点获取对应id private static String regionId = "cn-shanghai"; // 产品productKey,设备证书信息之一 private static String productKey = "Config.productKey"; // 设备名称deviceName,设备证书信息之一 private static String deviceName = "Config.deviceName"; // 设备密钥deviceSecret,设备证书信息之一 private static String deviceSecret = "Config.deviceSecret"; // ===================需要用户填写的参数,结束=========================== private static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("http2-downlink-handle-%d").build(), new ThreadPoolExecutor.AbortPolicy()); /** * 1、启动本程序模拟设备在线 <br> * 2、启动InvokeSyncService发起服务调用 <br> * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // 下行数据监听 registerNotifyListener(); // 设备接入 connect(productKey, deviceName, deviceSecret); } /** * 建立连接 * * @param productKey 产品key * @param deviceName 设备名称 * @param deviceSecret 设备密钥 * @throws InterruptedException */ private static void connect(String productKey, String deviceName, String deviceSecret) throws InterruptedException { // 初始化参数 LinkKitInitParams params = new LinkKitInitParams(); // 设置 Mqtt 初始化参数 IoTMqttClientConfig config = new IoTMqttClientConfig(); config.channelHost = productKey + ".iot-as-mqtt." + regionId + ".aliyuncs.com:1883"; config.productKey = productKey; config.deviceName = deviceName; config.deviceSecret = deviceSecret; params.mqttClientConfig = config; // 设置初始化设备证书信息,用户传入 DeviceInfo deviceInfo = new DeviceInfo(); deviceInfo.productKey = productKey; deviceInfo.deviceName = deviceName; deviceInfo.deviceSecret = deviceSecret; params.deviceInfo = deviceInfo; // 初始化 LinkKit.getInstance().init(params, new ILinkKitConnectListener() { public void onError(AError aError) { System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode=" + aError.getSubCode() + ",subMsg=" + aError.getSubMsg()); } public void onInitDone(InitResult initResult) { System.out.println("init success !!"); } }); // 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时 Thread.sleep(2000); } /** * 发布消息 * * @param topic 发送消息的topic * @param payload 发送的消息内容 */ private static void publish(String topic, String payload) { MqttPublishRequest request = new MqttPublishRequest(); request.topic = topic; request.payloadObj = payload; request.qos = 0; LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { } @Override public void onFailure(ARequest aRequest, AError aError) { } }); } /** * 监听下行数据 */ private static void registerNotifyListener() { LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() { @Override public boolean shouldHandle(String connectId, String topic) { return true; } @Override public void onNotify(String connectId, String topic, AMessage aMessage) { // 接收同步服务调用,并响应 // 这里另起线程,避免回调堵塞 executorService.submit(() -> doService(topic, aMessage)); } @Override public void onConnectStateChange(String connectId, ConnectState connectState) { } }); } /** * 处理同步服务调用 * * @param topic 服务调用指令的topic * @param aMessage 服务调用指令的内容 */ private static void doService(String topic, AMessage aMessage) { String content = new String((byte[]) aMessage.getData()); System.out.println("服务请求Topic:" + topic); System.out.println("服务指令:" + content); /** * 服务请求 aMessage 消息体内容是一个json * { * "id": "123", * "version": "1.0", * "params": // 服务参数,取决于服务定义 * { * "input": 50 * }, * "method": "thing.service.{tsl.service.identifier}" * } */ JSONObject request = JSONObject.parseObject(content); JSONObject params = request.getJSONObject("params"); if (!params.containsKey("input")) { // 检查入参 return; } Integer input = params.getInteger("input"); // 获取入参 /** * 服务响应格式消息体内容是一个json * { * "id": "123", // 同上面服务请求的id * "code": 200, // 200表示成功 * "data": {} // 服务返回,取决于服务定义 * } */ JSONObject response = new JSONObject(); JSONObject data = new JSONObject(); data.put("output", input + 1); response.put("id", request.get("id")); response.put("code", 200); response.put("data", data); // 服务响应 String respTopic = topic.replace("/request/", "/response/"); publish(respTopic, response.toString()); System.out.println("服务响应Topic:" + respTopic); System.out.println("服务响应内容:" + response.toString()); } }
关于设备通用code内容,请参见设备端通用code。
步骤三:开发服务端SDK
- 增加POM依赖。
<dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-iot</artifactId> <version>7.0.0</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.40</version> </dependency>
更多信息,请参见Java SDK使用说明。
- 增加Java类,修改Config.*文件中的参数为您的实际信息。
/* * Copyright © 2019 Alibaba. All rights reserved. */ package com.aliyun.iot.demo.alink; import com.alibaba.fastjson.JSONObject; import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.exceptions.ServerException; import com.aliyuncs.iot.model.v20180120.InvokeThingServiceRequest; import com.aliyuncs.iot.model.v20180120.InvokeThingServiceResponse; import com.aliyuncs.profile.DefaultProfile; import com.aliyuncs.profile.IClientProfile; /** * 同步服务调用<br> */ public class InvokeSyncService { // ===================需要用户填写的参数开始=========================== // 修改Config.*的参数为您的实际信息 // 站点id,根据实际站点获取对应id private static String regionId = "cn-shanghai"; // 用户账号AccessKey private static String accessKeyID = "Config.accessKey"; // 用户账号AccesseKeySecret private static String accessKeySecret = "Config.accessKeySecret"; // 产品productKey,执行服务的设备证书信息之一 private static String productKey = "Config.productKey"; // 设备名字deviceName,执行服务的设备证书信息之一 private static String deviceName = "Config.deviceName"; // ===================需要用户填写的参数结束=========================== /** * 1、启动SyncServiceProcessor模拟设备在线 <br> * 2、启动本程序发起服务调用 <br> * * @param args * @throws ServerException * @throws ClientException */ public static void main(String[] args) throws ServerException, ClientException { // 获取服务端请求客户端 DefaultAcsClient client = null; try { IClientProfile profile = DefaultProfile.getProfile(regionId, accessKeyID, accessKeySecret); DefaultProfile.addEndpoint(regionId, regionId, "Iot", "iot." + regionId + ".aliyuncs.com"); client = new DefaultAcsClient(profile); } catch (Exception e) { System.out.println("create OpenAPI Client failed !! exception:" + e.getMessage()); } // 填充服务调用的参数 InvokeThingServiceRequest request = new InvokeThingServiceRequest(); request.setProductKey(productKey); // 设备证书之productKey request.setDeviceName(deviceName); // 设备证书之deviceName request.setIdentifier("MySyncService"); // 要调用的服务标识符,取决于服务定义 JSONObject json = new JSONObject(); // 构造服务入参,服务入参是一个JSON String json.put("input", 50); // 取决于服务定义,取值要符合服务定义时配置的参数规格 request.setArgs(json.toString()); // 获得服务调用响应 InvokeThingServiceResponse response = client.getAcsResponse(request); if (response == null) { System.out.println("调用服务异常"); return; } System.out.println("requestId:" + response.getRequestId()); System.out.println("code:" + response.getCode()); System.out.println("success:" + response.getSuccess()); System.out.println("error message:" + response.getErrorMessage()); if (response != null && response.getSuccess()) { // 服务调用成功,仅代表发送服务指令的成功,不代表执行服务本身是否成功 System.out.println("服务调用成功"); System.out.println("消息id:" + response.getData().getMessageId()); System.out.println("服务返回结果:" + response.getData().getResult()); // 仅同步服务有result } else { System.out.println("服务调用失败"); } } }
其中,同步服务调用的更多信息,请参见InvokeThingService。