Realtime Compute for Apache Flink lets you use Python user-defined functions (UDFs) in Flink SQL jobs. This topic describes the types of Python UDFs, how to use Python dependencies, and how to tune their performance.
Types of user-defined functions
| Type | Description |
| --- | --- |
| User-defined scalar function (UDSF) | Maps zero, one, or more scalar values from a single row to one new scalar value: a one-to-one relationship between input rows and output values. For more information, see User-defined scalar functions (UDSFs). |
| User-defined aggregate function (UDAF) | Aggregates multiple input records into a single output value: a many-to-one relationship between input and output. For more information, see User-defined aggregate functions (UDAFs). |
| User-defined table-valued function (UDTF) | Takes zero, one, or more scalar values as input, possibly of variable length. Unlike a UDSF, it can return any number of rows, each consisting of one or more columns: a one-to-many relationship between input and output. For more information, see User-defined table-valued functions (UDTFs). |
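The three input/output relationships above can be sketched in plain Python. This is an illustration only: the function names are hypothetical, and in a real job each function would be declared with the corresponding PyFlink decorator (`@udf`, `@udaf`, or `@udtf`).

```python
# Plain-Python sketch of the three call shapes; names are illustrative.

def upper_case(s):
    """UDSF shape: one input row -> one scalar value."""
    return s.upper()

def count_rows(values):
    """UDAF shape: many input rows -> one aggregated value."""
    return len(values)

def split_words(line):
    """UDTF shape: one input row -> zero or more output rows."""
    for word in line.split():
        yield word

print(upper_case("flink"))               # FLINK
print(count_rows(["a", "b", "c"]))       # 3
print(list(split_words("hello world")))  # ['hello', 'world']
```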
Use Python dependencies
Realtime Compute for Apache Flink clusters come with common Python packages, such as Pandas, NumPy, and PyArrow, pre-installed. For a list of third-party Python packages installed in Realtime Compute for Apache Flink, see Develop a Python job. To use a pre-installed Python package, you can import it into your Python function. The following example shows how to do this.
```python
from typing import List

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)
```
You can also use other third-party Python packages in your Python UDFs. If you use a third-party Python package that is not pre-installed, you must upload it as a dependency file when you register the Python UDF. For more information, see Manage user-defined functions (UDFs) and Use Python dependencies.
Debug code
You can implement logging in your Python UDF to output information. This helps you locate issues. The following example shows how to do this.
```python
import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("hello world")
    return i + j
```
You can view the generated logs in the TaskManager log file. For more information, see View operational logs.
Performance tuning
Pre-load resources
Load large resources once during UDF initialization, in the `open` method, so that they do not have to be reloaded every time the `eval` method is executed. For example, you can load a large deep learning model once and then run batch predictions on it many times. The following code provides an example.
```python
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle
        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
```
For information about how to upload Python data files, see Use Python dependencies.
Use the Pandas library
In addition to regular Python UDFs, Realtime Compute for Apache Flink also supports Pandas UDFs. The input data types for Pandas UDFs are data structures defined in Pandas, such as `pandas.Series` and `pandas.DataFrame`. You can use high-performance Python libraries such as Pandas and NumPy in Pandas UDFs to create high-performance Python UDFs. For more information, see Vectorized User-defined Functions.
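The key difference is the call pattern: a Pandas UDF receives whole `pandas.Series` batches rather than one scalar value per call. The following minimal sketch shows only that batch semantics; the function name is hypothetical, and in a real job it would be declared with `udf(..., func_type="pandas")`, which is omitted here so the example stands alone.

```python
import pandas as pd

# Vectorized call pattern of a Pandas UDF: the inputs are pandas.Series
# batches, and the arithmetic runs element-wise over the whole batch in
# one call instead of once per row.
def add_batches(i: pd.Series, j: pd.Series) -> pd.Series:
    return i + j

result = add_batches(pd.Series([1, 2, 3]), pd.Series([10, 20, 30]))
print(result.tolist())  # [11, 22, 33]
```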
Configure parameters
The performance of a Python UDF largely depends on its implementation. If you encounter performance issues, optimize the UDF implementation. The performance of a Python UDF is also affected by the values of the following parameters.
| Parameter | Description |
| --- | --- |
| python.fn-execution.bundle.size | Python UDFs are computed asynchronously: the Java operator buffers input data and sends it to the Python process for processing once a threshold is reached. This parameter sets the maximum number of records that can be buffered. Default value: 100000. |
| python.fn-execution.bundle.time | The maximum time that data can stay in the buffer. Buffered data is flushed to the Python process when either the record count reaches `python.fn-execution.bundle.size` or the buffering time reaches `python.fn-execution.bundle.time`. Default value: 1000 milliseconds. |
| python.fn-execution.arrow.batch.size | For Pandas UDFs, the maximum number of records that an Arrow batch can contain. Default value: 10000. Note: the value of `python.fn-execution.arrow.batch.size` cannot be greater than the value of `python.fn-execution.bundle.size`. |
Setting these three parameters to the largest possible values is not always the best approach. If these parameters are set to excessively large values, too much data may need to be processed during checkpointing. This can increase the checkpoint duration or even cause checkpoint failures. For more information about these parameters, see Configuration.
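In a PyFlink job, these thresholds can be set on the table environment's configuration. The following is a sketch only, assuming the `TableConfig.set(key, value)` API available in Flink 1.15 and later; the values are illustrative, not tuning recommendations.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
config = table_env.get_config()

# Illustrative values only; keep the Arrow batch size no larger than
# the bundle size, and remember that very large buffers can lengthen
# or fail checkpoints.
config.set("python.fn-execution.bundle.size", "50000")
config.set("python.fn-execution.bundle.time", "500")
config.set("python.fn-execution.arrow.batch.size", "5000")
```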
References
For information about how to register, update, and delete UDFs, see Manage user-defined functions (UDFs).
For examples of how to develop and use Python UDFs, see User-defined aggregate functions (UDAFs), User-defined scalar functions (UDSFs), and User-defined table-valued functions (UDTFs).
For information about how to use custom Python virtual environments, third-party Python packages, JAR packages, and data files in Flink Python jobs, see Use Python dependencies.
For examples of how to develop and use Java UDFs, see User-defined aggregate functions (UDAFs), User-defined scalar functions (UDSFs), and User-defined table-valued functions (UDTFs).
For information about how to debug and tune Java UDFs, see Overview.