すべてのプロダクト
Search
ドキュメントセンター

DataWorks:PyODPS 3 ノード

最終更新日:Feb 05, 2026

DataWorks は PyODPS 3 ノードを提供しており、Python で直接 MaxCompute のジョブを作成し、定期的なスケジューリングを設定できます。

概要

PyODPS は MaxCompute 向けの Python ソフトウェア開発キット (SDK) です。シンプルなプログラミングインターフェイスにより、ジョブの作成、テーブルやビューのクエリ、MaxCompute リソースの管理が可能です。詳細については、「PyODPS」をご参照ください。DataWorks では、PyODPS ノードを使用して Python タスクをスケジューリング・実行し、他のジョブと統合できます。

注意事項

  • DataWorks リソースグループで PyODPS ノードを実行する際、コードでサードパーティパッケージが必要な場合は、サーバーレスリソースグループにインストールできます。詳細については、「カスタムイメージ」をご参照ください。

    説明

    この方法は、サードパーティパッケージを参照するユーザー定義関数 (UDF) をサポートしていません。正しい設定方法については、「UDF の例:Python UDF でサードパーティパッケージを使用する」をご参照ください。

  • PyODPS のバージョンをアップグレードするには、「カスタムイメージ」のコマンドを実行します。0.12.1 をアップグレードしたいバージョンに置き換えることができます。サーバーレスリソースグループでは、サードパーティパッケージのインストールの指示に従ってコマンドを実行します。専用スケジューリングリソースグループでは、「O&M Assistant」の指示に従います。

  • PyODPS タスクが VPC ネットワークや IDC 内のデータソースやサービスなど、特定のネットワーク環境にアクセスする必要がある場合は、サーバーレスリソースグループを使用します。サーバーレスリソースグループとターゲット環境間のネットワーク接続を設定してください。詳細については、「ネットワーク接続ソリューション」をご参照ください。

  • PyODPS の構文に関する詳細については、「PyODPS ドキュメント」をご参照ください。

  • PyODPS ノードには、それぞれ Python 2 と Python 3 を使用する PyODPS 2 と PyODPS 3 の 2 種類があります。使用する Python のバージョンに合ったノードタイプを作成してください。

  • PyODPS ノードで実行された SQL 文がデータマップで正しいデータリネージを生成しない場合、タスクコード内で DataWorks スケジューリングの実行時パラメーターを手動で設定できます。データリネージの表示については、「データリネージの表示」をご参照ください。パラメーターの設定については、「実行時パラメーター (ヒント) の設定」をご参照ください。次のサンプルコードを使用して、必要な実行時パラメーターを取得できます。

    import os
    ...
    # DataWorks スケジューラの実行時パラメーターを取得
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # タスク送信時にヒントを設定
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • PyODPS ノードの最大ログ出力は 4 MB です。大量のデータ結果を直接ログに出力することは避けてください。代わりに、アラートログや進捗更新など、価値のある情報の出力に集中してください。

制限事項

  • 専用スケジューリングリソースグループを使用して PyODPS ノードを実行する場合、ノード内でローカルに処理されるデータ量は 50 MB を超えないようにしてください。この制限は、専用スケジューリングリソースグループの仕様によって決まります。過剰なローカルデータを処理すると、オペレーティングシステムのしきい値を超え、`Got Killed` メッセージで示される Out of Memory (OOM) エラーが発生する可能性があります。PyODPS ノードにデータ集約的な処理ロジックを記述することは避けてください。詳細については、「PyODPS を効率的に使用するためのベストプラクティス」をご参照ください。

  • サーバーレスリソースグループを使用して PyODPS ノードを実行する場合、処理する必要のあるデータ量に基づいてノードの CU を設定できます。

    説明

    1 つのタスクには最大 64 CU を設定できますが、16 CU を超える使用は推奨されません。リソース不足を引き起こし、タスクの起動に影響を与える可能性があります。

  • Got Killed エラーは、Out of Memory (OOM) 状態によりプロセスが終了したことを示します。したがって、ローカルでのデータ操作は最小限に抑えるようにしてください。PyODPS を介して開始される SQL および DataFrame タスクは、`to_pandas` を除き、この制限の対象外です。

  • ユーザー定義関数 (UDF) の一部ではないコードでは、プリインストール済みの Numpy および Pandas パッケージを使用できます。バイナリコードを含む他のサードパーティパッケージはサポートされていません。

  • 互換性の理由から、DataWorks では options.tunnel.use_instance_tunnel はデフォルトで False に設定されています。インスタンストンネル をグローバルに有効にするには、この値を手動で True に設定する必要があります。

  • Python 3 のマイナーバージョン間、例えば Python 3.8 と Python 3.7 では、バイトコードの定義が異なります。

    MaxCompute は現在 Python 3.7 を使用しています。Python 3.8 の `finally` ブロックなど、他の Python 3 バージョンの構文を使用すると、実行エラーが発生します。Python 3.7 の使用を推奨します。

  • PyODPS 3 はサーバーレスリソースグループで実行できます。このリソースを購入して使用するには、「サーバーレスリソースグループの使用」をご参照ください。

  • 1 つの PyODPS ノードは、複数の Python タスクの同時実行をサポートしていません。

  • PyODPS ノードからログを出力するには、print 関数を使用します。現在、logger.info 関数はサポートされていません。

事前準備

MaxCompute 計算リソースを DataWorks ワークスペースにバインドします。

操作手順

  1. PyODPS 3 ノードエディターで、次の開発ステップを実行します。

    PyODPS 3 コード例

    PyODPS ノードを作成した後、コードを編集して実行できます。PyODPS の構文に関する詳細については、「概要」をご参照ください。このセクションでは、5 つのコード例を紹介します。ビジネスニーズに最も適した例を選択してください。

    ODPS エントリポイント

    DataWorks は、ODPS エントリポイントとしてグローバル変数 odps (または o) を提供するため、手動で定義する必要はありません。

    print(odps.exist_table('PyODPS_iris'))

    SQL 文の実行

    PyODPS ノードで SQL 文を実行できます。詳細については、「SQL」をご参照ください。

    • DataWorks では、インスタンストンネル はデフォルトで無効になっています。これは、instance.open_reader が最大 10,000 レコードを返す Result インターフェイスを使用することを意味します。reader.count を使用してレコード数を取得できます。すべてのデータを反復処理するには、limit を無効にする必要があります。次の文を使用して、インスタンストンネル をグローバルに有効にし、limit を無効にすることができます。

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # すべてのデータを読み取るために制限を無効にします。
      
      with instance.open_reader() as reader:
        # Instance Tunnel を介してすべてのデータを読み取ることができます。
    • または、open_reader の呼び出しに tunnel=True を追加して、その特定の操作に対してのみ インスタンストンネル を有効にすることもできます。また、limit=False を追加して、その操作の limit を無効にすることもできます。

      # この open_reader 操作で Instance Tunnel インターフェイスを使用してすべてのデータを読み取ります。
      with instance.open_reader(tunnel=True, limit=False) as reader:

    実行時パラメーターの設定

    • dict 型の hints パラメーターを使用して、実行時パラメーターを設定できます。ヒントの詳細については、「SET 操作」をご参照ください。

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • sql.settings をグローバルに設定すると、指定された実行時パラメーターがすべての実行に追加されます。

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # グローバル設定に基づいてヒントを追加します。

    実行結果の読み取り

    SQL 文を実行しているインスタンスは、次の 2 つのシナリオで直接 open_reader 操作を実行できます:

    • SQL 文は、構造化データを返します。

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # 各レコードを処理します。
    • SQL 文が `desc` のようなコマンドである場合。reader.raw 属性を使用して、生の SQL 実行結果を取得できます。

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      説明

      エディターから直接 PyODPS 3 ノードを実行する場合、ノードはプレースホルダーを自動的に置き換えないため、カスタムスケジューリングパラメーターの値をハードコーディングする必要があります。

    DataFrame

    DataFrame (非推奨) を使用してデータを処理することもできます。

    • 実行

      DataWorks では、DataFrame を実行するために、即時実行メソッドを明示的に呼び出す必要があります。

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # 即時実行メソッドを呼び出して各 Record を処理します。

      `print` 文で即時実行をトリガーする必要がある場合は、options.interactive を有効にする必要があります。

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # 最初にオプションを有効にします。
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # print() によって即時実行がトリガーされます。
    • 詳細情報の出力

      options.verbose オプションを設定して、詳細情報を出力できます。DataWorks では、このオプションはデフォルトで有効になっており、実行中に Logview URL などの詳細が出力されます。

    PyODPS 3 コードの開発

    次の例は、PyODPS ノードの使用方法を示しています:

    1. データセットを準備します。pyodps_iris サンプルテーブルを作成します。手順については、「DataFrame を使用したデータ処理」をご参照ください。

    2. DataFrame を作成します。詳細については、「MaxCompute テーブルから DataFrame オブジェクトを作成する」をご参照ください。

    3. 次のコードを PyODPS ノードに入力して実行します。

      from odps.df import DataFrame
      
      # ODPS テーブルから DataFrame を作成します。
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepallength.head(5))

    PyODPS タスクの実行

    1. Run Configuration セクションで、[計算リソース][計算クォータ]、および [DataWorks リソースグループ] を設定します。

      説明
      • パブリックネットワークまたは VPC ネットワーク内のデータソースにアクセスするには、データソースとの接続テストに合格した専用スケジューリングリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。

      • タスクの要件に基づいて [イメージ] を設定できます。

    2. ツールバーのパラメーターダイアログボックスで、作成した MaxCompute データソースを選択し、[実行] をクリックします。

  2. ノードタスクをスケジュールで実行するには、ビジネス要件に基づいてスケジューリングプロパティを設定します。詳細については、「ノードのスケジューリング設定」をご参照ください。

    DataWorks の SQL ノードとは異なり、PyODPS ノードは意図しない文字列置換を避けるため、コード内の `${param_name}` のような文字列を置き換えません。代わりに、コードが実行される前に、args という名前の `dict` がグローバル変数に追加されます。この `dict` からスケジューリングパラメーターを取得できます。たとえば、[パラメーター] タブで ds=${yyyymmdd} を設定した場合、コード内で次のようにこのパラメーターを取得できます。

    print('ds=' + args['ds'])
    ds=20240930
    説明

    ds という名前のパーティションを取得するには、次のメソッドを使用できます。

    o.get_table('table_name').get_partition('ds=' + args['ds'])
  3. ノードタスクを設定した後、デプロイする必要があります。詳細については、「ノードとワークフローのデプロイ」をご参照ください。

  4. タスクがデプロイされた後、オペレーションセンターで定期タスクのステータスを表示できます。詳細については、「オペレーションセンター入門」をご参照ください。

関連付けられたロールでのノードの実行

特定の RAM ロールを関連付けてノードタスクを実行できます。これにより、詳細な権限コントロールとセキュリティ強化が可能になります。

次のステップ

PyODPS よくある質問:一般的な PyODPS の実行に関する問題について学び、例外のトラブルシューティングと解決を迅速に行うのに役立ちます。