DataWorks では、PyODPS 構文を用いたタスク開発に使用できる PyODPS 2 ノードが提供されています。PyODPS は MaxCompute の Python SDK であり、このノード内で直接 Python コードを記述して MaxCompute を操作できます。
概要
PyODPS は MaxCompute の Python SDK です。ジョブの作成、テーブルおよびビューのクエリ実行、MaxCompute リソースの管理などを行うためのプログラミングインターフェイスを提供します。詳細については、「PyODPS」をご参照ください。DataWorks では、PyODPS ノードを使用して Python タスクを実行・スケジュールし、他のジョブと統合できます。
注意事項
PyODPS コードでサードパーティパッケージを必要とする場合、サーバーレスリソースグループ上でノードを実行する際にインストールできます。詳細については、「カスタムイメージ」をご参照ください。
説明コード内にサードパーティパッケージを参照するユーザー定義関数 (UDF) が含まれる場合、上記の方法は使用できません。正しい構成方法については、「UDF の例:Python UDF でサードパーティパッケージを使用する」をご参照ください。
PyODPS タスクが VPC や IDC 内のデータソースまたはサービスなど、特定のネットワーク環境にアクセスする必要がある場合は、サーバーレスリソースグループを使用し、サーバーレスリソースグループと対象環境間のネットワーク接続を確立してください。詳細については、「ネットワーク接続ソリューション」をご参照ください。
PyODPS 構文その他の詳細については、「PyODPS ドキュメント」をご参照ください。
PyODPS ノードには PyODPS 2 と PyODPS 3 の 2 種類があります。違いは基盤となる Python のバージョンにあり、PyODPS 2 ノードでは Python 2 を、PyODPS 3 ノードでは Python 3 を使用します。ご利用の Python バージョンに合致するノードタイプを作成してください。
PyODPS ノード内の SQL 文がデータリネージを生成できず、Data Map 上で表示されない場合、タスクコード内で DataWorks の関連スケジューリングパラメーターを手動で設定することで問題を解決できます。データリネージの確認方法については、「データリネージの表示」をご参照ください。パラメーター設定の詳細については、「実行時パラメーター(ヒントワード)の設定」をご参照ください。以下のコードを使用して、実行時に必要なパラメーターを取得できます。
import os # ... # DataWorks スケジューラの実行時パラメーターを取得します。 skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v # ... # タスクを送信する際にヒントワードを設定します。 o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) # ...ログに大量のデータを出力しないでください。警告や進行状況の更新など、必須情報のみをログ出力してください。
制限事項
専用スケジューリングリソースグループを使用して PyODPS ノードを実行する場合、ノード内でローカルに処理されるデータ量は 50 MB を超えないことを推奨します。これは専用スケジューリングリソースグループの仕様による制限です。オペレーティングシステムのしきい値を超える大規模なローカルデータ処理を実行すると、メモリ不足 (OOM) エラーが発生し、「Got killed」というメッセージが表示されます。PyODPS ノード内に大規模なデータ処理コードを記述しないでください。詳細については、「効率的な PyODPS 使用のベストプラクティス」をご参照ください。
サーバーレスリソースグループを使用して PyODPS ノードを実行する場合、処理対象のデータ量に応じてノードの CU を設定できます。
説明サーバーレスリソースグループでタスクを実行する場合、単一タスクに対して最大
64 CUを設定できますが、リソース不足によるタスク起動失敗を防ぐため、推奨値は16 CU以下です。Got killed エラーは、メモリ使用量が上限を超え、プロセスが強制終了されたことを示します。これを防ぐため、ローカルでのデータ操作を避けてください。PyODPS 経由で開始される SQL や DataFrame タスク(
to_pandasを除く)は、この制限の対象外です。コード内でプリインストール済みの Numpy および Pandas ライブラリを使用できますが、UDF 内では使用できません。バイナリコードを含むその他のサードパーティパッケージはサポートされていません。
互換性のため、DataWorks では
options.tunnel.use_instance_tunnelのデフォルト値がFalseです。Instance Tunnel をグローバルに有効化するには、この値を手動でTrueに設定する必要があります。PyODPS 2 ノードの基盤となる Python バージョンは 2.7 です。
単一の PyODPS ノード内で複数の Python タスクを同時実行することはサポートされていません。
PyODPS ノードではログ出力に
printを使用してください。logger.infoはサポートされていません。
事前準備
DataWorks ワークスペースに MaxCompute 計算リソース をバインドします。
操作手順
PyODPS 2 ノードの編集ページで、以下の開発操作を行います。
PyODPS 2 コードのサンプル
PyODPS ノードを作成後、コードの編集および実行が可能です。PyODPS 構文の詳細については、「概要」をご参照ください。本ドキュメントでは、ビジネスニーズに最も適したものを選択できるよう、5 つのコード例を提供しています。
ODPS エントリポイント
DataWorks の PyODPS ノードには、ODPS エントリポイントとして機能するグローバル変数
odpsまたはoが含まれており、手動で定義する必要はありません。print(odps.exist_table('PyODPS_iris'))SQL の実行
PyODPS ノード内で SQL を実行できます。詳細については、「SQL」をご参照ください。
デフォルトでは、DataWorks で Instance Tunnel は有効になっておらず、
instance.open_readerは Result API を使用して最大 10,000 件のレコードを返します。reader.countを使用してレコード数を取得できます。すべてのデータを反復処理するには、limit制限を無効化する必要があります。以下のステートメントを使用して、Instance Tunnel を有効化し、limit制限をグローバルに無効化できます。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 全データ読み取りのため、limit 制限を無効化します。 with instance.open_reader() as reader: # Instance Tunnel 経由で全データを読み取ることができます。また、
open_readerに対してtunnel=Trueを指定することで、当該open_reader操作のみで Instance Tunnel を有効化できます。さらに、limit=Falseを指定することで、当該操作のみでlimit制限を無効化できます。# この open_reader 操作では Instance Tunnel API を使用し、全データを読み取れます。 with instance.open_reader(tunnel=True, limit=False) as reader:
実行時パラメーターの設定
hintsパラメーター(dict型)を構成することで実行時パラメーターを設定できます。ヒントワードの詳細については、「SET 操作」をご参照ください。o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})グローバル設定で
sql.settingsを構成すると、DataWorks はこれらの実行時パラメーターをすべての実行に追加します。from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # グローバル構成に基づいてヒントワードを追加します。
実行結果の読み取り
SQL を実行するインスタンスは、以下の 2 つのシナリオにおいて直接
open_reader操作を実行できます。SQL は構造化データを返します。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 各レコードを処理します。DESCなどの SQL 文を実行した場合。reader.raw属性を使用して、SQL 実行の生の結果を取得できます。with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)説明カスタムスケジューリングパラメーターを使用する PyODPS 2 ノードを手動実行する場合、時間値をハードコードする必要があります。PyODPS ノードでは、パラメーターの直接置換はできません。
DataFrame
DataFrame(非推奨) を使用してデータを処理することもできます。
実行
DataWorks 環境では、DataFrame の実行には、即時実行メソッド の明示的な呼び出しが必要です。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 各レコードを処理するために即時実行メソッドを呼び出します。printを使用して即時実行をトリガーするには、options.interactiveを有効化します。from odps import options from odps.df import DataFrame options.interactive = True # 最初にオプションを有効化します。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # 表示時に即時実行がトリガーされます。詳細情報の表示
options.verboseオプションを設定できます。これは DataWorks ではデフォルトで有効化されており、実行時に Logview URL などの詳細情報が表示されます。
PyODPS 2 コードの開発
以下の簡単な例では、PyODPS ノードの使用方法を示します。
pyodps_iris サンプルテーブルを作成して、データセットを準備します。詳細については、「DataFrame を使用したデータ処理」をご参照ください。
DataFrame を作成します。詳細については、「MaxCompute テーブルから DataFrame オブジェクトを作成する」をご参照ください。
PyODPS ノードに以下のコードを入力します。
from odps.df import DataFrame # ODPS テーブルから DataFrame を作成します。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
PyODPS タスクの実行
Run Configuration の リソース セクションで、コンピュートエンジンインスタンス、コンピュートクォータ、および DataWorks リソースグループ を構成します。
説明パブリックネットワークまたは VPC 内のデータソースにアクセスするには、当該データソースに接続可能なリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。
タスクの要件に基づいて、[Image] パラメーターを設定できます。
ツールバーで 実行 をクリックして PyODPS タスクを実行します。
ノードタスクを定期実行するには、ビジネス要件に応じてスケジューリングプロパティを構成します。詳細については、「ノードのスケジューリング構成」をご参照ください。
DataWorks の SQL ノードとは異なり、PyODPS ノードでは、意図しない変更を防ぐため、コード内の ${param_name} のような文字列は置換されません。代わりに、コード実行前に DataWorks がグローバル変数に
argsという名前のdictを追加します。このdictを使用してスケジューリングパラメーターを取得できます。たとえば、パラメーター パラメーターをds=${yyyymmdd}と設定した場合、コード内で次のようにこのパラメーターにアクセスできます。print('ds=' + args['ds']) ds=20240930説明dsという名前のパーティションを取得するには、以下の方法を使用できます。o.get_table('table_name').get_partition('ds=' + args['ds'])ノードタスクの構成が完了したら、公開する必要があります。詳細については、「ノードおよびワークフローのデプロイメント」をご参照ください。
タスクが公開された後、オペレーションセンターに移動して、定期実行タスクの実行詳細を確認できます。詳細については、「オペレーションセンターの使い始め」をご参照ください。
関連付けられたロールでノードを実行する
特定の RAM ロールを関連付ける ことで、ノードタスクを実行できます。これにより、詳細な権限制御とセキュリティの強化が可能になります。
次のステップ
PyODPS のよくある質問:PyODPS 実行中に発生する可能性のある一般的な問題について学習し、例外のトラブルシューティングと解決を迅速に行うことができます。