Flink Adaptive Batch Processing Capability Evolution

Flink is a unified stream and batch processing framework. In its early years it was used mainly for stream processing; in recent years, as the idea of unified stream-batch processing has gained traction, more and more enterprises have begun to use Flink for batch workloads as well.

Although Flink supports batch processing at the framework level, problems still arise in real production use. In recent versions the community has therefore been continuously improving Flink batch processing in three areas: the API, execution, and operations.

At the API level, we have been improving SQL, enriching its syntax and making it compatible with Hive SQL; we are also improving the DataStream API to better support batch job development. On the operations side, we want Flink batch to be easier to use in production, so we have improved the History Server to better show the status of running and finished jobs, and introduced a SQL Gateway that is compatible with the Hive ecosystem. At the execution level, we have improved the performance and stability of operators, execution plans, scheduling, and the network layer.

One of the main ideas is to adaptively optimize job execution based on runtime information such as data volume, data patterns, execution time, and available resources. This includes automatically setting an appropriate parallelism for each job node according to the data volume, detecting slow nodes and mitigating their impact through speculative execution, introducing adaptive data transmission to improve resource utilization and processing performance, and applying dynamic partition pruning to multi-partition tables to improve processing efficiency.

Some of these improvements make Flink batch processing easier to use, some improve the stability of batch jobs, some improve execution performance, and some do several of these at once.

1. Adaptive Batch Scheduler

Previously, parallelism had to be tuned before a job went online. For batch job users, the situation is as follows:

There are often a large number of batch jobs, so tuning the parallelism of each one is a heavy, time-consuming workload.

The daily data volume keeps changing; during big promotion events it can grow several times or even dozens of times, which makes it hard to estimate and therefore hard to tune. Changing the parallelism configuration before and after such events also costs additional manpower.

A Flink job is an execution topology composed of multiple computing nodes chained together. Because of the complexity of the operators and the characteristics of the data itself, it is hard to predict the amount of data flowing through intermediate nodes and to configure fine-grained parallelism for them, while a single global parallelism can waste resources and even add extra scheduling, deployment, and network transmission costs.

In addition, apart from the source and sink, SQL jobs can only be configured with a single global parallelism and cannot be given fine-grained, per-node parallelism, so they face the same resource waste and extra overhead.

To solve this problem, Flink introduced the adaptive batch scheduler. Users configure the amount of data they want each parallel instance to process, and Flink automatically determines the actual parallelism of each logical node based on the amount of data that node actually has to process at runtime, so that the data handled by each parallel instance roughly matches the user's expectation.

The advantage of this configuration style is that it is decoupled from the data volume of any particular job, so it is more general: one set of configuration can be applied to many jobs, and there is no need to tune each job separately. Second, the automatically derived parallelism adapts to the data volume of each day. Moreover, because the actual amount of data each node has to process is collected at runtime, parallelism can be set at node granularity, which gives better results.
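As a rough illustration of the idea (a minimal sketch, not Flink's actual implementation), the parallelism decision can be thought of as dividing the data a node has to consume by the desired data volume per task, clamped to a configured range:

```java
/**
 * Illustrative sketch (not Flink's actual code) of deriving a node's parallelism
 * from the amount of data it has to consume.
 */
public final class ParallelismCalculator {

    private final long bytesPerTask;   // desired data volume per parallel instance, e.g. 1 GiB
    private final int minParallelism;
    private final int maxParallelism;

    public ParallelismCalculator(long bytesPerTask, int minParallelism, int maxParallelism) {
        this.bytesPerTask = bytesPerTask;
        this.minParallelism = minParallelism;
        this.maxParallelism = maxParallelism;
    }

    /** Derive the parallelism from the total bytes produced by the upstream node. */
    public int decideParallelism(long totalConsumedBytes) {
        int parallelism = (int) Math.ceil((double) totalConsumedBytes / bytesPerTask);
        return Math.min(maxParallelism, Math.max(minParallelism, parallelism));
    }

    public static void main(String[] args) {
        // 1 GiB per task, parallelism bounded to [1, 128]
        ParallelismCalculator calculator = new ParallelismCalculator(1L << 30, 1, 128);
        System.out.println(calculator.decideParallelism(35L << 30)); // 35 GiB -> parallelism 35
    }
}
```

In Flink 1.16 the corresponding options are jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task, jobmanager.adaptive-batch-scheduler.min-parallelism, and jobmanager.adaptive-batch-scheduler.max-parallelism; the exact names may differ in later releases, so please verify against the documentation of the version in use.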

The process is as shown in the figure above: after all execution instances of upstream logical node A have finished and produced their data, the total amount of output data, i.e. the amount of data node B will consume, can be collected. This information is handed to the parallelism calculation strategy component to compute an appropriate parallelism, and the execution topology of node B is then generated dynamically for scheduling and deployment.

In traditional Flink execution, the execution topology is static, and the parallelism of every node is known at job submission time. The upstream can therefore produce a separate data subpartition for each downstream execution instance that will consume it, and when a downstream instance starts, it only needs to read its corresponding subpartition. With dynamic parallelism, however, the downstream parallelism is still unknown while the upstream executes, so the main problem to solve is to decouple the execution of upstream nodes from the parallelism of downstream nodes.

To support the dynamic execution topology, we made the following improvement: the number of subpartitions produced by an upstream node is no longer determined by the actual downstream parallelism, but by the downstream maximum parallelism.

As shown on the right side of the figure above, if the downstream maximum parallelism is four, the data produced by A is divided into four subpartitions; the actual downstream parallelism can then be decided as one, two, three, or four, and each downstream instance is assigned a range of subpartitions to consume. For example, when the downstream parallelism is 2, each instance consumes two subpartitions; when it is 3, some instances consume one subpartition and some consume two. In this way, upstream execution is no longer constrained by the downstream parallelism, data can be allocated flexibly, and a dynamic execution topology becomes possible.
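A minimal sketch of this range assignment, assuming the upstream has already produced a number of subpartitions equal to the downstream maximum parallelism and the actual parallelism is decided afterwards:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch: assign contiguous subpartition ranges to downstream subtasks. */
public final class SubpartitionRangeAssigner {

    /** Returns, for each downstream subtask, the [start, end] range of subpartitions it consumes. */
    public static List<int[]> assignRanges(int numSubpartitions, int actualParallelism) {
        List<int[]> ranges = new ArrayList<>();
        int base = numSubpartitions / actualParallelism;
        int remainder = numSubpartitions % actualParallelism;
        int start = 0;
        for (int task = 0; task < actualParallelism; task++) {
            int size = base + (task < remainder ? 1 : 0);
            ranges.add(new int[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 4 subpartitions (max parallelism), actual parallelism decided to be 3:
        // subtask 0 -> [0, 1], subtask 1 -> [2, 2], subtask 2 -> [3, 3]
        assignRanges(4, 3).forEach(r -> System.out.println(r[0] + " - " + r[1]));
    }
}
```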

Automatic parallelism brings two benefits: first, users no longer need to configure the parallelism of each job separately, so Flink batch becomes easier to use; second, fine-grained parallelism improves resource utilization and avoids pointlessly high parallelism. We ran TPC-DS with multiple clients to keep the cluster as fully loaded as possible; after enabling adaptive parallelism, the total execution time dropped by 7.9%.

Adaptive batch scheduling also provides a good basis for further optimizations. With the flexible subpartition and allocation scheme, the actual data volume of each subpartition can be collected, so that when data skew makes the subpartitions uneven, small subpartitions can be merged and handed to the same downstream instance, making the data processed by downstream nodes more balanced.

Second, with the dynamic execution topology, a better execution plan can be formed dynamically based on runtime information. For example, the join strategy can be chosen according to the size of the data on the two inputs of a join node.

2. Speculative Execution

Hotspot machines cannot be avoided in production. For example, jobs may run on mixed-workload (co-located) clusters, or batch jobs may be backfilled in large batches, causing high load on some machines and making tasks on those machines run much slower than tasks on other nodes, which in turn slows down the whole job. Occasional machine failures cause the same problem. These slow tasks affect the execution time of the entire job and make it impossible to guarantee the job's output deadline, which has been an obstacle for some users adopting Flink for batch processing.

Therefore, we introduced speculative execution in Flink 1.16.
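For reference, a minimal configuration sketch for enabling it is shown below. The option keys are those documented for Flink 1.16; they may differ in other versions, so treat the exact names as an assumption to verify against the docs of the version in use.

```java
import org.apache.flink.configuration.Configuration;

/** Sketch of enabling speculative execution for a batch job (Flink 1.16 option names). */
public class SpeculativeExecutionConfig {

    public static Configuration build() {
        Configuration conf = new Configuration();
        // Speculative execution relies on the adaptive batch scheduler.
        conf.setString("jobmanager.scheduler", "AdaptiveBatch");
        // Turn on speculative execution.
        conf.setString("jobmanager.adaptive-batch-scheduler.speculative.enabled", "true");
        // At most two concurrent attempts per task (the original one plus one speculative attempt).
        conf.setString("jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions", "2");
        // How long a node hosting slow tasks stays on the blocklist.
        conf.setString("jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration", "1 min");
        return conf;
    }
}
```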

With speculative execution enabled, if Flink finds that some tasks of a batch job are significantly slower than the others, it launches new execution attempts for them. These new attempts consume the same data and produce the same results as the original slow attempts, which are kept running. Whichever attempt finishes first is recognized by the scheduler as the only finished attempt, and its output is what the downstream sees and consumes; the other attempts are canceled and their data is cleaned up.

To implement speculative execution, we introduced the following components into Flink:

The Slow Task Detector periodically detects and reports slow tasks.

In the current implementation, when an execution instance of a logical node is noticeably slow, i.e. its execution time exceeds the median execution time of most of that node's instances by a certain threshold, it is considered a slow task. The speculative execution scheduler receives the slow task report and marks the machine hosting it as a hotspot machine. Through the blocklist mechanism (Blocklist Handler), the hotspot machine is blocklisted, so that newly scheduled tasks no longer land on it. The blocklist mechanism currently supports the most common Flink deployment modes: YARN, Kubernetes, and standalone.
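The detection rule described above can be sketched roughly as follows. This is a simplified illustration of the execution-time baseline idea, not Flink's actual code; the ratio, multiplier, and lower bound are assumed to be configurable values.

```java
import java.util.Arrays;

/** Simplified sketch of execution-time based slow task detection. */
public final class SlowTaskDetector {

    private final double baselineRatio;      // fraction of finished tasks needed before detecting, e.g. 0.75
    private final double baselineMultiplier; // a task slower than median * multiplier is "slow", e.g. 1.5
    private final long baselineLowerBoundMs; // never flag tasks faster than this, e.g. 60_000

    public SlowTaskDetector(double baselineRatio, double baselineMultiplier, long baselineLowerBoundMs) {
        this.baselineRatio = baselineRatio;
        this.baselineMultiplier = baselineMultiplier;
        this.baselineLowerBoundMs = baselineLowerBoundMs;
    }

    /** Returns true if a still-running task should be treated as slow. */
    public boolean isSlow(long[] finishedExecutionTimesMs, int totalTasks, long runningTimeMs) {
        // Only start detecting once "most" tasks of the logical node have finished.
        if (finishedExecutionTimesMs.length < totalTasks * baselineRatio) {
            return false;
        }
        long[] sorted = finishedExecutionTimesMs.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        long threshold = Math.max(baselineLowerBoundMs, (long) (median * baselineMultiplier));
        return runningTimeMs > threshold;
    }
}
```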

If the number of running execution attempts of the slow task has not reached the configured upper limit, the scheduler launches speculative attempts up to that limit and deploys them on machines that are not blocklisted. Once any attempt finishes, the scheduler checks whether other attempts of the same task are still running and, if so, actively cancels them.

The data produced by the finished attempt is made visible to the downstream and triggers the scheduling of downstream nodes.

At the framework level, we support speculative execution of Source nodes and ensure that different execution attempts of the same Source subtask always read the same data. The basic idea is to introduce a cache that records the splits already assigned to each Source subtask and processed by each attempt. Only when an attempt has processed all splits currently assigned to its subtask can it request new splits, and the new splits are also added to the cache.
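A minimal sketch of that bookkeeping, with hypothetical class and method names (the real logic lives inside Flink's source coordination layer):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch: remember which splits were handed to a source subtask so that
 * every execution attempt of that subtask reads exactly the same data.
 */
public final class AttemptSplitCache<SplitT> {

    // subtask index -> splits already assigned to that subtask (shared by all its attempts)
    private final Map<Integer, List<SplitT>> assignedSplits = new HashMap<>();

    /** Record a split that the enumerator assigned to the given subtask. */
    public void recordAssignment(int subtaskIndex, SplitT split) {
        assignedSplits.computeIfAbsent(subtaskIndex, k -> new ArrayList<>()).add(split);
    }

    /** All splits an attempt of this subtask must process before it may request new ones. */
    public List<SplitT> splitsFor(int subtaskIndex) {
        return assignedSplits.getOrDefault(subtaskIndex, List.of());
    }
}
```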

Thanks to this unified framework-level support, most existing sources support speculative execution without any modification. Only when a source built on the new Source API uses custom SourceEvents does its split enumerator need to implement an additional interface; otherwise an exception is thrown when speculative execution is enabled. The interface ensures that user-defined events are delivered to the correct execution attempt, since with speculative execution multiple attempts may be running at the same time.

Speculative execution is also supported in the REST API and the Web UI. When speculative execution happens, all concurrent execution attempts can be seen on the detail page of the job vertex. The resource overview also shows the number of blocklisted TaskManagers and the number of slots that are unoccupied but unusable because they are blocklisted, which helps users judge the current resource usage. In addition, the TaskManager page shows which TaskManagers are currently blocklisted.

In the current version, sinks do not support speculative execution. Supporting speculative execution of Sink nodes is a priority for future work; the problem to solve is to ensure that each sink subtask commits only one copy of its output data and that data produced by canceled attempts can be cleaned up.

In addition, we plan to further improve the slow task detection strategy. At present, when data skew occurs, some execution instances process much more data than others and therefore take longer, even though they are not really slow tasks. The detector needs to recognize and handle this case correctly to avoid wasting resources on useless speculative attempts. Our initial idea is to normalize task execution time by the amount of data each execution instance actually processes, which in turn relies on the Adaptive Batch Scheduler's collection of the data volume produced by each node, mentioned above.
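A one-line sketch of that idea: compare throughput rather than wall-clock time, so that a task that is slow only because it received more data is not misclassified (illustrative only, not an implemented strategy):

```java
/** Illustrative: normalize execution time by the bytes actually processed. */
public final class ThroughputNormalizer {

    /** Lower values mean "slower per byte"; comparing these instead of raw durations tolerates data skew. */
    public static double bytesPerMilli(long processedBytes, long executionTimeMs) {
        return executionTimeMs == 0 ? Double.MAX_VALUE : (double) processedBytes / executionTimeMs;
    }
}
```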

3. Hybrid Shuffle

Flink mainly has two data exchange methods:

Streaming Pipelined Shuffle: upstream and downstream start at the same time and data is transmitted directly over the network without being written to disk, which gives it a performance advantage. However, it requires a large amount of resources; a job often needs to obtain, all at once, resources several times the parallelism of a single node, which production batch jobs can rarely guarantee. And because the resources must be acquired together, the job cannot run at all if they are not all available, and resource deadlocks can occur when multiple jobs compete for resources at the same time.

Batch Blocking Shuffle: intermediate data is written to disk, and the downstream reads the upstream's data from disk. This exchange mode makes the job more adaptive to resources; upstream and downstream do not need to run at the same time, and in theory the whole job can be executed with a single slot. The performance, however, is worse: a downstream stage can only run after the upstream stage has finished, and writing all data to disk incurs IO overhead.

We therefore wanted a shuffle mode that combines the advantages of both: when resources are sufficient it delivers the performance of pipelined shuffle, and when resources are limited it falls back to the resource adaptability of blocking shuffle, so the job can still run even with a single slot. The switch between the two happens adaptively, without users having to be aware of it or tune it separately.

To this end, we introduced Hybrid Shuffle.
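Enabling it is a matter of switching the batch shuffle mode. The sketch below uses the option and enum values as of Flink 1.16; names and available strategies may change in later releases, so verify against the documentation of the version in use.

```java
import org.apache.flink.configuration.Configuration;

/** Sketch of switching a batch job to Hybrid Shuffle (Flink 1.16 option names). */
public class HybridShuffleConfig {

    public static Configuration build() {
        Configuration conf = new Configuration();
        // ALL_EXCHANGES_HYBRID_FULL persists all produced data to disk as well,
        // ALL_EXCHANGES_HYBRID_SELECTIVE only spills data the downstream has not yet consumed.
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID_FULL");
        return conf;
    }
}
```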

In addition, the current Hybrid Shuffle is implemented on top of the Default Scheduler, so it is not yet compatible with automatic parallelism setting or speculative execution. Further integration work is needed to better support batch processing.

4. Dynamic Partition Pruning

An important task of the optimizer is to avoid invalid and redundant computation. Partitioned tables are widely used in production; here we introduce how to reduce the reading of irrelevant partitions of partitioned tables.

We introduce the optimization with a few simplified examples based on the TPC-DS model. As shown in the figure above, there is a sales table whose partition column is sold_date, with 2000 partitions in total.

In the figure above, the SQL statement selects the rows with sold_date = 2 from the sales table. Without partition pruning, all partitions have to be read and then filtered. With static partition pruning, optimizations such as filter pushdown in the optimization phase tell the Scan node exactly which partition to read, so at execution time the Scan reads only that partition, which greatly reduces read IO and speeds up the job.
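Expressed as a query, this looks roughly as follows. The table layout is hypothetical and assumed to be registered in the catalog already; sold_date is the partition column.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Sketch of a query that benefits from static partition pruning. */
public class StaticPartitionPruningExample {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Assumes a partitioned table `sales` (partition column sold_date) already exists in the catalog.
        // With a constant predicate on the partition column, the optimizer can push the filter
        // into the Scan, so only the single partition sold_date = 2 is read.
        tEnv.executeSql("SELECT * FROM sales WHERE sold_date = 2");
    }
}
```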

The figure above has two tables: the fact table sales and the dimension table date_dim, which are joined. The filter condition acts on the dimension table, so static partition pruning cannot be applied.

The dimension table date_dim is fully read and then filtered, and the fact table sales reads all its partitions before the join, yet only the data satisfying year = 2000 and sold_date = date_sk is actually output. Clearly many partitions are irrelevant, but which ones cannot be determined in the static optimization phase; they can only be determined at runtime from the dimension table's data, hence the name dynamic partition pruning.
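The shape of the query being discussed looks roughly like this (again a hypothetical, pre-registered schema): the partition column sold_date of the fact table is only constrained through the join with the filtered dimension table, so the relevant partitions are known only at runtime.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Sketch of a fact/dimension join that benefits from dynamic partition pruning. */
public class DynamicPartitionPruningExample {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // The filter is on date_dim, not on the partition column of sales, so static pruning
        // cannot help; dynamic partition pruning first evaluates the date_dim side and then
        // tells the sales Scan which partitions are actually needed.
        tEnv.executeSql(
            "SELECT s.* "
                + "FROM sales s JOIN date_dim d ON s.sold_date = d.date_sk "
                + "WHERE d.year = 2000");
    }
}
```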

The idea of dynamic partition pruning is as follows:

Step 1: execute the operators on the dimension-table side of the join, i.e. Scan(date_dim) and the Filter.

Step 2: send the filtered result of step 1 to the Scan operator of the partitioned table.

Step 3: use the data from step 2 to prune the irrelevant partitions and read only the relevant data.

Step 4: Complete the Join according to the results of steps 1 and 3.

The difference between dynamic and static partition pruning is that with dynamic partition pruning, the relevant partitions cannot be determined in the optimization phase; they can only be determined while the job is running.

The steps to implement dynamic partition pruning in Flink are as follows:

First, a special node, DynamicFilterDataCollector (hereafter DataCollector), is added to the physical plan to collect and deduplicate the filtered data; only the relevant fields are kept and sent to the Scan of the partitioned table, which then performs the partition pruning, and the join is finally executed. On the StreamGraph, however, the Source operator (corresponding to the Scan node) has no input, even though we need it to receive data from the DataCollector operator. Moreover, the date_dim Scan and the year = 2000 Filter on the dimension-table side have no scheduling dependency on the sales Scan on the right, so the operators on the right might be executed first and those on the left later, in which case the dynamic partition pruning optimization could not take effect.

Therefore, we introduced the OrderEnforce node to inform the scheduler of the data dependency between them, ensuring that the operators on the left are scheduled first and that the dynamic partition pruning optimization is carried out correctly.

Later, we also plan to solve this scheduling dependency problem at the framework level to make the StreamGraph more elegant.

The date_dim Scan and Filter on the left are executed first, and their data is sent to the DataCollector and the Join. To deal with the fact that the Source operator has no input edge, we use Flink's Coordinator mechanism: after the DataCollector has collected the data, it sends it to the Coordinator, where the irrelevant partitions are pruned; the Scan of the partitioned table then obtains the list of relevant partitions from the Coordinator. Once the sales Scan has finished, the final join is performed.

There is also a data edge between the DataCollector and OrderEnforce. No real data is transmitted over this edge; it only tells the scheduler that the DataCollector must be scheduled before OrderEnforce.

The figure above shows the performance comparison before and after the optimization on the TPC-DS 10 TB dataset; blue is the non-partitioned table and red the partitioned table. The optimization saves about 30% of the execution time.

Q&A

Q: When a hotspot machine produces a slow task, a new attempt is launched on another machine to re-execute it. Can that new attempt itself create a hotspot?

A: In theory, yes, because speculative execution is essentially a strategy of trading resources for time. Production practice has shown, however, that the mechanism is effective; compared with the extra resource cost and the risk of new hotspots, the trade-off is still worthwhile.

Q: Is speculative execution based on data volume?

A: The current strategy is based on execution duration. For example, if the median execution time of most tasks is one minute, a task that has been running for more than 1.5 minutes is considered slow. The specific values are configurable.

Q: Is slow node detection configurable?

A: The detection policy is hard-coded at present and cannot yet be changed through configuration. Once the strategy stabilizes, it may be opened up so that users can replace the slow task detection strategy through secondary development or plugins.

Q: Does speculative execution support both the DataStream API and Flink SQL?

A: Yes.

Q: Regarding the blocklist mechanism, can resources be wasted if too many machines are blocklisted or if blocklisting is not lifted in time?

A: At present, blocklisting under speculative execution is conservative: by default a machine is blocklisted for only one minute. However, if slow tasks keep appearing, the blocklist duration is refreshed continuously, so a machine that keeps producing slow tasks stays on the blocklist.
