すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:UDSF

最終更新日:Jan 07, 2025

このトピックでは、Realtime Compute for Apache Flink でユーザー定義スカラー関数(UDSF)を作成、登録、および使用する方法について説明します。

定義

UDSF は、0 個、1 個、または複数のスカラー値を新しいスカラー値にマッピングします。 UDSF の入力データと出力データは、1 対 1 の関係でマッピングされます。 UDSF はデータの行を読み取るたびに、出力値を書き込みます。 詳細については、「ユーザー定義関数」をご参照ください。

UDSF の作成

説明

Realtime Compute for Apache Flink は、ビジネス開発を促進するために、ユーザー定義関数(UDF)の例を提供しています。 この例には、UDSF、ユーザー定義集計関数(UDAF)、およびユーザー定義テーブル値関数(UDTF)の実装方法が含まれています。 関連バージョンの開発環境は、各例で構成されています。

  1. ASI_UDX_Demo をダウンロードして、オンプレミスのマシンに解凍します。

    説明

    ASI_UDX_Demo はサードパーティの Web サイトで提供されています。 Web サイトにアクセスすると、Web サイトへのアクセスに失敗したり、Web サイトへのアクセスが遅延したりする可能性があります。

    パッケージを解凍すると、ASI_UDX-main フォルダーが生成されます。 パスのパラメーター:

    • pom.xml: プロジェクトレベルの構成ファイル。プロジェクトの Maven 座標、依存関係、開発者が従うべきルール、欠陥管理システム、組織、ライセンス、およびその他すべてのプロジェクト関連要素について記述します。

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java: サンプル UDSF の Java コード。

  2. IntelliJ IDEA を開き、[ファイル] > [開く] を選択します。 抽出した ASI_UDX-main フォルダーを選択し、[OK] をクリックします。

  3. \ASI_UDX-main\src\main\java\ASI_UDF ディレクトリにある ASI_UDF.java ファイルをダブルクリックし、ビジネス要件に基づいてファイル内で構成を行います。

    この例では、ASI_UDF.java は、各データレコードの開始位置から終了位置までの文字を取得するコードで構成されています。

    package ASI_UDF;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    public class ASI_UDF extends ScalarFunction {
        // 各データレコードの開始位置から終了位置までの文字を取得します。
        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, end);
        }
    }
  4. \ASI_UDX-main\ ディレクトリにある pom.xml ファイルをダブルクリックし、ファイル内で構成を行います。

    この例では、pom.xml は Apache Flink 1.11 のメイン JAR 依存関係の情報で構成されています。 ビジネス要件に基づいて、次のいずれかの操作を実行します。

    • ビジネスが他の JAR パッケージに依存していない場合は、pom.xml ファイルを構成する必要はなく、次の手順に進みます。

    • ビジネスが他の JAR パッケージに依存している場合は、必要な JAR パッケージの情報を pom.xml ファイルに追加します。

    Apache Flink 1.11 は、次の JAR パッケージに依存しています。

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. pom.xml ファイルが保存されているディレクトリに移動します。 次に、次のコマンドを実行してファイルをパッケージ化します。

    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが表示されたら、UDSF が作成されています。

UDSF の登録

UDSF を登録する方法の詳細については、「UDF の管理」をご参照ください。

UDSF の使用

UDSF を登録した後、UDSF を使用できます。 UDSF を使用するには、次の手順を実行します。

  1. Flink SQL を使用してデプロイメントを作成します。 詳細については、「SQL ドラフトの開発」をご参照ください。

    次のコードは、ASI_UDSF_Source テーブルの a フィールドの各行の文字列の 2 番目の文字から 4 番目の文字を取得する方法の例を示しています。

    CREATE TEMPORARY TABLE ASI_UDSF_Source (
      a VARCHAR,
      b INT,
      c INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDSF_Sink (
      a VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    -- ASI_UDSF_Source テーブルの a フィールドの各行の文字列の 2 番目の文字から 4 番目の文字を取得します。
    INSERT INTO ASI_UDSF_Sink
    SELECT ASI_UDSF(a,2,4)
    FROM ASI_UDSF_Source;
  2. フルマネージド Flink のコンソールの [O&M] > [デプロイメント] ページで、開始するデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。

    デプロイメントが開始されると、ASI_UDSF_Source テーブルの a フィールドの各行の文字列の 2 番目の文字から 4 番目の文字が、ASI_UDSF_Sink テーブルの各行に挿入されます。