Fully managed Flink allows you to use Python user-defined functions (UDFs) in SQL deployments. This topic describes how Python UDFs are classified, registered, and packaged with their dependencies. This topic also describes how to debug UDFs and optimize their performance.

UDF classification

The following list describes the three types of UDFs that are supported by Realtime Compute for Apache Flink.
  • User-defined scalar function (UDSF): A UDSF maps zero, one, or more scalar values to a new scalar value. A one-to-one mapping is established between the input and output of a UDSF: each time a UDSF reads a row of data, it writes one output value. For more information, see UDSFs.
  • User-defined aggregate function (UDAF): A UDAF aggregates multiple values into a single value. A many-to-one mapping is established between the input and output of a UDAF: multiple input records are aggregated to produce one output value. For more information, see UDAFs.
  • User-defined table-valued function (UDTF): A UDTF takes zero, one, or more scalar values as input parameters, including variable-length parameters. UDTFs are similar to UDSFs but differ in that they can return any number of rows instead of a single value, and each returned row consists of one or more columns. For more information, see UDTFs.
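For example, the following minimal sketch shows a UDTF that emits one row per word of an input string. The function name split_words and its result schema are illustrative only.
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

# A UDTF can emit zero or more rows per input row by yielding values.
@udtf(result_types=[DataTypes.STRING(), DataTypes.INT()])
def split_words(line: str):
    for word in line.split(" "):
        yield word, len(word)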

Register a UDF

UDFs are classified into catalog UDFs and deployment-level UDFs. To register a UDF, perform the following steps based on the type of the UDF:
  • Catalog UDF
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column. In the left-side navigation pane, click Draft Editor.
    3. Click the UDFs tab. In the UDFs pane of the page, click the Create icon.
    4. Upload the Python file of the UDF that you want to register.
      You can use one of the following methods to upload the Python file of a UDF:
      • Upload a file: In the Register UDF Artifact dialog box, click click to select next to Select a file and select the Python file of the UDF that you want to register. If a dependency file exists, click click to select next to Dependencies and select the Python file or ZIP package on which the Python file of the UDF depends.
      • External URL: Enter an external URL.
      Note
      • If the Python file or the dependency file of the UDF is large, we recommend that you upload the file by using an external URL. If the external URL is the endpoint of an Object Storage Service (OSS) bucket, the dependency file of the UDF must be stored in the sql-artifacts/namespaces/{namespace} directory.
      • The Python file of your UDF is uploaded and stored in the sql-artifacts directory of the OSS bucket that you select. In the console of fully managed Flink, the system parses the Python file of the UDF, checks whether the classes of the Flink UDF, UDAF, and UDTF interfaces are used in the file, and then automatically extracts the class names and populates the Function Name field with them. For an example of such a file, see the sketch after these steps.
      • The dependencies of a Python UDF can be packaged into a Python file of the UDF or uploaded by clicking click to select next to Dependencies in the Register UDF Artifact dialog box.
    5. Click OK.

      In the UDFs pane on the left side of the SQL editor, you can view all the UDFs that are registered.
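    A Python file that the console can parse in this way might look like the following minimal sketch. The function name sub_string and its logic are illustrative; the file only needs to define functions or classes based on the PyFlink UDF, UDAF, or UDTF interfaces.
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    # The console extracts the names of functions that are declared through
    # the PyFlink UDF interfaces, such as this scalar function.
    @udf(result_type=DataTypes.STRING())
    def sub_string(s: str, begin: int, end: int):
        return s[begin:end]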

Dependencies

Commonly used Python packages such as pandas, NumPy, and PyArrow are pre-installed in fully managed Flink. You can view a list of third-party Python packages that are pre-installed in fully managed Flink on the Develop a job page.

Note The pre-installed Python packages must be imported inside the body of the Python function. Sample code:
from typing import List
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np  # Import the pre-installed package inside the function body.
    return np.percentile(values, percentile)

You can also use other types of third-party Python packages in Python UDFs. If you use a third-party Python package that is not pre-installed in fully managed Flink, you must upload the package as a dependency file to fully managed Flink. For more information, see Manage UDFs and Use Python dependencies.

Debugging

In the code implementation of Python UDFs, you can use the logging module to generate logs and locate errors based on those logs. The following code shows an example.
import logging
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("hello world")
    return i + j
After logs are generated, you can view them in the TaskManager log file.

Performance optimization

  • Preloaded resources
    If you use the resource pre-load feature, resources can be loaded in advance during UDF initialization. This way, you do not need to reload resources each time the eval() method is called to compute data. For example, if you want to load a large deep learning model only once and then run predictions with it multiple times, you can use the following code:
    from pyflink.table import DataTypes
    from pyflink.table.udf import ScalarFunction, udf
    
    class Predict(ScalarFunction):
        def open(self, function_context):
            import pickle

            # Load the model only once, when the UDF is initialized.
            with open("resources.zip/resources/model.pkl", "rb") as f:
                self.model = pickle.load(f)

        def eval(self, x):
            # Reuse the preloaded model for every prediction.
            return self.model.predict(x)
    
    predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
    Note For more information about how to upload Python files, see Use data files.
  • Use the pandas library
    Fully managed Flink allows you to use pandas UDFs in addition to common Python UDFs. The input and output of a pandas UDF use data structures defined in pandas, such as pandas.Series and pandas.DataFrame. You can use high-performance Python libraries such as pandas and NumPy in pandas UDFs to develop high-performance Python UDFs. For more information, see Vectorized User-defined Functions.
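    The following is a minimal sketch of a vectorized pandas UDF. The function name weighted_price and its two numeric input columns are illustrative assumptions.
    import pandas as pd

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    # func_type="pandas" makes the UDF receive whole batches as pandas.Series
    # instead of one row at a time, so vectorized pandas and NumPy operations apply.
    @udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
    def weighted_price(price: pd.Series, volume: pd.Series) -> pd.Series:
        return price * volume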

  • Configure required parameters
    The performance of Python UDFs mainly depends on their implementation. If you encounter performance issues related to Python UDFs, optimize the implementation first. Performance is also affected by the values of the following parameters:
    • python.fn-execution.bundle.size: The maximum number of data records that can be cached. Python UDFs are executed asynchronously: a Java operator caches input records and sends them to a Python process for processing when the number of cached records reaches this threshold. Default value: 100000.
    • python.fn-execution.bundle.time: The maximum duration for which data can be cached. Processing of the cached data is triggered when the number of cached records reaches the value of python.fn-execution.bundle.size or the caching duration reaches the value of this parameter. Default value: 1000. Unit: milliseconds.
    • python.fn-execution.arrow.batch.size: The maximum number of data records allowed in an Arrow batch when pandas UDFs are used. The value of this parameter cannot be greater than the value of python.fn-execution.bundle.size. Default value: 10000.
    Note If the values of these parameters are too large, excessive data must be processed during checkpointing, which prolongs checkpointing or can even cause checkpoints to fail. For more information about these parameters, see Configuration.
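    The following is a minimal sketch of setting these options programmatically, assuming a deployment that uses the PyFlink Table API; in a console SQL deployment, you would typically provide the same keys in the deployment configuration instead. The values shown are the defaults listed above.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    conf = t_env.get_config().get_configuration()

    # Control how many records are buffered before they are sent to the Python process.
    conf.set_string("python.fn-execution.bundle.size", "100000")
    conf.set_string("python.fn-execution.bundle.time", "1000")
    conf.set_string("python.fn-execution.arrow.batch.size", "10000")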