Real-Time User Profiles with Spark, Drill and MapR-DB

At MapR we’re big fans of Spark, and every week we see more customers deploying it or considering it for analytic workloads using MapR as the platform. Spark is becoming a key enabling technology in a wide variety of use cases across many industries – anything where results are needed fast and much of the computation can be done in memory. Its well-developed support for at least three programming languages (Scala, Java, Python) and its active community are driving adoption into areas where MapReduce or other approaches may have been the first choice – and I think, in general, we’re long past the phase of "when you have a hammer, everything looks like a nail."

Spark might be the star of your own particular application show, but to really get things done it needs a good supporting cast. For example, Spark needs to get its data from somewhere, whether that’s HDFS, a database, or a set of labeled points in a file, and to fully reap its advantages in speed and scale in production, these parts of the platform also have to provide enterprise-level capabilities. One of the advantages of starting with MapR for your Spark implementation is that you already have a NoSQL database capable of real-time, operational analytics as part of the platform, eliminating the need to add yet another software package to the cluster and tune its performance. MapR-DB is available in the MapR Enterprise Database Edition, or you can try it out for free – with the example here or with your own data – in the Community Edition. (See our three editions here.)

In this post, I’ll show you an example of bringing all this together to build a simple real-time dashboard using Spark on MapR.

The Scenario

The scenario we’ll use is a hypothetical music streaming site, much like the one you might use every day on your desktop or mobile device, either as a subscriber or a free listener.

The basic architecture is shown below. Customers are logging into our service and listening to music tracks, and they have a variety of parameters associated with them -- things like basic demographic information (gender, location, etc.), whether or not they are a paying member of the site, and their listening history. This example provides us with a few different rich data sources.

We’ll use Python, PySpark and MLlib to compute some basic statistics for our dashboard. Even though the specific example is somewhat “canned”, it involves some typical things you might do with Spark in getting started with your own use case:

  • Reading data from MapR-FS (or HDFS) into a Spark RDD
  • Applying transformations to “massage” the data into a pair RDD
  • Computing summary statistics for each user and persisting them to a database
  • Using ANSI SQL to create views on the results and display real-time profiles on a dashboard

All of the source code and data sets are available in this GitHub repo, and using the MapR Sandbox you can easily set up and modify this example.

Introducing the Example Data

Users are continuously connecting to the service and listening to tracks that they like -- this generates our main data set. Captured over time, these events represent the most detailed picture we have of how customers actually behave as they consume the service by listening to music. In addition to the individual track-listening events, we have a few other data sets representing the information we might normally have in such a service. In this post we will make use of the following two data sets, and in my next post we will bring in an additional data set relating to click events.

Individual customers listening to individual tracks: (tracks.csv) - a collection of events, one per line, where each event is a client listening to a track.

This data is approximately 1M lines and contains simulated listener events over several months.

The event, customer and track IDs tell us what occurred (a customer listened to a certain track), while the other fields tell us some associated information, like whether the customer was listening on a mobile device and a guess about their location while they were listening. With many customers listening to many tracks, this data can get very large and will be the input into our Spark job.
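
If you want to confirm the layout before writing any Spark code, a quick peek at the file works. The snippet below is purely illustrative (it assumes tracks.csv is in your working directory); the column order it expects -- event ID, customer ID, track ID, timestamp, mobile flag, zip -- matches how the Spark job parses the file later in this post.

    # Purely illustrative: print the first few rows of tracks.csv to confirm the
    # column layout assumed later (event ID, customer ID, track ID, timestamp,
    # mobile flag, zip).
    import csv

    with open('tracks.csv') as f:
        for i, row in enumerate(csv.reader(f)):
            print(row)
            if i >= 2:
                break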

Customer information: (cust.csv/MapR-DB) - information about individual customers.

The fields are defined as follows:

  • Customer ID: a unique identifier for that customer
  • Name, gender, address, zip: the customer’s associated information
  • Sign date: the date of addition to the service
  • Status: indicates whether or not the account is active (0 = closed, 1 = active)
  • Level: indicates the level of service -- 0, 1, 2 for Free, Silver and Gold, respectively
  • Campaign: an integer code indicating the campaign under which the user joined (a small decoding sketch for these codes follows this list), defined as the following (fictional) campaigns driven by our (also fictional) marketing team:
    • NONE - no campaign
    • 30DAYFREE - a ‘30 days free’ trial offer
    • SUPERBOWL - a Superbowl-related program
    • RETAILSTORE - an offer originating in brick-and-mortar retail stores
    • WEBOFFER - an offer for web-originated customers
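
Because level and campaign are stored as integer codes, dashboards and downstream code need a way to turn them back into labels. Here is a minimal decoding sketch: the level mapping is stated above, while the campaign-code ordering (0 through 4, in list order) is an assumption made here for illustration -- check the data set in the repo for the actual values.

    # Minimal decoding sketch. LEVELS comes from the definitions above; the
    # CAMPAIGNS ordering (0-4 in list order) is assumed purely for illustration.
    LEVELS = {0: 'Free', 1: 'Silver', 2: 'Gold'}
    CAMPAIGNS = {0: 'NONE', 1: '30DAYFREE', 2: 'SUPERBOWL',
                 3: 'RETAILSTORE', 4: 'WEBOFFER'}

    def describe_customer(level_code, campaign_code):
        # return human-readable labels, e.g. for a dashboard tooltip
        return (LEVELS.get(level_code, 'Unknown'),
                CAMPAIGNS.get(campaign_code, 'Unknown'))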

There are some additional data sets, such as history about clicking on advertisements and, of course, information about the music tracks themselves (title, etc.), but we’ll save those for a future post. They are also part of the GitHub repo.

What are customers doing?

We’ve got all the right information in place and a lot of micro-level detail about which music customers are selecting and when, but most of us would fall into a deep sleep looking at it. What’s the quickest way from here to a dashboard? This is where we put Spark to work, computing summary information for each customer as well as some basic statistics about the entire user base. In our example we will use Tableau to visualize the results, but the outputs here could be used to build dashboards in other ways, such as with C3.js.

Let’s jump into some Python code. First, we’ll initialize PySpark and establish a connection to the customer table in MapR-DB. To do that we’ll use the HappyBase library for Python. Before using this library a quick install of the HBase Thrift server is required, following the instructions here.

    from pyspark import SparkContext, SparkConf  
    from pyspark.mllib.stat import Statistics  
    import happybase  
    import csv  
    conf = SparkConf().setAppName('ListenerSummarizer')  
    sc = SparkContext(conf=conf)  
    conn = happybase.Connection('localhost')  
    ctable = conn.table('/user/mapr/cust_table')  

Next we’ll read the CSV rows with individual track events and make a pair RDD out of all of the rows. Pair RDDs have several advantages; one of them is access to a set of library functions for manipulating the data that we wouldn’t have otherwise.

We’ll use map() to convert each line of the data into an array, then reduceByKey to consolidate all of the arrays.

    trackfile = sc.textFile('tracks.csv')
    def make_tracks_kv(str):
        # key by customer ID; value is [track ID, timestamp, mobile flag, zip]
        l = str.split(",")
        return [l[1], [[int(l[2]), l[3], int(l[4]), l[5]]]]
    # make a k,v RDD out of the input data
    tbycust = trackfile.map(lambda line: make_tracks_kv(line)).reduceByKey(lambda a, b: a + b)

The individual track events are now stored in a pair RDD, with the customer ID as the key.
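
Before computing anything else, it’s worth a quick sanity check on the shape of that RDD. Something like the following (illustrative only) prints one (customer ID, list-of-events) pair:

    # Illustrative sanity check: look at one (customer ID, events) pair
    sample_key, sample_events = tbycust.first()
    print("customer %s has %d events, first one: %s"
          % (sample_key, len(sample_events), sample_events[0]))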

Now we’re ready to compute a summary profile for each user. By passing a function we’ll write to mapValues, we compute some high-level data:

  • Number of tracks listened to during each period of the day: morning, afternoon, evening, and night (we arbitrarily define the time ranges in the code). This is good for our dashboard, but it is also a new feature of the data set that we can use for making predictions.
  • Total unique tracks listened to by that user, i.e. the number of distinct track IDs.
  • Total mobile tracks listened to by that user, i.e. the count of tracks played with the mobile flag set.

    def compute_stats_byuser(tracks):
        # count unique tracks, listens per period of day, and mobile listens
        # for a single customer
        mcount = morn = aft = eve = night = 0
        tracklist = []
        for t in tracks:
            trackid, dtime, mobile, zip = t
            if trackid not in tracklist:
                tracklist.append(trackid)
            d, t = dtime.split(" ")
            hourofday = int(t.split(":")[0])
            mcount += mobile
            if (hourofday < 5):
                night += 1
            elif (hourofday < 12):
                morn += 1
            elif (hourofday < 17):
                aft += 1
            elif (hourofday < 22):
                eve += 1
            else:
                night += 1
        return [len(tracklist), morn, aft, eve, night, mcount]
    # compute profile for each user  
    custdata = tbycust.mapValues(lambda a: compute_stats_byuser(a))  

Since we have the summary data readily available, we can compute some basic statistics on it for display, using the colStats function from pyspark.mllib.stat.

    # compute aggregate stats for entire track history
    aggdata = Statistics.colStats(custdata.map(lambda x: x[1]))

This gives us access to the mean, variance, and other statistics for each of the fields in the per-user RDD we created (custdata).

Calling collect() on the new RDD we just computed (custdata), we can write the data back to MapR-DB in a batch transaction, into a different table called ‘live_table’. We’ll do the same for the aggregate data.

Note here that we must have previously created the tables and column families, requiring a few initial steps before actually writing data to the tables. Check the documentation here for examples of how to quickly do this in MapR-DB, either from the command line or MCS (MapR Control System).
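
The linked documentation covers doing this with maprcli or MCS. As an alternative sketch, HappyBase can also create tables and column families through the same Thrift connection we opened earlier (assuming your Thrift gateway permits table creation); the aggregate table name below is simply the one used in the write step later in this post, so adjust it to your own layout.

    # Alternative sketch, run once: create the tables and column families used
    # in this post through the same HappyBase/Thrift connection. The docs
    # referenced above show how to do the same thing with maprcli or MCS.
    conn.create_table('/user/mapr/cust_table', {'cdata': {}})  # customer attributes
    conn.create_table('/user/mapr/live_table', {'ldata': {}})  # per-user listening stats
    conn.create_table('/user/mapr/agg_table', {'adata': {}})   # aggregate averages (see below)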

    # open the output table and write each customer's profile in one batch
    ltable = conn.table('/user/mapr/live_table')
    b = ltable.batch(transaction=True)
    for k, v in custdata.collect():
        unique, morn, aft, eve, night, mobile = v
        tot = morn + aft + eve + night
        # write the data to MapR-DB
        b.put(k, {'ldata:unique_tracks': str(unique),
                  'ldata:morn_tracks': str(morn),
                  'ldata:aft_tracks': str(aft),
                  'ldata:eve_tracks': str(eve),
                  'ldata:night_tracks': str(night),
                  'ldata:mobile_tracks': str(mobile)})
    # send it to the db
    b.send()
    # write the summary data as a single row; here we keep the aggregates in
    # their own small table so the per-user table stays uniform
    atable = conn.table('/user/mapr/agg_table')
    atable.put('summary', {'adata:unique_tracks': str(aggdata.mean()[0]),
                           'adata:morn_tracks': str(aggdata.mean()[1]),
                           'adata:aft_tracks': str(aggdata.mean()[2]),
                           'adata:eve_tracks': str(aggdata.mean()[3]),
                           'adata:night_tracks': str(aggdata.mean()[4]),
                           'adata:mobile_tracks': str(aggdata.mean()[5])})
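
The script finishes by printing a short summary. Here is a sketch of how the lines shown in the job output below might be produced from custdata and aggdata; the exact print statements live in the repo script.

    # Sketch only: print a closing summary like the one in the job output below.
    # Field order matches the per-user list: unique, morn, aft, eve, night, mobile.
    print("wrote %d lines to profile db" % custdata.count())
    m = aggdata.mean()
    print("averages:  unique: %d morning: %d afternoon: %d evening: %d night: %d mobile: %d"
          % (m[0], m[1], m[2], m[3], m[4], m[5]))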

To run the code on the MapR cluster we run spark-submit:

    [mapr@ip-172-31-42-51 d2]$ /opt/mapr/spark/spark-1.2.1/bin/spark-submit ./ 
    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    wrote 5000 lines to profile db                                                                                        
    averages:  unique: 112 morning: 32 afternoon: 25 evening: 27 night: 31 mobile: 64
    [mapr@ip-172-31-42-51 d2]$

After the job completes, a summary is printed of what was written to the database and the averages for all users.

Pulling it All Together with Drill and Tableau

We’ll use Drill to connect Tableau to the data we just computed so we can visualize it. Drill makes it easy to create SQL views we can use in the dashboard, especially if we are going to bring in less structured data (like JSON) later -- we don’t have to build any schemas in advance and can run queries directly on the data.

After installing Drill on the machine where we’re running Tableau, we can navigate in Drill Explorer directly to the customer and “live” tables we just created. Our MapR-DB tables live in /user/mapr and can be found under the ‘dfs.default’ schema.

To get the data we need for the dashboard, we can simplify things on the Tableau side by creating a couple of views, entered under the ‘SQL’ tab for cust_table and live_table, respectively:

    -- cust_view: customer attributes from cust_table
    SELECT
        CAST(`row_key` AS INTEGER) AS `custid`,
        CAST(`t`.`cdata`['name'] AS VARCHAR(20)) AS `name`,
        CAST(`t`.`cdata`['campaign'] AS INTEGER) AS `campaign`,
        CAST(`t`.`cdata`['gender'] AS INTEGER) AS `gender`,
        CAST(`t`.`cdata`['level'] AS INTEGER) AS `level`,
        CAST(`t`.`cdata`['address'] AS VARCHAR(40)) AS `address`,
        CAST(`t`.`cdata`['zip'] AS VARCHAR(5)) AS `zip`
    FROM `dfs`.`default`.`./user/mapr/cust_table` AS `t`

    -- live_view: per-user listening profile from live_table
    SELECT
        CAST(`row_key` AS INTEGER) AS `custid`,
        CAST(`t`.`ldata`['morn_tracks'] AS FLOAT) AS `avg_morn`,
        CAST(`t`.`ldata`['aft_tracks'] AS FLOAT) AS `avg_aft`,
        CAST(`t`.`ldata`['eve_tracks'] AS FLOAT) AS `avg_eve`,
        CAST(`t`.`ldata`['night_tracks'] AS FLOAT) AS `avg_night`,
        CAST(`t`.`ldata`['unique_tracks'] AS FLOAT) AS `avg_unique`,
        CAST(`t`.`ldata`['mobile_tracks'] AS FLOAT) AS `avg_mobile`
    FROM `dfs`.`default`.`./user/mapr/live_table` AS `t`

Now we can connect to the data in Tableau and build the dashboard. After starting Tableau and selecting ‘Connect to Data’, ‘Other Databases’, then the ‘MapR ODBC Driver for Drill DSN’, we can drag the cust_view and live_view entries from the ‘dfs.tmp’ schema into the data area. Tableau will join them automatically on the customer ID field.

Now we’re ready to make the components of the dashboard. Tableau is a powerful BI platform and the process is a little too involved for one blog post, but check out the video below for an end-to-end demo of the process.

Our finished product looks like this:

With just a few lines of code in Spark running on the MapR Hadoop distribution, we’re able to get a high-level view of our customer base and their listening habits by membership level. All of this is computed quickly over the entire data set, and with current information.

Aggregate information is useful for making decisions about campaigns, offers, and acquiring new customers, but what if we want to see individual reports? We can take things a step further by looking at individual customer data compared to averages, by doing a little bit of filtering in Tableau.

In this example, we can see that this particular user has a fairly typical profile compared to the average, with slightly higher use overall and significantly more unique tracks listened to. Having this information available quickly means we can put it to good use. For example, if a customer were to call or chat with our customer support team, the profile of that individual customer could be displayed on screen, allowing us to better tailor service (such as offering a mobile-only subscription to highly mobile users, etc.).
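
As a concrete (and purely illustrative) example of that last idea, back in the Spark job we could flag “highly mobile” listeners directly from the custdata RDD we already computed; the 0.8 threshold below is an arbitrary choice for the sketch.

    # Illustrative only: flag customers whose mobile listens dominate their
    # total listens, using the per-user profile RDD computed earlier (custdata).
    def mobile_share(v):
        unique, morn, aft, eve, night, mobile = v
        total = morn + aft + eve + night
        return float(mobile) / total if total else 0.0

    highly_mobile = custdata.filter(lambda kv: mobile_share(kv[1]) > 0.8)
    print("%d customers listen mostly on mobile" % highly_mobile.count())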

In my next post, I’ll show how to start using this information to classify customers using this new data as features and take action as part of an upgrade campaign.

Ready to let this loose on your own data? An easy way to get started with Spark is to download the MapR Sandbox for Hadoop, which has Spark pre-enabled on a single-node platform, as well as our top-ranked in-Hadoop NoSQL database (MapR-DB) that we used in this example.

Here's the full demo video:

