このトピックでは、ユーザー定義集計関数 (UDAF) をJavaで記述する方法について説明します。
UDAFコード構造
IntelliJ IDEAまたはMaxCompute StudioでMavenを使用して、UDAFをJavaで記述できます。 UDAFコードには次の情報を含めることができます。
Javaパッケージ: オプション。
定義されているJavaクラスは、今後使用するためにJARファイルにパッケージ化できます。
基本UDAFクラス: 必須。
必要なUDAFクラスは、
com.aliyun.odps.udf.Aggregatorおよびcom.aliyun.odps.udf.annotation.Resolveです。 com.aliyun.odps.udf.annotation.Resolveクラスは、@ Resolveアノテーションに対応しています。com.aliyun.odps.udf.UDFException: オプション。 このクラスは、Javaクラスを初期化および終了するために使用されるメソッドに対応します。 他のUDAFクラスまたは複合データ型を使用する場合は、[概要] の手順に従って必要なクラスを追加します。@ Resolveアノテーション: 必須です。アノテーションは
@ Resolve(<signature>)形式です。signatureは、入力パラメーターのデータ型とUDAFの戻り値を定義する関数シグネチャです。 UDAFは、リフレクション機能を使用して関数シグネチャを取得できません。 関数シグネチャは、@ Resolve("smallint->varchar(10)")などの@ Resolveアノテーションを使用してのみ取得できます。@ Resolveアノテーションの詳細については、このトピックの「 @ Resolveアノテーション」をご参照ください。カスタムJavaクラス: 必須。
カスタムJavaクラスは、UDAFコードの組織単位です。 このクラスは、ビジネス要件を満たすために使用される変数とメソッドを定義します。
カスタムJavaクラスを実装するメソッド: required.
Java UDAFsは、
com.aliyun.odps.udf.Aggregatorクラスを継承し、次のメソッドを実装する必要があります。import com.aliyun.odps.udf.ContextFunction; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDFException; public abstract class Aggregator implements ContextFunction { // The initialization method. @Override public void setup(ExecutionContext ctx) throws UDFException { } // The terminate method. @Override public void close() throws UDFException { } // Create an aggregation buffer. abstract public Writable newBuffer(); // The iterate method. // The buffer is an aggregation buffer, which stores the data that is aggregated in a specific phase. The aggregated data refers to the dataset that is obtained after GROUP BY is performed for different Map tasks. One buffer is created for each row of data that is aggregated. // Writable[] indicates a row of data, which specifies the passed column in the code. For example, writable[0] indicates the first column, and writable[1] indicates the second column. // args specifies the parameters that are used to call a UDAF in SQL. It cannot be NULL, but the values in args can be NULL, which indicates that the input data is NULL. abstract public void iterate(Writable buffer, Writable[] args) throws UDFException; // The terminate method. abstract public Writable terminate(Writable buffer) throws UDFException; // The merge method. abstract public void merge(Writable buffer, Writable partial) throws UDFException; }最も重要なメソッドは、
iterate、merge、terminateです。 メソッドは、UDAFのメインロジックを実装するために使用されます。 さらに、ユーザー定義の書き込み可能バッファを実装する必要があります。ユーザ定義の書き込み可能バッファは、メモリ内のオブジェクトを、ディスク内の永続的な記憶およびネットワーク伝送のためのバイトシーケンス (または他のデータ伝送プロトコル) に変換する。 MaxComputeは分散コンピューティングを使用してUDAFを処理します。 したがって、MaxComputeは、異なるデバイス間でデータを送信する前に、データをシリアル化または逆シリアル化する必要があります。
Java UDAFを作成するときは、Javaデータ型またはJava書き込み可能データ型を使用できます。 MaxComputeプロジェクトでサポートされているデータ型、Javaデータ型、およびJava書き込み可能データ型間のマッピングの詳細については、このトピックの「データ型」をご参照ください。
サンプルコード:
// Package the defined Java classes into a file named org.alidata.odps.udaf.examples.
package org.alidata.odps.udaf.examples;
// The base UDAF classes.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
// The custom Java class.
// The @Resolve annotation.
@Resolve("double->double")
public class AggrAvg extends Aggregator {
// The methods that are used to implement the custom Java class.
private static class AvgBuffer implements Writable {
private double sum = 0;
private long count = 0;
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(sum);
out.writeLong(count);
}
@Override
public void readFields(DataInput in) throws IOException {
sum = in.readDouble();
count = in.readLong();
}
}
private DoubleWritable ret = new DoubleWritable();
@Override
public Writable newBuffer() {
return new AvgBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
DoubleWritable arg = (DoubleWritable) args[0];
AvgBuffer buf = (AvgBuffer) buffer;
if (arg != null) {
buf.count += 1;
buf.sum += arg.get();
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
if (buf.count == 0) {
ret.set(0);
} else {
ret.set(buf.sum / buf.count);
}
return ret;
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
AvgBuffer p = (AvgBuffer) partial;
buf.sum += p.sum;
buf.count += p.count;
}
}前述のUDAFコードでは、iterateメソッドとmergeメソッドに同じバッファーが使用されています。 UDAFコードに基づいて、データの入力行をこのバッファに集約できます。
制限事項
UDFを使用したインターネットへのアクセス
デフォルトでは、MaxComputeではUDFを使用してインターネットにアクセスすることはできません。 UDFを使用してインターネットにアクセスする場合は、ネットワーク接続申請フォームに入力してください。
ビジネス要件に基づいて、アプリケーションを送信します。 申請が承認された後、MaxComputeテクニカルサポートチームがお客様に連絡し、ネットワーク接続の確立を支援します。 ネットワーク接続申請フォームへの記入方法の詳細については、「ネットワーク接続プロセス」をご参照ください。
UDFを使用したVPCへのアクセス
デフォルトでは、MaxComputeではUDFを使用してVPCのリソースにアクセスすることはできません。 UDFを使用してVPC内のリソースにアクセスするには、MaxComputeとVPCの間にネットワーク接続を確立する必要があります。 関連する操作の詳細については、「UDFを使用したVPCのリソースへのアクセス」をご参照ください。
UDF、UDAF、またはUDTFを使用したテーブルデータの読み取り
UDF、UDAF、またはUDTFを使用して、次の種類のテーブルからデータを読み取ることはできません。
スキーマの進化を行うテーブル
複雑なデータ型を含むテーブル
JSONデータ型を含むテーブル
取引テーブル
使用上の注意
Java UDAFを作成する前に、次の点に注意してください。
同じ名前で異なるロジックを持つクラスを、異なるUDAFのJARファイルにパッケージ化しないことをお勧めします。 たとえば、UDAF 1のJARファイルの名前はudaf1.jar、UDAF 2のJARファイルの名前はudaf2.jarです。 両方のJARファイルには
com.aliyun.UserFunction.classという名前のクラスが含まれていますが、このクラスのファイル内のロジックは異なります。 同じSQL文でUDAF 1とUDAF 2が呼び出された場合、MaxComputeは2つのファイルのいずれかからcom.aliyun.UserFunction.classを読み込みます。 その結果、UDAFは期待どおりに実行できず、コンパイルが失敗する可能性があります。Java UDAFの入力パラメーターまたは戻り値のデータ型がオブジェクトです。 Java UDAFコードで指定するデータ型の最初の文字は、Stringなどの大文字である必要があります。
MaxCompute SQLのNULL値は、JavaではNULLで表されます。 MaxCompute SQLでは、Javaのプリミティブデータ型はNULL値を表すことができません。 したがって、これらのデータ型は使用できません。
@ Resolveアノテーション
注釈形式 @ Resolve:
@Resolve(<signature>)signatureパラメーターは、入力パラメーターと戻り値のデータ型を指定する文字列です。 UDAFを実行する場合、UDAFの入力パラメーターと戻り値のデータ型は、関数シグネチャで指定されたデータ型と一致している必要があります。 データ型の整合性は、セマンティック解析中にチェックされます。 データ型に矛盾がある場合は、エラーが返されます。 署名の形式:
'arg_type_list -> type'
arg_type_list: 入力パラメーターのデータ型を指定します。 複数の入力パラメーターを使用する場合は、複数のデータ型を指定し、コンマ (,) で区切ります。 BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision、scale) 、CHAR、VARCHAR、複合データ型 (ARRAY、MAP、STRUCT) 、およびネストされた複合データ型がサポートされています。arg_type_listは、アスタリスク (*) または空のまま ('') で表すことができます。arg_type_listがアスタリスク (*) で表される場合、ランダムな数の入力パラメーターが許可されます。arg_type_listが空 ('') の場合、入力パラメーターは使用されません。
@ Resolveアノテーションの構文拡張機能の詳細については、「UDAFおよびUDTFの動的パラメーター」をご参照ください。
typeは、戻り値のデータ型を指定します。 UDAFの場合、値の1列のみが返されます。 BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision、scale) のデータ型がサポートされています。 ARRAY、MAP、STRUCTなどの複雑なデータ型、およびネストされた複雑なデータ型もサポートされています。
UDAFコードを記述するときに、MaxComputeプロジェクトで使用されるデータ型エディションに基づいてデータ型を選択できます。 データ型のエディションと各エディションでサポートされているデータ型の詳細については、「データ型のエディション」をご参照ください。
次の表に、@ Resolveアノテーションの例を示します。
@ Resolveアノテーション | 説明 |
| 入力パラメーターのデータ型はBIGINTまたはDOUBLEで、戻り値のデータ型はSTRINGです。 |
| ランダムな数の入力パラメーターが使用され、戻り値のデータ型はSTRINGです。 |
| 入力パラメーターは使用されず、戻り値のデータ型はDOUBLEです。 |
| 入力パラメーターのデータ型はARRAY<BIGINT> 、戻り値のデータ型はSTRUCT<x:STRING, y:INT> です。 |
データ型
MaxComputeでは、異なるデータ型エディションが異なるデータ型をサポートします。 MaxCompute V2.0以降では、ARRAY、MAP、STRUCTなど、より多くのデータ型と複雑なデータ型がサポートされています。 MaxComputeデータ型のエディションの詳細については、「データ型のエディション」をご参照ください。
次の表に、MaxComputeプロジェクトでサポートされているデータ型、Javaデータ型、およびJava書き込み可能データ型の間のマッピングを示します。 データ型の一貫性を確保するには、マッピングに基づいてJava UDAFを記述する必要があります。
MaxComputeタイプ | Javaタイプ | Java書き込み可能タイプ |
TINYINT | java.lang.Byte | ByteWritable |
SMALLINT | java.lang.Short | ShortWritable |
INT | java.lang.Integer | IntWritable |
BIGINT | java.lang.Long | LongWritable |
FLOAT | java.lang.Float | FloatWritable |
DOUBLE | java.lang.Double | DoubleWritable |
DECIMAL | java.math.BigDecimal | BigDecimalWritable |
BOOLEAN | java.lang.Boolean | BooleanWritable |
STRING | java.lang.String | Text |
VARCHAR | com.aliyun.odps.data.Varchar | VarcharWritable |
BINARY | com.aliyun.odps.data.Binary | BytesWritable |
DATE | java.sql.Date | DateWritable |
DATETIME | java.util.Date | DatetimeWritable |
TIMESTAMP | java.sql.Timestamp | TimestampWritable |
INTERVAL_YEAR_MONTH | 非該当 | IntervalYearMonthWritable |
INTERVAL_DAY_TIME | 非該当 | IntervalDayTimeWritable |
ARRAY | java.util.List | 非該当 |
MAP | java.util.Map | 非該当 |
STRUCT | com.aliyun.odps.data.Struct | 非該当 |
MaxComputeプロジェクトがMaxCompute V2.0データ型エディションを使用している場合にのみ、UDAFの入力パラメーターまたは戻り値をJava書き込み可能データ型にすることができます。
注意事項
Java UDAFを開発した後、MaxCompute SQLを使用してこのUDAFを呼び出すことができます。 Java UDAFの開発方法の詳細については、「概要」の「開発プロセス」セクションをご参照ください。 次のいずれかのメソッドを使用して、Java UDAFを呼び出すことができます。
MaxComputeプロジェクトでUDFを使用する: この方法は、組み込み関数を使用する方法と似ています。
プロジェクト間でUDFを使用する: プロジェクトaでプロジェクトBのUDFを使用します。次のステートメントは、例を示します。
select B:udf_in_other_project(arg0, arg1) as res from table_t;プロジェクト間共有の詳細については、「パッケージに基づくプロジェクト間リソースアクセス」をご参照ください。
MaxCompute Studioを使用してJava UDAFを開発および呼び出す方法の詳細については、このトピックの「例」をご参照ください。
例
この例では、MaxCompute Studioを使用してAggrAvgという名前のUDAFを開発する方法について説明します。 AggrAvg UDAFを用いて平均値を計算する。 次の図は、AggrAvg UDAFのロジックを示しています。

入力データをスライスします。 MaxComputeは、MapReduce処理ワークフローに基づいて、入力データを指定されたサイズにスライスします。 各スライスのサイズは、作業者が指定された時間内に計算を終了するのに適したサイズである。
odps.stage.mapper.split.sizeパラメーターを設定して、スライスのサイズを調整できます。 データスライシングのロジックの詳細については、「概要」の「プロセス」セクションをご参照ください。各ワーカーは、スライス内のデータレコード数と総データ量をカウントします。 各スライスのデータレコード数と総データ量を中間結果として使用できます。
各ワーカーは、ステップ2で生成された各スライスの情報を収集します。
最終出力では、
r.sum/r.countはすべての入力データの平均値です。
MaxCompute Studioを使用してJava UDAFを開発して呼び出すには、次の手順を実行します。
準備をします。
MaxCompute Studioを使用してUDFを開発およびデバッグする前に、MaxCompute Studioをインストールし、MaxCompute StudioをMaxComputeプロジェクトに接続する必要があります。 MaxCompute Studioをインストールし、MaxCompute StudioをMaxComputeプロジェクトに接続する方法の詳細については、以下のトピックを参照してください。
UDAFコードを書きます。
[Project] タブの左側のナビゲーションウィンドウで、 を選択し、[java] を右クリックして、 を選択します。
[新しいMaxCompute javaクラスの作成] ダイアログボックスで、[UDAF] をクリックし、[名前] フィールドにクラス名を入力して、enterキーを押します。 この例では、Javaクラスの名前はAggrAvgです。
Name: MaxCompute Javaクラスの名前。 パッケージを作成していない場合は、このパラメーターをpackagename.classname形式で指定します。 システムは自動的にパッケージを生成します。
コードエディターでコードを記述します。
サンプルUDAFコード: import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import com.aliyun.odps.io.DoubleWritable; import com.aliyun.odps.io.Writable; import com.aliyun.odps.udf.Aggregator; import com.aliyun.odps.udf.UDFException; import com.aliyun.odps.udf.annotation.Resolve; @Resolve("double->double") public class AggrAvg extends Aggregator { private static class AvgBuffer implements Writable { private double sum = 0; private long count = 0; @Override public void write(DataOutput out) throws IOException { out.writeDouble(sum); out.writeLong(count); } @Override public void readFields(DataInput in) throws IOException { sum = in.readDouble(); count = in.readLong(); } } private DoubleWritable ret = new DoubleWritable(); @Override public Writable newBuffer() { return new AvgBuffer(); } @Override public void iterate(Writable buffer, Writable[] args) throws UDFException { DoubleWritable arg = (DoubleWritable) args[0]; AvgBuffer buf = (AvgBuffer) buffer; if (arg != null) { buf.count += 1; buf.sum += arg.get(); } } @Override public Writable terminate(Writable buffer) throws UDFException { AvgBuffer buf = (AvgBuffer) buffer; if (buf.count == 0) { ret.set(0); } else { ret.set(buf.sum / buf.count); } return ret; } @Override public void merge(Writable buffer, Writable partial) throws UDFException { AvgBuffer buf = (AvgBuffer) buffer; AvgBuffer p = (AvgBuffer) partial; buf.sum += p.sum; buf.count += p.count; } }
オンプレミスマシンでUDAFをデバッグし、コードが期待どおりに実行されることを確認します。
デバッグ操作の詳細については、「UDFの開発」の「ローカル実行を実行してUDFをデバッグする」をご参照ください。
説明前の図のパラメーター設定は参考用です。
UDAFコードをJARファイルにパッケージ化し、そのファイルをMaxComputeプロジェクトにアップロードしてから、UDAFを作成します。 この例では、
user_udafという名前のUDAFが作成されます。UDAFのパッケージ化方法の詳細については、「Javaプログラムのパッケージ化、パッケージのアップロード、MaxCompute UDFの作成」の「手順」をご参照ください。

MaxCompute Studioの左側のナビゲーションウィンドウで、[Project Explorer] をクリックします。 MaxComputeプロジェクトを右クリックしてMaxComputeクライアントを起動し、SQL文を実行して作成したUDAFを呼び出します。
次の例は、クエリするmy_tableテーブルのデータ構造を示しています。
+------------+------------+ | col0 | col1 | +------------+------------+ | 1.2 | 2.0 | | 1.6 | 2.1 | +------------+------------+次のSQL文を実行してUDAFを呼び出します。
select user_udaf(col0) as c0 from my_table;次の応答が返されます。
+----+ | c0 | +----+ | 1.4| +----+