Realtime Compute for Apache Flink は、Flink SQL ジョブにおける Java ユーザー定義関数 (UDF) をサポートします。本トピックでは、Java UDF の種類、パラメーターの渡し方、および重要な使用上の注意事項について説明します。
注意事項
UDF 開発時に JAR パッケージの依存関係の競合を回避するため、以下の点にご注意ください。
SQL 開発ページで選択した Flink のバージョンと、Pom 依存関係で指定した Flink のバージョンが一致していることを確認してください。
Flink 関連の依存関係については、
<scope>provided</scope>を追加して、スコープを `provided` に設定してください。その他のサードパーティ依存関係は、Shade メソッドを使用してパッケージ化してください。詳細については、「Apache Maven Shade Plugin」をご参照ください。
Flink の依存関係競合に関する詳細については、「Flink の依存関係競合を解決する方法」をご参照ください。
SQL ジョブ内で UDF を頻繁に呼び出すことによるタイムアウトを防止するには、UDF の JAR パッケージを依存ファイルとしてアップロードし、ジョブ内で
CREATE TEMPORARY FUNCTION構文を使用して関数を宣言します。例:CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';
UDF の分類
Flink では、以下の 3 種類の UDF がサポートされています。
分類 | 説明 |
ユーザー定義スカラー関数 (UDSF) | UDSF は、ゼロ個、1 個、または複数のスカラー値を新しいスカラー値にマップします。入力と出力の間には 1 対 1 の関係があります。つまり、1 行のデータを読み取り、1 つの出力値を書き込みます。詳細については、「ユーザー定義スカラー関数 (UDSF)」をご参照ください。 |
ユーザー定義集計関数 (UDAF) | UDAF は、複数のレコードを 1 つのレコードに集計します。入力と出力の間には多対一の関係があります。つまり、複数の入力レコードを 1 つの出力値に集計します。詳細については、「ユーザー定義集計関数 (UDAF)」をご参照ください。 |
ユーザー定義テーブル値関数 (UDTF) | UDTF は、ゼロ個、1 個、または複数のスカラー値を入力パラメーターとして受け取ります。パラメーターの長さは可変です。UDSF と似ていますが、単一の値ではなく任意の行数を出力できます。返される各行は 1 つ以上の列で構成され、1 回の関数呼び出しで複数の行または列を出力できます。詳細については、「ユーザー定義テーブル値関数 (UDTF)」をご参照ください。 |
UDF の登録
グローバル UDF の登録方法については、「グローバル UDF」をご参照ください。
ジョブレベルの UDF の登録方法については、「ジョブレベルの UDF」をご参照ください。
UDF へのパラメーターの渡し方
Flink 開発コンソールで UDF のパラメーターを設定し、UDF コード内で使用できます。これにより、コンソール上で UDF のパラメーター値を直接素早く変更できます。
UDF には、オプションの `open(FunctionContext context)` メソッドが提供されます。`FunctionContext` オブジェクトを使用すると、カスタム設定項目をパラメーターとして渡すことができます。手順は以下のとおりです。
Flink 開発コンソールの ページの [デプロイメントの詳細] タブで、[実行パラメーター設定] の [その他の設定] セクションに pipeline.global-job-parameters 設定項目を追加します。
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'`FunctionContext#getJobParameter` は、`pipeline.global-job-parameters` 設定項目の値のみ取得可能です。したがって、UDF で使用するすべての設定項目を `pipeline.global-job-parameters` 内に記述する必要があります。以下に、この設定項目の設定手順を示します。
ステップ
操作
手順
例
ステップ 1
キーと値のペアを定義します。
キーと値をコロン (:) で区切り、各キーと値のペアをシングルクォート (') で囲みます。
説明キーまたは値にコロン (:) が含まれる場合は、ダブルクォート (") で囲んでください。
キーまたは値にコロン (:) またはダブルクォート (") が含まれる場合は、2 つの連続するダブルクォート ("") でエスケープしてください。
キー = k1、
値 = {hi,hello}の場合、ペアは'k1:{hi,hello}'と定義します。キー = k2、
値 = str:ing,str:ingの場合、ペアは'k2:"str:ing,str:ing"'と定義します。キー = k3、
値 = str"ing,str:ingの場合、ペアは'k3:"str""ing,str:ing"'と定義します。
ステップ 2
最終的な pipeline.global-job-parameters 値を YAML ファイル形式でフォーマットします。
各キーと値のペアを改行し、カンマ (,) で区切ります。
説明YAML ファイル内の複数行文字列は、縦棒 (|) で始まります。
YAML ファイル内の複数行文字列の各行は、同じインデント幅にする必要があります。
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'UDF コード内で、`FunctionContext#getJobParameter` を使用して各項目の値を取得します。以下のコードを例として示します。
以下のサンプルコードを示します。
context.getJobParameter("k1", null); // 文字列 {hi,hello} を返します。 context.getJobParameter("k2", null); // 文字列 str:ing,str:ing を返します。 context.getJobParameter("k3", null); // 文字列 str"ing,str:ing を返します。 context.getJobParameter("pipeline.global-job-parameters", null); // null を返します。pipeline.global-job-parameters で定義された内容のみ取得可能であり、その他のジョブ設定項目は取得できません。
名前付きパラメーター
UDF の実装に名前付きパラメーターを使用できるのは、Ververica Runtime (VVR) 8.0.7 以降のみです。
SQL で関数を呼び出す際には、すべてのパラメーターを正しい順序で指定する必要があります。関数のパラメーター数が多い場合、パラメーターの数や順序を誤りやすくなります。また、省略可能なパラメーターも省略できません。名前付きパラメーターを使用すると、必要なパラメーターのみを指定できるため、エラーの発生確率を低減でき、より便利です。以下のユーザー定義スカラー関数 (ScalarFunction) の例では、名前付きパラメーターの使用方法を示します。
// ユーザー定義スカラー関数を実装します。最後の 2 つの入力パラメーターは省略可能 (isOptional = true) です。
public class MyFuncWithNamedArgs extends ScalarFunction {
private static final long serialVersionUID = 1L;
public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {
if (i2 != null) {
return "i2#" + i2;
}
if (l3 != null) {
return "l3#" + l3;
}
return "default#" + f1;
}
}この UDF を SQL で使用する場合、最初の必須パラメーターのみを指定したり、省略可能なパラメーターを任意に選択して指定したりできます。以下のコードを例として示します。
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
CREATE temporary TABLE s1 (
a INT,
b BIGINT,
c VARCHAR,
d VARCHAR,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
CREATE temporary TABLE sink (
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT a,
-- 最初の必須パラメーターのみを指定
MyNamedUdf(f1 => c) arg1_res,
-- 最初の必須パラメーターと 2 番目の省略可能なパラメーターを指定
MyNamedUdf(f1 => c, f2 => a) arg2_res,
-- 最初の必須パラメーターと 3 番目の省略可能なパラメーターを指定
MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;参照
Java UDF の開発および使用例については、「ユーザー定義集計関数 (UDAF)」、「ユーザー定義スカラー関数 (UDSF)」、および「ユーザー定義テーブル値関数 (UDTF)」をご参照ください。
Python UDF のデバッグおよびチューニング方法については、「概要」をご参照ください。
Python UDF の開発および使用例については、「ユーザー定義集計関数 (UDAF)」、「ユーザー定義スカラー関数 (UDSF)」、および「ユーザー定義テーブル値関数 (UDTF)」をご参照ください。