PyODPS は MaxCompute の Python SDK です。DataWorks では、PyODPS ノードを作成して、MaxCompute に対して Python コードを記述および実行できます。MaxCompute エントリポイントは事前に構成されているため、認証設定は不要です。
このトピックでは、PyODPS ノードの DataWorks 固有の動作 (環境の制限、主要な機能、コード例) について説明します。
前提条件
開始する前に、以下があることを確認してください。
MaxCompute が有効になっている DataWorks ワークスペース
DataStudio で作成された PyODPS ノード (PyODPS 2 (Python 2) または PyODPS 3 (Python 3))
ノードの作成手順については、「PyODPS 2 タスクの開発」および「PyODPS 3 タスクの開発」をご参照ください。

制限事項
メモリ
メモリ使用量がノードの制限を超えると、ノードは Got killed と報告して終了します。これを回避するには、データを DataWorks 環境にダウンロードしてローカルで処理するのではなく、データ処理タスクを MaxCompute にプッシュして分散実行します。2 つのアプローチの比較については、「概要」の「注意事項」セクションをご参照ください。
データレコード
デフォルトでは、DataWorks の PyODPS ノードで options.tunnel.use_instance_tunnel は False に設定されています。これは、instance.open_reader が Result インターフェイスを使用することを意味します。このインターフェイスは最大 10,000 件のデータレコードを返し、複雑なデータ型のサポートには制限があります。すべてのレコードを読み取るか、Array などの複雑な型を処理するには、InstanceTunnel を有効にしてください。詳細については、「SQL 実行結果の読み取り」をご参照ください。
パッケージ
ほとんどの一般的なデータサイエンスパッケージはプリインストール済みです (「プリインストール済みのパッケージ」をご参照ください)。以下の制限が適用されます。
atexit はサポートされていません。代わりに
try-finallyを使用してください。matplotlib は利用できないため、DataFrame の
plot関数に影響します。DataFrame UDF は Python サンドボックスで実行され、純粋な Python ライブラリと NumPy のみを使用できます。pandas などの他のサードパーティライブラリは UDF 内ではサポートされていません。UDF 以外の操作では、NumPy と pandas を利用できます。
バイナリコードを含むサードパーティパッケージはサポートされていません。
MaxCompute エントリポイント
各 PyODPS ノードは、グローバル変数として odps と o を公開します。どちらも MaxCompute エントリポイントを参照します。DataWorks はこれらを自動的に構成するため、コード内で認証設定を行う必要はありません。
# pyodps_iris テーブルが存在するかどうかを確認します。
print(o.exist_table('pyodps_iris'))エントリオブジェクトoは MaxCompute にのみアクセスできます。他の Alibaba Cloud サービスにはアクセスできず、o.from_globalなどのメソッドを使用して追加の認証情報を取得することもできません。
SQL ステートメントの実行
execute_sql() または run_sql() を使用して、DDL (データ定義言語) および DML (データ操作言語) ステートメントを実行します。
o.execute_sql('select * from pyodps_iris')DDL/DML 以外のステートメントについては、適切なメソッドを使用してください。
run_security_query:GRANT および REVOKE ステートメント用run_xflowまたはexecute_xflow:API オペレーション用
完全な SQL ドキュメントについては、「SQL」をご参照ください。
SQL 実行結果の読み取り
DataWorks ではデフォルトで InstanceTunnel が有効になっていないため、instance.open_reader は 10,000 レコードに制限されており、複雑なデータ型をサポートしていない場合があります。ご利用のプロジェクトでデータ保護が有効になっておらず、すべてのレコードを読み取るか、Array などの複雑な型を使用する必要がある場合は、InstanceTunnel を有効にしてください。
オプション 1:グローバルに有効化
ノード内の後続のすべての open_reader 呼び出しに適用されます。
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False # 10,000 レコードの制限を解除します。
with instance.open_reader() as reader:
# InstanceTunnel がアクティブです。reader.count を使用してレコードの総数を取得します。
passオプション 2:リーダーごとに有効化
現在の open_reader 呼び出しにのみ適用されます。
with instance.open_reader(tunnel=True, limit=False) as reader:
# このリーダーに対して InstanceTunnel がアクティブです。すべてのレコードにアクセスできます。
pass詳細については、「SQL ステートメントの実行結果の取得」をご参照ください。
DataFrame
DataWorks で DataFrame 操作を実行するには、execute や persist などの即時実行メソッドを明示的に呼び出します。これがないと、操作はトリガーされません。
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
# execute() を使用して操作をトリガーし、結果を反復処理します。
for record in iris[iris.sepalwidth < 3].execute():
print(record)デフォルトでは、DataWorks で options.verbose が有効になっているため、Logview URL などの実行詳細が自動的に出力されます。
詳細については、「DataFrame (非推奨)」をご参照ください。
スケジューリングパラメーター
DataWorks は、SQL ノードとは異なる方法で PyODPS ノードにスケジューリングパラメーターを挿入します。
SQL ノード:
${param_name}は SQL 文字列に直接置換されます。PyODPS ノード:コードが実行される前に、グローバル辞書
argsが設定されます。パラメーター値は${param_name}ではなく、args['param_name']を使用して読み取ります。
この設計により、Python コードでの意図しない文字列置換を回避できます。
例:PyODPS ノードの スケジューリング設定 タブで、基本プロパティ の下の パラメーター フィールドに ds=${yyyymmdd} を設定します。次に、コードで値を読み取ります。
# ds スケジューリングパラメーターの値を表示します。例:ds=20161116
print('ds=' + args['ds'])ds が指すパーティションからデータをクエリするには、次のようにします。
o.get_table('table_name').get_partition('ds=' + args['ds'])詳細については、「スケジューリングパラメーターの設定と使用」をご参照ください。
ランタイムヒント
hints パラメーターを使用して、ランタイム設定を execute_sql に渡します。値は dict である必要があります。
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})ノード内のすべての SQL 実行に同じ設定を適用するには、options.sql.settings をグローバルに設定します。
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # ヒントは自動的に適用されます。サードパーティパッケージ
プリインストール済みのパッケージ
DataWorks ノードには、次のパッケージがプリインストールされています。
| パッケージ | Python 2 ノード | Python 3 ノード |
|---|---|---|
| requests | 2.11.1 | 2.26.0 |
| numpy | 1.16.6 | 1.18.1 |
| pandas | 0.24.2 | 1.0.5 |
| scipy | 0.19.0 | 1.3.0 |
| scikit_learn | 0.18.1 | 0.22.1 |
| pyarrow | 0.16.0 | 2.0.0 |
| lz4 | 2.1.4 | 3.1.10 |
| zstandard | 0.14.1 | 0.17.0 |
カスタムパッケージのインストール
必要なパッケージがプリインストールされていない場合は、pyodps-pack を使用してバンドルし、load_resource_package を使用してノードにロードします。
パッケージをバンドルします。次の例では、
ipaddressパッケージをバンドルします。pyodps-pack -o ipaddress-bundle.tar.gz ipaddressPython 2 ノードの場合は、
--dwpy27を追加します。pyodps-pack --dwpy27 -o ipaddress-bundle.tar.gz ipaddressバンドルサイズを小さくするには、DataWorks にすでにプリインストールされているパッケージを除外します。
pyodps-pack -o bundle.tar.gz --exclude numpy --exclude pandas <your-package>ダウンロードされたパッケージの合計サイズは 100 MB を超えることはできません。
.tar.gzファイルを MaxCompute リソースとしてアップロードして送信します。PyODPS ノードで、パッケージをロードしてインポートします。
load_resource_package("ipaddress-bundle.tar.gz") import ipaddress
詳細については、「PyODPS 用のサードパーティパッケージの生成」および「PyODPS ノードでのサードパーティパッケージの参照」をご参照ください。
別のアカウントでの MaxCompute へのアクセス
デフォルトでは、エントリオブジェクト o は、現在のワークスペースに対して DataWorks によって提供される認証情報を使用します。別の Alibaba Cloud アカウントを使用して MaxCompute プロジェクトにアクセスするには、as_account メソッドを使用して別のエントリオブジェクトを作成します。
as_account には PyODPS 0.11.3 以降が必要です。
手順
新しいアカウントにプロジェクトに対する必要な権限を付与します。「付録:別のアカウントへの権限付与」をご参照ください。
PyODPS ノードで、新しいアカウント用のエントリオブジェクトを作成します。
import os # 認証情報をハードコーディングするのではなく、環境変数に保存します。 new_odps = o.as_account( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET') )現在のユーザーを確認して、アカウントの切り替えが成功したことを確認します。
print(new_odps.get_project().current_user)出力が新しいアカウントの AccessKey ID と一致する場合、切り替えは成功です。
例
この例では、テーブルを作成し、別のアカウントを使用してクエリを実行し、結果を出力します。
pyodps_irisテーブルを作成し、サンプルデータをインポートします。手順については、「テーブルの作成とデータのアップロード」をご参照ください。CREATE TABLE IF NOT EXISTS pyodps_iris ( sepallength DOUBLE COMMENT 'sepal length (cm)', sepalwidth DOUBLE COMMENT 'sepal width (cm)', petallength DOUBLE COMMENT 'petal length (cm)', petalwidth DOUBLE COMMENT 'petal width (cm)', name STRING COMMENT 'type' );新しいアカウントにプロジェクトとテーブルに対する権限を付与します。「付録:別のアカウントへの権限付与」をご参照ください。
PyODPS 3 ノードを作成し、次のコードを実行します。手順については、「PyODPS 3 タスクの開発」をご参照ください。
from odps import ODPS import os # 認証情報をハードコーディングするのではなく、環境変数に保存します。 os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'] = '<your-access-key-id>' os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'] = '<your-access-key-secret>' od = o.as_account( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET') ) # sepallength > 5 の行をクエリします。 with od.execute_sql('SELECT * FROM pyodps_iris WHERE sepallength > 5').open_reader() as reader: print(reader.raw) for record in reader: print(record["sepallength"], record["sepalwidth"], record["petallength"], record["petalwidth"], record["name"]) # 現在のユーザーを確認します。 print(od.get_project().current_user)ノードを実行します。出力は次のようになります。
Executing user script with PyODPS 0.11.4.post0 "sepallength","sepalwidth","petallength","petalwidth","name" 5.4,3.9,1.7,0.4,"Iris-setosa" ... <User 139xxxxxxxxxxxxx>
診断
ノードの実行がハングして出力がない場合は、コードの先頭に次のコメントを追加してください。DataWorks は 30 秒ごとにすべてのスレッドのスタックトレースを出力します。
# -*- dump_traceback: true -*-この機能には、バージョン 0.11.4.1 以降を実行している PyODPS 3 ノードが必要です。
PyODPS バージョンの確認
PyODPS ノードで次のコードを実行して、インストールされているバージョンを出力します。
import odps
print(odps.__version__)
# 出力例:0.11.2.3バージョンはノードのランタイムログにも表示されます。

付録:別のアカウントへの権限付与
別の Alibaba Cloud アカウントが現在のワークスペースのプロジェクトとテーブルにアクセスできるようにするには、ODPS SQL ノードを作成し、次のコマンドを実行します。ノードの作成手順については、「ODPS SQL ノードの作成」をご参照ください。権限の詳細については、「ユーザーと権限」をご参照ください。
-- プロジェクトにアカウントを追加します。
ADD USER ALIYUN$<account_name>;
-- プロジェクトに CreateInstance 権限を付与します。
GRANT CreateInstance ON PROJECT <project_name> TO USER ALIYUN$<account_name>;
-- テーブルに Describe および Select 権限を付与します。
GRANT Describe, Select ON TABLE <table_name> TO USER ALIYUN$<account_name>;
-- 権限を確認します。
SHOW GRANTS FOR ALIYUN$<account_name>;
付録:サンプルデータ
このトピックの例では、pyodps_iris テーブルを使用します。テーブルを作成して Iris データセットをインポートするには、「PyODPS ノードを使用して特定の基準に基づいてデータをクエリする」のステップ 1 に従ってください。
次のステップ
基本操作の概要 — 完全な PyODPS API リファレンス
DataFrame の概要 — DataFrame 操作の詳細
PyODPS ノードを使用して Jieba に基づいて中国語テキストをセグメント化する — エンドツーエンドの例
操作履歴の表示 — ランタイムログの表示と実行中のタスクの停止