すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Java

最終更新日:Apr 16, 2025

Realtime Compute for Apache Flink では、Flink SQLデプロイメントでJavaユーザー定義関数(UDF)を使用できます。このトピックでは、Java UDFのタイプについて説明します。また、UDFにパラメーターを渡す方法と、UDFを開発する際に考慮すべき注意事項についても説明します。

注意事項

  • UDFは、パッケージ化されたJARファイル内で依存関係の競合を引き起こす可能性があります。UDFを開発する際には、以下の点に注意してください。

    • SQLデプロイメント用に構成したRealtime Compute for Apache Flink のバージョンが、pom.xmlファイルで指定したApache Flink のバージョンに対応していることを確認します。

    • Apache Flink に関連する依存関係には、<scope>provided</scope> を指定します。

    • 他のサードパーティの依存関係をパッケージ化するには、Shadeプラグインを使用します。詳細については、「Apache Maven Shade plug-in」をご参照ください。

    依存関係の競合を解決する方法については、「コンソール操作」をご参照ください。

  • SQLデプロイメントでUDFを頻繁に呼び出すと、タイムアウトが発生する可能性があります。UDFのJARファイルをデプロイメントの追加の依存関係としてアップロードし、CREATE TEMPORARY FUNCTION ステートメントを使用してUDFを宣言することをお勧めします。例:CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';

UDFタイプ

次の表に、Realtime Compute for Apache Flink でサポートされているUDFタイプを示します。

タイプ

説明

ユーザー定義スカラ関数(UDSF)

UDSFは、ゼロ個以上のスカラ値を新しいスカラ値にマッピングします。入力と出力の間には、1対1のマッピングが確立されます。UDSFを呼び出して、データレコードに基づいて新しい値を生成できます。詳細については、「UDSF」をご参照ください。

ユーザー定義集計関数(UDAF)

UDAFは、複数のデータレコードを1つのデータレコードに集計します。入力と出力の間には、多対1のマッピングが確立されます。詳細については、「UDAF」をご参照ください。

ユーザー定義テーブル関数(UDTF)

UDTFは、ゼロ個以上のスカラ値を入力パラメーターとして受け入れます。パラメーターは可変長にすることができます。UDTFはUDSFに似ていますが、UDTFは単一の値ではなく任意の数の行を返すことができます。返される行には、1つ以上の列を含めることができます。UDTFを呼び出して、複数の行または列を生成できます。詳細については、「UDTF」をご参照ください。

UDF の登録

UDFパラメーターの受け渡し

Realtime Compute for Apache Flink の開発コンソールでUDFのパラメーターを構成し、UDF内でパラメーター値を取得できます。

コンソールのパラメーター構成は、open(FunctionContext context)メソッドによってUDFに渡されます。このメソッドは実装は任意であり、対応するUDFの実行コンテキストに関する情報を含むFunctionContextオブジェクトを提供します。パラメーター構成を渡すには、次の手順を実行します。

  1. Realtime Compute for Apache Flink の開発コンソールの [運用と保守] > [デプロイメント] ページで、使用するデプロイメントの名前をクリックします。[構成] タブで、[パラメーター] セクションの右上隅にある [編集] をクリックします。次に、[その他の構成] フィールドに pipeline.global-job-parameters パラメーターを指定します。サンプルコード:

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    FunctionContext#getJobParameterメソッドで取得できるのは、pipeline.global-job-parametersパラメーターで指定された構成のみです。UDFのすべてのパラメーター構成をpipeline.global-job-parametersパラメーターで指定してください。次の表に、pipeline.global-job-parametersパラメーターの構成方法を示します。

    手順

    説明

    操作

    手順 1

    キーと値のペアを定義します。

    キーと値のペアのキーと値をコロン(:)で区切り、各キーと値のペアを単一引用符(')で囲みます。

    説明
    • キーまたは値にコロン(:)が含まれている場合は、キーまたは値を二重引用符(")で囲みます。

    • キーまたは値にコロン(:)と二重引用符(")が含まれている場合は、二重引用符を2つの二重引用符("")を使用してエスケープします。

    • キーと値のペア key = k1,value = {hi,hello} は、'k1:{hi,hello}' として定義されます。

    • キーと値のペア key = k2,value = str:ing,str:ing は、'k2:"str:ing,str:ing"' として定義されます。

    • キーと値のペア key = k3,value = str"ing,str:ing は、'k3:"str""ing,str:ing"' として定義されます。

    手順 2

    定義されたキーと値のペアをYAML形式で整理します。

    異なるキーと値のペアを異なる行に配置し、行をコンマ(,)で区切ります。

    説明
    • 複数行の文字列の先頭には、縦棒(|)を追加する必要があります。

    • 文字列の各行は、同じインデントを持つ必要があります。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'
  2. UDFの実装コードで、FunctionContext#getJobParameterメソッドを呼び出して、キーと値のペアの内容を取得します。

    サンプルコード:

    context.getJobParameter("k1", null); // 文字列 {hi,hello} が返されます。
    context.getJobParameter("k2", null); // 文字列 str:ing,str:ing が返されます。
    context.getJobParameter("k3", null); // 文字列 str"ing,str:ing が返されます。
    context.getJobParameter("pipeline.global-job-parameters", null); // pipeline.global-job-parameters パラメーターで定義されたキーと値のペアのみを取得できるため、null が返されます。

名前付きパラメーター

説明

Ververica Runtime(VVR) 8.0.7 以降を使用する Realtime Compute for Apache Flink でのみ、名前付きパラメーター を使用して UDF を実装できます。

SQLでは、パラメーター値は、パラメーターが定義された順序で関数に渡す必要があり、オプションのパラメーターを省略することはできません。パラメーターの数が多い場合、正しいパラメーター順序を確保することは困難な場合があります。正しいパラメーター順序を確保し、使いやすさを向上させるために、名前付きパラメーターを使用して、使用するパラメーターのみを指定できます。次の例は、名前付きパラメーターの使用方法を示しています。この例では、ScalarFunctionという名前のUDSFを使用しています。

// 2 番目と 3 番目のパラメーターがオプション (isOptional = true) の UDSF を実装します。
public class MyFuncWithNamedArgs extends ScalarFunction  {
	private static final long serialVersionUID = 1L;

	public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
			@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
			@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {

		if (i2 != null) {
			return "i2#" + i2;
		}
		if (l3 != null) {
			return "l3#" + l3;
		}
		return "default#" + f1;
	}
}

SQLで関数を呼び出すときは、最初の必須パラメーターのみを指定できます。2 番目と 3 番目のオプションパラメーターも指定できます。サンプルコード:

CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';

CREATE temporary TABLE s1 (
    a INT,
    b BIGINT,
    c VARCHAR,
    d VARCHAR,
    PRIMARY KEY(a) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1'
);

CREATE temporary TABLE sink (
    a INT,
    b VARCHAR,
    c VARCHAR,
    d VARCHAR
) WITH (
    'connector' = 'print'
);

INSERT INTO sink
SELECT a,
    -- 最初の必須パラメーターのみを指定します
    MyNamedUdf(f1 => c) arg1_res,
    -- 最初の必須パラメーターと 2 番目のオプションパラメーターを指定します。
    MyNamedUdf(f1 => c, f2 => a) arg2_res,
    -- 最初の必須パラメーターと 3 番目のオプションパラメーターを指定します。
    MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;

参考資料

  • UDAFを使用してデータをソートおよび集計する方法については、「UDAF を使用してデータをソートおよび集計する」をご参照ください。

  • Java UDF の開発および使用方法については、「UDAF」、「UDSF」、および「UDTF」をご参照ください。

  • Python UDFのデバッグと最適化の方法については、「Python」をご参照ください。

  • Python UDF の開発および使用方法については、「UDAF」、「UDSF」、および「UDTF」をご参照ください。