This use case will bring together the core concepts of Spark and use a large dataset to build a simple real-time dashboard that provides insight into customer behaviors.
Spark is an enabling technology in a wide variety of use cases across many industries. It is a great candidate anytime results are needed fast and much of the computation can be done in memory. The language used here is Python, because it does a nice job of reducing the amount of boilerplate code required to illustrate these examples.
Contributed by: Nick Amato, Director, Technical Marketing for MapR
Music streaming is a pervasive technology that generates massive quantities of data. This type of service is one people use every day on a desktop or mobile device, whether as a subscriber or a free listener (Pandora, for example). This will be the foundation of the use case to be explored: data from such a streaming service will be analyzed.
The basic layout consists of customers who log into this service and listen to music tracks, where each listening event carries a variety of parameters, described below.
Python, PySpark and MLlib will be used to compute some basic statistics for a dashboard, enabling a high-level view of customer behaviors as well as a constantly updated view of the latest information.
This service has users who continuously connect to the service and listen to tracks. Customers listening to music from this streaming service generate events, and over time these events represent the highest level of detail about customers' behaviors.
The data will be loaded directly from a CSV file. There are a couple of steps to perform before it can be analyzed: the data will need to be transformed into (key, value) tuples and loaded into a PairRDD, so that all of the events for a given customer can be grouped under a single key.
The customer events-individual tracks dataset (tracks.csv) consists of a collection of events, one per line, where each event is a client listening to a track. The file is approximately 1M lines and contains simulated listener events over several months. Because it captures events at a very low level, this data has the potential to grow very large.
| Field Name | Event ID | Customer ID | Track ID | Datetime | Mobile | Listening Zip |
|---|---|---|---|---|---|---|
The event, customer and track IDs show that a customer listened to a specific track. The other fields show associated information, like whether the customer was listening on a mobile device, and a geolocation. This will serve as the input into the first Spark job.
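As a plain-Python illustration (not part of the Spark job itself), a single tracks.csv line can be split into its six fields. The field positions and the sample line below are assumptions based on the field table above:

```python
# Illustrative only: parse one simulated tracks.csv event line into its
# fields, following the layout in the table above
# (Event ID, Customer ID, Track ID, Datetime, Mobile, Listening Zip).
def parse_event(line):
    fields = line.split(",")
    cust_id = int(fields[1])
    track_id = int(fields[2])
    dtime = fields[3]
    mobile = int(fields[4])
    zip_code = fields[5]
    # key on the customer ID, keep the track details as the value
    return (cust_id, [track_id, dtime, mobile, zip_code])

key, value = parse_event("0,48,453,2014-10-23 03:26:20,0,72132")
# key → 48 (the customer ID); value holds the track details
```

This (customer ID, track details) shape is exactly what the PairRDD in the next section is built from.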
The customer information dataset (cust.csv) consists of all statically known details about a user.
| Field Name | Customer ID | Name | Gender | Address | Zip | Sign Date | Status | Level | Campaign | Linked with apps? |
|---|---|---|---|---|---|---|---|---|---|---|
| Example | 10 | Joshua Threadgill | 0 | 10084 Easy Gate Bend | 66216 | 01/13/2013 | 0 | 1 | 1 | 1 |
The fields are defined as follows:
Other datasets that would be available, but are not used in this use case, include:
All the right information is in place and a lot of micro-level detail is available that describes what customers listen to and when. The quickest way to get this data to a dashboard is by leveraging Spark to create summary information for each customer as well as basic statistics about the entire user base. After the results are generated, they can be persisted to a file which can be easily used for visualization with BI tools such as Tableau, or other dashboarding frameworks like C3.js or D3.js.
Step one in getting started is to initialize a Spark context. Additional parameters could be passed to the SparkConf method to further configure the job, such as setting the master and the directory where the job executes.
```python
from pyspark import SparkContext, SparkConf
from pyspark.mllib.stat import Statistics
import csv

conf = SparkConf().setAppName('ListenerSummarizer')
sc = SparkContext(conf=conf)
```
The next step will be to read the CSV records with the individual track events, and make a PairRDD out of all of the rows. To convert each line of data into an array, the map() function will be used, and then reduceByKey() is called to consolidate all of the arrays.
```python
trackfile = sc.textFile('/home/jovyan/work/datasets/spark-ebook/tracks.csv')

def make_tracks_kv(str):
    l = str.split(",")
    # key on the customer ID; the value is a list holding one event:
    # [track ID, datetime, mobile flag, listening zip]
    return [l[1], [[int(l[2]), l[3], int(l[4]), l[5]]]]

# make a k,v RDD out of the input data
tbycust = trackfile.map(lambda line: make_tracks_kv(line)) \
                   .reduceByKey(lambda a, b: a + b)
```
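The map()/reduceByKey() combination above can be sketched without Spark. The sketch below uses made-up event rows; the reduce step concatenates per-customer lists with the same `a + b` logic used in the Spark job:

```python
# Plain-Python sketch of map() + reduceByKey(): each row is already a
# (customer_id, [event]) pair, and reduceByKey concatenates the lists
# that share a key, yielding one list of events per customer.
rows = [
    ("c1", [[101, "2014-01-01 09:00:00", 1, "66216"]]),
    ("c2", [[109, "2014-01-01 10:30:00", 0, "72132"]]),
    ("c1", [[102, "2014-01-01 21:15:00", 0, "66216"]]),
]

reduced = {}
for key, value in rows:
    if key in reduced:
        reduced[key] = reduced[key] + value  # the reduce function: a + b
    else:
        reduced[key] = value

# reduced["c1"] now holds both of c1's events in a single list
```

Spark performs the same consolidation, but distributed across the cluster rather than in a single dictionary.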
The individual track events are now stored in a PairRDD, with the customer ID as the key. A summary profile can now be computed for each user, which will include:

- the number of unique tracks listened to
- the number of tracks heard in the morning, afternoon, evening and at night
- the number of tracks heard on a mobile device
By passing a function to mapValues, a high-level profile can be computed from the components. The summary data is now readily available to compute basic statistics that can be used for display, using the colStats function from pyspark.mllib.stat.
```python
def compute_stats_byuser(tracks):
    mcount = morn = aft = eve = night = 0
    tracklist = []
    for t in tracks:
        trackid, dtime, mobile, zip = t
        if trackid not in tracklist:
            tracklist.append(trackid)
        d, t = dtime.split(" ")
        hourofday = int(t.split(":")[0])
        mcount += mobile
        if (hourofday < 5):
            night += 1
        elif (hourofday < 12):
            morn += 1
        elif (hourofday < 17):
            aft += 1
        elif (hourofday < 22):
            eve += 1
        else:
            night += 1
    return [len(tracklist), morn, aft, eve, night, mcount]

# compute profile for each user
custdata = tbycust.mapValues(lambda a: compute_stats_byuser(a))

# compute aggregate stats for entire track history
aggdata = Statistics.colStats(custdata.map(lambda x: x[1]))
```
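The hour-of-day bucketing inside compute_stats_byuser is plain Python, so it can be sanity-checked locally. A minimal sketch of just that logic:

```python
# Sketch of the time-of-day buckets used in the per-user summary:
# night before 5am, morning before noon, afternoon before 5pm,
# evening before 10pm, and night again after that.
def bucket_for_hour(hourofday):
    if hourofday < 5:
        return "night"
    elif hourofday < 12:
        return "morn"
    elif hourofday < 17:
        return "aft"
    elif hourofday < 22:
        return "eve"
    else:
        return "night"

buckets = [bucket_for_hour(h) for h in (3, 9, 14, 20, 23)]
# → ['night', 'morn', 'aft', 'eve', 'night']
```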
The colStats call provides meaningful statistics, like the mean and variance, for each of the fields in the per-user records that were created in custdata.
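What colStats computes column-wise can be illustrated with plain Python. The sketch below reproduces just the per-column mean on a tiny, made-up per-user summary matrix:

```python
# Illustrative only: the column-wise mean that a colStats-style summary
# reports, computed for two hand-made per-user rows in the order
# [unique, morn, aft, eve, night, mobile].
rows = [
    [10, 2, 3, 4, 1, 6],
    [20, 4, 5, 6, 3, 8],
]

# zip(*rows) transposes the matrix so each tuple is one column
col_means = [sum(col) / len(col) for col in zip(*rows)]
# → [15.0, 3.0, 4.0, 5.0, 2.0, 7.0]
```

In the real job, Spark computes these summaries across the full distributed dataset rather than an in-memory list.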
Calling collect() on this RDD returns the results to the driver, where they can be persisted to a file. The results could be stored in a database such as MapR-DB, HBase or an RDBMS (using a Python package like happybase or dbset). For the sake of simplicity in this example, CSV is a convenient choice. There are two files to output: live_table.csv, with one row per customer, and agg_table.csv, with the aggregate statistics.
```python
# persist the per-customer data, in this case by writing to a file
with open('/home/jovyan/work/datasets/spark-ebook/live_table.csv', 'w', newline='') as csvfile:
    fwriter = csv.writer(csvfile, delimiter=' ', quotechar='|',
                         quoting=csv.QUOTE_MINIMAL)
    for k, v in custdata.collect():
        unique, morn, aft, eve, night, mobile = v
        fwriter.writerow([unique, morn, aft, eve, night, mobile])

# do the same with the summary data
with open('/home/jovyan/work/datasets/spark-ebook/agg_table.csv', 'w', newline='') as csvfile:
    fwriter = csv.writer(csvfile, delimiter=' ', quotechar='|',
                         quoting=csv.QUOTE_MINIMAL)
    fwriter.writerow([aggdata.mean()[0], aggdata.mean()[1], aggdata.mean()[2],
                      aggdata.mean()[3], aggdata.mean()[4], aggdata.mean()[5]])
```
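As a side note, csv.writer's writerow expects a single sequence per call, which is why the values are passed as one list. A quick local check with illustrative values (writing to an in-memory buffer instead of a file):

```python
import csv
import io

# Write one row with the same writer settings used in the job above,
# into an in-memory buffer rather than a file on disk.
buf = io.StringIO()
fwriter = csv.writer(buf, delimiter=' ', quotechar='|',
                     quoting=csv.QUOTE_MINIMAL)
fwriter.writerow([5, 2, 1, 1, 1, 3])

line = buf.getvalue().strip()
# → '5 2 1 1 1 3'
```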
After the job completes, a summary is displayed of what was written to the CSV table and the averages for all users.
With just a few lines of code in Spark, a high-level customer behavior view was created, all computed using a dataset with millions of rows that stays current with the latest information. Nearly any toolset that can utilize a CSV file can now leverage this dataset for visualization.
This use case showcases how easy it is to work with Spark, and how Spark can continue to deliver new capabilities well into the future as data volumes grow and become more complex.