Gone in 60 seconds! Breaking the MinuteSort Record
Yuliya Feldman, Amit Hadke, Gera Shegalov, M. C. Srivas
The MinuteSort test measures how much data a system can sort in 1 minute. The test requires that a random sequence of 100-byte records, each consisting of a 10-byte key and 90 bytes of payload, be arranged in either ascending or descending order. The earlier record had sorted 14 billion records totaling 1400 gigabytes in 60 seconds. The web site sortbenchmark.org keeps track of all such records.
Since MapR (with Google’s help) broke through the first Hadoop barrier of sorting a terabyte in 60 seconds, we decided to try our hand at breaking the MinuteSort record using Hadoop.
We wish to point out that Hadoop is a general purpose framework, and is not optimized for this single problem. For instance, with MapR’s Hadoop distribution there are a lot of places where data is check-pointed to improve redundancy. In addition, there’s a lot of cluster management traffic that tries to detect failures and lots of health-checks running through the system. Finally, MapR’s software typically runs with all logging and reporting enabled at all times. We didn’t turn any of it off during this benchmark.
Trying to set a new record competing against special-purpose sorting frameworks is a much tougher proposition.
Announcing the New Record
Using the Google Compute Engine, running on 2103 instances of the type n1-standard-4-d, MapR was able to sort 15 billion 100-byte records totaling 1.5 terabytes of data in 59 seconds. It is also a testament to the high quality and uniformity of the compute environment that Google graciously made available to us. The record was set on the Google Compute Engine shared virtualized environment that was not altered or cordoned-off or isolated in any shape or form.
An n1-standard-4-d instance consists of 4 virtual cores with 15G of DRAM and 1 virtual Ethernet NIC, and 1 virtual local disk of size 1.7T. On the Google Compute Engine, 2 virtual cores are equivalent to 1 physical core, so this setup used about 4206 physical cores. Of the 2103 virtual nodes, 3 nodes ran the CLDB processes, 1 ran the JobTracker, and the remaining 2099 nodes ran TaskTrackers.
Divvying up the Map-Reduce stages
We ran 1 mapper and 1 reducer per node, for a total of 2099 mappers and 2099 reducers. Assuming a uniform distribution of data across the nodes, each map task reads about 714MB of data from disk, sorts it, and writes 2099 partitions of approx. 341KB each. In the MapR Hadoop implementation, each partition is a separate file, so the process creates about 4.4 million files in 10 seconds.
Next, each reducer fetches its partition file from each of the 2099 nodes, running a 2099-way merge to combine them to produce its result. The process is called the shuffle phase in map-reduce.That’s about 4.4 million files of 341KB being read and transferred over the network in about 20 seconds.
Note that the reducer cannot start its merge until all the map outputs have been fetched. Thus, the shuffle can be broken naturally into a “fetch” stage, followed by a merge-sort phase.
Given the budget of 60 seconds, we allocated timings for each phase as follows:
|Straggler overhead||5 seconds|
The stragglers are tasks that linger longer and finish much later than the majority of the tasks. A single straggler task among the 4198 tasks may take up an extra 8 seconds to finish, and can hold up the entire works and impact the final result.
These stragglers end up governing the overall speed of the process since they consume the most amount of time. In thermodynamics parlance: the rate of the reaction is governed by the slowest step in the reaction, called the rate-determining step of the reaction. We budgeted an extra 5 seconds of time for the stragglers – our rate determining step – to finish.
The first few attempts at sorting 1.5TB started coming in about 69-70 seconds, with the map-stragglers taking 25 seconds, and reduce-stragglers taking 25 seconds. We noticed that between each of the different phases in the table above the entire cluster went idle for 2-4 seconds, for a total idle-time of about 7-9 seconds. Eliminating any idle-time became the main goal of our approach.
Overlapping Map and Fetch
The first (and obvious) thing to try was to overlap the map phase with the fetch phase. Each mapper produced 2099 files, but the reducers do not start fetching data until the map is completely done. This is because the Hadoop framework distributes completion notifications on a task-boundary, with no visibility inside partially-completed tasks.
So we hacked the Hadoop JT/TT framework to handle partial map completions. Remember that each mapper produces about 2099 intermediate files, one for each reducer. We changed the task-completion protocol to include partial results; for example, a mapper could send a notification every time it wrote another 100 files.
The problem was that the reducer was still not getting notified quickly enough, because of the heart-beat (HB) mechanism between the JobTracker (JT) and TaskTracker (TT) that is used for notifications. By default on a cluster of 2100 nodes the HBs are sent out every 3 seconds. This meant that a reducer may not notice for up to 6 seconds that a mapper’s outputs were ready for processing. Reducing the HB was tricky, as the HB processing in the JT holds a global-lock. When the HB’s start coming in too frequently, the lock-contention starts to dominate and normal work comes to a halt.
We reorganized the JT code to do most processing outside the global lock, entering the lock only at the last moment when everything was ready to be added to the JT’s global data structures. The change let us reduce the HB to 800 milliseconds on the 2100 node cluster, thereby speeding up the notifications by 4x.
Although we had a good overlap between the mapper and reducer, we didn’t see much improvement in the overall completion time. The merges at the reducer couldn’t be started until all the data had arrived.
This code is still probably quite useful, so we’ve kept it around. It’s not clear if this tweak is even possible with the Apache Hadoop, as it takes advantage of some unique features of MapR to reduce the amount of data the JobTracker has to track for partial mapper intermediate results.
What we noticed was that during the merge-sort in the reduce phase, one of the 4 virtual cores was pegged at 100% busy while the other cores were idle. We decided to find a way to harness all the virtual cores, calculating that the overall elapsed time would thereby improve.
Sorting is single-threaded. However if the data structures can be broken into separate independent pieces, then the sorting can be farmed out to multiple cores, and then their results can be combined using a final merge.
The obvious way to parallelize the reducers is to increase the number of reducers per node from 1 to 4. But it would cause an explosion in the number of files to be processed: the total files that had to be created/written/opened/read/deleted in 20 seconds would go from 4.4 million to 17.6 million. The other problem is that the file-server also needs some cycles to run, so we decided to create only 3 map outputs instead of 4 to save some CPU cycles for the file-server, but send the map outputs in a single stream to the reducer.
At the map side, the mapper was already multi-threaded due to our previous work, sorting the map inputs in parallel. To allow the reducers to merge in parallel, we combined the output of 3 mappers into 1 stream, and sent it over to the reducers. At each reducer, the stream was separated back into the 3 maps outputs, and merged in parallel in 3 threads. Note that the Hadoop framework was already set up to work in this fashion, it just needed a little tweak to allow for the single stream transfer for multiple reducer inputs.
Again, it’s unclear if this tweak is even possible with the Apache Hadoop, for the same reasons that it uses some unique features of MapR.
With this change, the run time came down from 70 seconds to 62 seconds. We were almost home!
Other Minor Changes
Among the observations, we noticed that the file-server hosting the partition list was getting hit very hard. We watched the GUI Heatmap using the CPU profile view … this file-server would light up bright red throughout any run. Every map task on the 2099 nodes was reading this partition list file at exactly the same moment. We increased the replication factor for the volume holding the partition list to 6, and the file-server bottleneck disappeared. The map stragglers disappeared as a result. It saved us a precious second or two.
We also noticed that the comparator in the quick-sort on the map-side wasn’t doing as good a job as we’d liked. The comparator by default “caches” the first 4 bytes of every key in a 32-bit integer and does a raw comparison on that before looking the entire key. Given that we were running on 64-bit processors, we increased this cache size to 8 bytes per key, and the accuracy of the comparator increased significantly for almost no cost. This change saved us another second! The variable we introduced to play around with this is mapr.map.keyprefix.ints
After these changes, we were able to reproduce the result of sorting 1.5 terabytes of data in less than 1 minute consistently across multiple runs and multiple cluster configurations, thanks to the consistent and uniform performance of the Google Compute Environment. We recorded a few runs, and have included the JobTracker’s output showing the results from one such run. The job.xml used to run this is also included in this report.
Hadoop Job job_201301281929_0028 on History Viewer
mapreduce.job.acl-view-job: All users are allowed
mapreduce.job.acl-modify-job: All users are allowed
Submitted At: 30-Jan-2013 10:07:09
Launched At: 30-Jan-2013 10:07:09 (0sec)
Finished At: 30-Jan-2013 10:08:09 (59sec)
Analyse This Job
|Kind||Total Tasks(successful+failed+killed)||Successful tasks||Failed tasks||Killed tasks||Start Time||Finish Time|
|Map||2099||2099||0||0||30-Jan-2013 10:07:10||30-Jan-2013 10:07:40 (29sec)|
|Reduce||2099||2099||0||0||30-Jan-2013 10:07:10||30-Jan-2013 10:08:09 (58sec)|
|Job Counters||Aggregate execution time of mappers(ms)||0||0||34,737,511|
|Launched reduce tasks||0||0||2,099|
|Total time spent by all reduces waiting after reserving slots (ms)||0||0||0|
|Total time spent by all maps waiting after reserving slots (ms)||0||0||0|
|Launched map tasks||0||0||2,099|
|Data-local map tasks||0||0||2,099|
|Aggregate execution time of reducers(ms)||0||0||97,345,363|
|Map-Reduce Framework||Map input records||15,000,000,000||0||15,000,000,000|
|Reduce shuffle bytes||0||1,560,088,116,020||1,560,088,116,020|
|Map output bytes||1,530,000,000,000||0||1,530,000,000,000|
|Combine input records||0||0||0|
|Reduce input records||0||15,000,000,000||15,000,000,000|
|Reduce input groups||0||15,000,000,000||15,000,000,000|
|Combine output records||0||0||0|
|Reduce output records||0||15,000,000,000||15,000,000,000|
|Map output records||15,000,000,000||0||15,000,000,000|
|GC time elapsed (ms)||274,468||2,730,379||3,004,847|
Configuration details (job.xml)
Job Configuration: JobId – job_201301281929_0028