IoT Platform forwards device data to Message Queue for Apache RocketMQ topics. Then, Message Queue for Apache RocketMQ forwards the data to your business servers.

Prerequisites

  • An Alibaba Cloud account is created.
  • IoT Platform is activated.

    To activate IoT Platform, log on to the IoT Platform console, click Activate Now, and then follow the prompted instructions.

  • Message Queue for Apache RocketMQ is activated.

    To activate Message Queue for Apache RocketMQ, log on to the Message Queue for Apache RocketMQ console.

Background information

Recommended architecture:

Forward device data to servers by using Message Queue for Apache RocketMQ

Benefits:

Message Queue for Apache RocketMQ is used as a message buffer to reduce concurrent workloads of your business servers.

Procedure

  1. Log on to the IoT Platform console to create a product and a device.
    1. In the left-side navigation pane, choose Devices > Products. Click Create Product to create a product. In this example, a product named MQ_test is created, and the node type is set to Directly Connected Device.
    2. On the Product Details page, create a custom topic category that the device uses to submit data. In this example, the /${YourProductKey}/${YourDeviceName}/user/data topic is created.
    3. Choose Devices > Device > Add Device to create a device. In this example, a device named MQdevice is created.
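    The custom topic contains two placeholders that are filled in per device. The following minimal sketch shows the substitution. The class name, the helper name, and the ProductKey value a1ExampleKey are hypothetical and used only for illustration.

    ```java
    class TopicExample {
        // Hypothetical helper: fills ${YourProductKey} and ${YourDeviceName}
        // in the custom topic category created in this example.
        static String dataTopic(String productKey, String deviceName) {
            return "/" + productKey + "/" + deviceName + "/user/data";
        }

        public static void main(String[] args) {
            // "a1ExampleKey" is a made-up ProductKey; MQdevice is the device from this example.
            System.out.println(dataTopic("a1ExampleKey", "MQdevice"));
        }
    }
    ```

    Use the ProductKey that the console shows for the MQ_test product instead of the made-up value.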
  2. In the Message Queue for Apache RocketMQ console, create a topic and a consumer.
    1. Log on to the Message Queue for Apache RocketMQ console.
    2. Create an instance.
    3. Create a topic. Set the Message Type parameter to Normal Message.
    4. Create a group.
    5. Create a message consumer. Run the following sample code that is provided in Message Queue for Apache RocketMQ SDK. Then, view the consumer status in the Message Queue for Apache RocketMQ console. You can check whether the consumer is online, and whether the required subscription is created.
      import com.aliyun.openservices.ons.api.Action;
      import com.aliyun.openservices.ons.api.ConsumeContext;
      import com.aliyun.openservices.ons.api.Consumer;
      import com.aliyun.openservices.ons.api.Message;
      import com.aliyun.openservices.ons.api.MessageListener;
      import com.aliyun.openservices.ons.api.ONSFactory;
      import com.aliyun.openservices.ons.api.PropertyKeyConst;
      import java.util.Properties;
      public class ConsumerTest {
          public static void main(String[] args) {
              Properties properties = new Properties();
              // The ID of the group that you created in the console.
              properties.put(PropertyKeyConst.GROUP_ID, "XXX");
              // The AccessKey ID that is created in the Alibaba Cloud Management console.
              properties.put(PropertyKeyConst.AccessKey, "${AccessKey}");
              // The AccessKey secret that is created in the Alibaba Cloud Management console.
              properties.put(PropertyKeyConst.SecretKey, "${SecretKey}");
              // The TCP endpoint that is used to access the instance. You can view the endpoint in the console.
              properties.put(PropertyKeyConst.NAMESRV_ADDR,
                  "XXX");
              // The cluster subscription method. Use the default settings.
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // The broadcast subscription method.
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
              Consumer consumer = ONSFactory.createConsumer(properties);
              // Subscribe to the iot_to_mq topic. The "*" expression matches all tags.
              consumer.subscribe("iot_to_mq", "*", new MessageListener() {
                  @Override
                  public Action consume(Message message, ConsumeContext context) {
                      System.out.println("Receive: " + message);
                      return Action.CommitMessage;
                  }
              });
              consumer.start();
              System.out.println("Consumer Started");
          }
      }
  3. In the IoT Platform console, set a data forwarding rule to forward the device data to Message Queue for Apache RocketMQ.
    For more information, see Configure a data forwarding rule.
    1. Choose Rules > Data Forwarding. On the Data Forwarding page, click Create Rule and set the parameters to create a data forwarding rule. Set the data format to JSON.
    2. Configure the SQL statement that is used to process data.
      In this example, set the Field parameter to deviceName() as deviceName. This value indicates that the device name field is extracted from device data.
    3. Set the data forwarding destination to Message Queue for Apache RocketMQ.
    4. Enable the rule.
      After the rule is enabled, IoT Platform forwards device data to the Message Queue for Apache RocketMQ topic.
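    The Field and Topic settings of a rule are combined into one SQL statement. The following sketch assembles a statement of that shape for this example. It assumes that the rule listens on the user/data topic of all devices of the product by using the + wildcard. The class name, the helper name, and the ProductKey value are hypothetical. Compare the result with the statement that the console displays for your rule.

    ```java
    class RuleSqlExample {
        // Hypothetical helper: builds the rule SQL for the Field value used in this example.
        // Assumption: the topic filter uses "+" to match every device of the product.
        static String ruleSql(String productKey) {
            String field = "deviceName() as deviceName";
            String topic = "/" + productKey + "/+/user/data";
            return "SELECT " + field + " FROM \"" + topic + "\"";
        }

        public static void main(String[] args) {
            // "a1ExampleKey" is a made-up ProductKey.
            System.out.println(ruleSql("a1ExampleKey"));
        }
    }
    ```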
  4. Use Link SDK for Java to simulate a device and submit data.
    1. Download the Link SDK for Java demo.
    2. Specify the device certificate information, including ProductKey, DeviceName, and DeviceSecret.
    3. Change the MQTT topic to the topic to which device data is submitted. In this example, the /${YourProductKey}/${YourDeviceName}/user/data topic is used.
    4. Start the device.
    In the IoT Platform console, choose Maintenance > Device Log. You can view device logs and check whether the device data is forwarded to Message Queue for Apache RocketMQ.
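    The demo derives the MQTT sign-in parameters from the device certificate for you. For orientation only, the following sketch shows how the user name and password could be computed by hand. It assumes the HMAC-SHA1 sign-in scheme that IoT Platform documents for MQTT connections. The class name, the helper names, and all sample values are hypothetical. Verify the scheme against the current MQTT connection documentation before you rely on it.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.security.GeneralSecurityException;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;

    class MqttSignExample {
        // Assumed format of the MQTT user name: DeviceName, "&", ProductKey.
        static String username(String productKey, String deviceName) {
            return deviceName + "&" + productKey;
        }

        // Assumed format of the MQTT password: hex-encoded HMAC-SHA1, keyed with the
        // DeviceSecret, over the parameter names and values concatenated in
        // alphabetical order of the names.
        static String password(String productKey, String deviceName, String deviceSecret,
                               String clientId, String timestamp) {
            String content = "clientId" + clientId + "deviceName" + deviceName
                    + "productKey" + productKey + "timestamp" + timestamp;
            try {
                Mac mac = Mac.getInstance("HmacSHA1");
                mac.init(new SecretKeySpec(deviceSecret.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
                StringBuilder hex = new StringBuilder();
                for (byte b : mac.doFinal(content.getBytes(StandardCharsets.UTF_8))) {
                    hex.append(String.format("%02x", b));
                }
                return hex.toString();
            } catch (GeneralSecurityException e) {
                throw new IllegalStateException("HmacSHA1 is not available", e);
            }
        }

        public static void main(String[] args) {
            // All values below are made up for illustration.
            System.out.println(username("a1ExampleKey", "MQdevice"));
            System.out.println(password("a1ExampleKey", "MQdevice", "exampleSecret", "client01", "1700000000000"));
        }
    }
    ```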
  5. View the data in the Message Queue for Apache RocketMQ console.
    1. Run the consumer sample code on your local machine to subscribe to the Message Queue for Apache RocketMQ topic.
    2. Go to the Message Query page. You can query messages by topic or message ID to check whether the device data is forwarded to Message Queue for Apache RocketMQ.

      The following message is received in the Message Queue for Apache RocketMQ console:

      {"deviceName()":"MQdevice"}
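    On the business server, the consumer usually needs the field value rather than the raw message. The following minimal sketch decodes a message body and extracts one string field from a flat JSON object like the one above, without a JSON library. The class name and the helper name are hypothetical. In the consumer sample, the byte array would come from message.getBody(). A production service would more likely use a full JSON parser.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class BodyExample {
        // Hypothetical helper: returns the string value of `key` in a flat JSON object,
        // or null if the key is absent. It does not handle escaped quotes or nesting.
        static String stringField(byte[] body, String key) {
            String json = new String(body, StandardCharsets.UTF_8);
            Pattern pattern = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
            Matcher matcher = pattern.matcher(json);
            return matcher.find() ? matcher.group(1) : null;
        }

        public static void main(String[] args) {
            // The body shown in this example, encoded as the consumer would receive it.
            byte[] body = "{\"deviceName()\":\"MQdevice\"}".getBytes(StandardCharsets.UTF_8);
            System.out.println(stringField(body, "deviceName()"));
        }
    }
    ```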