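This topic describes how to set up a development environment, write business logic code, and publish a user-defined aggregate function (UDAF) in Realtime Compute.

Realtime Compute currently does not support UDXs in shared mode. UDXs are supported only in exclusive mode.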

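Definition

A UDAF aggregates multiple records into a single value.

Methods of the UDAF abstract class

The following code shows several core methods of the AggregateFunction abstract class.

You can implement a UDAF in Java or Scala. We recommend Java, because Scala data types can introduce unnecessary performance overhead.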
/*
 * @param <T> The type of the UDAF output result.
 * @param <ACC> The accumulator type of the UDAF. An accumulator stores the intermediate results of the UDAF.
 * You can design an accumulator for each UDAF as required.
 */
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /*
     * Initializes the accumulator of the AggregateFunction.
     * The system calls this method once, before it performs the first aggregate computation.
     */
    public abstract ACC createAccumulator();

    /*
     * The system calls this method after each aggregate computation completes to obtain the result.
     */
    public abstract T getValue(ACC accumulator);
}

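Because the inputs and outputs of the createAccumulator and getValue methods are fixed, both methods can be declared in the AggregateFunction abstract class. In addition to these two methods, even the most basic UDAF requires an accumulate method, whose parameters are user-specified.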

/*
 * Implement an accumulate method to describe how input data is computed and how the accumulator is updated with the result.
 * The first parameter of the accumulate method must be an accumulator of the ACC type defined in AggregateFunction.
 * At run time, the underlying runtime code passes the accumulator that holds the historical state, together with the
 * user-specified input data (any number of parameters of any type), to the accumulate method.
 */
public void accumulate(ACC accumulator, ...[User-specified parameters]...);
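The interplay of createAccumulator, accumulate, and getValue can be illustrated without a Flink dependency. The following is a minimal sketch, not the runtime's actual behavior: the class AvgLifecycleSketch and its AvgAccum accumulator are hypothetical names, the class deliberately does not extend AggregateFunction so that it runs on its own, and the main method only imitates the call order described above. It also shows why the accumulator type (a sum and a count) can differ from the output type (an average).

```java
public class AvgLifecycleSketch {
    // Hypothetical accumulator: the intermediate state is a running sum and a record count.
    public static class AvgAccum {
        public long sum;
        public long count;
    }

    // Called once before the first record is aggregated.
    public AvgAccum createAccumulator() {
        return new AvgAccum();
    }

    // Called for each input record; the accumulator is always the first parameter.
    public void accumulate(AvgAccum acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    // Derives the output value from the current accumulator state.
    public Double getValue(AvgAccum acc) {
        if (acc.count == 0) {
            return null; // no input yet, so there is no average
        }
        return (double) acc.sum / acc.count;
    }

    // Stand-in for the runtime (an assumption for illustration only).
    public static void main(String[] args) {
        AvgLifecycleSketch udaf = new AvgLifecycleSketch();
        AvgAccum acc = udaf.createAccumulator();
        for (long v : new long[] {2, 4, 9}) {
            udaf.accumulate(acc, v);
            System.out.println(udaf.getValue(acc)); // prints 2.0, then 3.0, then 5.0
        }
    }
}
```

In a real UDAF, the same three methods would be placed in a subclass of AggregateFunction, and the runtime, not a main method, would drive the calls.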
			

Together, the createAccumulator, getValue, and accumulate methods are sufficient to design the most basic UDAF. However, in some special scenarios, Realtime Compute also requires the retract and merge methods.

/*
 * In Realtime Compute, most computations over an unbounded stream emit early (early-fired) results.
 * A result that has already been emitted may therefore need to be corrected later, which is called a retraction.
 * The SQL optimizer automatically determines when retraction data is generated and which operations must process data marked with retract tags.
 * You must implement a retract method to define how the retracted data is processed. The retract method is the inverse operation of the accumulate method.
 * For example, in a count UDAF, the result increments by 1 each time the accumulate method processes a record, and decrements by 1 each time the retract method processes a record.
 * Similar to the accumulate method, the first parameter of the retract method must be an accumulator of the ACC type defined in AggregateFunction.
 * At run time, the underlying runtime code passes the accumulator that holds the historical state, together with the
 * user-specified input data (any number of parameters of any type), to the retract method.
 */
public void retract(ACC accumulator, ...[User-specified parameters]...);

/*
 * The merge method is widely used in batch computing, as well as in some Realtime Compute scenarios, such as session windows.
 * Because Realtime Compute accepts out-of-order data, a late-arriving record may fall between two separate sessions, which causes the two sessions to be merged into one.
 * In this case, a merge method is required to merge multiple accumulators into one accumulator.
 * The first parameter of the merge method must be an accumulator of the ACC type defined in AggregateFunction.
 * This accumulator stores the merged state after the merge method is called.
 * The second parameter of the merge method is an iterable over ACC-type accumulators, which may contain one or more accumulators.
 */
public void merge(ACC accumulator, Iterable<ACC> its);
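The retract and merge semantics described in the comments above can be sketched for a count aggregate as follows. This is an illustrative, self-contained example under stated assumptions: the class CountRetractMergeSketch is a hypothetical name, it does not extend AggregateFunction so that it can run without Flink on the classpath, and the main method merely simulates two sessions being merged after one record is retracted.

```java
import java.util.Arrays;

public class CountRetractMergeSketch {
    // Accumulator holding the running count.
    public static class CountAccum {
        public long total;
    }

    public CountAccum createAccumulator() {
        return new CountAccum();
    }

    // Each accumulated record increments the count by 1.
    public void accumulate(CountAccum acc, Object value) {
        acc.total++;
    }

    // Inverse of accumulate: each retracted record decrements the count by 1.
    public void retract(CountAccum acc, Object value) {
        acc.total--;
    }

    // Folds the state of the other accumulators into the first parameter.
    public void merge(CountAccum acc, Iterable<CountAccum> others) {
        for (CountAccum other : others) {
            acc.total += other.total;
        }
    }

    public Long getValue(CountAccum acc) {
        return acc.total;
    }

    // Simulated scenario (an assumption for illustration): two sessions are merged into one.
    public static void main(String[] args) {
        CountRetractMergeSketch udaf = new CountRetractMergeSketch();

        CountAccum first = udaf.createAccumulator();
        udaf.accumulate(first, "a");
        udaf.accumulate(first, "b");
        udaf.retract(first, "b"); // "b" is withdrawn, so first.total is 1

        CountAccum second = udaf.createAccumulator();
        udaf.accumulate(second, "c"); // second.total is 1

        CountAccum merged = udaf.createAccumulator();
        udaf.merge(merged, Arrays.asList(first, second));
        System.out.println(udaf.getValue(merged)); // prints 2
    }
}
```

For a count, retract and merge are simple because the state is a single additive number. Aggregates such as max or min typically need a richer accumulator to support retraction, because the previous extreme value cannot be recovered from one number.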
			

Set up a development environment

For more information, see "Set up a development environment".

Write business logic code

The following sample code is written in Java.

import org.apache.flink.table.functions.AggregateFunction;

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    // Define the data structure of the accumulator that stores state data of the count UDAF.
    public static class CountAccum {
        public long total;
    }

    // Initialize the accumulator of the count UDAF.
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    // getValue is a method for computing the result of the count UDAF based on the accumulator that stores state data.
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // accumulate is a method for updating the accumulator used by the count UDAF to store state data based on input data.
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

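    // merge is a method for combining the state of other accumulators into the first accumulator, for example when sessions are merged.
    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}
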
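Subclasses of AggregateFunction support the open and close methods as optional methods. For more information, see the topics on using UDFs or UDTFs.

Publish

Locate the required class, write the SQL statements, and click [Publish]. On the administration page, click [Start] to run the function.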

-- UDAF count
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';
create table sls_stream(
a int,
b bigint,
c varchar
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessKey',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourProjectName',
logStore='stream-test2',
consumerGroup='consumerGroupTest3'
);

create table rds_output(
len1 bigint,
len2 bigint
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourTableName',
userName='yourUserName',
password='yourDatabasePassword'
);

insert into rds_output
select
count(a),
countUdaf(a)
from sls_stream;