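This topic describes how to build a development environment, write business code, and publish a user-defined aggregate function (UDAF) in Realtime Compute.
Note: Realtime Compute currently does not support UDXs in shared mode. UDXs are supported only in exclusive mode.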
Definition
A UDAF aggregates multiple records into a single value.
Methods of the UDAF abstract class
The following code shows several core methods of the AggregateFunction abstract class.
Note: A UDAF can be implemented in Java or Scala. We recommend Java, because Scala data types can introduce unnecessary performance overhead.
/*
* @param <T> The type of the UDAF output result.
* @param <ACC> The accumulator type of a UDAF. An accumulator stores the intermediate computing results of a UDAF.
* You can design an accumulator for each UDAF as required.
*/
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /*
     * Initializes the accumulator of the AggregateFunction.
     * The system calls this method once before it performs aggregation for the first time.
     */
    public abstract ACC createAccumulator();

    /*
     * The system calls this method each time an aggregation is complete to obtain the result.
     */
    public abstract T getValue(ACC accumulator);
}
Because the input and output types of the createAccumulator and getValue methods are fixed, both methods can be defined in the AggregateFunction abstract class. In addition to these two methods, even the most basic UDAF requires an accumulate method.
/*
 * You need to implement an accumulate method that describes how to process input data and update the accumulator with the result.
 * The first parameter of the accumulate method must be an accumulator of the ACC type defined in AggregateFunction.
 * At run time, the underlying runtime code passes the accumulator that holds the historical state, together with the
 * user-specified input data (any number of parameters, of any type), to the accumulate method.
 */
public void accumulate(ACC accumulator, ...[ User-specified parameters]...);
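The interplay of createAccumulator, accumulate, and getValue can be illustrated with a small plain-Java sketch. It deliberately does not depend on Realtime Compute or Flink: AvgSketch and AvgAccum are hypothetical names, and the main method only imitates the order in which the runtime would call the three methods. An average is used here because it shows why the accumulator type (a sum and a count) can differ from the output type.

```java
// Plain-Java sketch with no Flink dependency. All names are hypothetical.
class AvgSketch {
    // Accumulator: the intermediate state kept between input records.
    static class AvgAccum {
        long sum;
        long count;
    }

    // Called once, before the first record is aggregated.
    AvgAccum createAccumulator() {
        return new AvgAccum();
    }

    // Called for each input record; updates the accumulator in place.
    void accumulate(AvgAccum acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    // Derives the result from the accumulator; returns null if no record was seen.
    Double getValue(AvgAccum acc) {
        if (acc.count == 0) {
            return null;
        }
        return (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        AvgSketch udaf = new AvgSketch();
        AvgAccum acc = udaf.createAccumulator();
        for (long v : new long[] {2, 4, 9}) {
            udaf.accumulate(acc, v);
        }
        System.out.println(udaf.getValue(acc)); // prints 5.0
    }
}
```

In a real UDAF the class would extend AggregateFunction<Double, AvgAccum>, and the calls made in main would be issued by the runtime instead.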
Together, the createAccumulator, getValue, and accumulate methods are enough to design the most basic UDAF. However, Realtime Compute also requires the retract and merge methods in some special scenarios.
/*
 * In Realtime Compute, results over an infinite stream are usually emitted early, before all data has arrived.
 * A result that has already been emitted may therefore need to be corrected later, which is called a retraction.
 * The SQL optimizer automatically determines when retraction data is generated and which operations must process data marked with retract tags.
 * You must implement a retract method to define how the retracted data is processed. The retract method is the inverse operation of the accumulate method.
 * For example, in a count UDAF, the result is incremented by 1 each time the accumulate method processes a record, and decremented by 1 each time the retract method processes a record.
 * Similar to the accumulate method, the first parameter of the retract method must be an accumulator of the ACC type defined in AggregateFunction.
 * At run time, the underlying runtime code passes the accumulator that holds the historical state, together with the
 * user-specified input data (any number of parameters, of any type), to the retract method.
 */
public void retract(ACC accumulator, ...[ User-specified parameters]...);
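The count example from the comment above can be sketched in plain Java as follows. This is an illustration under assumptions, not the runtime's actual call sequence: RetractableCountSketch is a hypothetical class without any Flink dependency, and main only simulates three records arriving and one of them being retracted afterwards.

```java
// Plain-Java sketch with no Flink dependency. All names are hypothetical.
class RetractableCountSketch {
    static class CountAccum {
        long total;
    }

    CountAccum createAccumulator() {
        return new CountAccum();
    }

    // Each accumulated record increments the count by 1.
    void accumulate(CountAccum acc, Object value) {
        acc.total++;
    }

    // Inverse of accumulate: each retracted record decrements the count by 1.
    void retract(CountAccum acc, Object value) {
        acc.total--;
    }

    Long getValue(CountAccum acc) {
        return acc.total;
    }

    public static void main(String[] args) {
        RetractableCountSketch udaf = new RetractableCountSketch();
        CountAccum acc = udaf.createAccumulator();
        udaf.accumulate(acc, "a");
        udaf.accumulate(acc, "b");
        udaf.accumulate(acc, "c");
        udaf.retract(acc, "b"); // simulated retraction of an earlier record
        System.out.println(udaf.getValue(acc)); // prints 2
    }
}
```

The retract method takes the same user-specified parameters as accumulate, so that every update made by accumulate can be undone.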
/*
 * The merge method is widely used in batch computing, as well as in some Realtime Compute scenarios, such as session windows.
 * Because data in Realtime Compute can arrive out of order, a late record may fall between two separate sessions and cause them to be merged into one.
 * In this case, a merge method is required to merge multiple accumulators into one accumulator.
 * The first parameter of the merge method must be an accumulator of the ACC type defined in AggregateFunction.
 * This accumulator stores the merged state after the merge method is called.
 * The second parameter of the merge method is an Iterable over ACC-type accumulators, which may contain one or more accumulators.
 */
public void merge(ACC accumulator, Iterable<ACC> its);
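As a rough illustration of the session-window case, the following plain-Java sketch merges the accumulators of two sessions into a target accumulator. MergeCountSketch, the variable names, and the record counts are hypothetical, and the code has no Flink dependency. In a real job the runtime decides when sessions are merged and supplies the accumulators.

```java
import java.util.Arrays;

// Plain-Java sketch with no Flink dependency. All names are hypothetical.
class MergeCountSketch {
    static class CountAccum {
        long total;
    }

    // Folds every accumulator in 'its' into 'accumulator', which holds the merged state afterwards.
    void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }

    public static void main(String[] args) {
        CountAccum sessionA = new CountAccum();
        sessionA.total = 3; // assumed: session A has counted 3 records
        CountAccum sessionB = new CountAccum();
        sessionB.total = 2; // assumed: session B has counted 2 records

        // A late record joins the two sessions, so their state is merged.
        CountAccum merged = new CountAccum();
        new MergeCountSketch().merge(merged, Arrays.asList(sessionA, sessionB));
        System.out.println(merged.total); // prints 5
    }
}
```

The source accumulators are only read, so merging the same inputs into a fresh target always gives the same result.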
Build a development environment
For more information, see "Build a development environment".
Write business logic code
The following sample code is written in Java.
import org.apache.flink.table.functions.AggregateFunction;

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    // Define the data structure of the accumulator that stores the state of the count UDAF.
    public static class CountAccum {
        public long total;
    }

    // Initialize the accumulator of the count UDAF.
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    // getValue computes the result of the count UDAF from the state stored in the accumulator.
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // accumulate updates the accumulator of the count UDAF for each input record.
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

    // merge adds the counts of the other accumulators to the first accumulator.
    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}
Note: Subclasses of AggregateFunction support both the open and close methods as optional methods. For more information, see the documentation on using a UDF or UDTF.
Publish
Find the required class, write the SQL statements, and click [Publish]. On the administration page, click [Start] to run the function.
-- UDAF count
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';
create table sls_stream(
a int,
b bigint,
c varchar
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessKey',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourProjectName',
logStore='stream-test2',
consumerGroup='consumerGroupTest3'
);
create table rds_output(
len1 bigint,
len2 bigint
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourTableName',
userName='yourUserName',
password='yourDatabasePassword'
);
insert into rds_output
select
  count(a),
  countUdaf(a)
from sls_stream;