すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:ETL機能を使用したリアルタイム注文の分析

最終更新日:Jan 22, 2025

このトピックでは、抽出、変換、および読み込み (ETL) 機能を使用してリアルタイム注文を分析する方法について説明します。

シナリオ

ETL機能を使用すると、ストリーミングデータを抽出、変換、ロードし、大量のリアルタイムデータを効率的に統合して、リアルタイムデータ処理の要件を満たすことができます。 ETL機能は、ドラッグアンドドロップ操作とローコード開発をサポートし、ビジネス意思決定、リアルタイムレポート、リアルタイムデータコンピューティングなどのさまざまなビジネスシナリオを容易にします。 企業のデジタル変革中、ETL機能は次のリアルタイムデータ処理シナリオで使用できます。

  • マルチリージョンまたは異種データをリアルタイムで集中管理: ETL機能を使用すると、異種データまたは複数のリージョンのデータを同じデータベースにリアルタイムで保存できます。 これは、集中的かつ効率的な管理および意思決定を容易にする。

  • リアルタイムレポート: ETL機能を使用すると、リアルタイムレポートシステムを構築できます。 レポートシステムは、デジタル変換中のレポート効率を向上させ、さまざまなリアルタイム分析シナリオに適しています。

  • リアルタイムコンピューティング: ETL機能を使用すると、ビジネス側で生成されたストリーミングデータをリアルタイムでクレンジングして、機能値とタグを抽出できます。 典型的なリアルタイムコンピューティングシナリオには、プロファイリング、リスク管理、リアルタイムデータの表示に使用される推奨およびダッシュボードなどのオンラインビジネスコンピューティングモデルが含まれます。

概要

この例では、ETL機能を使用して、リアルタイムのトランザクションデータとビジネスデータをマージし、指定されたフィルタ条件を満たすデータをデータウェアハウスにリアルタイムで送信します。 取引データは、注文番号、顧客ID、商品コード、取引金額、及び取引時間を含む。 業務データには、商品コード、商品単価、商品名が含まれる。 たとえば、価格値が3,000を超える注文に関するリアルタイムトランザクション情報をクエリするフィルター条件を指定できます。 次に、製品や顧客のディメンションなど、複数のディメンションからトランザクションデータを分析できます。 さらに、ツールを使用して、視覚化されたダッシュボードを作成し、ビジネス要件に基づいて動的データに関する洞察を得ることができます。

实现流程

手順

警告

ETLタスクの設定と実行を確実に行うために、ETLタスクを設定する前に、「DAGモードでETLタスクを設定する」トピックの前提条件注意事項を読むことをお勧めします。

配置流程概览

ステップ

説明

準備

リアルタイムトランザクションデータとビジネスデータをソーステーブルに保存し、ビジネス要件に基づいて宛先テーブルを作成します。

説明

この例では、リアルタイムトランザクションデータとビジネスデータを格納するテーブル、および宛先テーブルは、ApsaraDB RDS for MySQLインスタンスに格納されます。

手順1: ソースデータベースの設定

リアルタイムトランザクションデータを格納するテーブルをストリームテーブルとして設定し、ビジネスデータを格納するテーブルをディメンションテーブルとして設定します。

手順2: Table Joinコンポーネントの設定

ディメンションテーブルとストリームテーブルをワイドテーブルに結合します。

手順3: テーブルレコードフィルターコンポーネントの設定

ワイドテーブルのデータを照会するフィルター条件を指定します。 たとえば、価格値が3,000を超える注文をクエリできます。

手順4: ターゲットデータベースの設定

処理されたデータをリアルタイムで宛先テーブルにロードします。

ステップ5: 事前にチェックしてタスクを開始

ETLタスクを事前にチェックして開始します。

準備

ETLタスクを設定する前に、リアルタイムのトランザクションデータをストリームテーブルに保存し、ビジネスデータをソースApsaraDB RDS for MySQLインスタンスのディメンションテーブルに保存する必要があります。

また、ビジネス要件に基づいて、ターゲットApsaraDB RDS for MySQLインスタンスにターゲットテーブルを作成する必要があります。

説明

次のステートメントを実行して、リアルタイムトランザクションデータテーブル、業務データテーブル、および宛先テーブルを作成できます。

テーブル作成ステートメント

リアルタイム取引データテーブル

テーブルtest_ordersを作成する (
   order_id bigint nullでないCOMMENT 'Order ID' 、
     user_id bigint nullコメント 'User ID ',
     product_id bigint nullコメント 'Product ID '、
   total_price decimal(15,2) not null COMMENT 'Total Price ',
   order_date TIMESTAMP not null COMMENT 'Order Date',
   PRIMARYキー (order_id) 

ビジネスデータテーブル

CREATEテーブル製品 (
      product_id bigint nullコメント 'Product ID '、
      product_name varchar(20) コメント「製品名」、
      product_price decimal(15,2) not nullコメント 'Unit Price') 

宛先テーブル

テーブルtest_ordersを作成する (
 order_id bigint nullでないCOMMENT 'Order ID' 、
 user_id bigint nullコメント 'User ID ',
 product_id bigint nullコメント 'Product ID '、
 total_price decimal(15,2) not null COMMENT 'Total Price ',
 order_date TIMESTAMP not null
 コメント「注文日」、
 product_id_2 bigint nullコメント 'Product ID '、
 product_name varchar(20) コメント「製品名」、
 product_price decimal(15,2) not nullコメント 'Unit Price' 、
 PRIMARYキー (order_id) 

手順1: ソースデータベースの設定

  1. ストリーミングETLページに移動します。

    1. 最初に DTSコンソール

    2. 左側のナビゲーションウィンドウで、[ETL] をクリックします。

  2. ストリーミングETLページの左上隅にある [新增数据流] をクリックします。 [データフローの作成] ダイアログボックスで、[データフロー名] フィールドにETLタスク名を指定し、[開発方法] パラメーターを [DAG] に設定します。

  3. クリックOK.

  4. ストリームテーブルとディメンションテーブルを設定します。

    1. ストリームテーブルを設定します。

      1. キャンバスの左側で、[入力 /ディメンションテーブルMySQL] ノードをキャンバスの空白領域にドラッグします。

      2. キャンバス上の [入力 /寸法テーブルMySQL-1] をクリックします。

      3. [ノード設定] タブで、パラメーターを設定します。

        パラメーター

        説明

        データソース名

        Data Transmission Service (DTS) は、データソース名を自動的に生成します。 簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意の名前を使用する必要はありません。

        リージョン

        ソースデータベースが存在するリージョンを選択します。

        説明

        ETLタスクは、中国 (杭州) 、中国 (上海) 、中国 (青島) 、中国 (北京) 、中国 (張家口) 、中国 (深セン) 、中国 (広州) 、中国 (香港) のいずれかのリージョンで作成できます。

        [インスタンス数]

        ソースデータベースインスタンスを選択します。 [インスタンスの作成] をクリックして、ソースデータベースインスタンスを作成することもできます。 詳細については、「DMSでサポートされているデータベース」をご参照ください。

        ノードタイプ

        ソーステーブルのタイプを選択します。 この例では、ストリームテーブルが選択されています。

        ストリームテーブル: リアルタイムで更新され、データ関連付けクエリのディメンションテーブルに関連付けることができるテーブル。

        Dimension Table: リアルタイムで更新されないテーブルで、通常、データ分析用にリアルタイムデータをワイドテーブルにアセンブルするために使用されます。

        変換形式

        ETLはストリームを動的テーブルに変換し、動的テーブルに対して連続クエリを実行して新しい動的テーブルを生成します。 このプロセスでは、動的テーブルは、INSERT、UPDATE、およびDELETE動作を実行することによって連続的に修正される。 動的テーブルが最終的に宛先データベースに書き込まれると、新しい動的テーブルはストリームに変換される。 新しい動的テーブルをストリームに変換するときは、動的テーブルへの変更をエンコードするために、Convert Formatパラメーターを指定する必要があります。

        • Upsert Stream: 動的テーブルのデータは、INSERT、UPDATE、およびDELETE操作を実行して変更できます。 動的テーブルがストリームに変換されると、INSERTおよびUPDATE操作はアップサート・メッセージとして符号化され、DELETE操作は削除メッセージとして符号化される。

          説明

          upsertストリームに変換される動的テーブルには、一意のキーが必要です。 キーは複合であってもよい。

        • 追加のみのストリーム: 動的テーブルのデータは、INSERT操作を実行することによってのみ変更できます。 動的テーブルがストリームに変換されると、挿入されたデータのみが送信されます。

        データベースとテーブルの選択

        変換するデータベースとテーブルを選択します。

      4. ノードの設定後、[出力フィールド] タブにリダイレクトされます。 このタブで、ビジネス要件に基づいて [列名] 列のデータベースとテーブルを選択します。

      5. [時間属性] タブをクリックし、パラメーターを設定します。

        パラメーター

        説明

        イベント時間透かし

        ストリームテーブルで時間フィールドを選択します。 ほとんどの場合、時間フィールドは、データが生成される時間を表すためにストリームテーブル内に定義される。 時間フィールドは、通常、ordertimeなどの有益なタイムスタンプである。

        イベント時間透かしのレイテンシ

        受け入れることができる最大データ遅延を入力します。

        データが生成された時間と、データがETLで処理された時間との間に、待ち時間が存在し得る。 ETLは、遅延データの処理を無期限に待つことはできません。 このパラメーターを使用して、順序通りのデータを処理するETLの最大レイテンシを指定できます。 たとえば、10:00に生成されたデータを受信したが、9:59に生成されたデータを受信しなかった場合、ETLは「10:00 + latency」まで待機します。 9:59に生成されたデータがこの時点より前に受信されない場合、データは破棄される。

        処理時間

        データがETLで処理されるサーバー時刻。 列名を入力します。 ETLは、この列でデータが処理されるサーバー時間を保存します。 処理時間は、オペレータの操作に使用される。 たとえば、処理時間は、一時的な結合操作を実行するときに、標準テーブルの最新バージョンを関連付けるために使用されます。

      説明

      アイコンが配置源库信息_感叹号ストリームテーブルの右側に表示されていない場合、ストリームテーブルが設定されます。

    2. ディメンションテーブルを設定します。

      1. キャンバスの左側で、[入力 /ディメンションテーブルMySQL] ノードをキャンバスの空白領域にドラッグします。

      2. キャンバス上の [入力 /寸法テーブルMySQL-2] をクリックします。

      3. [ノード設定] タブで、パラメーターを設定します。

        パラメーター

        説明

        データソース名

        DTSは、データソース名を自動的に生成します。 簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意の名前を使用する必要はありません。

        リージョン

        ソースデータベースが存在するリージョンを選択します。

        [インスタンス数]

        ソースデータベースインスタンスを選択します。 [インスタンスの作成] をクリックして、ソースデータベースインスタンスを作成することもできます。 詳細については、「DMSでサポートされているデータベース」をご参照ください。

        ノードタイプ

        有効な値:

        ストリームテーブル

        寸法テーブル

        この例では、ディメンションテーブルが選択されています。

        データベースとテーブルの選択

        変換するデータベースとテーブルを選択します。

      4. ノードの設定後、[出力フィールド] タブにリダイレクトされます。 このタブで、ビジネス要件に基づいて [列名] 列のデータベースとテーブルを選択します。

      説明

      アイコンが配置源库信息_感叹号ストリームテーブルの右側に表示されていない場合、ストリームテーブルが設定されます。

ステップ2: Table Joinコンポーネントの設定

  1. ページの左側にある [変形] セクションで、[参加] を選択し、ページの右側にあるキャンバスにドラッグします。

  2. ポインターをストリームテーブルノードの上に移動し、ノードの右側にある中空の円をクリックしてから、接続線をtable Join-1コンポーネントにドラッグします。 次に、ポインタをディメンションテーブルノードの上に移動し、ノードの右側にある中空の円をクリックしてから、接続線をtable Join-1コンポーネントにドラッグします。

  3. クリックテーブル参加-1キャンバスでTable Join-1コンポーネントを設定します。

    1. On theノード設定タブでパラメーターを設定します。

      セクション

      パラメーター

      説明

      変換名

      変換名の入力

      DTSは、Table Join-1コンポーネントの名前を自動的に生成します。 簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意の名前を使用する必要はありません。

      JOIN設定

      左テーブルでJOIN句

      JOIN句で左側のテーブルを選択します。 テーブルはプライマリテーブルとして使用されます。 この例では、ストリームテーブルが選択されています。

      時間的結合時間属性 (選択しない場合は通常の結合が適用されます)

      時間結合操作が実行されるときに、時間テーブルに関連付けられたストリームテーブルの時間属性を選択します。 このパラメーターを指定しない場合、通常の結合操作が実行されます。 この例では、[処理時間に基づく] が選択されています。

      説明
      • テンポラルテーブルは、テーブルベースのパラメータ化ビューである動的テーブルである。 時間テーブルは、時間に基づいてデータ変更履歴を記録する。 時間テーブルには、バージョンテーブルと標準テーブルが含まれます。 バージョン付きテーブルは、データの履歴バージョンを表示できます。 標準テーブルには最新バージョンのデータのみが表示されます。

      • 一時的な結合操作を実行するには、すべてのストリームテーブルに対して時間属性を定義し、右側のテーブルにプライマリキーを設定する必要があります。 右側のテーブルがディメンションテーブルの場合、テーブルの主キーが指定されたJOIN Conditionパラメーターに含まれている必要があります。

      • [Event Time Watermark]: ストリームテーブルのデータが生成された時刻を使用して、バージョン管理されたテーブルのバージョンを関連付けます。

      • 処理時間に基づく: ストリームテーブルの処理時間を使用して、標準テーブルの最新バージョンを関連付けます。

      JOIN操作の選択

      結合操作を選択します。 この例では、[Inner Join] が選択されています。

      • Inner Join: 2つのテーブルの共通部分を取得します。

      • Left Join: 左側のテーブルのすべてのデータと、右側のテーブルの2つのテーブルの共通部分を取得します。

      • Right Join: 左側のテーブルの2つのテーブルと右側のテーブルのすべてのデータの共通部分を取得します。

      参加条件

      + 条件の追加

      [+ 条件の追加] をクリックし、結合条件を選択します。

      説明

      等号 (=) の左側のフィールドは左側のテーブルに属します。 等号 (=) の右側のフィールドは右側のテーブルに属します。

  4. JOIN条件パラメーターを設定した後、[出力フィールド] タブをクリックします。 このタブで、ビジネス要件に基づいて [列名] 列のフィールドを選択します。

説明

アイコンが配置源库信息_感叹号Table Join-1コンポーネントの右側に表示されていない場合、joinコンポーネントが設定されます。

手順3: テーブルレコードフィルターコンポーネントの設定

  1. ページの左側にある [変換] セクションで、[テーブルレコードフィルター] を選択し、ページの右側にあるキャンバスにドラッグします。

  2. ポインターをテーブル参加-1コンポーネントの右側にある中空の円をクリックし、接続線をテーブルレコードフィルター-1コンポーネントを使用します。

  3. クリックテーブルレコードフィルター-1キャンバスでテーブルレコードフィルター-1コンポーネントを設定します。

    1. [ノード設定] タブで、テーブルレコードフィルター-1コンポーネントの名前を変換名フィールドを選択します。

      説明

      DTSは、Table Record Filter-1コンポーネントの名前を自動的に生成します。 簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意の名前を使用する必要はありません。

    2. [WHERE条件] フィールドで、次のいずれかの方法でWHERE条件を指定します。

      • WHERE条件を入力します。 たとえば、total_price>3000.00と入力して、結合テーブルのtotal_priceパラメーターの値が3000.00より大きいデータを照会します。

      • [入力フィールド] または [演算子] セクションのオプションをクリックして、WHERE条件を指定します。

説明

アイコンが配置源库信息_感叹号Table Join-1コンポーネントの右側に表示されていない場合、joinコンポーネントが設定されます。

手順4: ターゲットデータベースの設定

  1. ページの左側にある [出力] セクションで、[MySQL] を選択し、ページの右側にあるキャンバスにドラッグします。

  2. ポインターをテーブルレコードフィルター-1コンポーネントの右側にある中空の円をクリックし、接続線を出力MySQL-1コンポーネントを使用します。

  3. クリック出力MySQL-1キャンバス上で出力MySQL-1コンポーネントを設定します。

    1. [ノード設定] タブで、パラメーターを設定します。

      パラメーター

      説明

      データソース名

      DTSは、データソース名を自動的に生成します。 簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意の名前を使用する必要はありません。

      リージョン

      ターゲットデータベースが存在するリージョンを選択します。

      説明

      ETLタスクは、中国 (杭州) 、中国 (上海) 、中国 (青島) 、中国 (北京) 、中国 (張家口) 、中国 (深セン) 、中国 (広州) 、中国 (香港) のいずれかのリージョンで作成できます。

      [インスタンス数]

      ターゲットデータベースインスタンスを選択します。 [インスタンスの作成] をクリックして、ターゲットデータベースインスタンスを作成することもできます。 詳細については、「DMSでサポートされているデータベース」をご参照ください。

      テーブルマッピング

      ターゲットデータベースに格納するテーブルを選択します。

      [宛先テーブルの選択] セクションで、宛先テーブルをクリックします。

  4. でテーブルを選択します。列名ビジネス要件に基づいて列を作成します。

説明

アイコンが配置源库信息_感叹号出力MySQL-1コンポーネントの右側に表示されていない場合、ターゲットデータベースが構成されます。

ステップ5: タスクを事前にチェックして開始する

  1. 上記の設定が完了したら、[Flink SQL検証の生成] をクリックします。 ETLはFlink SQLステートメントを生成および検証します。

  2. Flink SQLの検証が完了したら、[ETL検証の詳細の表示] をクリックします。 表示されるダイアログボックスで、Flink SQLの検証結果とSQL文を表示します。 結果を確認し、[閉じる] をクリックします。

    説明

    検証が失敗した場合は、結果に表示された原因に基づいて障害をトラブルシューティングできます。

  3. [次へ: タスク設定の保存と事前チェック] をクリックします。 DTSは、タスクが事前チェックに合格した後にのみETLタスクを開始できます。 タスクが事前チェックに合格しなかった場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 エラーメッセージに基づいて問題をトラブルシューティングし、事前チェックを再度実行します。

  4. 事前チェックに合格したら、をクリックします。次へ: インスタンスの購入ページの下部にあります。

  5. [インスタンスの購入] ページで、[インスタンスクラス] および [計算ユニット (CU)] パラメーターを設定します。 次に、[データ送信サービス (従量課金) サービス規約] および [パブリックプレビューのサービス規約] を読み、選択します。

    説明

    パブリックプレビュー中、各ユーザーは2つのETLインスタンスを無料で作成できます。

  6. クリック購入して開始ETLタスクを開始します。

タスク結果

この例では、ETLタスクが8月1日に開始された後、フィルタ条件を満たすリアルタイムトランザクションテーブルtest_ordersの更新データが宛先テーブルtest_orders_newに同期されます。 フィルター条件は、total_priceパラメーターの値が3000.00より大きいことです。これは、注文の総トランザクション量が3000.00より大きいことを示します。

図 1. リアルタイムトランザクションデータテーブル: test_orders 目标表test_orders_new

図2. 宛先テーブル: test_orders_new 业务数据表test_orders