このトピックでは、ユーザー定義関数(UDF)とサードパーティ製 Python ライブラリの使用方法について説明します。
UDF の使用
DataFrame では、Sequence オブジェクトに対して map メソッドを使用して、そのすべての要素に対して UDF を呼び出すことができます。
>>> iris.sepallength.map(lambda x: x + 1).head(5)
sepallength
0 6.1
1 5.9
2 5.7
3 5.6
4 6.0UDF を使用する場合、List 型または Dict 型を入力または出力として使用することはできません。
map メソッドの実行後に Sequence の型が変更された場合は、Sequence の新しい型を明示的に指定する必要があります。
>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
sepallength
0 t5.1
1 t4.9
2 t4.7
3 t4.6
4 t5.0関数がクロージャを含む場合、関数外のクロージャ変数の値の変更によって、関数内の変数の値が変更されます。
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(lambda x: x + i))その結果、dfs 内の各 SequenceExpr オブジェクトは df.sepal_length + 9 になります。この問題を解決するには、関数を別の関数の戻り値として使用するか、partial を使用します。次の 2 つの例を参照してください。
>>> dfs = []
>>> def get_mapper(i):
>>> return lambda x: x + i
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(get_mapper(i)))>>> import functools
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))map メソッドは、既存の UDF もサポートしています。関数名を表す str 型のパラメーター、または Function オブジェクトを渡すことができます。詳細については、「関数」をご参照ください。
Python 関数に map メソッドを実装する場合は、MaxCompute Python UDF を使用する必要があります。プロジェクトが Python UDF をサポートしていない場合は、map メソッドを使用できません。さらに、すべての Python UDF の制限が適用されます。
使用可能なサードパーティ製ライブラリ(C 言語のコードを含む)は NumPy のみです。サードパーティ製 Python ライブラリの使用方法の詳細については、「サードパーティ製 Python ライブラリの使用」をご参照ください。
DataFrame は、UDF に加えて、多くのビルトイン関数を備えており、その一部は map 関数を使用して実装されています。したがって、プロジェクトが Python UDF をサポートしていない場合は、これらの関数を使用できません。Alibaba Cloud パブリックサービスは Python UDF をサポートしていないことに注意してください。
バイトコード定義の違いにより、Python 3 でサポートされている新機能(yield from など)を使用すると、Python 2.7 の MaxCompute Worker でコードを実行したときにエラーが発生する可能性があります。したがって、本番コードを作成する前に、Python 3 の MapReduce API を使用してコードが正常に実行されることを確認することをお勧めします。
カウンターの使用例:
from odps.udf import get_execution_context
def h(x):
ctx = get_execution_context()
counters = ctx.get_counters()
counters.get_counter('df', 'add_one').increment(1)
return x + 1
df.field.map(h)LogView の JSONSummary でカウンターの値を確認できます。
1 行に対して UDF を使用する
1 行に対して UDF を使用するには、apply メソッドを使用します。axis パラメーターを 1 に設定して、操作が行に対して実行されることを示す必要があります。apply メソッドの UDF は、先行する Collection オブジェクトからのデータ行であるパラメーターを受け取ります。属性またはオフセットを使用して、フィールド内のデータを取得できます。
reduce パラメーターが True に設定されている場合、Sequence オブジェクトが返されます。それ以外の場合、Collection オブジェクトが返されます。namesパラメーターとtypesパラメーターを使用して、返される Sequence オブジェクトまたは Collection オブジェクトのフィールド名と型を指定できます。デフォルトでは、型を指定しない場合、STRING 型が使用されます。>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3) sepaladd 0 8.6 1 7.9 2 7.9reduceがapplyメソッドの UDF で False の場合、yieldキーワードを使用して複数の行の結果を返すことができます。>>> iris.count() 150 >>> >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count() 300返されるフィールド名と型を関数にアノテーションすることもできます。関数を呼び出すときに指定する必要はありません。
>>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1).count() 300map-onlyのmap_reduceを使用することもできます。これは、applyメソッドでaxis=1を指定した場合と同じです。>>> iris.map_reduce(mapper=handle).count() 300MaxCompute で既存のユーザー定義テーブル関数(UDTF)を使用するには、UDTF の名前を指定します。
>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])行に対して
applyメソッドを使用し、reduceが False の場合、既存の行で lateral view を使用して、集計などの目的で使用できます。>>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris[iris.category, iris.apply(handle, axis=1)]
すべての列に対してカスタム集計を使用する
apply メソッドを使用する場合、axis が指定されていないか、axis の値が 0 の場合、カスタム集計クラスを渡してすべての Sequence オブジェクトを集計できます。
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667UDF を使用する場合、LIST 型または DICT 型を入力または出力として使用することはできません。
リソースの参照
UDF は、MaxCompute リソース(テーブルリソースやファイルリソースなど)を読み取ったり、Collection オブジェクトをリソースとして参照したりすることもできます。これを行うには、UDF をクロージャまたは呼び出し可能なクラスとして記述する必要があります。次の 2 つの例を参照してください。
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
sepallength
0 Iris-setosa
1 Iris-versicolor>>> def myfunc(resources): # resources are passed in by calling order
>>> names = set()
>>> fileobj = resources[0] # file resources are represented by a file-like object
>>> for l in fileobj:
>>> names.add(l)
>>> collection = resources[1]
>>> for r in collection:
>>> names.add(r.name) # retrieve the values by using the field name or offset
>>> def h(x):
>>> if x in names:
>>> return True
>>> else:
>>> return False
>>> return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica Falseパーティションテーブルを読み取るとき、パーティションフィールドは含まれません。
行操作で axis が 1 の場合、関数クロージャまたは呼び出し可能なクラスを記述する必要があります。列の集計操作の場合は、__init__ 関数を使用してリソースを読み取るだけで済みます。
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>> stop_words = set([r[0] for r in resources[0]])
>>> def h(row):
>>> return ' '.join(w for w in row[0].split() if w not in stop_words),
>>> return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use Pythonこの例では、stop_words はローカル変数であり、実行中に MaxCompute でリソースとして参照されます。
サードパーティ製 Python ライブラリの使用
サードパーティ製 Python パッケージ(whl、egg、zip、tar.gz 形式)を MaxCompute にアップロードできます。グローバルメソッドまたは即時呼び出しメソッドを使用する場合は、パッケージファイルを指定する必要があります。すべての依存ライブラリが指定されていることを確認する必要があります。指定されていない場合、ファイルをインポートするときにエラーが発生する可能性があります。
PyODPS リソースアップロードインターフェイス create_resource を呼び出すことによって、リソースをアップロードできます。
python-dateutil パッケージを例に説明します。
pip downloadコマンドを実行して、パッケージとその依存関係を特定のパスにダウンロードできます。2 つのパッケージ、six-1.10.0-py2.py3-none-any.whl と python_dateutil-2.5.3-py2.py3-none-any.whl がダウンロードされます。ダウンロードしたパッケージは Linux 環境をサポートしている必要があることに注意してください。$ pip download python-dateutil -d /to/path/次に、2 つのファイルを MaxCompute にリソースとしてアップロードします。
>>> # ファイル名拡張子が正しいことを確認してください。 >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb')) >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))これで、STRING フィールドのみを含む DataFrame オブジェクトができました。
>>> df datestr 0 2016-08-26 14:03:29 1 2015-08-26 14:03:29グローバル構成で次のサードパーティ製ライブラリを使用します。
>>> from odps import options >>> >>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> >>> options.df.libraries = ['six.whl', 'python_dateutil.whl'] >>> df.datestr.map(get_year) datestr 0 2016 1 2015または、メソッドの
librariesパラメーターを使用して、サードパーティ製ライブラリを指定します。>>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl']) datestr 0 2016 1 2015
デフォルトでは、PyODPS は純粋な Python コードを含み、ファイル操作を含まないサードパーティ製ライブラリをサポートしています。MaxCompute の今後のバージョンでは、PyODPS はバイナリコードまたはファイル操作を含む Python ライブラリもサポートします。これらのライブラリには、cp27-cp27m-manylinux1_x86_64 というサフィックスを付けて、アーカイブとしてアップロードする必要があります。.whl パッケージは .zip ファイルに名前を変更する必要があります。また、odps.isolation.session.enable を True に設定するか、プロジェクトで
isolationを有効にする必要があります。次の例は、scipyの特殊関数をアップロードして使用する方法を示しています。>>> # バイナリコードを含むパッケージは、アーカイブメソッドを使用してアップロードする必要があり、ファイル拡張子 .whl を .zip に置き換える必要があります。 >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb')) >>> >>> # プロジェクトで isolation が有効になっている場合、次のオプションは省略可能です。 >>> options.sql.settings = { 'odps.isolation.session.enable': True } >>> >>> def psi(value): >>> # 異なるオペレーティングシステム間でのバイナリパッケージの構造の違いによって発生するエラーを回避するために、関数内でサードパーティ製ライブラリをインポートすることをお勧めします。 >>> from scipy.special import psi >>> return float(psi(value)) >>> >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])ソースコードのみを含むバイナリパッケージは、Wheel ファイルにパッケージ化して Linux シェルでアップロードできます。Mac および Windows で生成された Wheel ファイルは、MaxCompute では使用できません。
python setup.py bdist_wheelMaxCompute コンソールを使用してリソースをアップロードすることもできます。
ほとんどの Python パッケージは、さまざまなプラットフォーム上のバイナリファイルを含むパッケージを含め、.whl パッケージで提供されます。したがって、最初に MaxCompute で実行できるパッケージを見つける必要があります。
さらに、すべての依存関係を含める必要があります。次の表に、パッケージの依存関係を示します。
パッケージ名
依存関係
pandas
numpy、python-dateutil、pytz、six
scipy
numpy
scikit-learn
numpy、scipy
説明numpy パッケージはすでに提供されています。 pandas、scipy、および scikit-learn パッケージを実行するには、python-dateutil、pytz、pandas、SciPy、sklearn、および six パッケージのみをアップロードする必要があります。
python-dateutil-2.6.0.zip は python-dateutil 内で見つかり、ダウンロードできます。

ダウンロードしたファイルの名前を python-dateutil.zip に変更し、MaxCompute コンソールでリソースとしてアップロードします。
add archive python-dateutil.zip; // アーカイブ python-dateutil.zip を追加します説明pytz-2017.2.zip ファイルと six-1.11.0.tar.gz ファイルを、python-dateutil ファイルと同じ方法で検索、ダウンロード、およびアップロードします。
Pandas など、C 言語で記述されたコードを含むパッケージが MaxCompute で正常に動作するようにするには、名前に cp27-cp27m-manylinux1_x86_64 が含まれている .whl パッケージを見つける必要があります。そのため、pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl を見つけてダウンロードし、拡張子を .zip に変更し、MaxCompute コンソールで
add archive pandas.zip;を実行してアップロードする必要があります。scipy パッケージと scikit-learn パッケージも、前述の手順と同じ方法で MaxCompute にアップロードします。
次の表に、すべてのパッケージについてダウンロードするリソースを示します。
パッケージ名
ファイル名
アップロードするリソースの名前
python-dateutil
python-dateutil.zip
pytz
pytz.zip
six
six.tar.gz
pandas
pandas.zip
scipy
scipy.zip
scikit-learn
sklearn.zip
サードパーティ製の Python ライブラリを指定する
使用するライブラリをグローバルに指定します。
>>> from odps import options >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']即時呼び出しメソッドで、使用するライブラリをローカルに指定します。
>>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])