Deeply analyze Flink fine-grained resource management

1. Fine-grained resource management and applicable scenarios

Before Flink1.14, a coarse-grained resource management method was used. The resources required by each operator slot request were unknown, which was represented by a special value of UNKNOWN inside Flink. This value can be compared with any The physical slot of the resource specification to match. From the perspective of TaskManager (hereinafter referred to as TM), the number of slots it has and the resource dimension of each slot are statically determined according to the Flink configuration.

For most simple jobs, the existing coarse-grained resource management can basically meet the requirements for resource efficiency. For example, in the above job, the data is read by Kafka and after some simple processing, the data is finally written to Redis. For this kind of job, we can easily keep the upstream and downstream concurrency consistent, and put the entire pipeline of the job into a SlotSharingGroup (hereinafter referred to as SSG). In this case, the resource requirements of the slots are basically the same, and the user can directly adjust the default slot configuration to achieve high resource utilization efficiency. At the same time, since the peak values of different task hotspots are not necessarily the same, through the peak-shaving and valley-filling effect, the Putting different tasks into one large slot can further reduce the overall resource overhead.

However, for some complex jobs that may be encountered in production, coarse-grained resource management cannot meet their needs well.

For example, for the job in the figure, there are two 128 concurrent Kafka sources and one 32 concurrent Redis dimension table, with two data processing paths up and down. One is two Kafka sources. After joining, after some aggregation operations, the data is finally sinked into the third 16-concurrent Kafka; the other path is to join Kafka and Redis dimension tables, and the results flow into an online TensorFlow-based The inference module is finally stored in Reids.

Coarse-grained resource management in this job can lead to inefficient resource utilization.

First of all, the upstream and downstream concurrency of the job is inconsistent. If you want to put the entire job in a slot, you can only align it with the highest 128 concurrency. The alignment process is not a big problem for lightweight operators, but for heavy resource consumption Operators will cause a lot of waste of resources. For example, the Redis dimension table in the figure caches all data in memory to improve performance, while the aggregation operator requires a relatively large managed memory to store state. For these two operators, only 32 and 16 resources need to be applied for respectively, but after alignment and concurrency, 128 resources need to be applied for respectively.

At the same time, the pipeline of the entire job may not be able to be placed in a slot or TM due to too many resources, such as the memory of the above operator, or the Tensorflow module requires a GPU to ensure computational efficiency. Since GPU is a very expensive resource, there may not be enough number on the cluster, so the job cannot apply for enough resources due to alignment and concurrency, and finally fails to execute.

We can split the whole job into multiple SSGs. As shown in the figure, we divide the operator into 4 SSGs according to concurrency to ensure that the concurrency within each SSG is aligned. However, since each slot has only one default specification, all resource dimensions of the slot still need to be aligned to the maximum value of each SSG. For example, the memory needs to be aligned with the requirements of the Redis dimension table, the managed memory needs to be aligned with the aggregation operator, and even A GPU needs to be added to the extended resources, which still cannot solve the problem of resource waste.

In order to solve this problem, we propose fine-grained resource management. The basic idea is that the resource specifications of each slot can be individually customized, and users can apply for them on demand to maximize resource utilization efficiency.

To sum up, fine-grained resource management is to improve the overall utilization efficiency of resources by enabling each module of the job to apply for and use resources on demand. Its applicable scenarios include the following: there is a significant difference in the concurrency of upstream and downstream tasks in the job, the resources of the pipeline are too large, or it contains expensive expansion resources. In these situations, the job needs to be split into multiple SSGs, and the resource requirements of different SSGs are different. In this case, resource waste can be reduced through fine-grained resource management. In addition, for batch tasks, the job may contain one or more stages, and there are significant differences in resource consumption between different stages, and fine-grained resource management is also required to reduce resource overhead.

2. Flink resource scheduling framework

There are three main roles in Flink's resource scheduling framework, namely JobMaster (hereinafter referred to as JM), ResourceManager (hereinafter referred to as RM) and TaskManager. The task written by the user will be compiled into JobGraph first, and then submitted to JM after injecting resources. The role of JM is to manage the resource application and execute deployment of JobGraph.

The scheduling-related component in JM is Scheduler, which will generate a series of SlotRequests based on the JobGraph, then aggregate these SlotRequests, generate a ResourceRequirement and send it to RM. After RM receives the resource statement, it will first check the existing resources in the cluster. If it meets its requirements, it will send a request to TM, asking him to offer slots to the corresponding JM (here, the allocation of slots is done by the SlotManager component). If the existing resources are not enough, it will apply for new resources from the external K8s or Yarn through the internal driver, and finally JM will start to deploy operators after receiving enough slots, so that the job can run.

Following this framework, the technical implementation details and design choice in fine-grained resource management will be analyzed and explained next.

3. Resource configuration interface based on SlotSharingGroup

At the entrance, Flink needs to inject resource configuration into JobGraph. This part is the resource configuration interface based on SlotSharingGroup proposed in FLIP-156. Regarding the design choice of the resource configuration interface, the main problem is the granularity of resource configuration:

The first is the smallest operator granularity operator. If users configure resources on the operator, Filnk needs to further aggregate resources into slot levels based on chaining and slot sharing before scheduling resources.

The advantage of using this granularity is that we can decouple resource configuration from the logic of chaining and slot sharing. Users only need to consider the needs of the current operator, regardless of whether it is embedded with other operators or whether it is scheduled to a slot middle. Second, it enables Flink to calculate resources per slot more accurately. If the upstream and downstream operators in a certain SSG have different concurrency, then the resources required by the physical slots corresponding to the SSG may also be different; and if Flink masters the resources of each operator, it has the opportunity to further optimize resource efficiency.

Of course, it also has some disadvantages. First, the cost of user configuration is too high. Complex operations in production include a large number of operators, which are difficult for users to configure one by one. Secondly, in this case, it is difficult to support coarse-grained and mixed resource allocation. If there are both coarse-grained and fine-grained operators in an SSG, Flink will not be able to determine how many resources it needs. Finally, due to a certain degree of deviation in the allocation or estimation of resources by users, this deviation will continue to accumulate, and the peak-shaving and valley-filling effects of operators cannot be effectively utilized.

The second option is to use the task formed after operator chaining as the granularity of resource allocation. In this case, we must expose Flink's internal chaining logic to users. At the same time, Flink's runtime still needs to further aggregate resources into slot levels according to the configuration of task slot sharing before scheduling resources.

Its advantages and disadvantages are roughly the same as the operator granularity, but compared with operators, it reduces the configuration cost of users to a certain extent, but this is still a pain point. At the same time, its cost is that resource configuration and chaining cannot be decoupled, and the internal logic of chaining and Flink is exposed to users, resulting in the limitation of internal potential optimization. Because once the user configures the resources of a task, the change of chaining logic may split the task into two or three, resulting in incompatible user configuration.

The third option is to directly use SlotSharingGroup as the granularity of resource configuration, so that for Flink, what you see in resource configuration is what you get, and the previous resource aggregation logic is omitted.

At the same time, this option has the following advantages:

First, make user configuration more flexible. We give the user the right to choose the configuration granularity. You can configure operator resources, task resources, and even subgraph resources. You only need to put the subgraph into an SSG and configure its resources.
Second, it is relatively simple to support coarse and fine-grained mixed configurations. The granularity of all configurations is slot, so there is no need to worry about the same slot containing both coarse-grained and fine-grained tasks. For a coarse-grained slot, its resource size can be simply calculated according to the default specifications of TM. This feature also makes the allocation logic of fine-grained resource management compatible with coarse-grained scheduling. We can regard coarse-grained as a part of fine-grained special case.
Third, it allows users to take advantage of the peak-shaving and valley-filling effects between different operators to effectively reduce the impact of deviations.
Of course, some restrictions will also be introduced, which couples the chaining of resource configuration and Slot Sharing together. In addition, if operators in an SSG have concurrency differences, in order to maximize resource utilization efficiency, users may need to manually ungroup them.

After comprehensive consideration, we finally chose the resource configuration interface based on SlotSharingGroup in FLIP-156. In addition to the advantages mentioned above, the most important thing is that it can be found from the resource scheduling framework that the slot is actually the most basic unit in resource scheduling. From Scheduler to resource scheduling applications are made in units of slots, directly Using this granularity avoids adding complexity to the system.

Going back to the example job, after supporting the fine-grained resource management configuration interface, we can configure different resources for the four SSGs, as shown in the figure above. As long as the scheduling framework is matched strictly according to this principle, we can maximize resource utilization efficiency.

4. Dynamic resource cutting mechanism

After solving the resource allocation, the next step is to apply for slots for these resources. This step requires the use of the dynamic resource cutting mechanism proposed by FLIP-56.

A brief review of this picture shows that the JobGraph on the far left already has resources, and you can go to the right to enter the resource scheduling of JM, RM, and TM. Under coarse-grained resource management, TM slots are fixed in size and determined according to the startup configuration. In this case, RM cannot meet slot requests of different specifications. Therefore, we need to make certain changes to the way slots are created.

Let’s first look at the existing static slot application mechanism. In fact, when the TM is started, the slots have already been divided and numbered. It will report these slots to the Slot Manager. When the slot request comes, the Slot Manager will decide to apply for slot1 and slot3. Finally, the slot will be released after the task on slot1 finishes running. In this case, only slot3 is occupied. We can find that although TM has 0.75 core and 3G free resources at this time, if the job applies for a slot of the corresponding resource size, TM cannot satisfy it, because the slot has been divided in advance.

Therefore, we propose a dynamic resource cutting mechanism. The slot is no longer generated and unchanged after the TM is started, but is dynamically cut from the TM according to the actual slot request. When the TM starts, we regard the resources that can be allocated to the slot as an entire resource pool. For example, there are 1 core and 4G memory resources in the above picture. Now there is a fine-grained job, and the Slot Manager decides to request a 0.25 core from the TM. 1G slot, TM will check whether its own resource pool can cut this slot, then dynamically generate the slot and allocate corresponding resources to JM, and then apply for a 0.5core, 2G slot for this job, the Slot Manager can still use it from the same To apply for a slot on a TM, as long as the free resources are not exceeded. When a slot is no longer needed, we can destroy it, and the corresponding resources will return to the free resource pool.

Through this mechanism, we address the problem of how fine-grained resource requests are satisfied.

Going back to the sample job, we only need 8 TMs of the same specification to schedule the job. Each TM is equipped with a GPU to meet SSG4, and then the CPU-intensive SSG1 and the memory-intensive SSG2 and SSG3 are mixed and aligned. The overall CPU-to-memory ratio on the TM is sufficient.

5. Resource application strategy

What is resource application strategy? It includes two decisions when RM interacts with Resource Provider and TM. One is what resource specification TM to apply for from Resource Provider and how many TMs of each specification need. The other is how to place slots in each TM. Both of these decisions are actually made inside the Slot Manager component.

The coarse-grained resource application strategy is relatively simple, because there is only one specification of TM, and the slot specifications are the same. In the allocation strategy, it is only necessary to consider whether to spread the slots to each TM as much as possible. However, strategies under fine-grained resource management need to take into account different requirements.

First, we introduced a dynamic resource cutting mechanism. The scheduling of slots can be regarded as a multi-dimensional box packing problem, which not only needs to consider how to reduce resource fragmentation, but also needs to ensure the efficiency of resource scheduling. In addition, whether the slot needs to be evaluated, and the cluster may have some requirements on the resource specifications of the TM, such as not being too small. If the TM resource is too small on K8s, it will cause the startup to be too slow, and the registration will time out at the end, but it cannot be too large. It will affect the scheduling efficiency of K8s.

Faced with the above complexity, we abstracted this resource application strategy and defined a ResourceAllocationStrategy. The Slot Manager will tell the strategy the current resource request and the existing available resources in the cluster. The strategy is responsible for making decisions and telling the Slot Manager how the existing resources are. Allocation, how many new TMs need to be applied for and their respective specifications, and whether there are jobs that cannot be satisfied.

At present, fine-grained resource management is still in beta version, and the community has built a simple default resource management strategy. Under this strategy, the specification of TM is fixed and determined according to the coarse-grained configuration. If the request for a certain slot is greater than the resource configuration, it may fail to be allocated. This is its limitation. In terms of resource allocation, it will sequentially scan the currently idle TM, and cut it directly as long as it satisfies the slot request. This strategy ensures that resource scheduling will not become a bottleneck even in large-scale tasks, but the cost is that resources cannot be avoided. Generation of fragments.

6. Summary and Future Prospects

Fine-grained resource management is currently in beta in Flink. As can be seen from the figure above, for the runtime, through FLIP-56 and FLIP-156, the work of fine-grained resource management has been basically completed. From the perspective of the user interface, FLIP-169 has opened the fine-grained configuration on the Datastream API. For details on how to configure it, you can refer to the user documentation in the community.

In the future, our development direction is mainly in the following aspects:

First, customize more resource management strategies to meet different scenarios, such as session and OLAP;

Second, currently we regard the extended resource as a TM-level resource, and every slot on the TM can see its information, and then we will further restrict its scope;

Third, fine-grained resource management can support coarse-grained and mixed-grained configurations, but there are some resource efficiency issues. For example, coarse-grained slot requests can be satisfied by slots of any size. In the future, we will further optimize the matching logic to better support hybrid configuration;

Fourth, we will consider adapting to the new Reactive Mode proposed by the community;

Finally, optimize the WebUI to display slot segmentation information, etc.

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us