This topic describes how to develop, register, and use a user-defined scalar function (UDF).
Definition
A UDF maps zero, one, or more scalar values to a new scalar value. The input and output of a UDF have a one-to-one mapping: each time a UDF reads a row of data, it writes one output value.
Limits
- Only Apache Flink 1.12 and later versions are supported.
- Python 3.7.9 is pre-installed on a fully managed Flink cluster. Therefore, you must develop code in Python 3.7.9.
- JDK 1.8 is used in the running environment of fully managed Flink. If your Python job depends on a third-party JAR package, make sure that the JAR package is compatible with JDK 1.8.
- Only open source Scala 2.11 is supported. If your Python job depends on a third-party JAR package, make sure that the JAR package that corresponds to Scala 2.11 is used.
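Because the cluster runs a fixed Python version, it can help to check the local interpreter before you develop the UDF. The following is a minimal sketch, not part of the demo package; the helper name matches_cluster_python and the choice to compare only the major and minor version are assumptions made for illustration.

```python
import sys

# Major and minor version that the limits above say is pre-installed on the cluster.
CLUSTER_PYTHON = (3, 7)


def matches_cluster_python(version_info=sys.version_info, required=CLUSTER_PYTHON):
    """Return True if the major.minor part of version_info equals required."""
    return tuple(version_info[:2]) == tuple(required)


if not matches_cluster_python():
    # Only a warning: the check is advisory and does not stop local development.
    print(
        "Warning: local Python is {}.{}, but the cluster runs {}.{}".format(
            sys.version_info[0], sys.version_info[1], *CLUSTER_PYTHON
        )
    )
```

A mismatch does not always break a UDF, but syntax or library features newer than the cluster's Python version may fail after deployment.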
Develop a UDF
- Download and decompress the python_demo-master package to your on-premises machine.
- In PyCharm, use the main menu bar to open the decompressed python_demo-master package.
- Double-click the udfs.py file in the \python_demo-master\udx directory. Then, modify the content of the file based on your business requirements.
In this example, sub_string returns the characters of each data record from the begin position up to, but not including, the end position. Positions are zero-based, as in Python slicing.
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
    return s[begin:end]
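The slicing behavior of sub_string can be checked locally without a Flink installation. The following sketch repeats the slice expression in a plain Python function; the names sub_string_logic and null_safe_sub_string are hypothetical and are not part of the demo package. The null-safe variant is an assumption about how you might want to treat NULL input, not something the demo code does.

```python
def sub_string_logic(s, begin, end):
    # Same expression as the body of sub_string: a zero-based slice
    # that includes `begin` and excludes `end`.
    return s[begin:end]


def null_safe_sub_string(s, begin, end):
    # Hypothetical variant: return None instead of raising a TypeError
    # when the input string is None (a SQL NULL).
    if s is None:
        return None
    return s[begin:end]


print(sub_string_logic("abcdef", 2, 4))  # cd  (third and fourth characters)
print(sub_string_logic("abcdef", 1, 4))  # bcd (second to fourth characters)
print(repr(sub_string_logic("ab", 2, 4)))  # '' (slice past the end is empty)
print(null_safe_sub_string(None, 2, 4))  # None
```

To select the second to the fourth characters of a string, the call needs begin=1 and end=4, because the end position is excluded.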
- Go to the \python_demo-master directory, which contains the udx folder, and run the following command to package the files in the udx folder:
zip -r python_demo.zip udx
If the python_demo.zip package appears in the \python_demo-master\ directory, the Python UDF is packaged and ready to be registered.
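If the zip command is not available on your machine, the packaging step can be approximated with the Python standard library. This is a minimal sketch under stated assumptions: the function name zip_directory is hypothetical, and unlike zip -r it stores only file entries, not separate directory entries.

```python
import os
import zipfile


def zip_directory(project_dir, folder="udx", archive_name="python_demo.zip"):
    """Create archive_name inside project_dir from the files under folder.

    Entry names are stored relative to project_dir (for example udx/udfs.py),
    which mirrors the layout that `zip -r python_demo.zip udx` records.
    """
    archive_path = os.path.join(project_dir, archive_name)
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(os.path.join(project_dir, folder)):
            for name in files:
                full_path = os.path.join(root, name)
                archive.write(full_path, os.path.relpath(full_path, project_dir))
    return archive_path
```

For example, calling zip_directory with the path of the decompressed python_demo-master directory writes python_demo.zip into that directory. You can list the entries with zipfile.ZipFile(path).namelist() to confirm that they start with udx/.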
Register a UDF
For more information about how to register a UDF, see Register a UDF.
Use a UDF
- Use Flink SQL to create a job. For more information, see Develop a job.
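The following code provides an example of how to apply the UDF, called here as ASI_UDF, to the a field of each row in the ASI_UDF_Source table. Because sub_string uses zero-based Python slicing that excludes the end position, ASI_UDF(a,2,4) returns the third and fourth characters of each string: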
CREATE TEMPORARY TABLE ASI_UDF_Source (
  a VARCHAR,
  b INT,
  c INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE ASI_UDF_Sink (
  a VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO ASI_UDF_Sink
SELECT ASI_UDF(a,2,4)
FROM ASI_UDF_Source;
- On the Deployments page, find the job for which you want to use the UDF and click Start in the Actions column.
After the job is started, the job writes one value to the ASI_UDF_Sink table for each row of the ASI_UDF_Source table: the third and fourth characters (zero-based positions 2 and 3) of the string in the a field.
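The per-row behavior of the query can be approximated in plain Python. This sketch only simulates the SELECT clause: the function asi_udf is a stand-in for the registered UDF, and the sample rows are hypothetical, because the datagen connector generates random values.

```python
def asi_udf(s, begin, end):
    # Stand-in for the registered UDF; same slice expression as sub_string.
    return s[begin:end]


# Hypothetical sample rows with the columns (a, b, c) of ASI_UDF_Source.
source_rows = [("flinksql", 1, 2), ("python", 3, 4), ("ab", 5, 6)]

# SELECT ASI_UDF(a,2,4): exactly one output value per input row.
sink_rows = [asi_udf(a, 2, 4) for (a, _b, _c) in source_rows]

print(sink_rows)  # ['in', 'th', '']
```

Each input row yields exactly one output value, which is the one-to-one mapping that defines a scalar function. Strings shorter than three characters yield an empty string rather than an error.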