DataWorks は PyODPS 3 ノードを提供しており、Python で直接 MaxCompute のジョブを作成し、定期的なスケジューリングを設定できます。
概要
PyODPS は MaxCompute 向けの Python ソフトウェア開発キット (SDK) です。シンプルなプログラミングインターフェイスにより、ジョブの作成、テーブルやビューのクエリ、MaxCompute リソースの管理が可能です。詳細については、「PyODPS」をご参照ください。DataWorks では、PyODPS ノードを使用して Python タスクをスケジューリング・実行し、他のジョブと統合できます。
注意事項
DataWorks リソースグループで PyODPS ノードを実行する際、コードでサードパーティパッケージが必要な場合は、サーバーレスリソースグループにインストールできます。詳細については、「カスタムイメージ」をご参照ください。
説明この方法は、サードパーティパッケージを参照するユーザー定義関数 (UDF) をサポートしていません。正しい設定方法については、「UDF の例:Python UDF でサードパーティパッケージを使用する」をご参照ください。
PyODPS のバージョンをアップグレードするには、「カスタムイメージ」のコマンドを実行します。
0.12.1をアップグレードしたいバージョンに置き換えることができます。サーバーレスリソースグループでは、サードパーティパッケージのインストールの指示に従ってコマンドを実行します。専用スケジューリングリソースグループでは、「O&M Assistant」の指示に従います。PyODPS タスクが VPC ネットワークや IDC 内のデータソースやサービスなど、特定のネットワーク環境にアクセスする必要がある場合は、サーバーレスリソースグループを使用します。サーバーレスリソースグループとターゲット環境間のネットワーク接続を設定してください。詳細については、「ネットワーク接続ソリューション」をご参照ください。
PyODPS の構文に関する詳細については、「PyODPS ドキュメント」をご参照ください。
PyODPS ノードには、それぞれ Python 2 と Python 3 を使用する PyODPS 2 と PyODPS 3 の 2 種類があります。使用する Python のバージョンに合ったノードタイプを作成してください。
PyODPS ノードで実行された SQL 文がデータマップで正しいデータリネージを生成しない場合、タスクコード内で DataWorks スケジューリングの実行時パラメーターを手動で設定できます。データリネージの表示については、「データリネージの表示」をご参照ください。パラメーターの設定については、「実行時パラメーター (ヒント) の設定」をご参照ください。次のサンプルコードを使用して、必要な実行時パラメーターを取得できます。
import os ... # DataWorks スケジューラの実行時パラメーターを取得 skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # タスク送信時にヒントを設定 o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...PyODPS ノードの最大ログ出力は 4 MB です。大量のデータ結果を直接ログに出力することは避けてください。代わりに、アラートログや進捗更新など、価値のある情報の出力に集中してください。
制限事項
専用スケジューリングリソースグループを使用して PyODPS ノードを実行する場合、ノード内でローカルに処理されるデータ量は 50 MB を超えないようにしてください。この制限は、専用スケジューリングリソースグループの仕様によって決まります。過剰なローカルデータを処理すると、オペレーティングシステムのしきい値を超え、`Got Killed` メッセージで示される Out of Memory (OOM) エラーが発生する可能性があります。PyODPS ノードにデータ集約的な処理ロジックを記述することは避けてください。詳細については、「PyODPS を効率的に使用するためのベストプラクティス」をご参照ください。
サーバーレスリソースグループを使用して PyODPS ノードを実行する場合、処理する必要のあるデータ量に基づいてノードの CU を設定できます。
説明1 つのタスクには最大
64CU を設定できますが、16CU を超える使用は推奨されません。リソース不足を引き起こし、タスクの起動に影響を与える可能性があります。Got Killed エラーは、Out of Memory (OOM) 状態によりプロセスが終了したことを示します。したがって、ローカルでのデータ操作は最小限に抑えるようにしてください。PyODPS を介して開始される SQL および DataFrame タスクは、`to_pandas` を除き、この制限の対象外です。
ユーザー定義関数 (UDF) の一部ではないコードでは、プリインストール済みの Numpy および Pandas パッケージを使用できます。バイナリコードを含む他のサードパーティパッケージはサポートされていません。
互換性の理由から、DataWorks では
options.tunnel.use_instance_tunnelはデフォルトでFalseに設定されています。インスタンストンネルをグローバルに有効にするには、この値を手動でTrueに設定する必要があります。Python 3 のマイナーバージョン間、例えば Python 3.8 と Python 3.7 では、バイトコードの定義が異なります。
MaxCompute は現在 Python 3.7 を使用しています。Python 3.8 の `finally` ブロックなど、他の Python 3 バージョンの構文を使用すると、実行エラーが発生します。Python 3.7 の使用を推奨します。
PyODPS 3 はサーバーレスリソースグループで実行できます。このリソースを購入して使用するには、「サーバーレスリソースグループの使用」をご参照ください。
1 つの PyODPS ノードは、複数の Python タスクの同時実行をサポートしていません。
PyODPS ノードからログを出力するには、
print関数を使用します。現在、logger.info関数はサポートされていません。
事前準備
MaxCompute 計算リソースを DataWorks ワークスペースにバインドします。
操作手順
PyODPS 3 ノードエディターで、次の開発ステップを実行します。
PyODPS 3 コード例
PyODPS ノードを作成した後、コードを編集して実行できます。PyODPS の構文に関する詳細については、「概要」をご参照ください。このセクションでは、5 つのコード例を紹介します。ビジネスニーズに最も適した例を選択してください。
ODPS エントリポイント
DataWorks は、ODPS エントリポイントとしてグローバル変数
odps(またはo) を提供するため、手動で定義する必要はありません。print(odps.exist_table('PyODPS_iris'))SQL 文の実行
PyODPS ノードで SQL 文を実行できます。詳細については、「SQL」をご参照ください。
DataWorks では、
インスタンストンネルはデフォルトで無効になっています。これは、instance.open_readerが最大 10,000 レコードを返す Result インターフェイスを使用することを意味します。reader.countを使用してレコード数を取得できます。すべてのデータを反復処理するには、limitを無効にする必要があります。次の文を使用して、インスタンストンネルをグローバルに有効にし、limitを無効にすることができます。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # すべてのデータを読み取るために制限を無効にします。 with instance.open_reader() as reader: # Instance Tunnel を介してすべてのデータを読み取ることができます。または、
open_readerの呼び出しにtunnel=Trueを追加して、その特定の操作に対してのみインスタンストンネルを有効にすることもできます。また、limit=Falseを追加して、その操作のlimitを無効にすることもできます。# この open_reader 操作で Instance Tunnel インターフェイスを使用してすべてのデータを読み取ります。 with instance.open_reader(tunnel=True, limit=False) as reader:
実行時パラメーターの設定
dict型のhintsパラメーターを使用して、実行時パラメーターを設定できます。ヒントの詳細については、「SET 操作」をご参照ください。o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})sql.settingsをグローバルに設定すると、指定された実行時パラメーターがすべての実行に追加されます。from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # グローバル設定に基づいてヒントを追加します。
実行結果の読み取り
SQL 文を実行しているインスタンスは、次の 2 つのシナリオで直接
open_reader操作を実行できます:SQL 文は、構造化データを返します。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 各レコードを処理します。SQL 文が `desc` のようなコマンドである場合。
reader.raw属性を使用して、生の SQL 実行結果を取得できます。with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)説明エディターから直接 PyODPS 3 ノードを実行する場合、ノードはプレースホルダーを自動的に置き換えないため、カスタムスケジューリングパラメーターの値をハードコーディングする必要があります。
DataFrame
DataFrame (非推奨) を使用してデータを処理することもできます。
実行
DataWorks では、DataFrame を実行するために、即時実行メソッドを明示的に呼び出す必要があります。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 即時実行メソッドを呼び出して各 Record を処理します。`print` 文で即時実行をトリガーする必要がある場合は、
options.interactiveを有効にする必要があります。from odps import options from odps.df import DataFrame options.interactive = True # 最初にオプションを有効にします。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # print() によって即時実行がトリガーされます。詳細情報の出力
options.verboseオプションを設定して、詳細情報を出力できます。DataWorks では、このオプションはデフォルトで有効になっており、実行中に Logview URL などの詳細が出力されます。
PyODPS 3 コードの開発
次の例は、PyODPS ノードの使用方法を示しています:
データセットを準備します。pyodps_iris サンプルテーブルを作成します。手順については、「DataFrame を使用したデータ処理」をご参照ください。
DataFrame を作成します。詳細については、「MaxCompute テーブルから DataFrame オブジェクトを作成する」をご参照ください。
次のコードを PyODPS ノードに入力して実行します。
from odps.df import DataFrame # ODPS テーブルから DataFrame を作成します。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
PyODPS タスクの実行
Run Configuration セクションで、[計算リソース]、[計算クォータ]、および [DataWorks リソースグループ] を設定します。
説明パブリックネットワークまたは VPC ネットワーク内のデータソースにアクセスするには、データソースとの接続テストに合格した専用スケジューリングリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。
タスクの要件に基づいて [イメージ] を設定できます。
ツールバーのパラメーターダイアログボックスで、作成した MaxCompute データソースを選択し、[実行] をクリックします。
ノードタスクをスケジュールで実行するには、ビジネス要件に基づいてスケジューリングプロパティを設定します。詳細については、「ノードのスケジューリング設定」をご参照ください。
DataWorks の SQL ノードとは異なり、PyODPS ノードは意図しない文字列置換を避けるため、コード内の `${param_name}` のような文字列を置き換えません。代わりに、コードが実行される前に、
argsという名前の `dict` がグローバル変数に追加されます。この `dict` からスケジューリングパラメーターを取得できます。たとえば、[パラメーター] タブでds=${yyyymmdd}を設定した場合、コード内で次のようにこのパラメーターを取得できます。print('ds=' + args['ds']) ds=20240930説明dsという名前のパーティションを取得するには、次のメソッドを使用できます。o.get_table('table_name').get_partition('ds=' + args['ds'])ノードタスクを設定した後、デプロイする必要があります。詳細については、「ノードとワークフローのデプロイ」をご参照ください。
タスクがデプロイされた後、オペレーションセンターで定期タスクのステータスを表示できます。詳細については、「オペレーションセンター入門」をご参照ください。
関連付けられたロールでのノードの実行
特定の RAM ロールを関連付けてノードタスクを実行できます。これにより、詳細な権限コントロールとセキュリティ強化が可能になります。
次のステップ
PyODPS よくある質問:一般的な PyODPS の実行に関する問題について学び、例外のトラブルシューティングと解決を迅速に行うのに役立ちます。