In this post, I’ll show you what happens behind the scenes from the moment a user launches a job from a client/edge node or a cluster node until the job is actually submitted to the JobTracker for execution.
Users submitting a job communicate with the cluster via JobClient, which is the interface through which a user's job interacts with the cluster. JobClient provides many facilities, such as job submission, progress tracking, access to component tasks' reports and logs, and MapReduce cluster status information.
The above figure gives a good high-level overview of how a job is submitted to the JobTracker in MR1. These are the steps followed from the moment a user submits an MR job until it reaches the JobTracker:
- The user copies the input file to the distributed file system
- The user submits the job
- The JobClient gets the input file information
- The JobClient creates the splits
- The JobClient uploads the job information, i.e., job.jar and job.xml
- The job output directory is validated via an HDFS API call; then the client submits the job to the JobTracker using an RPC call
Once the job is submitted, it is the JobTracker’s responsibility to distribute the job to the TaskTrackers (TTs), schedule and monitor the tasks, and provide status and diagnostic information back to the job client. The details of job submission on the JobTracker side are out of scope for this post, but I plan to write a dedicated post in the future that details the job flow for both the JobTracker and the TaskTracker.
Now that we understand the overall flow, let’s map the steps above to the log lines written when a job is submitted. I spun up a cluster to demonstrate this:
- 3-node MapR 3.1.1 cluster
- 1 client node
On the client node, where I plan to run a WordCount job for demonstration purposes, I set the log level of the org.apache.hadoop.mapred.JobClient logger to DEBUG by editing the “/opt/mapr/hadoop/hadoop-0.20.2/conf/log4j.properties” file:
# Logging levels
log4j.logger.org.apache.hadoop.security.JniBasedUnixGroupsMapping=WARN
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=WARN
log4j.logger.org.apache.hadoop.mapred.JobTracker=INFO
log4j.logger.org.apache.hadoop.mapred.TaskTracker=INFO
log4j.logger.org.apache.hadoop.mapred.JobClient=DEBUG
log4j.logger.org.apache.zookeeper=INFO
log4j.logger.org.apache.hadoop.mapred.MapTask=WARN
log4j.logger.org.apache.hadoop.mapred.ReduceTask=WARN
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
Even with DEBUG enabled, the existing log messages did not cover every step we discussed earlier, so I modified the JobClient code to print custom debug lines (marked “### Custom Debug Log Lines###” in the output below) in order to understand and validate the flow.
- As a first step, I copied the input file to the Distributed File System. The file “/myvolume/in” is roughly 1.5 MB in size, on which I will run the WordCount job.
[root@ip-10-255-68-164 conf]# hadoop fs -du /myvolume/in
Found 1 items
1573079 maprfs:/myvolume/in
[root@ip-10-255-68-164 conf]#
- Now we submit the WordCount job through the JobClient, as shown below. I passed a custom maximum split size (mapred.max.split.size=786432, i.e., 768 KB) so that the input file is divided into two splits (input file size / custom split size ≈ 2), which makes the job run two map tasks in parallel.
hadoop jar /opt/mapr/hadoop/hadoop-0.20.2/hadoop-0.20.2-dev-examples.jar wordcount -Dmapred.max.split.size=786432 /myvolume/in /myvolume/out >> /tmp/JobClient-WC-split1 2>&1
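Why two splits and not three? 2 × 786,432 = 1,572,864, so the file is 215 bytes larger than two full splits, and a plain ceiling division would produce a third, 215-byte split. The Python model below is a sketch of the split loop in upstream Hadoop 0.20's new-API FileInputFormat (the class the log below reports as input.FileInputFormat), assuming MapR's build keeps that logic: full-size splits are cut only while more than 1.1 splits' worth of data remains, so small leftovers are folded into the last split. The 256 MB block (chunk) size and the minimum split size of 1 are assumptions; the function names are hypothetical.

```python
from __future__ import annotations

# Python model of split generation for ONE splittable file, following the logic of
# Hadoop 0.20's new-API FileInputFormat. A simulation for illustration, not Hadoop code.

SPLIT_SLOP = 1.1  # the last split may be up to 10% larger than split_size


def compute_split_size(block_size: int, min_size: int, max_size: int) -> int:
    """Clamp the block size into [min_size, max_size], as computeSplitSize does."""
    return max(min_size, min(max_size, block_size))


def get_splits(file_length: int, split_size: int) -> list[tuple[int, int]]:
    """Return (offset, length) pairs covering the whole file."""
    splits = []
    bytes_remaining = file_length
    # Cut full-size splits while more than 1.1 splits' worth of data remains.
    while bytes_remaining / split_size > SPLIT_SLOP:
        splits.append((file_length - bytes_remaining, split_size))
        bytes_remaining -= split_size
    # The remainder (at most 1.1 * split_size) becomes the final split.
    if bytes_remaining != 0:
        splits.append((file_length - bytes_remaining, bytes_remaining))
    return splits


if __name__ == "__main__":
    file_length = 1573079            # size of /myvolume/in from `hadoop fs -du`
    max_split_size = 786432          # -Dmapred.max.split.size=786432
    block_size = 256 * 1024 * 1024   # ASSUMPTION: 256 MB MapR chunk size
    min_split_size = 1               # ASSUMPTION: default minimum split size

    split_size = compute_split_size(block_size, min_split_size, max_split_size)
    for offset, length in get_splits(file_length, split_size):
        print(f"offset={offset} length={length}")
```

Running it prints offset=0 length=786432 and offset=786432 length=786647: the 215 extra bytes are absorbed by the second split, which is consistent with the "number of splits is 2" line in the log further down.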
Note: When the JobClient starts, you will see the messages below. Because MapR supports JobTracker high availability, the client first connects to ZooKeeper to find the currently active JobTracker, and then gets the job ID for the current job by making an RPC call to that JobTracker.
15/06/27 07:38:33 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=ec2-54-177-98-44.us-west-1.compute.amazonaws.com:5181,ec2-184-169-213-71.us-west-1.compute.amazonaws.com:5181,ec2-184-169-212-210.us-west-1.compute.amazonaws.com:5181 sessionTimeout=30000 watcher=com.mapr.fs.JobTrackerWatcher@7b05d0ae
15/06/27 07:38:33 INFO zookeeper.Login: successfully logged in.
15/06/27 07:38:33 INFO client.ZooKeeperSaslClient: Client will use SIMPLE-SECURITY as SASL mechanism.
15/06/27 07:38:33 INFO zookeeper.ClientCnxn: Opening socket connection to server ip-10-255-0-66.us-west-1.compute.internal/10.255.0.66:5181. Will attempt to SASL-authenticate using Login Context section 'Client_simple'
15/06/27 07:38:33 INFO zookeeper.ClientCnxn: Socket connection established to ip-10-255-0-66.us-west-1.compute.internal/10.255.0.66:5181, initiating session
15/06/27 07:38:33 INFO zookeeper.ClientCnxn: Session establishment complete on server ip-10-255-0-66.us-west-1.compute.internal/10.255.0.66:5181, sessionid = 0x24dd4a5bb6a0896, negotiated timeout = 30000
15/06/27 07:38:33 INFO fs.JobTrackerWatcher: Current running JobTracker is: ip-10-128-202-14.us-west-1.compute.internal/10.128.202.14:9001
### Custom Debug Log Lines### got the jobId 7 for the job submitted jobsubmit dir is maprfs:/var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007
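The last log line ties the numeric job ID 7 to the full ID job_201506202152_0007 and to the per-job staging directory. The Python sketch below models that naming, assuming the upstream Hadoop conventions: the middle part is the JobTracker's identifier (its start time formatted as yyyyMMddHHmm, here 2015-06-20 21:52), and the counter is zero-padded to at least four digits. The staging root is copied from the log; `JobId`, `jt_identifier_from_start`, and `submit_job_dir` are hypothetical names, not Hadoop APIs.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class JobId:
    """Simplified model of an MR1 job ID string: job_<jtIdentifier>_<sequence>."""

    jt_identifier: str  # ASSUMPTION: JobTracker start time formatted as yyyyMMddHHmm
    sequence: int       # per-JobTracker counter handed out over RPC (7 in this run)

    def __str__(self) -> str:
        # The sequence number is zero-padded to at least four digits.
        return f"job_{self.jt_identifier}_{self.sequence:04d}"


def jt_identifier_from_start(start: datetime) -> str:
    """Format a JobTracker start time the way the job ID embeds it."""
    return start.strftime("%Y%m%d%H%M")


def submit_job_dir(staging_root: str, user: str, job_id: JobId) -> str:
    """Per-job staging directory: <staging root>/<user>/.staging/<job id>."""
    return f"{staging_root}/{user}/.staging/{job_id}"


if __name__ == "__main__":
    job = JobId(jt_identifier_from_start(datetime(2015, 6, 20, 21, 52)), 7)
    print(job)
    print(submit_job_dir("maprfs:/var/mapr/cluster/mapred/jobTracker/staging", "root", job))
```

This prints job_201506202152_0007 followed by the same maprfs:/.../.staging/job_201506202152_0007 path that appears in the log.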
- Next, the JobClient checks whether any files, library jars, or archives were specified when the job was launched, and creates a job directory under the JobTracker volume, along with files, libjars, and archives subdirectories to hold these temporary files during job execution (the distributed cache, which tasks can use during job processing). In this run none were passed, so all three are logged as null.
### Custom Debug Log Lines### files null libjars null archives null
15/06/27 07:38:33 DEBUG mapred.JobClient: default FileSystem: maprfs:///
### Custom Debug Log Lines### submitJobDir /var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007
### Custom Debug Log Lines### filesDir /var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007/files archivesDir /var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007/archives libjarsDir /var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007/libjars
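The sketch below models where the distributed-cache inputs would land if they had been given. It assumes each of the -files, -libjars, and -archives options arrives as a comma-separated list (or None, as in this run) and that every entry keeps its base name inside the matching staging subdirectory. `plan_cache_copies` and the /tmp/a.jar-style paths are hypothetical; the sketch only computes source and destination pairs and copies nothing.

```python
from __future__ import annotations

import posixpath


def plan_cache_copies(submit_job_dir: str,
                      files: str | None,
                      libjars: str | None,
                      archives: str | None) -> list[tuple[str, str]]:
    """Map each -files/-libjars/-archives entry to its destination in the staging dir."""
    copies = []
    for option, subdir in ((files, "files"), (libjars, "libjars"), (archives, "archives")):
        if not option:
            continue  # option not given (logged as null): nothing to stage
        target_dir = posixpath.join(submit_job_dir, subdir)
        for src in option.split(","):
            # Each entry keeps its base name under the matching subdirectory.
            copies.append((src, posixpath.join(target_dir, posixpath.basename(src))))
    return copies


if __name__ == "__main__":
    job_dir = ("/var/mapr/cluster/mapred/jobTracker/staging/root/.staging/"
               "job_201506202152_0007")
    print(plan_cache_copies(job_dir, None, None, None))  # this run: nothing to copy
    print(plan_cache_copies(job_dir, None, "/tmp/a.jar,/tmp/b.jar", None))
```

With all three options unset, as in the log above, the plan is empty.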
- The JobClient then creates splits for the input file. It generated two splits because we chose a custom split size and have a single input file that is roughly twice that size. Finally, the splits are written to job.split and their meta information to job.splitmetainfo, both in the job's staging directory (under the JobTracker volume).
15/06/27 07:38:33 DEBUG mapred.JobClient: Creating splits at maprfs:/var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007
### Custom Debug Log Lines### creating splits at maprfs:/var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007
### Custom Debug Log Lines### writing splits and calculating number of map
15/06/27 07:38:33 INFO input.FileInputFormat: Total input paths to process : 1
15/06/27 07:38:33 WARN snappy.LoadSnappy: Snappy native library is available
15/06/27 07:38:33 INFO snappy.LoadSnappy: Snappy native library loaded
### Custom Debug Log Lines### splits generated number of splits is 2
### Custom Debug Log Lines### splits are sorted into order based on size so that the biggest go first
### Custom Debug Log Lines### split file maprfs:/var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007/job.split no of replication set is 10
### Custom Debug Log Lines### writing new splits
### Custom Debug Log Lines### wiriting split meta info
### Custom Debug Log Lines### splits meta-info file for job tracker maprfs:/var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007/job.splitmetainfo
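Two details in this log are easy to miss. First, the splits are sorted by size, biggest first, before they are written, presumably so that the longest map tasks are handed out earliest. Second, the split file is written with a replication of 10, which matches the default of mapred.submit.replication in upstream Hadoop 0.20, so the many TaskTrackers that read it have more replicas to choose from. The Python sketch below shows only the ordering step; `Split` and `order_for_submission` are hypothetical names, and the two sizes (786,432 and 786,647 bytes) are an assumed example for this 1,573,079-byte input.

```python
from __future__ import annotations

from typing import NamedTuple


class Split(NamedTuple):
    path: str
    offset: int
    length: int


def order_for_submission(splits: list[Split]) -> list[Split]:
    """Largest split first; sorted() is stable, so equal sizes keep their order."""
    return sorted(splits, key=lambda s: s.length, reverse=True)


if __name__ == "__main__":
    splits = [
        Split("maprfs:/myvolume/in", 0, 786432),
        Split("maprfs:/myvolume/in", 786432, 786647),  # ASSUMED: slightly larger last split
    ]
    for s in order_for_submission(splits):
        print(s)
```

The second split comes out first because it is 215 bytes longer.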
- The job JAR (job.jar) and the job configuration (job.xml) are also copied to the shared job directory (under the JobTracker volume) so that they are available when the JobTracker starts executing the job.
### Custom Debug Log Lines### copying jar file /var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007/job.jar
### Custom Debug Log Lines### submitjobfile maprfs:/var/mapr/cluster/mapred/jobTracker/staging/root/.staging/job_201506202152_0007/job.xml
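job.xml is the job's configuration serialized in Hadoop's usual `<configuration>`/`<property>`/`<name>`/`<value>` XML layout, which is how settings such as our -Dmapred.max.split.size=786432 reach the JobTracker and the tasks. The Python sketch below only illustrates that layout: the real file is written by Hadoop's own Configuration class, includes an XML declaration, and contains every resolved property, not just the two shown. `to_job_xml` is a hypothetical helper.

```python
from __future__ import annotations

import xml.etree.ElementTree as ET


def to_job_xml(conf: dict[str, str]) -> str:
    """Serialize key/value settings in the Hadoop <configuration> XML layout."""
    root = ET.Element("configuration")
    for name, value in sorted(conf.items()):
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    print(to_job_xml({
        "mapred.max.split.size": "786432",   # from the -D option on the command line
        "mapred.output.dir": "/myvolume/out",
    }))
```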
- Finally, the JobClient checks whether the output directory exists. If it does, the submission fails so that the existing output is not overwritten. If it doesn’t, the directory is created and the job is submitted to the JobTracker.
Note: The output directory check is not performed against the client's local disk; the client makes an HDFS API call to the cluster file system (maprfs here) to check it.
15/06/27 07:38:33 INFO mapred.JobClient: Creating job's output directory at /myvolume/out
15/06/27 07:38:33 INFO mapred.JobClient: Creating job's user history location directory at /myvolume/out/_logs
15/06/27 07:38:34 INFO mapred.JobClient: root, realuser: null
15/06/27 07:38:34 DEBUG mapred.JobClient: Printing tokens for job: job_201506202152_0007
### Custom Debug Log Lines### submitting the job to JobTracker
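The output check is what makes rerunning the same command fail unless /myvolume/out is removed first. The Python sketch below mirrors the behavior of upstream FileOutputFormat's output-spec check: reject a missing output path, and reject an output directory that already exists. The exception class and `check_output_spec` are hypothetical stand-ins, and the injectable `exists` callable plays the role of the file system call (here MapR-FS) so the sketch can run without a cluster.

```python
from __future__ import annotations

import os
from typing import Callable


class FileAlreadyExistsError(Exception):
    """Stand-in for Hadoop's FileAlreadyExistsException."""


def check_output_spec(output_dir: str | None,
                      exists: Callable[[str], bool] = os.path.exists) -> None:
    """Refuse to submit when the output path is unset or already present."""
    if not output_dir:
        raise ValueError("Output directory not set.")
    # In Hadoop this is a FileSystem call to the cluster, not a local-disk check.
    if exists(output_dir):
        raise FileAlreadyExistsError(f"Output directory {output_dir} already exists")


if __name__ == "__main__":
    check_output_spec("/myvolume/out", exists=lambda p: False)  # first run: passes
    try:
        check_output_spec("/myvolume/out", exists=lambda p: True)  # rerun: rejected
    except FileAlreadyExistsError as err:
        print(err)
```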
In this blog post, you’ve seen how an MR1 job travels from the client to the JobTracker. If you have any further questions, please ask them in the comments section below.