Hadoop MinuteSort Record

February 26, 2013

Gone in 60 seconds! Breaking the MinuteSort Record

Yuliya Feldman, Amit Hadke, Gera Shegalov, M. C. Srivas

Introduction

The MinuteSort test measures how much data a system can sort in 1 minute. The test requires that a random sequence of 100-byte records, each consisting of a 10-byte key and 90 bytes of payload, be arranged in either ascending or descending order. The previous record sorted 14 billion records totaling 1400 gigabytes in 60 seconds. The website sortbenchmark.org keeps track of all such records.
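
For concreteness, the sketch below shows the record layout the test mandates: 100-byte records whose first 10 bytes form the key used for ordering. This is an illustration only, not the benchmark’s official gensort/valsort tools, and the class and method names are our own.

    // Minimal sketch of the benchmark record layout: each record is 100 bytes,
    // the first 10 bytes are the key and the remaining 90 bytes are payload.
    // Records are ordered by their keys, compared as unsigned bytes.
    final class SortRecord {
        static final int RECORD_LEN = 100;
        static final int KEY_LEN = 10;

        // Compare the 10-byte keys of two records stored in flat byte buffers.
        static int compareKeys(byte[] a, int aOff, byte[] b, int bOff) {
            for (int i = 0; i < KEY_LEN; i++) {
                int d = (a[aOff + i] & 0xff) - (b[bOff + i] & 0xff);
                if (d != 0) return d;
            }
            return 0;
        }
    }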

Since MapR (with Google’s help) broke through the first Hadoop barrier of sorting a terabyte in 60 seconds, we decided to try our hand at breaking the MinuteSort record using Hadoop.

We wish to point out that Hadoop is a general purpose framework, and is not optimized for this single problem. For instance, with MapR’s Hadoop distribution there are a lot of places where data is check-pointed to improve redundancy. In addition, there’s a lot of cluster management traffic that tries to detect failures and lots of health-checks running through the system. Finally, MapR’s software typically runs with all logging and reporting enabled at all times. We didn’t turn any of it off during this benchmark.

Trying to set a new record competing against special-purpose sorting frameworks is a much tougher proposition.

Announcing the New Record

Running on 2103 Google Compute Engine instances of type n1-standard-4-d, MapR sorted 15 billion 100-byte records totaling 1.5 terabytes of data in 59 seconds. The result is also a testament to the high quality and uniformity of the compute environment that Google graciously made available to us: the record was set on the shared, virtualized Google Compute Engine environment, which was not altered, cordoned off, or isolated in any way.

An n1-standard-4-d instance consists of 4 virtual cores, 15 GB of DRAM, 1 virtual Ethernet NIC, and 1 virtual local disk of 1.7 TB. On the Google Compute Engine, 2 virtual cores are equivalent to 1 physical core, so this setup used about 4206 physical cores. Of the 2103 virtual nodes, 3 ran the CLDB processes, 1 ran the JobTracker, and the remaining 2099 ran TaskTrackers.

Divvying up the Map-Reduce stages

We ran 1 mapper and 1 reducer per node, for a total of 2099 mappers and 2099 reducers. Assuming a uniform distribution of data across the nodes, each map task reads about 714 MB of data from disk, sorts it, and writes 2099 partitions of approximately 341 KB each. In the MapR Hadoop implementation, each partition is a separate file, so the map phase creates about 4.4 million files in 10 seconds.
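
A back-of-the-envelope check of those numbers (a standalone sketch; the exact figures depend on how evenly the input splits fall across the nodes):

    // Rough check of the per-mapper and per-partition sizes quoted above.
    public class SortMath {
        public static void main(String[] args) {
            long inputBytes = 1_500_000_000_000L;   // 1.5 TB of input
            int mappers = 2099, reducers = 2099;

            long perMapper = inputBytes / mappers;          // ~714,626,012 bytes, i.e. ~714 MB
            long perPartition = perMapper / reducers;       // ~340,460 bytes, i.e. ~341 KB
            long files = (long) mappers * reducers;         // 4,405,801, i.e. ~4.4 million files

            System.out.printf("per mapper: %,d bytes%n", perMapper);
            System.out.printf("per partition: %,d bytes%n", perPartition);
            System.out.printf("intermediate files: %,d%n", files);
        }
    }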

Next, each reducer fetches its partition file from each of the 2099 nodes and runs a 2099-way merge to combine them into its final output. This process is called the shuffle phase in map-reduce. That’s about 4.4 million files of 341 KB each being read and transferred over the network in about 20 seconds.
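
Conceptually, the reduce-side merge is a k-way merge of 2099 pre-sorted inputs. Below is a minimal, generic sketch of that operation (not MapR’s implementation) using a priority queue keyed on the head record of each input:

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.PriorityQueue;

    // Generic k-way merge over already-sorted inputs: repeatedly emit the
    // smallest head record, then refill the heap from the input it came from.
    public class KWayMerge {
        public static List<byte[]> merge(List<Iterator<byte[]>> sortedInputs,
                                         Comparator<byte[]> order) {
            PriorityQueue<Map.Entry<byte[], Iterator<byte[]>>> heap =
                new PriorityQueue<>((x, y) -> order.compare(x.getKey(), y.getKey()));
            for (Iterator<byte[]> it : sortedInputs) {
                if (it.hasNext()) heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
            }
            List<byte[]> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                Map.Entry<byte[], Iterator<byte[]>> head = heap.poll();
                out.add(head.getKey());                       // emit the smallest record
                Iterator<byte[]> src = head.getValue();
                if (src.hasNext()) heap.add(new AbstractMap.SimpleEntry<>(src.next(), src));
            }
            return out;
        }
    }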

Note that a reducer cannot start its merge until all of the map outputs have been fetched. Thus, the shuffle breaks naturally into a “fetch” phase followed by a merge-sort phase.

Given the budget of 60 seconds, we allocated timings for each phase as follows:
Map: 18 seconds
Fetch: 20 seconds
Merge-sort: 17 seconds
Straggler overhead: 5 seconds
Stragglers are tasks that linger and finish much later than the majority of the tasks. A single straggler among the 4198 tasks may take an extra 8 seconds to finish, holding up the entire job and impacting the final result.

These stragglers end up governing the overall speed of the job, since they consume the most time. In the parlance of chemical kinetics, the rate of a reaction is governed by its slowest step, the rate-determining step. We budgeted an extra 5 seconds for the stragglers, our rate-determining step, to finish.

Initial Attempts

The first few attempts at sorting 1.5 TB came in at about 69-70 seconds, with the map stragglers taking 25 seconds and the reduce stragglers taking 25 seconds. We noticed that between each of the phases in the table above, the entire cluster went idle for 2-4 seconds, for a total idle time of about 7-9 seconds. Eliminating this idle time became the main goal of our approach.

Overlapping Map and Fetch

The first (and obvious) thing to try was to overlap the map phase with the fetch phase. Each mapper produces 2099 files, but the reducers do not start fetching data until the map is completely done. This is because the Hadoop framework distributes completion notifications on task boundaries, with no visibility into partially completed tasks.

So we hacked the Hadoop JT/TT framework to handle partial map completions. Remember that each mapper produces 2099 intermediate files, one for each reducer. We changed the task-completion protocol to include partial results; for example, a mapper could send a notification every time it wrote another 100 files.
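
The real change lives inside the JT/TT protocol; the sketch below only illustrates the batching idea on the mapper side, with a hypothetical Listener callback standing in for the actual notification path:

    import java.util.concurrent.atomic.AtomicInteger;

    // Illustration of the batching idea: announce partial map output every
    // `batchSize` partition files (e.g. 100) so reducers can begin fetching
    // before the map task finishes. The Listener interface is hypothetical.
    class PartialCompletionNotifier {
        interface Listener { void partitionsReady(int filesWrittenSoFar); }

        private final int batchSize;
        private final Listener listener;
        private final AtomicInteger filesWritten = new AtomicInteger();

        PartialCompletionNotifier(int batchSize, Listener listener) {
            this.batchSize = batchSize;
            this.listener = listener;
        }

        // Called by the map task each time it closes another partition file.
        void onPartitionFileClosed() {
            int n = filesWritten.incrementAndGet();
            if (n % batchSize == 0) listener.partitionsReady(n);
        }
    }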

The problem was that the reducer was still not getting notified quickly enough, because of the heartbeat (HB) mechanism between the JobTracker (JT) and TaskTracker (TT) that is used for notifications. By default, on a cluster of 2100 nodes the HBs are sent out every 3 seconds. This meant that a reducer might not notice for up to 6 seconds that a mapper’s outputs were ready for processing. Reducing the HB interval was tricky, as HB processing in the JT holds a global lock. When HBs come in too frequently, lock contention starts to dominate and normal work grinds to a halt.

We reorganized the JT code to do most of the processing outside the global lock, entering the lock only at the last moment, when everything was ready to be added to the JT’s global data structures. The change let us reduce the HB interval to 800 milliseconds on the 2100-node cluster, thereby speeding up the notifications by 4x.
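
The pattern we applied is the usual one for shrinking a critical section. A simplified sketch (not the actual JobTracker code; the names here are placeholders) of handling a heartbeat mostly outside the global lock:

    import java.util.ArrayList;
    import java.util.List;

    // Simplified sketch: parse and validate the heartbeat with no lock held,
    // then take the global lock only for the short, final update of shared state.
    class HeartbeatHandler {
        private final Object globalLock = new Object();
        private final List<Object> sharedTaskState = new ArrayList<>();

        void handleHeartbeat(byte[] rawReport) {
            // 1. Expensive work outside the lock.
            Object prepared = parseAndValidate(rawReport);

            // 2. Brief critical section to publish the prepared result.
            synchronized (globalLock) {
                sharedTaskState.add(prepared);
            }
        }

        private Object parseAndValidate(byte[] rawReport) {
            return new String(rawReport);   // placeholder for the real parsing work
        }
    }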

Although we now had good overlap between the mappers and the reducers, we didn’t see much improvement in the overall completion time: the merges at the reducers couldn’t start until all the data had arrived.

This code is still probably quite useful, so we’ve kept it around. It’s not clear whether this tweak is even possible with Apache Hadoop, as it takes advantage of some unique features of MapR to reduce the amount of data the JobTracker has to track for partial mapper intermediate results.

Multi-threaded Reduce

We noticed that during the merge-sort in the reduce phase, one of the 4 virtual cores was pegged at 100% while the other cores sat idle. We decided to find a way to harness all the virtual cores, reasoning that the overall elapsed time would thereby improve.

Sorting is single-threaded. However, if the data can be broken into separate, independent pieces, the sorting can be farmed out to multiple cores and their results combined with a final merge.
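
A minimal sketch of that technique (plain Java, not the MapR reducer code): sort independent pieces on separate threads, then combine them with a final merge.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // "Sort independent pieces on separate cores, then merge" in its simplest form.
    public class ParallelSort {
        public static int[] sort(int[][] pieces) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(pieces.length);
            try {
                List<Future<?>> futures = new ArrayList<>();
                for (int[] piece : pieces) {
                    futures.add(pool.submit(() -> Arrays.sort(piece)));  // one piece per core
                }
                for (Future<?> f : futures) f.get();                     // wait for all sorts
            } finally {
                pool.shutdown();
            }
            int[] merged = pieces[0];                                    // final merge, pairwise
            for (int i = 1; i < pieces.length; i++) merged = mergeTwo(merged, pieces[i]);
            return merged;
        }

        private static int[] mergeTwo(int[] a, int[] b) {
            int[] out = new int[a.length + b.length];
            int i = 0, j = 0, k = 0;
            while (i < a.length && j < b.length) out[k++] = (a[i] <= b[j]) ? a[i++] : b[j++];
            while (i < a.length) out[k++] = a[i++];
            while (j < b.length) out[k++] = b[j++];
            return out;
        }
    }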

The obvious way to parallelize the reducers is to increase the number of reducers per node from 1 to 4. But that would cause an explosion in the number of files to be processed: the total number of files to be created, written, opened, read, and deleted in 20 seconds would go from 4.4 million to 17.6 million. The other problem is that the file-server also needs some cycles to run. So we decided to create only 3 map outputs instead of 4, saving some CPU cycles for the file-server, and to send the map outputs in a single stream to the reducer.

On the map side, the mapper was already multi-threaded due to our previous work, sorting the map inputs in parallel. To allow the reducers to merge in parallel, we combined the output of 3 mappers into 1 stream and sent it over to the reducers. At each reducer, the stream was separated back into the 3 map outputs and merged in parallel in 3 threads. Note that the Hadoop framework was already set up to work in this fashion; it just needed a little tweak to allow the single-stream transfer of multiple reducer inputs.
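
The wire format MapR uses here is internal. Purely as an illustration of the framing idea, the sketch below multiplexes several outputs over one stream by tagging each chunk with an output id and length, and splits them apart again on the receiving side:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.OutputStream;

    // Illustration only: length-prefixed framing so several logical outputs can
    // share one physical stream and be separated again on the reducer side.
    public class StreamMux {
        // Writer side: tag each chunk with (outputId, length) before the payload.
        static void writeChunk(DataOutputStream out, int outputId, byte[] chunk) throws IOException {
            out.writeInt(outputId);
            out.writeInt(chunk.length);
            out.write(chunk);
        }

        // Reader side: route each chunk back to the logical stream it belongs to.
        static void demux(DataInputStream in, OutputStream[] perOutputStreams) throws IOException {
            while (true) {
                int outputId;
                try { outputId = in.readInt(); } catch (EOFException end) { return; }
                int len = in.readInt();
                byte[] chunk = new byte[len];
                in.readFully(chunk);
                perOutputStreams[outputId].write(chunk);
            }
        }
    }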

Again, it’s unclear whether this tweak is even possible with Apache Hadoop, for the same reason: it relies on some unique features of MapR.

With this change, the run time came down from 70 seconds to 62 seconds. We were almost home!

Other Minor Changes

Among other observations, we noticed that the file-server hosting the partition list was getting hit very hard. Watching the GUI Heatmap in the CPU profile view, this file-server would light up bright red throughout any run: every map task on the 2099 nodes was reading the partition list file at exactly the same moment. We increased the replication factor of the volume holding the partition list to 6, and the file-server bottleneck disappeared, and with it the map stragglers. That saved us a precious second or two.
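
In MapR, replication is set at the volume level; the generic Hadoop FileSystem API offers the closest per-file equivalent, shown here only as a sketch (the path comes from the partition.path entry in the job.xml below, and this is not the exact mechanism we used):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: raise the replication of the partition-list file so that the
    // simultaneous reads from all map tasks are spread across more copies.
    public class RaisePartitionListReplication {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path partitionList = new Path("/t.datasource/parts/_partition.lst"); // from job.xml
            fs.setReplication(partitionList, (short) 6);  // 6 copies to spread the read load
        }
    }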

We also noticed that the comparator in the map-side quick-sort wasn’t doing as good a job as we’d have liked. By default the comparator “caches” the first 4 bytes of every key in a 32-bit integer and does a raw comparison on that before looking at the entire key. Given that we were running on 64-bit processors, we increased this cache to 8 bytes per key, and the fraction of comparisons resolved by the cached prefix alone increased significantly at almost no cost. This change saved us another second! The variable we introduced to control this is mapr.map.keyprefix.ints (set to 2 ints, i.e. 8 bytes, in the job.xml below).
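
A sketch of the key-prefix idea (not the actual MapR comparator): pack the first 8 key bytes into a long, big-endian, so that an unsigned 64-bit comparison agrees with byte-wise key order, and fall back to comparing the remaining bytes only on a prefix tie.

    // Illustration of an 8-byte key-prefix comparison for 10-byte keys.
    public class KeyPrefixCompare {
        // Pack the first 8 key bytes, big-endian, so numeric order matches byte order.
        static long prefix8(byte[] key, int off) {
            long p = 0;
            for (int i = 0; i < 8; i++) p = (p << 8) | (key[off + i] & 0xffL);
            return p;
        }

        static int compare(byte[] a, int aOff, byte[] b, int bOff, int keyLen) {
            int c = Long.compareUnsigned(prefix8(a, aOff), prefix8(b, bOff));
            if (c != 0) return c;                    // the 8-byte prefix decides most pairs
            for (int i = 8; i < keyLen; i++) {       // rare tie: compare the remaining bytes
                int d = (a[aOff + i] & 0xff) - (b[bOff + i] & 0xff);
                if (d != 0) return d;
            }
            return 0;
        }
    }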

Final Result

After these changes, we were able to reproduce the result of sorting 1.5 terabytes of data in less than 1 minute consistently, across multiple runs and multiple cluster configurations, thanks to the consistent and uniform performance of the Google Compute Engine environment. We recorded a few runs and have included the JobTracker’s output showing the results from one such run. The job.xml used for this run is also included in this report.

Hadoop Job job_201301281929_0028 on History Viewer

User: yufeldman
JobName: TeraSort
JobConf: maprfs:/var/mapr/cluster/mapred/jobTracker/staging/yufeldman/.staging/job_201301281929_0028/job.xml
Job-ACLs: mapreduce.job.acl-view-job: All users are allowed; mapreduce.job.acl-modify-job: All users are allowed
Submitted At: 30-Jan-2013 10:07:09
Launched At: 30-Jan-2013 10:07:09 (0sec)
Finished At: 30-Jan-2013 10:08:09 (59sec)
Status: SUCCESS
Kind      Total Tasks (successful+failed+killed)   Successful   Failed   Killed   Start Time             Finish Time
Setup     0                                        0            0        0
Map       2099                                     2099         0        0        30-Jan-2013 10:07:10   30-Jan-2013 10:07:40 (29sec)
Reduce    2099                                     2099         0        0        30-Jan-2013 10:07:10   30-Jan-2013 10:08:09 (58sec)
Cleanup   0                                        0            0        0

Counter (Map / Reduce / Total)

Job Counters
  Aggregate execution time of mappers (ms): 0 / 0 / 34,737,511
  Launched reduce tasks: 0 / 0 / 2,099
  Total time spent by all reduces waiting after reserving slots (ms): 0 / 0 / 0
  Total time spent by all maps waiting after reserving slots (ms): 0 / 0 / 0
  Launched map tasks: 0 / 0 / 2,099
  Data-local map tasks: 0 / 0 / 2,099
  Aggregate execution time of reducers (ms): 0 / 0 / 97,345,363

FileSystemCounters
  MAPRFS_BYTES_READ: 1,500,000,239,286 / 1,560,140,985,632 / 3,060,141,224,918
  MAPRFS_BYTES_WRITTEN: 1,560,158,608,836 / 1,500,000,000,000 / 3,060,158,608,836
  FILE_BYTES_READ: 145,368,344 / 0 / 145,368,344
  FILE_BYTES_WRITTEN: 30,505,140 / 30,324,626 / 60,829,766

Map-Reduce Framework
  Map input records: 15,000,000,000 / 0 / 15,000,000,000
  Reduce shuffle bytes: 0 / 1,560,088,116,020 / 1,560,088,116,020
  Spilled Records: 15,000,000,000 / 0 / 15,000,000,000
  Map output bytes: 1,530,000,000,000 / 0 / 1,530,000,000,000
  CPU_MILLISECONDS: 64,850,620 / 100,186,300 / 165,036,920
  Combine input records: 0 / 0 / 0
  SPLIT_RAW_BYTES: 239,286 / 0 / 239,286
  Reduce input records: 0 / 15,000,000,000 / 15,000,000,000
  Reduce input groups: 0 / 15,000,000,000 / 15,000,000,000
  Combine output records: 0 / 0 / 0
  PHYSICAL_MEMORY_BYTES: 4,678,213,836,800 / 2,450,143,838,208 / 7,128,357,675,008
  Reduce output records: 0 / 15,000,000,000 / 15,000,000,000
  VIRTUAL_MEMORY_BYTES: 10,563,378,290,688 / 15,050,043,244,544 / 25,613,421,535,232
  Map output records: 15,000,000,000 / 0 / 15,000,000,000
  GC time elapsed (ms): 274,468 / 2,730,379 / 3,004,847

Configuration details (job.xml)

Job Configuration: JobId - job_201301281929_0028

name value
mapred.job.shuffle.merge.percent 1.0
mapred.reduce.slowstart.completed.maps 0.0
mapred.reduce.parallel.copies 50
mapr.map.output.batch -1
mapred.tasktracker.reduce.tasks.maximum 1
mapreduce.maprfs.use.compression true
mapred.maxthreads.partition.closer 0
mapr.map.keyprefix.ints 2
mapred.committer.job.setup.cleanup.needed false
mapreduce.job.submithost m03-demo1010.c.mapr-demo.maprtech.com.internal
mapreduce.terasort.final.sync true
hadoop.proxyuser.root.hosts *
mapred.output.dir /t.data/sort
mapreduce.heartbeat.10 500
mapred.create.symlink yes
mapreduce.partitioner.class org.apache.hadoop.examples.terasort.TeraSort$TotalOrderPartitioner
mapreduce.input.num.files 2099
mapred.cache.files /t.datasource/parts/_partition.lst#_partition.lst
mapreduce.job.dir maprfs:/var/mapr/cluster/mapred/jobTracker/staging/yufeldman/.staging/job_201301281929_0028
mapred.queue.default.acl-administer-jobs *
mapreduce.job.cache.files.visibilities true
mapred.cache.files.timestamps 1359569226469
mapreduce.heartbeat.1000 500
mapred.reducer.new-api true
mapred.job.reduce.input.buffer.percent 1.0
mapred.input.dir maprfs:/t.data/gen
group.name yufeldman
mapreduce.heartbeat.10000 500
partition.path /t.datasource/parts/_partition.lst
mapreduce.inputformat.class org.apache.hadoop.examples.terasort.TeraInputFormat
mapred.inmem.merge.threshold 0
mapred.jar /var/mapr/cluster/mapred/jobTracker/staging/yufeldman/.staging/job_201301281929_0028/job.jar
mapreduce.outputformat.class org.apache.hadoop.examples.terasort.TeraOutputFormat
mapred.mapgroupsize 3
mapred.reduce.child.java.opts -Xmx6000m
mapreduce.job.submithostaddress 10.240.16.198
mapred.reduce.tasks.speculative.execution false
mapred.map.tasks.speculative.execution false
fs.mapr.working.dir /user/yufeldman
mapred.job.shuffle.input.buffer.percent 0.8
user.name yufeldman
mapreduce.tasktracker.outofband.heartbeat true
mapred.cache.files.filesizes 69256
mapred.job.name TeraSort
mapred.reduce.tasks 2099
maprfs.openfid2.prefetch.bytes 0
mapred.output.value.class org.apache.hadoop.io.Text
mapred.working.dir /user/yufeldman
io.sort.mb 2000
dfs.replication 1
mapred.fairscheduler.locality.delay 100000
hadoop.proxyuser.root.groups root
mapred.output.key.class org.apache.hadoop.io.Text
mapred.tasktracker.map.tasks.maximum 1
mapred.map.child.java.opts -Xmx4000m
mapred.maxthreads.generate.mapoutput 10
mapreduce.heartbeat.100 500
mapred.mapper.new-api true
mapreduce.tasktracker.prefetch.maptasks 0.0
mapred.used.genericoptionsparser true
mapred.map.tasks 2099
mapreduce.tasktracker.group mapr