The examples here will help you get started using Apache Spark DataFrames with Scala. The new Spark DataFrames API is designed to make big data processing on tabular data easier. A Spark DataFrame is a distributed collection of data organized into named columns that provides operations to filter, group, or compute aggregates, and can be used with Spark SQL. DataFrames can be constructed from structured data files, existing RDDs, or external databases.
Contributed by: Carol McDonald, Instructor for MapR
The dataset to be used is from eBay online auctions. The eBay online auction dataset contains the following fields: auctionid, bid, bidtime, bidder, bidderrate, openbid, price, item, and daystolive (as defined in the Auction case class below).
The table below shows the fields with some sample data:
Using Spark DataFrames, we will explore the eBay data with questions like: How many auctions were held? How many bids were made per item? Which auctions closed at a price above 100?
First, we will import some packages and instantiate a sqlContext, which is the entry point for working with structured data (rows and columns) in Spark and allows the creation of DataFrame objects.
// SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
Start by loading the data from the ebay.csv file into a Resilient Distributed Dataset (RDD). RDDs have transformations and actions; the first() action returns the first element in the RDD:
// load the data into a new RDD
val ebayText = sc.textFile("/home/jovyan/work/datasets/spark-ebook/ebay.csv")
// Return the first element in this RDD
ebayText.first()
Use a Scala case class to define the Auction schema corresponding to the ebay.csv file. Then two map() transformations are applied to ebayText: the first splits each line on commas, and the second constructs an Auction object from the resulting fields, producing the ebay RDD of Auction objects.
// define the schema using a case class
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String,
                   bidderrate: Integer, openbid: Float, price: Float,
                   item: String, daystolive: Integer)
// create an RDD of Auction objects
val ebay = ebayText.map(_.split(",")).map(p =>
  Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt,
          p(5).toFloat, p(6).toFloat, p(7), p(8).toInt))
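The parsing step above is plain Scala, so it can be sanity-checked without a Spark cluster. Here is a minimal sketch that applies the same split-and-construct logic to a single CSV line; the sample values are invented for illustration and are not from the real ebay.csv file:

```scala
// Hypothetical CSV line in the ebay.csv layout (values invented for illustration)
val line = "8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3"

// Same schema as in the tutorial
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String,
                   bidderrate: Integer, openbid: Float, price: Float,
                   item: String, daystolive: Integer)

// Same split-and-construct logic used inside the map() calls
val p = line.split(",")
val a = Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt,
                p(5).toFloat, p(6).toFloat, p(7), p(8).toInt)
```

Note that this logic assumes well-formed rows: a line with missing fields or a non-numeric value would throw an exception inside the map() on the cluster.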
Calling the first() action on the ebay RDD returns the first element in the RDD:
// Return the first element in this RDD
ebay.first()
Calling the count() action on the ebay RDD returns the number of elements in the RDD:
// Return the number of elements in the RDD
ebay.count()
A DataFrame is a distributed collection of data organized into named columns. Spark SQL supports automatically converting an RDD containing case classes to a DataFrame with the method toDF():
// change ebay RDD of Auction objects to a DataFrame
val auction = ebay.toDF()
DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, and Python; below are some examples with the auction DataFrame. The show() action displays the top 20 rows in a tabular form:
// Display the top 20 rows of the DataFrame
auction.show()
DataFrame printSchema() displays the schema in a tree format:
// Return the schema of this DataFrame
auction.printSchema()
After a DataFrame is instantiated, it can be queried. Here are some examples using the Scala DataFrame API:
// How many auctions were held?
auction.select("auctionid").distinct.count

// How many bids per item?
auction.groupBy("auctionid", "item").count.show

// Get the auctions with closing price > 100
val highprice = auction.filter("price > 100")
// display dataframe in a tabular format
highprice.show()
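For intuition, the three queries above can be mirrored on an ordinary Scala collection. This sketch is not Spark code; it just expresses the same logic on a local Seq, with a few invented rows of (auctionid, item, price) for illustration:

```scala
// A few invented rows: (auctionid, item, price)
val rows = Seq(
  ("a1", "xbox", 120.0),
  ("a1", "xbox", 95.0),
  ("a2", "cartier", 250.0),
  ("a3", "palm", 80.0)
)

// How many auctions were held? (distinct auctionid count)
val auctionCount = rows.map(_._1).distinct.size

// How many bids per (auctionid, item)?
val bidsPerItem = rows.groupBy(r => (r._1, r._2)).mapValues(_.size)

// Which bids came in above 100? (the filter("price > 100") query)
val highpriceLocal = rows.filter(_._3 > 100)
```

The difference, of course, is that Spark evaluates the DataFrame versions lazily and in parallel across the cluster, while these run eagerly on a single machine.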
A DataFrame can also be registered as a temporary table under a given name; SQL statements can then be run against that table with sqlContext.sql(). Here are some example queries:
// register the DataFrame as a temp table
auction.registerTempTable("auction")

// How many bids per auction?
val results = sqlContext.sql(
  """SELECT auctionid, item, count(bid) FROM auction
     GROUP BY auctionid, item"""
)
// display the results in a tabular format
results.show()

// What is the maximum price per auction?
val maxprices = sqlContext.sql(
  """SELECT auctionid, MAX(price) FROM auction
     GROUP BY item, auctionid"""
)
maxprices.show()
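The second SQL query computes MAX(price) grouped by auction. The same aggregation, expressed on a plain Scala collection rather than through Spark, looks like this (the (auctionid, price) pairs are invented for illustration):

```scala
// Invented (auctionid, price) pairs for illustration
val bids = Seq(("a1", 95.0), ("a1", 117.5), ("a2", 250.0), ("a2", 240.0))

// MAX(price) GROUP BY auctionid, on a local collection
val maxPrices = bids.groupBy(_._1).mapValues(_.map(_._2).max)
```

Spark SQL performs this same group-then-aggregate pattern, but distributes the grouping across partitions before combining the per-partition maxima.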
You have now learned how to load data into Spark DataFrames and explore tabular data with Spark SQL. These code examples can be reused as a starting point for exploring your own tabular datasets with Spark.