DataWorks は PyODPS 3 ノードを提供します。このノードを使用して、MaxCompute ジョブ用の Python コードを記述し、定期的に実行するようにスケジュールできます。このトピックでは、DataWorks で Python タスクを構成およびスケジュールする方法について説明します。
概要
PyODPS は、MaxCompute 向けの Python ソフトウェア開発キット (SDK) です。シンプルで使いやすいプログラミングインターフェイスを提供し、Python を使用してジョブの記述、テーブルやビューのクエリ、MaxCompute リソースの管理を行うことができます。詳細については、「PyODPS」をご参照ください。DataWorks では、PyODPS ノードを使用して Python タスクをスケジュールおよび実行し、他のジョブと統合できます。
注意事項
DataWorks リソースグループで PyODPS ノードを実行する際に、コードがサードパーティパッケージを呼び出す必要がある場合は、サーバーレスリソースグループとカスタムイメージを使用してパッケージをインストールできます。
説明コード内のユーザー定義関数 (UDF) がサードパーティパッケージを参照している場合、このメソッドは使用できません。参照の設定方法については、「例:Python UDF でサードパーティパッケージを参照する」をご参照ください。
PyODPS のバージョンをアップグレードするには、サーバーレスリソースグループを使用している場合は、カスタムイメージを使用して
/home/tops/bin/pip3 install pyodps==0.12.1コマンドを実行します。0.12.1はターゲットの PyODPS バージョンに置き換えることができます。専用スケジューリングリソースグループを使用している場合は、O&M Assistant を使用して同じコマンドを実行します。PyODPS タスクが VPC や IDC ネットワーク内のデータソースやサービスなど、特別なネットワーク環境にアクセスする必要がある場合は、スケジューリングにサーバーレスリソースグループを使用します。その後、サーバーレスリソースグループとターゲット環境との間にネットワーク接続を確立します。詳細については、「ネットワーク接続ソリューション」をご参照ください。
PyODPS の構文の詳細については、「PyODPS ドキュメント」をご参照ください。
PyODPS ノードは、PyODPS 2 と PyODPS 3 の 2 種類に分類されます。この 2 種類のノードは、基盤となる Python のバージョンが異なります。PyODPS 2 ノードは Python 2 を使用し、PyODPS 3 ノードは Python 3 を使用します。使用している Python のバージョンに対応する PyODPS ノードタイプを選択してください。
PyODPS ノードで SQL 文を実行したときにデータマップでデータリネージが生成されない場合は、タスクコードで関連する DataWorks のスケジューリングパラメーターを手動で設定することで、この問題を解決できます。データリネージを表示するには、「リネージ情報の表示」をご参照ください。パラメーターを設定するには、「実行時パラメーター (ヒント) の設定」をご参照ください。次のサンプルコードを使用して、実行時に必要なパラメーターを取得できます。
import os ... # DataWorks スケジューラの実行時パラメーターを取得 skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # タスク送信時にヒントを設定 o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...PyODPS ノードの出力ログは最大 4 MB のサイズをサポートします。大量のデータ結果をログに直接出力することは避けてください。代わりに、アラートログや通常の進捗ログを書き込むことで、より価値のある情報を提供します。
制限事項
専用スケジューリングリソースグループを使用して PyODPS ノードを実行する場合、リソースグループでローカルに処理されるデータは 50 MB を超えないようにすることを推奨します。これは、専用スケジューリングリソースグループの仕様によるものです。ローカルで処理されるデータが多すぎてオペレーティングシステムのしきい値を超えると、メモリ不足 (OOM) エラー (Got Killed) が発生する可能性があります。PyODPS ノードに大量のデータ処理コードを記述することは避けてください。詳細については、「PyODPS を効率的に使用するためのベストプラクティス」をご参照ください。
サーバーレスリソースグループを使用して PyODPS ノードを実行する場合、処理する必要のあるデータ量に基づいてノードの計算ユニット (CU) を構成できます。
説明サーバーレスリソースグループでタスクを実行する場合、単一のタスクは最大
64CUの構成をサポートします。ただし、CU 数が多すぎるとリソース不足を引き起こし、タスクの起動に影響を与える可能性があるため、16CUを超えないようにすることを推奨します。Got killed エラーが表示された場合、メモリ使用量が制限を超え、プロセスが中止されたことを意味します。そのため、ローカルでのデータ操作はできるだけ避けてください。この制限は、PyODPS を使用して開始される SQL および DataFrame タスク (to_pandas を除く) には適用されません。
UDF を含まないコードには、プリインストール済みの Numpy および Pandas ライブラリを使用できます。バイナリコードを含む他のサードパーティパッケージはサポートされていません。
互換性の理由から、DataWorks では
options.tunnel.use_instance_tunnelはデフォルトでFalseに設定されています。instance tunnelをグローバルに有効にするには、この値を手動でTrueに設定する必要があります。バイトコードの定義は、Python 3.8 や Python 3.7 などの Python 3 のサブバージョン間で異なります。
現在、MaxCompute は Python 3.7 を使用しています。Python 3.8 の finally ブロックなど、他の Python 3 バージョンの構文を使用すると、実行中にエラーが報告されます。Python 3.7 を使用することを推奨します。
PyODPS 3 は、サーバーレスリソースグループでの実行をサポートしています。購入方法と使用方法については、「サーバーレスリソースグループの使用」をご参照ください。
PyODPS ノードでの複数の Python タスクの同時実行はサポートされていません。
PyODPS ノードのログを出力するには、
printを使用します。logger.infoの使用はサポートされていません。
事前準備
DataWorks ワークスペースに MaxCompute コンピュートエンジンをアタッチします。
操作手順
PyODPS 3 ノードのエディターページで、コードを記述してデバッグできます。
PyODPS 3 コードの例
PyODPS ノードを作成した後、コードを編集して実行できます。PyODPS の構文の詳細については、「基本操作の概要」をご参照ください。このトピックでは、以下の 5 つのコード例を紹介します。要件に応じて例を選択してください。
ODPS エントリポイント
DataWorks では、PyODPS ノードにはグローバル変数
odpsまたはoが含まれており、これが ODPS エントリポイントです。ODPS エントリポイントを手動で定義する必要はありません。print(odps.exist_table('PyODPS_iris'))SQL の実行
PyODPS ノードで SQL 文を実行できます。詳細については、「SQL」をご参照ください。
デフォルトでは、DataWorks で
instance tunnelは無効になっています。これは、instance.open_readerが Result インターフェイスを使用し、最大 10,000 レコードまでしか読み取れないことを意味します。reader.countを使用してレコード数を取得できます。すべてのデータを反復的に取得するには、limit制限を無効にする必要があります。次の文を使用して、instance tunnelをグローバルに有効にし、limit制限を無効にすることができます。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 制限を無効にしてすべてのデータを読み取る with instance.open_reader() as reader: # すべてのデータは Instance Tunnel を通じて読み取ることができるまた、
open_readerの呼び出しにtunnel=Trueを追加して、現在のopen_reader操作に対してのみinstance tunnelを有効にすることもできます。同時に、limit=Falseを追加して、現在の操作に対してのみlimit制限を無効にすることができます。# この open_reader 操作は Instance Tunnel インターフェイスを使用し、すべてのデータを読み取ることができる with instance.open_reader(tunnel=True, limit=False) as reader:
実行時パラメーターの設定
hintsパラメーターを設定することで、実行時パラメーターを設定できます。パラメーターの型はdictです。ヒントパラメーターの詳細については、「SET 操作」をご参照ください。o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})グローバル構成で
sql.settingsを設定した後、タスクが実行されるたびに関連する実行時パラメーターを追加する必要があります。from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # グローバル構成に基づいてヒントを追加
実行結果の読み取り
SQL を実行するインスタンスは、直接
open_reader操作を実行できます。以下の 2 つのシナリオが考えられます。SQL 文が構造化データを返す場合。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 各レコードを処理desc などの SQL 文が実行される場合。
reader.rawプロパティを使用して、元の SQL 実行結果を取得できます。with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)説明カスタムスケジューリングパラメーターを使用し、PyODPS 3 ノードをエディターページから直接実行する場合、パラメーター変数は自動的に置き換えられません。コード内で手動で値を指定する必要があります。
DataFrame
DataFrame (非推奨) を使用してデータを処理することもできます。
実行
DataWorks 環境では、DataFrame の実行には 即時実行メソッドの明示的な呼び出しが必要です。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 即時実行メソッドを呼び出して各レコードを処理print 関数を使用して即時実行メソッドをトリガーするには、
options.interactiveを有効にする必要があります。from odps import options from odps.df import DataFrame options.interactive = True # 最初にスイッチを有効にする iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Print 中に即時実行がトリガーされる詳細情報の出力
options.verboseオプションを設定します。DataWorks では、このオプションはデフォルトで有効になっています。実行中に Logview URL などの詳細情報が出力されます。
PyODPS 3 コード開発
以下の簡単な例は、PyODPS ノードの使用方法を示しています。
データセットを準備し、pyodps_iris サンプルテーブルを作成します。詳細については、「DataFrame データ処理」をご参照ください。
DataFrame を作成します。詳細については、「MaxCompute テーブルから DataFrame を作成する」をご参照ください。
PyODPS ノードに次のコードを入力して実行します。
from odps.df import DataFrame # ODPS テーブルから DataFrame を作成 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
PyODPS タスクの実行
[デバッグ設定] タブの [コンピューティングリソース] セクションで、[コンピューティングリソース、コンピューティングクォータ]、および [DataWorks リソースグループ] を選択して設定します。
説明パブリックネットワークまたは VPC 環境のデータソースにアクセスするには、データソースとの接続性テストに合格したスケジューリング用のリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。
必要に応じて [イメージ] 情報を設定できます。
ツールバーのパラメーターダイアログボックスで、MaxCompute データソースを選択し、[実行] をクリックして PyODPS タスクを実行します。
ノードタスクを定期的に実行するには、必要に応じてスケジューリング情報を設定します。設定の詳細については、「ノードのスケジューリングプロパティの設定」をご参照ください。
DataWorks の SQL ノードとは異なり、PyODPS ノードはコード内の ${param_name} のような文字列を置き換えません。これはコードへの影響を防ぐためです。代わりに、コードが実行される前に
argsという名前の辞書がグローバル変数に追加されます。この辞書からスケジューリングパラメーターを取得できます。たとえば、[パラメーター] セクションでds=${yyyymmdd}を設定した場合、次のメソッドを使用してコード内のパラメーター情報を取得できます。print('ds=' + args['ds']) ds=20240930説明dsという名前のパーティションを取得するには、次のメソッドを使用できます。o.get_table('table_name').get_partition('ds=' + args['ds'])ノードタスクを設定した後、ノードを公開する必要があります。詳細については、「ノードまたはワークフローの公開」をご参照ください。
タスクが公開された後、オペレーションセンターで自動トリガータスクの実行ステータスを表示できます。詳細については、「オペレーションセンター入門」をご参照ください。
ロールを関連付けたノードの実行
ノードに RAM ロールを関連付けて、ノードタスクを実行できます。これにより、詳細な権限コントロールとセキュリティ管理が可能になります。
次のステップ
PyODPS に関するよくある質問:PyODPS の実行時に発生する可能性のある一般的な問題について学びます。これにより、例外を迅速にトラブルシューティングして解決できます。