このトピックでは、Realtime Compute の開発環境を構築し、ビジネスコードを記述し、ユーザー定義テーブル関数 (UDTF) を公開する方法について説明します。

現在、Realtime Compute は共有モードの UDX をサポートしていません。 UDX は、排他モードでのみサポートされています。

定義

UDF と同様に、UDTF は 0、1、または複数のスカラー値を入力パラメーターとして使用します。 UDF とは異なり、UDTF は単一の値ではなく、任意の数の行を返します。 返される行は、1 つ以上の列で構成することができます。

開発環境の構築

詳細については、「開発環境の構築」をご参照ください。

ビジネスロジックコードの記述

UDTF は、TableFunction クラスに eval メソッドを実装する必要があります。 open および close メソッドはオプションです。 次のサンプルコードは Java で記述されています。

package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class SplitUdtf extends TableFunction<String> {

    // The open method is optional. To write the open method, you must import org.apache.flink.table.functions.FunctionContext.
    @Override
    public void open(FunctionContext context) {
        // ...
        }

    public void eval(String str) {
        String[] split = str.split("\\|");
        for (String w : words) {
            collect(s);
        }
    }

    // The close method is optional.
    @Override
    public void close() {
        // ...
        }

}

複数の行を返す

UDTF は、collect() を複数回呼び出すことにより、出力結果を単一行から複数行に変換することができます。

複数の列を返す

UDTF は、出力結果を単一の列から複数の列に変換することもできます。 UDTF が複数の列を返すようにする場合は、戻り値をタプルまたは行として宣言します。 Realtime Compute は、それぞれ 1 から 25 のフィールドを定義する Tuple1 から Tuple25 をサポートします。 次の例は、Tuple3 を使用して 3 つのフィールドを返す UDTF です。

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;

// If the return value is declared as Tuple, you must explicitly declare the generic types of Tuple, such as String, Long, and Integer in this example.
public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {

public void eval(String str) {
String[] split = str.split(",");
// The code is for demonstration only. In practice, more verification logic needs to be added.
String first = split[0];
long second = Long.parseLong(split[1]);
int third = Integer.parseInt(split[2]);
Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third); 
collect(tuple3);
}
}		
戻り値がタプルとして宣言されている場合は、フィールド値を null にすることはできず、最大 25 のフィールドのみが許可されます。

次の例は、Row を使用して 3 つのフィールドを返す UDTF です。

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class ParseUdtf extends TableFunction<Row> {

public void eval(String str) {
String[] split = str.split(",");
String first = split[0];
long second = Long.parseLong(split[1]);
int third = Integer.parseInt(split[2]);
Row row = new Row(3);
row.setField(0, first);
row.setField(1, second);
row.setField(2, third);
collect(row);
}

@Override
// If the return value is declared as Row, you must overload the getResultType method to explicitly inform the system of the types of the fields to be returned.
public DataType getResultType(Object[] arguments, Class[] argTypes) {
return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
}
}		

戻り値が Row として宣言されている場合は、フィールド値は null になる可能性があります。 ただし、getResultType メソッドをオーバーロードする必要があります。

SQL 構文

UDTF は CROSS JOIN および LEFT JOIN をサポートします。 UDTF を使用する場合は、LATERAL および TABLE のキーワードを追加する必要があります。 例として、上記の ParseUdtf を取り上げます。 まず、関数名を登録する必要があります。

CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
			

CROSS JOIN: 左の表の各行は、UDTF によって生成されたデータの各行と相関しています。 UDTF が行のデータを生成しない場合は、行はエクスポートされません。

select S.id, S.content, T.a, T.b, T.c
from input_stream as S,
LATERAL TABLE(parseUdtf(content)) as T(a, b, c);
			
LEFT JOIN: 左のテーブルの各行は、UDTF によって生成されたデータの各行と相関しています。 UDTF が行のデータを生成しない場合は、行の UDTF フィールドには null が入力されます。
UDTF を使用する LEFT JOIN ステートメントは、末尾が ON TRUE である必要があります。
select S.id, S.content, T.a, T.b, T.c
from input_stream as S
LEFT JOIN LATERAL TABLE(parseUdtf(content)) as T(a, b, c) ON TRUE;

公開

必要なクラスを見つけ、SQL ステートメントを記述して、 [公開] をクリックします。 [管理] ページで、[開始] をクリックして関数を実行します。

-- UDTF str.split("\\|");
CREATE FUNCTION splitUdtf AS 'com.hjc.test.blink.sql.udx.SplitUdtf';

create table sls_stream(
a int,
b bigint,
c varchar
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessId',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourProjectName',
logStore='stream-test2',
consumerGroup='consumerGroupTest2'
);

-- Pass the c field to splitUdtf to generate table T(s) that consists of one column and multiple rows after splitting. In the table name, s indicates the field name.
create view v1 as
select a,b,c,s
from sls_stream,
LATERAL TABLE(splitUdtf(c)) as T(s);

create table rds_output(
id int,
len bigint,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);

insert into rds_output
select
a,b,s
from v1