Evolving Parquet as self-describing data format – New paradigms for consumerization of Hadoop data

With Hadoop becoming more prominent in customer environments, one of the most frequent questions we hear from users is which storage format to use to persist data in Hadoop. The choice of data format is a critical decision, especially as Hadoop evolves from being about cheap storage to a pervasive query and analytics platform. In this blog post, I want to briefly describe self-describing data formats, how they are gaining interest as a new management paradigm for consumerizing Hadoop data in organizations, and the work we have been doing as part of the Parquet community to evolve Parquet into a fully self-describing format.

About Parquet

Apache Parquet is a columnar storage format for the Hadoop ecosystem. Since its inception about two years ago, Parquet has seen very good adoption thanks to highly efficient compression and encoding schemes that deliver significant performance benefits. Its ground-up design allows it to be used with any data processing framework, data model, or programming language in the Hadoop ecosystem. A variety of tools and frameworks, including MapReduce, Hive, Impala, and Pig, provide the ability to work with Parquet data, and a number of data models such as Avro, Protobuf, and Thrift have been extended to use Parquet as a storage format. Parquet is widely adopted by a number of major companies, including tech giants such as Twitter and Netflix.

Self-describing data formats and their growing role in analytics on Hadoop/NoSQL

Self-describing data is data in which the schema, or structure, is embedded in the data itself. The schema comprises metadata such as element names, data types, compression/encoding schemes used (if any), statistics, and more. A variety of data formats, including Parquet, XML, and JSON, as well as NoSQL databases such as HBase, fall on the spectrum of self-describing data; they typically vary in the level of metadata they expose about themselves.

While self-describing data has been on the rise with NoSQL databases (e.g., the MongoDB BSON model) for a while now, empowering developers to be agile and iterative in the application development cycle, its prominence has been growing in analytics as well when it comes to Hadoop. So what is driving this? The answer is simple: the same requirement to be agile and iterative in BI/analytics.

More and more organizations are now using Hadoop as a data hub to store all their data assets. These assets often include existing datasets offloaded from traditional DBMS/DWH systems, as well as new types of data from new sources (such as IoT sensors, logs, and clickstream) and external data (such as social data and third-party domain-specific datasets). The Hadoop clusters in these organizations are often multi-tenant and shared by multiple groups. The traditional data management paradigm of creating centralized models/metadata definitions up front, before the data can be used for analytics, is quickly becoming a bottleneck in Hadoop environments. The new complex and schema-less data models are hard to map to relational models, modeling data up front for unknown ad hoc business questions and data discovery needs is challenging, and keeping up with schema changes as the data models evolve is practically impossible.

By pushing metadata into the data and then using tools that understand the metadata available in self-describing formats and expose it directly for end-user consumption, the analysis life cycle can become drastically more agile and iterative. For example, using Apache Drill, the world's first schema-free SQL query engine, you can query self-describing data (in files or in NoSQL databases such as HBase/MongoDB) immediately, without having to define and manage schema overlay definitions in centralized metastores. Another benefit is business self-service: users don't need to rely on IT departments/DBAs constantly to add or change attributes in centralized models, but can instead focus on getting answers to business questions by querying the raw data directly.
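To make this concrete, a hypothetical Drill query against a raw JSON file might look like the following (the file path and field names here are purely illustrative and not part of the example later in this post):

0: jdbc:drill:zk=local> select t.name, t.address.city from dfs.`/data/customers.json` t where t.address.state = 'ca';

No table creation, schema registration, or ETL step is needed before such a query can run; Drill discovers the structure as it reads the data.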

Think of it this way: Hadoop scaled processing by pushing processing to the nodes that hold the data. Analytics on Hadoop/NoSQL systems can be scaled to the entire organization by pushing more and more metadata into the data and using tools that automatically leverage that metadata to expose it for analytics. The more self-describing the data formats are (i.e., the more metadata they contain about the data), the smarter the tools that leverage that metadata can be.

Evolving Parquet as fully self-describing data format

As part of the work on the Apache Drill project, we are contributing to Parquet to make it more self-describing, thereby evolving a highly efficient storage format into a flexible, modern format suitable for big data analytics. Specifically, there are two areas of focus at this point.

1. Generate self-describing Parquet data: Drill is the first query engine that can very easily create Parquet files, including complex data types such as maps and arrays, with no upfront setup required. Drill also has the ability to generate Parquet files with evolving or changing schemas and query them on the fly (see the example below).

2. Extend Parquet data type support: In recent months, several new scalar and complex data types have been added to the Parquet format, and more work is in progress. Parquet files with these enhanced data types can currently be created and queried by Apache Drill. We expect all the ecosystem tools to start leveraging these new data types very soon as well.

Below is a quick example of how you can create a self-describing Parquet file from Apache Drill and query it without any centralized metadata definitions.

The source data for Parquet is a simple JSON (called sample.json) with the following contents:

{"trans_id":0,"date":"2013-07-26","time":"04:56:59","amount":80.5,"user_info":{"cust_id":28,"device":"IOS5","state":"mt"},"marketing_info":{"camp_id":4,"keywords":["go","to","thing","watch","made","laughing","might","pay","in","your","hold"]},"trans_info":{"prod_id":[16],"purch_flag":"false"}}

{"trans_id":1,"date":"2013-05-16","time":"07:31:54","amount":100.40,
"user_info":{"cust_id":86623,"device":"AOS4.2","state":"mi"},"marketing_info":{"camp_id":6,"keywords":["pronounce","tree","instead","games","sigh"]},"trans_info":{"prod_id":[],"purch_flag":"false"}}

{"trans_id":2,"date":"2013-06-09","time":"15:31:45","amount":20.25,
"user_info":{"cust_id":11,"device":"IOS5","state":"la"},"marketing_info":{"camp_id":17,"keywords":[]},"trans_info":{"prod_id":[293,90],"purch_flag":"true"}}

{"trans_id":3,"date":"2013-07-19","time":"11:24:22","amount":500.75,
"user_info":{"cust_id":666,"device":"IOS5","state":"nj"},"marketing_info":{"camp_id":17,"keywords":["it's"]},"trans_info":{"prod_id":[173,18,121,84,115,226,464,525,35,11,94,45],"purch_flag":"false"}}

{"trans_id":4,"date":"2013-07-21","time":"08:01:13","amount":34.20,"user_info":{"cust_id":999,"device":"IOS7","state":"ct"},"marketing_info":{"camp_id":8,"keywords":["fallout"]},"trans_info":{"prod_id":[311,29,5,41],"purch_flag":"false"}}

Step 1: Create the Parquet file from the JSON data using Apache Drill

First, let's take a look at the JSON data and then use the 'CREATE TABLE AS SELECT' syntax to extract the relevant elements and create a Parquet file.

0: jdbc:drill:zk=local> select * from dfs.`/Users/nrentachintala/Downloads/sample.json` ;
+------------+------------+------------+------------+------------+----------------+------------+
|  trans_id  |    date    |    time    |   amount   | user_info  | marketing_info | trans_info |
+------------+------------+------------+------------+------------+----------------+------------+
| 0          | 2013-07-26 | 04:56:59   | 80.5       | {"cust_id":28,"device":"IOS5","state":"mt"} | {"camp_id":4,"keywords":["go","to","thing","watch","made","laughing","might","pay","in","your","hold"]} | {"prod_id":[16],"purch_flag":"false"} |
| 1          | 2013-05-16 | 07:31:54   | 100.4      | {"cust_id":86623,"device":"AOS4.2","state":"mi"} | {"camp_id":6,"keywords":["pronounce","tree","instead","games","sigh"]} | {"prod_id":[],"purch_flag":"false"} |
| 2          | 2013-06-09 | 15:31:45   | 20.25      | {"cust_id":11,"device":"IOS5","state":"la"} | {"camp_id":17,"keywords":[]} | {"prod_id":[293,90],"purch_flag":"true"} |
| 3          | 2013-07-19 | 11:24:22   | 500.75     | {"cust_id":666,"device":"IOS5","state":"nj"} | {"camp_id":17,"keywords":["it's"]} | {"prod_id":[173,18,121,84,115,226,464,525,35,11,94,45],"purch_flag":"false"} |
| 4          | 2013-07-21 | 08:01:13   | 34.2       | {"cust_id":999,"device":"IOS7","state":"ct"} | {"camp_id":8,"keywords":["fallout"]} | {"prod_id":[311,29,5,41],"purch_flag":"false"} |
+------------+------------+------------+------------+------------+----------------+------------+

0: jdbc:drill:zk=local> create  table dfs.tmp.sampleparquet as (select trans_id, cast(`date` as date) transdate,cast(`time` as time) transtime, cast(amount as double) amount,`user_info`,`marketing_info`, `trans_info` from dfs.`/Users/nrentachintala/Downloads/sample.json` );
+------------+---------------------------+
|  Fragment  | Number of records written |
+------------+---------------------------+
| 0_0        | 5                         |
+------------+---------------------------+

Note that Drill has the ability to read from and write to the file system (both local and distributed file systems such as HDFS, MapR-FS, and S3). In this example, the 'CREATE TABLE AS SELECT' syntax in Drill writes the resulting Parquet file to the file system workspace dfs.tmp, which is configured by default in Drill to point to the /tmp directory on the local file system.

If you go to the /tmp directory, you can see the generated Parquet file as shown below. Note that Drill allows you to configure different workspaces pointing to various file system locations, depending on your requirements.

Administrators-MacBook-Pro-56:Downloads nrentachintala$ ls -l /tmp/sampleparquet
total 8
-rw-r--r--  1 nrentachintala  wheel  1967 Dec  6 22:05 0_0_0.parquet
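For reference, the dfs.tmp workspace used above is defined in Drill's dfs storage plugin configuration. An abbreviated sketch of that configuration looks roughly like this (exact defaults may vary by Drill version):

{
  "type": "file",
  "enabled": true,
  "connection": "file:///",
  "workspaces": {
    "tmp": {
      "location": "/tmp",
      "writable": true,
      "defaultInputFormat": null
    }
  }
}

Because the workspace is marked writable, 'CREATE TABLE AS SELECT' is allowed to write its Parquet output there.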

Step 2: Observe the schema of the generated Parquet file using Parquet tools

Administrators-MacBook-Pro-56:parquet-tools-1.5.1-SNAPSHOT nrentachintala$ ./parquet-tools schema /tmp/sampleparquet/0_0_0.parquet

message root {
  optional int64 trans_id;
  optional int32 transdate (DATE);
  optional int32 transtime (TIME);
  optional double amount;
  optional group user_info {
    optional int64 cust_id;
    optional binary device (UTF8);
    optional binary state (UTF8);
  }
  optional group marketing_info {
    optional int64 camp_id;
    repeated binary keywords (UTF8);
  }
  optional group trans_info {
    repeated int64 prod_id;
    optional binary purch_flag (UTF8);
  }
}

Note that Drill has the ability to write Parquet files with a variety of data types, including some of the recently introduced extended data types such as DATE and TIME.

Step 3: Directly query the self-describing Parquet data and perform analytics

0: jdbc:drill:zk=local> select count(*) from dfs.`/tmp/sampleparquet/0_0_0.parquet`;
+------------+
|   EXPR$0   |
+------------+
| 5          |
+------------+

Note that we can query the Parquet data either through the direct file path `/tmp/sampleparquet/0_0_0.parquet` or through the workspace as dfs.tmp.sampleparquet; they are essentially the same.
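For instance, the same count expressed through the workspace path (a sketch following the session above) would be:

0: jdbc:drill:zk=local> select count(*) from dfs.tmp.sampleparquet;

and it returns the same count of 5 records.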

Now let us run a few simple analytic queries on the Parquet data.

0: jdbc:drill:zk=local> select * from dfs.tmp.sampleparquet;
+------------+------------+------------+------------+------------+----------------+------------+
|  trans_id  | transdate  | transtime  |   amount   | user_info  | marketing_info | trans_info |
+------------+------------+------------+------------+------------+----------------+------------+
| 0          | 2013-07-26 | 04:56:59   | 80.5       | {"cust_id":28,"device":"IOS5","state":"mt"} | {"camp_id":4,"keywords":["go","to","thing","watch","made","laughing","might","pay","in","your","hold"]} | {"prod_id":[16],"purch_flag":"false"} |
| 1          | 2013-05-16 | 07:31:54   | 100.4      | {"cust_id":86623,"device":"AOS4.2","state":"mi"} | {"camp_id":6,"keywords":["pronounce","tree","instead","games","sigh"]} | {"prod_id":[],"purch_flag":"false"} |
| 2          | 2013-06-09 | 15:31:45   | 20.25      | {"cust_id":11,"device":"IOS5","state":"la"} | {"camp_id":17,"keywords":[]} | {"prod_id":[293,90],"purch_flag":"true"} |
| 3          | 2013-07-19 | 11:24:22   | 500.75     | {"cust_id":666,"device":"IOS5","state":"nj"} | {"camp_id":17,"keywords":["it's"]} | {"prod_id":[173,18,121,84,115,226,464,525,35,11,94,45],"purch_flag":"false"} |
| 4          | 2013-07-21 | 08:01:13   | 34.2       | {"cust_id":999,"device":"IOS7","state":"ct"} | {"camp_id":8,"keywords":["fallout"]} | {"prod_id":[311,29,5,41],"purch_flag":"false"} |
+------------+------------+------------+------------+------------+----------------+------------+

0: jdbc:drill:zk=local> select max(transdate) from dfs.`/tmp/sampleparquet/0_0_0.parquet` ;
+------------+
|   EXPR$0   |
+------------+
| 2013-07-26 |
+------------+
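Because transdate is stored as a native DATE type in the Parquet file, standard SQL date functions can also be applied to it directly. For example (a sketch, not part of the original session):

0: jdbc:drill:zk=local> select trans_id, extract(month from transdate) from dfs.tmp.sampleparquet;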

0: jdbc:drill:zk=local> select * from dfs.tmp.sampleparquet t where t.`user_info`.device='IOS5';
+------------+------------+------------+------------+------------+----------------+------------+
|  trans_id  | transdate  | transtime  |   amount   | user_info  | marketing_info | trans_info |
+------------+------------+------------+------------+------------+----------------+------------+
| 0          | 2013-07-26 | 04:56:59   | 80.5       | {"cust_id":28,"device":"IOS5","state":"mt"} | {"camp_id":4,"keywords":["go","to","thing","watch","made","laughing","might","pay","in","your","hold"]} | {"prod_id":[16],"purch_flag":"false"} |
| 2          | 2013-06-09 | 15:31:45   | 20.25      | {"cust_id":11,"device":"IOS5","state":"la"} | {"camp_id":17,"keywords":[]} | {"prod_id":[293,90],"purch_flag":"true"} |
| 3          | 2013-07-19 | 11:24:22   | 500.75     | {"cust_id":666,"device":"IOS5","state":"nj"} | {"camp_id":17,"keywords":["it's"]} | {"prod_id":[173,18,121,84,115,226,464,525,35,11,94,45],"purch_flag":"false"} |
+------------+------------+------------+------------+------------+----------------+------------+

0: jdbc:drill:zk=local> select t.`marketing_info`.`camp_id`,t.`marketing_info`.keywords from dfs.tmp.sampleparquet t;
+------------+------------+
|   EXPR$0   |   EXPR$1   |
+------------+------------+
| 4          | ["go","to","thing","watch","made","laughing","might","pay","in","your","hold"] |
| 6          | ["pronounce","tree","instead","games","sigh"] |
| 17         | []         |
| 17         | ["it's"]   |
| 8          | ["fallout"] |
+------------+------------+

0: jdbc:drill:zk=local> select t.`marketing_info`.`camp_id`,flatten(t.`marketing_info`.keywords) from dfs.tmp.sampleparquet t;
+------------+------------+
|   EXPR$0   |   EXPR$1   |
+------------+------------+
| 4          | go         |
| 4          | to         |
| 4          | thing      |
| 4          | watch      |
| 4          | made       |
| 4          | laughing   |
| 4          | might      |
| 4          | pay        |
| 4          | in         |
| 4          | your       |
| 4          | hold       |
| 6          | pronounce  |
| 6          | tree       |
| 6          | instead    |
| 6          | games      |
| 6          | sigh       |
| 17         | it's       |
| 8          | fallout    |
+------------+------------+
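As a quick follow-on (a sketch, not part of the original session), the flattened keywords can be combined with standard aggregation, for example to count the number of keywords per campaign:

0: jdbc:drill:zk=local> select camp_id, count(*) as keyword_count from (select t.`marketing_info`.`camp_id` camp_id, flatten(t.`marketing_info`.keywords) kw from dfs.tmp.sampleparquet t) sub group by camp_id;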

Next steps

In addition to the data type extensions, several other enhancements are under discussion on how to combine Parquet's storage and query efficiencies with the flexibility of self-describing data, to make Parquet the standard format for big data analytics. We will share these ideas in an upcoming blog post.

If you are interested in learning more about Drill and its ability to query self-describing data without centralized schema definitions, please refer to the following resources:

Download Drill
10 reasons we think Drill is cool
A simple 10-minute tutorial
A Yelp JSON analysis using Apache Drill tutorial
A more comprehensive Drill on Hadoop tutorial
