本ページでは、Realtime Compute の開発環境を構築し、ビジネスコードを記述してユーザー定義関数 (UDF) を公開する方法について説明します。
注 現在、Realtime Compute は共有モードの UDX をサポートしていません。 UDX は、排他モードでのみサポートされています。
定義
UDF は、0、1、または複数のスカラー値を新しいスカラー値にマップします。開発環境の構築
詳細については、「開発環境の構築」をご参照ください。
ビジネスロジックコードの記述
UDF は、ScalarFunction クラスに eval
メソッドを実装する必要があります。 open
および close
メソッドはオプションです。 次のサンプルコードは Java で記述されています。
package com.hjc.test.blink.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public class StringLengthUdf extends ScalarFunction {
// The open method is optional.
// To write the open method, you must import org.apache.flink.table.functions.FunctionContext.
@Override
public void open(FunctionContext context) {
}
public long eval(String a) {
return a == null ? 0 : a.length();
}
public long eval(String b, String c) {
return eval(b) + eval(c);
}
// The close method is optional.
@Override
public void close() {
}
}
公開
必要なクラスを見つけ、SQL ステートメントを記述し、 [公開] をクリックします。 管理ページで、 [開始] をクリックして関数を実行します。
-- udf str.length()
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
create table sls_stream(
a int,
b int,
c varchar
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessId',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourProjectName',
logStore='yourLogStoreName',
consumerGroup='consumerGroupTest1'
);
create table rds_output(
id int,
len bigint,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);
insert into rds_output
select
a,
stringLengthUdf(c),
c as content
from sls_stream
よくある質問
Q: ユーザー定義の乱数ジェネレーターが実行時に常に同じ値を生成するのはなぜですか。
A: UDF にパラメーターがなく、関数を非決定的として宣言しない場合、コンパイル中に関数が定数値になるように最適化されることがあります。 これを回避するには、isDeterministic()
を上書きして UDF で false
を返すようにします。