すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Java

最終更新日:Mar 10, 2026

Realtime Compute for Apache Flink は、Flink SQL ジョブにおける Java ユーザー定義関数 (UDF) をサポートします。本トピックでは、Java UDF の種類、パラメーターの渡し方、および重要な使用上の注意事項について説明します。

注意事項

  • UDF 開発時に JAR パッケージの依存関係の競合を回避するため、以下の点にご注意ください。

    • SQL 開発ページで選択した Flink のバージョンと、Pom 依存関係で指定した Flink のバージョンが一致していることを確認してください。

    • Flink 関連の依存関係については、<scope>provided</scope> を追加して、スコープを `provided` に設定してください。

    • その他のサードパーティ依存関係は、Shade メソッドを使用してパッケージ化してください。詳細については、「Apache Maven Shade Plugin」をご参照ください。

    Flink の依存関係競合に関する詳細については、「Flink の依存関係競合を解決する方法」をご参照ください。

  • SQL ジョブ内で UDF を頻繁に呼び出すことによるタイムアウトを防止するには、UDF の JAR パッケージを依存ファイルとしてアップロードし、ジョブ内で CREATE TEMPORARY FUNCTION 構文を使用して関数を宣言します。例: CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';

UDF の分類

Flink では、以下の 3 種類の UDF がサポートされています。

分類

説明

ユーザー定義スカラー関数 (UDSF)

UDSF は、ゼロ個、1 個、または複数のスカラー値を新しいスカラー値にマップします。入力と出力の間には 1 対 1 の関係があります。つまり、1 行のデータを読み取り、1 つの出力値を書き込みます。詳細については、「ユーザー定義スカラー関数 (UDSF)」をご参照ください。

ユーザー定義集計関数 (UDAF)

UDAF は、複数のレコードを 1 つのレコードに集計します。入力と出力の間には多対一の関係があります。つまり、複数の入力レコードを 1 つの出力値に集計します。詳細については、「ユーザー定義集計関数 (UDAF)」をご参照ください。

ユーザー定義テーブル値関数 (UDTF)

UDTF は、ゼロ個、1 個、または複数のスカラー値を入力パラメーターとして受け取ります。パラメーターの長さは可変です。UDSF と似ていますが、単一の値ではなく任意の行数を出力できます。返される各行は 1 つ以上の列で構成され、1 回の関数呼び出しで複数の行または列を出力できます。詳細については、「ユーザー定義テーブル値関数 (UDTF)」をご参照ください。

UDF の登録

  • グローバル UDF の登録方法については、「グローバル UDF」をご参照ください。

  • ジョブレベルの UDF の登録方法については、「ジョブレベルの UDF」をご参照ください。

UDF へのパラメーターの渡し方

Flink 開発コンソールで UDF のパラメーターを設定し、UDF コード内で使用できます。これにより、コンソール上で UDF のパラメーター値を直接素早く変更できます。

UDF には、オプションの `open(FunctionContext context)` メソッドが提供されます。`FunctionContext` オブジェクトを使用すると、カスタム設定項目をパラメーターとして渡すことができます。手順は以下のとおりです。

  1. Flink 開発コンソールの [オペレーションセンター] > [ジョブ O&M] ページの [デプロイメントの詳細] タブで、[実行パラメーター設定][その他の設定] セクションに pipeline.global-job-parameters 設定項目を追加します。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    `FunctionContext#getJobParameter` は、`pipeline.global-job-parameters` 設定項目の値のみ取得可能です。したがって、UDF で使用するすべての設定項目を `pipeline.global-job-parameters` 内に記述する必要があります。以下に、この設定項目の設定手順を示します。

    ステップ

    操作

    手順

    ステップ 1

    キーと値のペアを定義します。

    キーと値をコロン (:) で区切り、各キーと値のペアをシングルクォート (') で囲みます。

    説明
    • キーまたは値にコロン (:) が含まれる場合は、ダブルクォート (") で囲んでください。

    • キーまたは値にコロン (:) またはダブルクォート (") が含まれる場合は、2 つの連続するダブルクォート ("") でエスケープしてください。

    • キー = k1、値 = {hi,hello} の場合、ペアは 'k1:{hi,hello}' と定義します。

    • キー = k2、値 = str:ing,str:ing の場合、ペアは 'k2:"str:ing,str:ing"' と定義します。

    • キー = k3、値 = str"ing,str:ing の場合、ペアは 'k3:"str""ing,str:ing"' と定義します。

    ステップ 2

    最終的な pipeline.global-job-parameters 値を YAML ファイル形式でフォーマットします。

    各キーと値のペアを改行し、カンマ (,) で区切ります。

    説明
    • YAML ファイル内の複数行文字列は、縦棒 (|) で始まります。

    • YAML ファイル内の複数行文字列の各行は、同じインデント幅にする必要があります。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'
  2. UDF コード内で、`FunctionContext#getJobParameter` を使用して各項目の値を取得します。以下のコードを例として示します。

    以下のサンプルコードを示します。

    context.getJobParameter("k1", null); // 文字列 {hi,hello} を返します。
    context.getJobParameter("k2", null); // 文字列 str:ing,str:ing を返します。
    context.getJobParameter("k3", null); // 文字列 str"ing,str:ing を返します。
    context.getJobParameter("pipeline.global-job-parameters", null); // null を返します。pipeline.global-job-parameters で定義された内容のみ取得可能であり、その他のジョブ設定項目は取得できません。

名前付きパラメーター

説明

UDF の実装に名前付きパラメーターを使用できるのは、Ververica Runtime (VVR) 8.0.7 以降のみです。

SQL で関数を呼び出す際には、すべてのパラメーターを正しい順序で指定する必要があります。関数のパラメーター数が多い場合、パラメーターの数や順序を誤りやすくなります。また、省略可能なパラメーターも省略できません。名前付きパラメーターを使用すると、必要なパラメーターのみを指定できるため、エラーの発生確率を低減でき、より便利です。以下のユーザー定義スカラー関数 (ScalarFunction) の例では、名前付きパラメーターの使用方法を示します。

// ユーザー定義スカラー関数を実装します。最後の 2 つの入力パラメーターは省略可能 (isOptional = true) です。
public class MyFuncWithNamedArgs extends ScalarFunction  {
	private static final long serialVersionUID = 1L;

	public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
			@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
			@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {

		if (i2 != null) {
			return "i2#" + i2;
		}
		if (l3 != null) {
			return "l3#" + l3;
		}
		return "default#" + f1;
	}
}

この UDF を SQL で使用する場合、最初の必須パラメーターのみを指定したり、省略可能なパラメーターを任意に選択して指定したりできます。以下のコードを例として示します。

CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';

CREATE temporary TABLE s1 (
    a INT,
    b BIGINT,
    c VARCHAR,
    d VARCHAR,
    PRIMARY KEY(a) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1'
);

CREATE temporary TABLE sink (
    a INT,
    b VARCHAR,
    c VARCHAR,
    d VARCHAR
) WITH (
    'connector' = 'print'
);

INSERT INTO sink
SELECT a,
    -- 最初の必須パラメーターのみを指定
    MyNamedUdf(f1 => c) arg1_res,
    -- 最初の必須パラメーターと 2 番目の省略可能なパラメーターを指定
    MyNamedUdf(f1 => c, f2 => a) arg2_res,
    -- 最初の必須パラメーターと 3 番目の省略可能なパラメーターを指定
    MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;

参照