Get Started Quickly with PyFlink

Introduction to PyFlink

PyFlink is a sub-module of Flink and part of the overall Flink project. Its main purpose is to provide Python language support for Flink. In fields such as machine learning and data analysis, Python is a very important, and arguably the most important, development language. Therefore, to meet the needs of more users and broaden the Flink ecosystem, we launched the PyFlink project.

The PyFlink project has two main goals. The first is to make Flink's computing power available to Python users; that is, we provide a series of Python APIs in Flink so that users who are familiar with Python can conveniently develop Flink jobs.

The second is to run the Python ecosystem on top of Flink. Although we provide a series of Python APIs in Flink, these APIs still carry a learning cost: users need to learn how to use each API and understand what it does. Therefore, we hope that at the API layer users can keep working with the Python library APIs they already know, while Flink serves as the underlying computing engine, thereby reducing their learning cost. This is something we plan to do in the future; it is currently in the startup phase.

The picture below shows the development of the PyFlink project. Three versions have been released so far, and the supported feature set is becoming richer and richer.

Introduction to PyFlink Features

We will mainly introduce the following PyFlink features: the Python Table API, Python UDFs, vectorized Python UDFs, Python UDF metrics, PyFlink dependency management, and Python UDF execution optimization.

Python Table API

The purpose of the Python Table API is to allow users to develop Flink jobs in Python. Flink provides three types of APIs: the ProcessFunction API, the DataStream API, and the Table API. The former two are relatively low-level APIs, and jobs built on them execute their logic exactly as the user defines it. The Table API is a relatively high-level, declarative API; the logic of a job developed with it is executed only after a series of optimizations.

The Python Table API, as the name implies, provides Python language support for the Table API.

The following is a Flink job developed with the Python Table API. The job reads from a file, computes a word count, and writes the result to another file. Although simple, this example covers the complete basic flow of developing a Python Table API job.

First, we need to define the execution mode of the job: does it run in batch or streaming mode, what is its parallelism, and what other configuration does it use? Next, we need to define the source table and the sink table. The source table defines where the job's input data comes from and what format it is in; the sink table defines where the job's results are written and what format they are in. Finally, we define the execution logic of the job, which in this example is computing the word count.
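The following is a minimal sketch of such a job, written against a recent PyFlink release (the exact API has evolved across versions); the file paths, table names, and connector options are placeholders, not part of the original example.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit

# Execution mode: batch (use in_streaming_mode() for stream mode).
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
# Job configuration, e.g. the default parallelism.
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

# Source table: where the input data comes from and its format.
t_env.execute_sql("""
    CREATE TABLE source (
        word STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/input',
        'format' = 'csv'
    )
""")

# Sink table: where the results are written and their format.
t_env.execute_sql("""
    CREATE TABLE sink (
        word STRING,
        cnt BIGINT
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/output',
        'format' = 'csv'
    )
""")

# Execution logic: group by word and count occurrences.
t_env.from_path("source") \
    .group_by(col("word")) \
    .select(col("word"), lit(1).count.alias("cnt")) \
    .execute_insert("sink") \
    .wait()
```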

The screenshots below list the interfaces of the Python Table API; you can see that their coverage and functionality are fairly complete.

Python UDFs

The Python Table API is a relational API whose functionality is comparable to SQL. In SQL, user-defined functions are very important because they greatly expand what SQL can express. Likewise, the main purpose of Python UDFs is to allow users to develop custom functions in Python, thereby expanding the usage scenarios of the Python Table API. Moreover, Python UDFs can be used not only in Python Table API jobs, but also in Java Table API jobs and SQL jobs.

PyFlink supports multiple ways to define a Python UDF. Users can define a Python class that inherits ScalarFunction, or implement the custom function's logic as an ordinary Python function or a lambda function. In addition, Python UDFs can also be defined with a callable class or a partial function. Users can choose whichever style best suits their needs, as sketched below.
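The following sketch illustrates these definition styles, assuming a PyFlink release where input types can be inferred (earlier versions also required explicit input_types); the function names are illustrative.

```python
import functools

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

# Style 1: a Python class that inherits ScalarFunction.
class Add(ScalarFunction):
    def eval(self, i, j):
        return i + j

add_class = udf(Add(), result_type=DataTypes.BIGINT())

# Style 2: an ordinary Python function.
@udf(result_type=DataTypes.BIGINT())
def add_func(i, j):
    return i + j

# Style 3: a lambda function.
add_lambda = udf(lambda i, j: i + j, result_type=DataTypes.BIGINT())

# Style 4: a callable class.
class CallableAdd(object):
    def __call__(self, i, j):
        return i + j

add_callable = udf(CallableAdd(), result_type=DataTypes.BIGINT())

# Style 5: a partial function.
def add_three(i, j, k):
    return i + j + k

add_partial = udf(functools.partial(add_three, k=1),
                  result_type=DataTypes.BIGINT())
```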

PyFlink provides several ways to use a Python UDF, including from the Python Table API, the Java Table API, and SQL. We will introduce them one by one.

To use a Python UDF in the Python Table API, after defining the UDF, the user first registers it by calling the table environment's registration method and giving it a name; the UDF can then be referenced in the job by that name.
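A minimal sketch, assuming the add_func UDF from the sketch above and an already-obtained table my_table; register_function is the registration call of the PyFlink version described here, while newer releases use create_temporary_function.

```python
from pyflink.table.expressions import call, col

# Register the UDF under a name (newer releases use
# t_env.create_temporary_function("add", add_func) instead).
t_env.register_function("add", add_func)

# Reference the UDF by its registered name in the job.
result = my_table.select(call("add", col("a"), col("b")))
```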

The Java Table API uses it in a similar way, but the registration method differs: in a Java Table API job, the Python UDF must be registered through a DDL statement.

In addition, users can also use Python UDFs in SQL jobs. Similar to the previous two approaches, the user first registers the Python UDF, either through a DDL statement in the SQL script or in the SQL Client's environment configuration file.
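As an illustration, the DDL statement looks roughly as follows; 'my_udfs.add1' is a hypothetical fully qualified name pointing at a Python UDF object. The same statement can be issued from a Java Table API job via tEnv.executeSql(...) or placed in a SQL script; it is shown here from Python for consistency.

```python
# Register a Python UDF via DDL; the module path is a placeholder.
t_env.execute_sql(
    "CREATE TEMPORARY FUNCTION add1 AS 'my_udfs.add1' LANGUAGE PYTHON"
)
```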

Python UDF Architecture

Let us briefly introduce the execution architecture of Python UDFs. Flink is written in Java and runs in the Java virtual machine, while Python UDFs run in a separate Python process, so the Java process and the Python process need to communicate with each other. Besides data, state, logs, and metrics must also be transmitted between the two, so the communication protocol needs to support these four types.

Vectorized Python UDFs

The main purpose of vectorized Python UDFs is to let Python users develop high-performance UDFs using libraries commonly used in data analysis, such as Pandas or NumPy.

Vectorized Python UDFs are defined in contrast to ordinary Python UDFs; the figure below shows the difference between the two.

The figure below shows how a vectorized Python UDF executes. First, the Java side buffers multiple rows, converts them into Arrow format, and sends them to the Python process. After the Python process receives the data, it converts it into a Pandas data structure and calls the user-defined vectorized Python UDF. The execution result of the UDF is then converted back into Arrow format and sent back to the Java process.

In terms of usage, vectorized Python UDFs are similar to ordinary Python UDFs, with only a few differences. First, the declaration of a vectorized Python UDF needs an extra UDF type marker declaring that it is vectorized; second, its inputs and output are of type pandas.Series rather than single values.
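A minimal sketch of a vectorized Python UDF; func_type="pandas" is the marker in recent releases (the release described here used udf_type="pandas" instead), and the function name is illustrative.

```python
import pandas as pd

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# func_type="pandas" declares this a vectorized Python UDF; the inputs
# and output are pandas.Series holding a batch of rows each.
@udf(result_type=DataTypes.BIGINT(), func_type="pandas")
def vectorized_add(i: pd.Series, j: pd.Series) -> pd.Series:
    return i + j
```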

Python UDF Metrics

We mentioned earlier that there are many ways to define a Python UDF, but if you need to use metrics inside the UDF, it must be defined as a class that inherits ScalarFunction. The open method of the Python UDF receives a FunctionContext parameter, through which users can register metrics and then report values through the registered metric objects.
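A minimal sketch following the pattern described above; the class and metric names are illustrative.

```python
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class CountingAdd(ScalarFunction):
    def open(self, function_context):
        # Register a counter metric through the FunctionContext.
        self.counter = function_context.get_metric_group().counter("invocations")

    def eval(self, i, j):
        # Report through the registered metric object.
        self.counter.inc()
        return i + j

counting_add = udf(CountingAdd(), result_type=DataTypes.BIGINT())
```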

PyFlink dependency management

In terms of types, PyFlink dependencies mainly include the following: ordinary Python files, archive files, third-party libraries, the Python interpreter, and Java JAR packages. In terms of solutions, for each type of dependency PyFlink provides two options, an API and a command-line option, and users can choose either one.
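The following sketch shows the API side of each solution, with the matching command-line option noted in comments; all file paths are placeholders, and a table environment t_env as in the earlier examples is assumed.

```python
# Ordinary Python files (command-line option: -pyfs).
t_env.add_python_file("/path/to/my_util.py")

# Third-party libraries via a requirements file (command-line option: -pyreq).
t_env.set_python_requirements("/path/to/requirements.txt")

# Archive files, e.g. a zipped virtual environment (command-line option: -pyarch).
t_env.add_python_archive("/path/to/py_env.zip")

# The Python interpreter to use on the workers (command-line option: -pyexec).
t_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

# Java JAR packages are specified through configuration, e.g. pipeline.jars.
```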

Python UDF execution optimization

The execution optimization of Python UDFs covers two aspects: execution plan optimization and runtime optimization. This is very similar to SQL: a job containing Python UDFs first produces an optimal execution plan through predefined rules, and once the plan is determined, further optimization techniques are applied during actual execution to achieve the highest possible efficiency.

Python UDF execution plan optimization

The optimization of the execution plan follows several ideas. The first is splitting UDFs of different types: since a single node may contain multiple types of UDFs at the same time, and different types of UDFs cannot execute together, such nodes must be split. The second is filter pushdown, whose main purpose is to reduce, as much as possible, the amount of input data reaching nodes that contain Python UDFs, thereby improving the execution performance of the whole job. The third is Python UDF chaining: the communication overhead between the Java and Python processes, as well as the serialization and deserialization overhead, are relatively large, and chaining minimizes the communication between the two processes.

Splitting of different types of UDFs

Suppose a job contains two UDFs, where add is an ordinary Python UDF and subtract is a vectorized Python UDF. By default, the execution plan of this job has a single Project node containing both UDFs. The main problem with this plan is that an ordinary Python UDF processes one row at a time, while a vectorized Python UDF processes a batch of rows at a time, so the plan cannot be executed as-is.

By splitting, we can break this Project node into two, where the first contains only the ordinary Python UDF and the second contains only the vectorized Python UDF. Once different types of Python UDFs are split into different nodes, each node contains only one type of UDF, so each operator can choose the most appropriate execution mode according to the type of UDF it contains.
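As an illustration, a job shape that triggers this split might look as follows, assuming an input table t and the add and subtract UDFs described above (add an ordinary Python UDF, subtract a vectorized one):

```python
from pyflink.table.expressions import col

# Both UDF types appear in one projection; the planner splits the single
# Project node into two, one per UDF type, so each operator can use the
# execution mode that fits its UDF.
result = t.select(add(col("a"), col("b")), subtract(col("a"), col("b")))
```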

Filter pushdown before the Python UDF

The main purpose of filter pushdown is to push a filter operator below the Python UDF node, minimizing the amount of data the Python UDF node processes.

Suppose a job's original execution plan includes a Project node containing add and subtract, followed by a Filter node. This plan works but can be optimized: because the Python UDF node sits before the Filter node, the Python UDFs are computed before any filtering happens. If the Filter is instead pushed below the Python UDF node, the input data volume of the Python UDF node can be greatly reduced.
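As an illustration, assuming an input table t and the add UDF from the earlier sketches, a job shape amenable to this optimization might look like:

```python
from pyflink.table.expressions import col

# The predicate only touches the pass-through column a, so the planner can
# push the filter below the Python UDF projection; the UDF then only
# processes rows that survive the filter.
result = t.select(col("a"), add(col("a"), col("b")).alias("x")) \
          .where(col("a") > 0)
```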

Python UDF Chaining

Suppose we have a job containing two UDFs, add and subtract, both of which are ordinary Python UDFs. Its execution plan contains two Project nodes: the first computes subtract, and its result is then passed to the second Project node, which computes add.

The main problem is that, since subtract and add sit in two different nodes, the result of subtract has to be sent from the Python process back to the Java process, and then sent by the Java process to the Python process of the second node for execution. The data makes a round trip between the Java and Python processes, bringing entirely unnecessary communication, serialization, and deserialization overhead. We can therefore optimize the plan as shown in the right-hand figure: run the add and subtract computations in a single node, calling add directly as soon as the result of subtract is computed.
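As an illustration, assuming an input table t and the two ordinary Python UDFs, the job shape might be a nested call like the following:

```python
from pyflink.table.expressions import col

# Without chaining, subtract's result would travel Python -> Java -> Python
# before add runs; with Python UDF chaining, both execute in one Python
# operator and the intermediate result never leaves the Python process.
result = t.select(add(subtract(col("a"), col("b")), col("c")))
```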

Python UDF runtime optimization

At present, there are three ways to improve the runtime execution efficiency of Python UDFs: the first is Cython optimization, which improves the execution efficiency of the Python code; the second is customized serializers and deserializers, which improve the efficiency of serialization and deserialization; the third is the vectorized Python UDF feature.

Demonstration of PyFlink Features

First, open the demo page, which provides several PyFlink demos. These demos run in Docker, so to run them you need to install Docker locally.

Then we can run the cluster-startup command, which starts a PyFlink cluster; the PyFlink examples we run later will be submitted to this cluster for execution.

The first example is word count. We first define the environment, the source, the sink, and so on, and then run the job.

This is the execution result of the job. You can see that the word Flink appears twice, and the word PyFlink appears once.

Next we run a Python UDF example. It is somewhat similar to the previous one: we first define the environment, specifying that the job runs in batch mode with a parallelism of 1. The difference is that this job defines a UDF whose input is two columns, both of type BIGINT, and whose output type is BIGINT as well. The logic of this UDF is to output the sum of the two columns.

Let's execute the job, and the execution result is 3.

Next we run a Python UDF with dependencies. The UDF in the previous job had no dependencies; it simply added the two input columns. In this example, the UDF references a third-party dependency, which we declare through the set_python_requirements API.

Next we run the job, and its execution result is the same as before, because the logic of the two jobs is similar.

Next, let's look at a vectorized Python UDF example. When defining the UDF, we add a UDF type field indicating that it is a vectorized Python UDF; the rest of the logic is similar to an ordinary Python UDF. Its execution result is also 3, because its logic is the same as before: computing the sum of the two columns.

Let's look at another example: using a Python UDF in a Java Table API job. In this job, the Python UDF is registered through a DDL statement and then used in an executed SQL statement.

Next, let's look at using a Python UDF in a pure SQL job. In the environment configuration file, we declare a UDF named add1, specify that its language is Python, and point to the location of the UDF.

Next we run it and the execution result is 234.

PyFlink's Next Steps

Currently, PyFlink supports only the Python Table API. We plan to support the DataStream API in the next version, together with Python UDAFs and Pandas UDAFs. At the execution layer, we will also continue optimizing PyFlink's execution efficiency.
