DataWorks は、DataWorks で PyODPS 構文を使用して PyODPS タスクを開発できる PyODPS 2 ノードを提供します。 PyODPS は、MaxCompute の Python SDK と統合されています。 DataWorks コンソールで PyODPS 2 ノードに Python コードを記述して、MaxCompute のデータを処理できます。
前提条件
PyODPS 2 ノードが作成されます。 詳細については、「MaxCompute ノードを作成および管理する」をご参照ください。
背景情報
PyODPS は、Python 用の MaxCompute SDK です。 PyODPS は、シンプルで便利な Python プログラミングインターフェイスを提供します。 このようにして、Python を使用して MaxCompute ジョブのコードを記述し、MaxCompute のテーブルとビューをクエリし、MaxCompute リソースを管理できます。 詳細については、「PyODPS」をご参照ください。 DataWorks では、PyODPS ノードを使用して Python タスクをスケジュールし、Python タスクを他のタイプのタスクと統合できます。
注意事項
DataWorks リソースグループで PyODPS ノードを実行するときにサードパーティパッケージを参照する必要がある場合は、サーバーレスリソースグループを使用し、カスタムイメージを作成してサードパーティパッケージをインストールします。
説明ノードコードのユーザー定義関数(UDF)でサードパーティパッケージを参照する必要がある場合は、上記の方法を使用してサードパーティパッケージをインストールすることはできません。 UDF でサードパーティパッケージを参照する方法については、「例:Python UDF でサードパーティパッケージを参照する」をご参照ください。
サーバーレスリソースグループで実行される PyODPS 3 ノードの PyODPS バージョンをアップグレードするには、イメージ管理機能を使用して、リソースグループで
/home/tops/bin/pip3 install pyodps==0.12.1コマンドを実行できます。スケジューリング用の専用リソースグループで実行される PyODPS 3 ノードの PyODPS バージョンをアップグレードするには、O&M Assistant 機能を使用して、リソースグループで同じコマンドを実行できます。コマンドを実行するときは、0.12.1をアップグレード先のバージョンに変更できます。イメージ管理機能と O&M Assistant 機能の詳細については、「イメージの管理」および「O&M Assistant 機能の使用」をご参照ください。
PyODPS ノードを使用して、仮想プライベートクラウド(VPC)やデータセンターなどの特別なネットワーク環境にデプロイされているデータソースまたはサービスにアクセスする場合は、サーバーレスリソースグループを使用してノードを実行し、リソースグループとデータソースまたはサービスの間にネットワーク接続を確立します。 詳細については、「ネットワーク接続ソリューション」をご参照ください。
PyODPS 構文の詳細については、「概要」をご参照ください。
PyODPS ノードは、PyODPS 2 ノードと PyODPS 3 ノードに分類されます。 2 つのタイプの PyODPS ノードは、基盤となるレイヤーで異なる Python バージョンを使用します。 PyODPS 2 ノードは Python 2 を使用し、PyODPS 3 ノードは Python 3 を使用します。 使用中の Python バージョンに基づいて PyODPS ノードを作成できます。
PyODPS ノードで SQL ステートメントを実行することでデータ系列を生成できず、データマップがデータ系列を期待どおりに表示できない場合は、PyODPS ノードのコードでスケジューリングパラメータを手動で構成することで問題を解決できます。 データ系列を表示する方法の詳細については、「データ系列を表示する」をご参照ください。 パラメータの構成方法の詳細については、「hints パラメータを構成する」をご参照ください。 次のサンプルコードは、タスクの実行に必要なパラメータを取得する方法の例を示しています。
import os ... # DataWorksスケジューラーランタイムパラメーターを取得する skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # タスクを送信中にヒントを設定する o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
PyODPS ノードの出力ログのサイズは最大 4 MB です。 出力ログに多数のデータ結果エントリを含めないことをお勧めします。 代わりに、より多くのアラートログと進捗関連ログを提供して、貴重な情報を取得することをお勧めします。
制限
PyODPS 2 ノードの実行に使用するスケジューリング専用の排他的リソースグループのリソースの仕様により、PyODPS 2 ノードを使用してオンプレミスマシンから 50 MB 以下のデータのみを処理することをお勧めします。 PyODPS 2 ノードがオンプレミスマシンから 50 MB を超えるデータを処理する場合、メモリ不足(OOM)例外が発生し、システムが Got killed を報告する可能性があります。 PyODPS 2 ノードに過剰なデータ処理コードを記述しないことをお勧めします。 詳細については、「PyODPS ノードを効率的に使用するためのベストプラクティス」をご参照ください。
サーバーレスリソースグループを使用して PyODPS 2 ノードを実行する場合は、ノードで処理する必要があるデータ量に基づいて、PyODPS 2 ノードに適切な数の CU を構成できます。
説明サーバーレスリソースグループでタスクを実行する場合、単一のタスクに最大 64 CU を構成できます。 ただし、タスクの起動に影響を与える可能性のあるリソース不足を避けるため、設定を 16 CU 以内にすることをお勧めします。
システムが Got killed を報告した場合、メモリ使用量が制限を超え、システムは関連するプロセスを終了します。 ローカルデータ操作を実行しないことをお勧めします。 ただし、メモリ使用量の制限は、PyODPS によって開始された SQL タスクまたは DataFrame タスク(to_pandas タスクを除く)には適用されません。
DataWorks にプリインストールされている NumPy ライブラリと pandas ライブラリを使用して、UDF 以外の関数を実行できます。 バイナリコードを含むサードパーティパッケージはサポートされていません。
互換性の理由から、DataWorks では options.tunnel.use_instance_tunnel はデフォルトで False に設定されています。 InstanceTunnel をグローバルに有効にする場合は、このパラメータを True に設定する必要があります。
PyODPS 2 ノードの Python バージョンは 2.7 です。
PyODPS 2 ノードで複数の Python タスクを同時に実行することはできません。
簡単なコード編集例
PyODPS ノードを作成した後、コードを記述して実行できます。 PyODPS 構文の詳細については、「概要」をご参照ください。
MaxCompute エントリポイントを使用する
DataWorks では、各 PyODPS ノードにはグローバル変数 odps または o が含まれており、これは MaxCompute エントリポイントです。 したがって、MaxCompute エントリポイントを手動で指定する必要はありません。
print(odps.exist_table('PyODPS_iris'))SQL ステートメントを実行する
PyODPS ノードで SQL ステートメントを実行できます。 詳細については、「SQL」をご参照ください。
デフォルトでは、DataWorks で InstanceTunnel は無効になっています。 この場合、instance.open_reader は Result インターフェイスを使用して実行され、最大 10,000 件のデータレコードを読み取ることができます。 reader.count を使用して、読み取られたデータレコードの数を確認できます。 すべてのデータを反復的に読み取る必要がある場合は、読み取るデータレコードの数の制限を削除する必要があります。 次のステートメントを実行して、InstanceTunnel をグローバルに有効にし、読み取るデータレコードの数の制限を削除できます。
options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 読み取るデータレコードの数の制限を削除します。 with instance.open_reader() as reader: # InstanceTunnel を使用してすべてのデータを読み取ります。tunnel=Trueを に追加して、現在の open_reader 操作で InstanceTunnel を有効にすることもできます。limit=Falseを open_reader に追加して、現在の open_reader 操作で読み取るデータレコードの数の制限を削除できます。# 現在の open_reader 操作は InstanceTunnel を使用して実行され、すべてのデータを読み取ることができます。 with instance.open_reader(tunnel=True, limit=False) as reader:
ランタイムパラメータを構成する
hints パラメータを使用して、ランタイムパラメータを構成できます。 hints パラメータは DICT タイプです。 hints パラメータの詳細については、「SET 操作」をご参照ください。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})グローバル構成用に sql.settings パラメータを構成する場合は、ノードコードを実行するたびにランタイムパラメータを構成する必要があります。
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # グローバル構成に基づいて hints パラメータを構成します。
SQL ステートメントのクエリ結果を取得する
次のシナリオでは、open_reader メソッドを使用してクエリ結果を取得できます。
SQL ステートメントが構造化データを返す場合。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 各レコードを処理します。DESC などの SQL ステートメントが実行される場合。 この場合、reader.raw プロパティを使用して生のクエリ結果を取得できます。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)説明カスタムスケジューリングパラメータを使用し、ノードの構成タブで PyODPS 3 ノードを実行する場合は、スケジューリングパラメータを固定値に設定して固定時間を指定する必要があります。 PyODPS ノードのカスタムスケジューリングパラメータの値は自動的に置き換えることができません。
DataFrame を使用してデータを処理する
DataFrame(推奨されません) を使用してデータを処理できます。
DataFrame API 操作を呼び出す
DataFrame API 操作は自動的には呼び出されません。 これらの操作は、すぐに実行されるメソッドを明示的に呼び出した場合にのみ呼び出すことができます。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # すぐに実行されるメソッドを呼び出して、各データレコードを処理します。データ表示のためにすぐに実行されるメソッドを呼び出すには、
options.interactiveを True に設定します。from odps import options from odps.df import DataFrame options.interactive = True # コードの先頭で options.interactive を True に設定します。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # システムが情報を表示した後、メソッドがすぐに実行されます。詳細を表示する
詳細を表示するには、
options.verboseを True に設定する必要があります。 デフォルトでは、このパラメータは DataWorks で True に設定されています。 システムは、実行プロセス中に Logview URL などの詳細を表示します。
例
次の例では、PyODPS ノードの使用方法について説明します。
データセットを準備し、pyodps_iris という名前のテーブルを作成します。詳細については、「DataFrame データ処理」をご参照ください。
DataFrame オブジェクトを作成します。 詳細については、「MaxCompute テーブルから DataFrame オブジェクトを作成する」をご参照ください。
PyODPS ノードのコードエディターに次のコードを入力して、コードを実行します。
from odps.df import DataFrame # MaxCompute テーブルを使用して DataFrame オブジェクトを作成します。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))次の情報が返されます。
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
高度なコード編集例
PyODPS ノードを定期的にスケジュールするには、ノードのスケジューリングプロパティを定義する必要があります。 詳細については、「概要」をご参照ください。
スケジューリングパラメータを構成する
ノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] タブをクリックします。 [スケジューリングパラメーター] セクションの [プロパティ] タブで、ノードのスケジューリングパラメーターを設定します。 PyODPS ノードのスケジューリングパラメーターを構成する方法は、SQL ノードのスケジューリングパラメーターを構成する方法とは異なります。 詳細については、「さまざまなタイプのノードのスケジューリングパラメーターを構成する」をご参照ください。
DataWorks の SQL ノードとは異なり、PyODPS ノードのコードでは ${param_name} などの文字列は置き換えられません。 代わりに、ノードコードが実行される前に、args という名前の辞書がグローバル変数として PyODPS ノードに追加されます。 この辞書からスケジューリングパラメーターを取得できます。 これにより、Python コードは影響を受けません。 たとえば、[プロパティ] タブの [スケジューリングパラメーター] セクションで ds=${yyyymmdd} を設定した場合、次のコマンドを実行してパラメーター値を取得できます:
print('ds=' + args['ds'])
ds=20161116ds という名前のパーティションを取得するには、次のコマンドを実行できます。
o.get_table('table_name').get_partition('ds=' + args['ds'])他のシナリオで PyODPS タスクを開発する方法の詳細については、次のトピックを参照してください。
次のステップ
カスタムシェルスクリプトが正常に実行されたかどうかを確認する:カスタム Python スクリプトが正常に実行されたかどうかを確認するロジックは、カスタムシェルスクリプトが正常に実行されたかどうかを確認するロジックと同じです。 この方法を使用して、カスタム Python スクリプトが正常に実行されたかどうかを確認できます。
PyODPS 3 ノードをデプロイする:標準モードのワークスペースを使用する場合は、PyODPS 3 ノードを定期的にスケジュールする前に、PyODPS 3 ノードを本番環境にデプロイする必要があります。
PyODPS 3 ノードで O&M を実行する:PyODPS 3 ノードが本番環境のオペレーションセンターにデプロイされた後、オペレーションセンターでノードの O&M 操作を実行できます。
PyODPS に関するよくある質問を確認する:PyODPS に関するよくある質問を確認できます。 このようにして、例外が発生した場合に、効率的な方法で問題を特定してトラブルシューティングできます。