
Overview

Hadoop provides a compelling distributed platform for processing massive amounts of data in parallel using the Map/Reduce framework and the Hadoop distributed file system. A Java API allows developers to express the processing in terms of a map phase and a reduce phase, where both phases use key/value pairs or key/value list pairs as input and output. Each input file consists of one or more blocks, or chunks, that are distributed across the Hadoop cluster. The map phase processes these chunks in parallel and uses data locality when possible by assigning each map task to a node that holds the chunk locally.

But what if you have an existing application that processes a large number of files, and you want to leverage the scale and economy that Hadoop offers without rewriting the application for Hadoop? Or what if the application cannot process chunks of a file, but needs to process each file as a whole? Hadoop does have a facility called Hadoop Streaming that allows existing applications to run, but those applications must be able to read a chunk of the input data from standard input and cannot, for example, seek to random locations in the file. And finally, what if the application needs to modify the input files? This is not possible with other Hadoop distributions because of the append-only nature of the Hadoop distributed file system.

By using MapR’s enterprise grade distribution of Hadoop, applications that process large volumes of files can be run unmodified in Hadoop. These applications can be written in any language, and by using some basic Map/Reduce “wrapper” code described in this article, and by leveraging MapR’s Direct Access NFS™ feature, significant cost savings can be realized by eliminating the need to rewrite potentially huge investments in legacy software.

As an example, let’s assume an image processing application written in C++ needs to process thousands or even millions of files. The application program takes a file path name as an argument, and many of these programs can be invoked in parallel to take advantage of multi-core nodes. The output of the application can be new files, or even modifications to the input files. Using the techniques described here, Hadoop can be used to accomplish this without re-writing the application and still benefit from the parallelism, scale, and economy that Hadoop brings.

MapR Direct Access NFS™

A feature found only in MapR is the ability to mount the MapR distributed file system on any NFS client. That client can be Windows, Linux, UNIX, Mac, or any server that provides an NFS v3 client. MapR’s file system is also POSIX-compliant and supports full read/write random I/O, unlike all other Hadoop distributions, whose file system has a Java-only interface and is write-once/append-only. One could attempt something similar on other Hadoop distributions with NFS FUSE, but it is still restricted to append-only, it performs poorly, and it would have to be installed on all nodes in the cluster.

Using the NFS feature, every node of a MapR cluster can run an NFS server, and each node can mount the cluster locally via this command:

	# mount localhost:/mapr /mapr

This means that existing applications, written in any language, can directly process files stored in the MapR distributed file system. The rest of this article will show how this can be leveraged to process large amounts of data with existing applications in Hadoop.

High-level Architecture

In order to leverage existing applications using the Hadoop Map/Reduce framework and MapR’s distributed file system, we need a small Java “wrapper” that does the following:

  • Defines a new class that extends the FileInputFormat class, which tells MapReduce to process each file as a whole rather than splitting it into chunks
  • Optionally calculates which node contains most of the chunks of the file in order to improve data locality
  • Defines a new class that extends the RecordReader class, which passes the input file name to the map task as the value, with a null key
  • Invokes the existing application from the map task, passing in the NFS path of the input file as the command-line argument
  • Optionally provides a simple reduce task that writes the input file name and a “status” value indicating the file was processed

This can be implemented in about 200 lines of Java code, which can be easily adapted for any legacy application that takes an input file path as an argument for processing. Let’s now take a look at the implementation.

Implementation

We start out by creating a new FileInputFormat class that will work for any type of input file. The O’Reilly book “Hadoop: The Definitive Guide” has an example of such a class that we can use here with some minor changes. It’s called WholeFileInputFormat, and it is designed to read the entire file into a buffer and pass that buffer as the value for the map task. Since our legacy application will handle the entire processing of the file, we modify WholeFileInputFormat to simply pass the file path as the value to the map task. Note that the key is not used, so it’s defined to be of type NullWritable. Refer to the O’Reilly book for details on these classes.

WholeFileInputFormat.java:

import java.io.IOException;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;

public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, Text> {

  // Never split an input file: each file becomes exactly one map input.
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }

  @Override
  public RecordReader<NullWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
    WholeFileRecordReader reader = new WholeFileRecordReader();
    reader.initialize(split, context);
    return reader;
  }
}

The next class we need to implement is the WholeFileRecordReader class that will actually invoke the map task with the null key and file path value. Again we use the O’Reilly WholeFileRecordReader example, with some minor changes:

WholeFileRecordReader.java:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class WholeFileRecordReader extends RecordReader<NullWritable, Text> {

  private FileSplit fileSplit;
  private Configuration conf;
  private Text value = new Text("");
  private boolean processed = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      Path file = fileSplit.getPath();
      // Set the value to the file name
      value =  new Text(file.toString());
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException,
                 InterruptedException {
    return NullWritable.get();
  }

  @Override
  public Text getCurrentValue() throws IOException,
                 InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // do nothing
  }
}

The last class we need to implement is the main program, which sets up the MapReduce environment and implements the map task that invokes our legacy application. It’s quite straightforward. There are two key steps in the Mapper map() method:

  • Convert the Hadoop input file path (which is the value passed to this method) into the NFS path required by our non-Hadoop application. For example, the input value for a file could be:
    • maprfs:/user/foo/input/fileXYZ
    We want to convert that to an NFS path for our application:
    • /mapr/my.cluster.com/user/foo/input/fileXYZ
  • Invoke the legacy application via the Java Runtime exec() method. For example, let’s assume the application is invoked from a shell script (which is conveniently stored in MapR-FS and accessed by its NFS path so all nodes can reach it):
    • Runtime.getRuntime().exec("/mapr/my.cluster.com/bin/myscript.sh " + nfsfilepath);
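
The two steps above can be sketched in plain Java, independent of the Hadoop classes. This is only an illustration under stated assumptions: the class and method names (LegacyAppLauncher, toNfsPath, buildCommand, run) are hypothetical, and the mount point /mapr/my.cluster.com is taken from the example paths above. The sketch uses ProcessBuilder with an explicit argument list instead of a single command string, because Runtime.exec(String) splits its argument on whitespace and would break on file names that contain spaces.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

final class LegacyAppLauncher {

    // Assumed NFS mount point and cluster name, taken from the example paths above.
    static final String NFS_ROOT = "/mapr/my.cluster.com";

    /** Turns "maprfs:/user/foo/x" into "/mapr/my.cluster.com/user/foo/x". */
    static String toNfsPath(String hadoopPath) {
        String prefix = "maprfs:";
        String path = hadoopPath.startsWith(prefix)
                ? hadoopPath.substring(prefix.length())
                : hadoopPath;
        // Collapse any run of leading slashes (e.g. "maprfs:///user/...") to one.
        path = "/" + path.replaceFirst("^/+", "");
        return NFS_ROOT + path;
    }

    /** Builds the argument vector; the path is its own argument, so spaces survive. */
    static List<String> buildCommand(String script, String nfsPath) {
        return Arrays.asList(script, nfsPath);
    }

    /** Runs the legacy program and returns its exit code. */
    static int run(String script, String nfsPath)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(buildCommand(script, nfsPath));
        pb.inheritIO();  // forward the child's stdout/stderr to this JVM's streams
        return pb.start().waitFor();
    }

    public static void main(String[] args) {
        System.out.println(toNfsPath("maprfs:/user/foo/input/fileXYZ"));
    }
}
```

Inside map(), the exit code returned by run() could serve as the “status” value that the reduce task records for each file.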

Here is the code:

TestWholeFileInputFormat.java:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;

public class TestWholeFileInputFormat {
    public static class MyMapper
    extends Mapper

Compiling and Running

We can compile and run the program on a directory of input files as follows:

$ mkdir wf_classes
$ javac -classpath `hadoop classpath` -d wf_classes TestWholeFileInputFormat.java
$ jar cvf wf.jar -C wf_classes .
$ hadoop jar wf.jar TestWholeFileInputFormat \
   -Dmapred.map.tasks.speculative.execution=false  input-dir output-dir

Note the -D option, where we disable speculative execution. This prevents concurrent invocations of the application program on the same input file.

Output Files

The application uses an output directory for reduce phase output to indicate processing status for each file, and is free to write other output anywhere in the MapR distributed file system. The key here is that the path name chosen for the output files is in the NFS-mounted directory. The application is also free to modify the input files directly.

Another important factor is handling node failures. Map tasks that fail are automatically restarted. This means the application should be prepared to handle the case where a file may have been partially processed in a previous invocation (in other words, exhibit the property of idempotence).
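
One way an application (or a thin wrapper around it) might achieve this is to skip files whose output already exists and to publish results with a rename, so a restarted task never sees a half-written output file. The sketch below is an assumption-laden illustration, not part of the original program: the class name IdempotentOutput, the method processOnce, the ".tmp" suffix, and the upper-casing "work" are all made up for the example, and it assumes the output directory is on the NFS-mounted file system where a rename within one directory replaces the target in a single step.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Locale;

final class IdempotentOutput {

    /**
     * Produces the output for one input file unless a finished output exists.
     * Returns true if work was done, false if the file was skipped.
     */
    static boolean processOnce(Path input, Path output) throws IOException {
        if (Files.exists(output)) {
            return false;  // an earlier attempt completed; nothing to do
        }
        // Leftovers from a failed attempt are simply overwritten here.
        Path tmp = output.resolveSibling(output.getFileName() + ".tmp");
        // Stand-in for the real processing: upper-case the file contents.
        String text = new String(Files.readAllBytes(input), StandardCharsets.UTF_8);
        Files.write(tmp, text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8));
        // Publish the finished result under its final name.
        Files.move(tmp, output, StandardCopyOption.REPLACE_EXISTING);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("idempotent-demo");
        Path in = dir.resolve("input.txt");
        Files.write(in, "hello".getBytes(StandardCharsets.UTF_8));
        Path out = dir.resolve("output.txt");
        System.out.println(processOnce(in, out));  // first attempt does the work
        System.out.println(processOnce(in, out));  // a retry finds the output and skips
    }
}
```

An application that modifies its input files in place needs a different guard, such as a separate marker file written only after the modification is complete.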

Data Locality

One of the major design considerations of Hadoop is moving the processing to the node containing the storage. Since we’re processing the entire file as one record, how do we get the best locality? One approach would be to set the input chunk size large enough that the entire file fits in one chunk. If that’s not practical, we can modify our WholeFileInputFormat class so that each file’s split names the node that holds the most chunks of that file. We do this by iterating over the file’s block locations and counting how many chunks each node holds. This is accomplished by overriding the getSplits() method in the WholeFileInputFormat class:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;

public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, Text> {
    static final String NUM_INPUT_FILES = "mapreduce.input.num.files";

  // isSplitable() and createRecordReader() are unchanged from the earlier listing.

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
        FileSystem hdfs = FileSystem.get(job.getConfiguration());
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
                Path path = file.getPath();
                long length = file.getLen();
                // Locations of every block in the file, from offset 0 to the end
                BlockLocation bl[] = hdfs.getFileBlockLocations(file,
                                0, length);

                // One split per file, hinted to the node holding the most blocks
                String nodes[] = new String[1];
                nodes[0] = findNodeMostBlocks(bl);
                splits.add(new FileSplit(path, 0, length, nodes));
        }
        // Save the number of input files in the job-conf
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

        return splits;
  }

The last method we need is findNodeMostBlocks(), which takes an array of BlockLocations for a file and returns the hostname of the node that holds the most of them:

  // Returns the hostname that holds the most blocks of the file,
  // or "" if no block locations are available.
  public String findNodeMostBlocks(BlockLocation bl[]) {
        HashMap<String, Integer> hm = new HashMap<String, Integer>();
        int maxCount = 0;
        String maxNode = "";

        for (int i = 0; i < bl.length; i++) {
                String nodes[] = {};
                try {
                        nodes = bl[i].getHosts();
                } catch (IOException e) {
                        // Skip blocks whose hosts cannot be determined
                }
                for (int j = 0; j < nodes.length; j++) {
                        // Increment this host's block count
                        Integer prev = hm.get(nodes[j]);
                        int n = (prev == null) ? 1 : prev.intValue() + 1;
                        hm.put(nodes[j], Integer.valueOf(n));
                        if (n > maxCount) {
                                maxCount = n;
                                maxNode = nodes[j];
                        }
                }
        }
        return maxNode;
  }
}
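
To see the counting logic in isolation, here is a small standalone sketch that runs without a cluster. It replaces the BlockLocation array with a plain array of host lists, one per block; the class name BlockHostCounter, the method name hostWithMostBlocks, and the node names are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

final class BlockHostCounter {

    /** Returns the host that appears in the most blocks, or "" if there are none. */
    static String hostWithMostBlocks(String[][] hostsPerBlock) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        String best = "";
        int bestCount = 0;
        for (String[] hosts : hostsPerBlock) {
            for (String host : hosts) {
                Integer prev = counts.get(host);
                int n = (prev == null) ? 1 : prev + 1;
                counts.put(host, n);
                if (n > bestCount) {  // on ties, the host that got there first wins
                    bestCount = n;
                    best = host;
                }
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Three blocks, each replicated on three nodes; node2 holds all three.
        String[][] blocks = {
            {"node1", "node2", "node3"},
            {"node2", "node3", "node4"},
            {"node2", "node5", "node6"},
        };
        System.out.println(hostWithMostBlocks(blocks));  // prints "node2"
    }
}
```

With replication, several nodes often hold every block of a small file, so the choice among them is arbitrary; the hint matters most for files that span many blocks.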

Summary

MapR is the only distribution of Hadoop that provides a distributed file system that has a POSIX-compliant interface and full read/write random file access. Using this feature, existing file-based applications can easily be leveraged in Hadoop, saving enormous amounts of cost in re-writing applications to take advantage of Hadoop.


NoSQL databases are becoming increasingly popular for analyzing big data.  There are very few NoSQL solutions, however, that provide the combination of scalability, reliability and data consistency required in a mission-critical application.

As the open source implementation of Google’s BigTable architecture, HBase is a NoSQL database that integrates directly with Hadoop and meets these requirements for a mission-critical database.  Estimates show that nearly half of all Hadoop users today are using HBase to store and analyze big data.

Read more about the challenges and enhancements to HBase in the Database Trends and Applications article Building Scaleable, Mission-Critical NoSQL Databases.


An article I read on ReadWrite Enterprise, “One Hadoop to Rule Them All,” offers a take on the exploding Hadoop market. It mentions the entrance of relative newcomers such as EMC, IBM, and Intel, which reinforces the burgeoning enterprise demand for Hadoop and Big Data analytics. These established enterprise vendors know that Hadoop is on everyone’s IT radar.

Demand for Hadoop in the enterprise is pushing the need for enterprise Hadoop features to the forefront. Hadoop’s scalability tops off at 100 million files because of limitations from having a single NameNode. To make it enterprise grade and allow for speedy recovery, here at MapR, we de-centralized and distributed the NameNode. This increased the upper limit to a trillion files, making it possible for all the nodes to participate in recovery efforts if needed. MapR’s NFS support allows a virtually unlimited number of files to be supported and to be read and written by many other tools and technologies.

As the article points out, “[u]ltimately, companies that make it easier to get value from Hadoop will win big.” At MapR, we’ve worked with a lot of companies that are getting value today. The technology is used to analyze hundreds of billions of objects a day, 90% of the Internet population every month, and over a trillion dollars in retail transactions every year.

The Hadoop market has truly entered a new phase: the enterprise phase. We are excited to be leading this phase.


In a recent ComputerWorld article by Chris Kanaracus about the challenges of reaching a mature state of big data analytics, IDC analysts Dan Vesset and Michael Versace outlined how Progressive Insurance is using big data to gain insight into its customers’ driving behaviors.

The piece points out some challenges faced by companies trying to implement big data initiatives, including deciding what data should be kept or discarded and the IT skills gap that exists in implementing big data efforts.

While the article offered some interesting insights about big data, it included some misconceptions about Hadoop. First, it referred to the Big Data platform as a batch-only technology. While plenty of enterprises use it for batch processing, real-time capabilities are now available for Hadoop. For example, the MapR Big Data platform delivers integration with NFS, enabling MapR to support real-time computation workflows.

Second, the article did not distinguish between the types of features that are needed to accelerate adoption and expansion of Hadoop through enterprise capabilities. At MapR, we think that the addition of enterprise-grade capabilities like replacing the Hadoop file system with NFS access and increasing reliability of Hadoop and its performance are crucial to making it a part of an enterprise infrastructure, and not just a science project. As our demonstration with Twitter analysis shows, Hadoop can play many different roles in real time processing, especially when NFS support is brought into play.

Real-time analysis is becoming increasingly important in the enterprise. Our Twitter demonstration during Strata, which showed in a dynamic display who was tweeting and what topics were being covered, is just one example of the way that the MapR Big Data platform can handle enterprise challenges such as real-time data streams. It’s true enough that standard Hadoop distributions can’t process real-time data, but MapR certainly can.


Expanding Big Data solutions to yet another level, MapR made two announcements today: MapR M7, which provides an enterprise-grade NoSQL and Hadoop solution to our customers, is now available; and LucidWorks Search will be distributed with the MapR Platform for Apache Hadoop, including with the new MapR M7 Edition.

M7 Now Available

The MapR M7 Edition is architected from the ground up to deliver reliability and performance without requiring compactions or background consistency checks to work smoothly.  M7 delivers over one million operations/sec with a ten-node cluster and provides dramatic scalability advantages with support for up to one trillion tables across thousands of nodes.

M7 also provides instant recovery from failures, ensuring 99.999% availability for HBase and Hadoop applications. With M7, there are no region servers, additional processes, or any redundant layer between the application and the data residing in the cluster. M7’s zero-administration approach includes automatic region splits and self-tuning with no downtime required for any operation, including schema changes.  Read more.

Integrated Search Provides Real-time Analytics

LucidWorks and MapR are integrating LucidWorks Search™ with the MapR Platform for Apache Hadoop, including M7, to deliver a single platform for predictive analytics, full search and discovery, and advanced database operations.

MapR provides customers with specific advantages over other Hadoop distributions when it comes to search including:  NFS for streaming writes for search; data modeling with snapshots; storing and indexing data directly on MapR’s platform for Hadoop; and integrating enterprise search into the distribution.  Read more.

InformationWeek, PCWorld and ZDNet provided their insights on our news today.


In 2011 Martin Fowler coined the term Polyglot Persistence, suggesting in a nutshell:

… any decent sized enterprise will have a variety of different data storage technologies for different kinds of data. There will still be large amounts of it managed in relational stores, but increasingly we’ll be first asking how we want to manipulate the data and only then figuring out what technology is the best bet for it.

More recently, Mark Madsen and Robin Bloor discussed the topic in a webcast, along with the Bloor Group’s Database Revolution white paper. Another good source of lessons learned and examples is an IEEE Software hosted podcast, Episode 189 of Software Engineering Radio: Eric Lubow on Polyglot Persistence.

Make no mistake, Polyglot Persistence as a meme has a direct impact on how you design and implement solutions for large-scale data processing. Moreover, it will influence the way you think about the tools you deploy. Rather than the one-size-fits-all mantra that Oracle and the like have fed us over the past ten or more years, we should now think in terms of a tool belt. And, as an architect, it is your responsibility to select the right combination of tools for the tasks at hand.  The Hadoop ecosystem offers many options.

What you choose may depend on the type of data you’re dealing with (such as a customer’s shopping basket vs. a financial transaction) as well as on the sort of workload. Is it a quick key-driven look-up? Do you need to scan and aggregate data over many records? Do you have ad-hoc queries? Or rather timed, repeated jobs that run in batch mode? Is low latency your primary concern? And of course, as always, all the tooling should not only be available at scale, in the petabytes and beyond, but must also be reliable and high-performance.

Look at the following figure and it may get clearer that the data volumes, varieties and velocities we had to deal with so far are really a special (and simple) case of Big Data, rather than the other way round:

Here at MapR we appreciate Polyglot Persistence and have in fact already aligned our Big Data platform with it. One day, I suppose, the majority of the platforms out there will be compatible with Polyglot Persistence. Today we already enable you to benefit from the combination of open-source-based agreement on the interfaces and an enterprise-grade implementation, delivering a reliable and fast solution:

If you are interested in reading more of the collected wisdom around Polyglot Persistence, including examples and support in decision making, it is now available in the book titled NoSQL Distilled, by Pramod J. Sadalage and Martin Fowler.


Increasing our local presence in Europe, MapR announced last week the appointment of Xavier Guerin to the position of vice president southern Europe and Benelux.  Guerin, a seasoned executive with over 20 years of industry experience including senior roles at EMC, Isilon and NetApp, will lead the new MapR team within France, Benelux and the rest of southern Europe.

MapR has also appointed Aurélien Goujet as senior pre-sales director for southern Europe and Benelux to support customers across the region. Goujet joins MapR following senior technical and sales roles at EMC, Isilon and NetApp.

Our new Paris office supports a growing community of Big Data analytics partners and customers across the region and builds on the success MapR is already having in EMEA.


Demand for both Apache Hadoop and NoSQL is extremely strong and this can mean only one thing: companies are hiring, and there is stiff competition for employees with experience in these technologies. This was brought home last month in an article in Data Informed and the numbers have only gone up since that time.

Indeed.com shows growth of 150,000 percent in Hadoop job postings:

NoSQL jobs are growing at a little more than half that rate, hitting 85,000 percent growth in September 2012, which has since cooled down to about 70,000 percent:

When we last checked Indeed, the total number of Hadoop jobs listed was around 6,000; NoSQL lags slightly at over 4,000. Job listings for data scientists, arguably the hottest job title overall, are approaching 9,000 on Indeed.

And these numbers are not isolated. Dice reports similar job trends and technology research organization Wikibon states that Hadoop and NoSQL software and services are the fastest growing subset of the market for big data analytics, software and services.

And this is a great job trend for us here at MapR. Demand for enterprise-grade Hadoop solutions is on the rise, and we’re one step ahead of this demand by perfecting these solutions for you already.

With such increased demand, we realize that it’s going to be difficult to hire quality Hadoop and NoSQL developers. The good thing about using MapR’s technology is that we’ve invested many man-years in making Hadoop easier to use and administer. Through the support of industry-standard APIs and protocols such as NFS, you don’t need to train all your personnel on the inner workings of Hadoop. Anyone who can use a file browser can access data in MapR. This is a huge difference compared with other distributions. The same is true on the administrative side: MapR offers self-healing, automation, and ease of integration with other applications. The MapR management tools make it much easier for organizations to be successful with Hadoop deployments without sending people to a week-long boot camp.


VMware and MapR have been collaborating to provide customers with the “Easy” button when it comes to deploying MapR into a vSphere environment.  Serengeti’s M4 release (derived from Model 4 and not related to MapR’s M3 or M5 Editions) gives organizations the ability to quickly enable developers to write applications that extract value from Big Data.  MapR has been working with VMware in our own development and test environment. Provisioning MapR into a vSphere environment leverages virtualization to quickly deploy test clusters and provide proof points back to the business prior to production deployments. MapR’s M3 and Serengeti are available to download for free.  What more do you need to know to get started?  No approvals required.

It’s not just how you store big data but what you can do with it – and that was apparent as Java developers took part in Devoxx conferences in London and Paris last week. Participants had a lot to say about the international presenters, and among those was MapR Chief Application Architect and Apache Mahout committer Ted Dunning.

Ted gave two presentations during Devoxx France 2013, the first to the Paris DataGeeks group on new approaches to machine learning with Apache Mahout and the second at the Devoxx main conference on real-time learning with Storm and the MapR Distribution for Hadoop at scale.

The Mahout talk focused on a new way to do collaborative filtering with cross-recommendations and advances in big data discovery through clustering.  The highlight of the latter topic was a new, lightning fast clustering algorithm that will be part of the 0.8 release of Mahout coming very soon.  Visit here for more information and to view the slides.

The second talk, Real-time Learning, approached this big data topic both at the detailed mathematical level and at the practical architectural level, including ways to use Storm with the MapR Distribution for Hadoop. Ted dove into some serious math, some serious magic, and a lot of useful content on how best to use these models in many settings, from financial portfolio optimization to direct mail to web-site design. As with the Paris DataGeeks event, the great enthusiasm and participation of the Devoxx audience showed how strongly these developers are interested in new ways to analyze big data.  Visit here for more information and to view the slides.

If you are in Paris, be sure to see Ted at Big Data Paris.  He will present two sessions: Getting a grasp of Hadoop’s world – Open Source and branded solutions and Expect More From Hadoop on Wednesday April 3rd.
