DataWorks は PyODPS 3 ノードを提供しており、MaxCompute ジョブ用の Python コードを直接記述して、MaxCompute ジョブを定期的にスケジュールできます。このトピックでは、DataWorks を使用して Python タスクを構成およびスケジュールする方法について説明します。
前提条件
PyODPS 3 ノードが作成されていること。詳細については、「MaxCompute ノードを作成および管理する」をご参照ください。
背景情報
PyODPS は、Python 用の MaxCompute SDK です。PyODPS は、シンプルで便利な Python プログラミングインターフェイスを提供します。この方法で、Python を使用して MaxCompute ジョブのコードを記述し、MaxCompute のテーブルとビューをクエリし、MaxCompute リソースを管理できます。詳細については、「PyODPS」をご参照ください。DataWorks では、PyODPS ノードを使用して Python タスクをスケジュールし、Python タスクを他のタイプのタスクと統合できます。
注意事項
DataWorks リソースグループで PyODPS ノードを実行する際にサードパーティパッケージを参照する必要がある場合は、サーバーレスリソースグループを使用し、カスタムイメージを作成してサードパーティパッケージをインストールします。
説明ノードコードのユーザー定義関数(UDF)でサードパーティパッケージを参照する必要がある場合は、上記の方法を使用してサードパーティパッケージをインストールすることはできません。UDF でサードパーティパッケージを参照する方法については、「例:Python UDF でサードパーティパッケージを参照する」をご参照ください。
サーバーレスリソースグループで実行される PyODPS 3 ノードの PyODPS バージョンをアップグレードするには、イメージ管理機能を使用して、リソースグループで
/home/tops/bin/pip3 install pyodps==0.12.1コマンドを実行できます。 スケジューリング用の専用リソースグループで実行される PyODPS 3 ノードの PyODPS バージョンをアップグレードするには、O&M Assistant 機能を使用して、リソースグループで同じコマンドを実行できます。 コマンドを実行するとき、0.12.1をアップグレード先のバージョンに変更できます。 イメージ管理機能と O&M Assistant 機能の詳細については、「イメージの管理」および「O&M Assistant 機能の使用」をご参照ください。
PyODPS ノードを使用して、仮想プライベートクラウド(VPC)やデータセンターなどの特別なネットワーク環境にデプロイされているデータソースまたはサービスにアクセスする場合は、サーバーレスリソースグループを使用してノードを実行し、リソースグループとデータソースまたはサービスの間にネットワーク <接続> を確立します。詳細については、「ネットワーク接続ソリューション」をご参照ください。
PyODPS 構文の詳細については、「概要」をご参照ください。
PyODPS ノードは、PyODPS 2 ノードと PyODPS 3 ノードに分類されます。2 つのタイプの PyODPS ノードは、基盤となるレイヤーで異なる Python バージョンを使用します。PyODPS 2 ノードは Python 2 を使用し、PyODPS 3 ノードは Python 3 を使用します。使用中の Python バージョンに基づいて PyODPS ノードを作成できます。
PyODPS ノードで SQL ステートメントを実行してもデータ系列が生成されず、データマップがデータ系列を期待どおりに表示できない場合は、PyODPS ノードのコードでスケジュールパラメータを手動で構成することで問題を解決できます。データ系列を表示する方法の詳細については、「データ系列を表示する」をご参照ください。パラメータの構成方法の詳細については、「hints パラメータを構成する」をご参照ください。次のサンプルコードは、タスクの実行に必要なパラメータを取得する方法の例を示しています。
import os ... # DataWorks スケジューラランタイムパラメータを取得する skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # タスクを送信中にヒントを設定する o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
PyODPS ノードの出力ログのサイズは最大 4 MB です。出力ログに多数のデータ結果エントリを含めないことをお勧めします。代わりに、より多くのアラートログと進捗関連のログを提供して、貴重な情報を取得することをお勧めします。
制限
PyODPS ノードの実行に使用するスケジュール専用の排他的リソースグループのリソースの仕様により、PyODPS ノードを使用してオンプレミスマシンから 50 MB 以下のデータを処理することをお勧めします。PyODPS ノードがオンプレミスマシンから 50 MB を超えるデータを処理する場合、メモリ不足(OOM)例外が発生し、システムが Got killed を報告する可能性があります。PyODPS ノードに過剰なデータ処理コードを記述しないことをお勧めします。詳細については、「PyODPS ノードを効率的に使用するためのベストプラクティス」をご参照ください。
サーバーレスリソースグループを使用して PyODPS ノードを実行する場合は、ノードで処理する必要があるデータ量に基づいて、PyODPS ノードに適切な数の CU を構成できます。
説明サーバーレスリソースグループでタスクを実行する場合、単一のタスクに最大 64 CU を構成できます。ただし、タスクの起動に影響を与える可能性のあるリソース不足を避けるため、設定を 16 CU 以内にすることをお勧めします。
システムが Got killed を報告した場合、メモリ使用量が制限を超えており、システムは関連するプロセスを終了します。オンプレミスデータで操作を実行しないことをお勧めします。ただし、メモリ使用量の制限は、PyODPS によって開始された SQL タスクまたは DataFrame タスク(to_pandas タスクを除く)には適用されません。
DataWorks にプリインストールされている NumPy ライブラリと pandas ライブラリを使用して、UDF 以外の関数を実行できます。バイナリコードを含むサードパーティパッケージはサポートされていません。
互換性の理由から、DataWorks ではデフォルトで options.tunnel.use_instance_tunnel が False に設定されています。InstanceTunnel をグローバルに有効にする場合は、このパラメータを True に設定する必要があります。
Python 3 は、Python 3.7 や Python 3.8 などの異なるサブバージョンでバイトコードを異なる方法で定義します。
MaxCompute は Python 3.7 と互換性があります。特定の構文を持つコードを実行すると、Python 3 の別のサブバージョンを使用する MaxCompute クライアントはエラーを返します。たとえば、finally ブロック構文を持つコードを実行すると、Python 3.8 を使用する MaxCompute クライアントはエラーを返します。Python 3.7 を使用することをお勧めします。
PyODPS 3 ノードは、サーバーレスリソースグループで実行できます。サーバーレスリソースグループの購入および使用方法の詳細については、「6サーバーレスリソースグループを作成および使用する」をご参照ください。
PyODPS 3 ノードで複数の PyODPS 3 タスクを同時に実行することはできません。
簡単なコード編集例
PyODPS ノードを作成した後、コードを記述して実行できます。PyODPS 構文の詳細については、「概要」をご参照ください。
MaxCompute エントリポイントを使用する
DataWorks では、各 PyODPS ノードにグローバル変数 odps または o が含まれており、これは MaxCompute エントリポイントです。したがって、MaxCompute エントリポイントを手動で指定する必要はありません。
print(odps.exist_table('PyODPS_iris'))SQL ステートメントを実行する
PyODPS ノードで SQL ステートメントを実行できます。詳細については、「SQL」をご参照ください。
デフォルトでは、DataWorks で InstanceTunnel は無効になっています。この場合、instance.open_reader は Result インターフェイスを使用して実行され、最大 10,000 件のデータレコードを読み取ることができます。reader.count を使用して、読み取られたデータレコードの数を確認できます。すべてのデータを反復的に読み取る必要がある場合は、読み取るデータレコード数の制限を削除する必要があります。次のステートメントを実行して、InstanceTunnel をグローバルに有効にし、読み取るデータレコード数の制限を削除できます。
options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 読み取るデータレコード数の制限を削除します。 with instance.open_reader() as reader: # InstanceTunnel を使用してすべてのデータを読み取ります。tunnel=Trueを に追加して、現在の open_reader 操作で InstanceTunnel を有効にすることもできます。limit=Falseを open_reader に追加して、現在の open_reader 操作で読み取るデータレコード数の制限を削除できます。# 現在の open_reader 操作は InstanceTunnel を使用して実行され、すべてのデータを読み取ることができます。 with instance.open_reader(tunnel=True, limit=False) as reader:
ランタイムパラメータを構成する
hints パラメータを使用して、ランタイムパラメータを構成できます。 hints パラメータは DICT タイプです。 hints パラメータの詳細については、「SET 操作」をご参照ください。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})グローバル構成用に sql.settings パラメータを構成する場合は、ノードコードを実行するたびにランタイムパラメータを構成する必要があります。
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # グローバル構成に基づいて hints パラメータを構成します。
SQL ステートメントのクエリ結果を取得する
次のシナリオでは、open_reader メソッドを使用してクエリ結果を取得できます。
SQL ステートメントが構造化データを返す場合。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 各レコードを処理します。DESC などの SQL ステートメントが実行される場合。この場合、reader.raw プロパティを使用して生のクエリ結果を取得できます。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)説明カスタムスケジュールパラメータを使用し、ノードの構成タブで PyODPS 3 ノードを実行する場合は、スケジュールパラメータを固定値に設定して固定時間を指定する必要があります。PyODPS ノードのカスタムスケジュールパラメータの値は自動的に置き換えられません。
DataFrame を使用してデータを処理する
DataFrame(推奨されません) を使用してデータを処理できます。
DataFrame API 操作を呼び出す
DataFrame API 操作は自動的には呼び出されません。これらの操作は、すぐに実行されるメソッドを明示的に呼び出した場合にのみ呼び出すことができます。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # すぐに実行されるメソッドを呼び出して、各データレコードを処理します。データ表示のためにすぐに実行されるメソッドを呼び出すには、
options.interactiveを True に設定します。from odps import options from odps.df import DataFrame options.interactive = True # コードの最初に options.interactive を True に設定します。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # システムが情報を表示した後、メソッドがすぐに実行されます。詳細を表示する
詳細を表示するには、
options.verboseを True に設定する必要があります。DataWorks では、デフォルトでこのパラメータが True に設定されています。システムは、実行プロセス中に Logview URL などの詳細を表示します。
例
次の例では、PyODPS ノードの使用方法について説明します。
データセットを準備し、テーブル pyodps_iris を作成します。 詳細については、「DataFrame データ処理」をご参照ください。
DataFrame オブジェクトを作成します。詳細については、「MaxCompute テーブルから DataFrame オブジェクトを作成する」をご参照ください。
PyODPS ノードのコードエディタに次のコードを入力して、コードを実行します。
from odps.df import DataFrame # MaxCompute テーブルを使用して DataFrame オブジェクトを作成します。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))次の情報が返されます。
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
高度なコード編集例
PyODPS ノードを定期的にスケジュールする場合は、ノードのスケジュールプロパティを定義する必要があります。詳細については、「概要」をご参照ください。
スケジュールパラメータを構成する
ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。プロパティ タブの [スケジューリングパラメーター] セクションで、ノードのスケジューリングパラメーターを構成します。PyODPS ノードのスケジューリングパラメーターの構成方法は、SQL ノードの場合とは異なります。詳細については、「さまざまなタイプのノードのスケジューリングパラメーターを構成する」をご参照ください。
DataWorks の SQL ノードとは異なり、PyODPS ノードのコードでは ${param_name} のような文字列は置き換えられません。代わりに、ノードコードが実行される前に、args という名前の辞書がグローバル変数として PyODPS ノードに追加されます。辞書からスケジューリングパラメーターを取得できます。このようにして、Python コードは影響を受けません。たとえば、プロパティ タブの [スケジューリングパラメーター] セクションで ds=${yyyymmdd} を設定した場合、次のコマンドを実行してパラメーター値を取得できます。
print('ds=' + args['ds'])
ds=20161116ds という名前のパーティションを取得するには、次のコマンドを実行できます。
o.get_table('table_name').get_partition('ds=' + args['ds'])他のシナリオで PyODPS タスクを開発する方法の詳細については、次のトピックをご参照ください。
次のステップ
カスタムシェルスクリプトが正常に実行されたかどうかを確認する: カスタム Python スクリプトが正常に実行されたかどうかを確認するロジックは、カスタムシェルスクリプトが正常に実行されたかどうかを確認するロジックと同じです。この方法を使用して、カスタム Python スクリプトが正常に実行されたかどうかを確認できます。
PyODPS 3 ノードをデプロイする: 標準モードのワークスペースを使用する場合は、PyODPS 3 ノードを定期的にスケジュールする前に、本番環境にデプロイする必要があります
PyODPS 3 ノードで O&M を実行する: PyODPS 3 ノードが本番環境のオペレーションセンターにデプロイされた後、オペレーションセンターでノードの O&M 操作を実行できます。
PyODPS に関するよくある質問を確認する: PyODPS に関するよくある質問を確認できます。この方法で、例外が発生した場合に、効率的な方法で問題を特定してトラブルシューティングできます。
FAQ
Q: PyODPS 3 ノードを実行して、コードを使用して Lark データなどのサードパーティ <アプリケーション> のデータを収集し、DataWorks にインポートすると、ノードはオンプレミスマシンで正常に実行され、データを収集できます。ただし、PyODPS 3 ノードが本番環境にコミットされた後にオペレーションセンターで実行されると、応答タイムアウトエラーが報告されます。なぜですか?
A: DataWorks コンソールの[管理センター]にある[ワークスペース]ページの[セキュリティ設定]セクションでサンドボックスホワイトリストを設定し、PyODPS 3 ノードがサードパーティアプリケーションにアクセスできるように、そのアドレス情報を追加します。 以下の図に設定例を示します。
