Leverage Existing File-based Applications with Hadoop

Overview

Hadoop provides a compelling distributed platform for processing massive amounts of data in parallel using the Map/Reduce framework and the Hadoop distributed file system. A Java API allows developers to express the processing as a map phase and a reduce phase, where both phases take key/value pairs (or key/value-list pairs) as input and output. Each input file consists of one or more blocks, or chunks, that are distributed across the Hadoop cluster. These chunks are processed by the map phase in parallel, exploiting data locality when possible by assigning each map task to a node that holds the chunk locally.

But what if you have an existing application that processes large numbers of files, and you want to leverage the scale and economy that Hadoop offers without rewriting the application for Hadoop? Or what if the application cannot process chunks of a file, but needs to process each file as a whole? Hadoop does have a facility called Hadoop Streaming that allows existing applications to run, but those applications must be able to read a chunk of the input data from standard input and cannot, for example, seek to arbitrary locations in the file. And finally, what if the application needs to modify the input files? That is not possible with other Hadoop distributions because of the append-only nature of the Hadoop distributed file system.

With MapR’s enterprise-grade distribution of Hadoop, applications that process large volumes of files can run unmodified in Hadoop. These applications can be written in any language. By using the basic Map/Reduce “wrapper” code described in this article and by leveraging MapR’s Direct Access NFS™ feature, significant cost savings can be realized by eliminating the need to rewrite potentially huge investments in legacy software.

As an example, let’s assume an image processing application written in C++ needs to process thousands or even millions of files. The program takes a file path name as an argument, and many instances of the program can be invoked in parallel to take advantage of multi-core nodes. The output of the application can be new files, or even modifications to the input files. Using the techniques described here, Hadoop can run this application without any rewriting, while still providing the parallelism, scale, and economy that Hadoop brings.

MapR Direct Access NFS™

A unique feature found only in MapR is the ability to mount the MapR distributed file system from any NFS client, whether it runs Windows, Linux, UNIX, or Mac OS, or is any other system with an NFS v3 client. In addition, MapR’s file system is POSIX-compliant and supports full random read/write I/O, unlike the other Hadoop distributions, whose file system has a Java-only interface and is write-once/append-only. One could attempt something similar on other Hadoop distributions with an NFS FUSE gateway, but it is still restricted to append-only access, performs poorly, and would have to be installed on every node in the cluster.

Using the NFS feature, every node of a MapR cluster can run an NFS server, and each node can mount the cluster locally with this command:

# mount localhost:/mapr /mapr

This means that existing applications, written in any language, can directly process files stored in the MapR distributed file system. The rest of this article will show how this can be leveraged to process large amounts of data with existing applications in Hadoop.
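
As a minimal sketch of what this enables, here is how a non-Hadoop Java program might read and update a file in MapR-FS through the NFS mount using only the standard java.io APIs. The mount point /mapr/my.cluster.com matches the example cluster name used later in this article, and the file path shown is hypothetical:

import java.io.RandomAccessFile;

public class NfsRandomWrite {
    public static void main(String[] args) throws Exception {
        // Hypothetical file path under the NFS mount of the cluster file system.
        String path = "/mapr/my.cluster.com/user/foo/input/fileXYZ";

        // Plain POSIX-style random read/write; no Hadoop API involved.
        RandomAccessFile raf = new RandomAccessFile(path, "rw");
        try {
            raf.seek(1024);                    // seek to an arbitrary offset
            byte[] buf = new byte[4096];
            int n = raf.read(buf);             // read a block of data
            raf.seek(1024);
            raf.write(buf, 0, Math.max(n, 0)); // overwrite the same region in place
        } finally {
            raf.close();
        }
    }
}

This kind of seek-and-overwrite access is exactly what an HDFS-based, append-only file system cannot offer to existing applications.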

High-level Architecture

In order to leverage existing applications using the Hadoop Map/Reduce framework and MapR’s distributed file system, we need a small Java “wrapper” that does the following:

  • Creates a new class that extends FileInputFormat, telling MapReduce to process each file as a whole rather than splitting it into chunks
  • Optionally calculates which node contains most of the chunks of the file, to improve data locality
  • Creates a new class that extends RecordReader, which passes the input file name to the map task as the value, with a null key
  • Invokes the existing application from the map task, passing the NFS path of the input file as a command-line argument
  • Provides a simple reduce task that writes the input file name and a “status” value indicating that the file was processed (optional)

This can be implemented in about 200 lines of Java code, which can be easily adapted for any legacy application that takes an input file path as an argument for processing. Let’s now take a look at the implementation.

Implementation

We start by creating a new FileInputFormat class that works for any type of input file. The O’Reilly book “Hadoop: The Definitive Guide” has an example of such a class that we can use here with some minor changes. It is called WholeFileInputFormat, and it is designed to read the entire file into a buffer and pass that buffer as the value to the map task. Since our legacy application handles the entire processing of the file, we modify WholeFileInputFormat to simply pass the file path as the value to the map task. Note that the key is not used, so it is defined to be of type NullWritable. Refer to the O’Reilly book for details on these classes.

WholeFileInputFormat.java:

import java.io.IOException;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;

public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, Text> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }

  @Override
  public RecordReader<NullWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
    WholeFileRecordReader reader = new WholeFileRecordReader();
    reader.initialize(split, context);
    return reader;
  }
}

The next class we need to implement is WholeFileRecordReader, which supplies the map task with the null key and the file path as the value. Again we use the O’Reilly WholeFileRecordReader example, with some minor changes:

WholeFileRecordReader.java:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class WholeFileRecordReader extends RecordReader<NullWritable, Text> {

  private FileSplit fileSplit;
  private Configuration conf;
  private Text value = new Text("");
  private boolean processed = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      Path file = fileSplit.getPath();
      // Set the value to the file name
      value = new Text(file.toString());
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
  }

  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // do nothing
  }
}

The last class we need to implement is the main program, which sets up the MapReduce job and implements the map task that invokes our legacy application. It is quite straightforward. There are two key steps in the Mapper’s map() method:

  • Convert the Hadoop input file path (the value passed to this method) into the NFS path required by our non-Hadoop application. For example, the input value for a file could be maprfs:/user/foo/input/fileXYZ, which we convert to the NFS path /mapr/my.cluster.com/user/foo/input/fileXYZ.
  • Invoke the legacy application via the Java Runtime exec() method. For example, let’s assume the application is invoked from a shell script (which is conveniently stored in MapR-FS and accessed by its NFS path so all nodes can run it): Runtime.getRuntime().exec("/mapr/my.cluster.com/bin/myscript.sh " + nfsfilepath);

Here is the code:

TestWholeFileInputFormat.java:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestWholeFileInputFormat {

    public static class MyMapper
            extends Mapper<NullWritable, Text, Text, IntWritable> {

        public void map(NullWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // The value is the full Hadoop path, e.g. maprfs:/user/foo/input/fileXYZ.
            // Strip the "maprfs:" scheme (7 characters) and prepend the NFS mount point.
            String filepath = value.toString().substring(7);
            String nfsfilepath = "/mapr/my.cluster.com" + filepath;
            System.out.println("MyMapper: filepath is " + filepath
                    + ", nfs path is " + nfsfilepath);

            // Invoke the legacy application and wait for it to finish, so the
            // map task does not complete before the file has been processed.
            Process p = Runtime.getRuntime().exec(
                    "/mapr/my.cluster.com/user/mapr/myscript.sh " + nfsfilepath);
            int rc = p.waitFor();

            // Emit the NFS file path with the application's exit code as the status.
            context.write(new Text(nfsfilepath), new IntWritable(rc));
        }
    }

    public static class MyReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: TestWholeFileInputFormat <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Whole file input format");
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setJarByClass(TestWholeFileInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Compiling and Running

We can compile and run the program on a directory of input files as follows:

$ mkdir wf_classes
$ javac -classpath `hadoop classpath` -d wf_classes \
    WholeFileInputFormat.java WholeFileRecordReader.java TestWholeFileInputFormat.java
$ jar cvf wf.jar -C wf_classes .
$ hadoop jar wf.jar TestWholeFileInputFormat \
    -Dmapred.map.tasks.speculative.execution=false input-dir output-dir

Note the -D option, which disables speculative execution. This prevents concurrent invocations of the application program on the same input file.
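
If preferred, the same behavior can be set programmatically in the job driver rather than on the command line. A minimal sketch, using the classic MapReduce property name shown above:

// In main(), after creating the Configuration and before constructing the Job:
// prevent the framework from launching duplicate (speculative) attempts of a map
// task, so the legacy application is never run twice on the same input file.
conf.setBoolean("mapred.map.tasks.speculative.execution", false);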

Output Files

The reduce-phase output directory records the processing status of each file, and the application itself is free to write other output anywhere in the MapR distributed file system. The key is that the path names chosen for the output files are under the NFS mount point. The application is also free to modify the input files directly.

Another important factor is handling node failures. Map tasks that fail are automatically restarted, so the application should be prepared to handle the case where a file was partially processed by a previous invocation; in other words, it should be idempotent.
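
One way to achieve that idempotence is for the wrapper (or the shell script it invokes) to check a per-file marker over NFS before doing any work. The sketch below is purely illustrative; the marker-file naming convention and the helper class name are assumptions, not part of the wrapper code above:

import java.io.File;
import java.io.IOException;

public class IdempotenceCheck {

    // Returns true if the file still needs processing, cleaning up any partial
    // output left by a failed earlier attempt. The ".done" and ".partial"
    // suffixes are hypothetical conventions chosen for this example.
    public static boolean needsProcessing(String nfsfilepath) {
        File done = new File(nfsfilepath + ".done");       // "completed" marker
        File partial = new File(nfsfilepath + ".partial"); // scratch output

        if (done.exists()) {
            return false;        // already processed successfully; skip this file
        }
        if (partial.exists()) {
            partial.delete();    // discard output from a failed earlier attempt
        }
        return true;
    }

    // Called by the wrapper after the legacy application exits successfully.
    public static void markDone(String nfsfilepath) throws IOException {
        new File(nfsfilepath + ".done").createNewFile();
    }
}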

Data Locality

One of the major design principles of Hadoop is to move the processing to the nodes that store the data. Since we are processing each file as a single record, how do we get the best locality? One approach is to set the input chunk size large enough that the entire file fits in one chunk. If that is not practical, we can modify our WholeFileInputFormat class so that each file's split is placed on the node that holds the majority of the file's chunks, simply by iterating over the block locations of each file and counting blocks per node. This is accomplished by overriding the getSplits() method in the WholeFileInputFormat class:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;

public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, Text> {

    static final String NUM_INPUT_FILES = "mapreduce.input.num.files";

    // isSplitable() and createRecordReader() are unchanged from the earlier listing.

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
        FileSystem fs = FileSystem.get(job.getConfiguration());
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
                Path path = file.getPath();
                long length = file.getLen();
                BlockLocation bl[] = fs.getFileBlockLocations(file, 0, length);

                // Create one unsplit FileSplit per file, placed on the node
                // that holds the majority of the file's blocks.
                String nodes[] = new String[1];
                nodes[0] = findNodeMostBlocks(bl);
                splits.add(new FileSplit(path, 0, length, nodes));
        }
        // Save the number of input files in the job configuration
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

        return splits;
  }

The last method we need is findNodeMostBlocks(), which takes an array of BlockLocations for a file and returns the hostname of the node that holds the most of them:

  public String findNodeMostBlocks(BlockLocation bl[]) {
        HashMap<String, Integer> hm = new HashMap<String, Integer>();
        int maxCount = 0;
        String maxNode = "";

        for (int i = 0; i < bl.length; i++) {
                String nodes[] = {""};
                try {
                        nodes = bl[i].getHosts();
                } catch (IOException e) {
                        // ignore and fall back to the empty host name
                }
                // Count the blocks held by each host, tracking the maximum.
                for (int j = 0; j < nodes.length; j++) {
                        Integer n;
                        if (hm.containsKey(nodes[j]))
                                n = Integer.valueOf(hm.get(nodes[j]).intValue() + 1);
                        else
                                n = Integer.valueOf(1);
                        hm.put(nodes[j], n);
                        if (n.intValue() > maxCount) {
                                maxCount = n.intValue();
                                maxNode = nodes[j];
                        }
                }
        }
        return maxNode;
  }
}

Summary

MapR is the only Hadoop distribution whose distributed file system provides a POSIX-compliant interface and full random read/write file access. Using this capability, existing file-based applications can easily be leveraged in Hadoop, avoiding the enormous cost of rewriting those applications to take advantage of Hadoop.

