您可以在物联网平台上自定义Topic类,设备将消息发送到自定义Topic中,服务端通过AMQP SDK获取设备上报消息;服务端通过调用物联网平台接口Pub向设备发布指令。自定义Topic通信不使用物模型,消息的数据结构由您自定义。
背景信息
本示例中,电子温度计定期与服务器进行数据的交互,传递温度和指令等信息。温度计向服务器上行发送当前的温度;服务器向温度计下行发送精度设置指令。

准备开发环境
本示例中,设备端和云端均使用Java语言的SDK,需先准备Java开发环境。您可从Java 官方网站下载,并安装Java最新开发环境。
创建产品和设备
设备发送消息给服务器
流程图:

在整个流程中:
- 服务器通过AMQP客户端接收消息,需配置AMQP客户端接入物联网平台,监听设备消息。具体操作,请参见Java SDK接入示例。
注意 AMQP订阅消息的消费组,必须与设备在相同的物联网平台实例下。
- 配置设备端SDK接入物联网平台,并发送消息。
- 配置设备认证信息。
final String productKey = "a1uzcH0****"; final String deviceName = "device1"; final String deviceSecret = "uwMTmVAMnxB***"; final String region = "cn-shanghai";
- 设置初始化连接参数,包括MQTT连接配置、设备信息和初始物模型属性。下文代码示例是将设备接入物联网平台公共实例。
LinkKitInitParams params = new LinkKitInitParams(); //LinkKit底层是MQTT协议,设置MQTT的配置。 IoTMqttClientConfig config = new IoTMqttClientConfig(); config.productKey = productKey; config.deviceName = deviceName; config.deviceSecret = deviceSecret; config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883"; //设备的信息。 DeviceInfo deviceInfo = new DeviceInfo(); deviceInfo.productKey = productKey; deviceInfo.deviceName = deviceName; deviceInfo.deviceSecret = deviceSecret; //报备的设备初始状态。 Map<String, ValueWrapper> propertyValues = new HashMap<String, ValueWrapper>(); params.mqttClientConfig = config; params.deviceInfo = deviceInfo; params.propertyValues = propertyValues;
- 初始化连接。
//连接并设置连接成功以后的回调函数。 LinkKit.getInstance().init(params, new ILinkKitConnectListener() { @Override public void onError(AError aError) { System.out.println("Init error:" + aError); } //初始化成功以后的回调。 @Override public void onInitDone(InitResult initResult) { System.out.println("Init done:" + initResult); } });
- 设备发送消息。
设备端连接物联网平台后,向以上定义的Topic发送消息。需将onInitDone函数内容替换为以下内容:
@Override public void onInitDone(InitResult initResult) { //设置Pub消息的Topic和内容。 MqttPublishRequest request = new MqttPublishRequest(); request.topic = "/" + productKey + "/" + deviceName + "/user/devmsg"; request.qos = 0; request.payloadObj = "{\"temperature\":35.0, \"time\":\"sometime\"}"; //发送消息并设置成功以后的回调。 LinkKit.getInstance().publish(request, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { System.out.println("onResponse:" + aResponse.getData()); } @Override public void onFailure(ARequest aRequest, AError aError) { System.out.println("onFailure:" + aError.getCode() + aError.getMsg()); } }); }
服务器收到消息如下:
Message {payload={"temperature":35.0, "time":"sometime"}, topic='/a1uzcH0****/device1/user/devmsg', messageId='1131755639450642944', qos=0, generateTime=1558666546105}
实际业务场景中,您需修改以下参数值。
参数 示例 说明 productKey a1uzcH0**** deviceName device1 deviceSecret uwMTmVAMnxB*** region cn-shanghai 您的物联网平台设备所在地域代码。地域代码表达方法,请参见地域和可用区。 channelHost iot-o***.mqtt.iothub.aliyuncs.com:1883 设备接入地址。详细说明,请参见查看实例终端节点。 注意 接入域名若没有携带端口号1883,需添加该端口号。request.topic "/" + productKey + "/" + deviceName + "/user/devmsg" 具有发布权限的自定义Topic。 request.payloadObj "{\"temperature\":35.0, \"time\":\"sometime\"}" 自定义的消息内容。 - 配置设备认证信息。
服务器发送消息给设备
流程图:

- 配置设备端SDK订阅Topic。
配置设备认证信息、设置初始化连接参数、初始化连接,请参见设备发送消息给服务器中的相应示例代码。
设备要接收服务器发送的消息,还需订阅消息Topic。
配置设备端订阅Topic示例如下:
//初始化成功以后的回调。 @Override public void onInitDone(InitResult initResult) { //设置订阅的Topic。 MqttSubscribeRequest request = new MqttSubscribeRequest(); request.topic = "/" + productKey + "/" + deviceName + "/user/cloudmsg"; request.isSubscribe = true; //发出订阅请求并设置订阅成功或者失败的回调函数。 LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() { @Override public void onSuccess() { System.out.println(""); } @Override public void onFailure(AError aError) { } }); //设置订阅的下行消息到来时的回调函数。 IConnectNotifyListener notifyListener = new IConnectNotifyListener() { //此处定义收到下行消息以后的回调函数。 @Override public void onNotify(String connectId, String topic, AMessage aMessage) { System.out.println( "received message from " + topic + ":" + new String((byte[])aMessage.getData())); } @Override public boolean shouldHandle(String s, String s1) { return false; } @Override public void onConnectStateChange(String s, ConnectState connectState) { } }; LinkKit.getInstance().registerOnNotifyListener(notifyListener); }
其中,
request.topic
值需修改为具有订阅权限的自定义Topic。 - 配置云端SDK调用物联网平台接口Pub发布消息。参数说明,请参见Pub,使用说明,请参见Java SDK使用说明。下文代码示例是向物联网平台公共实例下设备发布消息。
- 设置身份认证信息。
String regionId = "***"; String accessKey = "***"; String accessSecret = "***"; final String productKey = "***";
- 设置连接参数。
//设置client的参数。 DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret); IAcsClient client = new DefaultAcsClient(profile);
- 设置消息发布参数。
PubRequest request = new PubRequest(); request.setQos(0); //设置发布消息的Topic。 request.setTopicFullName("/" + productKey + "/" + deviceName + "/user/cloudmsg"); request.setProductKey(productKey); //设置消息的内容,一定要用Base64编码,否则乱码。 request.setMessageContent(Base64.encode("{\"accuracy\":0.001,\"time\":now}"));
- 发送消息。
try { PubResponse response = client.getAcsResponse(request); System.out.println("pub success?:" + response.getSuccess()); } catch (Exception e) { System.out.println(e); }
设备端接收到的消息如下:msg = [{"accuracy":0.001,"time":now}]
- 设置身份认证信息。
附录:代码示例
说明 实际业务场景中,请按照上文描述,修改代码中相关参数的值。
下载Pub/Sub demo,包含本示例的云端SDK和设备端SDK配置代码Demo。
AMQP客户端接入物联网平台示例,请参见: