Flink Runtime Architecture

1. Runtime overview

Flink is a distributed data processing framework, and a user's business logic is submitted to a Flink cluster in the form of a job. Flink Runtime is the engine responsible for making these jobs run and complete normally. The jobs may be stream processing jobs or batch jobs, and they may run on bare metal or in an existing Flink cluster; Flink Runtime must support all of these job types and execution environments.

1. Expression of jobs

To execute a job, it is first necessary to understand how a job is expressed in Flink.

The user writes a job through the API. Take the example on the left side of the figure above: a StreamWordInput source continuously emits words one by one; the following Map operation maps each word into a two-tuple; a keyBy then routes tuples carrying the same word together; sum counts them, and print finally outputs the result.
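The pipeline in the figure corresponds roughly to the following DataStream program. This is a minimal sketch: the StreamWordInput source from the figure is replaced by a simple fromElements source, and the class and variable names are illustrative only.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to", "be", "or", "not", "to", "be")   // stands in for StreamWordInput
           .map(new MapFunction<String, Tuple2<String, Integer>>() {
               @Override
               public Tuple2<String, Integer> map(String word) {
                   return Tuple2.of(word, 1);                   // word -> (word, 1)
               }
           })
           .keyBy(tuple -> tuple.f0)                            // same word goes to the same downstream instance
           .sum(1)                                              // running count per word
           .print();

        env.execute("word count sketch");
    }
}
```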

The job on the left corresponds to the logical topology (StreamGraph) on the right. The topology has four nodes: source, map, sum and print. Each node is a piece of data processing logic, also called an operator; the edges between nodes describe how data is distributed and therefore how records reach the downstream operators. For example, the keyBy edge between map and sum means that records produced by map that share the same key must all be distributed to the same downstream sum instance.

Flink Runtime then translates the StreamGraph into a JobGraph. The difference between the two is that the JobGraph chains some nodes together to form an operator chain. Chaining requires the two operators to have the same parallelism and a one-to-one data exchange between them. The resulting operator chain is also called a JobVertex.
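Chaining happens automatically when these conditions hold, but the DataStream API also exposes hints to influence it. A minimal sketch, assuming a recent DataStream API; the operators themselves are placeholders:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingHintsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();            // would turn chaining off for the whole job

        env.fromElements("a", "b", "c")
           .map(String::toUpperCase)
           .startNewChain()                          // force a chain boundary before this map
           .filter(word -> !word.isEmpty())
           .disableChaining()                        // keep the filter out of any chain
           .print();

        env.execute("chaining hints sketch");
    }
}
```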

The purpose of operator chaining is to avoid unnecessary data exchange, so that the chained operators all execute in the same place. During actual execution, the logical graph is further translated into an execution graph, the ExecutionGraph, which is a parallelism-level view of the logical graph. In the figure above, the execution graph at the bottom is what the logical graph looks like when every operator has a parallelism of 2.

Why can't the map and sum in the figure be chained together? Because each map instance sends data to multiple downstream instances, so the exchange is not one-to-one. A JobVertex in the logical graph corresponds to several parallel ExecutionVertex nodes in the execution graph, and each ExecutionVertex corresponds to a task. These tasks are the entities that are ultimately deployed to the Worker nodes to execute the actual data processing logic.
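The parallelism that determines how many ExecutionVertices a JobVertex expands into can be set per job or per operator. A minimal sketch:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);                           // default: each operator expands into 2 ExecutionVertices

        env.fromElements("a", "b", "c")                  // fromElements itself is a non-parallel source
           .map(String::toUpperCase).setParallelism(4)   // override for this operator only
           .print();

        env.execute("parallelism sketch");
    }
}
```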

2. Distributed architecture

As a distributed data processing framework, Flink has a distributed architecture that consists mainly of three parts: the Client, the Master, and the Worker nodes.

The Master is the control center of a Flink cluster. It can contain one or more JobMasters, each of which manages one job; the JobMasters are in turn managed by a component called the Dispatcher. The Master node also contains a ResourceManager for resource management: it manages all Worker nodes and serves all jobs at the same time. In addition, the Master node runs a REST server that responds to REST requests from clients, including web clients and command-line clients; the requests it handles include submitting jobs, querying job status, stopping jobs, and so on. A job is split into individual tasks according to its execution graph, and these tasks are ultimately executed on the Worker nodes. The Workers are TaskExecutors, which are the containers in which tasks run.

Job execution therefore involves three core components: the JobMaster, which manages jobs; the TaskExecutor, which executes the tasks; and the ResourceManager, which manages resources and serves the JobMaster's resource requests.

2. JobMaster: Job Control Center

The main responsibilities of the JobMaster include job lifecycle management, task scheduling, error recovery, status queries, and distributed state snapshots.

Distributed state snapshots include checkpoints and savepoints. Checkpoints mainly serve error recovery, while savepoints are mainly used for job maintenance such as upgrades and migrations. Distributed snapshots are triggered and managed by the CheckpointCoordinator component.
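For illustration, checkpointing is enabled per job through the execution environment, while savepoints are triggered manually, for example through the command-line client. A minimal sketch; the interval and mode are arbitrary choices:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Ask the CheckpointCoordinator to trigger a checkpoint every 10 seconds.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        // Savepoints, by contrast, are triggered manually, e.g. via
        //   bin/flink savepoint <jobId> [targetDirectory]
        // and are typically used for upgrades and migrations.

        env.fromElements(1, 2, 3).map(i -> i * 2).print();
        env.execute("checkpoint config sketch");
    }
}
```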

The core component of the JobMaster is the Scheduler: it is responsible for job lifecycle management, job status maintenance, task scheduling, and error recovery.

1. Job lifecycle management

The states in a job's lifecycle, and all possible transitions between them, are shown in the figure below.

On the normal path, a job goes through three states: Created, Running, and Finished. A job starts in the Created state; when scheduling begins, it enters the Running state and starts scheduling tasks; when all tasks finish successfully, the job moves to the Finished state, reports the final result, and exits.

However, a job may run into problems during execution, so there are also states for exception handling. If a job-level error occurs while the job is running, the whole job enters the Failing state and all tasks are cancelled. Once all tasks have reached a terminal state (Failed, Canceled, or Finished), the exception is examined. If the exception is unrecoverable, the job moves to the Failed state and exits. If it is recoverable, the job moves to the Restarting state and tries to restart: if the number of restarts has not exceeded the configured limit, the job is rescheduled from the Created state; if the limit has been reached, the job moves to the Failed state and exits. (Note: in Flink versions after 1.10, if an error is recoverable the job no longer passes through the Failing state but goes directly to Restarting, and returns to Running once all tasks are back to normal. If the error cannot be recovered, the job passes through the Failing state and finally ends in the Failed state.)
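The restart limit mentioned above comes from the job's restart strategy. A minimal sketch of configuring a fixed-delay strategy; the attempt count and delay are arbitrary example values:

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Allow at most 3 restarts, waiting 10 seconds between attempts.
        // Once the limit is exceeded, the job transitions to the Failed state.
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.fromElements(1, 2, 3).map(i -> i + 1).print();
        env.execute("restart strategy sketch");
    }
}
```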

The Cancelling and Canceled states are reached only when the user manually cancels the job. When the user cancels a job in the Web UI or through the Flink command line, Flink first moves the job to the Cancelling state and cancels all tasks. Once all tasks have reached a terminal state, the whole job enters the Canceled state and exits.

The Suspended state is reached only when high availability is configured and the JobMaster loses leadership. It only means that this JobMaster has stopped; in general, once the JobMaster regains leadership, or a standby Master acquires it, the job is restarted on the node that holds the leadership.

2. Task scheduling

Task scheduling is one of the JobMaster's core responsibilities. The first problem in scheduling tasks is deciding when to schedule them. This timing is controlled by the scheduling strategy (SchedulingStrategy), an event-driven component. The events it listens to include: job scheduling starting, task state changes, task output data becoming consumable, and failed tasks needing to be restarted. Listening to these events lets it decide flexibly when each task should be started.
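To make the event-driven shape concrete, the sketch below shows a hypothetical interface with one callback per event listed above. It is illustrative only; Flink's internal SchedulingStrategy interface is similar in spirit, but the method names and signatures here are assumptions, not the real API.

```java
// Illustrative only: a hypothetical event-driven scheduling strategy.
// The real Flink-internal interface differs in names and signatures.
public interface SchedulingStrategySketch {
    void onJobSchedulingStarted();                      // job scheduling starts
    void onTaskStateChanged(String taskId);             // a task's state changed
    void onPartitionConsumable(String partitionId);     // a task's output became consumable
    void onTasksNeedRestart(Iterable<String> taskIds);  // failed tasks need to be rescheduled
}
```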

Currently there are several scheduling strategies: Eager and Lazy from sources. The EagerSchedulingStrategy mainly serves streaming jobs; it starts all tasks directly when job scheduling begins, which reduces scheduling time. Lazy from sources mainly serves batch jobs; it schedules only the source nodes at the beginning, and other nodes are scheduled only once their input data becomes consumable. As shown in the figure below, the agg node can be started only after the source node has produced its data, and the sink node can be started only after the agg node has finished.

Why do batch jobs and streaming jobs use different scheduling strategies? Because batch jobs contain blocking shuffle data exchanges, in which the downstream can consume a data set only after the upstream has fully produced it. If the downstream were scheduled in advance, it would just sit there wasting resources. Compared with the Eager strategy, this saves a certain amount of resources for batch jobs.
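As a side note, in newer Flink versions (1.12 and later) a DataStream job can be executed in batch runtime mode, in which keyed shuffles become blocking exchanges of exactly this kind. A minimal sketch; the data and operations are placeholders:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);  // shuffles become blocking exchanges

        env.fromElements("a", "b", "a")
           .keyBy(word -> word)                          // keyed shuffle between upstream and downstream
           .reduce((w1, w2) -> w1 + "," + w2)            // downstream consumes once its input is complete
           .print();

        env.execute("batch mode sketch");
    }
}
```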

There is also a pipelined-region-based scheduling strategy under development. It is similar to the Lazy from sources strategy, except that it schedules tasks at the granularity of a pipelined region.

A pipelined region is a set of tasks connected by pipelined edges. A pipelined edge means the upstream and downstream nodes exchange data in a streaming fashion: the upstream writes while the downstream reads and consumes at the same time. The advantage of pipelined region scheduling is that it inherits, to some extent, the benefits of Eager scheduling: it saves scheduling time and lets upstream and downstream tasks run in parallel. At the same time, it retains the Lazy from sources benefit of avoiding unnecessary resource waste. Because a group of tasks is scheduled as a whole, the amount of resources needed by the tasks that must run at the same time is known in advance, which makes some deeper optimizations possible.

(Note: starting from Flink 1.11, the pipelined region strategy has become the default scheduling strategy, serving both streaming and batch jobs.)

3. The process of task scheduling

Tasks move through a number of states; initially a task is in the Created state. When the scheduling strategy decides the task can be started, it moves to the Scheduled state and begins requesting resources, that is, slots. Once a slot has been acquired, the task moves to the Deploying state; a task descriptor is generated and deployed to a Worker node, and the task is started there. After a successful start, the task enters the Running state on the Worker node and notifies the JobMaster, which then marks the task as Running on its side as well.

For a job over an unbounded stream, Running is effectively the task's final state; for a job over a bounded stream, once all data has been processed the task moves to the Finished state, marking its completion. When an exception occurs, a task moves to the Failed state, and other affected tasks may be cancelled and move to the Canceled state.

4. Error recovery

When a task fails, the JobMaster's basic approach is to restore the job's data processing by restarting the failed task and any tasks that may be affected by it. This involves three steps:

The first step is to stop the relevant tasks, namely the failed tasks and those that may be affected by them. The failed tasks will already have reached the Failed state; the other affected tasks are cancelled and eventually enter the Canceled state;

The second step is to reset these tasks back to the Created state;

The third step is to notify the scheduling strategy to reschedule these tasks.

"Tasks that may be affected" were mentioned above; exactly which tasks are affected is determined by the FailoverStrategy.

Flink's current default FailoverStrategy is the RestartPipelinedRegionFailoverStrategy. With this strategy, when a task fails, the region it belongs to is restarted. This is related to the pipelined data exchange described above: among nodes connected by pipelined data exchanges, if any node fails, the nodes connected to it fail as well, and restarting them together prevents data inconsistencies. So, to avoid a single task failure triggering multiple failovers, the usual approach is to cancel the other tasks in the region when the first task fails and then restart them all together.

The RestartPipelinedRegion strategy restarts not only the region of the failed task but also its downstream regions. The reason is that a task's output is often non-deterministic: a record that was distributed to the first downstream parallel instance before a rerun might be distributed to the second downstream instance afterwards. If those two downstream instances were in different regions and only one of them were restarted, the record could be lost or the job could even produce inconsistent data. To avoid this, the PipelinedRegionFailoverStrategy restarts the region of the failed task together with all of its downstream regions.

There is also a RestartAllFailoverStrategy, which restarts all tasks of the job whenever any task fails. It is not used often, but it is useful in special cases, for example when the user does not want a partial, local recovery after a task failure but instead wants all tasks to stop and recover as a whole.
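The failover strategy is selected through the jobmanager.execution.failover-strategy option (values region and full). A minimal sketch; in practice this entry lives in flink-conf.yaml, and the Configuration object below is only used to show the keys:

```java
import org.apache.flink.configuration.Configuration;

public class FailoverStrategySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Restart only the affected pipelined regions (the default behaviour).
        conf.setString("jobmanager.execution.failover-strategy", "region");
        // Alternatively, restart all tasks of the job on any task failure:
        // conf.setString("jobmanager.execution.failover-strategy", "full");
    }
}
```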

3. TaskExecutor: task runner

The TaskExecutor is the runner of tasks and holds the various resources needed to run them. As shown in the figure below, we focus here mainly on memory resources.

All of these memory resources can be configured individually, and the TaskExecutor manages them hierarchically. The outermost layer is the Process Memory, which corresponds to the total resources of the TaskExecutor's JVM process. It consists of the memory used by the JVM itself and the memory used by Flink, and the Flink memory in turn consists of the memory used by the framework and the memory used by tasks.

Task memory includes Task Heap Memory, which holds the Java objects of the tasks; Task Off-Heap Memory, generally used by native third-party libraries; Network Memory, used to allocate the network buffers that serve task input and output; and Managed Memory, managed off-heap memory used by components such as operators and the StateBackend. These task resources are divided into slots, which are the logical containers in which tasks execute. Currently, the slot size is obtained by dividing the total resources of the TaskExecutor evenly by the number of slots.
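The memory layers above correspond to flink-conf.yaml options of the Flink 1.10+ TaskExecutor memory model. A minimal sketch; the sizes are arbitrary examples, and the Configuration object is only used here to list keys that would normally be set in flink-conf.yaml:

```java
import org.apache.flink.configuration.Configuration;

public class TaskExecutorMemorySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.process.size", "4096m");      // total Process Memory of the TaskExecutor JVM
        conf.setString("taskmanager.memory.task.heap.size", "1024m");    // Task Heap Memory
        conf.setString("taskmanager.memory.task.off-heap.size", "256m"); // Task Off-Heap Memory
        conf.setString("taskmanager.memory.managed.size", "1024m");      // Managed (off-heap) Memory
        conf.setString("taskmanager.memory.network.max", "512m");        // Network Memory for network buffers
        conf.setString("taskmanager.numberOfTaskSlots", "4");            // the TaskExecutor's resources are split into 4 slots
    }
}
```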

A slot can run one or more tasks, with a constraint: only tasks of different types belonging to the same sharing group can run in the same slot at the same time. Generally, tasks in the same pipelined region belong to one sharing group, and all tasks of a streaming job belong to one sharing group as well. "Different types" means the tasks must belong to different JobVertices.

In the example on the right side of the figure above, the job consists of a source, a map, and a sink. After deployment, three of the slots each hold three tasks: one source, one map, and one sink. One slot holds only two tasks, because the source has a parallelism of only three, so no further source instance can be deployed there.
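For illustration, the DataStream API also lets users place operators into explicitly named sharing groups. A minimal sketch; the group name "heavy" is arbitrary:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           .map(String::toUpperCase)
           .slotSharingGroup("heavy")   // this operator (and, unless overridden, its downstream
                                        // operators) is placed in the "heavy" sharing group
           .print();

        env.execute("slot sharing sketch");
    }
}
```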

The first benefit of slot sharing is that it reduces the overhead of data exchange. There is a one-to-one data exchange between map and sink; when nodes with such a physical data exchange share a slot, their data exchange can happen in memory, which is cheaper than going over the network.

The second benefit is that it makes resource configuration easier for users. With slot sharing, a user only needs to provide n slots to guarantee that the job can run, where n is the parallelism of the operator with the highest parallelism.

The third benefit is better load balancing when the parallelism of the operators does not differ much: each slot contains operators of different types, which prevents heavily loaded operators from crowding into the same TaskExecutor.

1. Task execution model

As mentioned above, each task corresponds to an operator chain. Each operator chain generally has its own input and output: the input is an InputGate and the output is a ResultPartition. A task usually runs in its own dedicated thread; it reads data from the InputGate and feeds it to the operator chain, which performs the business logic and finally writes the output data to the ResultPartition.

An exception is the source task, which does not read from an InputGate but produces data directly through a SourceFunction. Upstream ResultPartitions and downstream InputGates exchange data through Flink's ShuffleService, which is pluggable; the current default is the NettyShuffleService, in which the downstream InputGate fetches data from the upstream ResultPartition via Netty.

A ResultPartition is made up of SubPartitions, each of which corresponds to one downstream parallel consumer. An InputGate is likewise made up of InputChannels, each of which corresponds to one upstream parallel producer.

4. ResourceManager: resource management center

The ResourceManager is Flink's resource management center. As mentioned earlier, TaskExecutors hold the various resources, and the ResourceManager manages these TaskExecutors. A newly started TaskExecutor must register with the ResourceManager before its resources can serve job requests.

A key component inside the ResourceManager is the SlotManager, which tracks the status of slots. The slot states are updated through heartbeats between the TaskExecutors and the ResourceManager; the heartbeat messages contain the state of all slots in a TaskExecutor. With the current state of all slots, the ResourceManager can serve the jobs' resource requests. When the JobMaster schedules a task, it sends a slot request to the ResourceManager; the ResourceManager forwards it to the SlotManager, which checks whether any available slot satisfies the request. If there is one, the SlotManager sends a slot allocation request to the corresponding TaskExecutor; if that succeeds, the TaskExecutor actively offers the slot to the JobMaster.

The reason for this roundabout process is to avoid inconsistencies inherent in a distributed setting. As mentioned, the slot status in the SlotManager is updated via heartbeats, so it lags behind reality, and the slot status may also change during the allocation process. Therefore, the slot offer and its acknowledgement are used as the final result of the whole request.

There are several implementations of the ResourceManager. In standalone mode, the StandaloneResourceManager is used, which requires the user to start the Worker nodes manually; the user therefore has to know in advance how many resources the job will need in total.

There are also ResourceManagers that acquire resources automatically, including the YarnResourceManager, MesosResourceManager, and KubernetesResourceManager. With these, if a slot request cannot be satisfied, the ResourceManager automatically starts new Worker nodes during the slot request process.

Take the YarnResourceManager as an example. The JobMaster requests a slot for a task, and the YarnResourceManager passes the request to the SlotManager. If the SlotManager finds no slot that satisfies the request, it notifies the YarnResourceManager, which then requests a container from the external YARN ResourceManager. Once the container is obtained, a TaskExecutor is started in it; when the TaskExecutor is up, it registers with Flink's ResourceManager and reports its available slots. With this information, the SlotManager tries to satisfy the pending slot requests. If a request can be satisfied, the JobMaster sends a slot request to the TaskExecutor, and if that succeeds, the TaskExecutor offers the slot to the JobMaster. In this way, users do not need to calculate the resource requirements of their job up front; they only need to make sure a single slot is large enough for a task to execute.
