このトピックでは、PyODPS ノードでサードパーティ製パッケージを参照する方法について説明します。 PyODPS 用のサードパーティ製パッケージを生成する方法の詳細については、「PyODPS 用のサードパーティ製パッケージを生成する」をご参照ください。
前提条件
サードパーティ製パッケージをアップロードする
サードパーティ製パッケージを参照する前に、パッケージがアーカイブ リソースとして MaxCompute にアップロードされていることを確認する必要があります。以下のいずれかの方法でサードパーティ製パッケージをアップロードできます。
コードを使用してサードパーティ製パッケージをアップロードします。次のサンプル コードでは、
packages.tar.gzをアップロードするパッケージのパスと名前に置き換えます。import os from odps import ODPS # 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET を、Alibaba Cloud アカウントの AccessKey ID と AccessKey シークレットに設定します。 # AccessKey ID または AccessKey シークレットを直接使用しないことをお勧めします。 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) o.create_resource("test_packed.tar.gz", "archive", fileobj=open("packages.tar.gz", "rb"))DataWorks を使用してサードパーティ製パッケージをアップロードします。詳細については、「手順 1: リソースを作成するか、既存のリソースをアップロードする」をご参照ください。
Python UDF でサードパーティ製パッケージを参照する
Python ユーザー定義関数 (UDF) でサードパーティ製パッケージを参照する前に、Python UDF を変更する必要があります。手順:
UDF クラスの
__init__メソッドにサードパーティ製パッケージへの参照を追加します。UDF コード (evaluate 関数や process メソッドなど) を使用して、サードパーティ製パッケージを参照します。
例
この例では、SciPy の psi という名前の Python UDF を使用して、サードパーティ製パッケージが参照されます。
次のコマンドを実行して、SciPy をパッケージ化します。
pyodps-pack -o scipy-bundle.tar.gz scipy次のコードを記述し、
test_psi_udf.pyという名前のファイルとして保存します。import sys from odps.udf import annotate @annotate("double->double") class MyPsi(object): def __init__(self): # 参照パスにパスを追加します。 sys.path.insert(0, "work/scipy-bundle.tar.gz/packages") def evaluate(self, arg0): # IMPORT 文を evaluate 関数内に配置します。 from scipy.special import psi return float(psi(arg0))コードの説明:
__init__関数は、work/scipy-bundle.tar.gz/packagesをsys.pathに追加します。これは、MaxCompute が UDF によって参照されるすべてのアーカイブ リソースをworkディレクトリ内のフォルダに解凍するためです。フォルダ名はリソース名と同じです。packagesディレクトリは、pyodps-packを使用して生成されるパッケージのサブディレクトリです。SciPy の IMPORT 文は、evaluate 関数本体内に配置されます。これは、サードパーティ製パッケージが実行時にのみ使用可能であるためです。UDF が MaxCompute サーバーで解析されるとき、解析環境にはサードパーティ製パッケージが含まれていません。IMPORT 文を関数本体の外に配置してサードパーティ製パッケージをインポートすると、エラーが報告されます。test_psi_udf.pyを MaxCompute Python リソースとしてアップロードし、scipy-bundle.tar.gzをアーカイブ リソースとしてアップロードします。test_psi_udfという名前の UDF を作成し、アップロードされた 2 つのリソース ファイルを参照し、クラス名をtest_psi_udf.MyPsiとして指定します。手順 3 と手順 4 は、PyODPS ノードまたは MaxCompute クライアントで実行できます。
PyODPS ノードの場合:
import os from odps import ODPS # 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET を、Alibaba Cloud アカウントの AccessKey ID と AccessKey シークレットに設定します。 # AccessKey ID または AccessKey シークレットを直接使用しないことをお勧めします。 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) bundle_res = o.create_resource( "scipy-bundle.tar.gz", "archive", fileobj=open("scipy-bundle.tar.gz", "rb") ) udf_res = o.create_resource( "test_psi_udf.py", "py", fileobj=open("test_psi_udf.py", "rb") ) o.create_function( "test_psi_udf", class_type="test_psi_udf.MyPsi", resources=[bundle_res, udf_res] )MaxCompute クライアントの場合:
add archive scipy-bundle.tar.gz; add py test_psi_udf.py; create function test_psi_udf as test_psi_udf.MyPsi using test_psi_udf.py,scipy-bundle.tar.gz;
上記の操作が完了したら、UDF を使用して SQL 文を実行します。
set odps.pypy.enabled=false; set odps.isolation.session.enable=true; select test_psi_udf(sepal_length) from iris;
PyODPS DataFrame でサードパーティ製パッケージを参照する
execute メソッドまたは persist メソッドで libraries パラメーターを指定することで、PyODPS DataFrame でサードパーティ製ライブラリを参照できます。 このセクションでは、map メソッドを使用する場合に、PyODPS DataFrame でサードパーティ製パッケージを参照する方法について説明します。 apply メソッドまたは map_reduce メソッドを使用する場合も手順は同様です。
次のコマンドを実行して、SciPy をパッケージ化します。
pyodps-pack -o scipy-bundle.tar.gz scipytest_float_colという名前のテーブルが使用可能です。このテーブルには、FLOAT 型の列が 1 つだけ含まれています。col1 0 3.75 1 2.51psi(col1)の値を計算するには、次のコードを実行します。import os from odps import ODPS, options def my_psi(v): from scipy.special import psi return float(psi(v)) # プロジェクトで隔離機能が有効になっている場合は、次のオプションを構成する必要はありません。 options.sql.settings = {"odps.isolation.session.enable": True} # 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET を、Alibaba Cloud アカウントの AccessKey ID と AccessKey シークレットに設定します。 # AccessKey ID または AccessKey シークレットを直接使用しないことをお勧めします。 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) df = o.get_table("test_float_col").to_df() # 次のコードを実行して、結果を取得します。 df.col1.map(my_psi).execute(libraries=["scipy-bundle.tar.gz"]) # 結果を別のテーブルに保存します。 df.col1.map(my_psi).persist("result_table", libraries=["scipy-bundle.tar.gz"])オプション。実行時に同じサードパーティ製パッケージを使用する場合は、グローバル パラメーターを構成できます。
from odps import options options.df.libraries = ["scipy-bundle.tar.gz"]
上記の操作を実行した後、PyODPS DataFrame でサードパーティ製パッケージを参照します。
DataWorks でサードパーティ製パッケージを参照する
DataWorks PyODPS ノードは、組み込みのサードパーティ製パッケージを提供し、他のパッケージを参照するための load_resource_package メソッドも提供します。詳細については、「サードパーティ製パッケージを使用する」をご参照ください。
サードパーティ製パッケージを手動でアップロードして参照する
既存のプロジェクトの中には、すべての WHL 依存関係を手動でアップロードしてコードで参照したり、バイナリ パッケージをサポートしていない初期バージョンの MaxCompute を使用したりするものがあります。このセクションでは、これらの既存のプロジェクトでサードパーティ製パッケージを手動でアップロードして参照する方法について説明します。この例では、map メソッドで python_dateutil が使用されています。
このセクションの手順に従って、既存のプロジェクトまたは環境を維持できます。新しく作成されたプロジェクトの場合は、pyodps-pack を使用することをお勧めします。
Linux Bash で
pip downloadコマンドを実行して、サードパーティ製パッケージとその依存関係をディレクトリにダウンロードします。2 つのパッケージがダウンロードされます:six-1.10.0-py2.py3-none-any.whlとpython_dateutil-2.5.3-py2.py3-none-any.whl。pip download python-dateutil -d /to/path/説明ダウンロードしたパッケージは Linux オペレーティングシステムをサポートしている必要があります。このコマンドは Linux オペレーティングシステムで実行することをお勧めします。
ダウンロードしたパッケージを MaxCompute にアップロードします。
方法 1: コードを使用する。
# ファイル名拡張子が有効であることを確認する必要があります。 odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb')) odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))方法 2: DataWorks を使用する。
手順 1: リソースを作成するか、既存のリソースをアップロードする の手順に従って、宛先リソースをアップロードして送信できます。
サードパーティ製パッケージを参照します。
この例では、DataFrame オブジェクトには STRING 型のフィールドが 1 つだけ含まれています。フィールドには次の内容が含まれています。
datestr 0 2016-08-26 14:03:29 1 2015-08-26 14:03:29グローバル構成で次のサードパーティ製ライブラリを使用します。
from odps import options def get_year(t): from dateutil.parser import parse return parse(t).strftime('%Y') options.df.libraries = ['six.whl', 'python_dateutil.whl'] df.datestr.map(get_year).execute()datestr 0 2016 1 2015即時呼び出しメソッドの
librariesパラメーターを使用して、ライブラリを指定します。def get_year(t): from dateutil.parser import parse return parse(t).strftime('%Y') df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])datestr 0 2016 1 2015
デフォルトでは、PyODPS は Python コードのみを含み、ファイル操作を伴わない Python ライブラリをサポートしています。後のバージョンの MaxCompute では、PyODPS はバイナリ コードを含んでいるか、ファイル操作を伴う Python ライブラリもサポートしています。ライブラリ名には接尾辞が必要です。次の表に、さまざまな Python ライブラリでサポートされている接尾辞を示します。
プラットフォーム | Python バージョン | サポートされている接尾辞 |
RHEL 5 x86_64 | Python 2.7 | cp27-cp27m-manylinux1_x86_64 |
RHEL 5 x86_64 | Python 3.7 | cp37-cp37m-manylinux1_x86_64 |
RHEL 7 x86_64 | Python 2.7 | cp27-cp27m-manylinux1_x86_64, cp27-cp27m-manylinux2010_x86_64, cp27-cp27m-manylinux2014_x86_64 |
RHEL 7 x86_64 | Python 3.7 | cp37-cp37m-manylinux1_x86_64, cp37-cp37m-manylinux2010_x86_64, cp37-cp37m-manylinux2014_x86_64 |
RHEL 7 Arm64 | Python 3.7 | cp37-cp37m-manylinux2014_aarch64 |
すべての WHL パッケージは、アーカイブ リソースとして MaxCompute にアップロードする必要があります。パッケージをアップロードする前に、ファイル名拡張子を変更してパッケージを ZIP ファイルに変換する必要があります。また、ジョブまたはプロジェクトの odps.isolation.session.enable パラメーターを True に設定する必要があります。次の例は、SciPy で特殊関数をアップロードして使用する方法を示しています。
# バイナリ コードを含むパッケージは、アーカイブ リソースとしてアップロードする必要があります。パッケージをアップロードする前に、WHL パッケージを ZIP ファイルに変換する必要があります。
odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
# プロジェクトで隔離機能が有効になっている場合は、次のオプションを構成する必要はありません。
options.sql.settings = { 'odps.isolation.session.enable': True }
def my_psi(value):
# サードパーティ製ライブラリをインポートするには、IMPORT 文を関数内に配置することをお勧めします。これにより、異なるオペレーティングシステムでのバイナリ パッケージの構造の違いによって発生するランタイム エラーを防ぐことができます。
from scipy.special import psi
return float(psi(value))
df.float_col.map(my_psi).execute(libraries=['scipy.zip'])ソース コードのみを含むバイナリ パッケージを、Linux で次のシェル コマンドを実行して WHL ファイルにパッケージ化し、アップロードできます。macOS または Windows で生成された WHL ファイルは、MaxCompute では使用できません。
python setup.py bdist_wheel