すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:UDF とサードパーティ製 Python ライブラリの使用

最終更新日:Jun 10, 2025

このトピックでは、ユーザー定義関数(UDF)とサードパーティ製 Python ライブラリの使用方法について説明します。

UDF の使用

DataFrame では、Sequence オブジェクトに対して map メソッドを使用して、そのすべての要素に対して UDF を呼び出すことができます。

>>> iris.sepallength.map(lambda x: x + 1).head(5)
   sepallength
0          6.1
1          5.9
2          5.7
3          5.6
4          6.0
説明

UDF を使用する場合、List 型または Dict 型を入力または出力として使用することはできません。

map メソッドの実行後に Sequence の型が変更された場合は、Sequence の新しい型を明示的に指定する必要があります。

>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
   sepallength
0         t5.1
1         t4.9
2         t4.7
3         t4.6
4         t5.0

関数がクロージャを含む場合、関数外のクロージャ変数の値の変更によって、関数内の変数の値が変更されます。

>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))

その結果、dfs 内の各 SequenceExpr オブジェクトは df.sepal_length + 9 になります。この問題を解決するには、関数を別の関数の戻り値として使用するか、partial を使用します。次の 2 つの例を参照してください。

>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))

map メソッドは、既存の UDF もサポートしています。関数名を表す str 型のパラメーター、または Function オブジェクトを渡すことができます。詳細については、「関数」をご参照ください。

Python 関数に map メソッドを実装する場合は、MaxCompute Python UDF を使用する必要があります。プロジェクトが Python UDF をサポートしていない場合は、map メソッドを使用できません。さらに、すべての Python UDF の制限が適用されます。

使用可能なサードパーティ製ライブラリ(C 言語のコードを含む)は NumPy のみです。サードパーティ製 Python ライブラリの使用方法の詳細については、「サードパーティ製 Python ライブラリの使用」をご参照ください。

DataFrame は、UDF に加えて、多くのビルトイン関数を備えており、その一部は map 関数を使用して実装されています。したがって、プロジェクトが Python UDF をサポートしていない場合は、これらの関数を使用できません。Alibaba Cloud パブリックサービスは Python UDF をサポートしていないことに注意してください。

説明

バイトコード定義の違いにより、Python 3 でサポートされている新機能(yield from など)を使用すると、Python 2.7 の MaxCompute Worker でコードを実行したときにエラーが発生する可能性があります。したがって、本番コードを作成する前に、Python 3 の MapReduce API を使用してコードが正常に実行されることを確認することをお勧めします。

カウンターの使用例:

from odps.udf import get_execution_context
def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1
df.field.map(h)

LogView の JSONSummary でカウンターの値を確認できます。

1 行に対して UDF を使用する

1 行に対して UDF を使用するには、apply メソッドを使用します。axis パラメーターを 1 に設定して、操作が行に対して実行されることを示す必要があります。apply メソッドの UDF は、先行する Collection オブジェクトからのデータ行であるパラメーターを受け取ります。属性またはオフセットを使用して、フィールド内のデータを取得できます。

  • reduce パラメーターが True に設定されている場合、Sequence オブジェクトが返されます。それ以外の場合、Collection オブジェクトが返されます。names パラメーターと types パラメーターを使用して、返される Sequence オブジェクトまたは Collection オブジェクトのフィールド名と型を指定できます。デフォルトでは、型を指定しない場合、STRING 型が使用されます。

    >>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
       sepaladd
    0       8.6
    1       7.9
    2       7.9
  • reduceapply メソッドの UDF で False の場合、yield キーワードを使用して複数の行の結果を返すことができます。

    >>> iris.count()
    150
    >>>
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
    300
  • 返されるフィールド名と型を関数にアノテーションすることもできます。関数を呼び出すときに指定する必要はありません。

    >>> from odps.df import output
    >>>
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris.apply(handle, axis=1).count()
    300
  • map-onlymap_reduce を使用することもできます。これは、apply メソッドで axis=1 を指定した場合と同じです。

    >>> iris.map_reduce(mapper=handle).count()
    300
  • MaxCompute で既存のユーザー定義テーブル関数(UDTF)を使用するには、UDTF の名前を指定します。

    >>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
  • 行に対して apply メソッドを使用し、reduce が False の場合、既存の行で lateral view を使用して、集計などの目的で使用できます。

    >>> from odps.df import output
    >>>
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris[iris.category, iris.apply(handle, axis=1)]

すべての列に対してカスタム集計を使用する

apply メソッドを使用する場合、axis が指定されていないか、axis の値が 0 の場合、カスタム集計クラスを渡してすべての Sequence オブジェクトを集計できます。

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
   sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
0                 5.843333                   3.054                 3.758667                1.198667
説明

UDF を使用する場合、LIST 型または DICT 型を入力または出力として使用することはできません。

リソースの参照

UDF は、MaxCompute リソース(テーブルリソースやファイルリソースなど)を読み取ったり、Collection オブジェクトをリソースとして参照したりすることもできます。これを行うには、UDF をクロージャまたは呼び出し可能なクラスとして記述する必要があります。次の 2 つの例を参照してください。

>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
       sepallength
0      Iris-setosa
1  Iris-versicolor
>>> def myfunc(resources):  # resources are passed in by calling order
>>>     names = set()
>>>     fileobj = resources[0] # file resources are represented by a file-like object
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # retrieve the values by using the field name or offset
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
              name   isin
0      Iris-setosa   True
1  Iris-versicolor   True
2   Iris-virginica  False
説明

パーティションテーブルを読み取るとき、パーティションフィールドは含まれません。

行操作で axis が 1 の場合、関数クロージャまたは呼び出し可能なクラスを記述する必要があります。列の集計操作の場合は、__init__ 関数を使用してリソースを読み取るだけで済みます。

>>> words_df
                     sentence
0                 Hello World
1                Hello Python
2  Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
                sentence
0            Hello World
1           Hello Python
2  Life short use Python
説明

この例では、stop_words はローカル変数であり、実行中に MaxCompute でリソースとして参照されます。

サードパーティ製 Python ライブラリの使用

サードパーティ製 Python パッケージ(whleggziptar.gz 形式)を MaxCompute にアップロードできます。グローバルメソッドまたは即時呼び出しメソッドを使用する場合は、パッケージファイルを指定する必要があります。すべての依存ライブラリが指定されていることを確認する必要があります。指定されていない場合、ファイルをインポートするときにエラーが発生する可能性があります。

  • PyODPS リソースアップロードインターフェイス create_resource を呼び出すことによって、リソースをアップロードできます。

    python-dateutil パッケージを例に説明します。

    1. pip download コマンドを実行して、パッケージとその依存関係を特定のパスにダウンロードできます。2 つのパッケージ、six-1.10.0-py2.py3-none-any.whlpython_dateutil-2.5.3-py2.py3-none-any.whl がダウンロードされます。ダウンロードしたパッケージは Linux 環境をサポートしている必要があることに注意してください。

      $ pip download python-dateutil -d /to/path/
    2. 次に、2 つのファイルを MaxCompute にリソースとしてアップロードします。

      >>> # ファイル名拡張子が正しいことを確認してください。
      >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
      >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
    3. これで、STRING フィールドのみを含む DataFrame オブジェクトができました。

      >>> df
                     datestr
      0  2016-08-26 14:03:29
      1  2015-08-26 14:03:29
    4. グローバル構成で次のサードパーティ製ライブラリを使用します。

      >>> from odps import options
      >>>
      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
      >>> df.datestr.map(get_year)
         datestr
      0     2016
      1     2015

      または、メソッドの libraries パラメーターを使用して、サードパーティ製ライブラリを指定します。

      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
         datestr
      0     2016
      1     2015

    デフォルトでは、PyODPS は純粋な Python コードを含み、ファイル操作を含まないサードパーティ製ライブラリをサポートしています。MaxCompute の今後のバージョンでは、PyODPS はバイナリコードまたはファイル操作を含む Python ライブラリもサポートします。これらのライブラリには、cp27-cp27m-manylinux1_x86_64 というサフィックスを付けて、アーカイブとしてアップロードする必要があります。.whl パッケージは .zip ファイルに名前を変更する必要があります。また、odps.isolation.session.enable を True に設定するか、プロジェクトで isolation を有効にする必要があります。次の例は、scipy の特殊関数をアップロードして使用する方法を示しています。

    >>> # バイナリコードを含むパッケージは、アーカイブメソッドを使用してアップロードする必要があり、ファイル拡張子 .whl を .zip に置き換える必要があります。
    >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
    >>>
    >>> # プロジェクトで isolation が有効になっている場合、次のオプションは省略可能です。
    >>> options.sql.settings = { 'odps.isolation.session.enable': True }
    >>>
    >>> def psi(value):
    >>>     # 異なるオペレーティングシステム間でのバイナリパッケージの構造の違いによって発生するエラーを回避するために、関数内でサードパーティ製ライブラリをインポートすることをお勧めします。
    >>>     from scipy.special import psi
    >>>     return float(psi(value))
    >>>
    >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])

    ソースコードのみを含むバイナリパッケージは、Wheel ファイルにパッケージ化して Linux シェルでアップロードできます。Mac および Windows で生成された Wheel ファイルは、MaxCompute では使用できません。

    python setup.py bdist_wheel
  • MaxCompute コンソールを使用してリソースをアップロードすることもできます。

    1. ほとんどの Python パッケージは、さまざまなプラットフォーム上のバイナリファイルを含むパッケージを含め、.whl パッケージで提供されます。したがって、最初に MaxCompute で実行できるパッケージを見つける必要があります。

    2. さらに、すべての依存関係を含める必要があります。次の表に、パッケージの依存関係を示します。

      パッケージ名

      依存関係

      pandas

      numpy、python-dateutil、pytz、six

      scipy

      numpy

      scikit-learn

      numpy、scipy

      説明

      numpy パッケージはすでに提供されています。 pandas、scipy、および scikit-learn パッケージを実行するには、python-dateutil、pytz、pandas、SciPy、sklearn、および six パッケージのみをアップロードする必要があります。

    3. python-dateutil-2.6.0.zippython-dateutil 内で見つかり、ダウンロードできます。

    4. ダウンロードしたファイルの名前を python-dateutil.zip に変更し、MaxCompute コンソールでリソースとしてアップロードします。

      add archive python-dateutil.zip; // アーカイブ python-dateutil.zip を追加します
      説明

      pytz-2017.2.zip ファイルと six-1.11.0.tar.gz ファイルを、python-dateutil ファイルと同じ方法で検索、ダウンロード、およびアップロードします。

    5. Pandas など、C 言語で記述されたコードを含むパッケージが MaxCompute で正常に動作するようにするには、名前に cp27-cp27m-manylinux1_x86_64 が含まれている .whl パッケージを見つける必要があります。そのため、pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl を見つけてダウンロードし、拡張子を .zip に変更し、MaxCompute コンソールで add archive pandas.zip; を実行してアップロードする必要があります。scipy パッケージと scikit-learn パッケージも、前述の手順と同じ方法で MaxCompute にアップロードします。

    次の表に、すべてのパッケージについてダウンロードするリソースを示します。

    パッケージ名

    ファイル名

    アップロードするリソースの名前

    python-dateutil

    python-dateutil-2.6.0.zip

    python-dateutil.zip

    pytz

    pytz-2017.2.zip

    pytz.zip

    six

    six-1.11.0.tar.gz

    six.tar.gz

    pandas

    pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip

    pandas.zip

    scipy

    scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip

    scipy.zip

    scikit-learn

    scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip

    sklearn.zip

サードパーティ製の Python ライブラリを指定する

  • 使用するライブラリをグローバルに指定します。

    >>> from odps import options
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
  • 即時呼び出しメソッドで、使用するライブラリをローカルに指定します。

    >>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])