全部产品
Search
文档中心

阿里云物联网平台:使用自定义Topic进行通信

更新时间:Nov 20, 2023

使用MQTT协议接入的设备和物联网平台,通过订阅Topic或向Topic发布消息的方式进行通信。Topic分为系统Topic、物模型Topic和自定义Topic,其中自定义Topic需要用户在控制台定义。 本文为您介绍设备使用自定义Topic与物联网平台进行上下行通信,以及物联网平台和业务服务器之间通信的步骤。

背景信息

本示例中,电子温度计通过订阅自定义Topic接收指令,通过向自定义Topic发布消息从而上报温度,物联网平台将收到的温度信息通过AMQP服务端转发到用户服务器,用户服务器调用Pub接口向自定义Topic发布消息从而实现对设备的远程精度设置。

自定义Topic通信

准备开发环境

本示例中,设备端和云端均使用Java语言的SDK,需先准备Java开发环境。您可从Java官方网站下载并安装Java开发环境。

本示例使用环境如下:

创建产品和设备

  1. 登录物联网平台控制台

  2. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

  3. 在左侧导航栏,单击设备管理 > 产品

  4. 单击创建产品,创建温度计产品,获取productKey,例如a1uzcH0****

    详细操作指导,请参见创建产品

  5. 创建产品成功后,单击该产品对应的查看

  6. 产品详情页面的Topic类列表页签下,单击自定义Topic,增加自定义Topic类。

    详细操作指导,请参见使用自定义Topic通信

    本示例中,定义了以下两个Topic类:

    • 设备发布消息Topic:/a1uzcH0****/${deviceName}/user/devmsg,权限为发布。

    • 设备订阅消息Topic:/a1uzcH0****/${deviceName}/user/cloudmsg,权限为订阅。

  7. 服务端订阅页签下,单击创建订阅,设置AMQP服务端订阅,订阅设备上报消息默认消费组

    设备上报消息包含自定义Topic消息和物模型消息。详细操作和说明,请参见配置AMQP服务端订阅

  8. 在左侧导航栏,选择设备管理 > 设备,然后在刚创建的温度计产品下,添加设备device1,获取设备证书ProductKeyDeviceNameDeviceSecret

    详细操作指导,请参见单个创建设备

设备发送消息给服务器

流程图:

自定义Topic通信

在整个流程中:

  • 服务器通过AMQP客户端接收消息,需配置AMQP客户端接入物联网平台,监听设备消息。具体操作,请参见Java SDK接入示例

    重要

    AMQP订阅消息的消费组,必须与设备在相同的物联网平台实例下。

  • 配置设备端SDK接入物联网平台,并发送消息。

    • 配置设备认证信息。

      final String productKey = "a1uzcH0****";
      final String deviceName = "device1";
      final String deviceSecret = "uwMTmVAMnxB****";
      final String region = "cn-shanghai";
      final String iotInstanceId = "iot-2w****";

      实际业务场景中,您需修改以下参数值。

      参数

      示例

      说明

      productKey

      a1uzcH0****

      设备证书信息。您可在物联网平台控制台的设备详情页面查看。具体操作,请参见查看具体设备信息

      deviceName

      device1

      deviceSecret

      uwMTmVAMnxB****

      region

      cn-shanghai

      您物联网平台设备所在地域的Region ID。Region ID表达方法,请参见地域列表

      iotInstanceId

      iot-2w****

      设备所属实例的ID。

      您可在控制台的实例概览页面查看。

      • 若有ID值,必须传入该ID值。

      • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ""

    • 设置初始化连接参数,包括MQTT连接配置、设备信息和初始物模型属性。

      LinkKitInitParams params = new LinkKitInitParams();
      //LinkKit底层是MQTT协议,设置MQTT的配置。
      IoTMqttClientConfig config = new IoTMqttClientConfig();
      config.productKey = productKey;
      config.deviceName = deviceName;
      config.deviceSecret = deviceSecret;
      config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";
      //设备的信息。
      DeviceInfo deviceInfo = new DeviceInfo();
      deviceInfo.productKey = productKey;
      deviceInfo.deviceName = deviceName;
      deviceInfo.deviceSecret = deviceSecret;
      //报备的设备初始状态。
      Map<String, ValueWrapper> propertyValues = new HashMap<String, ValueWrapper>();
      
      params.mqttClientConfig = config;
      params.deviceInfo = deviceInfo;
      params.propertyValues = propertyValues;

      实际业务场景中,您需修改以下参数值。

      参数

      示例

      说明

      config.channelHost

      config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";

      MQTT设备接入域名。

      • 旧版公共实例:config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";

      • 新版公共实例和企业版实例:config.channelHost = iotInstanceId + ".mqtt.iothub.aliyuncs.com:1883";

    • 初始化连接。

      //连接并设置连接成功以后的回调函数。
      LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
           @Override
           public void onError(AError aError) {
               System.out.println("Init error:" + aError);
           }
      
           //初始化成功以后的回调。
           @Override
           public void onInitDone(InitResult initResult) {
               System.out.println("Init done:" + initResult);
           }
       });
    • 设备发送消息。

      设备端连接物联网平台后,向自定义的Topic发送消息。您需将onInitDone函数内容替换为以下内容:

      @Override
       public void onInitDone(InitResult initResult) {
           //设置Pub消息的Topic和内容。
           MqttPublishRequest request = new MqttPublishRequest();
           request.topic = "/" + productKey + "/" + deviceName + "/user/devmsg";
           request.qos = 0;
           request.payloadObj = "{\"temperature\":35.0, \"time\":\"sometime\"}";
           //发送消息并设置成功以后的回调。
           LinkKit.getInstance().publish(request, new IConnectSendListener() {
               @Override
               public void onResponse(ARequest aRequest, AResponse aResponse) {
                   System.out.println("onResponse:" + aResponse.getData());
               }
      
               @Override
               public void onFailure(ARequest aRequest, AError aError) {
                   System.out.println("onFailure:" + aError.getCode() + aError.getMsg());
               }
           });
       }

      实际业务场景中,您需修改以下参数值。

      参数

      示例

      说明

      request.topic

      "/" + productKey + "/" + deviceName + "/user/devmsg"

      具有发布权限的自定义Topic。

      request.payloadObj

      "{\"temperature\":35.0, \"time\":\"sometime\"}"

      自定义的消息内容。

      服务器收到消息如下:

      Message
      {payload={"temperature":35.0, "time":"sometime"},
      topic='/a1uzcH0****/device1/user/devmsg',
      messageId='1131755639450642944',
      qos=0,
      generateTime=1558666546105}

服务器发送消息给设备

流程图:

自定义Topic通信

  • 配置设备端SDK订阅Topic。

    配置设备认证信息、设置初始化连接参数、初始化连接,请参见设备发送消息给服务器中的相应示例代码。

    设备要接收服务器发送的消息,还需订阅消息Topic。

    配置设备端订阅Topic示例如下:

    //初始化成功以后的回调。
    @Override
    public void onInitDone(InitResult initResult) {
        //设置订阅的Topic。
        MqttSubscribeRequest request = new MqttSubscribeRequest();
        request.topic = "/" + productKey + "/" + deviceName + "/user/cloudmsg";
        request.isSubscribe = true;
        //发出订阅请求并设置订阅成功或者失败的回调函数。
        LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {
            @Override
            public void onSuccess() {
                System.out.println("");
            }
    
            @Override
            public void onFailure(AError aError) {
    
            }
        });
    
        //设置订阅的下行消息到来时的回调函数。
        IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
            //此处定义收到下行消息以后的回调函数。
            @Override
            public void onNotify(String connectId, String topic, AMessage aMessage) {
                System.out.println(
                    "received message from " + topic + ":" + new String((byte[])aMessage.getData()));
            }
    
            @Override
            public boolean shouldHandle(String s, String s1) {
                return false;
            }
    
            @Override
            public void onConnectStateChange(String s, ConnectState connectState) {
    
            }
        };
        LinkKit.getInstance().registerOnNotifyListener(notifyListener);
    }

    其中,request.topic值需修改为具有订阅权限的自定义Topic。

  • 配置云端SDK调用物联网平台接口Pub发布消息。参数说明,请参见Pub,使用说明,请参见Java SDK使用说明

    • 设置身份认证信息。

       String regionId = "cn-shanghai";
       String accessKey = "LTAI4GFGQvKuqHJhFaj****";
       String accessSecret = "iMS8ZhCDdfJbCMeA005sieKe****";
       final String productKey = "a1uzcH0****";
       final String deviceName = "device1";
       final String iotInstanceId = "iot-2w****";

      实际业务场景中,您需修改以下参数值。

      参数

      示例

      说明

      accessKey

      LTAI4GFGQvKuqHJhFaj****

      您的阿里云账号的AccessKey ID和AccessKey Secret。

      登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

      说明

      如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见授权RAM用户访问物联网平台

      accessSecret

      iMS8ZhCDdfJbCMeA005sieKe****

      productKey

      a1uzcH0****

      设备证书信息。您可在物联网平台控制台的设备详情页面查看。具体操作,请参见查看具体设备信息

      deviceName

      device1

      region

      cn-shanghai

      您物联网平台设备所在地域的Region ID。Region ID表达方法,请参见地域列表

      iotInstanceId

      iot-2w****

      设备所属实例的ID。

      您可在控制台的实例概览页面查看。

      • 若有ID值,必须传入该ID值。

      • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ""

    • 设置连接参数。

      //设置client的参数。
      DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret);
      IAcsClient client = new DefaultAcsClient(profile);
    • 设置消息发布参数。

      PubRequest request = new PubRequest();
      request.setIotInstanceId(iotInstanceId);
      request.setQos(0);
      //设置发布消息的Topic。
      request.setTopicFullName("/" + productKey + "/" + deviceName + "/user/cloudmsg");
      request.setProductKey(productKey);
      //设置消息的内容,一定要用Base64编码,否则乱码。
      request.setMessageContent(Base64.encode("{\"accuracy\":0.001,\"time\":now}"));

      实际业务场景中,您需修改调用接口的请求参数。详细说明,请参见Pub

    • 发送消息。

      try {
           PubResponse response = client.getAcsResponse(request);
           System.out.println("pub success?:" + response.getSuccess());
       } catch (Exception e) {
           System.out.println(e);
       }

      设备端接收到的消息如下:

      msg = [{"accuracy":0.001,"time":now}]

附录:代码示例

重要

实际业务场景中,请按照上文描述,修改相关代码及相关参数的值。

下载Pub/Sub demo,包含本示例的云端SDK和设备端SDK配置代码Demo。

AMQP客户端接入物联网平台示例,请参见: