Sorting Daily Files with Larger Reference Files: It’s Easier than You Think

A person I know recently asked for advice about an interesting but very common scenario. He wanted to join a large reference file that didn’t get updated very often with a somewhat smaller file that was updated daily, but he wasn’t quite sure how to go about it in Apache Hadoop.

The scenario of interest here is when the reference file is too large to fit in memory. If it can fit into memory easily, then the join in question is better done using a memory-based hash join, possibly using NFS and mmap to minimize the time spent reading the reference file. When the reference file is very large, however, the design options become a bit trickier.
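When the reference data does fit in memory, the whole job can be as simple as loading it into a hash map and streaming the daily file past it. Here is a minimal sketch in plain Java; the file names and the tab-delimited, key-first record layout are assumptions made purely for illustration:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class InMemoryHashJoin {
    public static void main(String[] args) throws IOException {
        // Load the reference file into memory, keyed on the first tab-delimited field.
        Map<String, String> reference = new HashMap<>();
        try (BufferedReader in = new BufferedReader(new FileReader("reference.tsv"))) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] fields = line.split("\t", 2);
                reference.put(fields[0], fields[1]);
            }
        }

        // Stream the daily file and emit joined records; unmatched keys are dropped.
        try (BufferedReader in = new BufferedReader(new FileReader("daily.tsv"))) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] fields = line.split("\t", 2);
                String refValue = reference.get(fields[0]);
                if (refValue != null) {
                    System.out.println(line + "\t" + refValue);
                }
            }
        }
    }
}
```

If the reference file is exported over NFS, the same idea applies, and mmap-ing the file rather than copying it can cut the load time further, as noted above.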

You basically have four options for sorting the daily file and merging it with the reference:

1) Sort the daily file and merge in the reducer. This option tends to have poor locality on the reduce side, but the sort should be pretty fast. You’ll need some landmarks in the reference file to be able to seek to the right places. Total input is daily + reference file, shuffle is daily size, and output is the merge result. Keep in mind that the non-local I/O can be roughly the size of the reference file. This option has the fewest map-reduces, but it suffers because of the large amount of non-local I/O. In some special cases, this option can work well. For instance, if the reference file has a similar arrangement to the daily file, you may not need to read the entire reference file in every reducer. The I/O is still non-local, but it may be much smaller than if every reducer has to read the entire reference file. (A sketch of this reduce-side merge appears after this list.)

2) Sort the daily file in one step and store it in a temporary table along with some index information to assist in seeking. After this step, you’ll need to merge with a map-only step (a sketch of this map-only merge appears below). Input is daily + reference, shuffle is daily size, and output is the merge result + a sorted copy of the daily file. Non-local I/O can be limited to the size of the daily file by using the reference file to drive the second step.

This option is usually considerably faster than the first option and only requires standard map-reduce steps.

3) Sort the daily file and segment it based on the segmentation of the reference. Copy the segments so that they are co-resident with the segments of the reference. Do a map-only merge. This is similar to option #2, but it depends on being able to control the placement of replicas. That’s easy to do in MapR, but not in other distros. By putting pieces of the daily file close to the corresponding pieces of the reference file, it is possible to do the actual merge using only local I/O to read the reference and daily files. This can massively decrease the amount of network traffic required.

4) Segment the daily file on ingestion to match the segmentation of the reference, and store the segments collocated with the segments of the reference file. After this step, do (1) on each segment, and limit execution to machines containing replicas of that segment of the reference file. This allows essentially the same processing speed as (3), except that nearly all I/O is local. Again, this is pretty easy on MapR and hard to do on other Hadoop distros. This option is particularly attractive on legacy clusters limited to slower networking technologies.
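To make option 1 concrete, here is a rough sketch of the reducer, assuming a mapper that simply emits each daily record keyed by the join key, a reference file sorted by that key, and a small landmark index of key-to-byte-offset pairs pointing at line boundaries. The paths, record layout, and index format are hypothetical:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer for option 1: the sorted daily records are shuffled here, and each
// reducer seeks into the sorted reference file to find matching records.
public class SeekingMergeReducer extends Reducer<Text, Text, Text, Text> {
    private FileSystem fs;
    private final Path referencePath = new Path("/data/reference.tsv");
    private final TreeMap<String, Long> landmarks = new TreeMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        fs = FileSystem.get(context.getConfiguration());
        // Load the small landmark index: one "key<TAB>byteOffset" line per block
        // of the sorted reference file, with offsets pointing at line boundaries.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(fs.open(new Path("/data/reference.index"))))) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] parts = line.split("\t");
                landmarks.put(parts[0], Long.parseLong(parts[1]));
            }
        }
    }

    @Override
    protected void reduce(Text key, Iterable<Text> dailyValues, Context context)
            throws IOException, InterruptedException {
        String referenceValue = lookup(key.toString());
        if (referenceValue == null) {
            return;                                 // no matching reference record
        }
        for (Text daily : dailyValues) {
            context.write(key, new Text(daily.toString() + "\t" + referenceValue));
        }
    }

    // Seek to the landmark at or before the key, then scan forward until the key
    // is found or passed. These reads are the non-local I/O described above.
    private String lookup(String key) throws IOException {
        Map.Entry<String, Long> entry = landmarks.floorEntry(key);
        long offset = (entry == null) ? 0L : entry.getValue();
        try (FSDataInputStream in = fs.open(referencePath)) {
            in.seek(offset);
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t", 2);
                int cmp = fields[0].compareTo(key);
                if (cmp == 0) {
                    return fields[1];
                }
                if (cmp > 0) {
                    return null;                    // passed the key; not present
                }
            }
        }
        return null;
    }
}
```

A production version would keep a single stream open and merge forward, since reduce keys arrive in sorted order; either way, the reads of the reference file are the non-local I/O described in option 1.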

This fourth option is considerably faster than the others. One of our customers uses this method and claims more than an order of magnitude speedup. Since this merge is a very large fraction of their daily workload, this technique has allowed them to process much more data on MapR, with the same computers and network, than they could with another Hadoop distribution.

It requires more capability than ordinary distributions can provide, however.
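For the map-only merge that options 2 and 3 rely on (referenced above), here is a rough sketch of the mapper, assuming the first step produced one sorted daily segment per reference split and named it so the mapper can find it. The directory layout and naming convention are assumptions for this sketch:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Map-only merge: the job's input is the sorted reference file, so each map
// task handles one reference split and streams through the matching sorted
// segment of the daily file, emitting joined records.
public class MapOnlyMergeMapper extends Mapper<LongWritable, Text, Text, Text> {
    private BufferedReader daily;
    private String dailyKey;
    private String dailyValue;

    @Override
    protected void setup(Context context) throws IOException {
        // Open the sorted daily segment that corresponds to this reference split
        // (hypothetical naming: referenceFileName-splitStartOffset).
        FileSplit split = (FileSplit) context.getInputSplit();
        Path segment = new Path("/data/daily-sorted/"
                + split.getPath().getName() + "-" + split.getStart());
        FileSystem fs = FileSystem.get(context.getConfiguration());
        daily = new BufferedReader(new InputStreamReader(fs.open(segment)));
        advanceDaily();
    }

    @Override
    protected void map(LongWritable offset, Text referenceLine, Context context)
            throws IOException, InterruptedException {
        String[] ref = referenceLine.toString().split("\t", 2);
        // Both inputs are sorted by key, so this is a plain forward merge:
        // skip daily records with smaller keys, then emit every match.
        while (dailyKey != null && dailyKey.compareTo(ref[0]) < 0) {
            advanceDaily();
        }
        while (dailyKey != null && dailyKey.equals(ref[0])) {
            context.write(new Text(dailyKey), new Text(dailyValue + "\t" + ref[1]));
            advanceDaily();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        daily.close();
    }

    private void advanceDaily() throws IOException {
        String line = daily.readLine();
        if (line == null) {
            dailyKey = null;
            dailyValue = null;
        } else {
            String[] fields = line.split("\t", 2);
            dailyKey = fields[0];
            dailyValue = fields[1];
        }
    }
}
```

In option 2, the reads of the daily segments go over the network; in option 3, the placement step makes those reads local as well, with no change to the merge logic.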

Which option you pick depends a lot on the details of your particular data, but it also depends critically on which Hadoop distribution you use. If you are restricted to HDFS for storing your data, your options are substantially limited and you won’t get nearly as much performance out of your cluster. If you are using MapR instead, you can use any of the methods above and will be able to use your cluster much more effectively.

Have any questions about these options? Post them here and I’ll be happy to answer them.
