このトピックでは、Realtime Compute の開発環境を構築し、ビジネスコードを記述し、ユーザー定義テーブル関数 (UDTF) を公開する方法について説明します。
定義
UDF と同様に、UDTF は 0、1、または複数のスカラー値を入力パラメーターとして使用します。 UDF とは異なり、UDTF は単一の値ではなく、任意の数の行を返します。 返される行は、1 つ以上の列で構成することができます。
開発環境の構築
詳細については、「開発環境の構築」をご参照ください。
ビジネスロジックコードの記述
UDTF は、TableFunction クラスに eval メソッドを実装する必要があります。 open および close メソッドはオプションです。 次のサンプルコードは Java で記述されています。
package com.hjc.test.blink.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
public class SplitUdtf extends TableFunction<String> {
// The open method is optional. To write the open method, you must import org.apache.flink.table.functions.FunctionContext.
@Override
public void open(FunctionContext context) {
// ...
}
public void eval(String str) {
String[] split = str.split("\\|");
for (String w : words) {
collect(s);
}
}
// The close method is optional.
@Override
public void close() {
// ...
}
}
複数の行を返す
UDTF は、collect()
を複数回呼び出すことにより、出力結果を単一行から複数行に変換することができます。
複数の列を返す
UDTF は、出力結果を単一の列から複数の列に変換することもできます。 UDTF が複数の列を返すようにする場合は、戻り値をタプルまたは行として宣言します。 Realtime Compute は、それぞれ 1 から 25 のフィールドを定義する Tuple1 から Tuple25 をサポートします。 次の例は、Tuple3 を使用して 3 つのフィールドを返す UDTF です。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;
// If the return value is declared as Tuple, you must explicitly declare the generic types of Tuple, such as String, Long, and Integer in this example.
public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {
public void eval(String str) {
String[] split = str.split(",");
// The code is for demonstration only. In practice, more verification logic needs to be added.
String first = split[0];
long second = Long.parseLong(split[1]);
int third = Integer.parseInt(split[2]);
Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third);
collect(tuple3);
}
}
次の例は、Row を使用して 3 つのフィールドを返す UDTF です。
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
public class ParseUdtf extends TableFunction<Row> {
public void eval(String str) {
String[] split = str.split(",");
String first = split[0];
long second = Long.parseLong(split[1]);
int third = Integer.parseInt(split[2]);
Row row = new Row(3);
row.setField(0, first);
row.setField(1, second);
row.setField(2, third);
collect(row);
}
@Override
// If the return value is declared as Row, you must overload the getResultType method to explicitly inform the system of the types of the fields to be returned.
public DataType getResultType(Object[] arguments, Class[] argTypes) {
return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
}
}
戻り値が Row として宣言されている場合は、フィールド値は null になる可能性があります。 ただし、getResultType
メソッドをオーバーロードする必要があります。
SQL 構文
UDTF は CROSS JOIN および LEFT JOIN をサポートします。 UDTF を使用する場合は、LATERAL
および TABLE
のキーワードを追加する必要があります。 例として、上記の ParseUdtf
を取り上げます。 まず、関数名を登録する必要があります。
CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
CROSS JOIN: 左の表の各行は、UDTF によって生成されたデータの各行と相関しています。 UDTF が行のデータを生成しない場合は、行はエクスポートされません。
select S.id, S.content, T.a, T.b, T.c
from input_stream as S,
LATERAL TABLE(parseUdtf(content)) as T(a, b, c);
ON TRUE
である必要があります。
select S.id, S.content, T.a, T.b, T.c
from input_stream as S
LEFT JOIN LATERAL TABLE(parseUdtf(content)) as T(a, b, c) ON TRUE;
公開
必要なクラスを見つけ、SQL ステートメントを記述して、 [公開] をクリックします。 [管理] ページで、[開始] をクリックして関数を実行します。
-- UDTF str.split("\\|");
CREATE FUNCTION splitUdtf AS 'com.hjc.test.blink.sql.udx.SplitUdtf';
create table sls_stream(
a int,
b bigint,
c varchar
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessId',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourProjectName',
logStore='stream-test2',
consumerGroup='consumerGroupTest2'
);
-- Pass the c field to splitUdtf to generate table T(s) that consists of one column and multiple rows after splitting. In the table name, s indicates the field name.
create view v1 as
select a,b,c,s
from sls_stream,
LATERAL TABLE(splitUdtf(c)) as T(s);
create table rds_output(
id int,
len bigint,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);
insert into rds_output
select
a,b,s
from v1