すべてのプロダクト
Search
ドキュメントセンター

DataWorks:PyODPS 2 ノード

最終更新日:Feb 05, 2026

DataWorks では、PyODPS 構文を用いたタスク開発に使用できる PyODPS 2 ノードが提供されています。PyODPS は MaxCompute の Python SDK であり、このノード内で直接 Python コードを記述して MaxCompute を操作できます。

概要

PyODPS は MaxCompute の Python SDK です。ジョブの作成、テーブルおよびビューのクエリ実行、MaxCompute リソースの管理などを行うためのプログラミングインターフェイスを提供します。詳細については、「PyODPS」をご参照ください。DataWorks では、PyODPS ノードを使用して Python タスクを実行・スケジュールし、他のジョブと統合できます。

注意事項

  • PyODPS コードでサードパーティパッケージを必要とする場合、サーバーレスリソースグループ上でノードを実行する際にインストールできます。詳細については、「カスタムイメージ」をご参照ください。

    説明

    コード内にサードパーティパッケージを参照するユーザー定義関数 (UDF) が含まれる場合、上記の方法は使用できません。正しい構成方法については、「UDF の例:Python UDF でサードパーティパッケージを使用する」をご参照ください。

  • PyODPS タスクが VPC や IDC 内のデータソースまたはサービスなど、特定のネットワーク環境にアクセスする必要がある場合は、サーバーレスリソースグループを使用し、サーバーレスリソースグループと対象環境間のネットワーク接続を確立してください。詳細については、「ネットワーク接続ソリューション」をご参照ください。

  • PyODPS 構文その他の詳細については、「PyODPS ドキュメント」をご参照ください。

  • PyODPS ノードには PyODPS 2 と PyODPS 3 の 2 種類があります。違いは基盤となる Python のバージョンにあり、PyODPS 2 ノードでは Python 2 を、PyODPS 3 ノードでは Python 3 を使用します。ご利用の Python バージョンに合致するノードタイプを作成してください。

  • PyODPS ノード内の SQL 文がデータリネージを生成できず、Data Map 上で表示されない場合、タスクコード内で DataWorks の関連スケジューリングパラメーターを手動で設定することで問題を解決できます。データリネージの確認方法については、「データリネージの表示」をご参照ください。パラメーター設定の詳細については、「実行時パラメーター(ヒントワード)の設定」をご参照ください。以下のコードを使用して、実行時に必要なパラメーターを取得できます。

    import os
    # ...
    # DataWorks スケジューラの実行時パラメーターを取得します。
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    # ...
    # タスクを送信する際にヒントワードを設定します。
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    # ...
  • ログに大量のデータを出力しないでください。警告や進行状況の更新など、必須情報のみをログ出力してください。

制限事項

  • 専用スケジューリングリソースグループを使用して PyODPS ノードを実行する場合、ノード内でローカルに処理されるデータ量は 50 MB を超えないことを推奨します。これは専用スケジューリングリソースグループの仕様による制限です。オペレーティングシステムのしきい値を超える大規模なローカルデータ処理を実行すると、メモリ不足 (OOM) エラーが発生し、「Got killed」というメッセージが表示されます。PyODPS ノード内に大規模なデータ処理コードを記述しないでください。詳細については、「効率的な PyODPS 使用のベストプラクティス」をご参照ください。

  • サーバーレスリソースグループを使用して PyODPS ノードを実行する場合、処理対象のデータ量に応じてノードの CU を設定できます。

    説明

    サーバーレスリソースグループでタスクを実行する場合、単一タスクに対して最大 64 CU を設定できますが、リソース不足によるタスク起動失敗を防ぐため、推奨値は 16 CU 以下です。

  • Got killed エラーは、メモリ使用量が上限を超え、プロセスが強制終了されたことを示します。これを防ぐため、ローカルでのデータ操作を避けてください。PyODPS 経由で開始される SQL や DataFrame タスク(to_pandas を除く)は、この制限の対象外です。

  • コード内でプリインストール済みの Numpy および Pandas ライブラリを使用できますが、UDF 内では使用できません。バイナリコードを含むその他のサードパーティパッケージはサポートされていません。

  • 互換性のため、DataWorks では options.tunnel.use_instance_tunnel のデフォルト値が False です。Instance Tunnel をグローバルに有効化するには、この値を手動で True に設定する必要があります。

  • PyODPS 2 ノードの基盤となる Python バージョンは 2.7 です。

  • 単一の PyODPS ノード内で複数の Python タスクを同時実行することはサポートされていません。

  • PyODPS ノードではログ出力に print を使用してください。logger.info はサポートされていません。

事前準備

DataWorks ワークスペースに MaxCompute 計算リソース をバインドします。

操作手順

  1. PyODPS 2 ノードの編集ページで、以下の開発操作を行います。

    PyODPS 2 コードのサンプル

    PyODPS ノードを作成後、コードの編集および実行が可能です。PyODPS 構文の詳細については、「概要」をご参照ください。本ドキュメントでは、ビジネスニーズに最も適したものを選択できるよう、5 つのコード例を提供しています。

    ODPS エントリポイント

    DataWorks の PyODPS ノードには、ODPS エントリポイントとして機能するグローバル変数 odps または o が含まれており、手動で定義する必要はありません。

    print(odps.exist_table('PyODPS_iris'))

    SQL の実行

    PyODPS ノード内で SQL を実行できます。詳細については、「SQL」をご参照ください。

    • デフォルトでは、DataWorks で Instance Tunnel は有効になっておらず、instance.open_reader は Result API を使用して最大 10,000 件のレコードを返します。reader.count を使用してレコード数を取得できます。すべてのデータを反復処理するには、limit 制限を無効化する必要があります。以下のステートメントを使用して、Instance Tunnel を有効化し、limit 制限をグローバルに無効化できます。

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # 全データ読み取りのため、limit 制限を無効化します。
      
      with instance.open_reader() as reader:
        # Instance Tunnel 経由で全データを読み取ることができます。
    • また、open_reader に対して tunnel=True を指定することで、当該 open_reader 操作のみで Instance Tunnel を有効化できます。さらに、limit=False を指定することで、当該操作のみで limit 制限を無効化できます。

      # この open_reader 操作では Instance Tunnel API を使用し、全データを読み取れます。
      with instance.open_reader(tunnel=True, limit=False) as reader:

    実行時パラメーターの設定

    • hints パラメーター(dict 型)を構成することで実行時パラメーターを設定できます。ヒントワードの詳細については、「SET 操作」をご参照ください。

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • グローバル設定で sql.settings を構成すると、DataWorks はこれらの実行時パラメーターをすべての実行に追加します。

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # グローバル構成に基づいてヒントワードを追加します。

    実行結果の読み取り

    SQL を実行するインスタンスは、以下の 2 つのシナリオにおいて直接 open_reader 操作を実行できます。

    • SQL は構造化データを返します。

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # 各レコードを処理します。
    • DESC などの SQL 文を実行した場合。reader.raw 属性を使用して、SQL 実行の生の結果を取得できます。

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      説明

      カスタムスケジューリングパラメーターを使用する PyODPS 2 ノードを手動実行する場合、時間値をハードコードする必要があります。PyODPS ノードでは、パラメーターの直接置換はできません。

    DataFrame

    DataFrame(非推奨) を使用してデータを処理することもできます。

    • 実行

      DataWorks 環境では、DataFrame の実行には、即時実行メソッド の明示的な呼び出しが必要です。

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # 各レコードを処理するために即時実行メソッドを呼び出します。

      print を使用して即時実行をトリガーするには、options.interactive を有効化します。

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # 最初にオプションを有効化します。
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # 表示時に即時実行がトリガーされます。
    • 詳細情報の表示

      options.verbose オプションを設定できます。これは DataWorks ではデフォルトで有効化されており、実行時に Logview URL などの詳細情報が表示されます。

    PyODPS 2 コードの開発

    以下の簡単な例では、PyODPS ノードの使用方法を示します。

    1. pyodps_iris サンプルテーブルを作成して、データセットを準備します。詳細については、「DataFrame を使用したデータ処理」をご参照ください。

    2. DataFrame を作成します。詳細については、「MaxCompute テーブルから DataFrame オブジェクトを作成する」をご参照ください。

    3. PyODPS ノードに以下のコードを入力します。

      from odps.df import DataFrame
      
      # ODPS テーブルから DataFrame を作成します。
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepallength.head(5))

    PyODPS タスクの実行

    1. Run Configurationリソース セクションで、コンピュートエンジンインスタンスコンピュートクォータ、および DataWorks リソースグループ を構成します。

      説明
      • パブリックネットワークまたは VPC 内のデータソースにアクセスするには、当該データソースに接続可能なリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。

      • タスクの要件に基づいて、[Image] パラメーターを設定できます。

    2. ツールバーで 実行 をクリックして PyODPS タスクを実行します。

  2. ノードタスクを定期実行するには、ビジネス要件に応じてスケジューリングプロパティを構成します。詳細については、「ノードのスケジューリング構成」をご参照ください。

    DataWorks の SQL ノードとは異なり、PyODPS ノードでは、意図しない変更を防ぐため、コード内の ${param_name} のような文字列は置換されません。代わりに、コード実行前に DataWorks がグローバル変数に args という名前の dict を追加します。この dict を使用してスケジューリングパラメーターを取得できます。たとえば、パラメーター パラメーターを ds=${yyyymmdd} と設定した場合、コード内で次のようにこのパラメーターにアクセスできます。

    print('ds=' + args['ds'])
    ds=20240930
    説明

    ds という名前の パーティション を取得するには、以下の方法を使用できます。

    o.get_table('table_name').get_partition('ds=' + args['ds'])
  3. ノードタスクの構成が完了したら、公開する必要があります。詳細については、「ノードおよびワークフローのデプロイメント」をご参照ください。

  4. タスクが公開された後、オペレーションセンターに移動して、定期実行タスクの実行詳細を確認できます。詳細については、「オペレーションセンターの使い始め」をご参照ください。

関連付けられたロールでノードを実行する

特定の RAM ロールを関連付ける ことで、ノードタスクを実行できます。これにより、詳細な権限制御とセキュリティの強化が可能になります。

次のステップ

PyODPS のよくある質問:PyODPS 実行中に発生する可能性のある一般的な問題について学習し、例外のトラブルシューティングと解決を迅速に行うことができます。