Flink Execution Engine

1. Background

With the continuous development of the Internet and the mobile Internet, every industry has accumulated massive amounts of business data. To improve user experience and make their products more competitive, enterprises have turned to processing big data in real time. Real-time dashboards for social media, real-time recommendation for e-commerce, real-time traffic prediction for city brains, real-time anti-fraud in the financial industry: the success of these products shows that real-time big data processing has become an unstoppable trend.

Riding this trend, Flink has become the de facto standard of the real-time computing industry. Not only Alibaba, but leading companies in many fields, both in China and abroad, use Flink as the technical foundation of their real-time computing.

Real-time business is only a starting point. One of Flink's goals is to give users a unified real-time and offline experience. In fact, many users not only need real-time statistics, but also need to compare them with historical data (yesterday, or even the same period last year) to confirm the effect of an operational or product strategy. From the user's point of view, the traditional approach of keeping stream and batch processing in two separate systems has several pain points:

High labor cost. Since stream and batch are two separate systems, the same logic has to be developed twice, often by two different teams.

Redundant data pipelines. In many scenarios, the stream and batch computations are essentially the same, but because they live in two systems the same logic still has to run twice, wasting resources.

Inconsistent data semantics. This is the most serious problem users encounter. Two systems, two sets of operators, and two sets of UDFs inevitably produce discrepancies of varying degrees, which cause great trouble for the business side and cannot be eliminated simply by adding manpower or resources.

On Double Eleven 2020, when the real-time processing peak reached a record high of 4 billion records per second, the Flink team and the DT team jointly launched a data warehouse architecture based on Flink's full-link stream-batch unification, which solved a series of problems brought by the Lambda architecture: stream and batch jobs share the same SQL, improving development efficiency by 3 to 4 times; a single engine guarantees that data semantics are naturally consistent; and stream and batch jobs run on the same cluster, so peak shaving and valley filling greatly improve resource efficiency.

The success of Flink's stream-batch unification is inseparable from the healthy, vigorous growth of the Flink open source community. The Apache Software Foundation's 2020 annual report shows that Flink ranks among the top projects on three key indicators of community health: first in user mailing list activity, second in developer commits, and second in GitHub visits. These rankings cover all projects under the Apache Software Foundation, not just the big data field.

2020 was also the second year of contributing Blink back to the community. Over the past two years, we have gradually contributed the experience Blink accumulated inside Alibaba back to the community, making Flink a truly unified stream-batch platform. With this article I want to share what Flink has done in the execution engine for stream-batch unification over these two years, and I hope that both long-time users and newcomers to Flink can learn more about the history and current state of its unified stream-batch architecture.

2. A Layered Architecture for Stream-Batch Unification

In general, Flink's core engine is mainly divided into the following three layers:

SDK layer. Flink has two main types of SDKs. The first is the relational SDK, i.e. SQL/Table; the second is the physical SDK, i.e. DataStream. Both are stream-batch unified: whether users write SQL or DataStream programs, they develop their business logic once and can run it in both streaming and batch scenarios;

Execution engine layer. The execution engine provides a unified DAG that describes the data processing pipeline (the logical plan). Whether the job is a stream job or a batch job, the user's business logic is first converted into this DAG. The Unified DAG Scheduler then turns the logical DAG into Tasks running in a distributed environment, and data is exchanged between Tasks through Shuffle; the Pluggable Unified Shuffle architecture supports both streaming and batch shuffle implementations;

State storage layer. The state storage layer is responsible for storing the execution state of operators. For stream jobs there are the open source RocksDBStateBackend and MemoryStateBackend as well as the commercial GeminiStateBackend; for batch jobs we have introduced a BatchStateBackend in the community version.

This article mainly shares the following aspects:

Stream-batch unified DataStream: how the challenges currently faced by the Flink SDKs are addressed with a unified DataStream;

Stream-batch unified DAG Scheduler: how the unified Pipeline Region mechanism fully exploits the performance advantages of the streaming engine, and how dynamically adjusting the execution plan improves the engine's ease of use and the system's resource utilization;

Stream-batch unified Shuffle architecture: how a unified Shuffle architecture satisfies the customization needs of different shuffle strategies while avoiding duplicated development of their common parts;

Stream-batch unified fault-tolerance strategy: how a unified fault-tolerance strategy meets the fault-tolerance requirements of batch scenarios while improving the fault-tolerance behavior of streaming scenarios.

3. Stream-Batch Unified DataStream

SDK analysis and challenges

As shown in the figure above, there are currently three types of SDKs provided by Flink:

Table/SQL is a high-level, relational SDK mainly used in data analysis scenarios; it supports both Bounded and Unbounded input. Because Table/SQL is declarative, the system can perform many optimizations on the user's behalf, for example predicate pushdown (Filter Push Down) based on the schema the user provides, and on-demand deserialization of binary data. Table/SQL already supports both the Batch and the Streaming execution mode. [1]
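As a minimal illustration of "develop once, run in both modes" (a sketch assuming Flink 1.12+ and the built-in datagen connector; the table definition is purely illustrative), flipping a single setting switches the same SQL between batch and streaming execution:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnifiedSqlJob {
    public static void main(String[] args) {
        // The same SQL runs over bounded or unbounded input; only the mode changes.
        boolean batch = true;
        EnvironmentSettings settings = batch
                ? EnvironmentSettings.newInstance().inBatchMode().build()
                : EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Illustrative bounded source; in practice this could be a Kafka table (streaming)
        // or a Hive/filesystem table (batch) with the same query on top.
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE orders (item STRING, amount BIGINT) " +
            "WITH ('connector' = 'datagen', 'number-of-rows' = '1000')");
        tEnv.executeSql("SELECT item, SUM(amount) AS total FROM orders GROUP BY item").print();
    }
}
```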

DataStream is a physical SDK. Although the relational SDK is powerful, it has some limitations: it does not expose operations on State and Timers, and because the optimizer evolves between versions, the same SQL may produce incompatible physical execution plans across versions. The DataStream SDK supports low-level operations on State and Timers, and because DataStream is an imperative SDK it gives users good "control" over the physical execution plan, so there is no incompatibility caused by version upgrades. DataStream still has a large user base in the community; for example, there are still nearly 500 open DataStream issues. Although applications written with DataStream can accept both Bounded and Unbounded input, before Flink 1.12 DataStream only supported the Streaming execution mode.

DataSet is a physical SDK that only supports Bounded input. It optimizes some operators based on the bounded nature of the data, but it does not support concepts such as EventTime and State. Although DataSet is the earliest SDK Flink provided, its influence in the community is gradually declining compared with DataStream and SQL as real-time and data analysis scenarios keep growing.

At present, Table/SQL has relatively mature support for unified stream-batch scenarios, but the physical SDKs still face some challenges, mainly in two respects:

With the existing physical SDKs it is impossible to write a stream-batch unified application that is fit for real production. For example, a user who writes a program to process real-time data from Kafka would naturally like to use the same program to process historical data stored on OSS/S3/HDFS. At present, however, neither DataSet nor DataStream can satisfy this seemingly "simple" demand. You may find this strange: DataStream supports both Bounded and Unbounded input, so why is there still a problem? The devil is in the details, and I will elaborate in the Unified DataStream section.

The cost of learning and understanding is relatively high. As Flink keeps growing, more and more new users join the community, but they have to learn two physical SDKs, so compared with other engines the cost of getting started is relatively high. The two SDKs also have different semantics: DataStream has Watermarks and EventTime, for example, while DataSet does not, and understanding both mechanisms is no small hurdle. And because the two SDKs are not compatible, a new user who picks the wrong one faces a large switching cost.

Unified Physical SDK


To address the challenges facing the physical SDKs described above, we chose the Unified DataStream SDK as Flink's single unified physical SDK. This part mainly answers two questions:

Why choose DataStream as the Unified Physical SDK?

What capabilities does Unified DataStream provide, compared with the "old" DataStream, so that users can write stream-batch unified applications fit for real production?

Why not Unified DataSet

To reduce the cost of learning and understanding, the most natural and simplest solution is to pick one of DataStream and DataSet as Flink's only physical SDK. Why did we choose DataStream rather than DataSet? There are two main reasons:

User benefit. As analyzed earlier, the influence of DataSet in the community is declining as Flink develops. Choosing DataSet as the unified physical SDK would throw away users' large existing "investment" in DataStream, whereas choosing DataStream lets many users earn additional returns on that investment;

Development cost. DataSet is too old and lacks support for many basic concepts of a modern real-time computing engine, such as EventTime, Watermarks, State, and Unbounded Sources. A deeper reason is that the implementations of existing DataSet operators, such as Join, cannot be reused in streaming scenarios, whereas DataStream operators largely can. So the question becomes: how do we reuse DataStream operators in both streaming and batch scenarios?

Unified DataStream

Many users with some knowledge of Flink may ask: DataStream supports Bounded and Unbounded input at the same time, so why do we say it is impossible to write a production-ready stream-batch unified application with DataStream? Simply put, DataStream was originally designed for unbounded scenarios, so in bounded scenarios it still lags behind traditional batch engines in efficiency, consistency, and ease of use. Specifically, this shows up in the following two aspects:

- Efficiency

Let me start with an example. Below is a performance comparison of DataStream and DataSet running a WordCount job of the same size. As the chart shows, DataSet is nearly 5 times faster than DataStream.

Clearly, for DataStream to support both streaming and batch scenarios in production, its efficiency in bounded scenarios must improve substantially. So why is DataStream less efficient than DataSet?

As mentioned earlier, DataStream was designed primarily for the unbounded scenario, whose defining characteristic is out-of-order data: no DataStream operator may assume anything about the order in which records arrive. As a result, many operators buffer this out-of-order data in a K/V store and only take it out for processing and output when appropriate. In general, an operator's access to the K/V store involves a lot of serialization and deserialization as well as random disk I/O. In DataSet, by contrast, the data is assumed to be bounded, so the engine can avoid random disk I/O and also optimize serialization and deserialization. This is the main reason why the WordCount written with DataSet is 5 times faster than the one written with DataStream.

Knowing the cause, do we need to rewrite all DataStream operators? In theory yes, but DataStream has a large number of operators, some of them quite complicated, such as the whole family of Window operators; rewriting all of them would be an enormous amount of work. Instead, with a single-key BatchStateBackend we avoided rewriting almost all operators while still achieving very good results.
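To give a feel for the resulting user experience, here is a minimal sketch (assuming Flink 1.12+; the input path is only a placeholder): the same DataStream WordCount runs in Batch mode simply by setting the runtime mode, and the engine picks the batch-optimized state access underneath:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class UnifiedWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH for bounded input; switch to STREAMING (or AUTOMATIC) for unbounded sources such as Kafka.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.readTextFile("hdfs:///path/to/input")            // placeholder bounded source
           .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
               for (String word : line.toLowerCase().split("\\W+")) {
                   if (!word.isEmpty()) {
                       out.collect(Tuple2.of(word, 1L));
                   }
               }
           })
           .returns(Types.TUPLE(Types.STRING, Types.LONG))   // lambda type hint required due to erasure
           .keyBy(t -> t.f0)
           .sum(1)
           .print();

        env.execute("unified-wordcount");
    }
}
```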

- Consistency

Readers with some knowledge of Flink will know that applications originally written with DataStream run in the Streaming execution mode, in which end-to-end Exactly Once semantics are maintained through Checkpoints. Concretely, a job's Sink commits data to the external system only after all operators of the whole graph (including the Sink itself) have completed their respective snapshots. This is a typical two-phase commit (2PC) protocol that relies on Flink's Checkpoint mechanism.

In the bounded scenario, the Streaming mode can still be used, but it may cause problems for users:

High resource consumption: in the Streaming mode, all resources must be acquired at the same time, and in some cases users simply do not have that many resources;

High fault-tolerance cost: in bounded scenarios, some operators may not support snapshots for efficiency reasons, so once an error occurs the entire job may have to be re-executed.

Therefore, in bounded scenarios users want the application to run in the Batch execution mode, which naturally solves the two problems above. Supporting the Batch execution mode in bounded scenarios is relatively simple, but it introduces a thorny problem: with the existing Sink API, end-to-end Exactly Once semantics cannot be guaranteed, because there are no Checkpoints in the bounded scenario and the original Sink relies on Checkpoints for end-to-end Exactly Once. At the same time, we do not want sink developers to maintain two different implementations for the two modes, because that would fail to leverage the connectors between Flink and other ecosystems.

In essence, a transactional Sink has to answer four questions:

What to commit?
How to commit?
Where to commit?
When to commit?

Flink should let sink developers specify What to commit and How to commit, while the system decides Where to commit and When to commit according to the execution mode, so that end-to-end Exactly Once is guaranteed. In the end we proposed a new Unified Sink API, which allows developers to write a sink once and run it in both the Streaming and the Batch execution mode. What is presented here is only the main idea; guaranteeing end-to-end consistency in bounded-stream scenarios, and connecting to external ecosystems such as Hive and Iceberg, still poses real challenges.
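To make the division of responsibilities concrete, here is a much-simplified conceptual sketch; these interfaces are illustrative only and are not the actual Flink Sink API, whose names and signatures differ:

```java
import java.io.Serializable;
import java.util.List;

/** Conceptual sketch: the sink developer supplies "what to commit" (the committables the
 *  Writer produces) and "how to commit" (the Committer); the framework decides where and
 *  when to commit, e.g. on each checkpoint in Streaming mode or at end of input in Batch mode. */
interface UnifiedSinkSketch<InputT, CommittableT> extends Serializable {

    Writer<InputT, CommittableT> createWriter();
    Committer<CommittableT> createCommitter();

    interface Writer<InputT, CommittableT> {
        void write(InputT element) throws Exception;            // stage data, e.g. into temporary files
        List<CommittableT> prepareCommit() throws Exception;    // hand "what to commit" to the framework
    }

    interface Committer<CommittableT> {
        void commit(List<CommittableT> committables) throws Exception; // "how to commit", e.g. rename/publish staged files
    }
}
```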

4. Stream-Batch Unified DAG Scheduler

What problem does Unified DAG Scheduler solve?

Flink originally had two scheduling modes:

One is the streaming scheduling mode. In this mode, the Scheduler applies for all the resources the job needs and then schedules all of the job's tasks at once; the tasks communicate with each other in a pipelined fashion. Batch jobs can also be run this way, with a large performance gain. However, for long-running batch jobs this mode still has problems: at larger scale it consumes many resources simultaneously, which some users may not have, and the fault-tolerance cost is high, because a single error forces the entire job to be rerun.

The other is the batch scheduling mode. This mode is similar to a traditional batch engine: every task applies for resources independently, and tasks communicate through Batch Shuffle. The advantage of this approach is a relatively small fault-tolerance cost; the drawback is that data between tasks is exchanged through disk, causing a lot of disk I/O.

In general, these two scheduling modes basically meet the requirements of unified stream-batch scenarios, but there is still much room for improvement, specifically in three respects:

1. Inconsistent architecture and high maintenance cost. The essence of scheduling is resource allocation, in other words deciding when to deploy which tasks where. The original two scheduling modes differ in the timing and granularity of resource allocation, which prevented the scheduling architecture from being fully unified and forced developers to maintain two sets of logic. For example, in the streaming scheduling mode the granularity of resource allocation is all tasks of the entire physical execution plan, while in the batch scheduling mode it is a single task; when the Scheduler receives a resource, it has to follow two different code paths depending on the job type;

2. Performance. Although the traditional batch scheduling mode has a relatively small fault-tolerance cost, it introduces a large amount of disk I/O and its performance is not optimal, so it cannot exploit the advantages of the Flink streaming engine. In fact, when resources are relatively plentiful, running batch jobs in the "streaming" scheduling mode avoids the extra disk I/O and improves execution efficiency. Especially at night, streaming jobs release some resources, which makes it possible to run batch jobs in "Streaming" mode.

3. Adaptivity. At present, the physical execution plans of both scheduling modes are static. Statically generated physical execution plans suffer from high tuning labor cost and low resource utilization.

Unified scheduling based on Pipeline Region

To give full play to the advantages of the streaming engine while avoiding the shortcomings of scheduling the whole graph at once, we introduce the concept of the Pipeline Region. The Unified DAG Scheduler allows Tasks in a DAG to communicate either through Pipeline or through Blocking data exchanges, and the Tasks connected by Pipeline data exchanges form a Pipeline Region. Whether the job is a stream job or a batch job, resources are requested and tasks are scheduled at the granularity of Pipeline Regions. A careful reader will notice that the two original modes are in fact special cases of Pipeline Region scheduling.
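As a conceptual sketch (not Flink's actual scheduler code), Pipeline Regions can be derived from the DAG by merging tasks connected through PIPELINED edges with a union-find structure, while BLOCKING edges separate regions:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class PipelineRegions {

    /** Each edge is an int[3]: {fromTask, toTask, exchange} where 1 = PIPELINED, 0 = BLOCKING. */
    public static Map<Integer, Integer> computeRegions(int numTasks, List<int[]> edges) {
        int[] parent = new int[numTasks];
        for (int i = 0; i < numTasks; i++) parent[i] = i;
        for (int[] e : edges) {
            if (e[2] == 1) union(parent, e[0], e[1]);        // pipelined edges glue tasks into one region
        }
        Map<Integer, Integer> taskToRegion = new HashMap<>();
        for (int t = 0; t < numTasks; t++) taskToRegion.put(t, find(parent, t));
        return taskToRegion;                                  // slots are requested and tasks deployed per region
    }

    private static int find(int[] p, int x) { while (p[x] != x) x = p[x] = p[p[x]]; return x; }
    private static void union(int[] p, int a, int b) { p[find(p, a)] = find(p, b); }

    public static void main(String[] args) {
        // A -> B pipelined, B -> C blocking, C -> D pipelined
        List<int[]> edges = new ArrayList<>();
        edges.add(new int[]{0, 1, 1});
        edges.add(new int[]{1, 2, 0});
        edges.add(new int[]{2, 3, 1});
        System.out.println(computeRegions(4, edges));         // tasks 0 and 1 share one region id; tasks 2 and 3 share another
    }
}
```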


Even when the "streaming" scheduling mode is feasible in terms of resources, which tasks should actually be scheduled in a "streaming" manner?

Some readers may still worry that the "streaming" scheduling mode has a higher fault-tolerance cost, because in this mode, if one task fails, all tasks connected to it also fail and have to run again.

In Flink there are two connection patterns between tasks [2]: the All-to-All pattern, in which an upstream task is connected to every downstream task, and the PointWise pattern, in which an upstream task is connected to only some of the downstream tasks.

If all tasks of a job are connected through All-to-All edges, then once the "streaming" scheduling mode is adopted the entire physical topology must be scheduled at the same time, so there is indeed a problem of relatively high failover cost [3]. However, in real batch job topologies not all tasks are connected through All-to-All edges; a large number of tasks are connected through PointWise edges, and scheduling these PointWise-connected task graphs in a "streaming" manner improves the execution efficiency of the job while keeping its fault-tolerance cost down. As shown in the figure below, in the full 10 TB TPC-DS benchmark, letting all PointWise edges use the Pipeline transport mode brings an overall performance improvement of more than 20%.

The above is just one of the four Pipeline Region partitioning strategies provided by the Scheduler [4]. In fact, the Planner can decide, based on the actual scenario, which Tasks use the Pipeline transport mode and which Tasks use the Batch transport mode.

Adaptive scheduling

The essence of scheduling is the decision-making process of allocating resources to the physical execution plan. With the Pipeline Region mechanism in place, stream jobs and batch jobs can be scheduled uniformly at Pipeline Region granularity, but statically generated physical execution plans for batch jobs still have some problems [5]:

High labor cost. For batch jobs, although the parallelism of each stage in the physical execution plan can in theory be inferred from statistics, the static decision may be badly wrong when there are many UDFs or when statistics are missing. To guarantee the SLA of business jobs during a big promotion, business engineers have to manually tune the parallelism of high-priority batch jobs based on the traffic estimates for the promotion, and because traffic keeps changing, this process has to be repeated again and again. The whole tuning process is manual and its labor cost is high; even so, misjudgments can happen and the user SLA may still not be met;

Low resource utilization. Because manually configuring parallelism is so costly, it cannot be done for every job. For medium- and low-priority jobs, business engineers simply pick some default parallelism, which in most cases is too large and wastes resources. And although high-priority jobs can be configured manually, the configuration process is cumbersome, so after the big promotion, even though traffic has dropped, the business side keeps using the promotion-time configuration, which also wastes a lot of resources;

Poor stability. The waste of resources eventually leads to resource oversubscription. At present, most batch jobs run mixed in the same clusters as streaming jobs, and the resources they request are non-guaranteed resources; once resources become tight or machine hotspots appear, these non-guaranteed resources are the first to be reclaimed.

To solve these problems of statically generated physical execution plans, we introduced adaptive scheduling for batch jobs [6]. Compared with the original static physical execution plans, this feature can greatly improve resource utilization. The Adaptive Scheduler can dynamically decide the parallelism of a JobVertex based on the execution results of its upstream JobVertices; in the future it could also dynamically decide which downstream operators to use based on the data produced upstream.
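As a minimal sketch of the core idea (the method name, threshold, and numbers are made up for illustration), the scheduler can derive a downstream vertex's parallelism from how much data its upstream actually produced:

```java
public final class AdaptiveParallelism {

    /** Choose a parallelism so each subtask handles roughly targetBytesPerTask, bounded by maxParallelism. */
    public static int decideParallelism(long upstreamOutputBytes, long targetBytesPerTask, int maxParallelism) {
        if (upstreamOutputBytes <= 0) {
            return 1;
        }
        long needed = (upstreamOutputBytes + targetBytesPerTask - 1) / targetBytesPerTask; // ceiling division
        return (int) Math.max(1, Math.min(needed, maxParallelism));
    }

    public static void main(String[] args) {
        // Upstream produced ~100 GiB; aim for ~512 MiB per task, capped at 2000 subtasks.
        System.out.println(decideParallelism(100L << 30, 512L << 20, 2000)); // prints 200
    }
}
```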

5. Stream-Batch Unified Shuffle Architecture

Flink is a unified stream-batch platform, so the engine provides two kinds of Shuffle, Streaming and Batch, for the different execution modes. Although Streaming Shuffle and Batch Shuffle differ in their specific strategies, they both essentially re-partition data, so different Shuffles have a lot in common. Our goal is therefore a unified Shuffle architecture that satisfies the customization needs of different shuffle strategies while avoiding duplicated development of their common parts.

In general, the Shuffle architecture can be divided into the four layers shown in the figure below. The shuffle requirements of streaming and batch differ somewhat at each layer, but they also share a great deal; a brief analysis follows.

Differences between Streaming and Batch Shuffle

Batch jobs and streaming jobs have different requirements for Shuffle, which show up in three aspects:

1. Lifecycle of shuffle data. The shuffle data of a stream job has essentially the same lifecycle as its Tasks, while the shuffle data of a batch job is decoupled from the Task lifecycle;

2. Storage medium of shuffle data. Because the shuffle data of a stream job is short-lived, it can be kept in memory; the lifecycle of the shuffle data of a batch job is less predictable, so it has to be stored on disk;

3. Shuffle deployment model [7]. Deploying the shuffle service together with the compute nodes benefits streaming jobs, because it avoids unnecessary network overhead and thus reduces latency; for batch jobs, however, this deployment model has problems in resource utilization, performance, and stability. [8]

Commonalities between Streaming and Batch Shuffle

The Shuffle of batch jobs and stream jobs has differences as well as commonalities. The commonalities are mainly:

1. Meta management of data. Shuffle Meta refers to the mapping from the logical partitioning of data to its physical location. Whether in the streaming or the batch scenario, under normal circumstances the physical location for reading or writing data must be looked up in the Meta; under abnormal circumstances, to reduce the fault-tolerance cost, the Shuffle Meta usually has to be persisted;
2. Data transmission. Logically, the Shuffle of stream jobs and batch jobs is about re-partitioning/re-distributing data, and in a distributed system re-partitioning involves data transmission across threads, processes, and machines.

Stream-batch unified Shuffle architecture

The Unified Shuffle architecture abstracts three components [9]: Shuffle Master, Shuffle Writer, and Shuffle Reader. Flink re-partitions data between operators by interacting with these three components, and the strategy-level differences between shuffle plug-ins can be expressed through them (see the sketch further below):

Shuffle Master handles resource allocation and release: the plug-in tells the framework how to request and release resources, and Flink decides when to call it;
Shuffle Writer: upstream operators write data into the Shuffle service through the Writer; a Streaming Shuffle writes data into memory, while an External/Remote Batch Shuffle can write data into external storage;

Shuffle Reader: downstream operators read shuffle data through the Reader;

At the same time, we provide architecture-level support for what streaming and batch Shuffle have in common, namely Meta management, data transmission, and service deployment [10], so that complex components do not have to be developed repeatedly. Efficient and stable data transmission is one of the most complex subsystems of any distributed system; for example, backpressure between upstream and downstream, data compression, and memory zero-copy all have to be handled during transmission. In the new architecture this is developed once and used in both streaming and batch scenarios, greatly reducing the cost of development and maintenance.
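To make the three components concrete, here is a hypothetical, much-simplified set of interfaces; the real pluggable-shuffle interfaces in Flink use different names and signatures:

```java
import java.util.concurrent.CompletableFuture;

/** The plug-in says HOW shuffle resources are registered/released; the framework decides WHEN to call it. */
interface ShuffleMasterSketch<D> {
    CompletableFuture<D> registerPartition(int producerTaskId, int partitionId);
    void releasePartition(D descriptor);
}

/** Upstream operators write through the Writer: a streaming implementation keeps data in memory,
 *  an external/remote batch implementation can spill to external storage. */
interface ShuffleWriterSketch<T> {
    void write(T record, int targetSubpartition) throws Exception;
    void finish() throws Exception;
}

/** Downstream operators read shuffle data through the Reader; returns null when the partition is exhausted. */
interface ShuffleReaderSketch<T> {
    T next() throws Exception;
}
```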

6. Stream-Batch Unified Fault-Tolerance Strategy

Flink's original fault-tolerance strategy is based on checkpoints. Specifically, whether a single Task fails or the JobMaster fails, Flink restarts the entire job from the latest checkpoint. Although this strategy leaves room for optimization, it is generally acceptable for streaming scenarios. Currently, checkpoints are not enabled in Flink's Batch execution mode [11], which means that if any error occurs, the entire job has to be executed from scratch.

Although the original strategy can in theory guarantee that the correct result is eventually produced, most customers obviously cannot accept the cost of such a fault-tolerance strategy. To address this, we improved the fault tolerance of Tasks and of the JM respectively.

Pipeline Region Failover

Although there are no periodic Checkpoints in the Batch execution mode, Tasks in this mode can communicate through Blocking Shuffle. When a Task that reads from a Blocking Shuffle fails, all the data the Task needs is still stored in the Blocking Shuffle, so it is only necessary to restart that Task and the downstream tasks connected to it through Pipeline Shuffle, rather than restarting the entire job.

In general, the Pipeline Region Failover strategy works the same way as the Scheduler does during normal scheduling: it splits the DAG into Pipeline Regions, each consisting of tasks connected by Pipeline shuffles, and whenever a Task fails, only the Region containing that Task needs to be restarted.

JM Failover

The JM is the control center of a job and holds the job's various execution states, which Flink uses to schedule and deploy tasks. Once the JM fails, all of these states are lost, and without this information the new JM cannot continue scheduling the original job even if none of the worker nodes has failed. For example, because the completion information of tasks is lost, when a task finishes the new JM cannot tell whether the condition for scheduling its downstream tasks, namely that all their input data has been produced, is satisfied.

From this analysis we can see that the key to JM failover is letting the new JM "recover its memory". In VVR [12] we restore the key state of the JM through a mechanism based on an Operation Log.
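The idea can be sketched as follows (class and event names are invented for illustration; the actual VVR mechanism is more involved): scheduler-relevant events are appended to a durable log, and a new JM replays that log to rebuild its in-memory view instead of re-running finished work:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class JobMasterOperationLog {

    public enum EventType { TASK_DEPLOYED, TASK_FINISHED, PARTITION_PRODUCED }

    public static final class Event {
        final EventType type;
        final String taskId;
        public Event(EventType type, String taskId) { this.type = type; this.taskId = taskId; }
    }

    private final List<Event> durableLog = new ArrayList<>();  // stands in for persistent storage
    private final Set<String> finishedTasks = new HashSet<>();

    /** Called on the scheduling path: persist the event first, then apply it to in-memory state. */
    public void append(Event e) {
        durableLog.add(e);
        apply(e);
    }

    /** Called by a freshly started JM after failover: replay persisted events to "recover its memory". */
    public void recover(List<Event> persisted) {
        persisted.forEach(this::apply);
    }

    private void apply(Event e) {
        if (e.type == EventType.TASK_FINISHED) {
            finishedTasks.add(e.taskId);
        }
    }

    /** Downstream tasks can be scheduled only once all their upstream inputs have been produced. */
    public boolean readyToScheduleDownstream(Collection<String> upstreamTasks) {
        return finishedTasks.containsAll(upstreamTasks);
    }
}
```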

Careful readers may have noticed that although these two improvements were motivated by batch scenarios, they are also effective for streaming jobs. The above is only a brief introduction to the ideas behind the two fault-tolerance strategies; in fact there are still many questions worth thinking about, for example: how should we handle the loss of upstream Blocking data, and which key states in the JM need to be restored?

7. Future Outlook

To give users an even faster and more stable experience than today, we have started developing the next generation of the streaming architecture. Flink has been recognized by more and more users in unified stream-batch scenarios, but we also know that the industry still has many mature, traditional big data systems worth learning from. Finally, we hope that interested readers will join us in building a unified stream-batch big data computing engine with a first-class user experience.
