All Products
Search
Document Center

IoT Platform:Forward device data to business servers by using ApsaraMQ for RocketMQ

Last Updated:Jan 04, 2024

IoT Platform forwards data that is submitted by devices to ApsaraMQ for RocketMQ topics. Then, ApsaraMQ for RocketMQ forwards the data to your business servers. This topic describes how to forward data.

Prerequisites

  • An Alibaba Cloud account is created.

  • IoT Platform is activated.

  • ApsaraMQ for RocketMQ is activated.

    If ApsaraMQ for RocketMQ is not activated, go to the product page of ApsaraMQ for RocketMQ to activate the service.

  • A development environment is prepared. In this example, a Java development environment that consists of the following components is used.

Background information

Data forwarding process

消息流转MQ

Benefits:

  • When you connect devices to IoT Platform over Message Queuing Telemetry Transport (MQTT), data transmission links use Transport Layer Security (TLS) encryption to prevent data from being tampered with. For more information about MQTT, see MQTT protocol.

  • ApsaraMQ for RocketMQ is used as a message buffer to reduce traffic fluctuations and concurrent workloads of your business servers.

Procedure

  1. Log on to the IoT Platform console and create a product and a device.

    1. On the Overview page, click All environment. On the All environment tab, find the instance that you want to manage and click the instance ID or instance name.

      In this example, the China (Shanghai) region is selected.

    2. In the left-side navigation pane, choose Devices > Products. On the Products page, click Create Product. On the Create Product page, configure the parameters and click OK.

      In this example, the Product Name parameter is set to MQ_test and the Node Type parameter is set to Directly Connected Device. Use the default values for other parameters.

    3. Click View Product Details. On the product details page, choose Topic Categories > Topic Category and click Edit Topic Category to create a topic category that is used to submit device data.

      In this example, a topic category named /{YourProductKey}/${YourDeviceName}/user/data is created.

    4. In the left-side navigation pane, choose Devices > Devices. On the Devices page, click Add Device to create a device for the MQ_test product.

      In this example, a device named MQdevice is created.

  2. In the ApsaraMQ for RocketMQ console, create a topic and a consumer.

    1. Log on to the ApsaraMQ for RocketMQ console.

    2. In the left-side navigation pane, click Instances. On the Instances page, click Create Instance. In this example, an instance whose Instance Version parameter is set to V4.0 and whose Instance Type parameter is set to Standard Edition Instance is created in the China (Shanghai) region.

      For more information, see Create an instance.

      Important
      • The ApsaraMQ for RocketMQ instance must reside in the same region as the IoT Platform instance.

      • You can forward data streams only to the topics of the ApsaraMQ for RocketMQ V4.x instance.

    3. On the Instances page, find the instance for which you want to create a database and click the instance name.

    4. On the Instance Details page, click Create Group. In the Create Group panel, configure the parameters and click OK, as shown in the following figure.

      物联网设备消息

    5. Click Create Topic. In the Create Topic panel, set the Message Type parameter to Normal Message.

      物联网设备消息

    6. Create a consumer and view the status of the consumer in the ApsaraMQ for RocketMQ console. Make sure that the consumer is in the Online state and the subscriptions of consumers in the group are consistent.

      In this example, a TCP SDK is used to send and receive messages. For more information about how to obtain and use a TCP SDK, see Use TCP client SDKs to send and subscribe to normal messages.

      Sample Java code:

      Note
      import com.aliyun.openservices.ons.api.Action;
      import com.aliyun.openservices.ons.api.ConsumeContext;
      import com.aliyun.openservices.ons.api.Consumer;
      import com.aliyun.openservices.ons.api.Message;
      import com.aliyun.openservices.ons.api.MessageListener;
      import com.aliyun.openservices.ons.api.ONSFactory;
      import com.aliyun.openservices.ons.api.PropertyKeyConst;
      import java.util.Properties;
      public class ConsumerTest {
          public static void main(String[] args) {
              Properties properties = new Properties();
              // The ID of the group that you created in the ApsaraMQ for RocketMQ console.
              properties.put(PropertyKeyConst.GROUP_ID, "XXX");
              // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication.
              properties.put(PropertyKeyConst.AccessKey, "${AccessKey}");
              // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication.
              properties.put(PropertyKeyConst.SecretKey, "${SecretKey}");
              // The TCP endpoint. To obtain the TCP endpoint, go to the Basic Information section of the instance details page in the IoT Platform console.
              properties.put(PropertyKeyConst.NAMESRV_ADDR,
                  "XXX");
              // Clustering subscription. This is the default mode.
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // Broadcasting subscription.
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
              Consumer consumer = ONSFactory.createConsumer(properties);
              consumer.subscribe("iotx_test_mq", "", new MessageListener() { // Subscribe to multiple tags.
                  public Action consume(Message message, ConsumeContext context) {
                      System.out.println("Receive: " + message);
                      return Action.CommitMessage;
                  }
              });
              consumer.start();
              System.out.println("Consumer Started");
          }
      }
  3. In the IoT Platform console, configure a data forwarding rule for the instance to forward the device data to ApsaraMQ for RocketMQ.

    1. In the left-side navigation pane, choose Message Forwarding > Data Forwarding.

    2. On the Data Forwarding page, click Create Rule.

      Important

      If the Data Forwarding page of the latest version appears, click Back to Previous Version in the upper-right corner of the page. When the Data Forwarding page of the previous version appears, click Create Rule.

    3. In the Create Data Forwarding Rule dialog box, set the Rule Name parameter to MQ forwarding and set the Data Type parameter to JSON. Then, click OK.

    4. On the rule details page, click Write SQL Statement. In the Write SQL Statement dialog box, configure the parameters and click OK, as shown in the following figure.

      物联网设备消息

    5. Click Add Operation. In the Add Operation dialog box, specify a destination to which you want to forward data and click OK, as shown in the following figure.

      物联网设备消息

    6. Go to the Data Forwarding page. Find the MQ forwarding rule and click Start in the Actions column.

      After the rule is enabled, IoT Platform forwards device data to the ApsaraMQ for RocketMQ topic.

  4. Use a Java SDK to simulate a device, connect the device to IoT Platform, and then use the device to submit data.

    1. Download a demo package from Download Java SDK Demo and decompress the package.

    2. Start IntelliJ IDEA and import a sample project named JavaLinkKitDemo from the demo package.

    3. Specify the following device certificate information of the MQdevice device in the device_id.json file: ProductKey, DeviceName, and DeviceSecret.

    4. Change the MQTT topic in the src\main\java\com.aliyun.alink.devicesdk.demo\MqttSample.java file to the topic to which the device submits data.

      In this example, the /{YourProductKey}/${YourDeviceName}/user/data topic is used.

      /**
       * Example of a publish operation
       */
      public void publish() {
              MqttPublishRequest request = new MqttPublishRequest();
              // Specify a topic based on your business scenario.
              request.topic = "/" + productKey + "/" + deviceName + "/user/data";
              ......
              ......
      }
    5. Change the MQTT endpoint in the src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java file to the MQTT endpoint of your device.

      In this example, the following MQTT endpoint is used. For more information about how to obtain MQTT endpoints, see View the endpoint of an instance.

      public void init(final DeviceInfoData deviceInfoData) {
              ......
              ......
              /**
               * Configure the parameters for MQTT initialization.
               */
              IoTMqttClientConfig config = new IoTMqttClientConfig();
              config.productKey = deviceInfoData.productKey;
              config.deviceName = deviceInfoData.deviceName;
              config.deviceSecret = deviceInfoData.deviceSecret;
              config.channelHost = "iot-06****.mqtt.iothub.aliyuncs.com:1883";
              ......
              ......
      }
    6. Execute src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java file to start the device.

    In the IoT Platform console, go to the Instance Details page of the instance and choose Maintenance > Device Log. You can view device logs and check whether the device data is forwarded to ApsaraMQ for RocketMQ.物联网设备消息

  5. View the data in the ApsaraMQ for RocketMQ console.

    1. Run the code on your on-premises server to subscribe to the ApsaraMQ for RocketMQ topic.

      物联网设备消息

    2. Go to the instance details page in the ApsaraMQ for RocketMQ console and click Message Query. On the Message Query page, search for messages by topic or message ID.

      You can click Details to view the details of the message that is forwarded to ApsaraMQ for RocketMQ and then download the message.

      Sample message:

      {"deviceName":"MQdevice"}