PyFlink Next Generation Python Runtime Introduction

1. New features of PyFlink

PyFlink 1.14 introduced many new features, which fall into three areas: functionality, usability, and performance.

In terms of functionality, State TTL configuration has been added. Before 1.14, the Python DataStream API already supported stateful operations, but there was no way to configure State TTL. This meant that users writing custom Python DataStream API functions could not have state values cleared automatically; they had to clear state manually, which was not user-friendly.
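
As a rough illustration, here is a minimal sketch of attaching a TTL to a state descriptor with the 1.14 DataStream API (the state name `last_value` and the one-hour TTL are our own example choices):

```python
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

# values not written for one hour expire and are cleaned up automatically;
# expired values are never returned to the user function
ttl_config = StateTtlConfig \
    .new_builder(Time.hours(1)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

descriptor = ValueStateDescriptor("last_value", Types.STRING())
descriptor.enable_time_to_live(ttl_config)
```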

In terms of usability, the following features have been added:

Support for tar.gz archives in dependency management.

Profiling support. Users often write custom Python functions in PyFlink jobs, but it is hard to tell where the performance bottlenecks in those functions are. With profiling enabled, when a Python function becomes a bottleneck, you can use the profile output to pinpoint the specific cause and optimize that part.
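
For example, profiling can be switched on through the job configuration; a minimal sketch, assuming the `python.profile.enabled` option (the profile results then appear in the logs):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# enable profiling of Python user-defined functions; the collected profile
# is dumped to the logs so hot spots inside the functions can be identified
t_env.get_config().get_configuration().set_boolean("python.profile.enabled", True)
```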

Print support. Before 1.14, you had to use Python's logging module to output custom log information. For Python users, however, print is the more familiar way to produce output, so support for it was added in 1.14.
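
A minimal sketch (the UDF `normalize` below is our own example):

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def normalize(s):
    # since 1.14 plain print output from a UDF is collected into the logs
    # (and shown on the console in local mode) instead of being lost
    print(f"processing: {s}")
    return s.upper()
```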

Local debug mode. Before 1.14, users developing PyFlink jobs with custom Python functions locally had to use remote debugging to step through the custom logic, which was cumbersome and had a high barrier to entry. This changed in 1.14: if you run a PyFlink job locally that uses a custom Python function, it automatically switches to local debug mode, and you can debug the custom Python function directly in your IDE.
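
A minimal sketch of what this looks like in practice (the job below is our own example): running the file directly with `python job.py` executes the UDF locally, so an ordinary IDE breakpoint inside `double` will be hit:

```python
from pyflink.datastream import StreamExecutionEnvironment

def double(value):
    return value * 2  # set a breakpoint here and step through in the IDE

env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([1, 2, 3]).map(double).print()
env.execute("local_debug_demo")
```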

In terms of performance, the following features have been added:

Operator Fusion. This feature targets jobs that perform several consecutive operations in the Python DataStream API. For example, with two consecutive .map operations, before 1.14 each .map ran in its own Python worker; with Operator Fusion they are merged and run in the same operator, executed by a single Python worker, which yields a very good performance improvement.
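
A minimal sketch of the scenario described above (our own example):

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection([1, 2, 3])
# before 1.14 each .map ran in its own Python worker; with Operator Fusion
# the two are merged and executed by a single Python worker
ds.map(lambda x: x + 1).map(lambda x: x * 2).print()
env.execute("fusion_demo")
```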

State serialization/deserialization optimization. Before 1.14, State serialization used Python's built-in pickle serializer, which can serialize arbitrary Python data structures but has to encode the State's type information into the payload, resulting in larger serialized data. This was optimized in 1.14 with custom serializers, one serializer per type, which makes the serialized data smaller.
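
The effect can be seen with plain Python, independent of Flink: pickle wraps the data in opcodes and type information, while a serializer dedicated to one known type can emit only the data itself:

```python
import pickle

value = "hello"
print(len(pickle.dumps(value)))    # pickle opcodes + type info wrap the payload
print(len(value.encode("utf-8")))  # 5 bytes: a string-specific encoding writes only the data
```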

Finish Bundle optimization. Before 1.14, Finish Bundle was a synchronous operation; it has now been made asynchronous, which improves performance and also resolves some scenarios where checkpoints could not complete.

2. PyFlink Runtime

The Python Table API & SQL and the DataStream API at the top left of the figure are the Python APIs provided to users. A user writes a PyFlink job with these two Python APIs; the job is then translated into the Java API through the py4j third-party library, so that it can be described in terms of the corresponding Flink Java API.
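
A minimal sketch of the py4j pattern that PyFlink builds on (it assumes a GatewayServer is already listening on the JVM side, which PyFlink sets up internally):

```python
from py4j.java_gateway import JavaGateway

gateway = JavaGateway()                       # connect to the JVM-side GatewayServer
jvm_list = gateway.jvm.java.util.ArrayList()  # the object lives inside the JVM
jvm_list.add("hello from Python")             # each call is forwarded to Java
print(jvm_list.size())                        # 1
```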

For Table and SQL jobs there is an additional optimizer with two kinds of rules: common rules and Python rules. Why are there Python rules? Common rules apply to all existing Table and SQL jobs, whereas Python rules target the scenario where custom Python functions are used in a PyFlink job, so that the corresponding Python operators can be extracted.

After the job is described, it is translated into a JobGraph containing the corresponding Python operators. The JobGraph is submitted to the TaskManager (the Runtime) to run, and the Python operators also exist in the Runtime.

On the right side of the figure are the components of the Python operators, which form the core of the PyFlink Runtime. They are mainly divided into two parts: the Java operator and the Python worker.

The Java operator contains many components, including the data service and the State service, as well as handling for checkpoints, watermarks, and State requests. Custom Python functions cannot run directly on Flink's existing JVM-based architecture, yet running Python functions requires a Python runtime, so Python workers are used to solve this problem.

The solution works as follows: a Python process is launched to run the custom Python functions. The Java operator processes the upstream data and, after some special handling, sends it to the corresponding Python worker over an inter-process communication channel, which is the data service in the figure. The State service handles the Python DataStream API's operations on State: when Python code operates on State, the request is sent from the Python worker to the Java operator, which accesses the state backend to fetch the corresponding State data and sends it back to the Python worker, so the user can then work with the result.
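
A minimal sketch of Python DataStream API code whose state accesses travel this path (the function below is our own example):

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class CountPerKey(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.count = runtime_context.get_state(
            ValueStateDescriptor("count", Types.LONG()))

    def process_element(self, value, ctx):
        # the read goes from the Python worker through the State service to
        # the Java operator, which queries the state backend
        current = self.count.value() or 0
        self.count.update(current + 1)  # the write travels the same path back
        yield value, current + 1
```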

The picture above is the PyFlink Runtime workflow. The roles in it are the Python operator, the Python runner, the bundle processor, the coder, and the Python operation, and they run in different places. The Python operator and the Python runner run inside the Java JVM and are responsible for connecting to the upstream and downstream Java operators, while the bundle processor, the coder, and the Python operation run inside the PVM. The bundle processor builds on the existing Apache Beam framework and receives data from the Java side via inter-process communication. The coder is a custom serializer on the Python side. When the Java side sends a piece of data, it passes through the Python operator to the Python runner; after the Python runner serializes it, it is sent to the bundle processor through inter-process communication; the bundle processor uses the coder to deserialize the binary payload into a Python object; finally, the Python operation passes the deserialized Python object as the input parameter of the function body and calls the custom Python function to obtain the result.

The bottlenecks of the above process lie mainly in the following areas: first, on the execution side, there is framework-layer overhead written in Python before the user-defined function is actually invoked; second, the custom serialization step has to serialize and deserialize data on both the Java side and the Python side; third, the inter-process communication itself.

To address these bottlenecks, a series of optimizations has been made:

In terms of computation, codegen is used to turn the variables in the generated Python wrapper that invokes the user function into constants, so the function executes more efficiently (a toy sketch of this idea appears after this list); in addition, all of the existing Python operation implementations were rewritten in Cython, which effectively compiles the Python code down to C and brings a large performance improvement.
In terms of serialization, custom serializers are provided, implemented in pure C, which are more efficient than the Python implementations.
In terms of communication, no optimization has been made yet.
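
Here is the toy sketch of the codegen idea promised above; it is not PyFlink's actual code generator, only an illustration of baking a would-be variable into a constant of a generated function:

```python
def specialize(udf, scale):
    # generate source in which `scale` is a constant rather than a value
    # looked up on every invocation
    source = f"def specialized(value):\n    return udf(value) * {scale!r}\n"
    namespace = {"udf": udf}
    exec(compile(source, "<generated>", "exec"), namespace)
    return namespace["specialized"]

fast = specialize(lambda x: x + 1, 10)
print(fast(5))  # (5 + 1) * 10 == 60
```
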
The problems of serialization and communication are essentially the problem of Java and Python calling each other, that is, the question of how to optimize the runtime architecture of PyFlink.

3. FFI-based PEMJA

Mutual calls between Java and Python are a common problem, and many implementations already exist.

The first category is inter-process solutions, that is, schemes based on network communication, including the following:

The socket solution, where the communication protocol is entirely self-implemented, which is very flexible but also cumbersome;
The py4j solution: both PyFlink and PySpark use py4j on the client side when writing jobs;
The Alink solution, which also uses py4j at runtime and supports custom Python functions;
The gRPC solution, which uses the existing gRPC framework, requires no custom protocol, and comes with ready-made service and message definitions;
The shared-memory solution, another form of inter-process communication: for example, Tensorflow on Flink is implemented via shared memory, and PyArrow Plasma is likewise an object-based shared-memory store.
All of the above solutions target inter-process communication. Could Python and Java instead run in the same process, eliminating the overhead of inter-process communication altogether?

Some existing libraries do attempt this. The first approach converts Python into Java: for example, p2j converts Python source code into Java source code, and voc converts Python code directly into Java bytecode. The essence of this approach is to turn Python into code that runs directly on the JVM. But it has serious flaws: Python keeps evolving and its grammar is rich, and mapping Python constructs onto corresponding Java objects is very difficult, since they are, after all, different languages.

The second approach is a Python interpreter implemented in Java. The first example is Jython: CPython is a Python interpreter written in C that runs on top of C, so a Python interpreter implemented in Java can likewise run directly on the JVM. Another example is GraalVM, which provides the Truffle framework, allowing different programming languages to share a common structure that runs on the JVM, so that those languages can run inside the same process.

The premise of these approaches is being able to recognize Python code, which means staying compatible with all existing Python code. At present that compatibility is a hard problem to solve, and it has blocked this line of Python-to-Java solutions from being carried forward.

The third category consists of schemes based on FFI.

The essence of FFI is how a host language calls a guest language, in this case the mutual calls between Java and Python, and there are many concrete implementations of it.

Java provides JNI (Java Native Interface), which allows Java users to call libraries implemented in C through the JNI interface, and vice versa. JVM vendors implement JNI according to this interface specification, thereby realizing mutual calls between Java and C.

The Python/C API is similar. CPython is an interpreter implemented in C, so it naturally supports Python code calling third-party C libraries, and vice versa.

Cython provides a tool that translates source code into code another language can understand: for example, it converts Python code into very efficient C code, which can then be embedded into the CPython interpreter and executed directly, which is very efficient.

ctypes enables Python to call C libraries efficiently by wrapping them.
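
A minimal ctypes sketch (assumes a Unix-like system where the C standard library can be located):

```python
import ctypes
import ctypes.util

libc = ctypes.CDLL(ctypes.util.find_library("c"))  # load the C standard library
libc.printf(b"ctypes calling into C: %d\n", 42)    # call a C function directly
```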

The core of all the FFI-based schemes above is C. With C as the bridge, code written in Java can call into C through the JNI interface, and C can then call the CPython API, ultimately allowing Java and Python to run in the same thread. This is the overall idea of PEMJA: it eliminates inter-process communication, and because it uses the Python/C API provided by CPython itself, it has no compatibility problems, overcoming the defects of interpreters implemented in Java.

JPype solves the problem of Python calling Java; it does not support Java calling Python, so it does not apply to this scenario.

JEP implements Java calling Python, but its implementation has many limitations. First, it can only be installed from source, which places very high demands on the environment, and it depends on some CPython source files, which is very unfavorable for cross-platform installation and use. Second, the startup entry point must be a JEP program: it needs to dynamically load its class library and requires environment variables to be set in advance, which makes it ill-suited to run inside another architecture as a third-party middleware plugin. In addition, there are performance problems: it does not handle Python's GIL well, so it is not very efficient.

PEMJA largely overcomes the above problems and achieves mutual calling between Java and Python well.

The figure above is a performance comparison of several frameworks, using a fairly standard and simple String upper function. The comparison targets the overhead of the framework layer rather than the performance of the custom function itself, which is why the simplest possible function was chosen; and since String is the most commonly used data structure in existing functions, String is used here.
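
This is not the cross-framework benchmark from the figure, but as a point of reference, the per-call cost of the Python-side function in isolation can be measured like this:

```python
import timeit

for size in (100, 1000):
    payload = "a" * size
    seconds = timeit.timeit(lambda: payload.upper(), number=1_000_000)
    print(f"{size} bytes: {seconds:.3f}s for 1M calls")
```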

Here the performance for 100-byte and 1000-byte inputs is compared across the four implementations. Jython turns out not to be as efficient as one might imagine; in fact, it has the lowest performance of the four. JEP's performance is far inferior to PEMJA's. With 100-byte inputs, PEMJA reaches about 40% of the pure Java implementation; with 1000-byte inputs, it actually surpasses the pure Java implementation.

How can this phenomenon be explained? String upper is implemented in Java on the JVM side, but in Python it is implemented in C, so the function body itself executes faster than the Java version; combined with a framework overhead that is small enough, the overall performance comes out higher than Java's. It means that in some scenarios, a Python UDF may outperform a Java UDF.

A key reason many users currently choose Java UDFs over Python UDFs is that Python UDF performance has been far inferior to Java's. But if Java's performance is no better than Python's, Python gains the advantage: it is, after all, a scripting language and more convenient to write.

A daemon thread in Java is responsible for initialization and final destruction, as well as for creating and releasing resources in PEMJA and the corresponding Python PVM. The user works with a PEMJA instance in Java, which maps to a corresponding instance inside PEMJA, and each instance creates its own Python sub-interpreter. Compared with the global Python interpreter, a Python sub-interpreter is a smaller-scoped construct that can hold the GIL and has its own independent heap space, so namespace isolation can be achieved. Each thread here corresponds to one Python sub-interpreter, which executes its own Python functions inside the corresponding PVM.

4. PyFlink Runtime 2.0

PyFlink Runtime 2.0 is based on PEMJA.

The left side of the figure above shows the architecture of PyFlink 1.0. It contains two processes, a Java process and a Python process, and the data interaction between them is realized through the data service and the State service, using IPC communication.

With PEMJA, the data service and State service can be replaced with the PEMJA library, allowing the JVM on the left and the PVM on the right to run in the same process, which completely eliminates the IPC communication.

The figure above compares the performance of the existing PyFlink UDF, PyFlink's PEMJA-based UDF, and a Java UDF, again using the String upper function with 100-byte and 1000-byte inputs. With 100 bytes, the PEMJA-based UDF basically reaches 50% of the Java UDF's performance; with 1000 bytes, it surpasses the Java UDF. Although this depends on how the custom function is implemented, it still illustrates how performant the PEMJA framework is.

5. Future Work

In the future, the PEMJA framework will be open-sourced as an independent project (it was officially open-sourced on January 14, 2022), because it is a general solution: not only PyFlink, but any scenario where Java and Python call each other can use this framework. Its first version only supports Java calling Python functions; support for Python calling Java functions will be added later, because Python DataStream API functions written in Python depend on calling Java when they access State. In addition, PEMJA will add support for Numpy's native data structures; once that is in place, pandas UDFs can be used and performance will take a qualitative leap.
