ユーザー定義集計関数 (UDAF) は、SUM や AVG のような組み込みの集計関数と同様に動作しますが、集約ロジックをユーザーが定義する点が異なります。MaxCompute は Python 2.7 を使用して Python 2 UDAF を実行します。
MaxCompute Studio を使用して UDAF コードを記述およびアップロードします。
コード構造
Python 2 UDAF ファイルには以下の 5 つのコンポーネントがあります。
| コンポーネント | 必須 | 説明 |
|---|---|---|
| エンコーディング宣言 | 任意 | コードに中国語文字が含まれる場合は、ファイルの先頭に #coding:utf-8 または # -*- coding: utf-8 -*- を追加します。この宣言がない場合、UDAF はランタイムでエラーを返します。 |
| モジュールのインポート | 必須 | from odps.udf import annotate および from odps.udf import BaseUDAF をインポートします。ファイルリソースまたはテーブルリソースを参照する場合は、from odps.distcache import get_cache_file または from odps.distcache import get_cache_table もインポートします。 |
| 関数シグネチャ | 必須 | @annotate(<signature>) を使用して入力および戻り値のデータ型を宣言します。MaxCompute はセマンティック解析中に型の一貫性を検証します。 |
| カスタム Python クラス | 必須 | 集約ロジックを定義するために BaseUDAF を継承します。 |
| メソッド | 必須 | メソッド で説明されている 4 つのメソッドを実装します。 |
メソッド
すべての 4 つのメソッドが必須です。BaseUDAF を継承したクラス内でこれらを実装します。
| メソッド | 必須 | 説明 |
|---|---|---|
BaseUDAF.new_buffer() |
必須 | 中間値である buffer を返します。buffer はリストや辞書などの マーシャリング可能な オブジェクトである必要があります。そのサイズは入力データ量に応じて増加してはなりません。マーシャリング後、buffer のサイズは 2 MB を超えてはなりません。 |
BaseUDAF.iterate(buffer[, args, ...]) |
必須 | 各入力行の args を buffer に集約します。 |
BaseUDAF.merge(buffer, pbuffer) |
必須 | pbuffer を buffer にマージします。MaxCompute はワーカー間で部分結果を結合する際にこのメソッドを呼び出します。 |
BaseUDAF.terminate(buffer) |
必須 | 最終的な buffer を MaxCompute SQL の基本データ型の値に変換して返します。 |
例:平均値の計算
次の UDAF は DOUBLE 型のカラムの平均値を計算し、DOUBLE 型の結果を返します。
#coding:utf-8
from odps.udf import annotate
from odps.udf import BaseUDAF
@annotate('double->double')
class Average(BaseUDAF):
def new_buffer(self):
# Buffer stores [sum, count] as a list
return [0, 0]
def iterate(self, buffer, number):
if number is not None:
buffer[0] += number
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def terminate(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
UDAF をアップロードして登録した後(たとえば my_avg として)、組み込みの集計関数と同様に MaxCompute SQL から呼び出します。
-- Compute the average score per category
SELECT category, my_avg(score) AS avg_score
FROM my_table
GROUP BY category;
プロジェクトをまたいだ呼び出しを行う場合は、関数名の前にプロジェクト名をプレフィックスとして付けます。
SELECT B:my_avg(score) AS avg_score FROM my_table;
開発および登録プロセスの詳細については、「Python UDF の開発」をご参照ください。
関数シグネチャとデータ型
シグネチャ形式
@annotate('<arg_type_list> -> <return_type>')
-
arg_type_list:カンマ区切りの入力型。任意の数の入力を表すには*を使用し、入力がない場合は空白のままにします。 -
return_type:単一の戻り値型。UDAF は常に 1 つのカラムを返します。
サポートされる入力型: BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、ARRAY、MAP、STRUCT、およびネストされた複合型。
サポートされる戻り値型: BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、ARRAY、MAP、STRUCT、およびネストされた複合型。
例:
| シグネチャ | 意味 |
|---|---|
@annotate('bigint,double->string') |
BIGINT および DOUBLE 入力を受け取り、STRING を返す |
@annotate('*->string') |
任意の数の入力を受け取り、STRING を返す |
@annotate('->double') |
入力なしで DOUBLE を返す |
@annotate('array<bigint>->struct<x:string, y:int>') |
ARRAY<BIGINT> を受け取り、STRUCT<x:STRING, y:INT> を返す |
データ型のマッピング
MaxCompute SQL の各データ型に対応する Python 2 の型を使用して UDAF ロジックを記述します。
| MaxCompute SQL 型 | Python 2 型 |
|---|---|
| BIGINT | INT |
| STRING | STR |
| DOUBLE | FLOAT |
| BOOLEAN | BOOL |
| DATETIME | INT |
| FLOAT | FLOAT |
| CHAR | STR |
| VARCHAR | STR |
| BINARY | BYTEARRAY |
| DATE | INT |
| DECIMAL | DECIMAL.DECIMAL |
| ARRAY | LIST |
| MAP | DICT |
| STRUCT | COLLECTIONS.NAMEDTUPLE |
- DATETIME は UNIX タイムスタンプ形式(1970-01-01 00:00:00 協定世界時 (UTC) からの経過ミリ秒)に従って INT にマッピングされます。DATETIME 値を処理するには、Python の
datetimeモジュールを使用します。 - MaxCompute SQL の NULL は Python 2 の
Noneにマッピングされます。 odps.udf.int(value, silent=True)は、valueを INT に変換できない場合にエラーを発生させる代わりにNoneを返します。
制限事項
MaxCompute はサンドボックス環境で UDAF コードを実行します。以下の操作は許可されていません。
-
ローカルファイルの読み取りまたは書き込み
-
サブプロセスの起動
-
スレッドの起動
-
ソケット通信の使用
-
他のシステムから Python 2 のユーザー定義関数 (UDF) を呼び出す
Python 標準ライブラリのみを使用するコードをアップロードしてください。標準ライブラリモジュールまたは C 拡張モジュールが上記のいずれかの操作を実行する場合、そのモジュールは使用できません。
利用可能なモジュール:
-
C 拡張への依存関係のない純粋な Python の標準ライブラリモジュールはすべて利用可能です。
-
以下の C 拡張モジュールが利用可能です:
array、audioop、binascii、bisect、cmath、_codecs_cn、_codecs_hk、_codecs_iso2022、_codecs_jp、_codecs_kr、_codecs_tw、_collections、cStringIO、datetime、_functools、future_builtins、_heapq、_hashlib、itertools、_json、_locale、_lsprof、math、_md5、_multibytecodec、operator、_random、_sha256、_sha512、_sha、_struct、strop、time、unicodedata、_weakref、cPickle。
出力制限: sys.stdout または sys.stderr への書き込みは 20 KB に制限されています。20 KB を超える文字は自動的に破棄されます。
サードパーティ製ライブラリ
NumPy などのサードパーティ製ライブラリは、標準ライブラリの補足として MaxCompute Python 2 環境にプリインストール済みです。
リソースの参照
UDAF コード内から MaxCompute リソースとして登録されたファイルおよびテーブルにアクセスするには、odps.distcache モジュールを使用します。
odps.distcache.get_cache_file(resource_name)
指定されたファイルリソースのファイルライクオブジェクトを返します。
-
resource_nameは、ご利用の MaxCompute プロジェクト内に存在するファイルリソースの名前でなければなりません。無効な名前または存在しないファイルを指定するとエラーになります。 -
UDAF 作成時にファイルを宣言する必要があります。リソースを宣言せずに呼び出すとエラーになります。
-
リソースを解放するために、使用後に返されたオブジェクトに対して
close()を呼び出してください。
odps.distcache.get_cache_table(resource_name)
指定されたテーブルリソースについて、1 回の反復ごとに ARRAY 型のレコードを 1 つずつ生成するジェネレーターを返します。
-
resource_nameは、ご利用の MaxCompute プロジェクト内に存在するテーブルリソースの名前でなければなりません。無効な名前または存在しないテーブルを指定するとエラーになります。
使用例については、「リソースの参照 (Python 2 UDF)」および「リソースの参照 (Python 2 UDTF)」をご参照ください。
次のステップ
-
Python UDF の開発 — MaxCompute Studio を使用した UDAF の記述、アップロード、テストに関するステップバイステップガイド
-
組み込み関数 — UDAF が補足または置き換え可能な、組み込み集計関数のリファレンス
-
パッケージによるプロジェクト間のリソースアクセス — プロジェクト間の UDAF 共有を設定する
-
データ型エディション — ご利用のプロジェクトで利用可能なデータ型の確認方法