Realtime Compute for Apache Flink では、Flink SQLデプロイメントでJavaユーザー定義関数(UDF)を使用できます。このトピックでは、Java UDFのタイプについて説明します。また、UDFにパラメーターを渡す方法と、UDFを開発する際に考慮すべき注意事項についても説明します。
注意事項
UDFは、パッケージ化されたJARファイル内で依存関係の競合を引き起こす可能性があります。UDFを開発する際には、以下の点に注意してください。
SQLデプロイメント用に構成したRealtime Compute for Apache Flink のバージョンが、pom.xmlファイルで指定したApache Flink のバージョンに対応していることを確認します。
Apache Flink に関連する依存関係には、
<scope>provided</scope>を指定します。他のサードパーティの依存関係をパッケージ化するには、Shadeプラグインを使用します。詳細については、「Apache Maven Shade plug-in」をご参照ください。
依存関係の競合を解決する方法については、「コンソール操作」をご参照ください。
SQLデプロイメントでUDFを頻繁に呼び出すと、タイムアウトが発生する可能性があります。UDFのJARファイルをデプロイメントの追加の依存関係としてアップロードし、
CREATE TEMPORARY FUNCTIONステートメントを使用してUDFを宣言することをお勧めします。例:CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';。
UDFタイプ
次の表に、Realtime Compute for Apache Flink でサポートされているUDFタイプを示します。
タイプ | 説明 |
ユーザー定義スカラ関数(UDSF) | UDSFは、ゼロ個以上のスカラ値を新しいスカラ値にマッピングします。入力と出力の間には、1対1のマッピングが確立されます。UDSFを呼び出して、データレコードに基づいて新しい値を生成できます。詳細については、「UDSF」をご参照ください。 |
ユーザー定義集計関数(UDAF) | UDAFは、複数のデータレコードを1つのデータレコードに集計します。入力と出力の間には、多対1のマッピングが確立されます。詳細については、「UDAF」をご参照ください。 |
ユーザー定義テーブル関数(UDTF) | UDTFは、ゼロ個以上のスカラ値を入力パラメーターとして受け入れます。パラメーターは可変長にすることができます。UDTFはUDSFに似ていますが、UDTFは単一の値ではなく任意の数の行を返すことができます。返される行には、1つ以上の列を含めることができます。UDTFを呼び出して、複数の行または列を生成できます。詳細については、「UDTF」をご参照ください。 |
UDF の登録
UDFをグローバルに登録する方法については、「カタログ UDF の登録」をご参照ください。
単一のデプロイメントに対してUDFを登録する方法については、「デプロイメントレベルの UDF の登録」をご参照ください。
UDFパラメーターの受け渡し
Realtime Compute for Apache Flink の開発コンソールでUDFのパラメーターを構成し、UDF内でパラメーター値を取得できます。
コンソールのパラメーター構成は、open(FunctionContext context)メソッドによってUDFに渡されます。このメソッドは実装は任意であり、対応するUDFの実行コンテキストに関する情報を含むFunctionContextオブジェクトを提供します。パラメーター構成を渡すには、次の手順を実行します。
Realtime Compute for Apache Flink の開発コンソールの ページで、使用するデプロイメントの名前をクリックします。[構成] タブで、[パラメーター] セクションの右上隅にある [編集] をクリックします。次に、[その他の構成] フィールドに pipeline.global-job-parameters パラメーターを指定します。サンプルコード:
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'FunctionContext#getJobParameterメソッドで取得できるのは、pipeline.global-job-parametersパラメーターで指定された構成のみです。UDFのすべてのパラメーター構成をpipeline.global-job-parametersパラメーターで指定してください。次の表に、pipeline.global-job-parametersパラメーターの構成方法を示します。
手順
説明
操作
例
手順 1
キーと値のペアを定義します。
キーと値のペアのキーと値をコロン(:)で区切り、各キーと値のペアを単一引用符(')で囲みます。
説明キーまたは値にコロン(:)が含まれている場合は、キーまたは値を二重引用符(")で囲みます。
キーまたは値にコロン(:)と二重引用符(")が含まれている場合は、二重引用符を2つの二重引用符("")を使用してエスケープします。
キーと値のペア
key = k1,value = {hi,hello}は、'k1:{hi,hello}'として定義されます。キーと値のペア
key = k2,value = str:ing,str:ingは、'k2:"str:ing,str:ing"'として定義されます。キーと値のペア
key = k3,value = str"ing,str:ingは、'k3:"str""ing,str:ing"'として定義されます。
手順 2
定義されたキーと値のペアをYAML形式で整理します。
異なるキーと値のペアを異なる行に配置し、行をコンマ(,)で区切ります。
説明複数行の文字列の先頭には、縦棒(|)を追加する必要があります。
文字列の各行は、同じインデントを持つ必要があります。
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'UDFの実装コードで、FunctionContext#getJobParameterメソッドを呼び出して、キーと値のペアの内容を取得します。
サンプルコード:
context.getJobParameter("k1", null); // 文字列 {hi,hello} が返されます。 context.getJobParameter("k2", null); // 文字列 str:ing,str:ing が返されます。 context.getJobParameter("k3", null); // 文字列 str"ing,str:ing が返されます。 context.getJobParameter("pipeline.global-job-parameters", null); // pipeline.global-job-parameters パラメーターで定義されたキーと値のペアのみを取得できるため、null が返されます。
名前付きパラメーター
Ververica Runtime(VVR) 8.0.7 以降を使用する Realtime Compute for Apache Flink でのみ、名前付きパラメーター を使用して UDF を実装できます。
SQLでは、パラメーター値は、パラメーターが定義された順序で関数に渡す必要があり、オプションのパラメーターを省略することはできません。パラメーターの数が多い場合、正しいパラメーター順序を確保することは困難な場合があります。正しいパラメーター順序を確保し、使いやすさを向上させるために、名前付きパラメーターを使用して、使用するパラメーターのみを指定できます。次の例は、名前付きパラメーターの使用方法を示しています。この例では、ScalarFunctionという名前のUDSFを使用しています。
// 2 番目と 3 番目のパラメーターがオプション (isOptional = true) の UDSF を実装します。
public class MyFuncWithNamedArgs extends ScalarFunction {
private static final long serialVersionUID = 1L;
public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {
if (i2 != null) {
return "i2#" + i2;
}
if (l3 != null) {
return "l3#" + l3;
}
return "default#" + f1;
}
}SQLで関数を呼び出すときは、最初の必須パラメーターのみを指定できます。2 番目と 3 番目のオプションパラメーターも指定できます。サンプルコード:
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
CREATE temporary TABLE s1 (
a INT,
b BIGINT,
c VARCHAR,
d VARCHAR,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
CREATE temporary TABLE sink (
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT a,
-- 最初の必須パラメーターのみを指定します
MyNamedUdf(f1 => c) arg1_res,
-- 最初の必須パラメーターと 2 番目のオプションパラメーターを指定します。
MyNamedUdf(f1 => c, f2 => a) arg2_res,
-- 最初の必須パラメーターと 3 番目のオプションパラメーターを指定します。
MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;