Guaranteeing MapReduce/Spark Task Data Consistency with the Job Committer

Writing data concurrently to the target storage system is inherent to distributed jobs. By writing in parallel at the node, process, and thread level, the disk and network bandwidth of the whole cluster can be used to achieve high throughput. One of the main problems with concurrent writes is guaranteeing data consistency. Specifically, the following problems must be solved:

1. While a distributed job is writing data, its intermediate data must not be visible to the outside world.

2. After the distributed job completes successfully, all result data must become visible to the outside world at the same time.

3. If the distributed job fails, no result data may be visible to the outside world, and any partial output must be cleaned up correctly.

4. When speculative execution is enabled, multiple attempts of the same task must leave exactly one copy of the result data in the final output.

In addition, various job-level failures must be handled, such as task retries after failure and job restarts. MapReduce uses the Job Committer to guarantee the consistency of distributed writes: through its different implementations, data written by MapReduce jobs remains consistent across these abnormal scenarios. Spark supports MapReduce's JobCommitter as well, and Spark jobs achieve write consistency through the same mechanism.

JobCommitter interface
MapReduce has two generations of APIs, V1 and V2, distinguished by the mapred and mapreduce package names. The JobCommitter abstractions of the two versions are essentially the same; the following takes org.apache.hadoop.mapreduce.OutputCommitter as an example to introduce the main interface:
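The real class is a Java abstract class in Hadoop; the sketch below paraphrases its main methods as a Python abstract base class purely for readability. The method names mirror the Java ones (in snake_case), but the signatures are simplified stand-ins, not the actual Hadoop API.

```python
from abc import ABC, abstractmethod

class OutputCommitter(ABC):
    """Simplified paraphrase of org.apache.hadoop.mapreduce.OutputCommitter."""

    @abstractmethod
    def setup_job(self, job_context): ...          # job-level initialization
    @abstractmethod
    def setup_task(self, task_context): ...        # task-level initialization
    @abstractmethod
    def needs_task_commit(self, task_context): ... # does this attempt have output to commit?
    @abstractmethod
    def commit_task(self, task_context): ...       # publish this task attempt's output
    @abstractmethod
    def abort_task(self, task_context): ...        # discard this task attempt's output

    def commit_job(self, job_context): ...         # publish the whole job's output
    def abort_job(self, job_context, state): ...   # discard the whole job's output

    def is_commit_job_repeatable(self, job_context):
        return False                               # may commitJob be safely retried?

    def is_recovery_supported(self, job_context):
        return False                               # can task output survive a job restart?

    def recover_task(self, task_context): ...      # reuse prior output after a restart
```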

Based on when and in what order these methods are called, we can roughly reconstruct how a MapReduce job works its way through the JobCommitter:

1. When the job is initialized, setupJob is called to perform job-level initialization, such as setting up the job's working directory.

2. If the same job is already being executed, isCommitJobRepeatable is called to decide whether to continue.

3. When a task is initialized, setupTask is called to perform task-level initialization, such as setting up the task working directory and the task output directory.

4. If the task output already exists, isRecoverySupported is called to check whether recovery is supported; if so, recoverTask is called to avoid recomputing the task.

5. If the task fails, abortTask is called to clean up the task output.

6. If the task succeeds, commitTask is called.

7. Once all tasks have completed, commitJob is called.

8. If the job fails, abortJob is called.
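The call sequence above can be condensed into a toy driver loop. RecordingCommitter and run_job are invented names for illustration, not Hadoop APIs; the sketch only shows which hooks fire on the success and failure paths.

```python
class RecordingCommitter:
    """Toy committer that records which lifecycle hooks the driver invokes."""
    def __init__(self):
        self.calls = []
    def __getattr__(self, name):
        # Record every hook invocation (setupJob, commitTask, ...) by name.
        def hook(*args):
            self.calls.append(name)
        return hook

def run_job(committer, tasks):
    """Drive the commit protocol: tasks commit first, then the job commits."""
    committer.setupJob()
    ok = True
    for task in tasks:
        committer.setupTask()
        try:
            task()                  # user code writes the task output here
            committer.commitTask()  # phase 1: task-level commit
        except Exception:
            committer.abortTask()   # discard this attempt's output
            ok = False
    if ok:
        committer.commitJob()       # phase 2: publish all results at once
    else:
        committer.abortJob()        # roll everything back

good = RecordingCommitter()
run_job(good, [lambda: None, lambda: None])
```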

As can be seen, the basic mechanism of the JobCommitter resembles the two-phase commit protocol of distributed databases: tasks commit first and do the bulk of the work, and once the ApplicationMaster has learned that all tasks committed successfully, it performs the job commit to finish the submission. Achieving data consistency through this two-phase commit imposes two requirements:

1. Before the job commit, the data must not be visible to the outside world and must be possible to roll back.

2. The job commit should be as short as possible, ideally a single atomic operation. The longer it takes, the greater the risk of failing partway through; a mid-commit failure leaves the data in an intermediate state, which violates the consistency requirement.

In MapReduce, FileOutputCommitter is the most commonly used Job Committer implementation. When writing data to HDFS, it fully satisfies both requirements of the two-phase commit protocol.


The following briefly describes the implementation of FileOutputCommitter's main methods. FileOutputCommitter involves four directories:

Final directory: $dest/
Job temporary directory: $dest/_temporary/$appAttemptId/
Task temporary directory: $dest/_temporary/$appAttemptId/_temporary/$taskAttemptID/
Task output directory: $dest/_temporary/$appAttemptId/$taskAttemptID/
The entire JobCommitter execution process is shown in the figure:

1. setupJob: Create the job temporary directory.

2. setupTask: Determine the task temporary directory and the task output directory.

3. commitTask: Rename the task temporary directory to the task output directory.

4. abortTask: Clean up the task temporary directory.

5. commitJob: Merge the data in the job temporary directory (the files in all task output directories) into the final job directory.

6. abortJob: Clean up the job temporary directory.
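The directory flow above can be reproduced with ordinary filesystem operations. This is a minimal single-process sketch of the V1 flow under the stated directory layout (paths and attempt IDs are illustrative), using os.rename just as FileOutputCommitter relies on rename on HDFS.

```python
import os
import shutil
import tempfile

dest = tempfile.mkdtemp()                                 # $dest/ (final job directory)
app = os.path.join(dest, "_temporary", "0")               # job temporary directory
task_tmp = os.path.join(app, "_temporary", "attempt_0")   # task temporary directory
task_out = os.path.join(app, "attempt_0")                 # task output directory

# setupJob / setupTask: create the temporary directories.
os.makedirs(task_tmp)

# The task writes its output into its own temporary directory.
with open(os.path.join(task_tmp, "part-00000"), "w") as f:
    f.write("hello")

# commitTask: first rename, task temporary dir -> task output dir.
os.rename(task_tmp, task_out)

# commitJob: second rename, move every task's files into $dest,
# then remove the job temporary directory.
for name in os.listdir(task_out):
    os.rename(os.path.join(task_out, name), os.path.join(dest, name))
shutil.rmtree(os.path.join(dest, "_temporary"))
```

Until the final loop runs, a reader of `dest` sees nothing but the `_temporary` subtree, which is exactly why readers are told to ignore paths starting with an underscore.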

From this implementation of FileOutputCommitter, it is clear that before commitJob, all data written by the tasks sits in temporary directories, so reading the job's final directory never picks up temporary data. If any stage of the job fails, the temporary directories and files can be cleaned up. During job execution, each output file is renamed twice. The first rename happens in commitTask and is executed inside the task, so tasks running on different nodes rename concurrently. The second happens in commitJob, executed by the MapReduce or Spark job driver as a single-point operation. Because commitJob moves files from the job temporary directory to the final directory, there is a time window during which a mid-commit failure leaves some data externally visible, and this window grows with the number of files. On distributed file systems such as HDFS, rename is a very efficient operation that only modifies the relevant metadata on the NameNode, so the window is very small and acceptable for most scenarios.

Public-cloud object storage systems such as S3 and OSS do not support rename natively; a file-system-level rename is generally translated into a Copy+Delete operation, which is far more expensive than on HDFS. commitJob runs on the MapReduce or Spark job driver and is a single-point operation. Although thread-level concurrency has been added, when writing to S3/OSS the commitJob window can become very long for jobs with many files, reaching minutes or even hours, which seriously degrades job performance. To solve this performance problem, the Hadoop community introduced FileOutputCommitter algorithm version 2 (selected with mapreduce.fileoutputcommitter.algorithm.version=2).

FileOutputCommitter V2
The job commit process of FileOutputCommitter V2 is as follows:


1. setupJob: Create the job temporary directory.

2. setupTask: Determine the task temporary directory.

3. commitTask: Rename the files in the task temporary directory directly into the final job directory.

4. abortTask: Clean up the task temporary directory.

5. commitJob: No rename operation is required.

6. abortJob: Clean up the job temporary directory.

The biggest difference in V2 is that the task output directory is removed: commitTask renames files directly into the final job directory. The whole job commit then requires only one rename per file, and that rename executes concurrently across the tasks on the cluster nodes, eliminating the job driver's single-point rename bottleneck.

FileOutputCommitter V2 greatly improves performance in scenarios such as writing data to S3/OSS, but because the task output directory is bypassed, data consistency can no longer be guaranteed. While the job is still running, files from committed tasks are already visible in the final directory, and when some tasks succeed while others fail, their intermediate files remain in the final directory as well.
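The hazard just described can be demonstrated with the same toy filesystem approach; this sketch of the V2 flow uses invented names (run_task, should_fail) and shows a job where one task commits and another fails, leaving a partial result visible in the final directory.

```python
import os
import shutil
import tempfile

dest = tempfile.mkdtemp()                       # final job directory
job_tmp = os.path.join(dest, "_temporary", "0") # job temporary directory

def run_task(task_id, should_fail):
    """V2 commitTask: rename straight from the task temp dir into $dest."""
    task_tmp = os.path.join(job_tmp, "_temporary", task_id)
    os.makedirs(task_tmp)
    with open(os.path.join(task_tmp, f"part-{task_id}"), "w") as f:
        f.write("data")
    if should_fail:
        shutil.rmtree(task_tmp)                 # abortTask: only this task's temp dir
        return False
    # commitTask in V2: the file goes directly into the final directory.
    os.rename(os.path.join(task_tmp, f"part-{task_id}"),
              os.path.join(dest, f"part-{task_id}"))
    return True

ok0 = run_task("00000", should_fail=False)      # this task commits
ok1 = run_task("00001", should_fail=True)       # this task fails

# The job as a whole failed, yet task 00000's output is already visible:
visible = [n for n in os.listdir(dest) if not n.startswith("_")]
```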

For writing to S3/OSS, the Hadoop community and the wider industry have proposed many solutions, all with the same basic goal: avoid the rename operation entirely while still guaranteeing data consistency. The following introduces S3ACommitter and JindoOssCommitter, the Job Committers implemented for S3 and OSS by the Hadoop community and the Alibaba Cloud EMR team respectively. Both are based on the Multipart Upload feature of S3/OSS and share the same basic idea, so they are introduced together. Other solutions, such as Databricks' DBIO-based approach and Netflix's Staging Committer, are not covered here for reasons of space.

Multipart Upload of Object Storage System
In addition to uploading files to S3/OSS through the PUT Object interface, S3/OSS also provides a second upload mode: Multipart Upload. It is mainly used for resumable uploads of large files and for uploads over unreliable networks. Taking OSS as an example, the Multipart Upload flow is as follows:

1. InitiateMultipartUpload: Before transferring data in Multipart Upload mode, this interface must be called to notify OSS to initiate a Multipart Upload event. The target object address is passed as a parameter, and an uploadId is returned for the subsequent uploads.

2. UploadPart: After a Multipart Upload is initialized, data can be uploaded in parts for the given object name and upload ID. UploadPart can be called repeatedly to upload different parts, and the calls can run concurrently.

3. CompleteMultipartUpload: After all parts have been uploaded, CompleteMultipartUpload must be called to complete the upload of the whole object. The object becomes externally visible on OSS only once CompleteMultipartUpload returns; before that, it is invisible.

4. AbortMultipartUpload: Terminates a Multipart Upload event; it can be called at any time before CompleteMultipartUpload.

5. ListMultipartUploads: Lists all in-flight Multipart Upload events, that is, those that have been initialized but neither completed nor aborted.
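The five calls above can be modeled with a small in-memory object store. ToyObjectStore is invented for illustration; it mimics only the visibility semantics that matter here, not the real OSS/S3 API or its signatures.

```python
import itertools

class ToyObjectStore:
    """In-memory model of an object store's Multipart Upload semantics."""
    def __init__(self):
        self.objects = {}   # key -> bytes, visible to readers
        self.uploads = {}   # upload_id -> (key, {part_number: data})
        self._ids = itertools.count(1)

    def initiate_multipart_upload(self, key):
        upload_id = str(next(self._ids))
        self.uploads[upload_id] = (key, {})
        return upload_id

    def upload_part(self, upload_id, part_number, data):
        self.uploads[upload_id][1][part_number] = data

    def complete_multipart_upload(self, upload_id):
        key, parts = self.uploads.pop(upload_id)
        # The object becomes visible only now, assembled in part order.
        self.objects[key] = b"".join(parts[n] for n in sorted(parts))

    def abort_multipart_upload(self, upload_id):
        self.uploads.pop(upload_id, None)   # nothing ever becomes visible

    def list_multipart_uploads(self):
        return [(uid, key) for uid, (key, _) in self.uploads.items()]

store = ToyObjectStore()
uid = store.initiate_multipart_upload("out/part-00000")
store.upload_part(uid, 1, b"hello ")
store.upload_part(uid, 2, b"world")
invisible_before = "out/part-00000" not in store.objects
store.complete_multipart_upload(uid)
```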

Implementation of No-Rename Committer based on Multipart Upload
With the support provided by Multipart Upload, combined with customization at the S3/OSS file-system layer, a Job Committer that needs no rename operation can be implemented while still guaranteeing data consistency. The job commit flow is as follows:

1. setupJob: Create the job temporary directory.

2. setupTask: Create the task temporary directory. During task execution, files are written directly to the final job directory through the Multipart Upload interfaces, but CompleteMultipartUpload is not called when a file is closed; instead, the information of all uploaded parts is recorded in files in the task temporary directory.

3. commitTask: Merge the part-upload information of the task's files into a single file and write it to the job temporary directory.

4. abortTask: Clean up the task temporary directory and call AbortMultipartUpload to abort all files written by the task.

5. commitJob: Read all part-upload information in the job temporary directory and call CompleteMultipartUpload for each file to complete all uploads.

6. abortJob: Call ListMultipartUploads to abort all uploads initiated by the job's tasks, and clean up the job temporary directory.
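The whole flow can be condensed into a runnable toy model. All names here (write_task_file, job_manifest, and the dict-based store) are invented stand-ins; the point is only that parts are uploaded during the tasks, while visibility is deferred to a batch of lightweight completes at job commit.

```python
# Minimal in-memory stand-ins for the object store and the committer state.
objects = {}        # key -> data, externally visible
uploads = {}        # upload_id -> (key, [parts]), initiated but not completed
job_manifest = []   # "job temporary directory": pending (key, upload_id) pairs

def write_task_file(task_id, key, parts):
    """Task execution: initiate an upload and push parts, but do NOT complete."""
    upload_id = f"{task_id}/{key}"
    uploads[upload_id] = (key, list(parts))
    return (key, upload_id)          # recorded in the task temporary directory

def commit_task(pending):
    """commitTask: merge the task's pending-upload records into the job manifest."""
    job_manifest.extend(pending)

def commit_job():
    """commitJob: one lightweight CompleteMultipartUpload call per file."""
    for key, upload_id in job_manifest:
        k, parts = uploads.pop(upload_id)
        objects[k] = "".join(parts)  # the file becomes visible only here

def abort_job():
    """abortJob: abort every outstanding upload; nothing becomes visible."""
    uploads.clear()
    job_manifest.clear()

# Two tasks write concurrently; nothing is visible until commit_job().
p0 = [write_task_file("t0", "dest/part-00000", ["a", "b"])]
p1 = [write_task_file("t1", "dest/part-00001", ["c"])]
commit_task(p0)
commit_task(p1)
nothing_visible_yet = (objects == {})
commit_job()
```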

During task execution, uploads are initiated and parts are uploaded through the Multipart Upload interfaces, but CompleteMultipartUpload is only called at commitJob. By the Multipart Upload semantics, a file is invisible before CompleteMultipartUpload is called, which guarantees data consistency. As with FileOutputCommitter, multiple files need CompleteMultipartUpload, so commitJob still has a time window in which a failure can cause inconsistency. However, the heavy lifting of uploading the file data has already been completed in a distributed fashion inside the tasks; CompleteMultipartUpload is a very lightweight request issued from the job driver, so the window is very short, the chance of failure is low, and the needs of most business scenarios are met. Compared with FileOutputCommitter V1, CompleteMultipartUpload during the job commit is far cheaper than rename, and the window of possible inconsistency is much smaller. Compared with FileOutputCommitter V2, which does not guarantee consistency at all, JindoOssCommitter can be applied in more scenarios that require it.

In terms of performance, data is uploaded to OSS concurrently and in a distributed fashion inside the tasks, and no rename operation is needed at all. Compared with the two renames required by FileOutputCommitter V1 and the one required by V2, this also brings a significant performance improvement.


Compared with FileOutputCommitter V1/V2, a No-Rename Committer built on the Multipart Upload feature commonly provided by object storage systems offers much better data consistency and performance, and is the recommended choice when MapReduce or Spark writes data to S3/OSS. S3ACommitter is already available in Hadoop community version 3.1.2, and JindoOssCommitter is enabled by default in Alibaba Cloud EMR 2.5.0 and above.
