This topic provides a use case to show how to use Realtime Compute for Apache Flink to analyze data from IoT sensors in multiple dimensions.

Background information

As global competition increases, industrial manufacturers need to gain a competitive edge. To increase competitiveness, manufacturers in the automotive, aviation, high-technology, food and beverage, textile, and pharmaceutical industries must innovate and replace the existing infrastructure. These industries must address many challenges during the innovation process. For example, in many cases, existing devices and systems have been used for decades and have high maintenance costs. However, if the systems and devices are replaced, the production process may slow down and the product quality may not be guaranteed.

Two of the main challenges faced by these industries are security risks and the need for process automation. A manufacturing process involves a wide range of components, such as robotic arms, assembly lines, and packaging machines. Remote applications are required to seamlessly deploy and update the components and handle failover and end-of-life issues. Therefore, highly reliable and available systems are required in the manufacturing industry to ensure the security and stability of real-time business operations.

Another requirement for next-generation systems and applications is that they must be able to capture and analyze large amounts of data generated by devices, and respond in a timely manner. To increase competitiveness and accelerate development, manufacturers need to optimize and upgrade their existing systems and devices. Realtime Compute for Apache Flink and Alibaba Cloud IoT solutions allow you to analyze device status, detect faults, and predict yield rates in real time.

In this use case, Realtime Compute for Apache Flink ingests large amounts of data collected from sensors, cleans and aggregates the data, writes it to an online analytical processing (OLAP) system, and monitors the key metrics of devices in real time.

Description

In this use case, the manufacturer has more than 1,000 devices from multiple factories across cities. Each device is equipped with 10 sensors of different types. These sensors send the collected data to Log Service at an interval of 5 seconds. The data collected from each sensor is in the format described in the following table.

s_id: The ID of the sensor.
s_value: The current value reported by the sensor.
s_ts: The time when the data is sent.

The sensors are distributed across devices in multiple factories. Therefore, the manufacturer creates an ApsaraDB RDS dimension table that describes how sensors are distributed across devices and factories.

s_id: The ID of the sensor.
s_type: The type of the sensor.
device_id: The ID of the device.
factory_id: The ID of the factory.

The information in this dimension table is stored in an ApsaraDB RDS database. The manufacturer needs to organize the sensor data based on this dimension table and sort the data by device. To meet this requirement, Realtime Compute for Apache Flink generates a wide table in which sensor data is aggregated by device at 1-minute intervals.

ts: The time when the data is sent.
device_id: The ID of the device.
factory_id: The ID of the factory.
device_temp: The temperature of the device.
device_pres: The pressure of the device.
Only two types of sensors are used in this use case: temperature and pressure. This simplifies the computing logic and real-time stream processing. The following computing logic is used:
  1. Realtime Compute for Apache Flink identifies the devices whose temperature is higher than 80°C and triggers alerts at the downstream nodes. In this use case, Realtime Compute for Apache Flink sends data of the identified devices to Message Queue. Then, Message Queue triggers alerts that are specified by the manufacturer in the downstream alerting system.
  2. Realtime Compute for Apache Flink writes the data into an OLAP system. In this use case, the manufacturer uses HybridDB for MySQL. To analyze and display multidimensional data, the manufacturer developed a set of business intelligence (BI) applications that can be used together with HybridDB for MySQL.
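The alerting path in step 1 reduces to a threshold filter over the per-minute wide rows. The following Python sketch illustrates that logic with hypothetical sample rows (field names follow the wide table described above; in the actual pipeline, this filter runs in Flink SQL and matching rows go to Message Queue):

```python
# Hypothetical per-minute wide rows, as produced by the Flink aggregation.
wide_rows = [
    {"ts": "2018-01-06 08:52:00", "device_id": 1, "factory_id": 10,
     "device_temp": 85.5, "device_pres": 1.5},
    {"ts": "2018-01-06 08:52:00", "device_id": 2, "factory_id": 10,
     "device_temp": 76.0, "device_pres": 1.4},
]

# Keep only devices whose average temperature exceeds the 80°C threshold;
# these are the rows that would be written to Message Queue to trigger alerts.
alerts = [r for r in wide_rows if r["device_temp"] > 80.0]
print([r["device_id"] for r in alerts])  # [1]
```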
The following figure shows the overall architecture.

FAQ

  • How is a wide table created?

    In most cases, each sensor collects only the IoT data of one dimension. This poses challenges for subsequent data processing and analysis. Realtime Compute for Apache Flink creates a wide table by aggregating data based on time windows and organizing data by dimension.

  • Why is Message Queue used to trigger alerts?

    Realtime Compute for Apache Flink allows you to write data to any type of storage system. However, to ensure the accuracy of alerts, we recommend that you send result data to message storage systems such as Message Queue in scenarios such as alerts and notifications.

Code description

Send the data collected by the sensors to Log Service. The following example shows the format of a single record.
{
    "s_id": "t_xxsfdsad", 
    "s_value": "85.5", 
    "s_ts": "1515228763"
} 
Define a Log Service source table named s_sensor_data.
CREATE TABLE s_sensor_data (
    s_id    VARCHAR,
    s_value VARCHAR,
    s_ts    VARCHAR,
    ts AS CAST(FROM_UNIXTIME(CAST(s_ts AS BIGINT)) AS TIMESTAMP),
    WATERMARK FOR ts AS withOffset(ts, 10000)
) WITH (
    TYPE='sls',
    endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
    accessId='xxxxxxxxxxx',
    accessKey='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    project='ali-cloud-streamtest',
    logStore='stream-test'
);
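The computed column ts converts the epoch-second string s_ts into a SQL TIMESTAMP by using FROM_UNIXTIME. As a sanity check, the following Python snippet performs the equivalent conversion on the sample record shown earlier (UTC is assumed here for clarity; the SQL function uses the job's session time zone):

```python
from datetime import datetime, timezone

# Sample record from the Log Service payload shown earlier.
record = {"s_id": "t_xxsfdsad", "s_value": "85.5", "s_ts": "1515228763"}

# Equivalent of CAST(FROM_UNIXTIME(CAST(s_ts AS BIGINT)) AS TIMESTAMP),
# interpreting the epoch seconds in UTC.
ts = datetime.fromtimestamp(int(record["s_ts"]), tz=timezone.utc)
print(ts.isoformat())  # 2018-01-06T08:52:43+00:00
```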
Create an ApsaraDB RDS dimension table named d_sensor_device_data. This table stores the mappings between sensors and devices.
CREATE TABLE d_sensor_device_data (
    s_id       VARCHAR,
    s_type     VARCHAR,
    device_id  BIGINT,
    factory_id BIGINT,
    PERIOD FOR SYSTEM_TIME,
    PRIMARY KEY(s_id)
) WITH (
    TYPE='RDS',
    url='',
    tableName='test4',
    userName='test',
    password='XXXXXX'
);
Create a Message Queue result table named r_monitor_data. Records written to this table trigger alerts in the downstream alerting system.
CREATE TABLE r_monitor_data (
    ts    VARCHAR,
    device_id    BIGINT,
    factory_id    BIGINT,
    device_temp    DOUBLE,
    device_pres DOUBLE
) WITH (
    TYPE='MQ'
);    
Create a HybridDB for MySQL result table r_device_data.
CREATE TABLE r_device_data (
    ts    VARCHAR,
    device_id BIGINT,
    factory_id BIGINT,
    device_temp    DOUBLE,
    device_pres DOUBLE,
    PRIMARY KEY(ts, device_id)
) WITH (
    TYPE='HybridDB'
); 
Aggregate the data that is collected from sensors by minute and create a wide table based on the aggregated data. To facilitate subsequent code maintenance, we recommend that you use structured code and views.
-- Create a view to obtain the device and factory that are mapped to each sensor. 
CREATE VIEW v_sensor_device_data
AS
SELECT
    s.ts,
    s.s_id,
    s.s_value,
    d.s_type,
    d.device_id,
    d.factory_id
FROM
    s_sensor_data s
JOIN
    d_sensor_device_data FOR SYSTEM_TIME AS OF PROCTIME() as d
ON
    s.s_id = d.s_id;

-- Create a wide table. 
CREATE VIEW v_device_data
AS
SELECT
    -- Specify the start time of the tumbling window as the time of the record. 
    CAST(TUMBLE_START(v.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS ts,
    v.device_id,
    v.factory_id,
    -- Calculate the average temperature by minute.
    SUM(IF(v.s_type = 'TEMP', CAST(v.s_value AS DOUBLE), 0.0)) / SUM(IF(v.s_type = 'TEMP', 1.0, 0.0)) AS device_temp,
    -- Calculate the average pressure by minute.
    SUM(IF(v.s_type = 'PRES', CAST(v.s_value AS DOUBLE), 0.0)) / SUM(IF(v.s_type = 'PRES', 1.0, 0.0)) AS device_pres
FROM
    v_sensor_device_data v
GROUP BY
    TUMBLE(v.ts, INTERVAL '1' MINUTE), v.device_id, v.factory_id;        

The preceding code shows the core logic of the calculation. The average temperature and pressure over each 1-minute window are used as the temperature and pressure values for that minute. Because a tumbling window is used, a new record is generated for each device every minute. The generated records are then filtered and written to the Message Queue result table and the HybridDB for MySQL result table.
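To make the window logic concrete, the following Python sketch simulates the pivot-and-average step on a few hypothetical joined rows (timestamps are epoch seconds; field names follow the v_sensor_device_data view):

```python
from collections import defaultdict

# Hypothetical joined rows, as produced by the v_sensor_device_data view.
rows = [
    {"ts": 60, "s_type": "TEMP", "s_value": "82.0", "device_id": 1, "factory_id": 10},
    {"ts": 65, "s_type": "TEMP", "s_value": "84.0", "device_id": 1, "factory_id": 10},
    {"ts": 70, "s_type": "PRES", "s_value": "1.5",  "device_id": 1, "factory_id": 10},
]

# Group by (1-minute tumbling window, device, factory), like
# GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), device_id, factory_id.
acc = defaultdict(lambda: {"TEMP": [], "PRES": []})
for r in rows:
    window_start = r["ts"] - r["ts"] % 60  # start of the 1-minute window
    key = (window_start, r["device_id"], r["factory_id"])
    acc[key][r["s_type"]].append(float(r["s_value"]))

# Emit one wide row per key, averaging each sensor type over the window.
wide = [
    {"ts": k[0], "device_id": k[1], "factory_id": k[2],
     "device_temp": sum(v["TEMP"]) / len(v["TEMP"]),
     "device_pres": sum(v["PRES"]) / len(v["PRES"])}
    for k, v in acc.items()
]
print(wide)
```

With the sample rows above, all three readings fall into the window that starts at second 60, so one wide row is produced with device_temp = (82.0 + 84.0) / 2 = 83.0 and device_pres = 1.5.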

Demo code and source code

The Alibaba Cloud team provides demo code that contains the complete procedure for the preceding IoT solution.
-- Identify the devices whose temperature is higher than 80°C and write their data to the Message Queue result table to trigger alerts. 
INSERT INTO r_monitor_data
SELECT
    ts,
    device_id,
    factory_id,
    device_temp,
    device_pres
FROM
    v_device_data
WHERE
    device_temp > 80.0;

-- Write the result data to the HybridDB for MySQL result table for analysis. 
INSERT INTO r_device_data
SELECT
    ts,
    device_id,
    factory_id,
    device_temp,
    device_pres
FROM
    v_device_data;    
For more information about how to register the upstream and downstream storage systems and develop your own IoT solution, see Demo code. When you adapt this use case, take note of the following points about the upstream and downstream storage systems:
  • Use the text information collected by Logtail from Elastic Compute Service (ECS) instances as data in the source table.
  • Create an ApsaraDB RDS dimension table.
  • Create a Message Queue result table and a HybridDB for MySQL result table.