
MapR Technologies has released its Big Data and Hadoop conference lineup for April. MapR will present sessions at leading industry conferences this month, including Cloud Connect Conference, Big Data TechCon, Big Data & Hadoop Innovation Summit, and the Open Source Business Conference (OSBC).

Look for MapR at these events:

Cloud Connect Conference in Santa Clara, CA

April 5 at 9:00 am
Big Data Lessons from the Cloud
By Jack Norris, MapR’s VP of marketing

Big Data TechCon in Cambridge, MA

April 10 at 3:45 pm
Running Mission Critical Applications on Hadoop
By Dave Jespersen, MapR’s chief customer advocate

Big Data & Hadoop Innovation Summit in San Francisco, CA

April 12 at 9:00 am
One Platform for all Big Data Analytics
By Tomer Shiran, MapR’s director of product management

Open Source Business Conference (OSBC) in San Francisco, CA

April 29 at 3:00 pm
Building Large Scale Enterprise Databases with ANSI SQL capabilities over Hadoop
By Tomer Shiran, MapR’s director of product management

For more information on upcoming events, go to our events page.

 


MapR made two significant announcements today regarding our efforts to support the Hadoop ecosystem and provide an open enterprise-grade platform to Big Data users.

First, MapR is partnering with Canonical, the organization behind the Ubuntu operating system, to package and make available for download an integrated offering of the MapR Distribution with Ubuntu. The free MapR M3 Edition includes HBase, Pig, Hive, Mahout, Cascading, Sqoop, Flume and other Hadoop support tools. MapR is the only distribution that enables Linux applications and commands to access data directly in the cluster via the NFS interface, which is available with all MapR Editions.

Canonical and MapR are also working to develop a Juju Charm that can be used by OpenStack and other customers to easily deploy MapR into their environments.  Click here for more details about MapR and Ubuntu.

Next, MapR announced that the source code for the component packages of the MapR Distribution for Apache Hadoop is now publicly available on GitHub. The MapR Distribution combines over a dozen different packages for Hadoop. The source code for these packages, including MapR’s fixes and enhancements, can be found at https://github.com/mapr/.

In addition, MapR is releasing binaries, source code and documentation in a public Maven repository, making it easier for developers to develop, build and deploy their Hadoop-based applications. Click here for more details.


This is a tutorial on the command-line interface (CLI) of the MapR enterprise-grade distribution of Hadoop.

Before we start, let’s review the MapR architecture.

MapR Architecture

The MapR architecture consists of the following services, or daemons:

  • CLDB (Container Location Database)
  • MapR Fileserver
  • JobTracker
  • TaskTracker
  • Zookeeper
  • NFS
  • Webserver
  • Warden

Warden is a daemon that runs on every cluster node to manage and monitor the other services running on that node. It acts like a watchdog.

The warden will not start any services unless Zookeeper is reachable and more than half of the configured Zookeeper nodes are alive.
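The majority rule above can be illustrated with a minimal shell sketch. The function name has_zk_quorum is hypothetical and is not part of MapR; it only restates the "more than half of the configured Zookeeper nodes" condition as integer arithmetic.

```shell
#!/usr/bin/env bash
# Hypothetical helper, not a MapR command: succeeds (exit status 0) only when
# strictly more than half of the configured Zookeeper nodes are alive.
has_zk_quorum() {
  local alive="$1" configured="$2"
  [ $((alive * 2)) -gt "$configured" ]
}

# Example with a 3-node Zookeeper ensemble, the size used in this tutorial.
for alive in 3 2 1; do
  if has_zk_quorum "$alive" 3; then
    echo "$alive of 3 alive: warden may start services"
  else
    echo "$alive of 3 alive: warden waits"
  fi
done
```

With an even number of Zookeeper nodes, exactly half is not enough, which is one reason an odd ensemble size is the usual choice.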

Cluster Management

  • Bringing up the cluster
    1. Start Zookeeper on all the nodes where it is installed, by running the command
      root@ip-10-245-12-77:~# /etc/init.d/mapr-zookeeper start
    2. On the nodes running the CLDB and Webserver (mapr-webserver) services, start the warden by running the command
      root@ip-10-245-12-77:~# /etc/init.d/mapr-warden start
  • Stopping the cluster
    1. Determine which nodes are running the NFS service
      root@ip-10-245-12-77:~# maprcli node list -filter "[rp==/*]and[svc==nfs]" -columns id,h,hn,svc,rp
      id                   racktopo                                                         service                                              hostname                                      health  ip
      8707346954164511835  /data/default-rack/ip-10-244-129-15.us-west-2.compute.internal   fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-244-129-15.us-west-2.compute.internal   0       10.244.129.15
      453989218842577487   /data/default-rack/ip-10-244-131-141.us-west-2.compute.internal  fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-244-131-141.us-west-2.compute.internal  0       10.244.131.141
      2892638075826172151  /data/default-rack/ip-10-244-164-169.us-west-2.compute.internal  fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-244-164-169.us-west-2.compute.internal  0       10.244.164.169
      8833235307193396272  /data/default-rack/ip-10-244-165-232.us-west-2.compute.internal  webserver,cldb,fileserver,nfs,hoststats              ip-10-244-165-232.us-west-2.compute.internal  0       10.244.165.232
      6559272699074389504  /data/default-rack/ip-10-244-45-60.us-west-2.compute.internal    fileserver,tasktracker,hbmaster,nfs,hoststats        ip-10-244-45-60.us-west-2.compute.internal    0       10.244.45.60
      9137989191756045555  /data/default-rack/ip-10-245-12-77.us-west-2.compute.internal    webserver,cldb,fileserver,nfs,hoststats              ip-10-245-12-77.us-west-2.compute.internal    0       10.245.12.77
      7791110135751846418  /data/default-rack/ip-10-245-14-103.us-west-2.compute.internal   fileserver,tasktracker,hbmaster,nfs,hoststats        ip-10-245-14-103.us-west-2.compute.internal   0       10.245.14.103
      1152291012558508871  /data/default-rack/ip-10-245-7-200.us-west-2.compute.internal    fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-245-7-200.us-west-2.compute.internal    0       10.245.7.200
      7482334955545014043  /data/default-rack/ip-10-245-8-49.us-west-2.compute.internal     fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-245-8-49.us-west-2.compute.internal     0       10.245.8.49
      4127302514082488703  /data/default-rack/ip-10-245-8-52.us-west-2.compute.internal     webserver,cldb,fileserver,nfs,hoststats,jobtracker   ip-10-245-8-52.us-west-2.compute.internal     0       10.245.8.52
      root@ip-10-245-12-77:~#
    2. Determine which nodes are running the CLDB service
      root@ip-10-245-12-77:~# maprcli node list -filter "[rp==/*]and[svc==cldb]" -columns id,h,hn,svc,rp
      id                   racktopo                                                         service                                             hostname                                      health  ip
      8833235307193396272  /data/default-rack/ip-10-244-165-232.us-west-2.compute.internal  webserver,cldb,fileserver,nfs,hoststats             ip-10-244-165-232.us-west-2.compute.internal  0       10.244.165.232
      9137989191756045555  /data/default-rack/ip-10-245-12-77.us-west-2.compute.internal    webserver,cldb,fileserver,nfs,hoststats             ip-10-245-12-77.us-west-2.compute.internal    0       10.245.12.77
      4127302514082488703  /data/default-rack/ip-10-245-8-52.us-west-2.compute.internal     webserver,cldb,fileserver,nfs,hoststats,jobtracker  ip-10-245-8-52.us-west-2.compute.internal     0       10.245.8.52
      root@ip-10-245-12-77:~#
    3. List all non-CLDB nodes
      root@ip-10-245-12-77:~# maprcli node list -filter "[rp==/*]and[svc!=cldb]" -columns id,h,hn,svc,rp
      id                   racktopo                                                         service                                              hostname                                      health  ip
      8707346954164511835  /data/default-rack/ip-10-244-129-15.us-west-2.compute.internal   fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-244-129-15.us-west-2.compute.internal   0       10.244.129.15
      453989218842577487   /data/default-rack/ip-10-244-131-141.us-west-2.compute.internal  fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-244-131-141.us-west-2.compute.internal  0       10.244.131.141
      2892638075826172151  /data/default-rack/ip-10-244-164-169.us-west-2.compute.internal  fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-244-164-169.us-west-2.compute.internal  0       10.244.164.169
      6559272699074389504  /data/default-rack/ip-10-244-45-60.us-west-2.compute.internal    fileserver,tasktracker,hbmaster,nfs,hoststats        ip-10-244-45-60.us-west-2.compute.internal    0       10.244.45.60
      7791110135751846418  /data/default-rack/ip-10-245-14-103.us-west-2.compute.internal   fileserver,tasktracker,hbmaster,nfs,hoststats        ip-10-245-14-103.us-west-2.compute.internal   0       10.245.14.103
      1152291012558508871  /data/default-rack/ip-10-245-7-200.us-west-2.compute.internal    fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-245-7-200.us-west-2.compute.internal    0       10.245.7.200
      7482334955545014043  /data/default-rack/ip-10-245-8-49.us-west-2.compute.internal     fileserver,tasktracker,hbregionserver,nfs,hoststats  ip-10-245-8-49.us-west-2.compute.internal     0       10.245.8.49
      root@ip-10-245-12-77:~#
    4. Shut down all NFS instances
      root@ip-10-245-12-77:~# maprcli node services -nfs stop -nodes ip-10-244-129-15.us-west-2.compute.internal ip-10-244-131-141.us-west-2.compute.internal ip-10-244-164-169.us-west-2.compute.internal ip-10-244-165-232.us-west-2.compute.internal ip-10-244-45-60.us-west-2.compute.internal ip-10-245-12-77.us-west-2.compute.internal ip-10-245-14-103.us-west-2.compute.internal ip-10-245-7-200.us-west-2.compute.internal ip-10-245-8-49.us-west-2.compute.internal ip-10-245-8-52.us-west-2.compute.internal
      root@ip-10-245-12-77:~#
    5. Stop the warden on each CLDB node
      root@ip-10-245-12-77:~# /etc/init.d/mapr-warden stop
    6. Stop the warden on the remaining cluster nodes
      root@ip-10-244-45-60:~# /etc/init.d/mapr-warden stop
    7. Stop Zookeeper on each Zookeeper node
      root@ip-10-245-12-77:~# /etc/init.d/mapr-zookeeper stop
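Typing the long -nodes list by hand in step 4 is error-prone. The sketch below shows one way to build it from the tabular output of maprcli node list. The helper hostnames_from_node_list is hypothetical, and it assumes the output looks like the listings above: a one-line header containing a hostname column, and whitespace-separated fields with no spaces inside a field. The sample text stands in for real command output, and the final command is only printed, not executed.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `maprcli node list` tabular output on stdin and
# print the values of the "hostname" column as one space-separated line.
hostnames_from_node_list() {
  awk '
    NR == 1 { for (i = 1; i <= NF; i++) if ($i == "hostname") col = i; next }
    col && NF >= col { printf "%s%s", sep, $col; sep = " " }
    END { print "" }
  '
}

# Sample rows copied from the CLDB listing above (stand-in for live output).
sample_output='id                   racktopo                                                         service                                  hostname                                      health  ip
8833235307193396272  /data/default-rack/ip-10-244-165-232.us-west-2.compute.internal  webserver,cldb,fileserver,nfs,hoststats  ip-10-244-165-232.us-west-2.compute.internal  0       10.244.165.232
9137989191756045555  /data/default-rack/ip-10-245-12-77.us-west-2.compute.internal    webserver,cldb,fileserver,nfs,hoststats  ip-10-245-12-77.us-west-2.compute.internal    0       10.245.12.77'

nodes=$(printf '%s\n' "$sample_output" | hostnames_from_node_list)
# Print the command for review instead of running it.
echo "maprcli node services -nfs stop -nodes $nodes"
```

On a live cluster the same function could read the output of the node list command from step 1 through a pipe. Looking the column up by header name avoids depending on the column order, which differs from the order requested with -columns.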

Restart Webserver (mapr-webserver)

root@ip-10-245-12-77:~# /opt/mapr/adminuiapp/webserver stop

stopping adminuiapp server
root@ip-10-245-12-77:~#

root@ip-10-245-12-77:~# /opt/mapr/adminuiapp/webserver start

Starting adminuiapp server, logging to /opt/mapr/logs/adminuiapp.log
root@ip-10-245-12-77:~#

Restarting NFS Service

root@ip-10-245-12-77:~# maprcli node services -nfs start -nodes ip-10-244-129-15.us-west-2.compute.internal ip-10-244-131-141.us-west-2.compute.internal ip-10-244-164-169.us-west-2.compute.internal ip-10-244-165-232.us-west-2.compute.internal ip-10-244-45-60.us-west-2.compute.internal ip-10-245-12-77.us-west-2.compute.internal ip-10-245-14-103.us-west-2.compute.internal ip-10-245-7-200.us-west-2.compute.internal ip-10-245-8-49.us-west-2.compute.internal ip-10-245-8-52.us-west-2.compute.internal
root@ip-10-245-12-77:~#

Restart Services (TaskTracker)

root@ip-10-245-12-77:~# maprcli node services -nodes ip-10-244-129-15.us-west-2.compute.internal -tasktracker stop
root@ip-10-245-12-77:~# maprcli node services -nodes ip-10-244-129-15.us-west-2.compute.internal -tasktracker start
root@ip-10-245-12-77:~#

Give full permission to MapR administrator user

root@ip-10-245-12-77:~# id mapr
uid=2147483632(mapr) gid=2147483632(mapr) groups=2147483632(mapr),42(shadow)
root@ip-10-245-12-77:~# maprcli acl edit -type cluster -user mapr:fc
root@ip-10-245-12-77:~#

Alarms Email Notifications Setup

root@ip-10-245-12-77:~# maprcli alarm config save -values "AE_ALARM_AEQUOTA_EXCEEDED,1,Carlos.Morillo@maprtech.com"
root@ip-10-245-12-77:~# maprcli alarm config save -values "NODE_ALARM_CORE_PRESENT,1,Carlos.Morillo@maprtech.com"
root@ip-10-245-12-77:~#

Listing Alarms

root@ip-10-245-12-77:~# maprcli alarm list -type cluster
root@ip-10-245-12-77:~# maprcli alarm list -type node
root@ip-10-245-12-77:~#

Listing Nodes

root@ip-10-245-12-77:~# maprcli node list -columns id,h,hn,br,da,dtotal,dused,davail,fs-heartbeat
id                   davail  dused  bytesReceived  hostname                                      dtotal  health  fs-heartbeat  ip
8707346954164511835  116     170    1741           ip-10-244-129-15.us-west-2.compute.internal   287     0       0             10.244.129.15
453989218842577487   38      249    2465           ip-10-244-131-141.us-west-2.compute.internal  287     0       0             10.244.131.141
2892638075826172151  106     180    764            ip-10-244-164-169.us-west-2.compute.internal  287     0       0             10.244.164.169
8833235307193396272  109     173    2168           ip-10-244-165-232.us-west-2.compute.internal  283     0       0             10.244.165.232
6559272699074389504  99      187    2605           ip-10-244-45-60.us-west-2.compute.internal    287     0       0             10.244.45.60
9137989191756045555  107     175    1606           ip-10-245-12-77.us-west-2.compute.internal    283     0       0             10.245.12.77
7791110135751846418  39      247    1627           ip-10-245-14-103.us-west-2.compute.internal   287     0       0             10.245.14.103
1152291012558508871  101     185    764            ip-10-245-7-200.us-west-2.compute.internal    287     0       0             10.245.7.200
7482334955545014043  39      248    904            ip-10-245-8-49.us-west-2.compute.internal     287     0       0             10.245.8.49
4127302514082488703  86      197    19685          ip-10-245-8-52.us-west-2.compute.internal     283     0       0             10.245.8.52
root@ip-10-245-12-77:~# maprcli node list -columns id,br,fs-heartbeat,jt-heartbeat
id                   bytesReceived  hostname                                      jt-heartbeat  fs-heartbeat  ip
8707346954164511835  832            ip-10-244-129-15.us-west-2.compute.internal   2             0             10.244.129.15
453989218842577487   1897           ip-10-244-131-141.us-west-2.compute.internal  2             0             10.244.131.141
2892638075826172151  1749           ip-10-244-164-169.us-west-2.compute.internal  2             0             10.244.164.169
8833235307193396272  1521           ip-10-244-165-232.us-west-2.compute.internal  2             0             10.244.165.232
6559272699074389504  1812           ip-10-244-45-60.us-west-2.compute.internal    2             0             10.244.45.60
9137989191756045555  1038           ip-10-245-12-77.us-west-2.compute.internal    2             0             10.245.12.77
7791110135751846418  1084           ip-10-245-14-103.us-west-2.compute.internal   2             0             10.245.14.103
1152291012558508871  836            ip-10-245-7-200.us-west-2.compute.internal    2             0             10.245.7.200
7482334955545014043  1869           ip-10-245-8-49.us-west-2.compute.internal     2             0             10.245.8.49
4127302514082488703  19243          ip-10-245-8-52.us-west-2.compute.internal     2             0             10.245.8.52
root@ip-10-245-12-77:~#

Determining which nodes are running the Zookeeper service

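Note that there is only one Zookeeper leader; the remaining Zookeeper nodes are followers. In the output below, the nodes that exit with error code 127 (the shell's "command not found" status) are the ones where the Zookeeper init script is not installed.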

root@ip-10-245-12-77:~/installer# parallel-ssh -Ph ./hostnames /etc/init.d/mapr-zookeeper qstatus
[1] 16:11:04 [FAILURE] ip-10-244-45-60.us-west-2.compute.internal Exited with error code 127
[2] 16:11:04 [FAILURE] ip-10-245-7-200.us-west-2.compute.internal Exited with error code 127
[3] 16:11:04 [FAILURE] ip-10-245-14-103.us-west-2.compute.internal Exited with error code 127
[4] 16:11:04 [FAILURE] ip-10-244-131-141.us-west-2.compute.internal Exited with error code 127
[5] 16:11:04 [FAILURE] ip-10-244-164-169.us-west-2.compute.internal Exited with error code 127
[6] 16:11:04 [FAILURE] ip-10-244-129-15.us-west-2.compute.internal Exited with error code 127
[7] 16:11:04 [FAILURE] ip-10-245-8-49.us-west-2.compute.internal Exited with error code 127
ip-10-244-165-232.us-west-2.compute.internal: Mode: leader
[8] 16:11:04 [SUCCESS] ip-10-244-165-232.us-west-2.compute.internal
ip-10-245-8-52.us-west-2.compute.internal: Mode: follower
[9] 16:11:04 [SUCCESS] ip-10-245-8-52.us-west-2.compute.internal
ip-10-245-12-77.us-west-2.compute.internal: Mode: follower
[10] 16:11:04 [SUCCESS] ip-10-245-12-77.us-west-2.compute.internal
root@ip-10-245-12-77:~/installer#
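The interleaved parallel-ssh output can be hard to scan on a larger cluster. As a rough sketch, the hypothetical function zk_modes below keeps only the lines of the form "host: Mode: role" and prints the role followed by the host. It assumes the output format shown above; the sample lines are copied from that transcript.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read parallel-ssh output on stdin and print
# "<mode> <hostname>" for every node that reported a Zookeeper mode.
zk_modes() {
  # Status lines ([n] time [SUCCESS|FAILURE] ...) have a timestamp in field 2,
  # so only the "host: Mode: role" lines match.
  awk '$2 == "Mode:" { host = $1; sub(/:$/, "", host); print $3, host }'
}

# Sample lines copied from the transcript above (stand-in for live output).
sample_output='[7] 16:11:04 [FAILURE] ip-10-245-8-49.us-west-2.compute.internal Exited with error code 127
ip-10-244-165-232.us-west-2.compute.internal: Mode: leader
[8] 16:11:04 [SUCCESS] ip-10-244-165-232.us-west-2.compute.internal
ip-10-245-8-52.us-west-2.compute.internal: Mode: follower
[9] 16:11:04 [SUCCESS] ip-10-245-8-52.us-west-2.compute.internal'

printf '%s\n' "$sample_output" | zk_modes
```

Piping the result through grep '^leader' would then show the single leader node.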

Adding a node (in this cluster the nodes running the Zookeeper service are also running the CLDB service)

root@newnode:~# /opt/mapr/server/configure.sh -Z ip-10-244-165-232.us-west-2.compute.internal,ip-10-245-12-77.us-west-2.compute.internal,ip-10-245-8-52.us-west-2.compute.internal -C ip-10-244-165-232.us-west-2.compute.internal,ip-10-245-12-77.us-west-2.compute.internal,ip-10-245-8-52.us-west-2.compute.internal
root@newnode:~# /opt/mapr/server/disksetup -F /tmp/disks.txt
root@newnode:~# /etc/init.d/mapr-warden start

Volumes

Creating a Volume

root@ip-10-245-12-77:~# maprcli volume create -name carlosvolume -path /carlosvolume -quota 1G -advisoryquota 200M
root@ip-10-245-12-77:~#

Creating a Mirror Volume

root@ip-10-245-12-77:~# maprcli volume create -name carlosvolume_mirror -source carlosvolume@my.cluster.com -path /carlosvolume_mirror -type 1
root@ip-10-245-12-77:~#

Listing Volumes

root@ip-10-245-12-77:~# maprcli volume list -columns volumeid,volumetype,volumename,mountdir,mounted,aename,quota,used,totalused,actualreplication,rackpath
quota  mountdir                                                              actualreplication  volumeid   aename  rackpath  used    mounted  volumename                                                       volumetype  totalused  aetype
1024   /carlosvolume                                                         ...                48119723   root    /data     0       1        carlosvolume                                                     0           0          0
0      /carlosvolume_mirror                                                  ...                219729268  root    /data     0       1        carlosvolume_mirror                                              1           0          0
0      /user/mapr                                                            ...                161101264  mapr    /data     597899  1        mapr                                                             0           597899     0
0                                                                            ...                1          mapr    /data     0       0        mapr.cldb.internal                                               0           0          0
0      /                                                                     ...                104597444  mapr    /data     0       1        mapr.cluster.root                                                0           0          0
0      /var/mapr/configuration                                               ...                121877052  mapr    /data     0       1        mapr.configuration                                               0           0          0
0      /hbase                                                                ...                234741384  mapr    /data     0       1        mapr.hbase                                                       0           0          0
0      /var/mapr/local/ip-10-244-129-15.us-west-2.compute.internal/logs      ...                206969086  mapr    /data     5       1        mapr.ip-10-244-129-15.us-west-2.compute.internal.local.logs      0           5          0
0      /var/mapr/local/ip-10-244-129-15.us-west-2.compute.internal/mapred    ...                243798871  mapr    /data     1       1        mapr.ip-10-244-129-15.us-west-2.compute.internal.local.mapred    0           1          0
0      /var/mapr/local/ip-10-244-129-15.us-west-2.compute.internal/metrics   ...                69263110   mapr    /data     96      1        mapr.ip-10-244-129-15.us-west-2.compute.internal.local.metrics   0           96         0
0      /var/mapr/local/ip-10-244-131-141.us-west-2.compute.internal/logs     ...                23205174   mapr    /data     2       1        mapr.ip-10-244-131-141.us-west-2.compute.internal.local.logs     0           2          0
0      /var/mapr/local/ip-10-244-131-141.us-west-2.compute.internal/mapred   ...                105260920  mapr    /data     1       1        mapr.ip-10-244-131-141.us-west-2.compute.internal.local.mapred   0           1          0
0      /var/mapr/local/ip-10-244-131-141.us-west-2.compute.internal/metrics  ...                19860697   mapr    /data     97      1        mapr.ip-10-244-131-141.us-west-2.compute.internal.local.metrics  0           97         0
0      /var/mapr/local/ip-10-244-164-169.us-west-2.compute.internal/logs     ...                212161365  mapr    /data     5       1        mapr.ip-10-244-164-169.us-west-2.compute.internal.local.logs     0           5          0
0      /var/mapr/local/ip-10-244-164-169.us-west-2.compute.internal/mapred   ...                162207671  mapr    /data     1       1        mapr.ip-10-244-164-169.us-west-2.compute.internal.local.mapred   0           1          0
0      /var/mapr/local/ip-10-244-164-169.us-west-2.compute.internal/metrics  ...                251008000  mapr    /data     99      1        mapr.ip-10-244-164-169.us-west-2.compute.internal.local.metrics  0           99         0
0      /var/mapr/local/ip-10-244-165-232.us-west-2.compute.internal/logs     ...                254163265  mapr    /data     0       1        mapr.ip-10-244-165-232.us-west-2.compute.internal.local.logs     0           0          0
0      /var/mapr/local/ip-10-244-165-232.us-west-2.compute.internal/metrics  ...                252158411  mapr    /data     97      1        mapr.ip-10-244-165-232.us-west-2.compute.internal.local.metrics  0           97         0
0      /var/mapr/local/ip-10-244-45-60.us-west-2.compute.internal/logs       ...                185745772  mapr    /data     5       1        mapr.ip-10-244-45-60.us-west-2.compute.internal.local.logs       0           5          0
0      /var/mapr/local/ip-10-244-45-60.us-west-2.compute.internal/mapred     ...                213209407  mapr    /data     1       1        mapr.ip-10-244-45-60.us-west-2.compute.internal.local.mapred     0           1          0
0      /var/mapr/local/ip-10-244-45-60.us-west-2.compute.internal/metrics    ...                211996945  mapr    /data     97      1        mapr.ip-10-244-45-60.us-west-2.compute.internal.local.metrics    0           97         0
0      /var/mapr/local/ip-10-245-12-77.us-west-2.compute.internal/logs       ...                111775179  mapr    /data     0       1        mapr.ip-10-245-12-77.us-west-2.compute.internal.local.logs       0           0          0
0      /var/mapr/local/ip-10-245-12-77.us-west-2.compute.internal/metrics    ...                233931728  mapr    /data     97      1        mapr.ip-10-245-12-77.us-west-2.compute.internal.local.metrics    0           97         0
0      /var/mapr/local/ip-10-245-14-103.us-west-2.compute.internal/logs      ...                251542201  mapr    /data     2       1        mapr.ip-10-245-14-103.us-west-2.compute.internal.local.logs      0           2          0
0      /var/mapr/local/ip-10-245-14-103.us-west-2.compute.internal/mapred    ...                160008303  mapr    /data     1       1        mapr.ip-10-245-14-103.us-west-2.compute.internal.local.mapred    0           1          0
0      /var/mapr/local/ip-10-245-14-103.us-west-2.compute.internal/metrics   ...                73005604   mapr    /data     96      1        mapr.ip-10-245-14-103.us-west-2.compute.internal.local.metrics   0           96         0
0      /var/mapr/local/ip-10-245-7-200.us-west-2.compute.internal/logs       ...                66440508   mapr    /data     5       1        mapr.ip-10-245-7-200.us-west-2.compute.internal.local.logs       0           5          0
0      /var/mapr/local/ip-10-245-7-200.us-west-2.compute.internal/mapred     ...                87429862   mapr    /data     1       1        mapr.ip-10-245-7-200.us-west-2.compute.internal.local.mapred     0           1          0
0      /var/mapr/local/ip-10-245-7-200.us-west-2.compute.internal/metrics    ...                247661324  mapr    /data     97      1        mapr.ip-10-245-7-200.us-west-2.compute.internal.local.metrics    0           97         0
0      /var/mapr/local/ip-10-245-8-49.us-west-2.compute.internal/logs        ...                159900756  mapr    /data     5       1        mapr.ip-10-245-8-49.us-west-2.compute.internal.local.logs        0           5          0
0      /var/mapr/local/ip-10-245-8-49.us-west-2.compute.internal/mapred      ...                141734370  mapr    /data     1       1        mapr.ip-10-245-8-49.us-west-2.compute.internal.local.mapred      0           1          0
0      /var/mapr/local/ip-10-245-8-49.us-west-2.compute.internal/metrics     ...                59237315   mapr    /data     97      1        mapr.ip-10-245-8-49.us-west-2.compute.internal.local.metrics     0           97         0
0      /var/mapr/local/ip-10-245-8-52.us-west-2.compute.internal/logs        ...                69920939   mapr    /data     0       1        mapr.ip-10-245-8-52.us-west-2.compute.internal.local.logs        0           0          0
0      /var/mapr/local/ip-10-245-8-52.us-west-2.compute.internal/metrics     ...                26655377   mapr    /data     96      1        mapr.ip-10-245-8-52.us-west-2.compute.internal.local.metrics     0           96         0
0      /var/mapr/cluster/mapred/jobTracker                                   ...                162832045  mapr    /data     0       1        mapr.jobtracker.volume                                           0           0          0
0      /var/mapr/metrics                                                     ...                157755141  mapr    /data     0       1        mapr.metrics                                                     0           0          0
0      /var/mapr                                                             ...                129618546  mapr    /data     0       1        mapr.var                                                         0           0          0
0      /tera.in                                                              ...                225962280  root    /data     262621  1        tera.in                                                          0           262621     0
0      /tera.out                                                             ...                62181860   root    /data     1       1        tera.out                                                         0           1          0
0      /user                                                                 ...                250577554  mapr    /data     0       1        users                                                            0           0          0
root@ip-10-245-12-77:~#

Volume Properties

root@ip-10-245-12-77:~# maprcli volume info -name carlosvolume
numreplicas  schedulename  volumeid  rackpath  volumename    used  volumetype  aetype  creator  advisoryquota  snapshotcount  quota  mountdir       scheduleid  snapshotused  nameContainerSizeMB  replicationtype  maxinodesalarmthreshold  minreplicas  acl                                                                                  actualreplication  aename  needsGfsck  partlyOutOfTopology  mounted  logicalUsed  readonly  totalused
3                          48119723  /data     carlosvolume  0     0           0       root     200            0              1024   /carlosvolume  0           0             0                    high_throughput  0                        2            {"acl":{"Principal":"User root","Allowed actions":["dump","restore","m","d","fc"]}}  ...                root    false       0                    1        0            0         0
root@ip-10-245-12-77:~# maprcli volume info -output terse -name carlosvolume
qta   rp     ro  mrf  aqt  dsu  nfsck  id        ssu  arf  drf  tsu  on    miath  aen   sid  sn  acl                                                    mt  n             sc  poot  dcr              dlu  t  p              ncsmb  aet
1024  /data  0   2    200  0    false  48119723  0    ...  3    0    root  0      root  0        {"acl":{"User root":["dump","restore","m","d","fc"]}}  1   carlosvolume  0   0     high_throughput  0    0  /carlosvolume  0      0
root@ip-10-245-12-77:~#

Mount/Unmount Volume

root@ip-10-245-12-77:~# hadoop fs -ls maprfs:///
Found 7 items
drwxr-xr-x   - root root          0 2013-03-27 17:33 /carlosvolume
drwxrwxrwx   - root root          0 1970-01-01 00:00 /carlosvolume_mirror
drwxr-xr-x   - mapr mapr          6 2013-02-21 17:50 /hbase
drwxr-xr-x   - root root          1 2013-03-11 14:07 /tera.in
drwxr-xr-x   - root root          1 2013-03-11 14:54 /tera.out
drwxr-xr-x   - mapr mapr          1 2013-02-21 18:23 /user
drwxr-xr-x   - mapr mapr          1 2013-02-21 17:42 /var
root@ip-10-245-12-77:~# maprcli volume unmount -name carlosvolume
root@ip-10-245-12-77:~# hadoop fs -ls maprfs:///
Found 6 items
drwxrwxrwx   - root root          0 1970-01-01 00:00 /carlosvolume_mirror
drwxr-xr-x   - mapr mapr          6 2013-02-21 17:50 /hbase
drwxr-xr-x   - root root          1 2013-03-11 14:07 /tera.in
drwxr-xr-x   - root root          1 2013-03-11 14:54 /tera.out
drwxr-xr-x   - mapr mapr          1 2013-02-21 18:23 /user
drwxr-xr-x   - mapr mapr          1 2013-02-21 17:42 /var
root@ip-10-245-12-77:~# maprcli volume mount -name carlosvolume
root@ip-10-245-12-77:~# hadoop fs -ls maprfs:///
Found 7 items
drwxr-xr-x   - root root          0 2013-03-27 17:33 /carlosvolume
drwxrwxrwx   - root root          0 1970-01-01 00:00 /carlosvolume_mirror
drwxr-xr-x   - mapr mapr          6 2013-02-21 17:50 /hbase
drwxr-xr-x   - root root          1 2013-03-11 14:07 /tera.in
drwxr-xr-x   - root root          1 2013-03-11 14:54 /tera.out
drwxr-xr-x   - mapr mapr          1 2013-02-21 18:23 /user
drwxr-xr-x   - mapr mapr          1 2013-02-21 17:42 /var
root@ip-10-245-12-77:~#

Removing a Volume

root@ip-10-245-12-77:~# hadoop fs -ls maprfs:///
Found 8 items
drwxr-xr-x   - root root          0 2013-03-27 17:33 /carlosvolume
drwxrwxrwx   - root root          0 1970-01-01 00:00 /carlosvolume_mirror
drwxr-xr-x   - mapr mapr          6 2013-02-21 17:50 /hbase
drwxr-xr-x   - root root          1 2013-03-11 14:07 /tera.in
drwxr-xr-x   - root root          1 2013-03-11 14:54 /tera.out
drwxr-xr-x   - root root          0 2013-03-27 17:59 /testvolume
drwxr-xr-x   - mapr mapr          1 2013-02-21 18:23 /user
drwxr-xr-x   - mapr mapr          1 2013-02-21 17:42 /var
root@ip-10-245-12-77:~# maprcli volume remove -name testvolume
root@ip-10-245-12-77:~# hadoop fs -ls maprfs:///
Found 7 items
drwxr-xr-x   - root root          0 2013-03-27 17:33 /carlosvolume
drwxrwxrwx   - root root          0 1970-01-01 00:00 /carlosvolume_mirror
drwxr-xr-x   - mapr mapr          6 2013-02-21 17:50 /hbase
drwxr-xr-x   - root root          1 2013-03-11 14:07 /tera.in
drwxr-xr-x   - root root          1 2013-03-11 14:54 /tera.out
drwxr-xr-x   - mapr mapr          1 2013-02-21 18:23 /user
drwxr-xr-x   - mapr mapr          1 2013-02-21 17:42 /var
root@ip-10-245-12-77:~#

In a later post I will cover the MapR CLI in more depth, with examples of how to use it with mirrors, schedules and snapshots.

I hope you enjoy it and find this information useful.


Platfora just announced a business intelligence platform for Hadoop that immediately and seamlessly transforms raw Hadoop data into interactive, in-memory business intelligence, reducing the dependence companies have on complex and costly data warehouse architectures. The combination of MapR and Platfora will help bring Hadoop to everyday business users, who will now be able to interactively explore, visualize, and analyze data instantaneously.

With its easy-to-use software product, Platfora will let users easily manipulate data sets to produce clear and valuable graphics. Our partnership will enable new use cases from which clients can derive real and substantial business value.

For more information on Platfora, click here to see their full news release.


MapR secured $30 million in series C financing to accelerate global expansion and continue our segment-leading product development.  Mayfield Fund, which led the funding round, joins our existing investors Lightspeed Venture Partners, NEA and Redpoint Ventures in this round, bringing total funds raised to $59 million.

The new funding will back our growing presence in EMEA and will extend our global reach to Asia Pacific in the near future.  MapR will continue to invest in the research and development of the MapR world-record setting, enterprise-grade distribution for Hadoop that offers IT organizations the only Hadoop platform with full data protection, no single points of failure, improved performance and dramatic ease of use advantages.

Arik Hesseldahl of All Things D talked with MapR co-founder and CEO John Schroeder about this news.  Read the article here. GigaOm, Venture Beat and Silicon Angle also provided their spin on our news.

Click here to read the news release.


Overview

Running MapReduce jobs on ingested data is traditionally batch-oriented: the data must be first transferred to a local file system accessible to the Hadoop cluster, then copied into HDFS with Flume or the “hadoop fs” command. Only once the transfers are complete can MapReduce be run on the ingested files.

What if you wanted to run MapReduce on ingested data, directly from its source application, without modifying the source application or waiting until ingest is complete? What if rather than waiting for a lengthy ingest to complete, you could run a series of MapReduce jobs on the data in near-real-time, only processing the newest data as it arrives?

With the MapR distribution of Hadoop, the Direct Access NFS™ feature provides this capability, combined with some straightforward MapReduce code to skip previously processed data in the input files. This article describes how this can be accomplished, and uses the WordCount program to demonstrate it.

MapR Direct Access NFS™

One of the most useful features of MapR’s distribution of Hadoop is Direct Access NFS. This allows any machine with NFS client software to “mount” the Hadoop cluster into the local file system tree. NFS client machines can be running Linux, Unix, Macintosh, Windows, or any other operating system that supports NFS version 3. Mounting the cluster via NFS provides the following capabilities:

  • All existing applications can read and write data directly from/to the Hadoop distributed file system (completely transparent to the application – it’s just POSIX I/O!)
  • All applications have complete read/write random and concurrent access, meaning Hadoop is no longer write-once or append-only
  • Inside the Hadoop cluster, the data is immediately available for processing while it’s streaming in

In other words, now we can take data directly from its source application outside the cluster into Hadoop without intermediate steps and added complexity. The next step is to run MapReduce on the data as it’s streaming into the cluster.

Running MapReduce on Input Files While Being Ingested

To get some intermediate MapReduce results on the data as it’s being ingested, we want to run MapReduce repeatedly on the same input files, each time skipping over the previously-processed sections of the file, and accumulate the intermediate results for aggregation at the end of the ingest. In order to accomplish this, information has to be persisted that tracks which sections of the input files have already been processed—Hadoop breaks up the input files into splits, which are the obvious choice for sections to track. By the way, MapR refers to this as chunk size; a chunk defaults to 256MB, but can be easily tuned at the directory level.

The Atlantbh blog has a great entry on one method for performing this persistence:

http://www.atlantbh.com/hadoop-identifying-and-skipping-processed-data/

The idea here is to override some key MapReduce methods and use the SplitID as the unit of persistence. A SplitID contains the following data:

  • Pathname of input file
  • Starting offset
  • Length of split

We can override methods to accomplish the following:

  • Persist the SplitID to a sub-directory of the MapReduce job output directory once the Map task successfully completes
  • Pre-process the list of splits passed to the MapReduce jobs, looking up each split in our persisted sub-directory
  • Bypass the usual MapReduce check for pre-existing output directory, to allow multiple job runs using the same output directory
  • And finally, the main() method needs to be overridden in order to set up the above overridden methods as part of the job initialization

In other words, without changing the original source code, a new Java class can be written that extends the original MapReduce application class, and this new MapReduce application can be run repeatedly as the files are streaming in. The flow would look like this:

While (file is still being streamed in)
     hadoop jar newjob.jar newapp input output
     Save off results file from output/part*
End

The remainder of this blog post describes how this can be demonstrated with the WordCount program that is distributed with Hadoop, along with a test program that can be used to ingest data over NFS into the WordCount MapReduce job. All source code is available here.

Demonstrating the Solution Using WordCount

I borrowed code from the Atlantbh website and integrated it into the canonical WordCount Hadoop program. The key ingredient here is how the processed SplitIDs are persisted with the use of something called Hadoop task side-effect files. Task side-effect files are used when MapReduce jobs want to output additional data from MapReduce tasks, but only do so upon successful completion of the task.

The source code for WordCount used in this demonstration is available in the following directory on a MapR node:

/opt/mapr/pig/pig*/test/org/apache/pig/test/utils/WordCount.java

The WordCount class does not need to be modified – instead it was extended by a new class called WordCountNew. This new class contains the necessary code that persists the split information for each split processed into the output directory. Using this approach, other MapReduce programs can be similarly enhanced to use this ingest feature.

A set of scripts and some driver code was also written to wrap all of this up into a complete demonstration package. First, a C program was written that simulates an application that is writing data directly into the Hadoop cluster using an NFS mount. It simply writes a known pattern so that it’s easy to check whether WordCount is getting the correct answer. It could also copy a given file to the cluster for input. The C program also has options for inserting a delay to demonstrate on slow VMs running on laptops, for example.

We could also use a simple ‘cat’ or ‘cp’ or even drag-and-drop files into the input directory, but for demo purposes on a single laptop VM, we wanted to slow down the ingest so that we could keep up with the ingest rate with our MapReduce job.

As the data is streaming, WordCountNew is run on the input directory a number of times. After each MapReduce job, the output file that contains the results from the latest ingested portion of the file is saved off for final aggregation. This is simply a matter of rolling over the outputdir/part-r-00000 file into a unique file name such as output.pass-1, output.pass-2, etc. Once no more new data is detected in the input file, WordCountNew is run one final time, passing in a property that tells it to process any remaining data at the end of the file.

This results in several output files, each showing the results of each pass.

Finally, in order to aggregate the output from each pass, a simple MapReduce program called WordCountFinal takes the output directory as its input, and comes up with a final WordCount. This last job is very fast since we don’t have to process the potentially large input file, but instead process the results of each pass.
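
To make the aggregation step concrete, here is a minimal sketch in plain Java of what the final pass has to compute. It is not the WordCountFinal MapReduce job from the bundle: PassMerger and its merge() method are hypothetical names, and the sketch assumes each saved pass file holds tab-separated word/count lines, which is the default text output layout in Hadoop.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the demo bundle: sums per-pass word counts.
class PassMerger {
    // Each line is assumed to look like "word<TAB>count".
    static Map<String, Long> merge(List<List<String>> passes) {
        Map<String, Long> totals = new TreeMap<>();
        for (List<String> pass : passes) {
            for (String line : pass) {
                int tab = line.lastIndexOf('\t');
                if (tab < 0) continue; // skip blank or malformed lines
                String word = line.substring(0, tab);
                long count = Long.parseLong(line.substring(tab + 1).trim());
                totals.merge(word, count, Long::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> pass1 = List.of("the\t3", "people\t1");
        List<String> pass2 = List.of("the\t2", "union\t1");
        // prints {people=1, the=5, union=1}
        System.out.println(merge(List.of(pass1, pass2)));
    }
}
```

Because every pass only counted words in splits that no earlier pass had seen, summing the per-pass counts gives the same totals as a single run over the whole file.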

Here are the contents of the source code bundle, as well as instructions on how to build and run the demo:

Makefile

Makefile to build the C program for streaming in the data via NFS.

text_stream.c

Program that can either stream in a known pattern, or an input file, inserting delays to slow the rate of ingest for demo purposes.

const4.txt

A 20MB input file that can be used for the demo. It’s basically many copies of the US Constitution in text form.

text_stream.sh

Script that streams the data in over NFS (I use my Mac for this). Here is a sample run:

# ./text_stream -I const4.txt -f \
/mapr/my.cluster.com/user/mapr/input/testfile -i 0 \
-l 20 -c 32k -d 250000

Simply put, it writes the local constitution file into the input directory of the mapr user on the cluster in 32KB chunks, inserting a 250ms delay between writes. Note the NFS path that it is writing to. This is the beauty of MapR Direct Access NFS™. No Java code needs to be written or configured to get data directly into the cluster. Any POSIX program can access data in the cluster.

WordCountNew.java

This is the new class that extends the sample WordCount program that comes with Hadoop, modified to do streaming input.

The following methods were overridden to make this work:

public static void main(String[] args) throws Exception
     This is the main() method from the original WordCount.java, with a
     few changes to pass the overridden classes below to Hadoop.
protected void setup(Mapper.Context context)
     Gets the split ID and persists it to the output directory
     under .meta/splits/input-file-path_offset_length
public void checkOutputSpecs(JobContext job)
     Overridden to remove the check for pre-existing output
     directory - necessary to allow repeated MR runs
public List<InputSplit> getSplits(JobContext job) throws IOException
     TextInputFormat method, overridden to strip out already-processed splits

These are the methods used to persist the processed splits and to strip out processed splits:

public static void saveInputSplitId(Mapper.Context context, String splitId)

public static List<InputSplit> filterProcessedSplits(List<InputSplit> inputSplits, JobContext job) throws IOException

WordCountFinal.java

This is the MR job to run at the end, to aggregate the intermediate word count outputs, and it uses the output directory of the intermediate runs as input.

wc-loop.sh

This script runs the demo. Once the data is streaming in, run this script inside the cluster. It assumes the cluster is also NFS-mounted on the node from which the script runs. Assuming the MapR NFS Gateway service is running on this node, mount it like this:

$ sudo mount -o nolock,actimeo=1 localhost:/mapr /mapr

On the cluster, you can run the wc-loop.sh script as follows:

$ ./wc-loop.sh

The script is run from the same location as the jar file that contains the compiled code. After each pass through the file being streamed in, it appends the top 10 words from that pass to /tmp/wc.passes.

While this is running, you can watch the results as the jobs complete:

$ tail -f /tmp/wc.passes

This is the hadoop job that is run on each pass through wc-loop.sh:

$ hadoop jar wc.jar WordCountNew input output

When wc-loop.sh sees no changes to the input file, it assumes it’s finished streaming and runs one last pass, processing any partial chunk at the end that may have been skipped.

This is the aggregation pass at the end. Note that it uses the above output directory as input, since we saved off each iteration’s results:

$ hadoop jar wc.jar WordCountFinal output output-final

It then shows the aggregated top 10 words.

Lastly, it runs a diff to be sure the results are identical to the original non-streaming wordcount job, which was previously run and saved off into a directory called output-orig-wordcount.

rollit.sh

This script is invoked by wc-loop.sh and rolls over the job output file (part-r-00000) so we can aggregate at the end with WordCountFinal.java.

Compiling Java

To compile the Java code and build the wc.jar file:

$ mkdir wc_classes
$ javac -classpath `hadoop classpath` -d wc_classes \
/opt/mapr/pig/pig*/test/org/apache/pig/test/utils/WordCount.java
$ javac -classpath `hadoop classpath`:wc_classes -d \
wc_classes WordCountNew.java
$ javac -classpath `hadoop classpath`:wc_classes -d \
wc_classes WordCountFinal.java
$ jar cvf wc.jar -C wc_classes .

Output of WordCountNew

Here is what the output directory looks like after several passes:

$ pwd
/mapr/my.cluster.com/user/mapr/output
$ ls -la
total 119
drwxr-xr-x 4 mapr mapr   9 Jan 17 09:29 .
drwxrwxrwx 33 mapr mapr  41 Jan 17 09:32 ..
drwxr-xr-x 3 mapr mapr   1 Jan 17 09:26 _logs
drwxr-xr-x 3 mapr mapr   1 Jan 17 09:26 .meta
-rwxr-xr-x 1 mapr mapr 19286 Jan 17 09:26 output.pass-1
-rwxr-xr-x 1 mapr mapr 19595 Jan 17 09:26 output.pass-2
-rwxr-xr-x 1 mapr mapr 19624 Jan 17 09:27 output.pass-3
-rwxr-xr-x 1 mapr mapr 19595 Jan 17 09:28 output.pass-4
-rwxr-xr-x 1 mapr mapr 20709 Jan 17 09:28 output.pass-5
-rwxr-xr-x 1 mapr mapr 18990 Jan 17 09:29 output.pass-6
-rwxr-xr-x 1 mapr mapr   0 Jan 17 09:26 _SUCCESS

Each output.pass-N file is a copy of the MR job output file (part-r-00000). Note the .meta directory. This is where the processed split information is persisted:

$ cd .meta
$ ls -l
total 1
drwxr-xr-x 2 mapr mapr 20 Jan 17 09:29 splits
$ cd splits/
$ ls -l
total 0
-rwxr-xr-x 1 0 Jan 17 09:26 maprfs:_user_mapr_input_testfile:0+1048576
-rwxr-xr-x 1 0 Jan 17 09:28 maprfs:_user_mapr_input_testfile:10485760+1048576
-rwxr-xr-x 1 0 Jan 17 09:26 maprfs:_user_mapr_input_testfile:1048576+1048576
-rwxr-xr-x 1 0 Jan 17 09:28 maprfs:_user_mapr_input_testfile:11534336+1048576
-rwxr-xr-x 1 0 Jan 17 09:28 maprfs:_user_mapr_input_testfile:12582912+1048576
...

Each file is empty; its name encodes the name of the input file, the offset to the start of the split, and the length of the split. It’s essentially the SplitID. A 1 MB split size was used to make the demo easier to run on a slow VM, so the chunk size was set on the input directory before streaming the testfile in:

$ hadoop mfs -setchunksize 1048576 input
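
The marker-file idea can be sketched in a few lines of plain Java. This is a minimal illustration, not the code from the demo bundle: SplitMarkers and its methods are hypothetical names, the naming rule (slashes replaced by underscores, then ":offset+length") is inferred from the listing above, and a local directory stands in for output/.meta/splits.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the marker-file idea; not the code from the demo bundle.
class SplitMarkers {
    // Builds a name such as "maprfs:_user_mapr_input_testfile:0+1048576".
    static String splitId(String inputPath, long start, long length) {
        return inputPath.replace('/', '_') + ":" + start + "+" + length;
    }

    // Records a finished split as an empty file named after its ID.
    // Note: ':' in file names works on Linux and macOS but not on Windows.
    static void markProcessed(Path splitsDir, String id) {
        try {
            Files.createDirectories(splitsDir);
            Path marker = splitsDir.resolve(id);
            if (!Files.exists(marker)) {
                Files.createFile(marker);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A split counts as processed if its marker file exists.
    static boolean isProcessed(Path splitsDir, String id) {
        return Files.exists(splitsDir.resolve(id));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("splits");
        String id = splitId("maprfs:/user/mapr/input/testfile", 0, 1048576);
        markProcessed(dir, id);
        System.out.println(id + " processed? " + isProcessed(dir, id));
    }
}
```

Using the file name itself as the record keeps the lookup down to a simple existence check, which is why the marker files can stay empty.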

Debug Messages

As the job runs, it shows messages about what splits have already been processed and skipped, and what splits will be processed on this run. For example:

************* filterProcessedSplits - splitsDir is output/.meta/splits
Input file is maprfs:/user/mapr/input/testfile, chunksize is 1048576
Look for previously processed splits...
split already processed :maprfs:_user_mapr_input_testfile:1048576+1048576
split already processed:maprfs:_user_mapr_input_testfile:0+1048576
Get unprocessed splits...
   Unprocessed split:2097152/1048576
   Unprocessed split:3145728/1048576
   Unprocessed split:4194304/1048576
   Unprocessed split:5242880/1048576
   Unprocessed split:6291456/1048576
   Unprocessed split:7340032/1048576
   Unprocessed split:8388608/1048576
PARTIAL SPLIT - start is 9437184, length is 655360, do not process
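
The selection logic behind these messages can be approximated with the following sketch. It is only an illustration under stated assumptions, not the demo's filterProcessedSplits(): PassPlanner and the finalPass flag are hypothetical names, and splits are identified by their start offset alone. The numbers in main() are taken from the log above (a 655360-byte partial chunk starting at offset 9437184).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of per-pass split selection; not the demo's filterProcessedSplits().
class PassPlanner {
    // Returns the start offsets of chunks to process on this pass.
    // Already-processed chunks are skipped; a trailing partial chunk is
    // skipped too, unless finalPass is true.
    static List<Long> unprocessedStarts(long fileLength, long chunkSize,
                                        Set<Long> processedStarts, boolean finalPass) {
        List<Long> starts = new ArrayList<>();
        for (long start = 0; start < fileLength; start += chunkSize) {
            long length = Math.min(chunkSize, fileLength - start);
            boolean partial = length < chunkSize;
            if (processedStarts.contains(start)) continue;
            if (partial && !finalPass) continue; // still being written, wait
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long chunk = 1048576L;
        long fileLength = 9437184L + 655360L; // values from the log above
        Set<Long> done = Set.of(0L, 1048576L);
        // Intermediate pass: seven full chunks, the partial chunk is left alone.
        System.out.println(unprocessedStarts(fileLength, chunk, done, false));
        // Final pass: the partial chunk at 9437184 is included as well.
        System.out.println(unprocessedStarts(fileLength, chunk, done, true));
    }
}
```

Skipping the partial chunk on intermediate passes matters because its contents are still changing; processing it early would either miss data or count some words twice.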

Summary

With MapR’s Direct Access NFS feature, MapReduce can be run in near-realtime on the data as it’s being ingested directly from its source, and do so reliably, without having to wait until the stream ends and the file is closed.


Gartner Analyst Merv Adrian weighed in on “Hadoop Open Source, ‘Purity,’ and Market Realities” in his blog over the weekend.  He writes:

“The fact is, there is an entire industry building products atop Apache open source code – and that is the point of having Apache license its projects and provide the other services it does for the open source community…

“Having some components of your solution stack provided by the open source community is a fact of life and a benefit for all. So are roads, but nobody accuses Fedex or your pizza delivery guy of being evil for using them without contributing some asphalt,”  says Adrian.

One aspect I’d like to highlight is the importance of ‘standard’ interfaces, defined through community consensus, and enforced by the Apaches and the like. I think it makes perfect sense to offer a commercial implementation that is superior to the implementation you get ‘for free’ – as long as you’re 100% compatible with the community-defined standard.


Hadoop users were excited to see the real-time Hadoop analytics demonstration at the Strata Conference in Santa Clara.  By streaming the #strataconf twitter hashtag directly into a cluster during the conference, MapR displayed two real-time tag clouds showing a word bubble with the most frequently used words in conference tweets and a user name cloud of top tweeters.  Watching the information change proved mesmerizing for some.

One Platform with Streaming Writes

How did we do this?   By bringing MapR and Storm together to capitalize on their strengths.

Real-time analytics are becoming commonplace in businesses today.  Data sources include social media, stock tick data, network sensors, payments, and ad impressions.  Rarely does one tool fit all of the use cases, data feeds, and analysis needs of today’s enterprises.  Hadoop’s venerable MapReduce framework has proven its worth at scale, but it comes with a price:  higher latency.  For interactive querying needs, and even more so for real-time stream computation requirements, traditional Hadoop distributions haven’t played much of a role, and need to be augmented with other solutions.  However, the characteristics of MapR’s data platform allow us to interact with these lower-latency systems.

Take Storm for example.  Storm was written by Nathan Marz at Backtype/Twitter and is used as a continuous, distributed stream computation engine for the massive amounts of tweets they need to process.  Much like Hadoop, Storm hides the complexity of these systems, and allows you to focus on your business problem, not the underlying system.  Usually, Storm gets its data from a queuing system like Kafka or Kestrel.  One of the most common things to do at the end of the real-time workflow is to write the raw data to Hadoop for batch analysis in a less time-sensitive setting.

Delivering Streaming Writes for Realtime Applications

Writing directly to HDFS would be too slow for the real-time computation workflow, but writing to MapRFS has some interesting possibilities.  Because of its true read/write nature, MapRFS allows us to get rid of the queuing system and do publish-subscribe models within the data platform.  Storm can then ‘tail’ a file to which it wishes to subscribe, and as soon as new data hits the file system, it is injected into the Storm topology.  This allows for strong Storm/Hadoop interoperability, and a unification & simplification of technologies onto one platform.


This was an exciting week at the Strata Big Data conference. Our CEO, John Schroeder, delivered a short keynote, and Ted Dunning presented on moving Beyond Hadoop and included a glimpse of real-time streaming with MapR and Storm integration. I also presented an overview of Apache Drill in a standing-room-only session.

There have also been interesting news announcements including the MinuteSort world record set with MapR on the Google Compute Engine and Hadoop distribution announcements from EMC and Intel. We’re happy to see the new Hadoop announcements because they further validate the importance of the Hadoop market and put the focus on differentiation across the various distributions. I think it’s interesting that Cloudera and Hortonworks quickly turned to their blogs to discredit these announcements. In their blog posts they both talked about the history of Hadoop and how many Apache committers they employ, but neither talked about differentiated functionality or providing value to customers.

One could claim that neither Intel nor EMC provided any significant value add for Hadoop. Intel announced a distribution inclusive of Apache Hadoop with plans to enhance security in the future, while Greenplum announced yet another SQL-on-Hadoop project (really just the ability to run its legacy Greenplum database on HDFS instead of XFS) on top of their existing Greenplum HD distribution. However, you could also look at these and claim that there is very little gap between these announced distributions and the distributions from Hortonworks and Cloudera.

Packaging and supporting an open source project and providing a management suite is not rocket science, so I would not be surprised to see even more companies announce their own Hadoop distributions. Developing the technology to address the platform’s limitations and provide a true enterprise-grade solution is much harder, and MapR is currently uniquely positioned as the only company that has been able to do so.

MapR’s relentless focus on addressing the core issues and limitations of Hadoop and providing the best support to customers is what truly distinguishes MapR from all other distributions. For example, disaster recovery requires more than an audit trail showing that you’ve lost data, or a GUI wrapper for the MapReduce-based distcp command. You need snapshots and mirroring to support recovery point and recovery time objectives. The announcements from Intel and EMC and the subsequent blog posts from Cloudera and Hortonworks make MapR’s differentiated features stand out much more prominently. Whether there are three distributions on the market or eight distributions, MapR is still the only distribution that provides random read/write support, NFS access, multi-tenancy, a no-NameNode architecture, JobTracker HA, snapshots, mirroring and best-in-class management and performance.

To summarize, there’s a reason MapR has more production deployments than any other Hadoop distribution.

Welcome to the new entrants.


Gone in 60 seconds! Breaking the MinuteSort Record

Yuliya Feldman, Amit Hadke, Gera Shegalov, M. C. Srivas

Introduction

The MinuteSort test measures how much data a system can sort in 1 minute. The test requires that a random sequence of 100-byte records, each consisting of a 10-byte key and 90 bytes of payload, be arranged in either ascending or descending order. The earlier record had sorted 14 billion records totaling 1400 gigabytes in 60 seconds. The web site sortbenchmark.org keeps track of all such records.

Since MapR (with Google’s help) broke through the first Hadoop barrier of sorting a terabyte in 60 seconds, we decided to try our hand at breaking the MinuteSort record using Hadoop.

We wish to point out that Hadoop is a general purpose framework, and is not optimized for this single problem. For instance, with MapR’s Hadoop distribution there are a lot of places where data is check-pointed to improve redundancy. In addition, there’s a lot of cluster management traffic that tries to detect failures and lots of health-checks running through the system. Finally, MapR’s software typically runs with all logging and reporting enabled at all times. We didn’t turn any of it off during this benchmark.

Trying to set a new record competing against special-purpose sorting frameworks is a much tougher proposition.

Announcing the New Record

Using the Google Compute Engine, running on 2103 instances of the type n1-standard-4-d, MapR was able to sort 15 billion 100-byte records totaling 1.5 terabytes of data in 59 seconds. It is also a testament to the high quality and uniformity of the compute environment that Google graciously made available to us. The record was set on the Google Compute Engine shared virtualized environment that was not altered or cordoned-off or isolated in any shape or form.

An n1-standard-4-d instance consists of 4 virtual cores with 15G of DRAM and 1 virtual Ethernet NIC, and 1 virtual local disk of size 1.7T. On the Google Compute Engine, 2 virtual cores are equivalent to 1 physical core, so this setup used about 4206 physical cores. Of the 2103 virtual nodes, 3 nodes ran the CLDB processes, 1 ran the JobTracker, and the remaining 2099 nodes ran TaskTrackers.

Divvying up the Map-Reduce stages

We ran 1 mapper and 1 reducer per node, for a total of 2099 mappers and 2099 reducers. Assuming a uniform distribution of data across the nodes, each map task reads about 714MB of data from disk, sorts it, and writes 2099 partitions of approx. 341KB each. In the MapR Hadoop implementation, each partition is a separate file, so the process creates about 4.4 million files in 10 seconds.

Next, each reducer fetches its partition file from each of the 2099 nodes, running a 2099-way merge to combine them to produce its result. The process is called the shuffle phase in map-reduce. That’s about 4.4 million files of 341KB being read and transferred over the network in about 20 seconds.
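
For readers who want to check these figures, here is a small back-of-the-envelope sketch. ShuffleMath is a hypothetical name, decimal units (1 MB = 1,000,000 bytes) are assumed, and the data is assumed to be spread perfectly evenly across the 2099 nodes.

```java
// Back-of-the-envelope check of the figures quoted above.
// Hypothetical helper; assumes decimal units and a perfectly even spread of data.
class ShuffleMath {
    // Input bytes read by each map task (one mapper per node).
    static long bytesPerMapper(long totalBytes, int nodes) {
        return totalBytes / nodes;
    }

    // Size of each partition file when a mapper writes one file per reducer.
    static long bytesPerPartition(long totalBytes, int nodes) {
        return bytesPerMapper(totalBytes, nodes) / nodes;
    }

    // Every mapper writes one partition file for every reducer.
    static long partitionFiles(int nodes) {
        return (long) nodes * nodes;
    }

    public static void main(String[] args) {
        long total = 1_500_000_000_000L; // 15 billion records x 100 bytes
        int nodes = 2099;
        System.out.println(bytesPerMapper(total, nodes) / 1_000_000 + " MB per mapper");
        System.out.println(bytesPerPartition(total, nodes) / 1_000 + " KB per partition");
        System.out.println(partitionFiles(nodes) + " partition files");
    }
}
```

The results land on roughly 714 MB per mapper, about 340 KB per partition file, and 2099 x 2099 = 4,405,801 files, which matches the approximate numbers in the text.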

Note that the reducer cannot start its merge until all the map outputs have been fetched. Thus, the shuffle can be broken naturally into a “fetch” stage, followed by a merge-sort phase.

Given the budget of 60 seconds, we allocated timings for each phase as follows:

Map 18 seconds
Fetch 20 seconds
Merge-sort 17 seconds
Straggler overhead 5 seconds

The stragglers are tasks that linger longer and finish much later than the majority of the tasks. A single straggler task among the 4198 tasks may take up to an extra 8 seconds to finish, and can hold up the entire job and impact the final result.

These stragglers end up governing the overall speed of the process since they take the most time. In chemical kinetics parlance: the rate of a reaction is governed by its slowest step, called the rate-determining step. We budgeted an extra 5 seconds for the stragglers, our rate-determining step, to finish.

Initial Attempts

The first few attempts at sorting 1.5TB started coming in about 69-70 seconds, with the map-stragglers taking 25 seconds, and reduce-stragglers taking 25 seconds. We noticed that between each of the different phases in the table above the entire cluster went idle for 2-4 seconds, for a total idle-time of about 7-9 seconds. Eliminating any idle-time became the main goal of our approach.

Overlapping Map and Fetch

The first (and obvious) thing to try was to overlap the map phase with the fetch phase. Each mapper produced 2099 files, but the reducers do not start fetching data until the map is completely done. This is because the Hadoop framework distributes completion notifications on a task-boundary, with no visibility inside partially-completed tasks.

So we hacked the Hadoop JT/TT framework to handle partial map completions. Remember that each mapper produces about 2099 intermediate files, one for each reducer. We changed the task-completion protocol to include partial results; for example, a mapper could send a notification every time it wrote another 100 files.

The problem was that the reducer was still not getting notified quickly enough, because of the heart-beat (HB) mechanism between the JobTracker (JT) and TaskTracker (TT) that is used for notifications. By default on a cluster of 2100 nodes the HBs are sent out every 3 seconds. This meant that a reducer might not notice for up to 6 seconds that a mapper’s outputs were ready for processing: up to 3 seconds for the mapper’s TT to report to the JT, and up to 3 more for the reducer’s TT to hear about it. Reducing the HB interval was tricky, as the HB processing in the JT holds a global lock. When the HBs start coming in too frequently, the lock contention starts to dominate and normal work comes to a halt.

We reorganized the JT code to do most processing outside the global lock, entering the lock only at the last moment when everything was ready to be added to the JT’s global data structures. The change let us reduce the HB interval to 800 milliseconds on the 2100 node cluster, thereby speeding up the notifications by nearly 4x (from 3 seconds down to 0.8 seconds).
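
The general pattern, doing the expensive preparation without the lock and holding the lock only for the final update, looks roughly like the sketch below. This is not the JobTracker code: HeartbeatTable, the "tracker:count" message format and the method names are all made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "prepare outside the lock, publish inside it".
// Not the JobTracker code; names and message format are invented for illustration.
class HeartbeatTable {
    private final Object globalLock = new Object();
    private final Map<String, Integer> finishedPartitions = new HashMap<>();

    // Message format assumed here: "trackerName:newlyFinishedCount".
    void onHeartbeat(String message) {
        // Parsing and validation happen without holding the lock.
        int sep = message.indexOf(':');
        String tracker = message.substring(0, sep);
        int count = Integer.parseInt(message.substring(sep + 1));

        // The lock is held only for the short update of shared state.
        synchronized (globalLock) {
            finishedPartitions.merge(tracker, count, Integer::sum);
        }
    }

    int finishedFor(String tracker) {
        synchronized (globalLock) {
            return finishedPartitions.getOrDefault(tracker, 0);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatTable table = new HeartbeatTable();
        Thread[] senders = new Thread[4];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    table.onHeartbeat("tt-1:100");
                }
            });
            senders[i].start();
        }
        for (Thread t : senders) {
            t.join();
        }
        System.out.println(table.finishedFor("tt-1")); // 4 * 1000 * 100 = 400000
    }
}
```

The shorter the critical section, the more heartbeats per second the lock can absorb before contention dominates, which is what makes a shorter heartbeat interval affordable.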

Although we had a good overlap between the mapper and reducer, we didn’t see much improvement in the overall completion time. The merges at the reducer couldn’t be started until all the data had arrived.

This code is still probably quite useful, so we’ve kept it around. It’s not clear if this tweak is even possible with Apache Hadoop, as it takes advantage of some unique features of MapR to reduce the amount of data the JobTracker has to track for partial mapper intermediate results.

Multi-threaded Reduce

What we noticed was that during the merge-sort in the reduce phase, one of the 4 virtual cores was pegged at 100% busy while the other cores were idle. We decided to find a way to harness all the virtual cores, calculating that the overall elapsed time would thereby improve.

Sorting is single-threaded. However if the data structures can be broken into separate independent pieces, then the sorting can be farmed out to multiple cores, and then their results can be combined using a final merge.
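
As a generic illustration of that idea (and not the MapR reducer code), the sketch below splits an array into independent pieces, sorts each piece on its own thread, and then combines the sorted runs with a final k-way merge. ParallelSortSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Generic illustration of "sort pieces in parallel, then merge"; not MapR's code.
class ParallelSortSketch {
    static int[] sort(int[] data, int pieces) {
        ExecutorService pool = Executors.newFixedThreadPool(pieces);
        try {
            List<Future<int[]>> futures = new ArrayList<>();
            int chunk = (data.length + pieces - 1) / pieces;
            for (int p = 0; p < pieces; p++) {
                int from = Math.min(data.length, p * chunk);
                int to = Math.min(data.length, from + chunk);
                int[] piece = Arrays.copyOfRange(data, from, to);
                // Each piece is independent, so it can be sorted on its own core.
                futures.add(pool.submit(() -> { Arrays.sort(piece); return piece; }));
            }
            List<int[]> runs = new ArrayList<>();
            for (Future<int[]> f : futures) {
                runs.add(f.get());
            }
            return merge(runs, data.length);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Final k-way merge of the sorted runs.
    static int[] merge(List<int[]> runs, int total) {
        // Queue entries are {value, runIndex, positionInRun}, ordered by value.
        PriorityQueue<int[]> heads =
                new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (runs.get(r).length > 0) {
                heads.add(new int[] {runs.get(r)[0], r, 0});
            }
        }
        int[] out = new int[total];
        int n = 0;
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            out[n++] = head[0];
            int[] run = runs.get(head[1]);
            int next = head[2] + 1;
            if (next < run.length) {
                heads.add(new int[] {run[next], head[1], next});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] sorted = sort(new int[] {5, 3, 9, 1, 7, 2, 8}, 3);
        System.out.println(Arrays.toString(sorted)); // [1, 2, 3, 5, 7, 8, 9]
    }
}
```

The final merge is still single-threaded, but it only does one comparison-driven pass over already sorted runs, so most of the work has moved onto the parallel part.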

The obvious way to parallelize the reducers is to increase the number of reducers per node from 1 to 4. But it would cause an explosion in the number of files to be processed: the total files that had to be created/written/opened/read/deleted in 20 seconds would go from 4.4 million to 17.6 million. The other problem is that the file-server also needs some cycles to run, so we decided to create only 3 map outputs instead of 4 to save some CPU cycles for the file-server, but send the map outputs in a single stream to the reducer.

At the map side, the mapper was already multi-threaded due to our previous work, sorting the map inputs in parallel. To allow the reducers to merge in parallel, we combined the output of 3 mappers into 1 stream, and sent it over to the reducers. At each reducer, the stream was separated back into the 3 map outputs, and merged in parallel in 3 threads. Note that the Hadoop framework was already set up to work in this fashion; it just needed a little tweak to allow for the single stream transfer for multiple reducer inputs.

Again, it’s unclear if this tweak is even possible with Apache Hadoop, for the same reason: it uses some unique features of MapR.

With this change, the run time came down from 70 seconds to 62 seconds. We were almost home!

Other Minor Changes

Among the observations, we noticed that the file-server hosting the partition list was getting hit very hard. We watched the GUI Heatmap using the CPU profile view … this file-server would light up bright red throughout any run. Every map task on the 2099 nodes was reading this partition list file at exactly the same moment. We increased the replication factor for the volume holding the partition list to 6, and the file-server bottleneck disappeared. The map stragglers disappeared as a result. It saved us a precious second or two.

We also noticed that the comparator in the quick-sort on the map-side wasn’t doing as good a job as we would have liked. The comparator by default “caches” the first 4 bytes of every key in a 32-bit integer and does a raw comparison on that before looking at the entire key. Given that we were running on 64-bit processors, we increased this cache size to 8 bytes per key, so that far more comparisons were decided by the cached prefix alone, for almost no cost. This change saved us another second! The variable we introduced to play around with this is mapr.map.keyprefix.ints.
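
A rough sketch of the prefix trick follows. It is not MapR’s comparator: KeyPrefixSketch and its methods are hypothetical, and the sketch simply packs the first 8 key bytes into a long so that most comparisons are settled by a single unsigned 64-bit compare, falling back to the remaining bytes only on a tie.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of an 8-byte key-prefix comparison; not MapR's comparator.
class KeyPrefixSketch {
    // Packs the first 8 key bytes into a long, most significant byte first.
    // Keys shorter than 8 bytes are padded with zero bytes.
    static long prefix(byte[] key) {
        long p = 0;
        for (int i = 0; i < 8; i++) {
            int b = i < key.length ? (key[i] & 0xFF) : 0;
            p = (p << 8) | b;
        }
        return p;
    }

    // Compares two keys as unsigned byte strings, using the cached prefixes first.
    static int compare(byte[] a, long prefixA, byte[] b, long prefixB) {
        int c = Long.compareUnsigned(prefixA, prefixB);
        if (c != 0) {
            return c; // decided without touching the key bytes
        }
        // Tie on the first 8 bytes: compare whatever comes after them.
        int n = Math.min(a.length, b.length);
        for (int i = 8; i < n; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        byte[] k1 = "AAAAAAAABA".getBytes(StandardCharsets.US_ASCII); // 10-byte keys
        byte[] k2 = "AAAAAAAAAB".getBytes(StandardCharsets.US_ASCII);
        // Same first 8 bytes, so the comparison falls through to bytes 8 and 9.
        System.out.println(compare(k1, prefix(k1), k2, prefix(k2)) > 0); // true
    }
}
```

With 10-byte random keys, an 8-byte prefix leaves only the last two bytes to the slow path, so ties on the prefix become very rare.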

Final Result

After these changes, we were able to reproduce the result of sorting 1.5 terabytes of data in less than 1 minute consistently across multiple runs and multiple cluster configurations, thanks to the consistent and uniform performance of the Google Compute Engine. We recorded a few runs, and have included the JobTracker’s output showing the results from one such run. The job.xml used to run this is also included in this report.

Hadoop Job job_201301281929_0028 on History Viewer

User: yufeldman
JobName: TeraSort
JobConf: maprfs:/var/mapr/cluster/mapred/jobTracker/staging/yufeldman/.staging/job_201301281929_0028/job.xml
Job-ACLs:
mapreduce.job.acl-view-job: All users are allowed
mapreduce.job.acl-modify-job: All users are allowed
Submitted At: 30-Jan-2013 10:07:09
Launched At: 30-Jan-2013 10:07:09 (0sec)
Finished At: 30-Jan-2013 10:08:09 (59sec)
Status: SUCCESS


Kind     Total Tasks (successful+failed+killed)  Successful tasks  Failed tasks  Killed tasks  Start Time            Finish Time
Setup    0                                       0                 0             0
Map      2099                                    2099              0             0             30-Jan-2013 10:07:10  30-Jan-2013 10:07:40 (29sec)
Reduce   2099                                    2099              0             0             30-Jan-2013 10:07:10  30-Jan-2013 10:08:09 (58sec)
Cleanup  0                                       0                 0             0

 

Counter                                                               Map                   Reduce                Total

Job Counters
  Aggregate execution time of mappers(ms)                             0                     0                     34,737,511
  Launched reduce tasks                                               0                     0                     2,099
  Total time spent by all reduces waiting after reserving slots (ms)  0                     0                     0
  Total time spent by all maps waiting after reserving slots (ms)     0                     0                     0
  Launched map tasks                                                  0                     0                     2,099
  Data-local map tasks                                                0                     0                     2,099
  Aggregate execution time of reducers(ms)                            0                     0                     97,345,363

FileSystemCounters
  MAPRFS_BYTES_READ                                                   1,500,000,239,286     1,560,140,985,632     3,060,141,224,918
  MAPRFS_BYTES_WRITTEN                                                1,560,158,608,836     1,500,000,000,000     3,060,158,608,836
  FILE_BYTES_READ                                                     145,368,344           0                     145,368,344
  FILE_BYTES_WRITTEN                                                  30,505,140            30,324,626            60,829,766

Map-Reduce Framework
  Map input records                                                   15,000,000,000        0                     15,000,000,000
  Reduce shuffle bytes                                                0                     1,560,088,116,020     1,560,088,116,020
  Spilled Records                                                     15,000,000,000        0                     15,000,000,000
  Map output bytes                                                    1,530,000,000,000     0                     1,530,000,000,000
  CPU_MILLISECONDS                                                    64,850,620            100,186,300           165,036,920
  Combine input records                                               0                     0                     0
  SPLIT_RAW_BYTES                                                     239,286               0                     239,286
  Reduce input records                                                0                     15,000,000,000        15,000,000,000
  Reduce input groups                                                 0                     15,000,000,000        15,000,000,000
  Combine output records                                              0                     0                     0
  PHYSICAL_MEMORY_BYTES                                               4,678,213,836,800     2,450,143,838,208     7,128,357,675,008
  Reduce output records                                               0                     15,000,000,000        15,000,000,000
  VIRTUAL_MEMORY_BYTES                                                10,563,378,290,688    15,050,043,244,544    25,613,421,535,232
  Map output records                                                  15,000,000,000        0                     15,000,000,000
  GC time elapsed (ms)                                                274,468               2,730,379             3,004,847
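
A few derived figures make the counters above easier to read. The short calculation below uses only numbers from the job output; the variable names are illustrative, and the 59-second duration is the value reported by the History Viewer.

```python
SORTED_BYTES = 1_500_000_000_000      # reduce-side MAPRFS_BYTES_WRITTEN
RECORDS = 15_000_000_000              # Map input records
MAP_OUTPUT_BYTES = 1_530_000_000_000  # Map output bytes
MAPPER_MS = 34_737_511                # Aggregate execution time of mappers(ms)
REDUCER_MS = 97_345_363               # Aggregate execution time of reducers(ms)
TASKS = 2099                          # map tasks, and also reduce tasks
WALL_SECONDS = 59                     # job duration from the History Viewer

bytes_per_record = SORTED_BYTES / RECORDS
map_output_bytes_per_record = MAP_OUTPUT_BYTES / RECORDS
avg_map_seconds = MAPPER_MS / TASKS / 1000
avg_reduce_seconds = REDUCER_MS / TASKS / 1000
sort_gb_per_second = SORTED_BYTES / WALL_SECONDS / 1e9  # decimal gigabytes

print(f"bytes per record:            {bytes_per_record:.0f}")
print(f"map output bytes per record: {map_output_bytes_per_record:.0f}")
print(f"average map task time:       {avg_map_seconds:.1f} s")
print(f"average reduce task time:    {avg_reduce_seconds:.1f} s")
print(f"aggregate sort rate:         {sort_gb_per_second:.1f} GB/s")
```

By these figures each record is 100 bytes (102 bytes as map output), the average map task ran for about 16.5 seconds and the average reduce task for about 46.4 seconds, and the cluster as a whole sorted roughly 25.4 GB per second.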

Configuration details (job.xml)

Job Configuration: JobId – job_201301281929_0028

name value
mapred.job.shuffle.merge.percent 1.0
mapred.reduce.slowstart.completed.maps 0.0
mapred.reduce.parallel.copies 50
mapr.map.output.batch -1
mapred.tasktracker.reduce.tasks.maximum 1
mapreduce.maprfs.use.compression true
mapred.maxthreads.partition.closer 0
mapr.map.keyprefix.ints 2
mapred.committer.job.setup.cleanup.needed false
mapreduce.job.submithost m03-demo1010.c.mapr-demo.maprtech.com.internal
mapreduce.terasort.final.sync true
hadoop.proxyuser.root.hosts *
mapred.output.dir /t.data/sort
mapreduce.heartbeat.10 500
mapred.create.symlink yes
mapreduce.partitioner.class org.apache.hadoop.examples.terasort.TeraSort$TotalOrderPartitioner
mapreduce.input.num.files 2099
mapred.cache.files /t.datasource/parts/_partition.lst#_partition.lst
mapreduce.job.dir maprfs:/var/mapr/cluster/mapred/jobTracker/staging/yufeldman/.staging/job_201301281929_0028
mapred.queue.default.acl-administer-jobs *
mapreduce.job.cache.files.visibilities true
mapred.cache.files.timestamps 1359569226469
mapreduce.heartbeat.1000 500
mapred.reducer.new-api true
mapred.job.reduce.input.buffer.percent 1.0
mapred.input.dir maprfs:/t.data/gen
group.name yufeldman
mapreduce.heartbeat.10000 500
partition.path /t.datasource/parts/_partition.lst
mapreduce.inputformat.class org.apache.hadoop.examples.terasort.TeraInputFormat
mapred.inmem.merge.threshold 0
mapred.jar /var/mapr/cluster/mapred/jobTracker/staging/yufeldman/.staging/job_201301281929_0028/job.jar
mapreduce.outputformat.class org.apache.hadoop.examples.terasort.TeraOutputFormat
mapred.mapgroupsize 3
mapred.reduce.child.java.opts -Xmx6000m
mapreduce.job.submithostaddress 10.240.16.198
mapred.reduce.tasks.speculative.execution false
mapred.map.tasks.speculative.execution false
fs.mapr.working.dir /user/yufeldman
mapred.job.shuffle.input.buffer.percent 0.8
user.name yufeldman
mapreduce.tasktracker.outofband.heartbeat true
mapred.cache.files.filesizes 69256
mapred.job.name TeraSort
mapred.reduce.tasks 2099
maprfs.openfid2.prefetch.bytes 0
mapred.output.value.class org.apache.hadoop.io.Text
mapred.working.dir /user/yufeldman
io.sort.mb 2000
dfs.replication 1
mapred.fairscheduler.locality.delay 100000
hadoop.proxyuser.root.groups root
mapred.output.key.class org.apache.hadoop.io.Text
mapred.tasktracker.map.tasks.maximum 1
mapred.map.child.java.opts -Xmx4000m
mapred.maxthreads.generate.mapoutput 10
mapreduce.heartbeat.100 500
mapred.mapper.new-api true
mapreduce.tasktracker.prefetch.maptasks 0.0
mapred.used.genericoptionsparser true
mapred.map.tasks 2099
mapreduce.tasktracker.group mapr
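
The listing above is a flattened name/value view. In the file itself, Hadoop configuration files such as job.xml store each pair as a property element inside a configuration element. The sketch below renders a handful of the listed settings in that layout; the helper name to_job_xml is hypothetical, and the selection of settings is only an example, not a recommended minimal configuration.

```python
import xml.etree.ElementTree as ET

# A few name/value pairs copied from the listing above.
settings = {
    "mapr.map.keyprefix.ints": "2",
    "io.sort.mb": "2000",
    "mapred.reduce.tasks": "2099",
    "mapred.reduce.slowstart.completed.maps": "0.0",
}


def to_job_xml(props: dict) -> str:
    """Render name/value pairs in the <configuration>/<property> layout."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


print(to_job_xml(settings))
```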
