すべてのプロダクト
Search
ドキュメントセンター

:PyODPS 3 ノード

最終更新日:Dec 14, 2025

DataWorks は PyODPS 3 ノードを提供します。このノードを使用して、MaxCompute ジョブ用の Python コードを記述し、定期的に実行するようにスケジュールできます。このトピックでは、DataWorks で Python タスクを構成およびスケジュールする方法について説明します。

概要

PyODPS は、MaxCompute 向けの Python ソフトウェア開発キット (SDK) です。シンプルで使いやすいプログラミングインターフェイスを提供し、Python を使用してジョブの記述、テーブルやビューのクエリ、MaxCompute リソースの管理を行うことができます。詳細については、「PyODPS」をご参照ください。DataWorks では、PyODPS ノードを使用して Python タスクをスケジュールおよび実行し、他のジョブと統合できます。

注意事項

  • DataWorks リソースグループで PyODPS ノードを実行する際に、コードがサードパーティパッケージを呼び出す必要がある場合は、サーバーレスリソースグループカスタムイメージを使用してパッケージをインストールできます。

    説明

    コード内のユーザー定義関数 (UDF) がサードパーティパッケージを参照している場合、このメソッドは使用できません。参照の設定方法については、「例:Python UDF でサードパーティパッケージを参照する」をご参照ください。

  • PyODPS のバージョンをアップグレードするには、サーバーレスリソースグループを使用している場合は、カスタムイメージを使用して /home/tops/bin/pip3 install pyodps==0.12.1 コマンドを実行します。0.12.1 はターゲットの PyODPS バージョンに置き換えることができます。専用スケジューリングリソースグループを使用している場合は、O&M Assistant を使用して同じコマンドを実行します。

  • PyODPS タスクが VPC や IDC ネットワーク内のデータソースやサービスなど、特別なネットワーク環境にアクセスする必要がある場合は、スケジューリングにサーバーレスリソースグループを使用します。その後、サーバーレスリソースグループとターゲット環境との間にネットワーク接続を確立します。詳細については、「ネットワーク接続ソリューション」をご参照ください。

  • PyODPS の構文の詳細については、「PyODPS ドキュメント」をご参照ください。

  • PyODPS ノードは、PyODPS 2 と PyODPS 3 の 2 種類に分類されます。この 2 種類のノードは、基盤となる Python のバージョンが異なります。PyODPS 2 ノードは Python 2 を使用し、PyODPS 3 ノードは Python 3 を使用します。使用している Python のバージョンに対応する PyODPS ノードタイプを選択してください。

  • PyODPS ノードで SQL 文を実行したときにデータマップでデータリネージが生成されない場合は、タスクコードで関連する DataWorks のスケジューリングパラメーターを手動で設定することで、この問題を解決できます。データリネージを表示するには、「リネージ情報の表示」をご参照ください。パラメーターを設定するには、「実行時パラメーター (ヒント) の設定」をご参照ください。次のサンプルコードを使用して、実行時に必要なパラメーターを取得できます。

    import os
    ...
    # DataWorks スケジューラの実行時パラメーターを取得
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # タスク送信時にヒントを設定
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • PyODPS ノードの出力ログは最大 4 MB のサイズをサポートします。大量のデータ結果をログに直接出力することは避けてください。代わりに、アラートログや通常の進捗ログを書き込むことで、より価値のある情報を提供します。

制限事項

  • 専用スケジューリングリソースグループを使用して PyODPS ノードを実行する場合、リソースグループでローカルに処理されるデータは 50 MB を超えないようにすることを推奨します。これは、専用スケジューリングリソースグループの仕様によるものです。ローカルで処理されるデータが多すぎてオペレーティングシステムのしきい値を超えると、メモリ不足 (OOM) エラー (Got Killed) が発生する可能性があります。PyODPS ノードに大量のデータ処理コードを記述することは避けてください。詳細については、「PyODPS を効率的に使用するためのベストプラクティス」をご参照ください。

  • サーバーレスリソースグループを使用して PyODPS ノードを実行する場合、処理する必要のあるデータ量に基づいてノードの計算ユニット (CU) を構成できます。

    説明

    サーバーレスリソースグループでタスクを実行する場合、単一のタスクは最大 64CU の構成をサポートします。ただし、CU 数が多すぎるとリソース不足を引き起こし、タスクの起動に影響を与える可能性があるため、16CU を超えないようにすることを推奨します。

  • Got killed エラーが表示された場合、メモリ使用量が制限を超え、プロセスが中止されたことを意味します。そのため、ローカルでのデータ操作はできるだけ避けてください。この制限は、PyODPS を使用して開始される SQL および DataFrame タスク (to_pandas を除く) には適用されません。

  • UDF を含まないコードには、プリインストール済みの Numpy および Pandas ライブラリを使用できます。バイナリコードを含む他のサードパーティパッケージはサポートされていません。

  • 互換性の理由から、DataWorks では options.tunnel.use_instance_tunnel はデフォルトで False に設定されています。instance tunnel をグローバルに有効にするには、この値を手動で True に設定する必要があります。

  • バイトコードの定義は、Python 3.8 や Python 3.7 などの Python 3 のサブバージョン間で異なります。

    現在、MaxCompute は Python 3.7 を使用しています。Python 3.8 の finally ブロックなど、他の Python 3 バージョンの構文を使用すると、実行中にエラーが報告されます。Python 3.7 を使用することを推奨します。

  • PyODPS 3 は、サーバーレスリソースグループでの実行をサポートしています。購入方法と使用方法については、「サーバーレスリソースグループの使用」をご参照ください。

  • PyODPS ノードでの複数の Python タスクの同時実行はサポートされていません。

  • PyODPS ノードのログを出力するには、print を使用します。logger.info の使用はサポートされていません。

事前準備

DataWorks ワークスペースに MaxCompute コンピュートエンジンをアタッチします。

操作手順

  1. PyODPS 3 ノードのエディターページで、コードを記述してデバッグできます。

    PyODPS 3 コードの例

    PyODPS ノードを作成した後、コードを編集して実行できます。PyODPS の構文の詳細については、「基本操作の概要」をご参照ください。このトピックでは、以下の 5 つのコード例を紹介します。要件に応じて例を選択してください。

    ODPS エントリポイント

    DataWorks では、PyODPS ノードにはグローバル変数 odps または o が含まれており、これが ODPS エントリポイントです。ODPS エントリポイントを手動で定義する必要はありません。

    print(odps.exist_table('PyODPS_iris'))

    SQL の実行

    PyODPS ノードで SQL 文を実行できます。詳細については、「SQL」をご参照ください。

    • デフォルトでは、DataWorks で instance tunnel は無効になっています。これは、instance.open_reader が Result インターフェイスを使用し、最大 10,000 レコードまでしか読み取れないことを意味します。reader.count を使用してレコード数を取得できます。すべてのデータを反復的に取得するには、limit 制限を無効にする必要があります。次の文を使用して、instance tunnel をグローバルに有効にし、limit 制限を無効にすることができます。

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # 制限を無効にしてすべてのデータを読み取る
      
      with instance.open_reader() as reader:
        # すべてのデータは Instance Tunnel を通じて読み取ることができる
    • また、open_reader の呼び出しに tunnel=True を追加して、現在の open_reader 操作に対してのみ instance tunnel を有効にすることもできます。同時に、limit=False を追加して、現在の操作に対してのみ limit 制限を無効にすることができます。

      # この open_reader 操作は Instance Tunnel インターフェイスを使用し、すべてのデータを読み取ることができる
      with instance.open_reader(tunnel=True, limit=False) as reader:

    実行時パラメーターの設定

    • hints パラメーターを設定することで、実行時パラメーターを設定できます。パラメーターの型は dict です。ヒントパラメーターの詳細については、「SET 操作」をご参照ください。

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • グローバル構成で sql.settings を設定した後、タスクが実行されるたびに関連する実行時パラメーターを追加する必要があります。

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # グローバル構成に基づいてヒントを追加

    実行結果の読み取り

    SQL を実行するインスタンスは、直接 open_reader 操作を実行できます。以下の 2 つのシナリオが考えられます。

    • SQL 文が構造化データを返す場合。

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # 各レコードを処理
    • desc などの SQL 文が実行される場合。reader.raw プロパティを使用して、元の SQL 実行結果を取得できます。

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      説明

      カスタムスケジューリングパラメーターを使用し、PyODPS 3 ノードをエディターページから直接実行する場合、パラメーター変数は自動的に置き換えられません。コード内で手動で値を指定する必要があります。

    DataFrame

    DataFrame (非推奨) を使用してデータを処理することもできます。

    • 実行

      DataWorks 環境では、DataFrame の実行には 即時実行メソッドの明示的な呼び出しが必要です。

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # 即時実行メソッドを呼び出して各レコードを処理

      print 関数を使用して即時実行メソッドをトリガーするには、options.interactive を有効にする必要があります。

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # 最初にスイッチを有効にする
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Print 中に即時実行がトリガーされる
    • 詳細情報の出力

      options.verbose オプションを設定します。DataWorks では、このオプションはデフォルトで有効になっています。実行中に Logview URL などの詳細情報が出力されます。

    PyODPS 3 コード開発

    以下の簡単な例は、PyODPS ノードの使用方法を示しています。

    1. データセットを準備し、pyodps_iris サンプルテーブルを作成します。詳細については、「DataFrame データ処理」をご参照ください。

    2. DataFrame を作成します。詳細については、「MaxCompute テーブルから DataFrame を作成する」をご参照ください。

    3. PyODPS ノードに次のコードを入力して実行します。

      from odps.df import DataFrame
      
      # ODPS テーブルから DataFrame を作成
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepallength.head(5))

    PyODPS タスクの実行

    1. [デバッグ設定] タブの [コンピューティングリソース] セクションで、[コンピューティングリソース、コンピューティングクォータ]、および [DataWorks リソースグループ] を選択して設定します。

      説明
      • パブリックネットワークまたは VPC 環境のデータソースにアクセスするには、データソースとの接続性テストに合格したスケジューリング用のリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。

      • 必要に応じて [イメージ] 情報を設定できます。

    2. ツールバーのパラメーターダイアログボックスで、MaxCompute データソースを選択し、[実行] をクリックして PyODPS タスクを実行します。

  2. ノードタスクを定期的に実行するには、必要に応じてスケジューリング情報を設定します。設定の詳細については、「ノードのスケジューリングプロパティの設定」をご参照ください。

    DataWorks の SQL ノードとは異なり、PyODPS ノードはコード内の ${param_name} のような文字列を置き換えません。これはコードへの影響を防ぐためです。代わりに、コードが実行される前に args という名前の辞書がグローバル変数に追加されます。この辞書からスケジューリングパラメーターを取得できます。たとえば、[パラメーター] セクションで ds=${yyyymmdd} を設定した場合、次のメソッドを使用してコード内のパラメーター情報を取得できます。

    print('ds=' + args['ds'])
    ds=20240930
    説明

    ds という名前のパーティションを取得するには、次のメソッドを使用できます。

    o.get_table('table_name').get_partition('ds=' + args['ds'])
  3. ノードタスクを設定した後、ノードを公開する必要があります。詳細については、「ノードまたはワークフローの公開」をご参照ください。

  4. タスクが公開された後、オペレーションセンターで自動トリガータスクの実行ステータスを表示できます。詳細については、「オペレーションセンター入門」をご参照ください。

ロールを関連付けたノードの実行

ノードに RAM ロールを関連付けて、ノードタスクを実行できます。これにより、詳細な権限コントロールとセキュリティ管理が可能になります。

次のステップ

PyODPS に関するよくある質問:PyODPS の実行時に発生する可能性のある一般的な問題について学びます。これにより、例外を迅速にトラブルシューティングして解決できます。