Immediate MapReduce on Continuously Ingested Data


Running MapReduce jobs on ingested data is traditionally batch-oriented: the data must first be transferred to a local file system accessible to the Hadoop cluster, then copied into HDFS with a tool such as Flume or the "hadoop fs" command. Only once the transfers are complete can MapReduce be run on the ingested files.

What if you wanted to run MapReduce on ingested data, coming directly from its source application, without modifying the source application or waiting until ingest is complete? What if rather than waiting for a lengthy ingest to complete, you could run a series of MapReduce jobs on the data in near-real-time, only processing the newest data as it arrives?

This article describes how this can be accomplished using NFS, and uses the WordCount program to demonstrate this feature.

Random Read-Write NFS Access

Random Read-Write NFS capability on Hadoop (provided by MapR) allows any machine with NFS client software to “mount” the Hadoop cluster into its local file system tree. NFS client machines can run Linux, Unix, Macintosh, Windows, or any other operating system that supports NFS version 3. Mounting the cluster via NFS provides the following capabilities:

  • All existing applications can read and write data directly from/to the Hadoop distributed file system (completely transparent to the application – it’s just POSIX I/O!)
  • All applications have complete read/write random and concurrent access, meaning Hadoop is no longer write-once or append-only
  • Inside the Hadoop cluster, the data is immediately available for processing while it’s streaming in

In other words, now we can take data directly from its source application outside the cluster into Hadoop without intermediate steps and added complexity. The next step is to run MapReduce on the data as it’s streaming into the cluster.
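
To make this concrete, here is a minimal sketch (not part of the demo bundle) of an ordinary Java program appending records straight into the cluster over NFS using nothing but java.io. The mount point /mapr/my.cluster.com and the target path are assumptions for illustration:

import java.io.FileWriter;
import java.io.IOException;

public class NfsIngest {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Once the cluster is NFS-mounted, it looks like any other directory tree.
        String target = "/mapr/my.cluster.com/user/mapr/input/testfile";
        try (FileWriter out = new FileWriter(target, true /* append */)) {
            for (int i = 0; i < 1000; i++) {
                out.write("the quick brown fox jumps over the lazy dog\n");
                out.flush();          // data becomes visible in the cluster as it lands
                Thread.sleep(100);    // throttle the ingest for demo purposes
            }
        }
    }
}

The same thing works from C, a shell pipeline, or any other POSIX program; the point is that no Hadoop client code is involved.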

Running MapReduce on Input Files While Being Ingested

To get intermediate MapReduce results on the data as it’s being ingested, we want to run MapReduce repeatedly on the same input files, each time skipping over the previously-processed sections of the files, and accumulate the intermediate results for aggregation at the end of the ingest. To accomplish this, we have to persist information that tracks which sections of the input files have already been processed. Hadoop breaks the input files into splits, which are the obvious unit to track. (On MapR, the split size corresponds to the file system chunk size; chunks default to 256 MB but can easily be tuned at the directory level.)

The Atlantbh blog has a great entry on one method for performing this persistence:

The idea here is to override some key MapReduce methods and use the SplitID as the unit of persistence. A SplitID contains the following data:

  • Pathname of input file
  • Starting offset
  • Length of split

We can override methods to accomplish the following:

  • Persist the SplitID to a sub-directory of the MapReduce job output directory once the Map task successfully completes
  • Pre-process the list of splits passed to the MapReduce jobs, looking up each split in our persisted sub-directory
  • Bypass the usual MapReduce check for a pre-existing output directory, to allow multiple job runs using the same output directory (see the sketch after this list)
  • And finally, supply a new main() method that sets up the overridden classes above as part of the job initialization
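
For example, the output-directory bypass mentioned above could look roughly like the sketch below. This is illustrative rather than the exact demo code; the class name ReusableOutputFormat is made up:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ReusableOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public void checkOutputSpecs(JobContext job) throws IOException {
        // Skip the parent's FileAlreadyExistsException so the same output
        // directory can be reused by repeated runs of the job.
        Path outDir = getOutputPath(job);
        if (outDir == null) {
            throw new IOException("Output directory not set.");
        }
    }
}

The job's main() would then register this class with job.setOutputFormatClass(ReusableOutputFormat.class) instead of the default output format.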

In other words, without changing the original source code, a new Java class can be written that extends the original MapReduce application class, and this new MapReduce application can be run repeatedly as the files are streaming in. The flow would look like this:

While (file is still being streamed in)
     hadoop jar newjob.jar newapp input output
     Save off the results file from output/part*

The remainder of this blog describes how this can be demonstrated with the WordCount program that is distributed with Hadoop, along with a test program that can be used to ingest data over NFS into the WordCount MapReduce job.

Demonstrating the Solution Using WordCount

Code was borrowed from the Atlantbh website and integrated into the canonical WordCount Hadoop program. The key ingredient is how the processed SplitIDs are persisted, using something called Hadoop task side-effect files. Task side-effect files let a MapReduce task write additional output files that are published only if the task completes successfully.
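
As a rough sketch of that idea (assuming the new org.apache.hadoop.mapreduce API; the class name and SplitID encoding here are illustrative, not the exact demo code), a mapper can record its split as a side-effect file in setup():

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// In the demo this logic would live in a subclass of WordCount's mapper so the
// word-counting map() is inherited unchanged; only setup() is shown here.
public class SplitTrackingMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        // Encode the SplitID: input file path, starting offset, and length.
        String splitId = split.getPath().toString().replace('/', '_')
                + ":" + split.getStart() + "+" + split.getLength();
        // Side-effect file: created under the task's work output path and
        // promoted to the job output directory only if the task commits.
        Path marker = new Path(FileOutputFormat.getWorkOutputPath(context),
                ".meta/splits/" + splitId);
        FileSystem fs = marker.getFileSystem(context.getConfiguration());
        fs.create(marker, true).close();   // zero-length file; the name is the record
    }
}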

The source code for WordCount used in this demonstration is available in the following directory on a MapR node:

/opt/mapr/pig/pig*/test/org/apache/pig/test/utils/

The WordCount class does not need to be modified – instead it was extended by a new class called WordCountNew. This new class contains the necessary code that persists the split information for each split processed into the output directory. Using this approach, other MapReduce programs can be similarly enhanced to use this ingest feature.

A set of scripts and some driver code were also written to wrap all of this up into a complete demonstration package. First, a C program was written that simulates an application writing data directly into the Hadoop cluster through an NFS mount. It writes a known pattern so that it's easy to check whether WordCount is getting the correct answer; it can also copy a given file into the cluster as input. The C program has options for inserting delays, which makes the demo practical on slow VMs running on laptops, for example.

We could also use a simple ‘cat’ or ‘cp’, or even drag-and-drop files into the input directory, but for demo purposes on a single laptop VM we wanted to slow down the ingest so that the MapReduce jobs could keep up with the ingest rate.

As the data is streaming in, WordCountNew is run on the input directory a number of times. After each MapReduce job, the output file that contains the results from the latest ingested portion of the file is saved off for final aggregation. This is simply a matter of rolling over the output/part_r_00000 file into a unique file name such as output.pass-1, output.pass-2, and so on. Once no more new data is detected in the input file, WordCountNew is run one final time, passing in a property that tells it to process any remaining data at the end of the file.
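
The rollover itself is trivial; since the cluster is NFS-mounted it could even be a plain mv, but a small Hadoop-API sketch (file names are assumed to match the demo's conventions) would be:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RollOutput {
    public static void main(String[] args) throws Exception {
        int pass = Integer.parseInt(args[0]);   // pass number: 1, 2, 3, ...
        FileSystem fs = FileSystem.get(new Configuration());
        // Move this pass's reducer output aside so the next job run can
        // write into the same output directory again.
        if (!fs.rename(new Path("output/part_r_00000"),
                       new Path("output/output.pass-" + pass))) {
            System.err.println("rename failed for pass " + pass);
        }
    }
}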

This results in several output files, one per pass, each containing the results of that pass.

Finally, in order to aggregate the output from each pass, a simple MapReduce program called WordCountFinal takes the output directory as its input, and comes up with a final WordCount. This last job is very fast since we don’t have to process the potentially large input file, but instead process the results of each pass.
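
A mapper for such an aggregation job could look roughly like the sketch below (illustrative only; the real WordCountFinal may differ). Each line of an intermediate output file is "word<TAB>count", so the mapper re-emits it as (word, count) and the stock IntSumReducer adds the per-pass counts together:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartialCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text word = new Text();
    private final IntWritable count = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Parse "word<TAB>count" from the intermediate pass output.
        String[] fields = value.toString().split("\t");
        if (fields.length == 2) {
            word.set(fields[0]);
            count.set(Integer.parseInt(fields[1]));
            context.write(word, count);
        }
    }
}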

Here are the contents of the source code bundle, as well as instructions on how to build and run the demo:

Makefile to build the C program for streaming in the data via NFS.

The text_stream C program, which can stream in either a known pattern or a given input file, inserting delays to slow the rate of ingest for demo purposes.

A 20 MB input file (const4.txt) that can be used for the demo. It's basically many copies of the US Constitution in text form.

Script that streams the data in over NFS (we used a Mac for this). Here is a sample run:
# ./text_stream -I const4.txt -f \
/mapr/ -i 0 \
-l 20 -c 32k -d 250000
Simply put, it writes the local constitution file into the input directory of the mapr user on the cluster, 32 KB at a time, inserting 250 ms delays. Note the NFS path it is writing to. This is the beauty of MapR Direct Access NFS™: no Java code needs to be written or configured to get data directly into the cluster, and any POSIX program can access data in the cluster.

This is the new WordCountNew class that extends the sample WordCount program that comes with Hadoop, modified to handle streaming input. The following methods were overridden to make this work:
public static void main(String[] args) throws Exception
     The main() method from the original, with a few changes to pass
     the overridden classes below to Hadoop.

protected void setup(Mapper.Context context)
     Gets the split ID and persists it to the output directory
     under .meta/splits/input-file-path_offset_length.

public void checkOutputSpecs(JobContext job)
     Overridden to remove the check for a pre-existing output
     directory, necessary to allow repeated MR runs.

public List<InputSplit> getSplits(JobContext job) throws IOException
     TextInputFormat method overridden to strip out already-processed splits.

These are the helper methods used to persist a processed split and to filter out already-processed splits:

public static void saveInputSplitId(Mapper.Context context, String splitId)
public static List<InputSplit> filterProcessedSplits(List<InputSplit> inputSplits, JobContext job) throws IOException
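
To give a feel for what the split filtering does, here is a hedged sketch (the class name and details are illustrative, not the demo or Atlantbh code verbatim) of an input format that drops splits whose IDs already appear under output/.meta/splits:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IncrementalTextInputFormat extends TextInputFormat {
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> all = super.getSplits(job);

        // Collect the SplitIDs persisted by earlier runs under output/.meta/splits.
        Path splitsDir = new Path(FileOutputFormat.getOutputPath(job), ".meta/splits");
        FileSystem fs = splitsDir.getFileSystem(job.getConfiguration());
        Set<String> done = new HashSet<String>();
        if (fs.exists(splitsDir)) {
            for (FileStatus st : fs.listStatus(splitsDir)) {
                done.add(st.getPath().getName());
            }
        }

        // Keep only the splits that have not been processed yet.
        // The real demo also holds back a partial split at the end of a file
        // that is still growing, until the final pass.
        List<InputSplit> pending = new ArrayList<InputSplit>();
        for (InputSplit s : all) {
            FileSplit fsplit = (FileSplit) s;
            String id = fsplit.getPath().toString().replace('/', '_')
                    + ":" + fsplit.getStart() + "+" + fsplit.getLength();
            if (!done.contains(id)) {
                pending.add(s);
            }
        }
        return pending;
    }
}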
WordCountFinal is the MR job run at the end to aggregate the intermediate word count outputs; it uses the output directory of the intermediate runs as its input.

This script runs the demo. Once the data is streaming in, run this script inside the cluster. It assumes the cluster is also NFS-mounted on the node from which the script runs. Assuming the MapR NFS Gateway service is running on this node, mount it like this:
$ sudo mount -o nolock,actimeo=1 localhost:/mapr /mapr

On the cluster, you can run the script as follows:
$ ./

The script is run from the same location as the jar file that contains the compiled code. After each pass through the file being streamed in, it appends the top 10 words from that pass to /tmp/wc.passes. While this is running, you can watch the results as the jobs complete:
$ tail -f /tmp/wc.passes

This is the hadoop job that is run on each pass through the input:
$ hadoop jar wc.jar WordCountNew input output
When the script sees no changes to the input file, it assumes streaming is finished and runs one last pass, processing any partial chunk at the end that may have been skipped. Then comes the aggregation pass. Note that it uses the above output directory as its input, since we saved off each iteration's results:
$ hadoop jar wc.jar WordCountFinal output output-final

It then shows the aggregated top 10 words.
Lastly, it runs a diff to be sure the results are identical to the original non-streaming wordcount job, which was previously run and saved off into a directory called output-orig-wordcount.
This script is invoked by the main demo script after each pass and rolls over the output file (part_r_00000) so we can aggregate everything at the end with WordCountFinal.

Compiling Java
To compile the Java code and build the wc.jar file:
$ mkdir wc_classes
$ javac -classpath `hadoop classpath` -d wc_classes \
    /opt/mapr/pig/pig*/test/org/apache/pig/test/utils/WordCount.java
$ javac -classpath `hadoop classpath`:wc_classes -d wc_classes WordCountNew.java
$ javac -classpath `hadoop classpath`:wc_classes -d wc_classes WordCountFinal.java
$ jar cvf wc.jar -C wc_classes .

Output of WordCountNew
Here is what the output directory looks like after several passes:
$ pwd
$ ls -la
total 119
drwxr-xr-x 4 mapr mapr   9 Jan 17 09:29 .
drwxrwxrwx 33 mapr mapr  41 Jan 17 09:32 ..
drwxr-xr-x 3 mapr mapr   1 Jan 17 09:26 _logs
drwxr-xr-x 3 mapr mapr   1 Jan 17 09:26 .meta
-rwxr-xr-x 1 mapr mapr 19286 Jan 17 09:26 output.pass-1
-rwxr-xr-x 1 mapr mapr 19595 Jan 17 09:26 output.pass-2
-rwxr-xr-x 1 mapr mapr 19624 Jan 17 09:27 output.pass-3
-rwxr-xr-x 1 mapr mapr 19595 Jan 17 09:28 output.pass-4
-rwxr-xr-x 1 mapr mapr 20709 Jan 17 09:28 output.pass-5
-rwxr-xr-x 1 mapr mapr 18990 Jan 17 09:29 output.pass-6
-rwxr-xr-x 1 mapr mapr   0 Jan 17 09:26 _SUCCESS
Each output.pass-N file is a copy of the MR job output file (part_r_00000). Note the .meta directory. This is where the processed split information is persisted:
$ cd .meta
$ ls -l
total 1
drwxr-xr-x 2 mapr mapr 20 Jan 17 09:29 splits
$ cd splits/
$ ls -l
total 0
-rwxr-xr-x 1 0 Jan 17 09:26 maprfs:_user_mapr_input_testfile:0+1048576
-rwxr-xr-x 1 0 Jan 17 09:28 maprfs:_user_mapr_input_testfile:10485760+1048576
-rwxr-xr-x 1 0 Jan 17 09:26 maprfs:_user_mapr_input_testfile:1048576+1048576
-rwxr-xr-x 1 0 Jan 17 09:28 maprfs:_user_mapr_input_testfile:11534336+1048576
-rwxr-xr-x 1 0 Jan 17 09:28 maprfs:_user_mapr_input_testfile:12582912+1048576
Each (empty) file's name encodes the name of the input file, the offset of the start of the split, and the length of the split; it is essentially the SplitID. A 1 MB split size was used to make the demo easier to run on a slow VM, so the following was applied to the input directory before streaming the testfile in:
$ hadoop mfs -setchunksize 1048576 input

Debug Messages
As the job runs, it shows messages about what splits have already been processed and skipped, and what splits will be processed on this run. For example:
************* filterProcessedSplits - splitsDir is output/.meta/splits
Input file is maprfs:/user/mapr/input/testfile, chunksize is 1048576
Look for previously processed splits...
split already processed :maprfs:_user_mapr_input_testfile:1048576+1048576
split already processed:maprfs:_user_mapr_input_testfile:0+1048576
Get unprocessed splits...
   Unprocessed split:2097152/1048576
   Unprocessed split:3145728/1048576
   Unprocessed split:4194304/1048576
   Unprocessed split:5242880/1048576
   Unprocessed split:6291456/1048576
   Unprocessed split:7340032/1048576
   Unprocessed split:8388608/1048576
PARTIAL SPLIT - start is 9437184, length is 655360, do not process

With MapR's Direct Access NFS feature, MapReduce can be run reliably and in near real time on data as it's being ingested directly from its source, without having to wait until the stream ends and the file is closed.
Best Practices