MaxComputeは、ユーザー定義関数 (UDF) と、PyODPSやMaxFrameなどのPython開発機能を提供します。 このトピックでは、MaxCompute UDF、PyODPS、およびMaxFrame開発ジョブ内でイメージを使用する方法について説明します。
SQL UDF開発でのイメージの使用
次の例では、列を合計するUDFでPandasを使用します。
Python UDFスクリプトを記述し、
sum_pandas.py
として保存します。 次のサンプルスクリプトを以下に示します。from odps.udf import annotate import pandas as pd @annotate("string, string -> string") class SumColumns(object): def evaluate(self, arg1, arg2): # Convert input parameters to pandas DataFrame df = pd.DataFrame({'col1': arg1.split(','), 'col2': arg2.split(',')}) # Perform data processing operations by using pandas # Calculate the sum of two columns as an example df['sum'] = df['col1'].astype(int) + df['col2'].astype(int) # Convert the processing result to a string and return result = ','.join(df['sum'].astype(str).values) return result
sum_pandas.py
スクリプトをリソースとしてMaxComputeプロジェクトにアップロードします。 詳細については、「リソースの追加」をご参照ください。 次のコマンドを使用します。ADD PY sum_pandas.py -f;
sum_pandas.py
スクリプトをSumColumns UDFとして登録します。 詳細については、「UDFの作成」をご参照ください。 次のコマンドを使用します。CREATE FUNCTION SumColumns AS 'sum_pandas.SumColumns' USING 'sum_pandas.py';
対応するテストデータでテストテーブル
testsum
を設定します。CREATE TABLE testsum (col1 string, col2 string); INSERT INTO testsum VALUES ('1,2,3','1,2,3'),('1,2,3','3,2,1'),('1,2,3','4,5,6');
UDFを呼び出すときにFlagパラメーターを使用してイメージを指定します。
set odps.sql.python.version=cp37; set odps.session.image = ; SELECT SumColumns(col1,col2) AS result FROM testsum;
期待される結果:
+------------+ | result | +------------+ | 2,4,6 | | 4,4,4 | | 5,7,9 | +------------+
PyODPS開発でのイメージの使用
次の例では、scipyパッケージからpsi関数を実装します。
テストテーブル
test_float_col
を準備し、テストデータを挿入します。CREATE TABLE test_float_col (col1 double); INSERT INTO test_float_col VALUES (3.75),(2.51);
psi(col1) を計算するPyODPSコードを記述し、
psi_col.py
として保存します。 以下のサンプルコードを示します。import os from odps import ODPS, options def my_psi(v): from scipy.special import psi return float(psi(v)) # If the project enables isolation, the following option is not required options.sql.settings = {"odps.isolation.session.enable": True} o = ODPS( # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID, # and the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret. # It is not recommended to directly use the AccessKey ID and AccessKey secret strings. os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) df = o.get_table("test_float_col").to_df() # Execute directly and get the result df.col1.map(my_psi).execute(image='scipy') # Save to another table df.col1.map(my_psi).persist("result_table", image='scipy')
パラメータの説明:
ALIBABA_CLOUD_ACCESS_KEY_ID: この環境変数を、MaxComputeプロジェクトに必要なMaxCompute権限を持つAccessKey IDに設定します。 AccessKey IDは、AccessKey管理ページから取得できます。
ALIBABA_CLOUD_ACCESS_KEY_SECRET: この環境変数を、AccessKey IDに対応するAccessKeyシークレットに設定します。
your-default-project: MaxComputeプロジェクトの名前。 プロジェクト名を表示するには、MaxComputeコンソールにログインし、左側のナビゲーションウィンドウで [ワークスペース] > [プロジェクト] を選択します。
your-end-point: MaxComputeプロジェクトが存在するリージョンのエンドポイントです。 ネットワーク接続方法に基づいてエンドポイントを選択できます。 たとえば、
http://service.cn-chengdu.maxcompute.aliyun.com/api
。 詳細については、「エンドポイント」をご参照ください。
結果を
result_table
テーブルに表示します。SELECT * FROM result_table
期待される結果:
+----------------------+ | col1 | +----------------------+ | 1.1825373886117962 | | 0.7080484451910534 | +----------------------+
MaxFrame開発でのイメージの使用
次の例では、scipyパッケージからpsi関数を実装します。
テストテーブル
test_float_col
を作成し、テストデータを挿入します。CREATE TABLE test_float_col (col1 double); INSERT INTO test_float_col VALUES (3.75),(2.51);
psi(col1) を計算するためのMaxFrameコードを記述し、
psi_col.py
として保存します。 以下のサンプルコードを示します。import os from odps import ODPS, options from maxframe.session import new_session import maxframe.dataframe as md from maxframe.config import options from maxframe import config # Use the built-in scipy image config.options.sql.settings = { "odps.session.image": "scipy" } def my_psi(v): from scipy.special import psi return float(psi(v)) o = ODPS( # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID, # and the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret. # It is not recommended to directly use the AccessKey ID and AccessKey secret strings. os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) # Create a MaxFrame session session = new_session(o) df = md.read_odps_table('test_float_col') # Execute and get the result print(df.col1.map(my_psi).execute().fetch()
パラメータの説明:
ALIBABA_CLOUD_ACCESS_KEY_ID: この環境変数を、MaxComputeプロジェクトに必要なMaxCompute権限を持つAccessKey IDに設定します。 AccessKey IDは、AccessKey管理ページから取得できます。
ALIBABA_CLOUD_ACCESS_KEY_SECRET: この環境変数を、AccessKey IDに対応するAccessKeyシークレットに設定します。
your-default-project: MaxComputeプロジェクトの名前。 プロジェクト名を表示するには、MaxComputeコンソールにログインし、左側のナビゲーションウィンドウで [ワークスペース] > [プロジェクト] を選択します。
your-end-point: MaxComputeプロジェクトが存在するリージョンのエンドポイントです。 ネットワーク接続方法に基づいてエンドポイントを選択できます。 たとえば、
http://service.cn-chengdu.maxcompute.aliyun.com/api
。 詳細については、「エンドポイント」をご参照ください。
期待される結果:
0 1.182537 1 0.708048 Name: col1, dtype: float64