Interpretation of PolarDB-X Source Code: The Life of DDL (Part 2)

This article introduces the architecture and implementation of the PolarDB-X DDL engine, as well as the interaction logic between the DDL engine and DDL Jobs, from the perspective of the DDL engine.
Before reading this article, readers are advised to read:
• "The Life of DDL (Part 1)"
• "Does PolarDB-X DDL Also Pursue ACID?"
DDL engine related concepts
DDL Job
A DDL Job is the DDL engine's representation of one logical DDL statement: each logical DDL corresponds to one DDL Job, and the Job contains the complete series of actions required to execute that DDL. Under the DDL engine framework, supporting a new kind of logical DDL therefore essentially means defining a new DDL Job.
DDL developers define DDL Jobs statically, but a Job also carries runtime state, which is managed primarily by the DDL engine. Users can additionally run a limited set of DDL maintenance commands to manipulate a Job's state and thereby control the execution process. The figure below shows the state transition diagram of a DDL Job: the bold black circles mark the initial and final states of Job execution, and the maintenance commands that trigger each transition are labeled on the edges between states.
DDL Task
A DDL Task encapsulates one unit of behavior inside a DDL Job, such as reading and writing MetaDb, in-memory computation, inter-process communication, or sending a physical DDL to a DN for execution. A DDL Job is therefore composed of several DDL Tasks, which the DDL engine must schedule and execute in a specific order. DDL developers use the DAG framework provided by the PolarDB-X DDL engine to describe the dependencies and execution order among Tasks. Under this framework, defining a new DDL Job essentially means defining several DDL Tasks and wiring them together into a DAG.
The DDL Task is the key mechanism by which the DDL engine approximates DDL atomicity, one of the engine's design goals. Executing a logical DDL involves a series of operations, and atomicity requires that either all of them take effect or none do. Concretely, the DDL engine requires every DDL Task to be idempotent, and every Task must provide a corresponding reverse, also idempotent, method that the engine calls when the Task is rolled back. Before executing a DDL, the engine generates the DAG of DDL Tasks for that DDL and persists it to MetaDb; this persisted DAG plays the role of an undo log for guaranteeing DDL atomicity. The engine then executes Tasks in DAG order until the entire DDL Job either completes successfully or is fully rolled back.
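To make this concrete, here is a minimal sketch of what an idempotent Task with a reverse idempotent method might look like. The class, method names, and SQL are illustrative assumptions for this article, not the actual PolarDB-X Task API:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical sketch of an idempotent task; not the real PolarDB-X API.
public class CreatePhysicalTableTask {
    private final String physicalTableName;

    public CreatePhysicalTableTask(String physicalTableName) {
        this.physicalTableName = physicalTableName;
    }

    // Idempotent: re-executing after a crash or retry is harmless.
    public void execute(Connection dnConnection) throws SQLException {
        try (Statement stmt = dnConnection.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + physicalTableName + " (id BIGINT)");
        }
    }

    // Reverse idempotent method, called by the engine during rollback.
    public void rollback(Connection dnConnection) throws SQLException {
        try (Statement stmt = dnConnection.createStatement()) {
            stmt.executeUpdate("DROP TABLE IF EXISTS " + physicalTableName);
        }
    }
}

Both methods can safely be called more than once, which is exactly what allows the engine to resume or roll back a Job from its persisted DAG after an interruption.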
Workers and Leaders
From the perspective of the DDL engine, CN nodes are divided into Worker nodes and a Leader node (of which there is exactly one in the cluster). A Worker node receives DDL requests from users, performs simple local validation on each request, converts the DDL into a DDL Job, pushes the Job to MetaDb, and finally notifies the Leader node to pull the new DDL Job from MetaDb.
The Leader node is responsible for executing the DDL. After pulling the DDL Job from MetaDb, it restores the Job to its DAG form, topologically sorts the Tasks in the Job, and then schedules and executes them with a bounded degree of parallelism.
DDL engine source directory
To aid the discussion that follows, this article first walks through the layout of the DDL engine source code. The DDL engine of PolarDB-X lives under com.alibaba.polardbx.executor.ddl.newengine; its modules are described below:
• job: definitions of the Job and Task objects
• dag: a general-purpose DAG implementation, including node and graph definitions, topological sorting, and DAG maintenance and updates
• meta: interfaces for reading and writing persistent objects in GMS; persistent objects include Job and Task state and system resources (persistent read-write locks)
• sync: sync interfaces for synchronizing information between the Leader node and Follower nodes
• utils: wrappers for threads, inter-thread communication, and thread pools
• serializable: serialization interfaces for Job and Task objects
• DdlEngineDagExecutor: the Job executor, containing the main logic for Task scheduling, Task state monitoring, and exception handling
• DdlEngineScheduler: the Job scheduler, which places Jobs into execution queues and invokes the Job executor
• DdlEngineRequester: the entry point through which the DDL engine handles DDL requests; it persists the DDL Job and notifies the Leader node to process the request
Example
Next, this article shows how a logical DDL is scheduled and executed, from the perspective of the DDL engine.
DDL task scheduling
When a DDL statement arrives from the MySQL client, a Worker node receives it, parses it through a simplified optimizer to obtain a LogicalPlan, and dispatches the LogicalPlan to the corresponding DDL Handler, which is responsible for generating the DDL Job. The common base-class method com.alibaba.polardbx.executor.handler.ddl.LogicalCommonDdlHandler#handleDdlRequest then handles the DDL request: it calls com.alibaba.polardbx.executor.ddl.newengine.DdlEngineRequester#execute, which writes the generated DDL Job and the context required to execute the DDL into MetaDb and notifies the Leader node to process it. At this point the Worker node's own work is done. If the DDL is blocking, the Worker node waits for the Leader to finish executing it and then returns a response to the client; if the DDL is non-blocking, the Worker returns immediately.
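The worker-side flow can be summarized in a short sketch. All types and helper methods below are simplified stand-ins for the classes cited above, not the real signatures:

import java.util.concurrent.atomic.AtomicLong;

// Hedged sketch of the Worker node's request path; helpers are stubs.
public class WorkerDdlFlow {
    private static final AtomicLong JOB_IDS = new AtomicLong();

    public String handleDdlRequest(String ddlSql, boolean blocking) {
        long jobId = JOB_IDS.incrementAndGet();
        persistJobToMetaDb(jobId, ddlSql); // write the DDL Job and context to MetaDb
        notifyLeader(jobId);               // tell the Leader node to pull the new job
        if (blocking) {
            waitUntilJobFinished(jobId);   // blocking DDL: wait for the Leader to finish
        }
        return "OK, jobId=" + jobId;       // non-blocking DDL returns immediately
    }

    private void persistJobToMetaDb(long jobId, String ddlSql) { /* MetaDb write */ }
    private void notifyLeader(long jobId) { /* leader notification */ }
    private void waitUntilJobFinished(long jobId) { /* poll job state */ }
}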
On the Leader node, two threads are running: com.alibaba.polardbx.executor.ddl.newengine.DdlEngineScheduler#ddlDispatcherThread and com.alibaba.polardbx.executor.ddl.newengine.DdlEngineScheduler#ddlSchedulerThread, corresponding to the instance-level DdlJobDispatcher and the Schema-level DdlJobScheduler, respectively. The DdlJobDispatcher takes DDL requests from the single, instance-wide DDL request queue and assigns each one to a Schema-level DDL Job queue. The DdlJobScheduler continuously consumes DDL Jobs from its Schema-level queue, using a Schema-level semaphore to bound the parallelism of Job consumption (at most 10 concurrent threads per Schema). Consuming a DDL Job means taking it off the Schema-level queue and handing it to a Job-level DdlJobExecutor, which in turn passes the Job to DdlEngineDagExecutor. At this point the DDL Job formally enters the DDL engine's executor, DdlEngineDagExecutor, which takes over its execution.
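The dispatcher/scheduler split can be pictured with the following sketch; the queue, semaphore, and class names are assumptions made for illustration rather than the actual implementation in com.alibaba.polardbx.executor.ddl.newengine:

import java.util.concurrent.*;

// Hedged sketch of schema-level job consumption bounded by a semaphore.
public class SchemaLevelScheduler {
    private static final int MAX_PARALLELISM = 10; // at most 10 concurrent jobs per schema

    private final BlockingQueue<Long> schemaJobQueue = new LinkedBlockingQueue<>();
    private final Semaphore permits = new Semaphore(MAX_PARALLELISM);
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public void scheduleLoop() throws InterruptedException {
        while (true) {
            Long jobId = schemaJobQueue.take(); // consume the next DDL Job for this schema
            permits.acquire();                  // bound schema-level parallelism
            executor.submit(() -> {
                try {
                    runJob(jobId); // hand over to the job executor (DdlEngineDagExecutor in the real code)
                } finally {
                    permits.release();
                }
            });
        }
    }

    private void runJob(Long jobId) { /* delegate to the DAG executor */ }
}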
As the above suggests, the DDL engine supports concurrent execution of multiple DDLs. To guarantee mutual exclusion between DDLs that need the same resources, the engine provides a persistent read-write lock mechanism. A DDL developer only needs to declare, when defining the DDL Job, which Schema and Table resources the DDL requires. At execution time, before com.alibaba.polardbx.executor.ddl.newengine.DdlEngineRequester#execute generates the DDL Job and persists it to MetaDb, the engine acquires the read-write locks for the resources the Job declared.
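As a sketch of what such a declaration might look like (the class and method names here are hypothetical, not the real job-building API):

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of declaring lock resources for a DDL job.
public class ResourceDeclaration {
    private final Set<String> sharedResources = new HashSet<>();    // acquired as read locks
    private final Set<String> exclusiveResources = new HashSet<>(); // acquired as write locks

    public ResourceDeclaration readLock(String resource) {
        sharedResources.add(resource);
        return this;
    }

    public ResourceDeclaration writeLock(String resource) {
        exclusiveResources.add(resource);
        return this;
    }

    public static void main(String[] args) {
        // An ALTER TABLE job would read-lock its schema and write-lock the target table,
        // so DDLs on different tables proceed in parallel while DDLs on the
        // same table remain mutually exclusive.
        new ResourceDeclaration().readLock("my_schema").writeLock("my_schema.my_table");
    }
}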
DDL task execution
DdlEngineDagExecutor is responsible for executing the DDL Job. Its restoreAndRun method pulls the DDL Job from MetaDb and restores it to DAG form; its run method then invokes the callback that corresponds to the Job's current state.
public class DdlEngineDagExecutor {

    public static void restoreAndRun(String schemaName, Long jobId, ExecutionContext executionContext) {
        boolean restoreSuccess = DdlEngineDagExecutorMap.restore(schemaName, jobId, executionContext);
        DdlEngineDagExecutor dag = DdlEngineDagExecutorMap.get(schemaName, jobId);
        dag.run();
    }

    private void run() {
        // Start the job state machine.
        if (ddlContext.getState() == DdlState.QUEUED) {
            onQueued();
        }
        if (ddlContext.getState() == DdlState.RUNNING) {
            onRunning();
        }
        if (ddlContext.getState() == DdlState.ROLLBACK_RUNNING) {
            onRollingBack();
        }
        // Handle the terminated states.
        switch (ddlContext.getState()) {
        case ROLLBACK_PAUSED:
        case PAUSED:
            onTerminated();
            break;
        case ROLLBACK_COMPLETED:
        case COMPLETED:
            onFinished();
            break;
        default:
            break;
        }
    }
}
com.alibaba.polardbx.executor.ddl.newengine.DdlEngineDagExecutor#run executes the callback corresponding to the DDL Job's current state; it is, in essence, a walk over the Job's state transition diagram.
The initial state of a DDL Job is usually QUEUED, meaning the engine has just placed it on a Schema-level queue. In this state, run calls the onQueued() method, whose job is to move the DDL Job's state to RUNNING.
When the DDL Job is in the RUNNING state, run calls the onRunning callback, which executes the Tasks inside the Job according to the dependencies in the DAG.
private void onRunning() {
    while (true) {
        if (hasFailureOnState(DdlState.RUNNING)) {
            if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {
                LOGGER.info(String.format("JobId:[%s], all tasks stopped", ddlContext.getJobId()));
                return;
            } else {
                continue;
            }
        }
        if (executingTaskScheduler.isAllTaskDone()) {
            updateDdlState(DdlState.RUNNING, DdlState.COMPLETED);
            return;
        }
        if (executingTaskScheduler.hasMoreExecutable()) {
            // fetch & execute next batch
            submitDdlTask(executingTaskScheduler.pollBatch(), true, executingTaskScheduler);
            continue;
        }
        // get some rest
        sleep(50L);
    }
}
The process of onRunning is as follows:
• First check whether the current DDL Job is still in the RUNNING state; if not, wait for all in-flight Tasks to stop, then return.
• Check whether any Task nodes on the DAG remain to be executed; if not, update the Job state to COMPLETED and return.
• If the DAG still has executable Tasks, extract all currently executable Tasks via topological sorting (as sketched below) and call submitDdlTask to run them concurrently, subject to the parallelism limit. A Task may fail: if one does, submitDdlTask changes the current DDL Job's state according to the failure policy predefined by the Task's developer. Most commonly, a failed Task moves the Job to PAUSED or ROLLBACK_RUNNING. The detailed error handling and recovery mechanism is covered in the next section.
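The "extract all executable Tasks" step is essentially batched topological polling: every Task whose predecessors have all finished is ready to run. Below is a minimal sketch, with data structures simplified from the real dag module:

import java.util.*;

// Simplified sketch of batched topological polling over the task DAG.
public class TaskDag {
    private final Map<Long, Integer> inDegree = new HashMap<>();      // taskId -> unfinished predecessors
    private final Map<Long, List<Long>> successors = new HashMap<>(); // taskId -> downstream tasks

    // Return the next batch of tasks whose dependencies are all satisfied.
    public List<Long> pollBatch() {
        List<Long> batch = new ArrayList<>();
        for (Map.Entry<Long, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                batch.add(e.getKey());
            }
        }
        batch.forEach(inDegree::remove); // taken out for execution
        return batch;
    }

    // Called when a task finishes: unblock its successors.
    public void onTaskDone(Long taskId) {
        for (Long next : successors.getOrDefault(taskId, Collections.emptyList())) {
            inDegree.computeIfPresent(next, (k, v) -> v - 1);
        }
    }
}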
If the DDL Job's state is ROLLBACK_RUNNING, the run method calls the onRollingBack() callback to roll the DDL back. The relevant code is as follows:
private void onRollingBack() {
    if (!allowRollback()) {
        updateDdlState(DdlState.ROLLBACK_RUNNING, DdlState.ROLLBACK_PAUSED);
        return;
    }

    reverseTaskDagForRollback();

    // Rollback the tasks.
    while (true) {
        if (hasFailureOnState(DdlState.ROLLBACK_RUNNING)) {
            if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {
                LOGGER.info(String.format("JobId:[%s], all tasks stopped", ddlContext.getJobId()));
                return;
            } else {
                continue;
            }
        }
        if (reveredTaskScheduler.isAllTaskDone()) {
            updateDdlState(DdlState.ROLLBACK_RUNNING, DdlState.ROLLBACK_COMPLETED);
            return;
        }
        if (reveredTaskScheduler.hasMoreExecutable()) {
            // fetch & execute next batch
            submitDdlTask(reveredTaskScheduler.pollBatch(), false, reveredTaskScheduler);
            continue;
        }
        // get some rest
        sleep(50L);
    }
}
The process of onRollingBack is as follows:
• First check whether rollback is still allowed at the current execution point of the DAG (once a fail point Task has been passed, rollback is no longer possible). If rollback is not allowed, mark the DDL Job as ROLLBACK_PAUSED and exit.
• When the Job enters ROLLBACK_RUNNING, some Tasks may still be executing. The engine stops launching new Tasks and waits for the in-flight Tasks to succeed or fail; at that point the DDL Job has reached a consistent state.
• Once a consistent state is reached, rollback can begin. All directed edges of the DAG are reversed, so that the execution order of the entire DDL Job is inverted (see the sketch after this list). The reversed DAG is then topologically sorted, and every Task that has executed, fully or partially, is taken out and has its reverse idempotent method executed.
• When no executable Task nodes remain in the reversed DAG, the Job's state is marked ROLLBACK_COMPLETED and the rollback has succeeded.
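The edge reversal performed by reverseTaskDagForRollback can be sketched as follows; the adjacency-list representation is an assumption made for illustration:

import java.util.*;

// Hedged sketch: flip every edge of the task DAG so that the
// topological order of rollback runs backwards through the job.
public class DagReverser {
    public static Map<Long, List<Long>> reverse(Map<Long, List<Long>> edges) {
        Map<Long, List<Long>> reversed = new HashMap<>();
        for (Map.Entry<Long, List<Long>> e : edges.entrySet()) {
            reversed.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
            for (Long to : e.getValue()) {
                // edge (from -> to) becomes (to -> from)
                reversed.computeIfAbsent(to, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        return reversed;
    }
}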
The callback logic of the other states is comparatively simple and is not detailed here; interested readers are encouraged to read the code themselves.
Error Handling and Recovery
One of the goals pursued by the DDL engine is DDL atomicity: if some Tasks fail during execution, the engine must take appropriate measures to drive the DDL Job to one of the final states in the state transition diagram, either completely un-executed or completely executed. The engine's approach is to give each Task a DdlExceptionAction attribute that tells the engine what to do when an exception occurs while executing the Task; DDL developers set this attribute when defining a DDL Task.
DdlExceptionAction has four possible values:
• TRY_RECOVERY_THEN_PAUSE: on an exception during Task execution, retry up to 3 times; if the Task still fails, set the state of the DDL Job containing the Task to PAUSED.
• ROLLBACK: on an exception during Task execution, set the state of the DDL Job containing the Task to ROLLBACK_RUNNING, after which the DDL engine rolls the DDL back.
• TRY_RECOVERY_THEN_ROLLBACK: on an exception during Task execution, retry up to 3 times; if the Task still fails, set the state of the DDL Job containing the Task to ROLLBACK_RUNNING, after which the DDL engine rolls the DDL back.
• PAUSE: on an exception during Task execution, set the state of the DDL Job containing the Task to PAUSED.
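How the engine might dispatch on this attribute after a Task fails can be sketched as below; the retry helper and the surrounding types are simplified stand-ins, not the actual engine code:

// Hedged sketch of failure-policy dispatch; retry/updateDdlState are stand-ins.
private void handleTaskFailure(DdlTask task) {
    switch (task.getExceptionAction()) {
    case ROLLBACK:
        updateDdlState(DdlState.RUNNING, DdlState.ROLLBACK_RUNNING);
        break;
    case TRY_RECOVERY_THEN_ROLLBACK:
        if (!retry(task, 3)) { // up to 3 retries before giving up
            updateDdlState(DdlState.RUNNING, DdlState.ROLLBACK_RUNNING);
        }
        break;
    case TRY_RECOVERY_THEN_PAUSE:
        if (!retry(task, 3)) {
            updateDdlState(DdlState.RUNNING, DdlState.PAUSED);
        }
        break;
    case PAUSE:
    default:
        updateDdlState(DdlState.RUNNING, DdlState.PAUSED);
        break;
    }
}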
In general, the PAUSED state means the DDL Job has not reached a final state and requires developer intervention. It is typically used for Tasks that cannot be recovered after an exception, or for Tasks whose external effects cannot be rolled back. An example of the former is DROP TABLE: once the Task that deletes the metadata or the physical table has executed, the pre-deletion state cannot be restored, so if such a Task fails and exhausts its three retries, the DDL Job enters the PAUSED state. As for the latter, most DDL Jobs in PolarDB-X contain a CDC-marking Task that emits the DDL's binlog events; once that Task completes, the outside world may already have consumed the corresponding DDL binlog, so the Task cannot be rolled back.
Summary
From the perspective of the DDL engine, this article has introduced the architecture and implementation of the DDL engine, as well as the interaction logic between the DDL engine and DDL Jobs. For more analysis of the PolarDB-X source code, please stay tuned for our subsequent articles.
