すべてのプロダクト
Search
ドキュメントセンター

DataWorks:代入ノード

最終更新日:Feb 04, 2026

複雑なデータワークフローでは、ノード間で動的な情報を渡す必要があることがよくあります。一般的な方法として中間テーブルを使用する方法がありますが、このアプローチは少量のデータを渡すには非効率的であり、不要な I/O と複雑さを増大させます。代入ノードは、軽量なソリューションを提供します。このノードは短いスクリプト (MaxCompute SQL、Python 2、または Shell) を実行し、その出力を直接パラメーターとして下流ノードに渡します。これにより、上流タスクの結果に基づいてタスクが動的に構成される、柔軟なパイプラインを構築できます。

注意事項

  • エディション:DataWorks Standard Edition 以上。

  • 権限: DataWorks ワークスペースでは、[開発] または [ワークスペース管理者] ロールが必要です。詳細については、「ワークスペースにメンバーを追加する」をご参照ください。

仕組み

代入ノードのコア機能は、パラメーターの受け渡し、つまり代入ノードから下流ノードへのデータ転送です。

  • 代入ノードは、最後の出力またはクエリ結果を、システムが生成した outputs という名前のノード出力パラメーターに自動的に割り当てることでデータを生成します。

  • 下流ノードがそのデータを使用します。ノード入力パラメーター (例:param) を追加して、outputs を使用するように設定します。

パラメーターの形式

outputs パラメーターの値と形式は、使用するスクリプト言語によって異なります。

言語

形式

MaxCompute SQL

最後の SELECT 文の出力。

結果セットは 2 次元配列として渡されます。

Python 2

最後の print 文の出力。

出力は単一の文字列として扱われ、カンマ (,) で分割されて 1 次元配列になります。

例:出力が 'Electronics,Clothing,Books' の場合、下流ノードは ['Electronics','Clothing','Books'] を受け取ります。
重要

出力内容に含まれるカンマはエスケープしてください。例えば、出力が 'Electronics,Clothing\, Shoes & Accessories' の場合、下流ノードはこれを ['Electronics', 'Clothing, Shoes & Accessories'] として解析します。

Shell

最後の echo 文の出力。

操作手順

代入ノードの結果は、任意の下流ノードに渡すことができます。次の例では、Shell ノードを使用してこのワークフローを説明します。

  1. 代入ノードの設定

    ワークフローで、代入ノードを作成して編集します。必要に応じて MaxCompute SQL、Python 2、または Shell を選択し、コードを記述します。

    image

  2. Shell ノードの設定

    image

    1. Shell ノードを作成します。

    2. 右側の [スケジューリング] パネルで、[入出力パラメーター] タブを選択します。

    3. [入力パラメーター] セクションで、[パラメーターの作成] をクリックします。

    4. 入力パラメーターに param などのパラメーター名を割り当て、値を outputs パラメーターに設定します。

      説明

      この設定後、DataWorks は Shell ノードと代入ノードの間に依存関係を自動的に作成します。

    5. パラメーターを設定した後、Shell スクリプト内で ${param} の形式で渡された値を参照できます。

  3. 実行と検証

    1. ワークフロー キャンバスで、上部のツールバーにある [デプロイ] をクリックし、[フル デプロイメント] を選択します。

    2. [オペレーションセンター] > [自動トリガーノード O&M] > [自動トリガーノード]に移動します。

    3. 対象のワークフローでスモークテストを実行し、結果が期待どおりであることを確認します。

制限事項

  • パラメーターは、直後の下流ノードにのみ渡すことができます。

  • サイズ制限:outputs は 2 MB を超えることはできません。超えた場合、代入ノードは失敗します。

  • 構文の制限:

    • 代入ノードのコードにコメントを含めないでください。コメントは出力の解析を妨げ、ノードの失敗や不正な値の生成を引き起こす可能性があります。

    • MaxCompute SQL モードでは WITH 句はサポートされていません。

言語別の例

outputs のデータ形式と参照方法は言語によって異なります。

例 1:MaxCompute SQL クエリ結果の受け渡し

SQL クエリの結果は、2 次元配列として Shell ノードに渡されます。

  • 代入ノード

    SQL コードが 2 行 2 列を返すと仮定します。

    SELECT 'beijing', '1001'
    UNION ALL 
    SELECT 'hangzhou', '1002';
  • Shell ノード

    代入ノードの outputs を参照する param という名前の入力パラメーターを追加します。

    次のスクリプトを使用してデータを読み取ります。

    echo "Full result set: ${region}"
    echo "First row: ${region[0]}"
    echo "Second field of the first row: ${region[0][1]}"

    DataWorks は静的な置き換えを実行します。出力:

    Full result set: beijing,1001
    hangzhou,1002
    First row: beijing,1001
    Second field of the first row: 1001

例 2:Python 2 出力の受け渡し

Python 2 の print 文の出力は、カンマ (,) で分割され、1 次元配列として渡されます。

  • 代入ノード

    Python 2 のコードは次のとおりです。

    print 'Electronics, Clothing, Books';
  • Shell ノード

    代入ノードの outputs を参照する param という名前の入力パラメーターを追加します。次のスクリプトを使用してデータを読み取ります。

    次のスクリプトを使用してデータを読み取ります。

    # 配列全体を出力
    echo "Full result set: ${types}"
    
    # インデックスで特定の要素を出力
    echo "Second element: ${types[1]}"

    DataWorks は静的な置き換えを実行します。出力:

    Full result set: Electronics,Clothing,Books
    Second element: Clothing
説明

Shell ノードの処理ロジックは Python 2 ノードと似ています。

ユースケース:複数の業務ラインにまたがるパーティションテーブルのデータをバッチ処理

この例では、代入ノードfor-each ノードを使用して、複数の業務ラインのユーザー行動データをバッチ処理する方法を示します。

image

背景情報

ある大手インターネット企業のデータ開発者であると仮定します。あなたは、E コマース、金融、物流という 3 つのコア業務ラインのデータ処理を担当しています。将来的にはさらに多くの業務ラインが追加される可能性があります。これらの 3 つの業務ラインのユーザー行動ログに対して、毎日同じ集約ロジックを実行する必要があります。このロジックは、各ユーザーの PV を計算し、結果を統一された集計テーブルに保存します。

  • 上流ソーステーブル (DWD レイヤー):

    • dwd_user_behavior_ecom_d:E コマースのユーザー行動テーブル。

    • dwd_user_behavior_finance_d:金融のユーザー行動テーブル。

    • dwd_user_behavior_logistics_d:物流のユーザー行動テーブル。

    • dwd_user_behavior_${line-of-business}_d:他の潜在的な業務ラインのユーザー行動テーブル。

    • これらのテーブルは同じスキーマを持ち、日 (dt) でパーティション化されています。

  • 下流宛先テーブル (DWS レイヤー):

    • dws_user_summary_d:ユーザー集計テーブル。

    • このテーブルは、業務ライン (biz_line) と日 (dt) の両方でパーティション化されています。すべての業務ラインの集約結果を保存するために使用されます。

業務ラインごとに個別のタスクを作成すると、メンテナンスコストが高くなり、エラーも発生しやすくなります。for-each ノードを使用すれば、1 セットの処理ロジックをメンテナンスするだけで済みます。システムは自動的にすべての業務ラインを走査して計算を完了します。

データ準備

まず、サンプルテーブルを作成し、テストデータを挿入します。この例では、データタイムスタンプ 20251010 を使用します。

  1. MaxCompute 計算リソースをワークスペースに関連付けます

  2. DataStudio に移動して、MaxCompute SQL ノードを作成します。

  3. ソーステーブルの作成:MaxCompute SQL ノードに次のコードを追加し、それを選択して実行します。

    -- E コマースのユーザー行動テーブル
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT 'ユーザー ID',
        action_type STRING COMMENT '行動タイプ',
        event_time  BIGINT COMMENT 'イベント発生のミリ秒レベルの UNIX タイムスタンプ'
    ) 
    COMMENT 'E コマースのユーザー行動ログの詳細'
    PARTITIONED BY (dt STRING COMMENT 'yyyymmdd 形式の日付パーティション');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- E コマースのユーザー行動テーブルが作成されたことを確認
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- 金融のユーザー行動テーブル
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT 'ユーザー ID',
        action_type STRING COMMENT '行動タイプ',
        event_time  BIGINT COMMENT 'イベント発生のミリ秒レベルの UNIX タイムスタンプ'
    ) 
    COMMENT '金融のユーザー行動ログの詳細'
    PARTITIONED BY (dt STRING COMMENT 'yyyymmdd 形式の日付パーティション');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- 金融のユーザー行動テーブルが作成されたことを確認
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- 物流のユーザー行動テーブル
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT 'ユーザー ID',
        action_type STRING COMMENT '行動タイプ',
        event_time  BIGINT COMMENT 'イベント発生のミリ秒レベルの UNIX タイムスタンプ'
    ) 
    COMMENT '物流のユーザー行動ログの詳細'
    PARTITIONED BY (dt STRING COMMENT 'yyyymmdd 形式の日付パーティション');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- 物流のユーザー行動テーブルが作成されたことを確認
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. 宛先テーブルの作成:MaxCompute SQL ノードに次のコードを追加し、それを選択して実行します。

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT 'ユーザー ID',
        pv          BIGINT COMMENT '日次人気度'
    ) 
    COMMENT '日次ユーザー人気度集計テーブル'
    PARTITIONED BY (
        dt           STRING COMMENT 'yyyymmdd 形式の日付パーティション',
        biz_line     STRING COMMENT '業務ラインパーティション (ecom、finance、logistics など)'
    );
    重要

    ワークスペースが標準モードを使用している場合、このノードを本番環境にデプロイし、データバックフィルを実行する必要があります。

ワークフローの実装

  1. ワークフローを作成します。右側の [スケジューリング] ペインで、スケジューリングパラメーター bizdate を前日の $[yyyymmdd-1] に設定します。

    image

  2. ワークフローで、get_biz_list という名前の代入ノードを作成します。MaxCompute SQL で次のコードを記述します。このノードは、処理する業務ラインのリストを出力します。

    -- 処理するすべての業務ラインを出力
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. for-each ノードの設定

    • ワークフローキャンバスに戻り、代入ノードの下流に for-each ノードを作成します。

    • for-each ノードの設定ページに移動します。右側の [スケジューリング] タブで、[スケジューリングパラメーター] > [スクリプトパラメーター] の下で、loopDataArray パラメーターの値を get_biz_list ノードの出力に設定します。

      image

    • for-each ノードのループ本文で、[Create Internal Node] をクリックします。MaxCompute SQL ノードを作成し、ループ本文の処理ロジックを記述します。

      説明
      • このスクリプトは for-each ノードによって駆動され、業務ラインごとに 1 回実行されます。

      • 組み込み変数 ${dag.foreach.current} は、実行時に現在の業務ライン名に動的に置き換えられます。期待される反復値は ecomfinancelogistics です。

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. 検証ノードの追加

    ワークフローキャンバスに戻ります。for-each ノードで、[子孫ノードを作成] をクリックします。MaxCompute SQL ノードを作成し、次のコードを追加します。

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

デプロイと実行

ワークフローを本番環境にデプロイします。オペレーションセンターで、[自動トリガーノード運用保守] > [自動トリガーノード] に移動し、対象のワークフローを見つけ、テストを実行し、データタイムスタンプとして '20251010' を選択します。

実行が完了したら、テストインスタンスで実行ログを表示します。最終ノードの期待される出力は次のとおりです。

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

メリット

  • 高い拡張性:新しい業務ラインが追加された場合、代入ノードで SQL コードを 1 行追加するだけで済みます。処理ロジックを変更する必要はありません。

  • 容易なメンテナンス:すべての業務ラインが同じ処理ロジックを共有します。1 か所の変更がすべての業務ラインに反映されます。

よくある質問

  • Q:MaxCompute SQL で、「find no select sql in sql assignment!」というエラーが表示されるのはなぜですか?

    A:このエラーは、MaxCompute SQL コードに SELECT 文がないために発生します。SELECT 文を追加する必要があります。現在、WITH 構文はサポートされておらず、これを使用した場合もこのエラーが発生します。

  • Q:Shell または Python で、「OutPut Result is null, cannot handle!」というエラーが表示されるのはなぜですか?

    A:このエラーは、出力がないために発生します。コードに printecho などの print 文が含まれているか確認してください。

  • Q:Shell または Python で、カンマを含む出力要素をどのように処理すればよいですか?

    A:カンマ ,\, に変換してエスケープする必要があります。次の Python コードが例です。

    categories = ["Electronics", "Clothing, Shoes & Accessories"]
    
    # 各要素に含まれるカンマをエスケープ
    # ',' を '\,' に置換
    escaped_categories = [cat.replace(",", "\,") for cat in categories]
    
    # エスケープされた要素をカンマで結合
    output_string = ",".join(escaped_categories)
    print output_string
    # 下流ノードに出力される最終的な文字列:
    # Electronics,Clothing\, Shoes & Accessories
  • Q:下流ノードは複数の代入ノードから結果を受け取ることができますか?

    A:はい、できます。異なるノードからの結果を異なるパラメーターに割り当てることができます。

    image

  • Q:代入ノードは他の言語をサポートしていますか?

    A:代入ノードは現在、MaxCompute SQL、Python 2、Shell のみをサポートしています。ただし、EMR Hive、Hologres SQL、EMR Spark SQL、AnalyticDB for PostgreSQL、ClickHouse SQL、MySQL などの一部のノードには、同じ機能を提供する組み込みのパラメーター割り当て機能があります。

    image

参考資料