すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ユーザー定義テーブル値関数(UDTF)

最終更新日:Jan 07, 2025

このトピックでは、Realtime Compute for Apache Flink でユーザー定義テーブル値関数(UDTF)を作成、登録、および使用する方法について説明します。

定義

UDTF は、入力パラメーターとしてゼロ、1 つ、または複数のスカラー値を受け取ります。 これらのパラメーターは可変長パラメーターにすることができます。 UDTF は、単一の値ではなく任意の数の行を返すことができる点を除いて、ユーザー定義スカラー関数(UDSF)に似ています。 返される行は、1 つ以上の列で構成されます。 UDTF が呼び出されるたびに、複数の行または列が返されます。 詳細については、「ユーザー定義関数」をご参照ください。

UDTF の作成

説明

Realtime Compute for Apache Flink は、ビジネス開発を促進するために、ユーザー定義関数(UDF)の例を提供しています。 この例には、UDSF、ユーザー定義集計関数(UDAF)、およびユーザー定義テーブル値関数(UDTF)の実装方法が含まれています。 関連バージョンの開発環境は、各例で構成されています。

  1. ASI_UDX_Demo をダウンロードして、オンプレミスのマシンに解凍します。

    説明

    ASI_UDX_Demo はサードパーティの Web サイトで提供されています。 Web サイトにアクセスすると、Web サイトへのアクセスに失敗したり、Web サイトへのアクセスが遅延したりする可能性があります。

    パッケージを解凍すると、ASI_UDX-main フォルダーが生成されます。 パスのパラメーター:

    • pom.xml: プロジェクトレベルの構成ファイル。プロジェクトの Maven 座標、依存関係、開発者が従うべきルール、欠陥管理システム、組織、ライセンス、およびその他すべてのプロジェクト関連要素について記述します。

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: サンプル UDTF の Java コード。

  2. IntelliJ IDEA を開き、[ファイル] > [開く] を選択します。 抽出した ASI_UDX-main フォルダーを選択し、[OK] をクリックします。

  3. \ASI_UDX-main\src\main\java\ASI_UDTF ディレクトリにある ASI_UDTF.java ファイルをダブルクリックし、ビジネス要件に基づいてファイル内で構成を行います。

    この例では、ASI_UDTF.java は、行内の文字列を縦棒(|)で区切られた複数の文字列の列に分割するコードで構成されています。

    package ASI_UDTF;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    
    public class ASI_UDTF extends TableFunction<Tuple2<String,String>> {
        // 文字列を分割して、名前と場所を抽出する
        public void eval(String str){
            String[] split = str.split("\\|");
            String name = split[0];
            String place = split[1];
            Tuple2<String,String> tuple2 = Tuple2.of(name,place);
            collect(tuple2);
        }
    }

    UDTF でサポートされているデータ型と型推論メカニズムの詳細については、「データ型」および「型推論」をご参照ください。

    説明

    上記の 2 つのドキュメントは、Apache Flink 1.15 のドキュメントから参照されています。 UDTF でサポートされているデータ型と型推論メカニズムは、Apache Flink のメジャーバージョンによって異なる場合があります。 特定のバージョンの Apache Flink で UDTF によってサポートされているデータ型と型推論メカニズムの詳細については、Ververica Runtime(VVR)と Apache Flink バージョンのマッピングに基づいて、Apache Flink バージョンのドキュメントを参照してください。 Apache Flink のエンジンバージョンを表示する方法の詳細については、「コンソール操作」をご参照ください。

    次の UDTF のサンプルは、一般的な複合型 Tuple と Row の値を返します。

    • Tuple 型

      TableFunction<Tuple2<String,Integer>
    • Row 型

      @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
      // 文字列を単語に分割し、各単語とその長さを出力する
      public static class SplitFunction extends TableFunction<Row> {
      
        public void eval(String str) {
          for (String s : str.split(" ")) {
            // collect(...) を使用して行を出力する
            collect(Row.of(s, s.length()));
          }
        }
      }
  4. \ASI_UDX-main\ ディレクトリにある pom.xml ファイルをダブルクリックし、ファイル内で構成を行います。

    この例では、pom.xml は Apache Flink 1.11 のメイン JAR 依存関係の情報で構成されています。 ビジネス要件に基づいて、次のいずれかの操作を実行します。

    • ビジネスが他の JAR パッケージに依存していない場合は、pom.xml ファイルを構成せずに次の手順に進みます。

    • ビジネスが他の JAR パッケージに依存している場合は、必要な JAR パッケージの情報を pom.xml ファイルに追加します。

    Apache Flink 1.11 は、次の JAR パッケージに依存しています。

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. pom.xml ファイルが格納されているディレクトリに移動します。 次に、次のコマンドを実行してファイルをパッケージ化します。

    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが表示されたら、UDTF が作成されています。

UDTF の登録

UDTF を登録する方法の詳細については、「UDF の管理」をご参照ください。

UDTF の使用

UDTF を登録した後、次の手順を実行して UDTF を使用できます。

  1. Flink SQL を使用してデプロイメントを作成します。 詳細については、「SQL ドラフトの開発」をご参照ください。

    ASI_UDTF_Source テーブルでは、message フィールドの各行の文字列は縦棒(|)で複数の列に分割されます。 次のコードは例を示しています。

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    -- ASI_UDTF を使用して message フィールドを分割し、結果を ASI_UDTF_Sink テーブルに挿入する
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);
  2. フルマネージド Flink のコンソールの [運用&保守] > [デプロイメント] ページで、開始するデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。

    デプロイメントが開始されると、ASI_UDTF_Source テーブルの message フィールドの複数の文字列の列が ASI_UDTF_Sink テーブルに挿入されます。 これらの文字列の列は縦棒(|)で区切られます。