すべてのプロダクト
Search
ドキュメントセンター

DataWorks:ワークフローでのデータプッシュノードの設定に関するベストプラクティス

最終更新日:Mar 25, 2025

DataWorks DataStudio では、ワークフローでデータプッシュノードを作成し、さまざまなプッシュ方法に基づいて特定の宛先にデータをプッシュできます。DataStudio は、シンプルなデータプッシュ、複合データプッシュ、スクリプトデータプッシュ、条件付きデータプッシュ、MaxCompute データプッシュというプッシュ方法を提供しています。このトピックでは、選択したプッシュ方法に基づいてデータをプッシュするように、ワークフローでデータプッシュノードを設定する方法について説明します。

背景情報

DataStudio では、ワークフローでデータプッシュノードを作成できます。データプッシュノードは、ワークフロー内の他のノードによって処理されたデータに対する単純なクエリの結果を取得し、特定のスケジュール設定に基づいて、取得したデータをタイムリーに DingTalk グループ、Lark グループ、WeCom グループ、Microsoft Teams、または電子メールにプッシュします。

プロセス

  1. さまざまなデータプッシュフローのテストデータを準備するためのノードを作成します。

  2. データクエリノード、代入ノード、およびその他のデータ処理ノードを作成して、テストデータを処理およびクエリします。

    説明

    ビジネス要件に基づいて、さまざまなタイプのノードを作成して、テストデータの準備、データの処理、データのクエリを実行できます。このトピックの例では、MySQL ノードが使用されています。

  3. データプッシュノードを作成して、データクエリノードの出力パラメーターを受け取ります。次に、データプッシュノードの入力パラメーターを使用して、取得したデータを DingTalk グループ、Lark グループ、WeCom グループ、Microsoft Teams、または電子メールにプッシュします。

プッシュ方法

このトピックでは、ワークフローでデータプッシュノードを設定してデータをプッシュできるように、次のプッシュ方法を紹介します。シンプルなデータプッシュ複合データプッシュスクリプトデータプッシュ条件付きデータプッシュMaxCompute データプッシュ

  • シンプルなデータプッシュ: シンプルなデータプッシュフローでは、ノードがデータをクエリし、[出力パラメーター] を使用してクエリ結果をデータプッシュノードに渡します。次に、データプッシュノードはデータを特定の宛先にプッシュします。

  • 複合データプッシュ: 複合データプッシュフローでは、複数のノードがデータをクエリし、[出力パラメーター] を使用してクエリ結果を同じデータプッシュノードに渡します。次に、データプッシュノードはデータを特定の宛先にプッシュします。

  • スクリプトデータプッシュ: スクリプトデータプッシュフローでは、[代入ノード] がデータを処理し、[出力パラメーター] を使用して結果をデータプッシュノードに渡します。次に、データプッシュノードはデータを特定の宛先にプッシュします。代入ノードの詳細については、「代入ノードを設定する」をご参照ください。

  • 条件付きデータプッシュ: 条件付きデータプッシュフローでは、[分岐ノード] が論理判断を実行し、特定の条件を満たすデータと条件を満たさないデータを異なるデータクエリノードに渡します。データクエリノードはデータをクエリし、[出力パラメーター] を使用して結果を異なるデータプッシュノードに渡します。次に、データプッシュノードはデータを特定の宛先にプッシュします。分岐ノードの詳細については、「分岐ノード」をご参照ください。

  • MaxCompute データプッシュ: MaxCompute データプッシュフローでは、代入ノード が MaxCompute データソースからデータをクエリし、クエリ結果をデータプッシュノードに渡します。次に、データプッシュノードはデータを特定の宛先にプッシュします。

前提条件

以下の要件が満たされていることを確認してください。

  • DataWorks がアクティブ化されている。詳細については、「DataWorks をアクティブ化する」をご参照ください。

  • DataWorks ワークスペースが作成されている。詳細については、「ワークスペースを作成する」をご参照ください。

  • ApsaraDB RDS for MySQL インスタンスが ApsaraDB RDS コンソールで作成され、MySQL データソースがワークスペースに追加されている。詳細については、「データソースを追加および管理する」をご参照ください。

  • MaxCompute データソースがワークスペースに追加されている。詳細については、「MaxCompute データソースを追加する」をご参照ください。

    説明

    このトピックでは、MySQL データソースと MaxCompute データソースが使用されています。ビジネス要件に基づいて、さまざまなタイプのデータソースを追加できます。

  • サーバーレスリソースグループが作成されている。データプッシュノードを実行するには、サーバーレスリソースグループのみを使用できます。サーバーレスリソースグループの作成と使用方法の詳細については、「サーバーレスリソースグループを作成および使用する」をご参照ください。

制限事項

  • データサイズの制限:

    • DingTalk にデータをプッシュする場合、データサイズは 20 KB を超えることはできません。

    • Lark にデータをプッシュする場合、データサイズは 20 KB を超えることはできず、画像のサイズは 10 MB 未満である必要があります。

    • WeCom にデータをプッシュする場合、各チャットボットは 1 分間に最大 20 件のメッセージを送信できます。

    • Microsoft Teams にデータをプッシュする場合、データサイズは 28 KB を超えることはできません。

    • 電子メールにデータをプッシュする場合、各データプッシュタスクに 1 つの電子メール本文のみを追加できます。電子メール本文が追加されている場合、電子メール本文を再度追加することはできません。制限の詳細については、使用している電子メールサービスの簡易メール転送プロトコル (SMTP) の制限事項をご参照ください。

  • データプッシュ機能は、次のリージョンの DataWorks ワークスペースでのみ使用できます。中国 (杭州)、中国 (上海)、中国 (北京)、中国 (深圳)、中国 (成都)、中国 (香港)、シンガポール、マレーシア (クアラルンプール)、米国 (シリコンバレー)、米国 (バージニア)、ドイツ (フランクフルト)

準備

ワークフローを作成する

  1. DataWorks コンソール にログインします。トップナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

  2. [スケジュールされたワークフロー] ペインで、[ビジネスフロー] を右クリックし、[ワークフローの作成] を選択します。 [ワークフローの作成] ダイアログボックスで、ビジネス要件に基づいて [ワークフロー名] パラメーターを構成します。 この例では、DataPushDemo を使用します。

ワークフローでノードを作成する

DataPushDemo ワークフローの名前をダブルクリックします。ワークフローの設定タブで、image アイコンをクリックして、使用するプッシュ方法に基づいてノードを作成します。次の表に、さまざまなプッシュ方法で作成する必要があるノードを示します。演習を完了しやすくするために、次の表に示すノード名でノードに名前を付けることをお勧めします。

プッシュ方法

ノード名

ノードタイプ

ノードの説明

条件付きデータプッシュ

SalesAmountPreMonth

MySQL ノード

テストデータの前月の総売上高をクエリし、出力パラメーターを使用してクエリ結果を分岐ノードに渡します。

Condition

分岐ノード

MySQL ノードの出力パラメーターを受け取り、論理判断を実行し、分岐ノードの出力パラメーターを使用して、特定の条件を満たすデータと満たさないデータを対応する MySQL ノードに渡します。

CompliantData

MySQL ノード

分岐ノードの特定の条件を満たす出力パラメーターを受け取り、特定の条件を満たすデータをクエリし、MySQL ノードの出力パラメーターを使用してクエリ結果をデータプッシュノードに渡します。

NonCompliantData

分岐ノードの特定の条件を満たさない出力パラメーターを受け取り、特定の条件を満たさないデータをクエリし、MySQL ノードの出力パラメーターを使用してクエリ結果をデータプッシュノードに渡します。

Top3Categories

データプッシュノード

MySQL ノードの特定の条件を満たす出力パラメーターを受け取り、データプッシュノードの入力パラメーターを使用して、適合データを特定の宛先にプッシュします。

Bottom3Categories

MySQL ノードの特定の条件を満たさない出力パラメーターを受け取り、データプッシュノードの入力パラメーターを使用して、不適合データを特定の宛先にプッシュします。

スクリプトデータプッシュ

SalesAmountPreWeek

MySQL ノード

テストデータの前週の上位 3 つのカテゴリの総売上高をクエリし、出力パラメーターを使用してクエリ結果を代入ノードに渡します。

Top3CategoryList

代入ノード

MySQL ノードの出力パラメーターを受け取り、パラメーターをリストし、代入ノードの出力パラメーターを使用して結果をデータプッシュノードに渡します。代入ノードを設定するときは、言語 に Python を選択します。

Top3CategoriesPreWeek

データプッシュノード

代入ノードの出力パラメーターを受け取り、データプッシュノードの入力パラメーターを使用してデータを特定の宛先にプッシュします。

複合データプッシュ

SalesAmountPreDay

MySQL ノード

  • テストデータの前日の総売上高と売上高の増加をクエリし、MySQL ノードの出力パラメーターを使用してクエリ結果をデータプッシュノードに渡します。

  • SalesAmountPreDay ノードは、複合データプッシュ方法とシンプルなデータプッシュ方法の両方で使用できます。

SalesGrowthPreDay

CombinedPush

データプッシュノード

MySQL ノードの出力パラメーターを受け取り、データプッシュノードの入力パラメーターを使用してデータを特定の宛先にプッシュします。

シンプルなデータプッシュ

SalesAmountPreDay

MySQL ノード

テストデータの前日の総売上高をクエリし、出力パラメーターを使用してクエリ結果をデータプッシュノードに渡します。

PushSalesAmountPreDay

データプッシュノード

MySQL ノードの出力パラメーターを受け取り、データプッシュノードの入力パラメーターを使用してデータを特定の宛先にプッシュします。

MaxCompute データプッシュ

MaxComputeDataSync

バッチ同期

MySQL データベースのデータを MaxCompute データソースに同期します。

MaxComputeDataQuery

代入ノード

MaxCompute データソースからデータをクエリし、代入ノードの出力パラメーターを使用してクエリ結果をデータプッシュノードに渡します。

MaxComputeDataPush

データプッシュノード

代入ノードの出力パラメーターを受け取り、データプッシュノードの入力パラメーターを使用してデータを特定の宛先にプッシュします。

データの準備

このトピックでは、テストデータは注文テーブルからのものです。このセクションでは、テストテーブルを作成し、テーブルにデータを書き込む方法について説明します。テストデータが不要な場合は、このセクションの操作をスキップできます。

テストテーブルを作成する

  1. DataWorks コンソール にログインします。トップナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

  2. DataStudio ページの左側のナビゲーションウィンドウで、image アイコンをクリックします。[Ad Hoc Query] ペインで、ポインターを image アイコンの上に移動し、[作成] > MySQL を選択します。[Create Node] ダイアログボックスで、次のパラメーターを構成します。

    • ノードタイプ: MySQL。

    • パス: アドホッククエリ。

    • 名前: TableCreation。

  3. orders という名前のテストテーブルを作成します。サンプルコード:

CREATE TABLE orders (
     order_id INT NOT NULL AUTO_INCREMENT,
     category VARCHAR(100) NOT NULL, -- カテゴリ。
     sales DOUBLE NOT NULL, -- 注文の売上高。
     datetime DATETIME NOT NULL, -- 注文の支払い日時。
     PRIMARY KEY (order_id),
     INDEX (category)
);

ストアドプロシージャを作成する

次のコードは、過去 2 か月間の注文データを生成するストアドプロシージャを作成する方法の例を示しています。

説明

MySQL クライアントでストアドプロシージャを作成する必要があります。

DELIMITER $$

CREATE PROCEDURE InsertOrders(IN num_orders INT)
BEGIN
  DECLARE v_category VARCHAR(100);
  DECLARE v_sales DOUBLE;
  DECLARE v_datetime DATETIME;
  DECLARE v_category_list VARCHAR(255);
  DECLARE v_index INT;
  DECLARE i INT DEFAULT 0;
  
  -- カテゴリを定義します。複数のカテゴリはコンマで区切ります。
  SET v_category_list = 'Electronics,Books,Home & Kitchen,Fashion,Toys,Baby,Computers,Electronics,Games,Garden,Clothing,Grocery,Health,Jewelry,Kids';
  -- カテゴリの総数を取得します。
  SET v_index = ROUND((RAND() * (CHAR_LENGTH(v_category_list) - CHAR_LENGTH(REPLACE(v_category_list, ',', '')) + 1)));
  
  WHILE i < num_orders DO
    -- ランダムなインデックスを生成してカテゴリを選択します。
    SET v_index = FLOOR(1 + (RAND() * (CHAR_LENGTH(v_category_list) - CHAR_LENGTH(REPLACE(v_category_list, ',', '')) + 1)));
    -- カテゴリのリストからランダムなカテゴリを抽出します。
    SET v_category = SUBSTRING_INDEX(SUBSTRING_INDEX(v_category_list, ',', v_index), ',', -1);
    
    -- 1,000 から 30,000 までのランダムな売上高を生成します。
    SET v_sales = 1000 + FLOOR(RAND() * 29000);
    
    -- 過去 2 か月間のランダムな日付を生成します。
    SET v_datetime = NOW() - INTERVAL FLOOR(RAND() * 61) DAY;
    
    -- 新しいランダムな注文のデータを orders テーブルに挿入します。
    INSERT INTO orders (category, sales, datetime) VALUES (v_category, v_sales, v_datetime);
    
    SET i = i + 1;
  END WHILE;
END$$

DELIMITER ;

orders テーブルにテストデータを書き込む

ストアドプロシージャを作成したら、CALL 文を実行してストアドプロシージャを呼び出し、ランダムな注文データを orders テーブルに挿入できます。

-- ストアドプロシージャを呼び出して、特定の数の注文データレコードを orders テーブルに挿入します。
CALL InsertOrders(1000); -- orders テーブルに 1,000 件の注文データレコードを挿入します。

ワークフローでデータプッシュフローを設定する

このセクションでは、条件付きデータプッシュスクリプトデータプッシュ複合データプッシュシンプルなデータプッシュMaxCompute データプッシュのプッシュ方法を使用して、ワークフローでデータプッシュフローを設定し、特定の宛先にデータをプッシュする方法について説明します。

条件付きデータプッシュ

ステップ 1: データプッシュフローを設定する

DataPushDemo ワークフローをダブルクリックします。ワークフローキャンバスで、SalesAmountPreMonthConditionCompliantDataTop3CategoriesNonCompliantDataBottom3Categories ノードを接続して、データプッシュフローを形成します。次の図は、データプッシュフローを示しています。

image

ステップ 2: SQL クエリノードを設定する

SQL クエリノードを設定してテストデータをクエリし、[入力パラメーターと出力パラメーター] セクションに outputs パラメーターを追加して、クエリ結果を分岐ノードに渡すことができます。

  1. SalesAmountPreMonth ノードをダブルクリックします。ノードの設定タブで、ノードのコードを記述します。サンプルコード:

    -- 前月の総売上高をクエリします。
    SELECT SUM(sales) AS sales_amount
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
  2. ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを構成します。

    • スケジュール時刻: 値を 08:00 に設定します。

    • リソースグループ: 既存のサーバーレスリソースグループを選択します。

    • 親ノード: [ルートノードを追加] を選択します。

    • 出力パラメーター: [入力パラメーターと出力パラメーター] セクションで、[出力パラメーター] の右側にある [代入パラメーターを追加] をクリックして、SQL クエリノードの outputs パラメーターを追加します。 outputs パラメーターは、データプッシュノードの入力パラメーターとして使用されます。image

ステップ 3: 分岐ノードを設定する

[入力パラメーターと出力パラメーター] セクションで、SQL クエリノードの outputs パラメーターを分岐ノードの 入力パラメーターとして設定できます。分岐ノードは論理判断を実行した後、outputs パラメーターを使用して、クエリ結果を分岐ノードの子ノードに渡します。

  1. Condition ノードをダブルクリックします。ノードの構成タブの定義セクションで、[ブランチの追加] をクリックします。[ブランチ定義] ダイアログボックスで、パラメーターを構成します。次の表は、構成する必要があるパラメーターについて説明しています。

    パラメーター

    適合データを受け取る子ノード

    不適合データを受け取る子ノード

    条件

    ${inputs[0][0]}>=500000

    ${inputs[0][0]}<500000

    関連付けられたノード出力

    適合

    不適合

    説明

    特定の条件を満たす売上高

    特定の条件を満たさない売上高

    説明

    [0][0] は、分岐ノードの先祖ノードの outputs パラメーターのデータを指定する 2 次元配列です。

    • 先祖ノードが SQL クエリノードの場合、2 次元配列を使用して、分岐ノードの先祖ノードの outputs パラメーターのデータを指定します。

    • 先祖ノードが Python ノードの場合、1 次元配列を使用して、分岐ノードの先祖ノードの outputs パラメーターのデータを指定します。

  2. ブランチノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを構成します。次の表は、構成する必要があるパラメーターについて説明しています。

    セクションとパラメーター

    説明

    スケジュール

    スケジューリングサイクル

    値を に設定します。

    スケジュール時刻

    値を 08:00 に設定します。

    再実行

    値を 実行状態に関係なく許可 に設定します。

    リソースグループ

    リソースグループ

    スケジューリング用に既存のリソースグループを選択します。

    説明

    データプッシュノードを初めて使用するときは、チケットを送信 して、スケジューリング用にリソースグループをアップグレードする必要があります。

    現在のノードの出力名

    分岐を設定すると、出力名が自動的に解析されて表示されます。出力名 は、分岐を追加したときに [関連付けられたノード出力] パラメーターに設定した値と同じです。

    入力パラメーターと出力パラメーター

    入力パラメーター

    [作成] をクリックします。パラメーター名: inputs

    値のソース: SalesAmountPreMonth ノードの outputs パラメーターを選択します。

    出力パラメーター

    デフォルトでは、outputs パラメーターが追加されます。

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 4: 分岐ノードの子ノードを設定する

この例では、CompliantData ノードと NonCompliantData ノードは分岐ノードの子ノードです。[分岐ノード] が outputs パラメーターを使用してクエリ結果を子ノードに渡した後、各子ノードの [入力パラメーターと出力パラメーター] セクションに outputs パラメーターを追加して、クエリ結果を対応するデータプッシュノードに渡すことができます。

  1. CompliantData ノードと NonCompliantData ノードをそれぞれダブルクリックします。各ノードの設定タブで、ノードのコードを記述します。

    CompliantData ノードのサンプルコード

    SET @all_cat_sales_volume_month := 0.0;
    SELECT SUM(sales) INTO @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
    
    -- 一時テーブルを作成します。
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      category VARCHAR(255),
      sales DOUBLE,
      all_cat_sales_volume_month DOUBLE
    );
    -- データをクエリし、一時テーブルに書き込みます。
    INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) AS amount, @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category ORDER BY amount DESC limit 3;
    -- 売上高が特定の条件を満たす上位 3 つのカテゴリをクエリします。
    SELECT category, sales, all_cat_sales_volume_month FROM temp_array;

    NonCompliantData ノードのサンプルコード

    SET @all_cat_sales_volume_month := 0.0;
    SELECT SUM(sales) INTO @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
    
    -- 一時テーブルを作成します。
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      category VARCHAR(255),
      sales DOUBLE,
      all_cat_sales_volume_month DOUBLE
    );
    
    -- データをクエリし、一時テーブルに書き込みます。
    INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) AS amount, @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category ORDER BY amount ASC limit 3;
    
    -- 売上高が特定の条件を満たさない下位 3 つのカテゴリをクエリします。
    SELECT category, sales, all_cat_sales_volume_month FROM temp_array;
  2. ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを構成します。

    • スケジュール時刻: 値を 08:00 に設定します。

    • リソースグループ: 既存のサーバーレスリソースグループを選択します。

    • 親ノード: 分岐ノードとその子ノード間の依存関係が正しいかどうかを確認します。依存関係は、ステップ 1 でデータプッシュフローを設定するときに設定されます。

      • CompliantData: [依存関係] セクションで、[上位ノードの出力名] 列の値が [準拠] であることを確認します。

      • NonCompliantData: [依存関係] セクションで、[上位ノードの出力名] 列の値が [非準拠] であるかどうかを確認します。

    • 出力パラメーター: [入力パラメーターと出力パラメーター] セクションで、[出力パラメーター] の右側にある [代入パラメーターを追加] をクリックして、SQL クエリノードの outputs パラメーターを追加します。 outputs パラメーターは、データプッシュノードの入力パラメーターとして使用されます。image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 5: データプッシュノードを設定する

各データプッシュノードの [入力パラメーターと出力パラメーター] セクションの 入力パラメーター に入力パラメーターを追加して、CompliantData ノードと NonCompliantData ノードの outputs パラメーターを取得できます。このようにして、データプッシュノードは入力パラメーターを使用してデータを特定の宛先にプッシュできます。

  1. Top3Categories ノードと Bottom3Categories ノードをそれぞれダブルクリックします。関連ノードの設定タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを設定します。次の表に、設定する必要があるパラメーターを示します。

    セクションとパラメーター

    説明

    スクリーンショット

    スケジューリングパラメーター

    パラメーター名

    値を curdate に設定します。

    image

    パラメーター値

    値を $[yyyymmddhh:mi:ss] に設定します。

    スケジュール

    スケジューリングサイクル

    値を に設定します。

    image

    スケジュール時刻

    値を 08:00 に設定します。

    説明

    この例では、毎日 08:00 に特定の宛先にデータがプッシュされるようにします。このパラメーターは、ビジネス要件に基づいて設定できます。

    再実行

    値を 実行状態に関係なく許可 に設定します。

    リソースグループ

    リソースグループ

    スケジューリング用に既存のリソースグループを選択します。

    説明

    データプッシュノードを初めて使用するときは、チケットを送信 して、スケジューリング用にリソースグループをアップグレードする必要があります。

    image

    入力パラメーターと出力パラメーター

    入力パラメーター

    [作成] をクリックして、入力パラメーターを追加します。

    • パラメーター名: inputs

    • 値のソース:

      • Top3Categories: ドロップダウンリストから CompliantData ノードの outputs パラメーターを選択します。

      • Bottom3Categories: ドロップダウンリストから NonCompliantData ノードの outputs パラメーターを選択します。

    • Top3Categoriesimage

    • Bottom3Categoriesimage

  2. プッシュするデータを設定します。

    • 宛先: [宛先] ドロップダウンリストから宛先を選択します。使用可能な宛先がない場合は、[宛先の作成] をクリックして宛先を作成します。次の表に、宛先を作成するためのパラメーターを示します。

      パラメーター

      説明

      タイプ

      プッシュチャネルを選択します。DingTalk、Lark、WeCom、Microsoft Teams、電子メールがサポートされています。

      名前

      ビジネス要件に基づいて名前を入力します。

      Webhook

      選択したプッシュチャネルの Webhook URL を入力します。対応するプラットフォームで関連するプッシュチャネルの URL を取得する必要があります。

      説明
    • タイトル: Top3Categories ノードには 売上高が上位 3 位のカテゴリ、Bottom3Categories ノードには 売上高が下位 3 位のカテゴリ と入力します。

    • 本文: ビジネス要件に基づいてコンテンツを設定します。詳細については、「データプッシュ」トピックの「プッシュするコンテンツを設定する」セクションをご参照ください。

      説明

      [本文] セクションでは、SQL クエリノードによってクエリされたフィールド名をプレースホルダーとして直接使用して、データプッシュノードの入力パラメーターを取得できます。

      • Top3Categories ノードのサンプル設定image

      • Bottom3Categories ノードのサンプル設定image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 6: データプッシュフローをテストする

条件付きデータプッシュフローを設定したら、ワークフローをコミットしてワークフロー内のノードをデプロイする前に、フローをテストする必要があります。

  1. DataPushDemo ワークフローをダブルクリックします。

  2. ワークフローの設定タブで、SalesAmountPreMonth ノードを右クリックし、[現在のノードとその子孫ノードを実行] を選択します。

    説明

    データプッシュフロー内のノードでエラーが発生した場合は、ノードを右クリックして [ログの表示] を選択し、ログを表示します。

    image

スクリプトデータプッシュ

ステップ 1: データプッシュフローを設定する

DataPushDemo ワークフローをダブルクリックします。ワークフローキャンバスで、SalesAmountPreWeekTop3CategoryListTop3CategoriesPreWeek ノードを接続して、データプッシュフローを形成します。次の図は、データプッシュフローを示しています。

image

ステップ 2: SQL クエリノードを設定する

SQL クエリノードを設定してテストデータをクエリし、[入力パラメーターと出力パラメーター] セクションに outputs パラメーターを追加して、SQL クエリノードのクエリ結果を代入ノードに渡すことができます。

  1. SalesAmountPreWeek ノードをダブルクリックします。ノードの設定タブで、ノードのコードを記述します。サンプルコード:

    -- 前週の総売上高をクエリします。
    SELECT category, SUM(sales) AS amount
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 WEEK), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category ORDER BY amount DESC limit 3;
  2. ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを構成します。

    • スケジュール時刻: 値を 08:00 に設定します。

    • リソースグループ: 既存のサーバーレスリソースグループを選択します。

    • 親ノード: [ルートノードを追加] を選択します。

    • 出力パラメーター: [入力パラメーターと出力パラメーター] セクションで、[出力パラメーター] の右側にある [代入パラメーターを追加] をクリックして、SQL クエリノードの outputs パラメーターを追加します。 outputs パラメーターは、データプッシュノードの入力パラメーターとして使用されます。image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 3: 代入ノードを設定する

[入力パラメーターと出力パラメーター] セクションで、SQL クエリノードの outputs パラメーターを代入ノードの 入力パラメーターとして設定できます。次に、入力パラメーターに値を再割り当てして、代入ノードの outputs パラメーターを生成します。代入ノードは outputs パラメーターを使用して、データプッシュノードにデータを渡します。

  1. Top3CategoryList ノードをダブルクリックします。ノードの設定タブで、言語 に Python を選択し、ノードのコードを記述します。

    def main():
    
        From datetime import date
        today = date.today()
        formatted_date = today.strftime('%Y-%m-%d')
        
        msg = '統計日: ' + formatted_date + ' \\n\\n ' \
        '- 1: ${inputs[0][0]}, 売上高: ${inputs[0][1]} \\n\\n ' \
        '- 2: ${inputs[1][0]}, 売上高: ${inputs[1][1]} \\n\\n ' \
        '- 3: ${inputs[2][0]}, 売上高: ${inputs[2][1]} \\n\\n '
        
        print(msg)
    
    
    if __name__ == "__main__":
        import sys
        main()
  2. ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを構成します。

    • スケジュール時刻: 値を 08:00 に設定します。

    • リソースグループ: スケジューリング用に既存のリソースグループを選択します。

    • 入力パラメーターと出力パラメーター

      • 入力パラメーター

        • [作成] をクリックします。パラメーター名: inputs

        • 値のソース: ドロップダウンリストから SalesAmountPreWeek ノードの outputs パラメーターを選択します。

      • 出力パラメーター: デフォルトでは、outputs パラメーターが追加されます。

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 4: データプッシュノードを設定する

[入力パラメーターと出力パラメーター] セクションで、代入ノードの outputs パラメーターをデータプッシュノードの 入力パラメーターとして設定できます。次に、データプッシュノードは入力パラメーターを使用してデータを特定の宛先にプッシュします。

  1. Top3CategoriesPreWeek ノードをダブルクリックします。ノードの設定タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを設定します。次の表に、設定する必要があるパラメーターを示します。

    セクションとパラメーター

    説明

    スクリーンショット

    スケジューリングパラメーター

    パラメーター名

    値を curdate に設定します。

    image

    パラメーター値

    値を $[yyyymmddhh:mi:ss] に設定します。

    スケジュール

    スケジューリングサイクル

    値を に設定します。

    image

    スケジュール時刻

    値を 08:00 に設定します。

    説明

    この例では、毎日 08:00 に特定の宛先にデータがプッシュされるようにします。このパラメーターは、ビジネス要件に基づいて設定できます。

    再実行

    値を 実行状態に関係なく許可 に設定します。

    リソースグループ

    リソースグループ

    スケジューリング用に既存のリソースグループを選択します。

    説明

    データプッシュノードを初めて使用するときは、チケットを送信 して、スケジューリング用にリソースグループをアップグレードする必要があります。

    image

    入力パラメーターと出力パラメーター

    入力パラメーター

    [作成] をクリックして、入力パラメーターを追加します。

    パラメーター名: inputs

    値のソース: ドロップダウンリストから Top3CategoryList ノードの outputs パラメーターを選択します。

    image

  2. プッシュするデータを設定します。

    • 宛先: [宛先] ドロップダウンリストから宛先を選択します。使用可能な宛先がない場合は、[宛先の作成] をクリックして宛先を作成します。次の表に、宛先を作成するためのパラメーターを示します。

      パラメーター

      説明

      タイプ

      プッシュチャネルを選択します。DingTalk、Lark、WeCom、Teams がサポートされています。

      名前

      ビジネス要件に基づいて名前を入力します。

      Webhook

      選択したプッシュチャネルの Webhook URL を入力します。対応するプラットフォームで関連するプッシュチャネルの URL を取得する必要があります。

      説明
    • タイトル: 前週の売上高が上位 3 位のカテゴリ と入力します。

    • 本文: ビジネス要件に基づいてコンテンツを設定します。詳細については、「データプッシュ」トピックの「プッシュするコンテンツを設定する」セクションをご参照ください。

      説明

      本文 セクションでは、SQL クエリノードによってクエリされたフィールド名をプレースホルダーとして直接使用して、データプッシュノードの入力パラメーターを取得できます。

      image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 5: データプッシュフローをテストする

スクリプトデータプッシュフローを設定したら、ワークフローをコミットしてワークフロー内のノードをデプロイする前に、フローをテストする必要があります。

  1. DataPushDemo ワークフローをダブルクリックします。

  2. ワークフローの設定タブで、SalesAmountPreWeek ノードを右クリックし、[現在のノードとその子孫ノードを実行] を選択します。

    説明

    データプッシュフロー内のノードでエラーが発生した場合は、ノードを右クリックして [ログの表示] を選択し、ログを表示します。

    image

シンプルなデータプッシュ

ステップ 1: データプッシュフローを設定する

DataPushDemo ワークフローをダブルクリックします。ワークフローキャンバスで、SalesAmountPreDay ノードと PushSalesAmountPreDay ノードを接続して、データプッシュフローを形成します。次の図は、データプッシュフローを示しています。

image

ステップ 2: SQL クエリノードを設定する

SQL クエリノードを設定してテストデータをクエリし、[入力パラメーターと出力パラメーター] セクションに outputs パラメーターを追加して、SQL クエリノードのクエリ結果をデータプッシュノードに渡すことができます。

  1. SalesAmountPreDay ノードをダブルクリックします。ノードの設定タブで、ノードのコードを記述します。サンプルコード:

    -- temp_array という名前の一時テーブルを作成します。
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      total_amount DOUBLE
    );
    
    -- 昨日の総売上高に関連するデータを temp_array テーブルに書き込みます。
    INSERT INTO temp_array (total_amount) 
    SELECT SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59');
    
    -- temp_array テーブルをクエリします。
    select total_amount FROM temp_array;
  2. ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを構成します。

    • スケジュール時刻: 値を 08:00 に設定します。

    • リソースグループ: 既存のサーバーレスリソースグループを選択します。

    • 親ノード: [ルートノードを追加] を選択します。

    • 出力パラメーター: [入力パラメーターと出力パラメーター] セクションで、[出力パラメーター] の右側にある [代入パラメーターを追加] をクリックして、SQL クエリノードの outputs パラメーターを追加します。 outputs パラメーターは、データプッシュノードの入力パラメーターとして使用されます。image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 3: データプッシュノードを設定する

[入力パラメーターと出力パラメーター] セクションで、SQL クエリノードの outputs パラメーターをデータプッシュノードの 入力パラメーターとして設定できます。次に、データプッシュノードは入力パラメーターを使用してデータを特定の宛先にプッシュします。

  1. PushSalesAmountPreDay ノードをダブルクリックします。ノードの設定タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを設定します。次の表に、設定する必要があるパラメーターを示します。

    セクションとパラメーター

    説明

    スクリーンショット

    スケジューリングパラメーター

    パラメーター名

    値を curdate に設定します。

    image

    パラメーター値

    値を $[yyyymmddhh:mi:ss] に設定します。

    スケジュール

    スケジューリングサイクル

    値を に設定します。

    image

    スケジュール時刻

    値を 08:00 に設定します。

    説明

    この例では、毎日 08:00 に特定の宛先にデータがプッシュされるようにします。このパラメーターは、ビジネス要件に基づいて設定できます。

    再実行

    値を 実行状態に関係なく許可 に設定します。

    リソースグループ

    リソースグループ

    スケジューリング用に既存のリソースグループを選択します。

    説明

    データプッシュノードを初めて使用するときは、チケットを送信 して、スケジューリング用にリソースグループをアップグレードする必要があります。

    image

    入力パラメーターと出力パラメーター

    入力パラメーター

    [作成] をクリックして、入力パラメーターを追加します。

    パラメーター名: inputs

    値のソース: ドロップダウンリストから SalesAmountPreDay ノードの outputs パラメーターを選択します。

    image

  2. プッシュするデータを設定します。

    • 宛先: [宛先] ドロップダウンリストから宛先を選択します。使用可能な宛先がない場合は、[宛先の作成] をクリックして宛先を作成します。次の表に、宛先を作成するためのパラメーターを示します。

      パラメーター

      説明

      タイプ

      プッシュチャネルを選択します。DingTalk、Lark、WeCom、Teams がサポートされています。

      名前

      ビジネス要件に基づいて名前を入力します。

      Webhook

      選択したプッシュチャネルの Webhook URL を入力します。対応するプラットフォームで関連するプッシュチャネルの URL を取得する必要があります。

      説明
    • タイトル: 昨日の総売上高 と入力します。

    • 本文: ビジネス要件に基づいてコンテンツを設定します。詳細については、「データプッシュ」トピックの「プッシュするコンテンツを設定する」セクションをご参照ください。

      説明

      本文 セクションでは、SQL クエリノードによってクエリされたフィールド名をプレースホルダーとして直接使用して、データプッシュノードの入力パラメーターを取得できます。

      image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 4: フローをテストする

シンプルなデータプッシュフローを設定したら、ワークフローをコミットしてワークフロー内のノードをデプロイする前に、フローをテストする必要があります。

  1. DataPushDemo ワークフローをダブルクリックします。

  2. ワークフローの設定タブで、SalesAmountPreDay ノードを右クリックし、[現在のノードとその子孫ノードを実行] を選択します。

    説明

    データプッシュフロー内のノードでエラーが発生した場合は、ノードを右クリックして [ログの表示] を選択し、ログを表示します。

    image

複合データプッシュ

ステップ 1: データプッシュフローを設定する

DataPushDemo ワークフローをダブルクリックします。ワークフローキャンバスで、SalesAmountPreDaySalesGrowthPreDayCombinedPush ノードを接続して、データプッシュフローを形成します。次の図は、データプッシュフローを示しています。

説明

SalesAmountPreDay ノードは、複合データプッシュ方法とシンプルなデータプッシュ方法の両方で使用できます。

image

ステップ 2: SQL クエリノードを設定する

各 SQL クエリノードを設定してテストデータをクエリし、[入力パラメーターと出力パラメーター] セクションの outputs パラメーターを使用して、SQL クエリノードのクエリ結果をデータプッシュノードに渡すことができます。

  1. SalesGrowthPreDay ノードをクリックします。ノードの設定タブで、ノードのコードを記述します。サンプルコード:

    -- 一昨日分の売上合計金額を集計するための temp_array1 という名前のテーブルを作成します。
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array1 (
      category VARCHAR(255),
      sales DOUBLE
    );
    -- 一昨日分のデータを temp_array1 テーブルに書き込みます。
    INSERT INTO temp_array1 (category, sales) SELECT category, SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category;
    
    -- 昨日分の売上合計金額を集計するための temp_array2 という名前のテーブルを作成します。
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array2 (
      category VARCHAR(255),
      sales DOUBLE
    );
    -- 昨日分のデータを temp_array2 テーブルに書き込みます。
    INSERT INTO temp_array2 (category, sales) SELECT category, SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category;
    
    -- 昨日分の売上増加金額を集計するための result という名前のテーブルを作成します。
    CREATE TEMPORARY TABLE IF NOT EXISTS result (
      category VARCHAR(255),
      diff DOUBLE
    );
    -- 売上増加データを result テーブルに書き込みます。
    INSERT INTO result (category, diff) SELECT temp_array2.category AS category, temp_array2.sales - temp_array1.sales AS diff FROM temp_array1 LEFT JOIN temp_array2 ON temp_array1.category = temp_array2.category;
    
    -- result テーブルからデータをクエリします。
    SELECT category, diff FROM result;

  2. ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを構成します。

    • スケジュール時刻: 値を 08:00 に設定します。

    • リソースグループ: 既存のサーバーレスリソースグループを選択します。

    • 親ノード: [ルートノードを追加] を選択します。

    • 出力パラメーター: [入力パラメーターと出力パラメーター] セクションで、[出力パラメーター] の右側にある [代入パラメーターを追加] をクリックして、SQL クエリノードの outputs パラメーターを追加します。 outputs パラメーターは、データプッシュノードの入力パラメーターとして使用されます。image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 3: データプッシュノードを設定する

[入力パラメーターと出力パラメーター] セクションで、SalesAmountPreDay ノードと SalesGrowthPreDay ノードの outputs パラメーターをデータプッシュノードの 入力パラメーターとして設定できます。次に、データプッシュノードは入力パラメーターを使用してデータを特定の宛先にプッシュします。

  1. CombinedPush ノードをダブルクリックします。ノードの設定タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを設定します。次の表に、設定する必要があるパラメーターを示します。

    セクションとパラメーター

    説明

    スクリーンショット

    スケジューリングパラメーター

    パラメーター名

    値を curdate に設定します。

    image

    パラメーター値

    値を $[yyyymmddhh:mi:ss] に設定します。

    スケジュール

    スケジューリングサイクル

    値を に設定します。

    image

    スケジュール時刻

    値を 08:00 に設定します。

    説明

    この例では、毎日 08:00 に特定の宛先にデータがプッシュされるようにします。このパラメーターは、ビジネス要件に基づいて設定できます。

    再実行

    値を 実行状態に関係なく許可 に設定します。

    リソースグループ

    リソースグループ

    スケジューリング用に既存のリソースグループを選択します。

    説明

    データプッシュノードを初めて使用するときは、チケットを送信 して、スケジューリング用にリソースグループをアップグレードする必要があります。

    image

    入力パラメーターと出力パラメーター

    入力パラメーター

    [作成] をクリックして、入力パラメーターを追加します。

    パラメーター 1:

    • パラメーター名: inputs1

    • 値のソース: ドロップダウンリストから SalesAmountPreDay ノードの outputs パラメーターを選択します。

    パラメーター 2:

    • パラメーター名: inputs2

    • 値のソース: ドロップダウンリストから SalesGrowthPreDayoutputs パラメーターを選択します。

    image

  2. プッシュするデータを設定します。

    • 宛先: [宛先] ドロップダウンリストから宛先を選択します。使用可能な宛先がない場合は、[宛先の作成] をクリックして宛先を作成します。次の表に、宛先を作成するためのパラメーターを示します。

      パラメーター

      説明

      タイプ

      プッシュチャネルを選択します。DingTalk、Lark、WeCom、Teams がサポートされています。

      名前

      ビジネス要件に基づいて名前を入力します。

      Webhook

      選択したプッシュチャネルの Webhook URL を入力します。対応するプラットフォームで関連するプッシュチャネルの URL を取得する必要があります。

      説明
    • タイトル: 昨日の売上高と売上高の増加 と入力します。

    • 本文: ビジネス要件に基づいてコンテンツを設定します。詳細については、「データプッシュ」トピックの「プッシュするコンテンツを設定する」セクションをご参照ください。

      説明

      本文 セクションでは、SQL クエリノードによってクエリされたフィールド名をプレースホルダーとして直接使用して、データプッシュノードの入力パラメーターを取得できます。

      image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 4: データプッシュフローをテストする

複合データプッシュフローとシンプルなデータプッシュフローを設定したら、ワークフローをコミットしてワークフロー内のノードをデプロイする前に、複合データプッシュフローをテストする必要があります。

  1. DataPushDemo ワークフローをダブルクリックします。

  2. ワークフローの設定タブで、CombinedPush ノードを右クリックし、[現在のノードとその子孫ノードを実行] を選択します。

    説明

    データプッシュフロー内のノードでエラーが発生した場合は、ノードを右クリックして [ログの表示] を選択し、ログを表示します。

    image

MaxCompute データプッシュ

ステップ 1: データプッシュフローを設定する

DataPushDemo ワークフローをダブルクリックします。ワークフローキャンバスで、MaxComputeDataSyncMaxComputeDataQueryMaxComputeDataPush ノードを接続して、データプッシュフローを形成します。次の図は、データプッシュフローを示しています。

ステップ 2: バッチ同期ノードを設定する

MaxComputeDataSync ノードを構成して、データの準備 セクションで MySQL に書き込まれたテストデータを、将来使用するために作成された MaxCompute データソースに同期できます。

  1. MaxComputeDataSync ノードをダブルクリックします。ノードの設定タブで、パラメーターを設定します。次の表にパラメーターを示します。

    項目

    説明

    スクリーンショット

    ソース

    ソース

    MySQL を選択します。

    image

    データソース名

    追加した MySQL データソースを選択します。

    リソースグループ

    サーバーレスリソースグループを選択します。

    宛先

    宛先

    MaxCompute(ODPS) を選択します。

    データソース名

    ワークスペースに追加されている MaxCompute データソースを選択します。

    設定が完了すると、システムはデータソースとリソースグループ間の接続性をテストします。データソースがネットワーク接続テストに合格したら、[次へ] をクリックして、ソースと宛先を設定します。

  2. ソースと宛先を設定します。

    セクションとパラメーター

    説明

    スクリーンショット

    ソース

    データソース

    次の項目に注意してください。

    • デフォルト値の MySQL を保持します。

    • 追加した MySQL データソースを選択します。

    image

    テーブル

    orders テーブルを選択します。

    データフィルタリング

    このパラメーターは、ビジネス要件に基づいて設定します。この例では、このパラメーターは空のままです。

    分割キー

    ソーステーブルの列をシャードキーとして使用できます。プライマリキー列またはインデックス付き列を使用することをお勧めします。

    データプレビュー

    [データプレビュー] をクリックして、MySQL データソースから取得したデータが期待どおりかどうかを確認できます。

    宛先

    データソース

    次の項目に注意してください。

    • デフォルト値の MaxCompute を保持します。

    • ワークスペースに追加されている MaxCompute データソースを選択します。

    image

    トンネルリソースグループ

    デフォルト値の 共有転送リソース を保持します。指定されたリソースは、トンネルクォータとして使用されます。

    テーブル

    [宛先テーブルスキーマを生成] をクリックして、宛先テーブルを生成します。

    パーティション情報

    日次増分データを対応する日付のパーティションに保存する場合は、日次増分データの同期用に pt パラメーターを設定できます。たとえば、pt を ${bizdate} に設定できます。

    書き込みモード

    [書き込み前に既存のデータをクリーンアップする (insert Overwrite)] を選択します。

  3. ソースフィールドを、ソースフィールドと同じ名前の宛先フィールドにマッピングします。image

  4. チャネル制御ポリシーを設定します。

    • タスクの予想最大同時実行数: データ同期の際に使用される並列スレッドの実際の数は、リソースまたはタスクの特定の特性により、指定されたしきい値以下になる場合があります。使用する並列スレッドの実際の数に基づいて、選択したリソースグループに対して課金されます。この例では、このパラメーターを 2 に設定します。

    • [同期レート]: スロットリングを使用すると、ソースとデスティネーションの読み取りおよび書き込みワークロードを削減できます。スロットリングを有効にしない場合、システムは既存のハードウェア環境で使用可能な最大伝送パフォーマンスでノードを処理します。この例では、[電流制限なし] を選択します。

    • [不正なデータレコードのポリシー]: [不正なデータレコードを許可しない] を選択します。

    • 分散実行: デフォルトでは、このスイッチはオフになっています。指定した並列スレッドの最大数が 8 以上の場合にのみ、スイッチをオンにできます。

    image

  5. スケジューリングプロパティを設定します。

    ノードの設定タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを設定します。

    • スケジューリングパラメーター

      • パラメーター名: 値を bizdate に設定します。

      • パラメーター値: 値を $[yyyymmdd-1] に設定します。

    • スケジュール時刻: 値を 08:00 に設定します。

    • [再実行]: このパラメーターを「実行状態に関係なく許可」に設定します。

    • リソースグループ: スケジューリング用に既存のリソースグループを選択します。

    • [依存関係]: [ルートノードの追加] を選択して、ルートノードをバッチ同期ノードの先祖ノードとして使用します。

  6. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 3: 代入ノードを設定する

MaxCompute データソースでは、ODPS SQL ノードを使用してデータをクエリし、[出力パラメーター] を使用してデータプッシュノードにデータを送信することはできません。 代入ノードを設定して MaxCompute データをクエリし、[入力と出力のパラメーター] セクションに [出力] パラメーターを追加して、クエリ結果をデータプッシュノードに渡すことができます。

  1. MaxComputeDataQuery ノードをダブルクリックします。

    1. ノードの設定タブで、言語 ドロップダウンリストから ODPS SQL を選択します。

    2. ノードのコードを記述します。サンプルコード:

      -- 各パーティションの売上高を降順にランク付けするサブクエリを作成します。
      -- DENSE_RANK() 関数を呼び出して、パーティション内の各行にランクを割り当てます。パーティション内の複数の行の売上高が同じである場合、DENSE_RANK() 関数はそれらの行に同じランクを割り当てます。ランキング番号は連続しています。
      --
      -- サブクエリ:
      -- 1. orders テーブルから次の列を選択します。order_id、category、sales、datetime、pt。
      -- 2. DENSE_RANK() OVER (PARTITION BY pt ORDER BY sales DESC) 関数を呼び出します。
         -- PARTITION BY pt: pt フィールドに基づいてデータレコードをパーティション分割します。サブクエリは、pt フィールドで指定された各パーティションのデータレコードをランク付けします。
         -- ORDER BY sales DESC: 各パーティションのデータレコードを、sales フィールドの値に基づいて降順にランク付けします。
         -- rank 列には、各パーティションの各行の売上高のランクが格納されます。
      --
      -- メインクエリ:
      -- サブクエリのクエリ結果から、売上高が上位 3 位のデータレコードをクエリします。
      -- つまり、各データタイムスタンプの売上高が上位 3 位のデータレコードがクエリされます。
      
      SELECT
        order_id,          -- 注文 ID。
        category,          -- 製品カテゴリ。
        sales,             -- 売上高。
        datetime, -- 注文の支払い日時。
        pt -- データタイムスタンプ。
      FROM (
          SELECT
            order_id,
            category,
            sales,
            datetime,
            pt,
            DENSE_RANK() OVER (PARTITION BY pt ORDER BY sales DESC) AS rank  -- 各パーティションの売上高のランキングを計算します。
          FROM orders
          WHERE pt = '${bizdate}'  -- データタイムスタンプ (${bizdate} で指定) に基づいてデータレコードをクエリします。
      ) AS ranked_orders
      WHERE rank <= 3  -- 各パーティションの上位 3 つのデータレコードを保持します。
  2. ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを構成します。

    • スケジュール時刻: 値を 08:00 に設定します。

    • リソースグループ: 既存のサーバーレスリソースグループを選択します。

    • [依存アップストリームノード]: Ancestor Node Name パラメーターの値が MaxComputeDataSync であるかどうかを確認します。

    • 出力パラメーター: 入力パラメーターと出力パラメーターのセクションで、出力パラメーターの右側にある [作成] をクリックして、代入ノードの出力パラメーターを追加します。出力パラメーターは、データプッシュノードの入力パラメーターとして使用されます。

      image

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 4: データプッシュノードを設定する

[入力パラメーターと出力パラメーター] セクションで、代入ノードの outputs パラメーターをデータプッシュノードの 入力パラメーターとして設定できます。次に、データプッシュノードは入力パラメーターを使用してデータを特定の宛先にプッシュします。

  1. MaxCompueDataPush ノードをダブルクリックします。ノードの設定タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。[プロパティ] タブで、パラメーターを設定します。次の表に、設定する必要があるパラメーターを示します。

    セクションとパラメーター

    説明

    スクリーンショット

    スケジューリングパラメーター

    パラメーター名

    値を curdate に設定します。

    image

    パラメーター値

    値を $[yyyymmddhh:mi:ss] に設定します。

    スケジュール

    スケジューリングサイクル

    値を に設定します。

    image

    スケジュール時刻

    値を 08:00 に設定します。

    説明

    この例では、毎日 08:00 に特定の宛先にデータがプッシュされるようにします。このパラメーターは、ビジネス要件に基づいて設定できます。

    再実行

    値を 実行状態に関係なく許可 に設定します。

    リソースグループ

    リソースグループ

    データプッシュ機能のリリース日である 2024 年 6 月 28 日より後に作成されたサーバーレスリソースグループを選択する必要があります。リソースグループが 2024 年 6 月 28 日より前に作成された場合は、チケットを送信 してリソースグループをアップグレードしてください。

    説明

    リリースノートの詳細については、「リリースレコード」をご参照ください。

    image

    入力パラメーターと出力パラメーター

    入力パラメーター

    [作成] をクリックして、入力パラメーターを追加します。

    • [作成] をクリックします。パラメーター名: inputs

    • 値のソース: ドロップダウンリストから MaxComputeDataQuery ノードの outputs パラメーターを選択します。

    image

  2. プッシュするデータを設定します。

    • 宛先: [宛先] ドロップダウンリストから宛先を選択します。使用可能な宛先がない場合は、[宛先の作成] をクリックして宛先を作成します。次の表に、宛先を作成するためのパラメーターを示します。

      パラメーター

      説明

      タイプ

      プッシュチャネルを選択します。DingTalk、Lark、WeCom、Teams がサポートされています。

      名前

      ビジネス要件に基づいて名前を入力します。

      Webhook

      選択したプッシュチャネルの Webhook URL を入力します。対応するプラットフォームで関連するプッシュチャネルの URL を取得する必要があります。

      説明
    • タイトル: MaxCompute データ と入力します。

    • 本文: ビジネス要件に基づいてコンテンツを設定します。詳細については、「データプッシュ」トピックの「プッシュするコンテンツを設定する」セクションをご参照ください。

      説明

      本文 セクションでは、代入ノードによってクエリされたフィールド名をプレースホルダーとして直接使用して、データプッシュノードの入力パラメーターを取得できます。

  3. トップツールバーの image アイコンをクリックして、設定を保存します。

ステップ 5: フローをテストする

データプッシュフローを設定したら、ワークフローをコミットしてワークフロー内のノードをデプロイする前に、フローをテストする必要があります。

  1. DataPushDemo ワークフローをダブルクリックします。

  2. MaxComputeDataSync ノードを右クリックし、[現在のノードとその子孫ノードを実行] を選択します。

    説明

    データプッシュフロー内のノードでエラーが発生した場合は、ノードを右クリックして [ログの表示] を選択し、ログを表示します。

    image

ワークフローをコミットし、ワークフロー内のノードをデプロイする

ワークフロー内のすべてのデータプッシュフローを設定したら、DataPushDemo ワークフローをダブルクリックします。ワークフローの設定タブで、すべてのデータプッシュフローをテストします。すべてのデータプッシュフローが期待どおりに実行されたら、ワークフローをコミットし、ワークフロー内のノードをデプロイします。

  1. DataPushDemo ワークフローの設定タブで、image アイコンをクリックしてワークフローを実行します。

  2. ワークフロー内のすべてのノードの横に image が表示されたら、image アイコンをクリックして DataPushDemo ワークフローをコミットします。

  3. [コミット] ダイアログボックスで、コミットするノードを選択し、説明を入力して、[I/O の不整合アラートを無視] を選択します。

  4. [確認] をクリックします。

  5. ノードをデプロイします。詳細については、「ノードをデプロイする」をご参照ください。

次のステップ

ワークフローは、指定されたスケジューリングサイクルに基づいて実行されます。デプロイされたノードに対して、オペレーションセンターでさまざまな O&M 操作を実行できます。詳細については、「自動トリガーノードで基本的な O&M 操作を実行する」をご参照ください。