What makes AliCloud’s 377-second world record in sort contests?

Posted time: Nov 25, 2015 21:13
What makes AliCloud’s 377-second world record?
Text by AliCloud Apsara Team

Sort Benchmark, a non-profit organization specializing in the evaluation of sorting benchmarks, announced on Oct 28 that AliCloud sorted 100 terabytes of data within 377 seconds, breaking the previous world record of 1,406 seconds set by Apache Spark. In the two most technically demanding contests, GraySort and MinuteSort, AliCloud set four new world records, covering both the Daytona category for general-purpose sorting systems and the Indy category for systems customized specifically for the sorting task.

The news stunned the technical community, especially the Internet and computing fields that pay close attention to cloud computing. AliCloud’s record-breaking performance in the contest rekindled interest in distributed computing. Discussions are under way in big data and cloud computing circles about how difficult the task is, how AliCloud made it, and what its significance is for the general public.

This article aims to answer these questions.
How difficult is the task?
Sort Benchmark aims to evaluate the computing performance of different computation models and platforms. Sorting, one of the most basic computing tasks, is among the first topics in any textbook on data structures and algorithms, which makes it the simplest yet most directly effective benchmark. The Sort Benchmark contests date back to 1987, when entrants were single machines. At that time, the focus was on how to build a powerful machine and push the performance of a single machine to its limit.

The concept changed in 1998, when distributed computing started to predominate. The focus turned to how to effectively schedule the physical resources of hundreds or even tens of thousands of machines, such as CPUs, memory, networks and disk I/O, to complete data sorting in the shortest possible time. The sorting contest is somewhat like the ranks in an army. The one who can manage several soldiers qualifies as a squadron leader, the one who can manage dozens qualifies as a platoon leader, and the one who can manage tens of thousands is the general.

You may find it amusing when a squadron leader claims he could be a general too, if only you assigned him tens of thousands of men. The same applies to the linear scaling of large clusters, which is far more difficult than most people think. As the scale grows, system bottlenecks keep emerging, leaving the system too busy to handle the messages or scheduling decisions it could formerly manage. Even with the help of sub-proxies, the performance may be unsatisfactory.
In particular, when the computing involves multiple types of resources, imbalances appear: when memory is scheduled effectively, network access may be compromised, and when the network is optimized, disk usage may be compromised. The various dimensions may even conflict with each other.

Even when resources are well scheduled, you will find that Machine B may perform the calculation much faster than Machine A, or that Machine A is faster but unfortunately crashes. This is normal, especially when a large number of machines are involved, just as there may be one or more deserters among thousands of soldiers. As the scale expands, the issue becomes more prominent and increasingly complicated. It is fair to say that every order-of-magnitude increase in scale brings starkly different problems for a distributed computing platform to handle. How to achieve high performance using massive numbers of low-end machines is the core challenge of cloud computing technology.

The AliCloud Apsara platform managed to integrate 5,000 servers in a single cluster in 2013 and bigger scales are in production. Details about how to support the super-large scale can be found in Fuxi’s paper at the 40th International Conference on Very Large Data Bases in 2014, but it is not the emphasis of this article. Next we will focus on the additional efforts we’ve made to speed up data sorting on large-scale computing clusters.
How did AliCloud make it?
Apsara is the distributed computing platform of AliCloud. It handles all the offline data processing tasks within Alibaba Group and underpins the public cloud services of AliCloud. Apsara comprises the following key modules: (a) Pangu, the distributed file system in charge of storing and managing data files in the computing centers; (b) Fuxi, the distributed scheduling system in charge of managing cluster resources in the computing centers and scheduling the online and offline applications running in the distributed system. Fuxi provides FuxiJob, a framework for batch processing of big data, to process arbitrary DAG-based user computing jobs.

The Fuxi distributed scheduling system has been deployed on hundreds of thousands of servers in multiple computing centers of Alibaba, with a single cluster housing more than 5,000 servers. FuxiJob handles any DAG-based offline data processing job including, but not limited to, MapReduce jobs and more complicated machine learning jobs. All the input and output files of a job, as well as the temporary files created during its execution, are stored in Pangu. Pangu's file replication and data locality features ensure optimal performance and data reliability.

Next we will focus on FuxiSort, which was developed on Apsara and used in the GraySort and MinuteSort contests. The optimization of the program is detailed in the following sections.

·        Overview
First, the program samples the data to be sorted to determine the key range of each data partition. As shown in Figure 1, after data sampling, the sorting process is divided into two stages: the map stage and the sort stage, both of which involve multiple concurrent tasks.

Figure 1 FuxiSort procedure
In the map stage, map tasks read data partitions from local disks through the ChunkServer process of Pangu, range-partition the input, and dispatch the input data to the corresponding sort tasks. The data is then transmitted to the sort tasks over the network.

During the sort stage, all the sort tasks periodically read the data from map tasks into memory. When the memory cache is full, the cached data is sorted in memory with a quick-sort algorithm and the sorted results are written to Pangu temporary files (saved locally, with no additional copies). After a sort task has received all the map data, the sorted data in memory and the data previously written to Pangu temporary files are merge-sorted together, and the merge-sort results are output to Pangu. After all the sort tasks of FuxiSort are completed, multiple globally ordered Pangu files have been generated.
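
The sort-stage flow described above (sort in memory, spill sorted runs to temporary files, then merge everything) can be sketched in Python as follows. This is only an illustrative sketch, not FuxiSort's code: the function name `external_sort` and the `memory_limit` parameter are assumptions, local temporary files stand in for Pangu temporary files, and Python's built-in sort is used in place of the quick-sort mentioned in the text.

```python
import heapq
import os
import tempfile


def external_sort(records, memory_limit):
    """Sort string records while holding at most memory_limit of them in memory."""
    run_paths = []  # sorted runs spilled to temporary files
    buffer = []     # records currently held in memory

    def spill(buf):
        # Sort the full buffer and write it out as one sorted run.
        buf.sort()
        fd, path = tempfile.mkstemp(suffix=".run")
        with os.fdopen(fd, "w") as f:
            for rec in buf:
                f.write(rec + "\n")
        run_paths.append(path)

    for rec in records:
        buffer.append(rec)
        if len(buffer) >= memory_limit:
            spill(buffer)
            buffer = []

    buffer.sort()  # the last run stays in memory

    def read_run(path):
        # Stream one spilled run back, one record at a time.
        with open(path) as f:
            for line in f:
                yield line.rstrip("\n")

    try:
        # Merge the in-memory run with all spilled runs in a single pass.
        merged = list(heapq.merge(buffer, *(read_run(p) for p in run_paths)))
    finally:
        for p in run_paths:
            os.remove(p)
    return merged
```

Calling `external_sort(["d", "a", "c", "b", "e"], 2)` spills two runs of two records each and merges them with the one record left in memory.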

·        Implementation and Optimization
a) Input data sampling. To lower the impact of data skew on performance, we sample the input data. The RangePartition boundaries are determined from the sampling result so that the data size handled by each sort task is as equal as possible.

For example, suppose the input data is divided into X files. First, we select Y positions at random in every file and read Z consecutive records starting from each selected position, which gives X*Y*Z samples. Next, we sort the samples and divide the sorted samples into S partitions, where S is the number of sort tasks. This gives the key range to be processed by each sort task. Because the samples are evenly distributed, every sort task is assigned a nearly equal amount of data.

For GraySort, we have 20,000 input files (X=20,000), select 300 positions in each input file (Y=300), and read 1 sample at each position (Z=1). This gives 6,000,000 samples, which are sorted and divided into 20,000 partitions (one per sort task). The map tasks then perform RangePartition using the ranges derived from these samples, so that the data is distributed evenly among the sort tasks. The whole sampling process takes around 35 seconds. For MinuteSort, we have 3,350 input files and select 900 data records from each file as samples, for a total of 3,015,000 samples. The sorted samples are divided into 10,050 partitions and the entire sampling process takes 4 seconds. For IndySort, the sampling process is not required.
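
The sampling scheme above can be illustrated with a minimal Python sketch. The function names (`sample_keys`, `split_points`, `partition_of`) are hypothetical and each file is modelled as an in-memory list of keys; the real system samples Pangu files.

```python
import bisect
import random


def sample_keys(files, positions_per_file, run_length, rng):
    """Take Y random positions per file and Z consecutive keys at each one."""
    samples = []
    for keys in files:  # X files
        for _ in range(positions_per_file):  # Y positions per file
            start = rng.randrange(0, len(keys) - run_length + 1)
            samples.extend(keys[start:start + run_length])  # Z keys
    return samples


def split_points(samples, num_partitions):
    """Sort the samples and pick S-1 boundaries that cut them into S parts."""
    ordered = sorted(samples)
    step = len(ordered) / num_partitions
    return [ordered[int(i * step)] for i in range(1, num_partitions)]


def partition_of(key, boundaries):
    """Index of the sort task (0 .. S-1) whose key range contains key."""
    return bisect.bisect_right(boundaries, key)


# Sample counts quoted in the text: X * Y * Z for GraySort, X * Y for MinuteSort.
graysort_samples = 20_000 * 300 * 1  # 6,000,000
minutesort_samples = 3_350 * 900     # 3,015,000
```

A map task would call `partition_of` for every record it reads. Because the boundaries come from evenly drawn samples, each sort task should receive roughly the same number of records.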

b) Dual I/O buffers. During the map stage, FuxiSort processes data in one I/O buffer while Pangu reads data into another. The two buffers swap roles periodically, so that data processing and I/O operations run concurrently, thereby reducing task latency.
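
A minimal Python sketch of the dual-buffer idea follows. It is an illustration under stated assumptions, not FuxiSort's implementation: `double_buffered`, `read_chunk` and `process_chunk` are hypothetical names, and an in-memory byte stream stands in for a Pangu read.

```python
import io
import queue
import threading


def double_buffered(read_chunk, process_chunk, buffer_size):
    """Overlap reading and processing by cycling two reusable buffers."""
    free = queue.Queue()    # buffers ready to be filled by the reader
    filled = queue.Queue()  # (buffer, byte count) pairs ready for processing
    for _ in range(2):
        free.put(bytearray(buffer_size))

    def reader():
        while True:
            buf = free.get()
            n = read_chunk(buf)   # fills buf, returns bytes read (0 at EOF)
            filled.put((buf, n))
            if n == 0:
                return

    t = threading.Thread(target=reader, daemon=True)
    t.start()

    results = []
    while True:
        buf, n = filled.get()
        if n == 0:
            break
        # While this chunk is processed, the reader fills the other buffer.
        results.append(process_chunk(bytes(buf[:n])))
        free.put(buf)  # hand the buffer back for the next read
    t.join()
    return results


# Example: upper-case a byte stream in 4-byte chunks.
source = io.BytesIO(b"abcdefghij")
chunks = double_buffered(source.readinto, lambda b: b.upper(), 4)
```

The two queues make the buffer swap explicit: a buffer is always owned either by the reader or by the processor, never by both.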

Figure 2 Initiation sequence of various stages of FuxiSort
c) Operation pipelining. To further lower latency, we break each stage of the sorting process into several steps and overlap their execution wherever possible. The steps are as follows.
(1) Data sampling.
(2) Job startup.
(3) MapTask reads input data.
(4) MapTask sends data to SortTask.
(5) SortTask receives data.
(6) SortTask sorts data in the memory. When the memory is full, sorted data is dumped to the temporary file.
(7) SortTask merge-sorts the ordered data in the memory and the temporary file.
(8) SortTask writes the final output file.

FuxiSort executes data sampling and job startup concurrently. In the job startup step, the major work includes job dispatching and other data management, such as collecting the IP addresses of SortTasks and sending them to all MapTasks. When the data sampling is done, the sampling program puts every partition range on Pangu and creates a separate notification file on Pangu to signal the completion of sampling. Once the tasks are dispatched, every MapTask periodically checks for the notification file. Its existence indicates that the partition ranges from the sampling program are available, so MapTask reads the partition ranges immediately and dispatches data accordingly.
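
The notification-file handshake between the sampling program and the MapTasks can be sketched as below. This is a hedged illustration only: a local directory stands in for Pangu, and the file names `partition_ranges.json` and `sampling_done` as well as both function names are hypothetical.

```python
import json
import os
import time


def publish_ranges(directory, boundaries):
    """Sampler side: write the partition ranges, then the notification file."""
    with open(os.path.join(directory, "partition_ranges.json"), "w") as f:
        json.dump(boundaries, f)
    # The marker is created last, so its presence implies the ranges are complete.
    with open(os.path.join(directory, "sampling_done"), "w"):
        pass


def wait_for_ranges(directory, poll_interval=0.05, timeout=5.0):
    """MapTask side: poll for the notification file, then read the ranges."""
    marker = os.path.join(directory, "sampling_done")
    deadline = time.monotonic() + timeout
    while not os.path.exists(marker):
        if time.monotonic() >= deadline:
            raise TimeoutError("sampling did not finish in time")
        time.sleep(poll_interval)
    with open(os.path.join(directory, "partition_ranges.json")) as f:
        return json.load(f)
```

Writing the ranges before the marker is what lets a MapTask start early and still never observe a half-written range file.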

Step (3), Step (4) and Step (5) are executed concurrently during the map stage, while Step (7) runs in parallel with Step (8) during the sort stage. If Step (6) sorted and dumped data only when the memory allocated to the task was full, the memory would be fully occupied during sorting, with no space available for new data, and Step (5) would be blocked. To solve this problem, we run Step (5) and Step (6) concurrently by starting to sort as soon as the memory usage exceeds a certain threshold. In this way, Step (6) starts earlier and Step (5) is not blocked. When the memory is full, the sorted data in the memory is merge-sorted and dumped to the temporary file. The lower the threshold, the earlier Step (6) starts. In our experiment, Step (6) starts when 1/10 of the memory for the task is occupied by received data. In this way, I/O and computing run in parallel, eliminating noticeable latency. Although this approach requires merging more temporary files, we saw no obvious overhead in our scenarios.
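
The effect of starting Step (6) early can be sketched with a small single-threaded Python model. The class `SortBuffer` and its fields are hypothetical, and in-memory lists stand in for temporary files. A real implementation would sort on a separate thread so that Step (5) keeps receiving; this sketch only shows how small sorted runs accumulate and are merged once memory is full.

```python
import heapq


class SortBuffer:
    """Sorts received records in small runs before the buffer is full."""

    def __init__(self, capacity, threshold_fraction=0.1):
        self.capacity = capacity
        # Start sorting once this many unsorted records have arrived.
        self.threshold = max(1, int(capacity * threshold_fraction))
        self.pending = []  # received but not yet sorted
        self.runs = []     # small sorted runs held in memory
        self.spilled = []  # stand-in for temporary files

    def _held(self):
        return len(self.pending) + sum(len(r) for r in self.runs)

    def receive(self, record):
        self.pending.append(record)
        if len(self.pending) >= self.threshold:
            # Early sort: turn the pending records into a sorted run.
            self.runs.append(sorted(self.pending))
            self.pending = []
        if self._held() >= self.capacity:
            self.flush()

    def flush(self):
        """Merge the in-memory runs and dump them as one sorted 'file'."""
        if self.pending:
            self.runs.append(sorted(self.pending))
            self.pending = []
        if self.runs:
            self.spilled.append(list(heapq.merge(*self.runs)))
            self.runs = []

    def result(self):
        """Final merge over everything that was dumped."""
        self.flush()
        return list(heapq.merge(*self.spilled))
```

With a threshold of 1/10, each dump only needs a cheap merge of already sorted runs rather than a full sort of the whole buffer.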

Figure 2 illustrates the time used in each step and the overlapping sections among the steps.

d) Network communication optimization. A large amount of network traffic flows between the map and sort tasks. A CPU interrupt occurs on the arrival of each network packet. If interrupt handling is bound to a specific CPU core, it may be delayed when that core is busy with sorting, which leads to request timeouts or even packet loss. We configure “sm_affinity” to balance the network interrupt load among all CPU cores, which greatly reduces request timeouts and packet loss.

Figure 3 Real-time computing framework

e) Further optimization on MinuteSort. Because MinuteSort limits the execution time to 60 seconds, the scheduling overhead of general offline jobs becomes noticeable. To minimize the overhead, we execute MinuteSort on the Fuxi quasi real-time job model, which was developed to maximize in-memory computing performance by reducing job scheduling overhead. Figure 3 illustrates the model framework. In a typical production environment, the quasi real-time system runs as a long-running service. It is started during cluster deployment, and a long-lived worker process is started on each server. Users can submit various jobs to the quasi real-time system scheduler and query the status of running jobs. According to the Sort Benchmark contest rules, the startup and exit processes directly related to sorting must also be reflected in the result. So we start the quasi real-time system workers before submitting the MinuteSort job, and terminate the worker processes after the job execution is done. The submitted result includes the worker startup and exit time.

The quasi real-time system is suited to latency-sensitive processing of medium-sized data sets (not greater than 10 TB). At such a scale, all the input and output records can be cached in memory. In our experiment, only MinuteSort runs in the quasi real-time system.
What is its significance for the general public?
From the day AliCloud was born in 2009, our vision has been to create a self-developed, universally applicable and large-scale underlying distributed computing system to facilitate public access to cloud computing. The Apsara platform is the technical core that carries this vision.

FuxiSort’s record-breaking performance at the Sort Benchmark contest is a direct reflection of the technical strength AliCloud has built up over the past six years and a source of pride for all technical practitioners.

But this is just a start. Technology should end up in application. Every technical advance made by AliCloud will be integrated into AliCloud products to serve clients both at home and abroad. FuxiSort, the winner of this contest, is commercially available in Open Data Processing Service (ODPS). Independently developed by AliCloud, ODPS enables distributed processing of TB/PB-level data with less demanding real-time requirements, and is applicable to data analysis, data mining, BI and similar fields. All of Alibaba’s offline data businesses run on ODPS, which is to be launched in countries outside China.

AliCloud is committed to constantly enhancing its computing capability and economies of scale through technical innovation. It welcomes more partners, small and medium-sized enterprises and developers to enjoy the convenience and value brought about by cloud computing, and to make cloud computing a true public service that benefits everyone.