すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Java UDTF

最終更新日:Mar 10, 2026

このトピックでは、Flink 用のユーザー定義のテーブル値関数 (UDTF) を開発、登録、および使用する方法について説明します。

定義

ユーザー定義のテーブル値関数 (UDTF) は、可変長の 0 個以上のスカラー値を入力パラメーターとして受け入れます。単一の値を返すユーザー定義スカラー関数とは異なり、UDTF は複数の行を出力として返すことができます。返される行は、1 つ以上の列で構成できます。詳細については、「ユーザー定義関数」をご参照ください。

UDTF の開発

説明

Flink は、サービスを迅速に開発するのに役立つユーザー定義関数 (UDF) の例を提供しています。これらの例には、UDSF、ユーザー定義集約関数 (UDAF)、およびユーザー定義のテーブル値関数 (UDTF) の実装が含まれています。開発環境は例で事前設定済みであるため、セットアップは不要です。

  1. ASI_UDX_Demo サンプルプロジェクトをダウンロードして、ローカルマシンに解凍します。

    説明

    サードパーティのウェブサイト ASI_UDX_Demo へのアクセスは、失敗するか、遅延する可能性があります。

    展開が完了すると、ASI_UDX-main フォルダが作成されます。このフォルダには、次のファイルが含まれています。

    • pom.xml: プロジェクトレベルの構成ファイルです。このファイルには、プロジェクトの Maven 座標、依存関係、開発者ルール、バグ管理システム、組織、ライセンス、およびその他のプロジェクト関連情報が記述されています。

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: UDTF のサンプル Java コードです。

  2. IntelliJ IDEA で、[File] > [Open] をクリックし、解凍した ASI_UDX-main フォルダを開きます。

  3. \ASI_UDX-main\src\main\java\ASI_UDTF フォルダをダブルクリックし、必要に応じて ASI_UDTF.java ファイルを変更します。

    この例では、ASI_UDTF.java ファイル内のコードは、縦棒 (|) デリミタに基づいて文字列を複数の文字列列に分割します。

    package ASI_UDTF;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    
    public class ASI_UDTF extends TableFunction<Tuple2<String,String>> {
        public void eval(String str){
            String[] split = str.split("\\|");
            String name = split[0];
            String place = split[1];
            Tuple2<String,String> tuple2 = Tuple2.of(name,place);
            collect(tuple2);
        }
    }

    TableFunction がサポートするデータの型と型推論メカニズムの詳細については、「Flink のデータの型」および「型推論」をご参照ください。

    説明

    上記のリンクは、Flink 1.15 ドキュメントのものです。サポートされるデータの型と型推論メカニズムは、主要な Flink バージョン間で異なる場合があります。VVR-to-Flink バージョンマッピングに基づいて、ご利用の Ververica Runtime (VVR) バージョンに対応する Flink ドキュメントを確認してください。Flink バージョンを表示するには、「ワークスペースと名前空間の FAQ」をご参照ください。

    次の例は、一般的な複合型であるタプルと Row の使用方法を示しています。

    • タプル型

      TableFunction<Tuple2<String,Integer>
    • Row 型

      @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
      public static class SplitFunction extends TableFunction<Row> {
      
        public void eval(String str) {
          for (String s : str.split(" ")) {
            // collect(...) を使用して行を出力します
            collect(Row.of(s, s.length()));
          }
        }
      }
  4. \ASI_UDX-main\ フォルダをダブルクリックします。pom.xml ファイルを構成します。

    この例では、pom.xml ファイルは Flink バージョン 1.11 の主要な JAR パッケージの依存関係で構成されています。サービス要件に基づいて:

    • ご利用のサービスが他の JAR パッケージに依存しない場合、pom.xml ファイルを構成する必要はありません。次のステップに進んでください。

    • ご利用のサービスが他の JAR パッケージに依存する場合、必要な依存関係情報を pom.xml ファイルに追加してください。

    Flink バージョン 1.11 の主要な JAR パッケージの依存関係は次のとおりです。

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. pom.xml ファイルを含むディレクトリで、次のコマンドを実行してプロジェクトをパッケージ化します。

    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが生成されると、UDTF の開発が完了します。

UDTF の登録

UDTF の登録方法の詳細については、「ユーザー定義関数 (UDF) の管理」をご参照ください。

UDTF の使用

UDTF を登録した後、次の手順に従ってジョブで使用できます。

  1. Flink SQL ジョブを開発します。詳細については、「ジョブ開発マップ」をご参照ください。

    次のサンプルコードは、ASI_UDTF_Source テーブルのメッセージフィールドの各行にある文字列を複数の列に分割します。縦棒 (|) がデリミタとして使用されます。

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);
  2. オペレーションセンター > ジョブ O&M」ページで、対象のジョブを見つけ、[開始][操作] 列でクリックします。

    ジョブが開始されると、`ASI_UDTF_Source` テーブルの `message` フィールドの文字列は縦棒デリミタで分割されます。結果の文字の列は、`ASI_UDTF_Sink` テーブルに挿入されます。