All Products
Search
Document Center

Realtime Compute for Apache Flink:Write efficient user-defined functions

Last Updated:Jul 08, 2025

This topic describes how to write efficient user-defined functions (UDFs) by using Flink internal data types, reducing type conversion overhead, lowering garbage collection pressure, and improving overall job performance.

Background and overview

Apache Flink provides powerful SQL interface support, allowing you to extend its functionality through UDFs. However, traditional approaches to UDF implementation often rely on Java primitive types (such as Map and List). This requires the Flink engine to frequently convert Java types to Flink internal data structures at runtime, affecting job performance.

Flink data type system

Type

Description

External data type

User-oriented Java types, such as Map, List, and String.

Internal data type

Binary representations, optimized by the Flink engine, such as MapData, ArrayData, and RowData.

During UDF execution, external data types are converted to internal types before data is processed. This process consumes additional CPU and memory resources.

Core principles of writing efficient UDFs

  • Use internal data types for input and output data.

    Avoid using Java types in UDFs. Instead, use Flink internal data types to reduce serialization and deserialization overhead.

  • Define custom type inference logic.

    This ensures both input and output data types are internal data types, facilitating optimization.

  • Avoid creating temporary objects.

    Internal data types support more efficient access methods. Avoid creating new objects inside loops or during high-frequency calls.

Example

Description

As an example, we've written a UDF that extracts all keys from a Map field and returns them as an array using internal data types:

Input

Output

SELECT mapkey(MAP['A',1,'B',2]);

[A, B]

SELECT mapkey(STR_TO_MAP('a=1,b=2'));

[a, b]

Sample code

Java

package com.aliyun.example;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeInference.newBuilder;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.api.DataTypes;

import java.util.Optional;
import java.util.List;

public class MapKeyUDF extends ScalarFunction {

    public ArrayData eval(MapData input) {
        if (input == null) return null;
        return input.keyArray();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return newBuilder()
            .inputTypeStrategy(MAP)
            .outputTypeStrategy(nullableIfArgs(MAP_KEYS))
            .build();
    }

    private static final InputTypeStrategy MAP = new InputTypeStrategy() {
        @Override
        public ArgumentCount getArgumentCount() {
            return ConstantArgumentCount.of(1);
        }

        @Override
        public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
            return Optional.of(
                callContext.getArgumentDataTypes().stream()
                    .map(DataTypeUtils::toInternalDataType)
                    .collect(Collectors.toList()));
        }

        @Override
        public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
            return null;
        }
    };

    private static final TypeStrategy MAP_KEYS = callContext ->
        Optional.of(
            DataTypeUtils.toInternalDataType(DataTypes.ARRAY(
                ((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType()
            ))
        );

    private static final TypeStrategy nullableIfArgs(TypeStrategy strategy) {
        return callContext -> strategy.infer(callContext).map(DataType::copy);
    }
}

Maven dependencies

    <!-- Flink Table Runtime -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table Common -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table API Java Bridge -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>

Key implementation points

  • Use internal data types as input and output types.

    • Use MapData for data input instead of Java Map to avoid type conversion.

    • Return ArrayData for data output instead of Java List, further reducing memory overhead.

    public ArrayData eval(MapData input) {
        if (input == null) {
            return null;
        }
        return input.keyArray();
    }
  • Input type strategy:

    • Ensure input parameters are converted to internal data types.

    • Restrict the function to accept only one parameter.

    private static final InputTypeStrategy MAP = new InputTypeStrategy() {
        @Override
        public ArgumentCount getArgumentCount() {
            // Restrict the function to accept only one parameter.
            return ConstantArgumentCount.of(1);
        }
    
        @Override
        public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
            // Ensure input parameters are converted to internal data types.
            return Optional.of(
                    callContext.getArgumentDataTypes().stream()
                            .map(DataTypeUtils::toInternalDataType)
                            .collect(Collectors.toList()));
        }
    
        @Override
        public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
            // No signature returned, generally internally visible
            return null;
        }
    };
  • Output type strategy:

    • Explicitly specify the output type as an array, with its element type consistent with the key type in the input Map.

    • Use internal data types.

        private static final TypeStrategy MAP_KEYS = callContext ->
            Optional.of(
                DataTypeUtils.toInternalDataType(DataTypes.ARRAY(
                    ((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType()
                ))
            );

Advantages

UDFs that use internal data types offer the following performance advantages:

  • Reduced type conversion overhead: Frequent conversion between Java types and Flink internal data types is avoided.

  • Lower garbage collection pressure: Fewer objects are created, lowering garbage collection frequency.

  • Improved data processing efficiency: Leveraging Flink engine's specialized optimizations for internal data types.

  • Reduced memory usage: Internal data types typically use less memory compared to Java types.

Usage notes

Although internal data types are a performance booster, consider the following when writing your UDFs:

  • Limited operations: Internal data types support fewer operations than Java types.

  • A steep learning curve: Familiarity with Flink internal data type APIs is required.

  • Code readability: Code is less intuitive compared to using standard Java types.

  • Debugging difficulty: Debugging internal data types can be more complex.

Best practices

When performance is a main concern, such as processing a large amount of data, using UDFs with Flink internal data types helps you significantly boost your job performance. However, before writing UDFs using internal data types, do the following:

  • Evaluate performance requirements: Prioritize internal data types for UDFs with high performance requirements.

  • Familiarize yourself with internal type APIs: such as ArrayData, MapData, and RowData.

  • Correctly implement type inference: Override getTypeInference and specify input and output data types.

  • Test and verify: Compare the effects of different implementation methods through performance testing.

  • Code comments: Add sufficient comments when using internal data types for better maintainability.

References

  • JavaDoc

  • Flink source code

    • Main path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data

    • Internal data type examples: flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/

  • Manage UDFs