DataWorks は、Python で MaxCompute ジョブを作成し、定期的に実行するための PyODPS 3 ノードを提供します。このトピックでは、DataWorks を使用して Python ジョブを構成およびスケジュールする方法について説明します。
前提条件
PyODPS 3 ノードが作成されていること。詳細については、「MaxCompute ノードの作成と管理」をご参照ください。
背景情報
PyODPS は MaxCompute の Python SDK です。MaxCompute ジョブの作成、テーブルやビューのクエリ、リソースの管理を行うための Python プログラミングインターフェイスを提供します。詳細については、「PyODPS」をご参照ください。DataWorks では、PyODPS ノードを使用して Python ジョブをスケジュールおよび実行し、他の種類のジョブと統合できます。
注意事項
-
PyODPS コードでサードパーティパッケージが必要な場合は、サーバーレスリソースグループとカスタムイメージを使用してインストールできます。
説明コードにサードパーティパッケージを参照するユーザー定義関数 (UDF) が含まれている場合、この方法はサポートされません。正しい構成については、「UDF の例:Python UDF でサードパーティパッケージを使用する」をご参照ください。
-
PyODPS のバージョンをアップグレードするには、カスタムイメージを使用してサーバーレスリソースグループに対して
/home/tops/bin/pip3 install pyodps==0.12.1コマンドを実行するか (0.12.1をターゲットの PyODPS バージョンに置き換えることができます)、O&M Assistant を使用して専用スケジューリングリソースグループに対して同じコマンドを実行します。
-
PyODPS ジョブが VPC 内のデータソースやサービス、またはオンプレミスデータセンター (IDC) などの特別なネットワーク環境にアクセスする必要がある場合は、サーバーレスリソースグループを使用し、そのリソースグループとターゲット環境との間にネットワーク接続を確立します。詳細については、「ネットワーク接続ソリューション」をご参照ください。
-
PyODPS の構文の詳細については、「PyODPS ドキュメント」をご参照ください。
-
PyODPS ノードには、PyODPS 2 と PyODPS 3 の 2 種類があります。これらは異なる基盤となる Python バージョンを使用します:PyODPS 2 ノードは Python 2 を使用し、PyODPS 3 ノードは Python 3 を使用します。ご利用の Python バージョンに一致するノードタイプを作成してください。
-
PyODPS ノードで SQL を実行しても正しいデータリネージが生成されず、データマップに表示されない場合は、コード内で DataWorks のスケジューリングおよび実行時パラメーターを手動で設定することで問題を解決します。データリネージの表示方法については、「データリネージの表示」をご参照ください。パラメーター設定については、「実行時パラメーターヒントの設定」をご参照ください。次のサンプルコードを使用して、必要な実行時パラメーターを取得できます:
import os ... # DataWorks スケジューラの実行時パラメーターを取得 skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # タスク送信時にヒントを設定 o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
-
PyODPS ノードの出力ログの最大サイズは 4 MB です。大きなデータ結果をログに出力することは避け、重要なアラートと進捗情報のみを出力するようにしてください。
制限事項
-
専用スケジューリングリソースグループで PyODPS ノードを実行する場合、50 MB を超えるローカルデータを処理しないでください。これは、専用スケジューリングリソースグループのリソース仕様によるものです。オペレーティングシステムのしきい値を超える大量のローカルデータを処理すると、メモリ不足 (OOM) エラーが発生し、
Got Killedメッセージが表示されることがあります。PyODPS ノードに直接、広範なデータ処理コードを記述することは避けてください。詳細については、「PyODPS を効率的に使用するためのベストプラクティス」をご参照ください。 -
サーバーレスリソースグループを使用して PYODPS ノードを実行する場合、処理する必要のあるデータ量に基づいてノードの CU を構成できます。
説明サーバーレスリソースグループでタスクを実行する場合、単一のタスクは最大で
64CUの構成をサポートしますが、CU 値が過大になるとリソース不足を引き起こし、タスクの起動に影響を与える可能性があるため、16CUを超えないようにすることを推奨します。 -
Got killed エラーは、メモリ使用量が制限を超え、プロセスが終了したことを示します。これを防ぐには、ローカルデータ操作を避けてください。この制限は、PyODPS を介して開始される SQL または DataFrame ジョブ (
to_pandasを除く) には適用されません。 -
カスタム関数を含まないコードには、プリインストール済みの Numpy および Pandas ライブラリを使用できます。バイナリコードを含む他のサードパーティパッケージはサポートされていません。
-
互換性の理由から、DataWorks では options.tunnel.use_instance_tunnel はデフォルトで False に設定されています。インスタンストンネルをグローバルに有効にする必要がある場合は、この値を手動で True に設定する必要があります。
-
バイトコードの定義は、Python 3.8 と Python 3.7 のような Python 3 のマイナーバージョン間で異なります。
MaxCompute は現在 Python 3.7 を使用しています。Python 3.8 の
finallyブロックなど、他の Python 3 バージョンの構文を使用すると、実行エラーが発生します。Python 3.7 の使用を推奨します。 -
PyODPS 3 はサーバーレスリソースグループでの実行をサポートしています。購入して使用するには、「サーバーレスリソースグループの使用」をご参照ください。
-
単一の PyODPS ノード内で複数の Python ジョブを同時に実行することはサポートされていません。
コードの編集:基本例
PyODPS ノードを作成した後、コードを編集して実行できます。PyODPS の構文の詳細については、「基本操作の概要」をご参照ください。
-
ODPS エントリポイント
DataWorks の PyODPS ノードは、ODPS エントリポイントとして odps または o という名前のグローバル変数を提供します。手動で定義する必要はありません。
print(odps.exist_table('PyODPS_iris')) -
SQL の実行
PyODPS ノードで SQL ステートメントを実行できます。詳細については、「SQL」をご参照ください。
-
デフォルトでは、DataWorks でインスタンストンネルは無効になっています。これは、instance.open_reader が Result インターフェイスを使用し、最大 10,000 レコードを読み取ることを意味します。reader.count を使用してレコード数を取得できます。すべてのデータを反復処理するには、
limitを無効にする必要があります。次のステートメントを使用して、インスタンストンネルをグローバルに有効にし、limitを無効にします。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 制限を無効にしてすべてのデータを読み取ります。 with instance.open_reader() as reader: # インスタンストンネルを介してすべてのデータを読み取ることができます。 -
また、 に
tunnel=Trueを追加して、現在の open_reader 呼び出しに対してインスタンストンネルを有効にすることもできます。同様に、limit=Falseを追加して、現在の呼び出しに対するlimit制限を無効にすることもできます。# 現在の open_reader 操作でインスタンストンネルインターフェイスを使用してすべてのデータを読み取ります。 with instance.open_reader(tunnel=True, limit=False) as reader:
-
-
実行時パラメーター
-
hints パラメーター (これは dict です) を使用して実行時パラメーターを設定します。ヒントの詳細については、「SET 操作」をご参照ください。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16}) -
グローバル構成で sql.settings を設定すると、これらの実行時パラメーターがすべての実行に追加されます。
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # この呼び出しには、グローバル構成からのヒントが含まれています。
-
-
実行結果
SQL 実行インスタンスは、次の 2 つのシナリオで open_reader 操作を直接実行できます:
-
SQL ステートメントは構造化データを返します。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 各レコードを処理します。 -
descのようなステートメントを実行する場合、reader.raw プロパティを使用して、生の SQL 実行結果を取得できます。with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)説明カスタムスケジューリングパラメーターを使用する場合、ページから PyODPS 3 ノードを直接トリガーして実行する際には、時間をハードコーディングする必要があります。PyODPS ノードはこの値を直接代入できません。
-
-
DataFrame
DataFrame (非推奨) を使用してデータを処理することもできます。
-
実行
DataWorks 環境では、DataFrame 操作は即時実行メソッドを呼び出すことによって明示的にトリガーする必要があります。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 即時実行メソッドを呼び出して各レコードを処理します。印刷時に即時実行をトリガーする必要がある場合は、
options.interactiveを有効にする必要があります。from odps import options from odps.df import DataFrame options.interactive = True # 最初にスイッチを有効にします。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # 印刷時に即時実行がトリガーされます。 -
詳細情報の出力
options.verboseオプションを設定します。このオプションは DataWorks でデフォルトで有効になっており、実行中に Logview URL などの詳細情報を出力します。
-
例
次の例は、PyODPS ノードの使用方法を示しています:
-
データセットを準備し、pyodps_iris サンプルテーブルを作成します。詳細については、「DataFrame を使用したデータ処理」をご参照ください。
-
DataFrame を作成します。詳細については、「MaxCompute テーブルから DataFrame を作成する」をご参照ください。
-
PyODPS ノードに次のコードを入力して実行します。
from odps.df import DataFrame # ODPS テーブルから DataFrame を作成します。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))次の結果が返されます:
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
コードの編集:高度な例
ノードを定期的に実行する必要がある場合は、そのスケジューリングプロパティを定義する必要があります。詳細については、「ノードのスケジューリングプロパティの構成」をご参照ください。
スケジューリングパラメーター
ノードエディターの右側のペインで Scheduling Settings をクリックします。Parameter セクションで、カスタムパラメーターを構成します。PyODPS ノードでの変数の定義方法は、SQL ノードでの定義方法とは異なります。詳細については、「スケジューリングパラメーターの構成」をご参照ください。
DataWorks の SQL ノードとは異なり、PyODPS ノードはコード内の ${param_name} のような文字列を置き換えません。代わりに、コードが実行される前に、args という名前の dict がグローバル変数に追加されます。この dict からスケジューリングパラメーターを取得できます。たとえば、Parameter で ds=${yyyymmdd} を設定した場合、次のメソッドを使用してコード内でこのパラメーターを取得できます。
print('ds=' + args['ds'])
ds=20161116
ds という名前のパーティションを取得する必要がある場合は、次のメソッドを使用できます。
o.get_table('table_name').get_partition('ds=' + args['ds'])
他のユースケース向けの PyODPS ジョブの開発に関する詳細については、次のトピックをご参照ください:
次のステップ
-
カスタム Shell スクリプトが正常に実行されたかどうかを判断する:カスタム Python スクリプトが正常に実行されたかどうかを判断するロジックは、Shell スクリプトの場合と同じです。この方法で検証できます。
-
ジョブのデプロイ:標準モードのワークスペースを使用している場合、ジョブを定期的に実行する前に、本番環境にデプロイする必要があります。
-
定期実行ジョブの O&M:ジョブが本番環境にデプロイされ、スケジュールされた後、オペレーションセンターでジョブの O&M を実行できます。
-
PyODPS よくある質問:PyODPS ジョブの実行に関する一般的な質問への回答を見つけて、問題を迅速にトラブルシューティングするのに役立ててください。
よくある質問
Q:PyODPS3 ノードを使用して、Lark などのサードパーティ API からデータを収集し、DataWorks にインポートしています。コードはローカル開発環境では問題なく実行されますが、本番環境に送信してオペレーションセンターで実行すると、応答タイムアウトエラーが報告されます。なぜですか?
A: 。 サンドボックスホワイトリストに、PyODPS 3 ジョブがアクセスできるようにサードパーティ API のドメイン名を追加します。 例:
Lark API のドメイン名 open.feishu.cn を追加し、ポートを 443 に設定します。