
Each phone call has a call detail record. A call detail record contains information about a call, not the call itself. In other words, these records contain information about the originating number, the terminating number, the length of the call, etc. At first glance, it would seem to be a huge endeavor to analyze all the calls in the U.S., and it would even seem to require a huge datacenter (or multiple datacenters) to store all the data.
But in reality, the amount of data is relatively small on the spectrum of Big Data projects. There are 300 million people in the U.S., and approximately 250 million of them are adults and teens. If we assume that everyone generates 10 phone calls per day, on average, we have about 2.5 billion phone calls per day. The size of a typical call detail record is 200 bytes. In some cases, there can be multiple records generated for a single call. Think of these call records as metadata. If we assume 10 call records per call, this would expand to 2KB of data for every phone call. Given these assumptions, the size of the data would be 5 terabytes per day.
At MapR, we have customers who are analyzing many times this amount of data on a daily basis. How big would the cluster need to be? Well, we have customers with 32TB of data on a single node. If an organization wanted to analyze 30 days of U.S. call detail records, it would be approximately 150TB of data, which is just 5 nodes of a MapR Hadoop cluster. The total call record volume for the U.S. wouldn’t come close to creating a busy signal on a MapR cluster.
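To make the arithmetic explicit, here is a minimal sketch that recomputes the estimate from the assumptions stated above. The class and constant names are illustrative only, and a decimal terabyte (10^12 bytes) is assumed.

```java
/** Back-of-the-envelope sizing for U.S. call detail records, using the figures from the text. */
class CallRecordSizing {
    static final long CALLERS = 250_000_000L;          // adults and teens in the U.S.
    static final long CALLS_PER_PERSON_PER_DAY = 10L;  // assumed average
    static final long BYTES_PER_RECORD = 200L;         // typical call detail record
    static final long RECORDS_PER_CALL = 10L;          // assumed records generated per call
    static final long BYTES_PER_TB = 1_000_000_000_000L; // decimal terabyte (assumption)
    static final long TB_PER_NODE = 32L;               // per-node capacity cited in the text

    static long callsPerDay() {
        return CALLERS * CALLS_PER_PERSON_PER_DAY;
    }

    static long bytesPerDay() {
        return callsPerDay() * BYTES_PER_RECORD * RECORDS_PER_CALL;
    }

    /** Number of 32TB nodes needed to hold the given number of days, rounded up. */
    static long nodesFor(int days) {
        long totalTb = bytesPerDay() * days / BYTES_PER_TB;
        return (totalTb + TB_PER_NODE - 1) / TB_PER_NODE;
    }

    public static void main(String[] args) {
        System.out.println("Calls per day:  " + callsPerDay());
        System.out.println("TB per day:     " + bytesPerDay() / BYTES_PER_TB);
        System.out.println("Nodes, 30 days: " + nodesFor(30));
    }
}
```

Changing any one assumption (for example, 20 records per call) only requires editing the matching constant.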
When people sit down to build a real-time big data reporting system, it is very common that compromises creep into the design. These compromises result in a “quick and dirty” analysis – the thought being that in order to get rapid results, you must give up accuracy or consistency or even any notion of what failure modes might exist. But Ted Dunning says that to get “quick” you don’t have to settle for “dirty.”
Would you like to be able to analyze data seamlessly between up-to-the-minute real-time reporting and long-term aggregation, without the need for reprocessing of temporary real-time estimates? And would you like to do that accurately and with a simple architecture? Would you like it if your CEO doesn’t find any more nasty discrepancies in your metrics?
Last week at Berlin Buzzwords 2013, MapR’s Ted Dunning showed how to do this with both metrics and with many forms of machine learning in his fourth #bbuzz talk titled “Real-time Learning for Fun and Profit,” presented to a packed room.
“It’s not a problem. It’s an opportunity.”
Interest in machine learning is widespread and growing. This talk addressed that interest by looking at the real-time and long-time transition in the context of learning models.
Several key MapR features make it possible for the approach Ted described to be incredibly simple. These features include NFS access to the MapR distributed storage, as well as reliable, small footprint MapR snapshots. Under the covers, this approach employs a combination of replay logs, aggregation checkpoints, and snapshots to implement a real-time system with an analysis horizon from now to years in the future.

What does this approach mean to the business user?
If you need to collect and react to data as it arrives but also need to store data over a long time frame, this approach may help you. Traditionally, it has been difficult to do this accurately and yet keep up-to-the-instant in reporting. Ted’s approach applied via a MapR system is exact, correct and consistent as analysis moves smoothly from real-time to long-time.
The example that Ted focused on at Buzzwords was the problem of maintaining simple counts, such as how many page views there are for a particular site, but this same approach can be applied to any problem involving associative aggregations. This includes unique counts (e.g. how many unique visitors to a web site), finding heavy hitters or trending topics (things getting the highest number of hits) and even the co-occurrence counting required by recommendation engines.
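Why associativity matters can be shown with a small sketch. This is not code from Ted's talk; it is a minimal illustration, with hypothetical names, of how partial counts from a long-term checkpoint and from recent real-time windows can be merged in any grouping and still give the same exact totals.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy illustration of associative aggregation for page-view counts. */
class CountAggregator {
    /** Merge two partial count tables; the result does not depend on how merges are grouped. */
    static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
        Map<String, Long> out = new HashMap<>(a);
        b.forEach((page, count) -> out.merge(page, count, Long::sum));
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical partial aggregates: a long-term checkpoint plus two recent windows.
        Map<String, Long> checkpoint = Map.of("/home", 1000L, "/pricing", 40L);
        Map<String, Long> lastHour = Map.of("/home", 25L);
        Map<String, Long> lastMinute = Map.of("/home", 3L, "/docs", 1L);

        Map<String, Long> left = merge(merge(checkpoint, lastHour), lastMinute);
        Map<String, Long> right = merge(checkpoint, merge(lastHour, lastMinute));

        System.out.println("Same result either way: " + left.equals(right));
        System.out.println("Total /home views: " + left.get("/home"));
    }
}
```

Because the grouping does not matter, recent windows can be folded into the long-term aggregate later without reprocessing or correcting earlier estimates.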
If you would like to know about how this approach could improve your business metrics and machine learning efforts, contact Ted at MapR tdunning+1@maprtech.com or view his slides.
It was really exciting to be in Berlin for the open source Berlin Buzzwords 2013 conference this week. The audiences were energized, and one of the hottest topics was search.
This enthusiasm was especially apparent at the June 4 presentation “Multi-modal Recommendation Algorithms” by Ted Dunning, MapR’s Chief Applications Architect. Surprisingly, a major part of this recommendation/machine-learning talk involved search, in particular, the use of Apache Solr/Lucene with Apache Mahout on the MapR distribution for Apache Hadoop.
The main thrust of the talk had to do with the advantage gained by using multiple behaviors as the source of input data for building a recommendation engine. Normally in a recommendation system, you observe behaviors similar to the one you want to drive through the use of your recommender, and then you use those behaviors as your input data to build and train your model. In contrast, Ted’s multi-modal approach has two new twists:
- Use multiple types of behavior as input to a Mahout-based recommendation model.
- Use the behavioral indicators output from the Mahout step as input for Solr-based search. The search engine here is abused to provide recommendations instead of search results.
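To make the second twist concrete, here is a toy sketch in plain Java. It is not Mahout or Solr code, and all names and data are hypothetical. It assumes each item already carries a set of indicator IDs (the kind of output the Mahout step would produce) and treats the user's recent history as the query, ranking items by how many indicators match. A real search engine would use a relevance score rather than this simple overlap count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for "search as recommender": rank items by indicator overlap with history. */
class IndicatorSearch {
    static List<String> recommend(Map<String, Set<String>> indicators,
                                  Set<String> history, int limit) {
        Map<String, Integer> scores = new HashMap<>();
        for (Map.Entry<String, Set<String>> item : indicators.entrySet()) {
            if (history.contains(item.getKey())) {
                continue; // do not recommend something the user has already interacted with
            }
            int hits = 0;
            for (String indicator : item.getValue()) {
                if (history.contains(indicator)) {
                    hits++;
                }
            }
            if (hits > 0) {
                scores.put(item.getKey(), hits);
            }
        }
        List<String> ranked = new ArrayList<>(scores.keySet());
        // Highest overlap first; ties broken alphabetically for a stable order.
        ranked.sort((x, y) -> {
            int byScore = Integer.compare(scores.get(y), scores.get(x));
            return byScore != 0 ? byScore : x.compareTo(y);
        });
        return ranked.subList(0, Math.min(limit, ranked.size()));
    }

    public static void main(String[] args) {
        Map<String, Set<String>> indicators = Map.of(
                "tent", Set.of("sleeping-bag", "stove"),
                "lantern", Set.of("tent", "stove"),
                "novel", Set.of("bookmark"));
        Set<String> history = Set.of("stove", "sleeping-bag");
        System.out.println(recommend(indicators, history, 5));
    }
}
```

In a multi-modal setup, each behavior type would plausibly get its own indicator field, and the query would combine the matching parts of the user's history across those fields.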
The combination of search with recommendation surprised even some very experienced Buzzwords audience members, including Anne Verling, who engaged in this tweet interplay just after the talk:

Ted’s multi-modal recommendation talk fit well with the buzz (pun intended) around the recent announcement of the inclusion of LucidWorks as part of the MapR distribution. The combination of LucidWorks Solr + Apache Mahout (which is also included with the MapR distribution) makes it easy to put the multi-modal recommendation technique into practice. These techniques are similar to some of the log processing done in the LucidWorks Big Data product, also included with MapR.
The combination of Solr + MapR meets challenges many businesses face, and I predict that the excitement seen at Buzzwords foretells a lot of new technologies that will appear using the combination of MapR/Apache Mahout/LucidWorks Solr and Big Data.
Click here to view Ted’s slides.
MapR is working closely with Cisco on a number of fronts including the Cisco Tidal Enterprise Scheduler (TES). Jack Norris, MapR’s chief marketing officer, posted today the guest blog “Big Returns on Big Data through Operational Intelligence” on the Cisco blog. Jack talks about how Big Data use cases are changing the competitive dynamics for organizations with a range of operational use cases and how enterprise-grade Hadoop applications combined with powerful cost-effective hardware create powerful solutions.
Click here to read Jack’s blog.
Overview
Hadoop provides a compelling distributed platform for processing massive amounts of data in parallel using the Map/Reduce framework and the Hadoop distributed file system. A Java API allows developers to express the processing in terms of a map phase and a reduce phase, where both phases use key/value pairs or key/value list pairs as input/output. Each input file consists of one or more blocks, or chunks, that are distributed across the Hadoop cluster. The map phase processes these chunks in parallel and uses data locality when possible by assigning each map task to a node that holds the chunk locally.
But what if you have an existing application that processes large numbers of files, and you want to leverage the scale and economy that Hadoop offers without rewriting the application for Hadoop? Or what if the application cannot process chunks of a file, but needs to process the entire file as a whole? Hadoop does have a facility called Hadoop Streaming that allows existing applications to run; however, those applications must be able to read a chunk of the input data from standard input and cannot, for example, seek to random locations in the file. And finally, what if the application needs to modify the input files? This is not possible with other Hadoop distributions because of the append-only nature of the Hadoop distributed file system.
By using MapR’s enterprise grade distribution of Hadoop, applications that process large volumes of files can be run unmodified in Hadoop. These applications can be written in any language, and by using some basic Map/Reduce “wrapper” code described in this article, and by leveraging MapR’s Direct Access NFS™ feature, significant cost savings can be realized by eliminating the need to rewrite potentially huge investments in legacy software.
As an example, let’s assume an image processing application written in C++ needs to process thousands or even millions of files. The application program takes a file path name as an argument, and many of these programs can be invoked in parallel to take advantage of multi-core nodes. The output of the application can be new files, or even modifications to the input files. Using the techniques described here, Hadoop can be used to accomplish this without re-writing the application and still benefit from the parallelism, scale, and economy that Hadoop brings.
MapR Direct Access NFS™
A feature unique to MapR is the ability to mount the MapR distributed file system on any NFS client. That client can be Windows, Linux, UNIX, Mac, or any system that provides an NFS v3 client. Also, MapR’s file system is POSIX-compliant and supports full read/write random I/O, unlike the other Hadoop distributions, whose file system has a Java-only interface and is write-once/append-only. One could attempt something similar on other Hadoop distributions with NFS FUSE; however, it is still restricted to append-only, it performs poorly, and it would have to be installed on all nodes in the cluster.
Using the NFS feature, every node of a MapR cluster can run an NFS server, and each node can mount the cluster locally via this command:
# mount localhost:/mapr /mapr
This means that existing applications, written in any language, can directly process files stored in the MapR distributed file system. The rest of this article will show how this can be leveraged to process large amounts of data with existing applications in Hadoop.
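As a small illustration of what random read/write access allows, the sketch below overwrites a few bytes in the middle of a file using only standard Java file I/O. The class name is hypothetical, and the code contains nothing MapR-specific: the assumption is simply that the path passed in lives under the NFS mount point (for example, somewhere below /mapr).

```java
import java.io.IOException;
import java.io.RandomAccessFile;

/** Minimal sketch: modify a file in place with ordinary random-access I/O. */
class InPlacePatch {
    /** Overwrite bytes at the given offset without rewriting the rest of the file. */
    static void patch(String path, long offset, byte[] bytes) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(path, "rw")) {
            file.seek(offset);   // jump to an arbitrary position
            file.write(bytes);   // overwrite in place
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java InPlacePatch <path> <offset> <text>
        patch(args[0], Long.parseLong(args[1]), args[2].getBytes("UTF-8"));
    }
}
```

On a write-once/append-only file system, this kind of in-place update is exactly what an application cannot do.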
High-level Architecture
In order to leverage existing applications using the Hadoop Map/Reduce framework and MapR’s distributed file system, we need a small Java “wrapper” that does the following:
- Extends the FileInputFormat class to tell MapReduce to process each file as a whole, rather than splitting it into chunks
- Optionally calculates which node contains most of the chunks of the file in order to improve data locality
- Extends the RecordReader class to pass the input file name to the map task as the value, with a null key
- Invokes the existing application from the map task, passing in the NFS path of the input file as the command-line argument
- Optionally provides a simple reduce task that writes the input file name and a “status” value indicating the file was processed
This can be implemented in about 200 lines of Java code, which can be easily adapted for any legacy application that takes an input file path as an argument for processing. Let’s now take a look at the implementation.
Implementation
We start out by creating a new FileInputFormat class that will work for any type of input file. The O’Reilly book “Hadoop: The Definitive Guide” has an example of such a class that we can use here with some minor changes. It’s called WholeFileInputFormat, and it is designed to read the entire file into a buffer and pass that buffer as the value for the map task. Since our legacy application will handle the entire processing of the file, we modify WholeFileInputFormat to simply pass the file path as the value to the map task. Note that the key is not used, so it’s defined to be of type NullWritable. Refer to the O’Reilly book for details on these classes.
WholeFileInputFormat.java:
import java.io.IOException;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;
public class WholeFileInputFormat
        extends FileInputFormat<NullWritable, Text> {
    // Never split a file: each map task receives one whole file.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    @Override
    public RecordReader<NullWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
The next class we need to implement is the WholeFileRecordReader class that will actually invoke the map task with the null key and file path value. Again we use the O’Reilly WholeFileRecordReader example, with some minor changes:
WholeFileRecordReader.java:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private FileSplit fileSplit;
    private Configuration conf;
    private Text value = new Text("");
    private boolean processed = false;
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }
    // Emits exactly one record per file: a null key and the file path as the value.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            Path file = fileSplit.getPath();
            // Set the value to the file name
            value = new Text(file.toString());
            processed = true;
            return true;
        }
        return false;
    }
    @Override
    public NullWritable getCurrentKey() throws IOException,
            InterruptedException {
        return NullWritable.get();
    }
    @Override
    public Text getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }
    @Override
    public void close() throws IOException {
        // do nothing
    }
}
The last class we need to implement is the main program that sets up the MapReduce environment and implements the map task which invokes our legacy application. It’s quite straightforward. There are 2 key steps in the Mapper map() method:
- Convert the Hadoop input file path (which is the value passed to this method) into the NFS path required by our non-Hadoop application. For example, the input value for a file could be:
  - maprfs:/user/foo/input/fileXYZ
- We want to convert that to an NFS path for our application:
  - /mapr/my.cluster.com/user/foo/input/fileXYZ
- Invoke the legacy application via the Java Runtime exec() method. For example, let’s assume the application is invoked from a shell script (which is conveniently stored in the MapR-FS and accessed by its NFS path so all nodes can access it):
  - Runtime.getRuntime().exec("/mapr/my.cluster.com/bin/myscript.sh " + nfsfilepath);
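The two steps above can be sketched in plain Java, independent of Hadoop. This is a hedged illustration rather than the article's mapper: the class and method names are hypothetical, the cluster name is passed in by the caller, and the conversion assumes paths of the form maprfs:/... with no authority component. ProcessBuilder is used here in place of Runtime.exec() so that a file path containing spaces is still passed as a single argument.

```java
import java.io.IOException;

/** Sketch of the two map() steps: path conversion, then invoking the legacy program. */
class LegacyAppInvoker {
    static final String MAPRFS_PREFIX = "maprfs:";

    /** Convert e.g. maprfs:/user/foo/x into /mapr/<clusterName>/user/foo/x. */
    static String toNfsPath(String hadoopPath, String clusterName) {
        String path = hadoopPath.startsWith(MAPRFS_PREFIX)
                ? hadoopPath.substring(MAPRFS_PREFIX.length())
                : hadoopPath;
        // Normalize to exactly one leading slash (also handles maprfs:///user/foo).
        path = "/" + path.replaceFirst("^/+", "");
        return "/mapr/" + clusterName + path;
    }

    /** Run the legacy program on one file and return its exit code. */
    static int run(String program, String nfsPath)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(program, nfsPath)
                .inheritIO()   // let the child write to our stdout/stderr instead of a pipe
                .start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        String nfsPath = toNfsPath("maprfs:/user/foo/input/fileXYZ", "my.cluster.com");
        System.out.println(nfsPath);
    }
}
```

Waiting for the exit code lets the map task report a per-file status and fail the task when the legacy program fails.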
Here is the code:
TestWholeFileInputFormat.java:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
public class TestWholeFileInputFormat {
public static class MyMapper
extends Mapper
Compiling and Running
We can compile and run the program on a directory of input files as follows:
$ mkdir wf_classes
$ javac -classpath `hadoop classpath` -d wf_classes WholeFileInputFormat.java WholeFileRecordReader.java TestWholeFileInputFormat.java
$ jar cvf wf.jar -C wf_classes .
$ hadoop jar wf.jar TestWholeFileInputFormat \
-Dmapred.map.tasks.speculative.execution=false input-dir output-dir
Note the -D option, which disables speculative execution. This prevents concurrent invocations of the application program on the same input file.
Output Files
The application uses an output directory for reduce phase output to indicate processing status for each file, and is free to write other output anywhere in the MapR distributed file system. The key here is that the path name chosen for the output files is in the NFS-mounted directory. The application is also free to modify the input files directly.
Another important factor is handling node failures. Map tasks that fail are automatically restarted. This means the application should be prepared to handle the case where a file may have been partially processed in a previous invocation (in other words, exhibit the property of idempotence).
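One common way to get this property is to write results to a temporary file and rename it into place only when the work is complete. The sketch below shows the idea in plain Java; it is an illustration under stated assumptions, not part of the article's code. The class name is hypothetical, the upper-casing step stands in for whatever the legacy application really does, and the rename is assumed to be atomic on the mounted file system.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Locale;

/** Sketch of an idempotent processing step that tolerates restarted tasks. */
class IdempotentStep {
    /** Returns true if work was done, false if a previous attempt already finished. */
    static boolean processOnce(Path input, Path output) throws IOException {
        if (Files.exists(output)) {
            return false; // final output is present, so an earlier attempt completed
        }
        Path tmp = output.resolveSibling(output.getFileName() + ".tmp");
        // Placeholder for the real work: upper-case the input file.
        String result = new String(Files.readAllBytes(input), StandardCharsets.UTF_8)
                .toUpperCase(Locale.ROOT);
        // Files.write truncates by default, so a partial tmp file from a failed attempt is replaced.
        Files.write(tmp, result.getBytes(StandardCharsets.UTF_8));
        // Publish the result in a single step so readers never see a half-written file.
        Files.move(tmp, output, StandardCopyOption.ATOMIC_MOVE);
        return true;
    }
}
```

An application that modifies its input files in place needs a different safeguard, such as recording progress in a separate marker file, because there is no separate output to test for.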
Data Locality
One of the major design considerations of Hadoop is moving the processing to the node containing the storage. Since we’re processing the entire file as one record, how do we get the best locality? One approach would be to set the input chunk size to be large enough that the entire file fits in one chunk. If that’s not practical, we can modify our WholeFileInputFormat class so that the file splits use the node that contains the majority of the file chunks, simply by iterating over all splits and summing up the location of every chunk. This is accomplished by overriding the getSplits() method in the WholeFileInputFormat class:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.fs.BlockLocation;
public class WholeFileInputFormat
        extends FileInputFormat<NullWritable, Text> {
    static final String NUM_INPUT_FILES = "mapreduce.input.num.files";
    // isSplitable() and createRecordReader() are unchanged from the earlier listing.
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        FileSystem hdfs = FileSystem.get(job.getConfiguration());
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();
            FileStatus fs = hdfs.getFileStatus(path);
            long length = fs.getLen();
            BlockLocation[] bl = hdfs.getFileBlockLocations(fs,
                    0, length);
            // One split per file, placed on the node holding the most blocks
            String[] nodes = new String[1];
            nodes[0] = findNodeMostBlocks(bl);
            splits.add(new FileSplit(path, 0, length, nodes));
        }
        // Save the number of input files in the job-conf
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        return splits;
    }
The last method we need is the findNodeMostBlocks() method, which simply takes an array of BlockLocations for a file and returns the hostname of the node that holds the most of them:
    public String findNodeMostBlocks(BlockLocation[] bl) {
        HashMap<String, Integer> hm = new HashMap<String, Integer>();
        int maxCount = 0;
        String maxNode = "";
        for (int i = 0; i < bl.length; i++) {
            String[] nodes;
            try {
                nodes = bl[i].getHosts();
            } catch (IOException e) {
                continue; // skip a block whose hosts cannot be determined
            }
            for (int j = 0; j < nodes.length; j++) {
                // Count how many blocks of this file each host holds
                Integer prev = hm.get(nodes[j]);
                int n = (prev == null) ? 1 : prev.intValue() + 1;
                hm.put(nodes[j], n);
                if (n > maxCount) {
                    maxCount = n;
                    maxNode = nodes[j];
                }
            }
        }
        return maxNode;
    }
}
Summary
MapR is the only distribution of Hadoop that provides a distributed file system with a POSIX-compliant interface and full read/write random file access. Using this feature, existing file-based applications can easily be run in Hadoop, avoiding the substantial cost of rewriting them to take advantage of Hadoop.
NoSQL databases are becoming increasingly popular for analyzing big data. There are very few NoSQL solutions, however, that provide the combination of scalability, reliability and data consistency required in a mission-critical application.
As the open source implementation of Google’s BigTable architecture, HBase is a NoSQL database that integrates directly with Hadoop and meets these requirements for a mission-critical database. Estimates show that nearly half of all Hadoop users today are using HBase to store and analyze big data.
Read more about the challenges and enhancements to HBase in the Database Trends and Applications article “Building Scaleable, Mission-Critical NoSQL Databases.”
An article I read on ReadWrite Enterprise, “One Hadoop to Rule Them All,” offers a take on the exploding Hadoop market. It mentions the entrance of relative newcomers such as EMC, IBM, and Intel, which reinforces the burgeoning enterprise demand for Hadoop and Big Data analytics. These established enterprise vendors know that Hadoop is on everyone’s IT radar.
Demand for Hadoop in the enterprise is pushing the need for enterprise Hadoop features to the forefront. Hadoop’s scalability tops off at 100 million files because of limitations from having a single NameNode. To make it enterprise grade and allow for speedy recovery, here at MapR, we de-centralized and distributed the NameNode. This increased the upper limit to a trillion files, making it possible for all the nodes to participate in recovery efforts if needed. MapR’s NFS support allows a virtually unlimited number of files to be supported and to be read and written by many other tools and technologies.
As the article points out, “[u]ltimately, companies that make it easier to get value from Hadoop will win big.” At MapR, we’ve worked with a lot of companies that are getting value today. The technology is used to analyze hundreds of billions of objects a day, 90% of the Internet population every month, and over a trillion dollars in retail transactions every year.
The Hadoop market has truly entered a new phase: the enterprise phase. We are excited to be leading this phase.
In a recent ComputerWorld article by Chris Kanaracus about the challenges of reaching a mature state of big data analytics, IDC analysts Dan Vesset and Michael Versace outlined how Progressive Insurance is using big data to gain insight into its customers’ driving behaviors.
The piece points out some challenges faced by companies trying to implement big data initiatives, including deciding what data should be kept or discarded and the IT skills gap that exists in implementing big data efforts.
While the article offered some interesting insights about big data, it included some misconceptions about Hadoop. First, it referred to the Big Data platform as a batch only technology. While plenty of enterprises use it for batch processing, real-time capabilities are now available for Hadoop. For example, the MapR Big Data platform delivers integration with NFS, enabling MapR to support real-time computation workflows.
Second, the article did not distinguish between the types of features that are needed to accelerate adoption and expansion of Hadoop through enterprise capabilities. At MapR, we think that the addition of enterprise-grade capabilities like replacing the Hadoop file system with NFS access and increasing reliability of Hadoop and its performance are crucial to making it a part of an enterprise infrastructure, and not just a science project. As our demonstration with Twitter analysis shows, Hadoop can play many different roles in real time processing, especially when NFS support is brought into play.
Real-time analysis is becoming increasingly important in the enterprise. Our Twitter demonstration during Strata, which showed in a dynamic display who was tweeting and what topics were being covered, is just one example of the way that the MapR Big Data platform can handle enterprise challenges such as real-time data streams. It’s true enough that standard Hadoop distributions can’t process real-time data, but MapR certainly can.



