すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Java UDSF

最終更新日:Mar 10, 2026

このトピックでは、Flink のユーザー定義スカラー関数 (UDSF) を開発、登録、使用する方法について説明します。

定義

ユーザー定義スカラー関数 (UDSF) は、0 個、1 個、または複数のスカラー値を新しいスカラー値にマッピングします。これにより、関数が 1 行のデータを処理して単一の出力値を返すという 1 対 1 の関係が作成されます。詳細については、「ユーザー定義関数」をご参照ください。

UDSF の開発

説明

Flink は、サービスを迅速に開発できるよう、ユーザー定義関数 (UDF) の例を提供しています。この例には、UDSF、ユーザー定義集約関数 (UDAF)、およびユーザー定義のテーブル値関数 (UDTF) の実装が含まれています。開発環境は例の中で事前設定されているため、セットアップは不要です。

  1. ローカルマシンに ASI_UDX_Demo の例をダウンロードして解凍します。

    説明

    ASI_UDX_Demo はサードパーティの Web サイトでホストされています。アクセスに失敗したり、遅延が発生したりする場合があります。

    解凍後、ASI_UDX-main フォルダが作成されます。このフォルダには、次のものが含まれています。

    • pom.xml:プロジェクトレベルの設定ファイルです。プロジェクトの Maven 座標、依存関係、開発者向けのルール、バグ管理システム、組織、ライセンスなど、プロジェクトに関連するすべての要素を記述します。

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java:UDSF の Java サンプルコードです。

  2. IntelliJ IDEA で、[ファイル] > [開く] をクリックし、解凍した ASI_UDX-main フォルダを選択します。

  3. \ASI_UDX-main\src\main\java\ASI_UDF フォルダをダブルクリックします。必要に応じて ASI_UDF.java ファイルを設定します。

    この例では、ASI_UDF.java ファイルは、各入力文字列の `begin` 位置から `end` 位置までの文字を抽出するコードで設定されています。

    package ASI_UDF;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    public class ASI_UDF extends ScalarFunction {
        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, end);
        }
    }
  4. \ASI_UDX-main\ フォルダをダブルクリックします。pom.xml ファイルを設定します。

    この例では、pom.xml ファイルは Flink バージョン 1.11 のメイン JAR パッケージの依存関係で設定されています。ご利用のサービスの要件に基づいて、次のように操作します。

    • ご利用のサービスが他の JAR パッケージに依存していない場合は、pom.xml ファイルを設定する必要はありません。次のステップに進んでください。

    • ご利用のサービスが他の JAR パッケージに依存している場合は、必要な依存関係情報を pom.xml ファイルに追加します。

    Flink バージョン 1.11 のメイン JAR パッケージの依存関係は次のとおりです。

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. pom.xml ファイルが含まれているディレクトリで、次のコマンドを実行してプロジェクトをパッケージ化します。

    mvn package -Dcheckstyle.skip

    ASI_UDX-1.0-SNAPSHOT.jar パッケージが \ASI_UDX-main\target\ ディレクトリに作成されます。これで UDSF の開発は完了です。

UDSF の登録

UDSF の登録方法の詳細については、「ユーザー定義関数 (UDF) の管理」をご参照ください。

UDSF の使用

UDSF を登録すると、それを使用できるようになります。手順は次のとおりです。

  1. Flink SQL ジョブを開発します。詳細については、「ジョブ開発マップ」をご参照ください。

    次のサンプルコードは、`ASI_UDSF_Source` テーブルの `a` フィールドの文字列から、2 番目から 4 番目までの文字を抽出します。

    CREATE TEMPORARY TABLE ASI_UDSF_Source (
      a VARCHAR,
      b INT,
      c INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDSF_Sink (
      a VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDSF_Sink
    SELECT ASI_UDSF(a,2,4)
    FROM ASI_UDSF_Source;
  2. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブを見つけ、[操作] 列の [開始] をクリックします。

    ジョブが開始されると、`ASI_UDSF_Source` テーブルの各行の `a` フィールドの文字列から 2 番目から 4 番目までの文字が `ASI_UDSF_Sink` テーブルに挿入されます。