Understanding MapReduce Input Split Sizes and MapR-FS Chunk Sizes

Improving performance by letting MapR-FS do the right thing
 
The performance of your MapReduce jobs depends on many factors. In this post, we'll look at the relationship between MapReduce input split sizes and MapR-FS chunk sizes, and how they can work together to help (or hurt) job execution time.
Let's say we have a 10GB file that we need to process with MapReduce. We could choose an input format such as WholeFileInputFormat, in which case the entire 10GB file would be processed by a single map task.
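To make the non-splittable case concrete, here is a minimal sketch of how such an input format is usually built with the org.apache.hadoop.mapreduce API, by overriding isSplitable(); the class name is a placeholder and the record reader is left out:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Sketch of an input format that hands an entire file to a single map task.
// A concrete subclass still needs a createRecordReader() implementation that
// reads the whole file as one record; that part is omitted here.
public abstract class WholeFileInputFormatSketch
        extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Never split: each input file becomes exactly one split,
        // and therefore exactly one map task.
        return false;
    }
}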
Obviously, if we want to lower the run time of the MapReduce job, we want more than one map task running concurrently. We could instead use a FileInputFormat whose input is splittable, which lets us run multiple map tasks against different byte ranges of a single input file. The size of each range is referred to as the input split size. If we set the split size to 1GB, we end up with 10 map tasks against the single 10GB file, each map task processing a unique 1GB range of the file data.
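To make this concrete, here is a minimal sketch (using the org.apache.hadoop.mapreduce API) of how a 1GB split size could be requested; the job name and input format are placeholders, and property names can vary between Hadoop versions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-size-demo");
        job.setInputFormatClass(TextInputFormat.class);

        // Ask for 1GB splits. Raising the minimum is what actually pushes the
        // split size above the file's block/chunk size; the maximum caps it.
        long oneGB = 1024L * 1024 * 1024;
        FileInputFormat.setMinInputSplitSize(job, oneGB);
        FileInputFormat.setMaxInputSplitSize(job, oneGB);

        // Equivalent Hadoop 2.x configuration properties:
        //   mapreduce.input.fileinputformat.split.minsize
        //   mapreduce.input.fileinputformat.split.maxsize

        // A 10GB input file now yields roughly 10 map tasks, one per 1GB range.
    }
}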
In some cases, the map tasks need to sort the input data in order to prepare it for the reduce tasks to consume. When a sort is required, it's important that the map task can sort its input data within the memory available to the map task attempt JVM. If a map task doesn't have enough memory, it will have to use the hard drives of the node where it runs to complete the sort.

For example, suppose a map task needs to sort 1GB of input data but has only 100MB of memory available for the sort. The map task reads in records, sorting them in memory, until it has accumulated 100MB worth. At that point it writes the 100MB of sorted content to disk, then reads and sorts the next 100MB, writes that to disk, and repeats until all 1GB has been read and sorted into segments. Once all the input data has been read and the sorted 100MB segments are on disk, the map task reads the records back from each of those segments and merges them, writing the records to disk once more in the final sorted order. To process 1GB of input data, it therefore performs roughly 2GB of disk reads and 2GB of disk writes. This is inefficient compared to reading the entire input into memory once and writing the entire sorted content out to disk once.

You can determine whether this type of inefficient operation is occurring by reviewing the counters for map tasks. If the counters show that a map task read X input records and spilled X records to disk, the execution of the map task was efficient. However, if the number of spilled records is significantly higher than X, the execution of the map task was inefficient.
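As a rough, hedged way to automate that check, the relevant counters can be read from a completed job through the standard TaskCounter enum; the helper below is illustrative only, and the Job handle is assumed to come from a job that has already finished:

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class SpillCheck {
    // Prints the spill-related counters for a completed job.
    static void reportSpills(Job job) throws Exception {
        Counters counters = job.getCounters();
        long inputRecords   = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
        long outputRecords  = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
        long spilledRecords = counters.findCounter(TaskCounter.SPILLED_RECORDS).getValue();

        System.out.printf("map input=%d, map output=%d, spilled=%d%n",
                inputRecords, outputRecords, spilledRecords);

        // Spilled records are counted against the map (and reduce) output.
        // A spilled count roughly equal to the map output count means each
        // record was written to disk once; a much larger count indicates the
        // multi-pass spill-and-merge behavior described above.
        if (spilledRecords > outputRecords) {
            System.out.println("Map tasks appear to be spilling records more than once.");
        }
    }
}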
So what can be done to improve efficiency if you find that you are spilling an excessive number of records? You can reduce the input split size so that the entire split can be sorted in the memory available to the map task attempt JVM. Alternatively, you can increase the amount of memory available to the map task JVM to accommodate the size of the input split.
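The knobs for both options live in the job configuration. The following sketch uses illustrative values only; the right numbers depend on your cluster, and the Hadoop 2.x property names are assumed here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MapMemoryTuning {
    static Job configure(Configuration conf) throws Exception {
        // Option 1: shrink the split so it sorts within the existing heap,
        // e.g. cap splits at 128MB instead of letting them span a full chunk.
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);

        // Option 2: give each map task more memory to sort with.
        conf.setInt("mapreduce.map.memory.mb", 2048);       // container size
        conf.set("mapreduce.map.java.opts", "-Xmx1638m");   // JVM heap inside the container
        conf.setInt("mapreduce.task.io.sort.mb", 1024);     // in-memory sort buffer

        return Job.getInstance(conf, "memory-tuning-demo");
    }
}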
Two additional and extremely important considerations when setting the input split size are data locality and the chunk size of the input files. By default, MapR-FS breaks files into 256MB chunks, so a 10GB file consists of 40 chunks of 256MB each. Each of those chunks can be stored on a different set of servers, and it is highly likely that an individual 10GB file will have its 40 chunks spread out among many different servers. For the first 1GB of the file, you might have chunk 1 on nodes 1-3, chunk 2 on nodes 4-6, chunk 3 on nodes 7-9, and chunk 4 on nodes 10-12. Thus, if you run a map task against that first 1GB of the file, it is likely that no single node in the cluster has the entirety of that 1GB on its local disk drives. This hurts performance because the map task will have to go over the network to read at least some of its input data from other nodes.
Further, in many clusters, the network has less throughput capacity than the aggregate throughput of the hard drives on the nodes. This means you will hit a performance bottleneck based on the network throughput capacity as opposed to the disk throughput capacity. And if you bottleneck on network bandwidth during the map phase then you are also likely taking away bandwidth from reduce tasks that, by design, must read data over the network. Thus, the map phase goes slower and the reduce phase of other running jobs will also go slower.
If you use the default MapR-FS chunk size of 256MB, then it is better to set your input split size to 256MB rather than 1GB. Map tasks are preferentially scheduled on nodes that hold the input data for the task. With a 256MB input split size, each input split lines up with a single chunk of file data in MapR-FS, so the map task that processes a 256MB split will preferentially be scheduled on a node that stores that chunk locally on its hard drives, and no network bandwidth is consumed reading the input.
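For reference, FileInputFormat derives the split size from the configured minimum and maximum split sizes plus the block size the filesystem reports for the file; assuming MapR-FS reports its chunk size as that block size (an assumption stated here), leaving the split settings at their defaults already lines the splits up with the chunks. The small sketch below just reproduces that arithmetic:

public class SplitSizeMath {
    public static void main(String[] args) {
        // Mirrors FileInputFormat.computeSplitSize(blockSize, minSize, maxSize):
        //   splitSize = max(minSize, min(maxSize, blockSize))
        long blockSize = 256L * 1024 * 1024; // chunk size reported for the file (assumed 256MB)
        long minSize   = 1L;                 // effective default minimum split size
        long maxSize   = Long.MAX_VALUE;     // default maximum split size
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        System.out.println("split size = " + splitSize + " bytes");
        // With the defaults, splitSize == blockSize == 256MB, so each split
        // covers exactly one chunk and the map task can run data-local.
    }
}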
So unless you have specifically loaded your files with a 1GB chunk size by overriding the MapR-FS default, using a 1GB split size for your MapReduce job will generally result in lower performance than a 256MB split size.
Key takeaways:
  • Running multiple map tasks against a large input file can reduce the duration of the map phase
  • The amount of input data processed by a map task should be able to be sorted in the amount of memory given to the map task attempt JVM
  • When processing large input files, the input split size for a MapReduce job should line up with the chunk size of the input file data as stored in MapR-FS
 