このトピックでは、Realtime Compute を使用して IoT センサーからのデータを多次元で分析する方法について、ユースケースを用いて説明します。
背景
グローバリゼーションの経済的な波が世界中に広がっている中、工業メーカーはますます激しい競争に直面しています。 競争力を高めるために、自動車、航空、ハイテク、食品および飲料、繊維、製薬業界のメーカーは、既存のインフラストラクチャを革新し、置き換える必要があります。 これらの業界は、イノベーションプロセス中に多くの課題に対処する必要があります。 たとえば、既存の従来のデバイスとシステムは何十年も使用されてきたため、メンテナンスコストが高くなります。 ただし、これらのシステムとデバイスを置き換えると、生産プロセスが遅くなり、プロダクトの品質を損なう可能性があります。
これらの業界はさらに 2 つの課題に直面しています。1 つは高いセキュリティリスク、もう 1 つは複雑なプロセス自動化への対応が急務である点です。 製造業界は、既存の従来のデバイスとシステムを置き換える準備をしています。 この業界では、リアルタイム操作の安全性と安定性を確保するために、高い信頼性と可用性のシステムが必要となります。 製造プロセスには、ロボットアーム、組み立てライン、パッケージングマシンなど、幅広いコンポーネントが含まれます。 これには、デバイスの展開、更新、生産終了管理など、製造プロセスの各段階をシームレスに統合できるリモートアプリケーションが必要となります。 リモートアプリケーションもフェールオーバーの問題を処理する必要があります。
これらの次世代システムおよびアプリケーションのもう 1 つの要件は、デバイスによって生成された大量のデータをキャプチャして分析し、適切なタイミングでタイムリーに対処できることです。 競争力を高め、開発を加速するために、製造元は既存のシステムとデバイスを最適化およびアップグレードする必要があります。 Realtime Compute および Alibaba Cloud IoT ソリューションのアプリケーションにより、デバイスの実行情報を分析し、障害を検出し、歩留まり率をリアルタイムで予測することができます。 本ページでは、ユースケースを例として説明します。 このユースケースでは、製造元は Realtime Compute を使用して、センサーから収集された大量のデータをリアルタイムで分析します。 Realtime Compute は、データをリアルタイムでクレンジングおよび集計し、オンライン分析処理 (OLAP) システムにリアルタイムでデータを書き込み、デバイスの主要なメトリックをリアルタイムでモニターするためにも使用されます。
事業内容
このユースケースでは、製造元は多くの都市の複数の工場に 1,000 台を超えるデバイスを所有しています。 各デバイスには 10 種類のセンサーが搭載されています。 これらのセンサーは、収集したデータを 5 秒ごとに Log Service に送信します。 各センサーから収集されるデータは、次の表で説明する形式です。
s_id | s_value | s_ts |
---|---|---|
センサーの ID | センサーからの現在値 | データの送信時刻 |
センサーは、複数の工場のデバイスに分散されています。 製造元は、デバイスや工場全体のセンサーの分布を表示する RDS ディメンションテーブルを作成します。
s_id | s_type | device_id | factory_id |
---|---|---|---|
センサーの ID | センサーのタイプ | デバイスの ID | 工場の ID |
このディメンションテーブルに含まれる情報は、RDS システムに格納されます。 製造元は、このディメンションテーブルに基づいてセンサーからのデータを整理し、デバイスごとにデータをソートする必要があります。 このニーズを満たすために、Realtime Compute はセンサーから送信されたデータがデバイスごとに論理的に 1 分ごとに整理される幅広いテーブルを提供します。
ts | device_id | factory_id | device_temp | device_pres |
---|---|---|---|---|
データの送信時刻 | デバイスの ID | 工場の ID | デバイスの温度 | デバイスの圧力 |
コンピューティングロジックとリアルタイムストリームコンピューティングを簡素化するために、このユースケースでは温度と圧力の 2 種類のセンサーのみを使用します。 コンピューティングロジックは次のとおりです。
-
Realtime Compute は、温度が 80°C を超えるデバイスを識別し、ダウンストリームノードで警告をトリガーします。 このユースケースでは、Realtime Compute は識別されたデバイスのデータを Message Queue に送信します。 Message Queue は、製造元がダウンストリームの警告システムで指定した警告をトリガーします。
-
Realtime Compute はデータを OLAP システムに書き込みます。 このユースケースでは、製造元は HybridDB for MySQL を使用しています。 HybridDB for MySQL と統合するために、製造元は多次元データの表示と分析のための一連のビジネスインテリジェンス (BI) アプリケーションを開発しました。
次の図は、全体的なアーキテクチャを示しています。
よくある質問
-
幅の広いテーブルはどのように作成されますか。
ほとんどの場合、各センサーは 1 次元の IoT データのみを収集します。 これは、その後のデータ処理と分析に課題をもたらします。 幅の広いテーブルを作成するために、Realtime Compute はウィンドウに基づいてデータを集計し、ディメンション別にデータを整理します。
-
Message Queue が警告のトリガーに使用されるのはなぜですか。
Realtime Compute では、データをあらゆるタイプのストレージシステムに書き込むことができます。 警告と通知の送信には、Message Queue のようなメッセージストレージシステムを使用することを推奨します。 これは、これらのシステムのアプリケーションが、ユーザー定義の警告システムで生じるエラーの防止に役立つためです。 これらのエラーは、特定の警告および通知の報告に失敗する可能性があります。
コードの説明
センサーからアップロードされたデータを Log Service に送信します。 行のデータ形式は次のとおりです。
{
"sid": "t_xxsfdsad",
"s_value": "85.5",
"s_ts": "1515228763"
}
Log Service のソーステーブル s_sensor_data を定義します。
CREATE TABLE s_sensor_data (
s_id VARCHAR,
s_value VARCHAR,
s_ts VARCHAR,
ts AS CAST(FROM_UNIXTIME(CAST(s_ts AS BIGINT)) AS TIMESTAMP),
WATERMARK FOR ts AS withOffset(ts, 10000)
) WITH (
TYPE='sls',
endPoint ='http://cn-hangzhou-corp.sls.aliyuncs.com',
accessId ='yourAccessId',
accessKey ='yourAccessSecret',
project ='ali-cloud-streamtest',
logStore ='stream-test',
);
RDS ディメンションテーブル d_sensor_device_data を作成します。 このディメンションテーブルには、センサーとデバイス間のマッピングが格納されます。
CREATE TABLE d_sensor_device_data (
s_id VARCHAR,
s_type VARCHAR,
device_id BIGINT,
factory_id BIGINT,
PERIOD FOR SYSTEM_TIME,
PRIMARY KEY(s_id)
) WITH (
TYPE='RDS',
url='yourDatabaseURL',
tableName='test4',
userName='test',
password='yourDatabasePassword'
);
Message Queue 結果テーブル r_monitor_data を作成します。 この表は、警告をトリガーするためのロジックを示しています。
CREATE TABLE r_monitor_data (
ts VARCHAR,
device_id BIGINT,
factory_id BIGINT,
device_TEMP DOUBLE,
device_PRES DOUBLE
) WITH (
TYPE='MQ'
);
HybridDB for MySQL 結果テーブル r_device_data を作成します。
CREATE TABLE r_device_data (
ts VARCHAR,
device_id BIGINT,
factory_id BIGINT,
device_temp DOUBLE,
device_pres DOUBLE,
PRIMARY KEY(ts, device_id)
) WITH (
TYPE='HybridDB'
);
センサーから収集されたデータを分単位で集計し、集計されたデータに基づいて幅広いテーブルを作成します。 構造化コードを理解し、コードの保守を容易にするために、このユースケースではビューを作成します。
// Create a view to obtain the device and factory mapping each sensor.
CREATE VIEW v_sensor_device_data
AS
SELECT
s.ts,
s.s_id,
s.s_value,
d.s_type,
d.device_id,
d.factory_id
FROM
s_sensor_data s
JOIN
d_sensor_device_data FOR SYSTEM_TIME AS OF PROCTIME() as d
ON
s.s_id = d.s_id;
// Create a wide table.
CREATE VIEW v_device_data
AS
SELECT
// Specify the start time of a tumbling window as the time for the record.
CAST(TUMBLE_START(v.ts, INTERVAL '1' MINUTE) AS VARCHAR) as ts,
v.device_id,
v.factory_id,
CAST(SUM(IF(v.s_type = 'TEMP', v.s_value, 0)) AS DOUBLE)/CAST(SUM(IF(v.s_type = 'TEMP', 1, 0)) AS DOUBLE) device_temp, // Compute the average temperature by minute.
CAST(SUM(IF(v.s_type = 'PRES', v.s_value, 0)) AS DOUBLE)/CAST(SUM(IF(v.s_type = 'PRES', 1, 0)) AS DOUBLE) device_pres // Compute the average pressure by minute.
FROM
v_sensor_device_data v
GROUP BY
TUMBLE(v.ts, INTERVAL '1' MINUTE), v.device_id, v.factory_id;
上記のコアコンピューティングロジックでは、分単位の平均温度と平均圧力が出力として計算されます。 ここではタンブリングウィンドウが使用されているため、毎分新しいデータレコードが生成されます。 生成されたデータをフィルタリングして、Message Queue 結果テーブルと HybridDB for MySQL 結果テーブルに書き込みます。
// Identify the sensors whose temperatures are higher than 80°C and write the data into the Message Queue result table to trigger alerts.
INSERT INTO r_monitor_data
SELECT
ts,
device_id,
factory_id,
device_temp,
device_pres
FROM
v_device_data
WHERE
device_temp > 80.0;
// Write the result data into the HybridDB for MySQL result table for analysis.
INSERT INTO r_device_data
SELECT
ts,
device_id,
factory_id,
device_temp,
device_pres
FROM
v_device_data;
デモコードとソースコード
Alibaba Cloud チームは、前述の IoT ソリューションに適用できる完全なリンクを含むデモコードを作成しました。
- Logtail が ECS から収集したテキスト情報をソーステーブルのデータとして使用します。
- RDS ディメンションテーブルを作成します。
- Message Queue 結果テーブルと HybridDB for MySQL 結果テーブルを作成します。
完全なデモコードを参照して入力データと出力データを登録し、独自の IoT ソリューションをご開発ください。 [デモコード] をクリックしてデモコードをダウンロードします。