DataWorks の PyODPS 3 ノードを使用して、Python 3 を使用して MaxCompute ジョブを開発し、定期的に実行するようにスケジュールできます。このトピックでは、PyODPS 3 タスクを作成および構成する方法について説明します。
前提条件
PyODPS 3 ノードが作成されていること。詳細については、「MaxCompute ノードの作成と管理」をご参照ください。
背景情報
PyODPS は MaxCompute 用の Python SDK です。MaxCompute ジョブの開発、テーブルとビューのクエリ、リソースの管理のためのインターフェイスを提供します。詳細については、「PyODPS」をご参照ください。DataWorks PyODPS ノードを使用すると、Python タスクをスケジュールし、他のタスクと統合できます。
注意事項
PyODPS コードが DataWorks リソースグループで実行され、サードパーティパッケージを必要とする場合、サーバーレスリソースグループにカスタムイメージを介してインストールできます。
説明この方法では、サードパーティパッケージを参照する UDF はサポートされていません。サードパーティの依存関係を持つ UDF を構成する方法については、「UDF の例: Python UDF でサードパーティパッケージを使用する」をご参照ください。
サーバーレスリソースグループで PyODPS をアップグレードするには、カスタムイメージを介して
/home/tops/bin/pip3 install pyodps==0.12.1を実行します (0.12.1を目的のバージョンに置き換えます)。専用スケジューリングリソースグループの場合は、O&M Assistant を介して同じコマンドを実行します。
特定のネットワーク環境 (VPC や IDC など) のデータソースまたはサービスにアクセスするには、サーバーレスリソースグループを使用し、ネットワーク接続を構成します。「ネットワーク接続ソリューション」をご参照ください。
PyODPS 構文の詳細については、「PyODPS ドキュメント」をご参照ください。
DataWorks は PyODPS 2 (Python 2) および PyODPS 3 (Python 3) ノードをサポートしています。ご利用の Python バージョンに一致するノードタイプを選択してください。
PyODPS ノードで実行された SQL ステートメントが Data Map でデータリネージを生成しない場合、この問題を解決するために、コード内でスケジューリングパラメーターを手動で構成できます。データリネージの表示方法の詳細については、「リネージ情報の表示」をご参照ください。パラメーター設定の詳細については、「実行時パラメーター (ヒント) の設定」をご参照ください。タスク実行に必要なパラメーターを取得するには、次のコードを使用できます。
import os ... # get DataWorks scheduler runtime parameters skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # setting hints while submitting a task o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
PyODPS ノードの出力ログの上限は 4 MB です。大規模なデータセットの出力は避けてください。ログはアラートと進捗状況の追跡のみに使用することを推奨します。
制限事項
専用スケジューリングリソースグループを使用する場合、ローカルデータ処理は 50 MB に制限されます。過剰なメモリ使用量は、リソース制限により OOM (Got Killed) エラーをトリガーする可能性があります。大量のデータをローカルで処理することは避けてください。詳細については、「効率的な PyODPS 使用のベストプラクティス」をご参照ください。
サーバーレスリソースグループを使用して PyODPS ノードを実行する場合、処理する必要があるデータ量に基づいて PyODPS ノードの CU を構成します。
説明サーバーレスリソースグループの場合、タスクあたりの最大構成は
64CUです。リソース競合を防ぎ、タスク実行の遅延を回避するために、16CUの制限を推奨します。Got killed エラーは、メモリ使用量が制限を超えたことを示します。ローカルデータ操作は避けてください。PyODPS 経由で開始された SQL および DataFrame タスク (to_pandas を除く) は、この制限の対象外です。
UDF 以外のコードは、プリインストール済み NumPy および Pandas パッケージを使用できます。バイナリコードを含むその他のサードパーティパッケージはサポートされていません。
互換性のため、DataWorks はデフォルトで options.tunnel.use_instance_tunnel を False に設定します。インスタンス トンネル をグローバルに有効にするには、この値を True に設定します。
Python 3 の異なるマイナーバージョン (Python 3.8 や Python 3.7 など) は、異なるバイトコード定義を持っています。
MaxCompute は現在 Python 3.7 をサポートしています。他のバージョンに固有の構文 (Python 3.8 の finally ブロックなど) を使用すると、エラーが発生します。Python 3.7 の使用を推奨します。
PyODPS 3 はサーバーレスリソースグループでの実行をサポートしています。サーバーレスリソースグループを購入するには、「サーバーレスリソースグループの使用」をご参照ください。
PyODPS ノードは、複数の Python タスクの同時実行をサポートしていません。
コードの編集: 簡単な例
PyODPS ノードを作成した後、コードを記述して実行できます。PyODPS 構文の詳細については、「概要」をご参照ください。
ODPS エントリポイント
DataWorks PyODPS ノードには、エントリポイントとして機能するグローバル変数 odps または o が含まれています。手動で定義する必要はありません。
print(odps.exist_table('PyODPS_iris'))SQL の実行
PyODPS ノードで SQL ステートメントを実行できます。詳細については、「SQL」をご参照ください。
デフォルトでは、DataWorks の インスタンストンネル は無効になっています。このモードでは、instance.open_reader は、10,000 レコードに制限されている Result インターフェイスを使用します。reader.count を使用してレコード数を取得できます。すべてのデータを読み取るには、
limit制限を無効にする必要があります。次の文を使用して、インスタンストンネルをグローバルに有効にし、limit制限を無効にします。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # Disable the limit to read all data. with instance.open_reader() as reader: # Use the instance tunnel to read all data.あるいは、
tunnel=Trueを に追加して、特定の呼び出しに対して インスタンス トンネル を有効化できます。また、limit=Falseを追加して、その呼び出しに対するlimit制限を無効化することもできます。# Use the instance tunnel interface for this open_reader call and read all data. with instance.open_reader(tunnel=True, limit=False) as reader:
実行時パラメーターの設定
hints パラメーター (タイプ: dict) を使用して、実行時パラメーターを構成できます。ヒントの詳細については、「SET 操作」をご参照ください。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})sql.settings をグローバルに構成する場合、各実行に関連する実行時パラメーターを適用する必要があります。
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # Add hints based on the global configuration.
実行結果の読み取り
SQL 実行によって生成されたインスタンスは、open_reader を直接呼び出すことができます。2 つのシナリオが適用されます。
SQL ステートメントは構造化データを返します。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # Process each record.SQL ステートメント (desc など) は非構造化データを返します。reader.raw 属性を使用して、生の実行結果を取得できます。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)説明カスタムスケジューリングパラメーターを使用し、構成ページから PyODPS 3 ノードを手動でトリガーする場合、PyODPS ノードはこのモードでパラメーター置換を実行できないため、時間をハードコードする必要があります。
DataFrame
DataFrame を使用してデータを処理することもできます。
実行
DataWorks では、即時実行メソッド を明示的に呼び出して、DataFrame を実行する必要があります。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # Call the immediate execution method to process each record.印刷時に即時実行をトリガーしたい場合、
options.interactiveを有効にする必要があります。from odps import options from odps.df import DataFrame options.interactive = True # Enable the switch at the beginning. iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Immediate execution is triggered during printing.詳細の出力
options.verboseオプションを True に設定します。DataWorks では、このオプションはデフォルトで有効になっており、実行中に Logview URL などの詳細が出力されます。
例
次の簡単な例は、PyODPS ノードの使用方法を示しています。
データセットを用意し、pyodps_iris サンプルテーブルを作成します。詳細については、「DataFrame を使用したデータ処理」をご参照ください。
DataFrame を作成します。詳細については、「MaxCompute テーブルからの DataFrame オブジェクトの作成」をご参照ください。
PyODPS ノードに次のコードを入力して実行します。
from odps.df import DataFrame # Create a DataFrame from a MaxCompute table. iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))戻り結果:
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
コードの編集: 高度な例
ノードが定期的なスケジューリングを必要とする場合、そのスケジューリングプロパティを構成します。スケジューリング構成の詳細については、「概要」をご参照ください。
スケジューリングパラメーターの使用
ノードエディターで [プロパティ] をクリックします。[パラメーター] セクションで、カスタムパラメーターを設定します。PyODPS ノードでの変数の定義は、SQL ノードとは異なります。詳細については、「スケジューリングパラメーターの設定」をご参照ください。
SQLノードとは異なり、PyODPSノードはPythonコードとの競合を防止するために、${param_name} のようなプレースホルダーに対する文字列置き換えを実行しません。代わりに、DataWorks は実行前にグローバル args 辞書を注入します。この辞書からパラメーターを取得できます。たとえば、[Parameters] で ds=${yyyymmdd} を設定した場合、次のコードを使用してこのパラメーターを取得できます:
print('ds=' + args['ds'])
ds=20161116ds という名前のパーティションを取得する必要がある場合は、次のメソッドを使用します。
o.get_table('table_name').get_partition('ds=' + args['ds'])PyODPS タスク開発に関するその他のシナリオについては、以下をご参照ください。
次のステップ
カスタム Shell スクリプトタスクの成功の判断: カスタム Python スクリプトタスクの成功ロジックは、Shell ノードと同じです。このメソッドを検証に使用できます。
タスクの公開: 標準モードのワークスペースを使用する場合、定期的なスケジューリングを有効にするために、公開プロセスを介してタスクを本番環境にデプロイする必要があります。
定期タスクの O&M: タスクがスケジューリングのためにオペレーションセンターにデプロイされた後、DataWorks オペレーションセンターで O&M 操作を実行できます。
PyODPS に関するよくある質問: PyODPS 実行中の一般的な問題について学び、例外を迅速にトラブルシューティングして解決できます。
よくある質問
Q: PyODPS 3 ノードはローカル開発中は機能しますが、サードパーティ API (例: Feishu) にアクセスする際にオペレーションセンターでタイムアウトします。なぜですか?
A: サンドボックスのホワイトリストを設定する必要があります。 に移動し、[セキュリティ設定] セクションで、サードパーティ API のエンドポイントをホワイトリストに追加してアクセスを許可します。例:
