A user-defined scalar function (UDSF) maps zero, one, or more scalar values to a single scalar value. Each input row produces exactly one output value.
This topic describes how to create, register, and use a Python UDSF in Realtime Compute for Apache Flink.
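To make the one-row-in, one-value-out contract concrete, the following plain-Python sketch applies a function to each row of a small in-memory list. It does not use the Flink runtime. The helper `apply_scalar`, the function `full_name`, and the sample rows are illustrative assumptions, not part of the PyFlink API.

```python
from typing import Any, Callable, Iterable, List, Tuple


def apply_scalar(fn: Callable[..., Any], rows: Iterable[Tuple[Any, ...]]) -> List[Any]:
    """Call fn once per row and collect exactly one result per row."""
    return [fn(*row) for row in rows]


def full_name(first: str, last: str) -> str:
    # Two scalar inputs, one scalar output.
    return f"{first} {last}"


rows = [("Ada", "Lovelace"), ("Alan", "Turing")]
results = apply_scalar(full_name, rows)
print(results)
```

The result list always has the same length as the input list, which is the property that distinguishes a scalar function from an aggregate or table-valued function.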
Limits
The following constraints apply when you develop Python user-defined functions (UDFs) in Realtime Compute for Apache Flink:
| Constraint | Requirement |
|---|---|
| Apache Flink version | 1.12 and later |
| Python version | Pre-installed on every workspace. VVR earlier than 8.0.11: Python 3.7.9. VVR 8.0.11 and later: Python 3.9.21. |
| JDK version | JDK 8 and JDK 11. Third-party JAR packages must be compatible with JDK 8 or JDK 11. |
| Scala version | Open-source Scala 2.11 only. Third-party JAR packages must be compatible with Scala 2.11. |
After upgrading to VVR 8.0.11 or later, test, deploy, and run your existing PyFlink drafts again to confirm compatibility.
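As a rough aid for planning that compatibility check, the sketch below encodes the Python version row of the table as a lookup. The function names are hypothetical, and the code assumes the VVR version is passed as a purely numeric dotted string such as `8.0.11`; other version label formats would need extra parsing.

```python
from typing import Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn a dotted version such as '8.0.11' into a comparable tuple."""
    return tuple(int(part) for part in text.split("."))


def preinstalled_python(vvr_version: str) -> str:
    """Return the pre-installed Python version listed in the table above."""
    if parse_version(vvr_version) >= parse_version("8.0.11"):
        return "3.9.21"
    return "3.7.9"


print(preinstalled_python("8.0.10"))
print(preinstalled_python("8.0.11"))
```

Comparing integer tuples rather than raw strings avoids ordering mistakes such as treating `8.0.9` as later than `8.0.11`.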
Create a UDSF
The following steps use Windows as the example environment. Flink provides a sample repository that includes implementations for UDSFs, user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs).
1. Download and decompress python_demo-master to your local machine.

   This is a third-party GitHub repository. Access may be slow or intermittent.

2. In PyCharm, choose File > Open and open the decompressed `python_demo-master` directory.

3. Open `udfs.py` in the `\python_demo-master\udx` path and define your UDSF.

       from pyflink.table import DataTypes
       from pyflink.table.udf import udf

       @udf(result_type=DataTypes.STRING())
       def sub_string(s: str, begin: int, end: int):
           return s[begin:end]

   The `sub_string` example extracts the characters of the input string from index `begin` up to, but not including, index `end`. Indexes are zero-based because the function uses Python slicing.

4. From the `\python_demo-master` directory, run the following command to package the `udx` directory:

       zip -r python_demo.zip udx

   When `python_demo.zip` appears in `\python_demo-master\`, the package is ready.
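The `zip` command may not be available in every Windows shell. As one possible alternative, the following sketch builds an archive with the same `udx/` layout using only the Python standard library. The function name `package_udx` is hypothetical, and the demo runs against a temporary directory that stands in for `python_demo-master`.

```python
import tempfile
import zipfile
from pathlib import Path


def package_udx(project_dir: Path, archive_name: str = "python_demo.zip") -> Path:
    """Zip project_dir/udx into project_dir/archive_name, keeping the udx/ prefix."""
    source = project_dir / "udx"
    archive = project_dir / archive_name
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(source.rglob("*")):
            if path.is_file():
                # Store paths relative to the project root, e.g. udx/udfs.py.
                zf.write(path, path.relative_to(project_dir).as_posix())
    return archive


# Demo against a throwaway directory that stands in for python_demo-master.
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp)
    (project / "udx").mkdir()
    (project / "udx" / "udfs.py").write_text("# UDSF definitions go here\n")
    archive = package_udx(project)
    with zipfile.ZipFile(archive) as zf:
        names = zf.namelist()
    print(names)
```

To use it on the real project, call `package_udx(Path(...))` with the path to your decompressed directory. Unlike `zip -r`, this sketch stores file entries only and does not write a separate entry for the `udx/` directory itself.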
Register a UDSF
After creating the package, register the UDSF in the Realtime Compute for Apache Flink console. For registration steps, see Manage user-defined functions (UDFs).
Use a UDSF
After registering the UDSF, use it in a Flink SQL job.
1. Create a draft using Flink SQL. For details, see Job development overview. The following example calls `ASI_UDSF` (the registered name of your UDSF) on the `a` field of the source table with `begin` set to 2 and `end` set to 4:

       CREATE TEMPORARY TABLE ASI_UDSF_Source (
         a VARCHAR,
         b INT,
         c INT
       ) WITH (
         'connector' = 'datagen'
       );

       CREATE TEMPORARY TABLE ASI_UDSF_Sink (
         a VARCHAR
       ) WITH (
         'connector' = 'blackhole'
       );

       INSERT INTO ASI_UDSF_Sink
       SELECT ASI_UDSF(a, 2, 4)
       FROM ASI_UDSF_Source;

2. In the left-side navigation pane of the development console, choose O&M > Deployments. Find the deployment, then click Start in the Actions column.

After the deployment starts, the extracted characters of the `a` field in `ASI_UDSF_Source` are written to `ASI_UDSF_Sink`. If `ASI_UDSF` is registered from the `sub_string` example, `ASI_UDSF(a, 2, 4)` returns the slice `a[2:4]`, that is, the characters at zero-based indexes 2 and 3.
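To check what the call with arguments 2 and 4 yields before deploying, the slice logic of the `sub_string` example can be exercised in plain Python without a Flink runtime. The sketch below is a hypothetical variant named `sub_string_plain`. It adds a guard for missing input, which is an assumption about how you might want null values handled and is not part of the sample repository.

```python
from typing import Optional


def sub_string_plain(s: Optional[str], begin: int, end: int) -> Optional[str]:
    """Same slice as the sub_string example, with a guard for missing input."""
    if s is None:
        # Assumption: propagate a missing value instead of raising TypeError.
        return None
    return s[begin:end]


print(sub_string_plain("abcdef", 2, 4))  # cd
print(sub_string_plain("abc", 2, 4))     # c
print(sub_string_plain("ab", 2, 4))      # prints an empty line
print(sub_string_plain(None, 2, 4))      # None
```

Python slicing never raises for out-of-range indexes, so strings shorter than `end` produce a shorter or empty result rather than an error.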