Using Hive and Pig on Baseball Statistics

The Setup

Still using MapReduce? Well, we haven't forgotten about you. In this blog post, we'll tackle one of the challenges of learning Hadoop: finding data sets that are realistic, large enough to show the advantages of distributed processing, and yet small enough for a single developer to work with. The data set we're using in this tutorial is play-by-play baseball statistics, available free of charge from Retrosheet. The data is available by year, and includes detailed descriptions of games, plays, and players. This data is especially well-suited for our purposes because a great deal of it is hand-encoded, which means there are errors and malformed records that we need to handle.

Each year contains several types of files: Team files, Roster files, and Event files. Team files contain a listing of the teams playing that year. Each team listing contains a 3-letter designator that is used to reference that team in all other files. Roster files contain a listing of players for each team, and are named with the 3-letter designator for each team and the year, followed by a .ROS extension. Event files are designated by a .EVA, .EVN, or .EVE extension, depending on whether they are for American League teams (.EVA), National League teams (.EVN), or post-season games (.EVE). Each event file contains the home games for a single team for a single year; the filename consists of the year and the 3-letter designator for the home team.

TEAM1994 - contains all teams for 1994.
CHN1994.ROS - roster for the Chicago Cubs for 1994.
1994CHN.EVN - event file for the Chicago Cubs for 1994.

The majority of the data is in the .EV* files, and that’s what we’ll be looking at here. For a detailed description, you can look over the documentation at Retrosheet. A single game contains several types of records, including an id, game information, player start and substitution records, and play records. Some sample data is included below:

start,bergp001,"Peter Bergeron",0,1,8
start,vidrj001,"Jose Vidro",0,2,4
start,tatif001,"Fernando Tatis",0,3,5
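
The start records shown here are only one record type; id and play records use the same comma-delimited layout. The two lines below are illustrative of the documented format rather than copied from an actual file: an id record names the home team, date, and game number, while a play record carries the inning, the batting team (0 = visitor, 1 = home), the batter's ID, the count, the pitch sequence, and the event.

id,MON200004040
play,1,0,bergp001,01,CX,8/F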

Data Characterization

Our first step in exploring a new data set is to find out exactly what is contained in the data set itself. Using NFS, we can do some of that on the Linux command line:

[username@node1 ~]$ ls /mapr/*.EV{N,A} | wc -l
1582

[username@node1 ~]$ ls /mapr/*CHN.EVN | wc -l
68

We can see here that we have 1582 regular season data files, and 68 years of data for the Chicago Cubs. We can also inspect the files directly if we wish:

[username@node1 ~]$ head /mapr/

Since we have comma-delimited, newline-terminated records, we can use Pig’s built-in PigStorage class to get some more in-depth information about our data set. Let’s start with a few basic questions:

  • How many games are represented?
  • How many records do we have total?
  • What is the relationship between player IDs and player names?

From the documentation for the data set, we know that each game begins with an id record. We can write a simple Pig script to filter out those records and count them:

[username@node1 ~]$ pig
grunt> raw_data = LOAD '/projects/baseball/*.EV{A,N}' USING PigStorage(',') AS (type:chararray);
grunt> id_only = FILTER raw_data BY type MATCHES 'id';
grunt> grouped = GROUP id_only ALL;
grunt> result = FOREACH grouped GENERATE COUNT(id_only);
grunt> dump result;
… snip ...

Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime AvgReduceTime    Alias   Feature Outputs
job_201302221456_0142   3       1       62      8       42      4       4       4       grouped,id_only,raw_data,result        GROUP_BY,COMBINER       maprfs:/tmp/temp1671886658/tmp-401928473,

Successfully read 18941696 records (299673 bytes) from: "/projects/baseball/*.EV{A,N}"

Successfully stored 1 records in: "maprfs:/tmp/temp1671886658/tmp-401928473"

Total records written : 1
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:

2013-06-11 13:57:58,423 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2013-06-11 13:57:58,432 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2013-06-11 13:57:58,432 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
There’s a lot of output to dig through there, but we actually answered two of our questions above:
Successfully read 18941696 records (299673 bytes) from: "/projects/baseball/*.EV{A,N}"

The input report says that we read almost 19 million records from the data set. Those include the id records we were interested in examining as well as all the other record types, which were discarded by the FILTER statement in the Pig script. The count returned by the dump answers the question we explicitly asked: how many games are represented in the data set? It appears we have over 122,000 games to work with.

Let’s move on and look at the players. Looking at the data, we can see that each player is assigned a unique ID. Since this data was gathered from many different sources, we want to make sure that the IDs really are unique and correspond to the same player each time. The last Pig script we ran was executed in interactive mode, one line at a time; since we’ll be building on this next script a bit, we’ll write it in a file.


raw = LOAD '/projects/baseball/*.EV{N,A}' USING PigStorage(',') AS (type:chararray, id:chararray, name:chararray);
-- start and sub records carry a player ID and name in the second and third fields
players = FILTER raw BY type MATCHES 'start' OR type MATCHES 'sub';
-- reduce to distinct (id, name) pairs, sorted by ID
mapping = FOREACH players GENERATE id, name;
result = DISTINCT mapping;
sorted = ORDER result BY id;
STORE sorted INTO '/projects/name_to_id/' USING PigStorage(',');

We can run this example simply by specifying the filename of our script on the command line:

[username@node1 ~]$ pig player_ids.pig
… snip ...

Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime AvgReduceTime    Alias   Feature Outputs
job_201302221456_0143   2       20      n/a     n/a     n/a     n/a     n/a     n/a     mapping,players,raw    DISTINCT        /projects/name_to_id,

Successfully read 18941696 records (2537666 bytes) from: "/projects/baseball/*.EV{N,A}"

Successfully stored 11852 records in: "/projects/name_to_id"

Total records written : 11852
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:

With this script, rather than just dumping the output of the job to the console, we stored it in a file with the STORE command. Let’s take a look at the contents of the output file:

[username@node1 ~]$ head /mapr/
aardd001,"David Aardsma"
aaroh101,"Hank Aaron"
aaroh101,"Hank Aarpn"
aarot101,"Tommie Aaron"
aased001,"Don Aase"
abada001,"Andy Abad"
abadf001,"Fernando Abad"
abbog001,"Glenn Abbott"
abboj001,"Jim Abbott"
abboj002,"Jeff Abbott"

We already have some interesting results: Hank Aaron has two different spellings listed for his name, so there isn’t a strict 1-to-1 relationship between names and IDs. We also have the data in a much more convenient format. While the original format had multiple record types classified by the value of the first field, we now have a consistent set of (id,name) pairs.
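
If we wanted Pig to flag every ID that appears with more than one spelling, rather than spotting them by eye, a short script over the file we just stored would do it. This is a minimal sketch, and the alias names are just illustrative:

pairs = LOAD '/projects/name_to_id/' USING PigStorage(',') AS (id:chararray, name:chararray);
-- group the (id,name) pairs by player ID; since the pairs are already distinct,
-- the size of each bag is the number of different spellings seen for that ID
by_id = GROUP pairs BY id;
counted = FOREACH by_id GENERATE group AS id, COUNT(pairs) AS spellings;
-- keep only the IDs that show up with more than one spelling
suspects = FILTER counted BY spellings > 1;
DUMP suspects;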

With the data organized, let’s look at how we can query it.

Data Presentation

Most end-users expect their data to be presented through some kind of BI tool. Microsoft Excel is one of the most frequently used tools of this sort, so we would like to be able to present our data to Excel and other tools in a simple manner. Since we generated a CSV file, we could load it via NFS and import the data directly, but that doesn’t scale to large data sets. Instead, we can present the data through a SQL-like interface using Apache Hive. Note that we could also use Apache Drill here if we wanted faster queries and full SQL-on-Hadoop capabilities, but in this example we’ll start with Hive.

Hive has a feature called External Tables which allows us to present data that already lives in our cluster as a table, without moving the data around. This is extremely convenient, because we can continue to use the data through other tools without having to manage it through Hive. Let’s create a table that presents the data we just generated.

[username@node1 ~]$ hive
Logging initialized using configuration in jar:file:/opt/mapr/hive/hive-0.10/lib/hive-common-0.10-mapr.jar!/
Hive history file=/tmp/username/hive_job_log_root_201306111543_1148272954.txt
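
The table definition itself is just a pointer at the directory we generated with Pig. A minimal sketch of the statement we’d run at the hive> prompt, assuming we call the table players (the name used in the queries below) and point it at the output of the earlier Pig script:

-- external table: Hive reads the (id,name) CSV in place and never takes ownership of the files
-- (the table and column names here are assumptions chosen to match the query below)
CREATE EXTERNAL TABLE players (id STRING, name STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/projects/name_to_id';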

That’s it. There is no data to be moved; all we need to do is tell Hive that data is present in a specific location, and we can execute queries against it. Hive uses a SQL-like syntax called HQL, and for most simple queries there is no difference from standard SQL. Let’s run a query to get all of the names Ken Griffey Jr. was listed under:

hive> select * from players where name LIKE '%Griffey%';
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201302221456_0151, Tracking URL = http://ip-10-151-63-202.ec2.internal:50030/jobdetails.jsp?jobid=job_201302221456_0151
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -kill job_201302221456_0151
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2013-06-11 15:55:04,093 Stage-1 map = 0%,  reduce = 0%
2013-06-11 15:55:11,123 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 1.77 sec
MapReduce Total cumulative CPU time: 1 seconds 770 msec
Ended Job = job_201302221456_0151
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 1.77 sec   MAPRFS Read: 286126 MAPRFS Write: 24133 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 770 msec
grifk002        "Ken Jr. Griffey"
grifk001        "Ken Griffey Sr."
grifk001        "Ken Griffey"
grifk002        "Ken Griffey"
grifk002        "Ken Griffey Jr."
Time taken: 8.649 seconds
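
Because the table is just the (id,name) pairs we stored from Pig, the duplicate-spelling check we sketched earlier translates directly into HQL. A rough equivalent, again assuming the players table defined above:

SELECT id, COUNT(DISTINCT name) AS spellings
FROM players
GROUP BY id
HAVING COUNT(DISTINCT name) > 1;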

To expose our tables to an external application, we need to run a server process that the BI app can connect to. This is provided as part of Hive, via the hiveserver service.

[username@node1 ~]$ hive --service hiveserver
Starting Hive Thrift Server

To connect via Excel, we need to install the Hive ODBC connector. That process is shown in the MapR documentation, so I won’t repeat it here.

Once you have an ODBC DSN configured, you can access any tables available in Hive via Excel.

Any editing, including JOINs and WHERE clauses, can be done in Microsoft Query Editor before returning the data to Excel.

Once the data is returned to Excel, it is a live data connection, and will update whenever you reload the workbook or click the “Refresh All” button.

You can easily take this analysis to the next level by pulling this data into Spark. For more on that topic, check out the Getting Started with Apache Spark ebook, or, to continue learning Pig, take the free DA 450 - Apache Pig Essentials on-demand training course.

