Realtime Compute for Apache Flink:Python

Last Updated:Mar 09, 2026

Realtime Compute for Apache Flink lets you use Python user-defined functions (UDFs) in Flink SQL jobs. This topic describes the types of Python UDFs and explains how to use Python dependencies, debug UDF code, and tune UDF performance.

Types of user-defined functions

Type

Description

User-defined scalar function (UDSF)

A UDSF maps zero, one, or more scalar values to a new scalar value. Input and output are one-to-one: each input row produces exactly one output value. For more information, see User-defined scalar functions (UDSFs).

User-defined aggregate function (UDAF)

A UDAF aggregates multiple input records into a single output value, so input and output are many-to-one. For more information, see User-defined aggregate functions (UDAFs).

User-defined table-valued function (UDTF)

A UDTF takes zero, one, or more scalar values as input parameters, and the number of parameters can vary. Unlike a UDSF, which returns a single value, a single UDTF call can return any number of rows, each consisting of one or more columns. For more information, see User-defined table-valued functions (UDTFs).
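The three input/output relationships in the table can be sketched with plain Python classes. This is a minimal illustration, not job-ready code: the class names `Upper`, `Split`, and `Mean` are hypothetical, and the classes deliberately do not inherit from PyFlink base classes so that the snippet runs without a Flink installation. The method names (`eval`, `create_accumulator`, `accumulate`, `get_value`) follow those used by PyFlink's `ScalarFunction`, `TableFunction`, and `AggregateFunction`.

```python
class Upper:
    """Scalar function stand-in: one input row in, exactly one value out."""

    def eval(self, s):
        return s.upper()


class Split:
    """Table-valued function stand-in: one input row in, zero or more rows out."""

    def eval(self, line, sep):
        for word in line.split(sep):
            # Each yielded tuple is one output row with two columns.
            yield word, len(word)


class Mean:
    """Aggregate function stand-in: many input rows in, one value out."""

    def create_accumulator(self):
        # Running sum and row count.
        return [0.0, 0]

    def accumulate(self, acc, value):
        acc[0] += value
        acc[1] += 1

    def get_value(self, acc):
        # Return None when no rows were accumulated.
        return acc[0] / acc[1] if acc[1] else None
```

In an actual job, each class would presumably subclass the matching PyFlink base class and be registered with declared result types, as described in the linked topics.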

Use Python dependencies

Realtime Compute for Apache Flink clusters come with common Python packages, such as Pandas, NumPy, and PyArrow, pre-installed. For a list of third-party Python packages installed in Realtime Compute for Apache Flink, see Develop a Python job. To use a pre-installed Python package, you can import it into your Python function. The following example shows how to do this.

from typing import List
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)

You can also use other third-party Python packages in your Python UDFs. If you use a third-party Python package that is not pre-installed, you must upload it as a dependency file when you register the Python UDF. For more information, see Manage user-defined functions (UDFs) and Use Python dependencies.

Debug code

You can use Python's standard `logging` module in a Python UDF to record diagnostic information, which helps you locate issues. The following example shows how to do this.

import logging
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("hello world")
    return i + j

You can view the generated logs in the TaskManager log file. For more information, see View operational logs.
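When debugging, it can be more useful to log only the rows that look wrong, together with their input values, than to log every call. The following is a minimal sketch of that idea using only the standard library. The function name `safe_divide` and the logger name `my_udf` are hypothetical, and the function is left undecorated so that it can be run locally.

```python
import logging

# Hypothetical logger name; any name can be used.
logger = logging.getLogger("my_udf")


def safe_divide(i, j):
    """Divide i by j, logging the offending inputs instead of raising."""
    if j == 0:
        # Lazy %-style arguments avoid building the message unless it is emitted.
        logger.warning("division by zero: i=%s, j=%s", i, j)
        return None
    return i / j
```

In a job, the same function would be wrapped with the `udf` decorator and a suitable result type, in the same way as the `add` example above.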

Performance tuning

Pre-load resources

Pre-loading means loading resources once, in the `open` method, when the UDF is initialized. This avoids reloading the resources every time the `eval` method is executed. For example, you can load a large deep learning model once and then reuse it for many predictions. The following code provides an example.

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

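class Predict(ScalarFunction):
    def open(self, function_context):
        # Runs once when the UDF is initialized, before any call to eval().
        import pickle

        # Load the model a single time and keep it on the instance.
        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        # Reuses the pre-loaded model on every call.
        return self.model.predict(x)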

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
Note

For information about how to upload Python data files, see Use Python dependencies.

Use the Pandas library

In addition to regular Python UDFs, Realtime Compute for Apache Flink also supports Pandas UDFs. The inputs of a Pandas UDF are data structures defined in Pandas, such as `pandas.Series` and `pandas.DataFrame`. Because a Pandas UDF receives a batch of rows at a time, it can process the batch with vectorized libraries such as Pandas and NumPy, which is typically faster than processing rows one by one. For more information, see Vectorized User-defined Functions.
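As a minimal sketch of the vectorized style, the function below receives whole columns as `pandas.Series` objects and returns a `pandas.Series` of the same length. The function name `discounted_price` and its parameters are hypothetical. The registration line is shown only as a comment so that the snippet runs with Pandas alone; it uses the same `udf(..., func_type="pandas")` form as the `predict` example earlier in this topic.

```python
import pandas as pd


def discounted_price(price: pd.Series, rate: pd.Series) -> pd.Series:
    """Apply a per-row discount rate to a whole batch in one vectorized step."""
    return price * (1 - rate)


# Assumed registration in a PyFlink job (requires pyflink, not executed here):
# from pyflink.table import DataTypes
# from pyflink.table.udf import udf
# discounted = udf(discounted_price, result_type=DataTypes.DOUBLE(), func_type="pandas")
```

The arithmetic is applied to the entire batch at once, so there is no Python-level loop over individual rows.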

Configure parameters

The performance of a Python UDF largely depends on its implementation. If you encounter performance issues, optimize the UDF implementation. The performance of a Python UDF is also affected by the values of the following parameters.

Parameter

Description

python.fn-execution.bundle.size

Python UDFs are computed asynchronously. The Java operator caches incoming data and sends it to the Python process for processing once the cache reaches a certain threshold. The `python.fn-execution.bundle.size` parameter controls the maximum number of data records that can be cached.

The default value is 100000 records.

python.fn-execution.bundle.time

This parameter controls the maximum cache time for data. The computation of cached data is triggered when the number of cached records reaches the threshold defined by `python.fn-execution.bundle.size` or the cache time reaches the threshold defined by `python.fn-execution.bundle.time`.

The default value is 1000 milliseconds.

python.fn-execution.arrow.batch.size

When you use a Pandas UDF, this parameter specifies the maximum number of data records that an Arrow batch can contain. The default value is 10000.

Note

The value of the `python.fn-execution.arrow.batch.size` parameter cannot be greater than the value of the `python.fn-execution.bundle.size` parameter.

Note

Setting these three parameters to the largest possible values is not always the best approach. If these parameters are set to excessively large values, too much data may need to be processed during checkpointing. This can increase the checkpoint duration or even cause checkpoint failures. For more information about these parameters, see Configuration.
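The count-or-time rule described for `python.fn-execution.bundle.size` and `python.fn-execution.bundle.time` can be illustrated with a toy model. This is a sketch of the rule only, not Flink's implementation: the class `BundleBuffer` and its methods are hypothetical, timestamps are passed in explicitly, and the time threshold is checked only when a record arrives, whereas a real operator would not need to wait for the next record.

```python
class BundleBuffer:
    """Toy model of the count-or-time flush rule (not Flink's implementation)."""

    def __init__(self, bundle_size=100000, bundle_time_ms=1000):
        self.bundle_size = bundle_size
        self.bundle_time_ms = bundle_time_ms
        self._records = []
        self._opened_at_ms = None  # arrival time of the first cached record

    def add(self, record, now_ms):
        """Cache one record; return the flushed bundle if a threshold is reached."""
        if not self._records:
            self._opened_at_ms = now_ms
        self._records.append(record)
        size_reached = len(self._records) >= self.bundle_size
        time_reached = now_ms - self._opened_at_ms >= self.bundle_time_ms
        return self.flush() if size_reached or time_reached else None

    def flush(self):
        """Hand over everything cached so far and start an empty bundle."""
        bundle, self._records = self._records, []
        self._opened_at_ms = None
        return bundle


buf = BundleBuffer(bundle_size=3, bundle_time_ms=1000)
# The third record fills the bundle, so it is flushed by size.
by_size = [buf.add(r, t) for r, t in [("a", 0), ("b", 10), ("c", 20)]]
# Only two records are cached, but 1000 ms have passed, so the flush is by time.
by_time = [buf.add(r, t) for r, t in [("d", 30), ("e", 1030)]]
```

In this model, whatever is still cached must be handed over in one go when `flush` is called. This mirrors the note above: the larger the thresholds, the more cached data may be waiting to be processed when a checkpoint is taken.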

References