Automatic derivation of parallelism for Flink batch jobs

1. Introduction

For most users, it is not easy to configure appropriate parallelism for Flink operators. For batch jobs, parallelism that is too low leads to long running times and slow failure recovery, while unnecessarily high parallelism wastes resources and increases the overhead of task deployment and data shuffling.

To keep the execution time of batch jobs under control, the parallelism of an operator should be proportional to the amount of data it needs to process. Users therefore have to configure parallelism by estimating the amount of data each operator will process. However, estimating this accurately is very difficult: the amount of data to be processed may change from day to day, and a job may contain many UDFs and complex operators whose output volumes are hard to predict.

To solve this problem, we introduced a new scheduler in Flink 1.15: the Adaptive Batch Scheduler. It automatically derives operator parallelism as the job runs, based on the actual amount of data each operator needs to process. This brings the following benefits:

* It greatly reduces the tedium of tuning the parallelism of batch jobs;
* Different operators can be assigned different parallelisms according to the amount of data they process, which is especially beneficial for SQL jobs, for which previously only a single global parallelism could be configured;
* Jobs can better adapt to data volumes that change from day to day.

2. Usage

To enable Flink to automatically derive operator parallelism, the following configurations are required:

* Enable the adaptive batch job scheduler;
* Configure operator parallelism to -1.

2.1 Enable Adaptive Batch Job Scheduler

To enable the adaptive batch job scheduler, the following configurations are required:

* Configure jobmanager.scheduler: AdaptiveBatch;
* Configure execution.batch-shuffle-mode to ALL_EXCHANGES_BLOCKING (the default), because the adaptive batch job scheduler currently only supports jobs whose shuffle mode is ALL_EXCHANGES_BLOCKING.

In addition, there are related configuration options that specify the upper and lower bounds of the automatically derived parallelism, the expected amount of data to be processed by each task, and the default parallelism of source operators. For details, please refer to the Flink documentation [1].
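As a concrete illustration, the sketch below sets these options programmatically via Flink's Configuration API (they are more commonly placed in flink-conf.yaml). The option names are those documented for Flink 1.15; the values are illustrative assumptions, not recommendations:

```java
import org.apache.flink.configuration.Configuration;

public class AdaptiveBatchConfig {
    public static Configuration build() {
        Configuration conf = new Configuration();
        // Enable the adaptive batch scheduler.
        conf.setString("jobmanager.scheduler", "AdaptiveBatch");
        // ALL_EXCHANGES_BLOCKING is already the default batch shuffle mode.
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING");
        // Optional bounds and targets for the derived parallelism (illustrative values).
        conf.setString("jobmanager.adaptive-batch-scheduler.min-parallelism", "1");
        conf.setString("jobmanager.adaptive-batch-scheduler.max-parallelism", "128");
        conf.setString("jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task", "1gb");
        conf.setString("jobmanager.adaptive-batch-scheduler.default-source-parallelism", "4");
        return conf;
    }
}
```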

2.2 Configure Operator Parallelism to -1

The adaptive batch job scheduler only infers the parallelism of operators whose parallelism has not been specified by the user (that is, whose parallelism is the default value -1). The following configuration is therefore required (see the sketch after this list):

* Configure parallelism.default: -1;
* For SQL jobs, configure table.exec.resource.default-parallelism: -1;
* For DataStream/DataSet jobs, avoid specifying parallelism in the job through the operators' setParallelism() method;
* For DataStream/DataSet jobs, avoid specifying parallelism in the job through the setParallelism() method of StreamExecutionEnvironment/ExecutionEnvironment.
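Putting this together, a minimal DataStream sketch might look as follows (job and class names are illustrative). Note that neither the environment nor any operator calls setParallelism():

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AutoParallelismJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("jobmanager.scheduler", "AdaptiveBatch");
        conf.setString("parallelism.default", "-1");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // Deliberately no env.setParallelism(...) and no setParallelism() on
        // operators, so the scheduler derives the parallelism at runtime.

        env.fromElements("a", "b", "a", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .print();

        env.execute("auto-parallelism-batch-job");
    }
}
```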

3. Implementation Details

Next, we introduce the implementation details of the adaptive batch job scheduler. Before that, let's briefly introduce some of the concepts involved:

Logical node (JobVertex) [2] and logical topology (JobGraph) [3]: a logical node is an operator chain formed by chaining several operators together for better performance; the logical topology is the data flow graph composed of multiple connected logical nodes.

Execution node (ExecutionVertex) [4] and execution topology (ExecutionGraph) [5]: an execution node corresponds to a deployable physical task and is generated by expanding a logical node according to its parallelism. For example, if a logical node has a parallelism of 100, 100 corresponding execution nodes will be generated. The execution topology is the physical execution graph composed of all execution nodes and their connections.

Introductions to the above concepts can be found in the Flink documentation [6]. Note that the adaptive batch job scheduler derives the parallelism of logical nodes, and thereby determines the parallelism of the operators they contain.

The implementation details mainly include the following parts:

* Enable the scheduler to collect the sizes of the data produced by execution nodes;
* Introduce a new component, VertexParallelismDecider [7], responsible for calculating the parallelism of logical nodes based on the amount of data they need to process;
* Support dynamic construction of the execution topology, i.e. the execution topology starts out empty and execution nodes are gradually added as the job is scheduled;
* Introduce an adaptive batch job scheduler to update and schedule the execution topology.

3.1 Collect the amount of data output by the execution node

The adaptive batch job scheduler decides the parallelism of a logical node based on the amount of data it needs to process, so the scheduler must collect the amount of data produced by upstream nodes. To this end, we introduced a numBytesProduced counter that records the volume of each result partition (ResultPartition) produced by an execution node; the accumulated value is sent to the scheduler when the execution node finishes running.
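Conceptually, the bookkeeping looks like the following simplified sketch (an illustration of the idea, not Flink's actual internal classes):

```java
import java.util.concurrent.atomic.AtomicLong;

/** Simplified sketch: tracks bytes written to one result partition. */
class ResultPartitionBytesTracker {
    private final AtomicLong numBytesProduced = new AtomicLong();

    /** Called for every buffer written to the partition. */
    void onBufferWritten(int bytes) {
        numBytesProduced.addAndGet(bytes);
    }

    /** The accumulated value reported to the scheduler when the producing task finishes. */
    long finishAndReport() {
        return numBytesProduced.get();
    }
}
```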

3.2 Determining the appropriate degree of parallelism for logical nodes

We introduced a new component, VertexParallelismDecider, responsible for calculating the parallelism of logical nodes. The calculation algorithm is as follows:

Suppose:

* V is the amount of data the user expects each execution node to process;
* totalBytes_non-broadcast is the total amount of non-broadcast data the logical node needs to process;
* totalBytes_broadcast is the total amount of broadcast data the logical node needs to process;
* maxBroadcastRatio is the upper limit on the proportion of broadcast data processed by each execution node;
* normalize(x) is a function that outputs the power of 2 closest to x.

Then the parallelism of the logical node is calculated as:

parallelism = normalize(totalBytes_non-broadcast / (V - min(totalBytes_broadcast, V * maxBroadcastRatio)))

It is worth noting that we have introduced two special treatments in this formula:

* Limiting the proportion of broadcast data processed by each execution node;
* Adjusting the parallelism to a power of 2.

In addition, the above formula cannot be used directly to determine the parallelism of source nodes, because source nodes do not consume data. To solve this problem, we introduced the configuration option jobmanager.adaptive-batch-scheduler.default-source-parallelism, which allows users to configure the parallelism of source nodes manually. Note that this option is not required for all sources, because some sources can infer their own parallelism (for example, HiveTableSource; see HiveParallelismInference for details), and for such sources it is recommended to let them infer the parallelism themselves.
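To make the calculation concrete, here is a simplified sketch of the decision logic under the definitions above (an illustration, not Flink's actual VertexParallelismDecider code):

```java
/** Simplified sketch of the parallelism calculation described above. */
class ParallelismCalculator {
    private final long dataVolumePerTask;    // V
    private final double maxBroadcastRatio;  // e.g. 0.5
    private final int minParallelism;
    private final int maxParallelism;        // expected to be a power of 2

    ParallelismCalculator(long v, double ratio, int min, int max) {
        this.dataVolumePerTask = v;
        this.maxBroadcastRatio = ratio;
        this.minParallelism = min;
        this.maxParallelism = max;
    }

    int decideParallelism(long broadcastBytes, long nonBroadcastBytes) {
        // Cap the broadcast bytes counted against each task's budget, so that
        // at least (1 - maxBroadcastRatio) * V is left for non-broadcast data.
        long cappedBroadcast = Math.min(
                broadcastBytes, (long) (dataVolumePerTask * maxBroadcastRatio));
        long nonBroadcastBudget = dataVolumePerTask - cappedBroadcast;

        int raw = (int) Math.ceil((double) nonBroadcastBytes / nonBroadcastBudget);
        return Math.max(minParallelism, Math.min(maxParallelism, normalize(raw)));
    }

    /** Round x to the closest power of 2. */
    private static int normalize(int x) {
        if (x <= 1) return 1;
        int lower = Integer.highestOneBit(x); // largest power of 2 <= x
        int upper = lower << 1;
        return (x - lower) < (upper - x) ? lower : upper;
    }
}
```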

3.2.1 Limit the proportion of broadcast data processed by each execution node

In the formula, we cap the proportion of broadcast data processed by each execution node at maxBroadcastRatio. That is, each execution node is guaranteed to process at least (1 - maxBroadcastRatio) * V of non-broadcast data. Without this cap, when the amount of broadcast data is close to V, even a very small amount of non-broadcast data could produce a very large computed parallelism, which is unnecessary and leads to wasted resources and increased task deployment overhead.

Typically, the amount of broadcast data an execution node needs to process is smaller than the amount of non-broadcast data, so we set maxBroadcastRatio to 0.5 by default. The value is currently hardcoded; we will consider making it configurable in the future.
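A quick worked example of this cap (the numbers are hypothetical): suppose V = 1 GiB, maxBroadcastRatio = 0.5, totalBytes_broadcast = 0.9 GiB, and totalBytes_non-broadcast = 10 GiB. Without the cap, the non-broadcast budget per execution node would be 1 - 0.9 = 0.1 GiB, yielding normalize(10 / 0.1) = normalize(100) = 128; with the cap, the budget is 1 - 0.5 = 0.5 GiB, yielding normalize(10 / 0.5) = normalize(20) = 16, a much more reasonable parallelism for 10 GiB of non-broadcast data.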

3.2.2 Adjusting the degree of parallelism to a power of 2
The normalize function adjusts the parallelism to the closest power of 2 to avoid introducing data skew. To better understand this section, we recommend reading the section on dynamic subpartition mapping (3.3.2) first.

Taking Figure 4(b) as an example: A1/A2 each produce 4 subpartitions, and the parallelism of B is finally decided to be 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. Assuming all subpartitions contain the same amount of data, B3 has to consume twice as much data as B1/B2, resulting in data skew.

To solve this problem, all downstream execution nodes should consume the same number of subpartitions; that is, the number of subpartitions produced upstream should be an integer multiple of the parallelism of the downstream logical node. For simplicity, we expect the user-configured maximum parallelism to be 2^N (if it is not, it is automatically adjusted down to a power of 2 not exceeding the configured value), and we then adjust the parallelism of the downstream logical node to the closest 2^M (M <= N), which guarantees that the subpartitions are consumed evenly downstream.

However, this is only a temporary solution; the problem should ultimately be resolved through automatic load balancing, which we plan to implement in subsequent versions.

3.3 Dynamic construction of execution topology
Before the adaptive batch job scheduler was introduced, the execution topology was built statically, that is, fully created before scheduling started. For logical node parallelism to be decided at runtime, the execution topology needs to support dynamic construction.

3.3.1 Dynamically adding nodes and edges to the execution topology
Dynamically constructing the execution topology means that a Flink job starts with an empty execution topology, to which execution nodes are gradually attached as scheduling proceeds, as shown in Figure 2.

The execution topology consists of execution nodes and execution edges (ExecutionEdge). A logical node is expanded into execution nodes and added to the execution topology only if:

* The parallelism of the logical node has been determined (so that Flink knows how many execution nodes to create);
* All upstream logical nodes have already been expanded (so that Flink can connect the newly created execution nodes to the upstream execution nodes via execution edges).


3.3.2 Dynamic mapping of subpartitions
Before the adaptive batch job scheduler was introduced, Flink needed to know the parallelism of the downstream logical nodes when deploying an execution node, because the downstream parallelism determines the number of subpartitions the upstream execution node must produce. Taking Figure 3 as an example: the parallelism of downstream B is 2, so upstream A1/A2 each need to produce 2 subpartitions; the subpartition with index 0 is consumed by B1, and the subpartition with index 1 is consumed by B2.

Obviously, however, this does not work for dynamic graphs: when an upstream execution node is deployed, the parallelism of its downstream logical nodes may not yet be determined (that is, when A1/A2 are deployed, the parallelism of B is still unknown). To solve this, we need to decouple the number of subpartitions produced by upstream execution nodes from the parallelism of downstream logical nodes.

We achieve this decoupling as follows: the number of subpartitions produced by an upstream execution node is set to the maximum parallelism of the downstream logical node (the maximum parallelism is a configurable fixed value). After the parallelism of the downstream logical node is determined, these subpartitions are evenly distributed among the downstream execution nodes for consumption; that is, when downstream execution nodes are deployed, each of them is assigned a range of subpartitions to consume. Suppose N is the parallelism of the downstream logical node and P is the number of subpartitions; then the k-th downstream execution node (k starting from 0) consumes the subpartition range:

[ floor(k * P / N), floor((k + 1) * P / N) - 1 ]
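A small sketch of this range computation (an illustration of the even-division scheme described above, with integer division acting as floor):

```java
/** Simplified sketch: computes the subpartition range for one consumer. */
final class SubpartitionRange {
    final int startInclusive;
    final int endInclusive;

    SubpartitionRange(int start, int end) {
        this.startInclusive = start;
        this.endInclusive = end;
    }

    /**
     * @param k consumer index, 0 <= k < numConsumers
     * @param numConsumers N, the decided downstream parallelism
     * @param numSubpartitions P, fixed to the downstream maximum parallelism
     */
    static SubpartitionRange forConsumer(int k, int numConsumers, int numSubpartitions) {
        int start = k * numSubpartitions / numConsumers;          // floor(k * P / N)
        int end = (k + 1) * numSubpartitions / numConsumers - 1;  // floor((k+1) * P / N) - 1
        return new SubpartitionRange(start, end);
    }
}
```

For example, with P = 4 and N = 3 the three consumers get the ranges [0, 0], [1, 1], and [2, 3], which matches the skewed case of Figure 4(b); with N = 2 or N = 4 (powers of 2 dividing P) the ranges are even.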
3.4 Dynamically update and schedule execution topology
The adaptive batch job scheduler schedules jobs in basically the same way as the default scheduler. The only difference is that it starts scheduling from an empty execution topology: before processing any scheduling event, it tries to determine the parallelism of all logical nodes, then tries to generate the corresponding execution nodes for those logical nodes and connect them to their upstream nodes via execution edges, thereby updating the execution topology.

Before each round of scheduling, the scheduler tries to determine the parallelism of all logical nodes in topological order:

* For a source node, the parallelism is determined before scheduling starts;
* For a non-source node, the parallelism can only be determined after all of its upstream nodes have finished producing their data.

The scheduler then tries to expand the logical nodes into execution nodes in topological order. A logical node can be expanded only if the following conditions are met (see the sketch after this list):

* The parallelism of the logical node has been determined;
* All upstream logical nodes have already been expanded.
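A schematic of this scheduling loop might look like the following (a pseudocode-style sketch of the behavior described above; the types and method names are illustrative, not Flink's actual interfaces):

```java
import java.util.List;

/** Illustrative sketch of the adaptive scheduling loop described above. */
class AdaptiveBatchSchedulingSketch {
    interface LogicalNode {
        boolean isSource();
        List<LogicalNode> upstreams();
        boolean allUpstreamsFinishedProducing();
        boolean parallelismDecided();
        void decideParallelism();        // e.g. via VertexParallelismDecider
        boolean expanded();
        void expandIntoExecutionNodes(); // create execution nodes and edges
    }

    /** Called before handling each scheduling event. */
    void updateTopology(List<LogicalNode> topologicallySorted) {
        // Pass 1: decide parallelism where possible.
        for (LogicalNode node : topologicallySorted) {
            if (!node.parallelismDecided()
                    && (node.isSource() || node.allUpstreamsFinishedProducing())) {
                node.decideParallelism();
            }
        }
        // Pass 2: expand nodes whose parallelism is known and whose
        // upstream nodes have all been expanded.
        for (LogicalNode node : topologicallySorted) {
            if (!node.expanded()
                    && node.parallelismDecided()
                    && node.upstreams().stream().allMatch(LogicalNode::expanded)) {
                node.expandIntoExecutionNodes();
            }
        }
    }
}
```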
4. Future Outlook - Automatic Load Balancing
When running batch jobs, data skew may occur (one execution node needs to process far more data than the others), which causes a long-tail effect and slows down the completion of the whole job. If Flink could automatically mitigate or eliminate this problem, it would be of great help to users.

A typical case of data skew is that some subpartitions contain significantly more data than others. This can be addressed by producing finer-grained subpartitions and balancing the workload according to subpartition sizes (Figure 5). The work on the adaptive batch scheduler can be considered a first step in this direction, since the requirements of automatic rebalancing are similar to those of the adaptive batch scheduler: both need dynamic graph support and access to the sizes of result partitions.

Based on the implementation of the adaptive batch job scheduler, the above problem can be solved by increasing the maximum parallelism (for finer-grained subpartitions) and simply changing the subpartition range division algorithm (for workload balancing). In the current design, subpartition ranges are divided according to the number of subpartitions; we can change this to divide according to the amount of data in the subpartitions, so that the amount of data within each range is roughly the same, thereby balancing the workload of the downstream execution nodes, as sketched below.
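As an illustration of that proposed direction (not an implemented Flink algorithm), dividing ranges by bytes rather than by subpartition count might look like:

```java
/** Sketch: divide P subpartitions into N contiguous ranges of roughly equal bytes. */
final class LoadBalancedRanges {
    /** Returns, for each consumer k, the exclusive end index of its subpartition range. */
    static int[] rangeEndsByBytes(long[] subpartitionBytes, int numConsumers) {
        long total = 0;
        for (long b : subpartitionBytes) total += b;

        int[] endExclusive = new int[numConsumers];
        int next = 0;
        long consumed = 0;
        for (int k = 0; k < numConsumers; k++) {
            // Target cumulative bytes for consumers 0..k.
            long target = total * (k + 1) / numConsumers;
            while (next < subpartitionBytes.length && consumed < target) {
                consumed += subpartitionBytes[next++];
            }
            endExclusive[k] = next; // consumer k gets [previous end, next)
        }
        // Assign any trailing (e.g. zero-byte) subpartitions to the last consumer.
        endExclusive[numConsumers - 1] = subpartitionBytes.length;
        return endExclusive;
    }
}
```

For example, with subpartition sizes {1, 1, 1, 3} and 2 consumers, the ranges become [0, 3) and [3, 4), so each consumer processes 3 units of data, whereas count-based division would assign 2 and 4 units.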
