すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:効率的なユーザー定義関数の記述

最終更新日:Jul 09, 2025

このトピックでは、Flink の内部データ型を使用し、型変換のオーバーヘッドを削減し、ガベージコレクションの負荷を軽減し、ジョブ全体のパフォーマンスを向上させることで、効率的なユーザー定義関数(UDF)を記述する方法について説明します。

背景と概要

Apache Flink は強力な SQL インターフェースサポートを提供しており、UDF を介して機能を拡張できます。ただし、UDF 実装の従来のアプローチは、Java プリミティブ型(Map や List など)に依存することがよくあります。これにより、Flink エンジンは実行時に Java 型を Flink 内部データ構造に頻繁に変換する必要があり、ジョブのパフォーマンスに影響します。

Flink データ型システム

説明

外部データ型

Map、List、String などのユーザー向けの Java 型。

内部データ型

MapData、ArrayData、RowData など、Flink エンジンによって最適化されたバイナリ表現。

UDF の実行中に、データが処理される前に、外部データ型は内部データ型に変換されます。このプロセスでは、追加の CPU とメモリリソースが消費されます。

効率的な UDF を記述するためのコア原則

  • 入力データと出力データに内部データ型を使用する。

    UDF で Java 型を使用することは避けてください。代わりに、Flink 内部データ型を使用して、シリアル化とデシリアル化のオーバーヘッドを削減します。

  • カスタム型の推論ロジックを定義する。

    これにより、入力データ型と出力データ型の両方が内部データ型になり、最適化が容易になります。

  • 一時オブジェクトの作成を避ける。

    内部データ型は、より効率的なアクセス方法をサポートしています。ループ内または高頻度の呼び出し中に新しいオブジェクトを作成することは避けてください。

説明

例として、Map フィールドからすべてのキーを抽出し、内部データ型を使用して配列として返す UDF を記述しました。

入力

出力

SELECT mapkey(MAP['A',1,'B',2]);

[A, B]

SELECT mapkey(STR_TO_MAP('a=1,b=2'));

[a, b]

サンプルコード

Java

package com.aliyun.example;

// Flink の内部データ型をインポートします
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeInference.newBuilder;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.api.DataTypes;

import java.util.Optional;
import java.util.List;
import java.util.stream.Collectors;

public class MapKeyUDF extends ScalarFunction {

    public ArrayData eval(MapData input) {
        if (input == null) return null;
        return input.keyArray();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return newBuilder()
            .inputTypeStrategy(MAP) // 入力型戦略を定義
            .outputTypeStrategy(nullableIfArgs(MAP_KEYS)) // 出力型戦略を定義
            .build();
    }

    // 入力型戦略:MapData 型の単一パラメーターを受け入れる
    private static final InputTypeStrategy MAP = new InputTypeStrategy() {
        @Override
        public ArgumentCount getArgumentCount() {
            return ConstantArgumentCount.of(1);
        }

        @Override
        public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
            return Optional.of(
                callContext.getArgumentDataTypes().stream()
                    .map(DataTypeUtils::toInternalDataType) // 内部データ型に変換
                    .collect(Collectors.toList()));
        }

        @Override
        public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
            return null;
        }
    };

    // 出力型戦略:入力 Map のキー型に対応する ArrayData 型を返す
    private static final TypeStrategy MAP_KEYS = callContext ->
        Optional.of(
            DataTypeUtils.toInternalDataType(DataTypes.ARRAY(
                ((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType()
            ))
        );

    // 出力型が NULL 許容型になるようにコピーする
    private static final TypeStrategy nullableIfArgs(TypeStrategy strategy) {
        return callContext -> strategy.infer(callContext).map(DataType::copy);
    }
}

Maven 依存関係

    <!-- Flink Table Runtime -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table Common -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table API Java Bridge -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>

主要な実装ポイント

  • 入力型と出力型として内部データ型を使用する。

    • 型変換を避けるために、Java Map の代わりにデータ入力に MapData を使用します。

    • データ出力に Java List の代わりに ArrayData を返し、メモリのオーバーヘッドをさらに削減します。

    public ArrayData eval(MapData input) {
        if (input == null) {
            return null;
        }
        return input.keyArray();
    }
  • 入力型戦略:

    • 入力パラメーターが内部データ型に変換されていることを確認します。

    • 関数が 1 つのパラメーターのみを受け入れるように制限します。

    private static final InputTypeStrategy MAP = new InputTypeStrategy() {
        @Override
        public ArgumentCount getArgumentCount() {
            // 関数が 1 つのパラメーターのみを受け入れるように制限します。
            return ConstantArgumentCount.of(1);
        }
    
        @Override
        public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
            // 入力パラメーターが内部データ型に変換されていることを確認します。
            return Optional.of(
                    callContext.getArgumentDataTypes().stream()
                            .map(DataTypeUtils::toInternalDataType)
                            .collect(Collectors.toList()));
        }
    
        @Override
        public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
            // シグネチャは返されません。通常は内部的に表示されます
            return null;
        }
    };
  • 出力型戦略:

    • 出力型を配列として明示的に指定し、その要素型を入力 Map のキー型と一致させます。

    • 内部データ型を使用します。

        private static final TypeStrategy MAP_KEYS = callContext ->
            Optional.of(
                DataTypeUtils.toInternalDataType(DataTypes.ARRAY(
                    ((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType()
                ))
            );

利点

内部データ型を使用する UDF には、次のようなパフォーマンス上の利点があります。

  • 型変換オーバーヘッドの削減:Java 型と Flink 内部データ型間の頻繁な変換が回避されます。

  • ガベージコレクションの負荷の軽減:作成されるオブジェクトが少なくなり、ガベージコレクションの頻度が減少します。

  • データ処理効率の向上:内部データ型に対する Flink エンジンの特殊な最適化を活用します。

  • メモリ使用量の削減:内部データ型は通常、Java 型に比べてメモリ使用量が少なくなります。

使用上の注意

内部データ型はパフォーマンスブースターですが、UDF を記述する際には次の点を考慮してください。

  • 操作の制限:内部データ型は、Java 型よりもサポートされる操作が少ないです。

  • 急な学習曲線:Flink 内部データ型 API に関する知識が必要です。

  • コードの可読性:標準の Java 型を使用する場合に比べて、コードが直感的ではありません。

  • デバッグの難しさ:内部データ型のデバッグはより複雑になる可能性があります。

ベストプラクティス

大量のデータを処理する場合など、パフォーマンスが主な関心事である場合は、Flink 内部データ型を使用する UDF を使用すると、ジョブのパフォーマンスを大幅に向上させるのに役立ちます。ただし、内部データ型を使用して UDF を記述する前に、次の手順を実行してください。

  • パフォーマンス要件を評価する:パフォーマンス要件の高い UDF では、内部データ型を優先します。

  • 内部型 API について理解するArrayDataMapDataRowData など。

  • 型の推論を正しく実装するgetTypeInference をオーバーライドし、入力データ型と出力データ型を指定します。

  • テストと検証:パフォーマンステストを通じて、さまざまな実装方法の効果を比較します。

  • コードコメント:保守性を高めるために、内部データ型を使用する場合は十分なコメントを追加します。

参照

  • JavaDoc

  • Flink ソースコード

    • メインパス: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data

    • 内部データ型の例: flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/

  • 詳細については、「UDF の管理」をご参照ください。