This blog post introduces streaming: running MapReduce jobs with programs that are not written in Java. MapReduce’s streaming feature allows programmers to write their MapReduce programs in other languages, such as Perl or Python.
You can use streaming for either rapid prototyping using sed/awk, or for full-blown MapReduce deployments. Note that the streaming feature does not include C++ programs – these are supported through a similar feature called pipes.
Be aware that streaming may introduce some performance penalty:
- Framework still creates JVMs for tasks
- Scripted programs may run more slowly
Streaming may also improve performance, for example:
- The code implementing your map and reduce functions may run faster than the equivalent Java code
Working with the MapReduce Streaming Feature
The image above shows how the MapReduce framework uses the streaming feature.
There is a mapper called PipeMap that runs inside a JVM which calls your map program. Similarly, there is a reducer called PipeReduce that runs inside a JVM which calls your reduce program. All key-value pairs are sent and received through the standard input and output channels of your map and reduce programs.
The reference to a “pipe” in the names of these 2 classes comes from the fact that we are connecting standard input and standard output streams from one process to another.
Above is a more detailed description of the data flow in a MapReduce streaming job. The PipeMap task reads records from your input files and directories and passes them to your script on standard input. Your map function processes the key-value pairs in an input split one record at a time (just as in a normal MapReduce job). You write your output to standard output, which is wired back into the PipeMap task. The PipeMap task then collects the intermediate results from your map function, and the Hadoop framework sorts and shuffles the data to the reducers.
The same data flow mechanism now occurs on the reduce side. PipeReduce sends the intermediate results to its standard output, which is wired to the standard input of your reduce script. After your reduce script processes a record from standard input, it may write to its standard output (which is wired back to PipeReduce). PipeReduce then collects all the output and writes it to the output directory.
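The wiring described above can be imitated outside Hadoop with a few lines of Python. This is only a sketch of the idea, not how PipeMap is implemented: the function name `pipe_records` and the stand-in child process are assumptions made for illustration.

```python
import subprocess
import sys

# Stand-in for "your map program": a child process that copies stdin to stdout.
# (Hypothetical; a real job would launch your own script or a tool such as /bin/cat.)
IDENTITY_CHILD = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"]


def pipe_records(records, command=IDENTITY_CHILD):
    """Send tab-separated records to a child process and collect what it emits."""
    payload = "".join(f"{key}\t{value}\n" for key, value in records)
    result = subprocess.run(
        command,
        input=payload,        # wired to the child's standard input
        capture_output=True,  # collects the child's standard output and error
        text=True,
        check=True,
    )
    # Split each output line back into (key, value) at the first tab.
    pairs = []
    for line in result.stdout.splitlines():
        key, _, value = line.partition("\t")
        pairs.append((key, value))
    return pairs


if __name__ == "__main__":
    print(pipe_records([("alice", "1"), ("bob", "2")]))
```

The parent process plays the role of the JVM-side task, and the child plays the role of your script: records go in on one stream and come back on the other.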
Testing the MapReduce Streaming Framework
The streaming invocation above shows how you can quickly test the MapReduce streaming framework.
The script defines some environment variables (e.g., $THEJARFILE) just to make the invocation easier to read. In this example, we simply use the /bin/cat command as a mapper to pass the contents of the local /etc/passwd file to the reducer (also the /bin/cat command). The output from this job is written to the "./streamOut0" directory.
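As a rough reconstruction of what such an invocation looks like, the sketch below assembles the command line in Python and prints it. The jar path is a placeholder (it depends on your installation), the helper name `build_streaming_command` is made up for this example, and how the local input file must be addressed depends on your cluster's default file system.

```python
import shlex


def build_streaming_command(jar_path, input_path, output_dir, mapper, reducer):
    """Return the argument list for a Hadoop streaming job."""
    return [
        "hadoop", "jar", jar_path,
        "-input", input_path,
        "-output", output_dir,
        "-mapper", mapper,
        "-reducer", reducer,
    ]


command = build_streaming_command(
    jar_path="/path/to/hadoop-streaming.jar",  # assumption: location varies by installation
    input_path="/etc/passwd",                  # assumption: may need a file:// prefix on a cluster
    output_dir="./streamOut0",
    mapper="/bin/cat",
    reducer="/bin/cat",
)

# Print a copy-and-paste friendly version of the command.
print(shlex.join(command))
```

Building the command as a list keeps each argument separate, so it can also be handed directly to `subprocess.run` without shell quoting issues.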
Input, Output, and PipeMap
This is how the Hadoop framework passes input and output through the MapReduce streaming job launched in the previous section.
The PipeMap reads the /etc/passwd file from the local file system and passes it to the /bin/cat command which doesn't do any transformation of the data. It simply outputs what it gets as input. The framework then performs the same shuffle and sort on the mapper output which is sent to the PipeReduce JVM.
The PipeReduce passes the intermediate results to the /bin/cat command, which also doesn't perform any transformations or reductions and simply writes its input to the specified output directory. That output directory has the same layout as the output of a "standard" MapReduce job, so it contains:
- _SUCCESS file
- _logs directory
- part-xxxxx files
The slide above depicts the input file contents (local /etc/passwd file) as well as the output file from the streaming MapReduce job. Note that while the contents of both the input and output file are identical, the order of the output is changed; more specifically, it is sorted based on the first string in each record of the file.
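The reordering can be reproduced with a small simulation. This is a sketch under simplifying assumptions: a single reducer, made-up sample lines in the style of /etc/passwd, and Python's string ordering standing in for Hadoop's key comparison (the two agree for plain ASCII text).

```python
def run_identity_job(lines):
    """Imitate a cat mapper, the framework's sort by key, and a cat reducer."""
    def key_of(line):
        # Text up to the first tab is the key; with no tab, the whole line is the key.
        return line.split("\t", 1)[0]

    mapped = list(lines)                   # identity map, like /bin/cat
    shuffled = sorted(mapped, key=key_of)  # the framework sorts by key
    return shuffled                        # identity reduce, like /bin/cat


# Made-up sample records, not taken from a real system.
sample = [
    "root:x:0:0:root:/root:/bin/bash",
    "daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin",
    "bin:x:2:2:bin:/bin:/usr/sbin/nologin",
]

for line in run_identity_job(sample):
    print(line)
```

Because these lines contain no tab character, each whole line acts as the key, which is why the output comes back in sorted line order even though neither cat command changes anything.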
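Also note the name of the output file is part-00000. This naming indicates that the streaming API uses the older mapred package; jobs written against the newer mapreduce package name their reducer output files part-r-00000.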
Programming Contracts for Mappers and Reducers
In a MapReduce streaming job, the mapper will continue to get records (key-value pairs) on the standard input stream until the OS closes the stream (end-of-file). Your mapper code will generate key-value pairs and emit them in the format (key \t value \n).
When the job runs multiple reducers, the intermediate results are partitioned by hashing the key, so all values for a given key reach the same reducer. Your reduce program will receive keys in sorted order along with their associated values, one key-value pair per line. This differs from the Java reducer model, in which the reduce method is called once per key with the full list of associated values. Your reducer code must account for this by checking whether the key has changed from one line to the next.
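A minimal sketch of that key-change check is shown below. It assumes integer values and sums them per key; the function name `sum_by_key` is made up for this example. In a real streaming script you would call it with `sys.stdin` and `sys.stdout` instead of the in-memory streams used here.

```python
import io


def sum_by_key(stdin, stdout):
    """Sum integer values per key from sorted, tab-separated key/value lines."""
    current_key = None
    total = 0
    for line in stdin:
        key, _, value = line.rstrip("\n").partition("\t")
        if key != current_key:
            # Key changed: flush the finished group before starting a new one.
            if current_key is not None:
                stdout.write(f"{current_key}\t{total}\n")
            current_key = key
            total = 0
        total += int(value)
    # Flush the last group once the input stream is exhausted.
    if current_key is not None:
        stdout.write(f"{current_key}\t{total}\n")


out = io.StringIO()
sum_by_key(io.StringIO("a\t1\na\t2\nb\t5\n"), out)
print(out.getvalue(), end="")
```

The final flush after the loop is easy to forget: without it, the values for the last key in the stream would never be written.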
You can configure the key-value separator for input lines (default is \t) using the key.value.separator.in.input.line parameter. Configure the input and output field separators in the map using the stream.map.input.field.separator and stream.map.output.field.separator parameters (default is \t). Last, you can configure the key-value separator in the reducer output files using the mapred.textoutputformat.separator parameter (default is \t).
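To make the effect of a separator concrete, the sketch below splits a line into key and value at the first occurrence of a separator. It only illustrates the splitting rule and is not Hadoop code; `split_key_value` is a hypothetical helper, and it returns an empty string where the framework would treat the value as missing.

```python
def split_key_value(line, separator="\t"):
    """Split a line at the first separator; without one, the whole line is the key."""
    key, _, value = line.rstrip("\n").partition(separator)
    return key, value


# Default separator: everything after the first tab belongs to the value.
print(split_key_value("user1\tlogin\t2024\n"))

# A different separator, as you might configure for the map output.
print(split_key_value("user1.login.2024", separator="."))

# No separator at all: the entire line becomes the key.
print(split_key_value("user1"))
```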
A MapReduce streaming mapper function loops over standard input, cleans up each line, extracts the key and value, performs any operations required by your program logic, and then writes the key and value (separated by \t) to standard output.
This Perl code is a sort of "identity" mapper in the sense that it doesn’t perform any transformations or filtering of the input data. It loops through standard input until the operating system closes the stream (end of file). For each record, it removes the last character (\n) and assigns the key and value variables from the output of the split function (split on \t). It then just prints "key:" followed by the key and "value:" followed by the value to standard output. Note the key and value strings are separated by the tab character, and the key-value pair is terminated by a newline character. This constitutes the intermediate results of the streaming job.
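For comparison, here is a Python sketch of a mapper with the behavior described above. It is not a translation of the original Perl listing, only an approximation of its description; the function name `label_mapper` is made up, and in a real streaming script you would call it with `sys.stdin` and `sys.stdout`.

```python
import io


def label_mapper(stdin, stdout):
    """Emit each record unchanged apart from 'key:' and 'value:' labels."""
    for line in stdin:                        # loop ends at end of file
        line = line.rstrip("\n")              # remove the trailing newline
        key, _, value = line.partition("\t")  # split at the first tab
        # Key and value are separated by a tab and terminated by a newline.
        stdout.write(f"key:{key}\tvalue:{value}\n")


out = io.StringIO()
label_mapper(io.StringIO("a\t1\nb\t2\n"), out)
print(out.getvalue(), end="")
```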
The commands here show how to prepare for and launch a MapReduce streaming job in Python. Note that you can pass the “usual” generic Hadoop job options when submitting a streaming MapReduce job to the framework. In this case, we are requesting that 2 reduce tasks be launched for this job.
Monitoring and Debugging MapReduce Streaming Jobs
Here are some general debugging tips when writing streaming MapReduce code. Note there isn't a lot for you to do – unlike with Java MapReduce code.
Ensure that your mapper and reducer scripts can each run on their own by feeding them input on standard input. As with any program, you should test with bad data (i.e., data that is not formatted the way your map and reduce scripts expect). Test the map and reduce functions in the Hadoop framework by pairing each one with an "identity" reducer or mapper, respectively. Last, you can update counters and status from within your mapper and reducer scripts.
You can update counters and status from within your map and reduce scripts by writing specially formatted strings to standard error. The Hadoop framework listens for output on the standard error stream of your map and reduce scripts and looks for lines beginning with "reporter:counter:" or "reporter:status:". Each update to the reporter must appear on its own line.
This is effectively the extent of debugging when programming in streaming.
The strings have the formats:
reporter:counter:<group>,<counter>,<amount>
reporter:status:<message>
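The sketch below shows one way a Python mapper might emit these reporter lines while skipping malformed records. The group and counter names ("Demo", "MalformedRecords"), the status message, and all function names are hypothetical; in a real streaming script the streams would be `sys.stdin`, `sys.stdout`, and `sys.stderr`.

```python
import io
import sys


def report_counter(group, counter, amount, stream=None):
    """Write one counter update on its own line."""
    stream = sys.stderr if stream is None else stream
    stream.write(f"reporter:counter:{group},{counter},{amount}\n")
    stream.flush()


def report_status(message, stream=None):
    """Write one status update on its own line."""
    stream = sys.stderr if stream is None else stream
    stream.write(f"reporter:status:{message}\n")
    stream.flush()


def mapper_with_counters(stdin, stdout, stderr):
    """Pass through well-formed records and count the ones without a tab."""
    for line in stdin:
        key, sep, value = line.rstrip("\n").partition("\t")
        if not sep:
            # Bad data: no key-value separator, so count it and move on.
            report_counter("Demo", "MalformedRecords", 1, stderr)
            continue
        stdout.write(f"{key}\t{value}\n")
    report_status("mapper finished", stderr)


out, err = io.StringIO(), io.StringIO()
mapper_with_counters(io.StringIO("a\t1\nbroken line\nb\t2\n"), out, err)
print(out.getvalue(), end="")
print(err.getvalue(), end="")
```

Passing the streams in as arguments keeps the mapper easy to test locally with bad input before submitting it to the cluster.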
Now that we've introduced the concept of using non-Java programs or streaming for MapReduce jobs, be sure to check out our free Hadoop On-Demand Training for our full-length courses on a range of Hadoop technologies.
If you have any questions, please submit them in the comments section below.