すべてのプロダクト
Search
ドキュメントセンター

:代入ノード

最終更新日:Dec 19, 2025

複雑なデータワークフローでは、ノード間で動的な情報を渡す必要があることがよくあります。一般的な方法は中間テーブルを使用することですが、このアプローチは少量のデータを渡すには非常に非効率的であり、不要な I/O と複雑さを増します。代入ノードは軽量なソリューションを提供します。短いスクリプト (MaxCompute SQL、Python 2、または Shell) を実行し、その出力を直接下流ノードにパラメーターとして渡します。これにより、上流タスクの結果に基づいてタスクが動的に構成される柔軟なパイプラインを構築できます。

注意事項

  • エディション:DataWorks Standard Edition 以上。

  • 権限:DataWorks ワークスペースで [開発] または [ワークスペース管理者] のロールが必要です。詳細については、「ワークスペースへのメンバーの追加」をご参照ください。

仕組み

代入ノードのコア機能はパラメーター渡し、つまり代入ノードから下流ノードへのデータ転送です。

  • 代入ノードは、最後の出力またはクエリ結果を、outputs という名前のシステム生成のノード出力パラメーターに自動的に割り当てることでデータを生成します。

  • 下流ノードがデータを消費します。ノード入力パラメーター (例:param) を追加して、outputs を使用するように構成します。

パラメーターのフォーマット

outputs パラメーターの値とフォーマットは、使用されるスクリプト言語によって異なります。

言語

フォーマット

MaxCompute SQL

最後の SELECT 文の出力。

結果セットは2次元配列として渡されます。

Python 2

最後の print 文の出力。

出力は単一の文字列として扱われ、カンマ (,) で分割されて1次元配列を形成します。

例:出力が 'Electronics,Clothing,Books' の場合、下流ノードは ['Electronics','Clothing','Books'] を受け取ります。
重要

出力内容に含まれるカンマはエスケープしてください。たとえば、出力が 'Electronics,Clothing\, Shoes & Accessories' の場合、下流ノードはそれを ['Electronics', 'Clothing, Shoes & Accessories'] として正しく解析します。

Shell

最後の echo 文の出力。

操作手順

代入ノードの結果は、任意の下流ノードに渡すことができます。次の例では、Shell ノードを使用してこのワークフローを説明します。

  1. 代入ノードの構成

    ワークフローで、代入ノードを作成して編集します。必要に応じて MaxCompute SQL、Python 2、または Shell を選択し、コードを記述します。

    image

  2. Shell ノードの構成

    image

    1. Shell ノードを作成します。

    2. 右側の [スケジューリング] パネルで、[入出力パラメーター] タブを選択します。

    3. [入力パラメーター] セクションで、[パラメーターの作成] をクリックします。

    4. 入力パラメーターに param などのパラメーター名を割り当て、値を outputs パラメーターに設定します。

      説明

      この構成の後、DataWorks は Shell ノードと代入ノードの間に自動的に依存関係を作成します。

    5. パラメーターを構成した後、Shell スクリプト内で ${param} の形式を使用して渡された値を参照できます。

  3. 実行と検証

    1. ワークフローキャンバスの上部ツールバーで [デプロイ] をクリックし、[完全デプロイ] を選択します。

    2. [オペレーションセンター] > [自動トリガーノード O&M] > [自動トリガーノード] に移動します。

    3. 対象のワークフローで スモークテスト を実行し、結果が期待どおりであることを確認します。

制限事項

  • パラメーターは、直接の下流ノードにのみ渡すことができます。

  • サイズ制限:outputs は 2 MB を超えることはできません。超えた場合、代入ノードは失敗します。

  • 構文の制限:

    • 代入ノードのコードにコメントを含めないでください。コメントは出力の解析を妨げ、ノードの失敗や不正な値の生成を引き起こす可能性があります。

    • MaxCompute SQL モードでは WITH 句はサポートされていません。

言語別の例

outputs のデータ形式と参照方法は言語によって異なります。

例1:MaxCompute SQL クエリ結果の受け渡し

SQL クエリの結果は、2次元配列として Shell ノードに渡されます。

  • 代入ノード

    SQL コードが2行2列を返すと仮定します。

    SELECT 'beijing', '1001'
    UNION ALL 
    SELECT 'hangzhou', '1002';
  • Shell ノード

    代入ノードの outputs を参照する param という名前の入力パラメーターを追加します。

    次のスクリプトを使用してデータを読み取ります。

    echo "Entire result set: ${region}"
    echo "First row: ${region[0]}"
    echo "Second field of the first row: ${region[0][1]}"

    DataWorks は静的な置き換えを実行します。出力:

    Full result set: beijing,1001
    hangzhou,1002
    First row: beijing,1001
    Second field of the first row: 1001

例2:Python 2 出力の受け渡し

Python 2 の print 文の出力は、カンマ (,) で分割され、1次元配列として渡されます。

  • 代入ノード

    Python 2 のコードは次のとおりです。

    print 'Electronics, Clothing, Books';
  • Shell ノード

    代入ノードの outputs を参照する param という名前の入力パラメーターを追加します。次のスクリプトを使用してデータを読み取ります。

    次のスクリプトを使用してデータを読み取ります。

    # Output the full array
    echo "Full result set: ${types}"
    
    # Output a specific element by index
    echo "Second element: ${types[1]}"

    DataWorks は静的な置き換えを実行します。出力:

    Full result set: Electronics,Clothing,Books
    Second element: Clothing
説明

Shell ノードの処理ロジックは Python 2 ノードと類似しているため、ここでは説明を省略します。

シナリオ:複数の業務ラインのパーティションテーブルからのデータの一括処理

この例では、代入ノードfor-each ノードを使用して、複数の業務ラインからのユーザー行動データを一括処理する方法を示します。このアプローチにより、複数のプロダクトラインに対して単一のロジックセットを使用することで、データ処理を自動化できます。

image

背景情報

ある大手インターネット企業のデータ開発者であると仮定します。E コマース (ecom)、金融 (finance)、物流 (logistics) という3つのコア業務ラインのデータ処理を担当しています。将来的にはさらに多くの業務ラインが追加される可能性があります。毎日、これら3つの業務ラインのユーザー行動ログに対して同じ集約ロジックを実行する必要があります。このロジックは、各ユーザーの1日あたりの人気度 (PV) を計算し、その結果を統一された集計テーブルに保存します。

  • 上流ソーステーブル (DWD レイヤー):

    • dwd_user_behavior_ecom_d:E コマースユーザー行動テーブル。

    • dwd_user_behavior_finance_d:金融ユーザー行動テーブル。

    • dwd_user_behavior_logistics_d:物流ユーザー行動テーブル。

    • dwd_user_behavior_${line-of-business}_d:他の潜在的な業務ラインのユーザー行動テーブル。

    • これらのテーブルは同じスキーマを持ち、日 (dt) でパーティション化されています。

  • 下流ターゲットテーブル (DWS レイヤー):

    • dws_user_summary_d:ユーザー集計テーブル。

    • このテーブルは、業務ライン (biz_line) と日 (dt) の両方でパーティション化されています。すべての業務ラインの集約結果を保存するために使用されます。

業務ラインごとに個別のタスクを作成すると、メンテナンスコストが高くなり、エラーが発生しやすくなります。for-each ノードを使用すると、1セットの処理ロジックを維持するだけで済みます。システムは自動的にすべての業務ラインを走査して計算を完了します。

データ準備

まず、サンプルテーブルを作成し、テストデータを挿入します。この例では、データタイムスタンプ 20251010 を使用します。

  1. MaxCompute コンピューティングリソースをワークスペースに関連付けます

  2. DataStudio に移動してデータ開発を行い、MaxCompute SQL ノードを作成します。

  3. ソーステーブル (DWD レイヤー) の作成:次のコードを MaxCompute SQL ノードに追加し、それを選択して実行します。

    -- E-commerce user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    ) 
    COMMENT 'Details of E-commerce user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- Verify that the E-commerce user behavior table is created.
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- Finance user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    ) 
    COMMENT 'Details of finance user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- Verify that the finance user behavior table is created.
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- Logistics user behavior table
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Behavior type',
        event_time  BIGINT COMMENT 'Millisecond-level UNIX timestamp of the event occurrence'
    ) 
    COMMENT 'Details of logistics user behavioral logs'
    PARTITIONED BY (dt STRING COMMENT 'Date partition in yyyymmdd format');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- Verify that the logistics user behavior table is created.
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. ターゲットテーブル (DWS レイヤー) の作成:次のコードを MaxCompute SQL ノードに追加し、それを選択して実行します。

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT 'User ID',
        pv          BIGINT COMMENT 'Daily popularity',
    ) 
    COMMENT 'Daily user popularity aggregate table'
    PARTITIONED BY (
        dt           STRING COMMENT 'Date partition in yyyymmdd format',
        biz_line     STRING COMMENT 'Line-of-business partition, such as ecom, finance, logistics'
    );
    重要

    ワークスペースが標準環境を使用している場合は、このノードを本番環境に公開し、データバックフィルを実行する必要があります。

ワークフローの実装

  1. ワークフローを作成します。右側の [スケジューリングパラメーター] ペインで、スケジューリングパラメーター bizdate を前日 $[yyyymmdd-1] に設定します。

    image

  2. ワークフローで、get_biz_list という名前の代入ノードを作成します。MaxCompute SQL に次のコードを記述します。このノードは、処理する業務ラインのリストを出力します。

    -- Output all lines of business to be processed
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. for-each ノードの構成

    • ワークフローキャンバスに戻り、get_biz_list 代入ノードの下流に for-each ノードを作成します。

    • for-each ノードの設定ページに移動します。右側の [スケジュール] タブの [スケジューリングパラメーター] > [スクリプトパラメーター] で、loopDataArray パラメーターの値を get_biz_list ノードの outputs に設定します。

      image

    • for-each ノードのループ本体で、[内部ノードの作成] をクリックします。MaxCompute SQL ノードを作成し、ループ本体の処理ロジックを記述します。

      説明
      • このスクリプトは for-each ノードによって駆動され、業務ラインごとに1回実行されます。

      • 組み込み変数 ${dag.foreach.current} は、実行時に現在の業務ライン名に動的に置き換えられます。期待される反復値は 'ecom'、'finance'、'logistics' です。

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. 検証ノードの追加

    ワークフローキャンバスに戻ります。for-each ノードで、[下流ノードの作成] をクリックします。MaxCompute SQL ノードを作成し、次のコードを追加します。

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

公開と実行

ワークフローを本番環境に公開します。オペレーションセンターで、[自動トリガーノード O&M] > [自動トリガーノード] に移動し、対象のワークフローを見つけてスモークテストを実行し、データタイムスタンプとして '20251010' を選択します。

実行が完了したら、テストインスタンスで実行ログを表示します。最終ノードの期待される出力は次のとおりです。

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

このソリューションの利点

  • 高い拡張性:新しい業務ラインが追加された場合、代入ノードに1行の SQL コードを追加するだけで済みます。処理ロジックを変更する必要はありません。

  • 容易なメンテナンス:すべての業務ラインが同じ処理ロジックを共有します。1か所の変更がすべての業務ラインに反映されます。

よくある質問

  • Q:MaxCompute SQL で、「find no select sql in sql assignment!」というエラーが表示されるのはなぜですか?

    A:このエラーは、MaxCompute SQL コードに SELECT 文がないために発生します。SELECT 文を追加する必要があります。WITH 構文は現在サポートされておらず、これを使用した場合もこのエラーが発生します。

  • Q:Shell または Python で、「OutPut Result is null, cannot handle!」というエラーが表示されるのはなぜですか?

    A:このエラーは、出力がないために発生します。コードに printecho などの print 文が含まれているか確認してください。

  • Q:Shell または Python で、カンマを含む出力要素をどのように処理しますか?

    A:カンマ ,\, に変換してエスケープする必要があります。次の Python コードが例です。

    categories = ["Electronics", "Clothing, Shoes & Accessories"]
    
    # Escape commas contained in each element
    # Replace ',' with '\,'
    escaped_categories = [cat.replace(",", "\,") for cat in categories]
    
    # Join the escaped elements with a comma
    output_string = ",".join(escaped_categories)
    print output_string
    # The final string output to the downstream node is:
    # Electronics,Clothing\, Shoes & Accessories
  • Q:下流ノードは複数の上流代入ノードから結果を受け取ることができますか?

    A:はい、できます。異なるノードからの結果を異なるパラメーターに割り当てることができます。

    image

  • Q:代入ノードは他の言語をサポートしていますか?

    A:代入ノードは現在、MaxCompute SQLPython 2Shell のみをサポートしています。ただし、EMR HiveHologres SQLEMR Spark SQLAnalyticDB for PostgreSQLClickHouse SQLMySQL などの一部のノードには、同じ機能を提供する組み込みのパラメーター代入機能があります。

    image

関連ドキュメント