すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Python 3 UDAF

最終更新日:Mar 18, 2025

Python Software Foundationは、Python 2のEnd of Life (EOL) を発表しました。 このため、MaxComputeはPython 3をサポートし、CPython 3.7.3を使用します。 このトピックでは、Python 3でユーザー定義集計関数 (UDAF) を記述する方法について説明します。

UDAFコード構造

MaxCompute Studioを使用して、Python 3でUDAFコードを記述できます。 UDAFコードには次の情報が含まれている必要があります。

  • モジュールのインポート: 必須。

    UDAFコードには、少なくともfrom odps.udf import annotatefrom odps.udf import BaseUDAFを含める必要があります。 from odps.udf import annotateは、関数署名モジュールをインポートするために使用されます。 これにより、MaxComputeはコードで定義されている関数シグネチャを識別できます。 from odps.udf import BaseUDAFはPython UDAFの基本クラスです。 このクラスを使用して、iteratemergeterminateなどのメソッドを派生クラスに実装する必要があります。

    UDAFコードでファイルまたはテーブルリソースを参照する場合は、UDAFコードにfrom odps.distcache import get_cache_fileまたはfrom odps.distcache import get_cache_tableを含める必要があります。

  • 関数署名: 必須。

    関数シグネチャは、@ annotate(<signature>) 形式です。 signatureパラメーターは、入力パラメーターのデータ型とUDFの戻り値を定義するために使用されます。 関数シグネチャの詳細については、「関数シグネチャとデータ型」をご参照ください。

  • カスタムPythonクラス (派生クラス): 必須です。

    カスタムJavaクラスは、UDAFコードの組織単位です。 このクラスは、ビジネス要件を満たすために使用される変数とメソッドを定義します。 UDFコードでは、MaxComputeにインストールされているサードパーティライブラリを参照したり、ファイルやテーブルを参照したりすることもできます。 詳細については、「サードパーティライブラリ」または「リファレンスリソース」をご参照ください。

  • Pythonクラスを実装するメソッド: required.

    次の表に、Pythonクラスの実装に使用できる4つのメソッドを示します。 ビジネス要件に基づいて方法を設定できます。

    移動方法

    説明

    BaseUDAF.new_buffer()

    UDAFの中間値バッファを返します。 bufferは、LISTやDICTなどのマーシャリング可能なオブジェクトである必要があります。bufferサイズはデータ量に応じて大きくすることはできません。 極端な場合、マーシャリング操作後のbufferサイズは2 MBを超えることはできません。

    BaseUDAF.iterate(buffer[, args, ...])

    argsを中間値bufferに集約します。

    BaseUDAF.merge(buffer, pbuffer)

    pbufferと中間値bufferのマージ結果をbufferに格納します。

    BaseUDAF.terminate (buffer)

    MaxCompute SQLでbufferを基本データ型の値に変換します。

サンプルコード:

# Import the function signature module and the base class. 
from odps.udf import annotate
from odps.udf import BaseUDAF
# The function signature. 
@annotate('double->double')
# The custom Python class. 
class Average(BaseUDAF):
# Methods used to implement the custom Python class. 
    def new_buffer(self):
        return [0, 0]
    def iterate(self, buffer, number):
        if number is not None:
            buffer[0] += number
            buffer[1] += 1
    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]
    def terminate(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
説明

Python 2 UDAFとPython 3 UDAFは、基盤となるPythonバージョンの点で異なります。 使用するPythonバージョンの機能に基づいてUDAFを作成できます。

注意事項

Python 3はPython 2と互換性がありません。 このため、単一のSQL文でPython 2コードとPython 3コードを同時に使用することはできません。

説明

Python Software Foundationは、2020年の初めにPython 2の寿命 (EOL) を発表しました。 したがって、Python 2 UDFを移植することを推奨します。 既存のMaxComputeプロジェクトでは、Python 2 UDFをポートすることを推奨します。 新しいプロジェクトでは、Python 3を使用してすべてのPython UDFを記述することを推奨します。

ポートPython 2 UDAFs

Python Software Foundationは、Python 2のEOLを発表しました。 したがって、Python 2 UDAFを移植することをお勧めします。 Python 2 UDAFのポートに使用される方法は、MaxComputeプロジェクトのタイプによって異なります。

  • 新しいプロジェクト: プロジェクトが新しいMaxComputeプロジェクトである場合、またはPythonを使用してMaxComputeプロジェクトのUDAFを初めて作成する場合は、Python 3を使用してすべてのPython UDAFを作成することを推奨します。

  • 既存のプロジェクト: プロジェクトが多数のPython 2 UDAFが作成される既存のプロジェクトである場合、Python 3を有効にするときは注意して進めてください。 Python 2 UDAFをPython 3 UDAFに徐々に置き換える場合は、次のいずれかの方法を使用します。

    • Python 3を使用して新しいUDAFを作成し、セッションレベルで新しいジョブに対してPython 3を有効にします。 Python 3を有効にする方法の詳細については、「Python 3を有効にする」をご参照ください。

    • Python 2 UDAFを書き換えて、Python 2とPython 3の両方と互換性を持たせます。 Python 2 UDAFを書き直す方法の詳細については、「Python 2コードのPython 3への移植」をご参照ください。

      説明

      複数のプロジェクト間で共有できるパブリックUDAFを作成する場合は、UDAFをPython 2とPython 3の両方と互換性があることをお勧めします。

Python 3の有効化

デフォルトでは、Python 2はMaxComputeプロジェクトでUDFを書き込むために使用されます。 Python 3でUDFを記述する場合は、実行するSQL文の前に次のコマンドを追加します。 次に、ステートメントをコミットして実行します。

set odps.sql.python.version=cp37;

サードパーティライブラリ

サードパーティライブラリNumPyは、MaxComputeのPython 3環境にインストールされていません。 NumPy UDAFを使用するには、NumPyホイールパッケージを手動でアップロードする必要があります。 Pythonパッケージインデックス (PyPI) からNumPyホイールパッケージをダウンロードするか、イメージからこのパッケージを取得すると、パッケージの名前はnumpy-<Version>-cp37-cp37m-manylinux1_x86_64.whlの形式になります。 パッケージをアップロードする方法の詳細については、「リソース操作」または「例: Python UDFでのサードパーティパッケージの参照」をご参照ください。

関数シグネチャとデータ型

関数シグネチャの形式:

@annotate(<signature>)

signatureパラメーターは、入力パラメーターと戻り値のデータ型を指定する文字列です。 UDAFを実行する場合、入力パラメーターのデータ型とUDAFの戻り値は、関数シグネチャで指定されたデータ型と一致している必要があります。 データ型の整合性は、セマンティック解析中にチェックされます。 データ型に矛盾がある場合は、エラーが返されます。 関数シグネチャの形式:

'arg_type_list -> type'
  • arg_type_list: 入力パラメーターのデータ型を指定します。 複数の入力パラメーターを使用する場合は、複数のデータ型を指定し、コンマ (,) で区切ります。 BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision、scale) 、CHAR、VARCHAR、複合データ型 (ARRAY、MAP、STRUCT) 、およびネストされた複合データ型がサポートされています。

    arg_type_listは、アスタリスク (*) または空のまま ('') で表すことができます。

    • arg_type_listがアスタリスク (*) で表される場合、ランダムな数の入力パラメーターが許可されます。

    • arg_type_listが空 ('') の場合、入力パラメーターは使用されません。

    @ Resolveアノテーションの構文拡張機能の詳細については、「UDAFおよびUDTFの動的パラメーター」をご参照ください。

  • typeは、戻り値のデータ型を指定します。 UDAFの場合、値の1列のみが返されます。 BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision、scale) のデータ型がサポートされています。 ARRAY、MAP、STRUCTなどの複雑なデータ型、およびネストされた複雑なデータ型もサポートされています。

説明

UDAFコードを記述するときに、MaxComputeプロジェクトで使用されるデータ型エディションに基づいてデータ型を選択できます。 データ型のエディションと各エディションでサポートされているデータ型の詳細については、「データ型のエディション」をご参照ください。

次の表に、有効な関数シグネチャの例を示します。

関数署名

説明

@ アノテート ('bigint、double->string ')

入力パラメーターのデータ型はBIGINTとDOUBLEで、戻り値のデータ型はSTRINGです。

@ annotate('*->string')

ランダムな数の入力パラメーターが使用され、戻り値のデータ型はSTRINGです。

@ annotate ('-> double')

入力パラメーターは使用されず、戻り値のデータ型はDOUBLEです。

@ annotate ('array<bigint>->struct<x:string, y:int>')

入力パラメーターのデータ型はARRAY<BIGINT> 、戻り値のデータ型はSTRUCT<x:STRING, y:INT> です。

次の表に、MaxCompute SQLでサポートされているデータ型とPython 2データ型の間のマッピングを示します。 データ型の一貫性を確保するには、マッピングに基づいてPython UDAFを記述する必要があります。 次の表に、データ型のマッピングについて説明します。

MaxCompute SQLタイプ

Python 3タイプ

BIGINT

INT

STRING

UNICODE

DOUBLE

FLOAT

BOOLEAN

BOOL

DATETIME

DATETIME.DATETIME

FLOAT

FLOAT

CHAR

UNICODE

VARCHAR

UNICODE

BINARY

BYTES

DATE

DATETIME.DATE

DECIMAL

DECIMAL.DECIMAL

ARRAY

LIST

MAP

DICT

STRUCT

COLLECTIONS.NAMEDTUPLE

参照リソース

odps.distcacheモジュールを使用して、Python 2 UDAFコードでファイルとテーブルを参照できます。

  • odps.distcache.get_cache_file(resource_name): 特定のファイルの内容を返します。

    • resource_nameは、MaxComputeプロジェクト内の既存のファイルの名前を指定する文字列です。 ファイル名が無効であるか、ファイルが存在しない場合は、エラーが返されます。

      説明

      UDAFコードでファイルを参照するには、UDAFを作成するときにファイルを宣言する必要があります。 それ以外の場合、UDAFを呼び出すとエラーが返されます。

    • 戻り値はファイルのようなオブジェクトです。 このオブジェクトが使用されなくなった場合は、closeメソッドを呼び出してファイルを解放する必要があります。

  • odps.distcache.get_cache_table(resource_name): 特定のテーブルの内容を返します。

    • resource_nameは、MaxComputeプロジェクト内の既存のテーブルの名前を指定する文字列です。 テーブル名が無効であるか、テーブルが存在しない場合は、エラーが返されます。

    • 戻り値はGENERATOR型です。 呼び出し元はテーブルをトラバースしてテーブルの内容を取得します。 ARRAYタイプのレコードは、呼び出し元がテーブルをトラバースするたびに取得されます。

詳細については、「リファレンスリソース (Python 3 UDF) 」および「リファレンスリソース (Python 3 UDTF) 」をご参照ください。

使用上の注意

概要 の手順に従ってPython 3 UDAFを開発した後、MaxCompute SQLを使用してこのUDAFを呼び出すことができます。 次の手順では、Python 3 UDAFを呼び出す方法について説明します。

  • MaxComputeプロジェクトでUDFを使用する: この方法は、組み込み関数を使用する方法と似ています。

  • プロジェクト間でUDFを使用する: プロジェクトaでプロジェクトBのUDFを使用します。次のステートメントは、例を示します。select B:udf_in_other_project(arg0, arg1) as res from table_t; プロジェクト間共有の詳細については、「パッケージに基づくプロジェクト間リソースアクセス」をご参照ください。

MaxCompute Studioを使用してPython 3 UDAFを開発および呼び出す方法の詳細については、「Python UDFの開発」をご参照ください。

UDAFsの動的パラメータ

関数署名

Python UDAFの関数シグネチャの形式の詳細については、「関数シグネチャとデータ型」をご参照ください。

  • パラメーターリストでアスタリスク (*) を使用して、入力パラメーターの長さとタイプを指定できます。 たとえば、@ annotate('double,*->string ') は、最初のパラメーターがDOUBLEデータ型で、その後に任意の長さと型のパラメーターが続くパラメーターリストを示します。 この場合、コードをコンパイルして入力パラメーターの数と種類を計算し、Cプログラミング言語のprintf関数に基づいてそれらを管理する必要があります。

    説明

    戻り値のアスタリスク (*) は異なる意味を示します。

  • UDTFの戻り値でアスタリスク (*) を使用して、STRINGデータ型の任意の数の値を返すことができることを示すことができます。 戻り値の数は、関数が呼び出されたときに設定されるエイリアスの数に基づいています。 たとえば、@ annotate("bigint,string->double,*") の呼び出しメソッドは、UDTF(x, y) as (a, b, c) です。 この例では、エイリアスabcとして設定されます。 エディタは、aがDOUBLE型であり、bおよびcがSTRINGデータ型であることを識別する。 Resolveアノテーションで返される最初の列の戻り値のデータ型が指定されます。 この例では、3つの戻り値が提供される。 したがって、UDTFによって呼び出されるforwardメソッドは、3つの要素の配列を転送する必要があります。 それ以外の場合は、エラーが返されます。

    説明

    ただし、コンパイル中にエラーは返されません。 したがって、UDTFの呼び出し元は、UDTFで定義されているルールに基づいてSQLのエイリアスの数を設定する必要があります。 集計関数の戻り値の数は1に固定されています。 したがって、このルールはUDAFには影響しません。

UDAFの例

from odps.udf import annotate
from odps.udf import BaseUDAF
@annotate('bigint,*->string')
class MultiColSum(BaseUDAF):
    def new_buffer(self):
        return [0]
    def iterate(self, buffer, *args):
        for arg in args:
            buffer[0] += int(arg)
    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
    def terminate(self, buffer):
        return str(buffer[0])

UDAFの戻り値の数は1つに固定できます。 上記の例では、戻り値は複数の入力パラメーターの値の合計と複数行の値の合計です。 サンプル文:

-- Calculate the sum of values of multiple input parameters.
SELECT my_multi_col_sum(a,b,c,d,e) from values (1,"2","3","4","5"), (6,"7","8","9","10") t(a,b,c,d,e);
-- The return value is 55.