In my last post, I described an example scenario where we used Spark to compute summary statistics on customer data for building a real-time dashboard in Tableau. Our data source derives from a fictional music streaming service where listeners are connecting, listening to tracks, and generating millions of rows of individual events.
We computed various statistics about what users were doing (i.e. their listening habits) and how they consumed the service, persisting the results back to MapR-DB for further analysis. With that data calculated and stored, we can take the next step and generate features as input into machine learning, so we can make valuable predictions that enable us to maximize revenue and ensure the best customer experience.
In this post I’ll give an example of how we can make such a prediction using the output of the Spark code from our last adventure. One of the many favorable aspects of Spark is its ability to compute and store results in near real-time -- opening a new world of possibilities for making customers happy and learning new things about what they are doing.
The Scenario, Revisited
Looking again at our streaming service, it has three levels of customer membership:
- Free -- the base service which is free to subscribe, but has limits (for example, limited number of tracks, more ads, etc.)
- Silver -- an upgraded level of service which has associated revenue
- Gold -- the highest level of service
Let’s say it’s near the end of the fiscal quarter, and we need to give revenue a little extra push to meet expectations. We could just try to cook the books, but the CFO probably wouldn’t let us do that. Instead, our marketing team has decided to play an advertisement to the customer (maybe in the form of an audio or display ad) that offers a reduced-cost upgrade to Gold service for one day only. There is a cost associated with playing an ad to customers (and it’s annoying to hear them constantly), so we want to play it only to the subset of customers we think will be most likely to take action. Because of the nature of the service, we have a few goals to meet:
- The decision needs to be fast. When users connect, they might not connect for very long, and we can’t play an ad to them if they’re not listening, so the question of “should we play the upgrade offer?” needs to be answered soon after the user connects and starts listening to tracks.
- It needs to take into account the latest information. We can’t use data that’s weeks or months old to make a decision, because customer behaviors, preferences and trends are constantly changing. Using stale or inaccurate information would lessen the effectiveness of the model, or may actually make things worse by giving wildly inaccurate predictions.
How do we determine which customers should be presented with the offer? We’ll need to get information about behaviors and classify customers accordingly. The good news is that we can use many of the same features we computed from the dashboard and generate them at the same time.
Even though our example is somewhat stylized (in reality we would likely have to do some data wrangling to get to this point, among other considerations), our scenario makes for a concise example of how to get data into Spark, make a classification, and begin the process of fine-tuning.
We’ll use the following sets of data:
- Previous ad clicks (clicks.csv) -- a CSV file indicating which ad was played to the user and whether or not they clicked on it
- Customer behaviors (live_table) -- summary data about listening habits, for example what time(s) of day were they listening, how much listening on a mobile device, and how many unique tracks they played. We computed and stored this table in the previous post.
The click data is stored in a CSV file, structured as follows:
```
EventID,CustID,AdClicked,Localtime
0,109,"ADV_FREE_REFERRAL","2014-12-18 08:15:16"
```
The fields that interest us are the foreign key identifying the customer (CustID), a string indicating which ad they clicked (AdClicked), and the time when it happened (Localtime). Note that we could use a lot more features here, such as basic information about the customer (gender, etc.), but to keep things simple for the example we’ll leave that as a future exercise.
Loading the Data and Training the Classifier
Recalling from last time, our PySpark code pulled the data from MapR-FS into a Spark RDD as follows:
```python
from __future__ import division
from pyspark import SparkContext, SparkConf
from pyspark.mllib.stat import Statistics
import happybase
import csv

conf = SparkConf().setAppName('ListenerSummarizer')
sc = SparkContext(conf=conf)
conn = happybase.Connection('localhost')
ctable = conn.table('/user/mapr/cust_table')

trackfile = sc.textFile('tracks.csv')
clicksfile = sc.textFile('clicks.csv')
trainfile = open('features.txt', 'wb')

def make_tracks_kv(str):
    l = str.split(",")
    # key on the customer id; the value carries the track id,
    # timestamp, mobile flag, and listening zip for that event
    return [l[1], [[int(l[2]), l[3], int(l[4]), l[5]]]]

# make a pair RDD out of the input data
tbycust = trackfile.map(lambda line: make_tracks_kv(line)).reduceByKey(lambda a, b: a + b)
```
Let’s first extract the fields that we want into a pair RDD, again making the data easier to use and opening up more capabilities in MLLib. We pass the data through a function (user_clicked) that determines whether the user previously clicked on the ad that interests us (ADV_REDUCED_1DAY).
```python
from operator import add

# distill the clicks down to a smaller data set that is faster
def user_clicked(line, which):
    eid, custid, adclicked, ltime = line.split(",")
    if (which in adclicked):
        return (custid, 1)
    else:
        return (custid, 0)

clickdata = clicksfile.map(lambda line: user_clicked(line, "ADV_REDUCED_1DAY")).reduceByKey(add)
sortedclicks = clickdata.sortByKey()
```
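To see what the map/reduceByKey step produces, here is a plain-Python sketch of the same aggregation (the sample rows are made up for illustration; Spark does the same thing in parallel across partitions):

```python
from collections import defaultdict

# hypothetical clicks.csv rows, matching the format shown earlier
rows = [
    '0,109,"ADV_FREE_REFERRAL","2014-12-18 08:15:16"',
    '1,109,"ADV_REDUCED_1DAY","2014-12-18 09:02:11"',
    '2,212,"ADV_REDUCED_1DAY","2014-12-18 09:05:44"',
]

def user_clicked(line, which):
    eid, custid, adclicked, ltime = line.split(",")
    return (custid, 1 if which in adclicked else 0)

# the plain-Python equivalent of map(...).reduceByKey(add):
# sum the per-event flags into a per-customer click count
counts = defaultdict(int)
for line in rows:
    k, v = user_clicked(line, "ADV_REDUCED_1DAY")
    counts[k] += v
# → {'109': 1, '212': 1}
```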
Now we’ll loop through the customer data we just processed for the dashboard (done in the previous post, referencing the ‘custdata’ RDD here) and write a training file that we will use to train a couple of models. LibSVM is a good format to use for this, as it’s a preferred input format for many machine learning platforms, including MLLib.
```python
for k, v in custdata.collect():
    unique, morn, aft, eve, night, mobile = v
    tot = float(morn + aft + eve + night)

    # see if this user clicked on a 1-day special reduced Gold rate
    # (lookup() returns a list of values for the key)
    clicked = 1 if sum(sortedclicks.lookup(k)) > 0 else 0

    # the feature row: time-of-day shares, mobile share, unique share
    training_row = [morn / tot, aft / tot, eve / tot, night / tot,
                    mobile / tot, unique / tot]

    # write a row of training data, starting with the target value
    trainfile.write("%d" % clicked)

    # write the individual features
    # the libSVM format wants features to start with 1
    for i in range(1, len(training_row) + 1):
        trainfile.write(" %d:%.2f" % (i, training_row[i - 1]))
    trainfile.write("\n")

trainfile.close()
```
We could do all of this at the same time as when we generated data for the dashboard. One thing to note is that we use the “share of total tracks listened” for the morning, afternoon, evening, and night periods (and likewise for mobile and unique tracks), which makes for a better-scaled input than using the absolute numbers.
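The scaling itself is simple; a minimal sketch (with made-up counts) shows why shares are a convenient input — each feature lands in [0, 1] and the four time-of-day shares always sum to 1:

```python
def time_shares(morn, aft, eve, night):
    """Convert absolute per-period track counts into shares of the total."""
    tot = float(morn + aft + eve + night)
    return [morn / tot, aft / tot, eve / tot, night / tot]

# hypothetical listener: 31 morning, 25 afternoon, 16 evening, 28 night tracks
shares = time_shares(31, 25, 16, 28)
# → [0.31, 0.25, 0.16, 0.28]
```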
After this code runs, we have a training file called ‘features.txt’ that contains all of the points for training one or more classifiers, which will serve as input into prediction about which customers are most likely to click on the ad based on their past behaviors.
Each line of the new file looks like this:
```
0 1:0.31 2:0.25 3:0.16 4:0.28 5:0.53 6:0.93
1 1:0.35 2:0.15 3:0.24 4:0.26 5:0.85 6:0.92
0 1:0.27 2:0.21 3:0.25 4:0.27 5:0.52 6:0.98
```
The 0 or 1 at the beginning is our label and indicates if this customer clicked on the particular ad, and the remainder of the line lists the values for each feature.
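If you ever need to produce this format outside of the loop above, the row layout is easy to generate; here's a small self-contained sketch (the function name is my own, not part of MLLib):

```python
def libsvm_row(label, features):
    """Format one training example as a libSVM text row.

    libSVM feature indices are 1-based: 'label 1:v1 2:v2 ...'
    """
    parts = ["%d" % label]
    for i, f in enumerate(features, start=1):
        parts.append("%d:%.2f" % (i, f))
    return " ".join(parts)

row = libsvm_row(0, [0.31, 0.25, 0.16, 0.28, 0.53, 0.93])
# → "0 1:0.31 2:0.25 3:0.16 4:0.28 5:0.53 6:0.93"
```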
Now we can write some PySpark code to load the features into an RDD and use it to start classifying. We can run this when new customers connect to the service, enabling a quick answer to the question (should we play the ad?) with models trained from the latest information.
Logistic regression is a widely-used method for classifying points into discrete outcomes. It’s often easy to use the same data to train a few different classifiers, then compare results before choosing which one (or group) to use in production, so we’ll do that here with two: LogisticRegressionWithSGD and LogisticRegressionWithLBFGS, both provided by the pyspark.mllib package.
```python
from pyspark import SparkContext, SparkConf
from pyspark.mllib.util import MLUtils
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.classification import LogisticRegressionWithSGD

conf = SparkConf().setAppName('AdPredictor')
sc = SparkContext(conf=conf)

data = MLUtils.loadLibSVMFile(sc, 'features.txt')

# split the data into training and test sets (70%/30%)
train, test = data.randomSplit([0.7, 0.3], seed=0)
```
At this point we can run our two different models. To summarize, we have the following features of our data:
- Listening behaviors: the share of overall tracks listened during the morning, afternoon, evening and night hours
- Mobile tracks listened (share of overall)
- Unique tracks listened (share of overall)
Not all of these features may be valuable in making the classifications. There are ways to assess that (for example, with a few lines of R) and decide which features to keep and which to discard; I’ll leave that as a future exercise. Below is the Python code to train the two models on the training set, classify customers in the test set, and measure how many predictions were correct.
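As a rough stand-in for that R analysis, a quick way to eyeball feature usefulness is the correlation of each feature column with the label. Here is a plain-Python Pearson correlation sketch on made-up data (MLLib's `Statistics.corr` can do the same thing on an RDD):

```python
def pearson(xs, ys):
    """Pearson correlation coefficient between two equal-length sequences."""
    n = float(len(xs))
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sum((x - mx) ** 2 for x in xs) ** 0.5
    sy = sum((y - my) ** 2 for y in ys) ** 0.5
    return cov / (sx * sy)

# hypothetical data: mobile-listening share vs. whether the customer clicked
labels = [0, 1, 0, 1, 1]
mobile_share = [0.2, 0.8, 0.3, 0.9, 0.7]
r = pearson(mobile_share, labels)
# a value near +1 or -1 suggests the feature carries signal; near 0, not much
```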
```python
te_count = test.count()

# train LBFGS model on the training data
model_1 = LogisticRegressionWithLBFGS.train(train)

# evaluate the model on test data
results_1 = test.map(lambda p: (p.label, model_1.predict(p.features)))

# calculate the error
err_1 = results_1.filter(lambda (v, p): v != p).count() / float(te_count)

# train SGD model on the training data
model_2 = LogisticRegressionWithSGD.train(train)

# evaluate the model on test data
results_2 = test.map(lambda p: (p.label, model_2.predict(p.features)))

# calculate the error
err_2 = results_2.filter(lambda (v, p): v != p).count() / float(te_count)
```
After running the script as follows, we can check the results:
```
[mapr@ip-172-31-42-51 d2]$ /opt/mapr/spark/spark-1.2.1/bin/spark-submit ./predict.py
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Results
-------
training size: 3500, test size 1500
LBFGS error: 0.0413333333333
SGD error: 0.157333333333
[mapr@ip-172-31-42-51 d2]$
```
Since we had 5000 total customers in the database, a 70/30 split ended up being 3500 and 1500 for training and testing, respectively. You can see that LBFGS did somewhat better than SGD at a high level with a lower overall error rate. Now that we have a way to test, we could go in several directions from here. At the time of this writing some of the more detailed binary classification metrics are not available in PySpark (such as precision, recall, F-measure, etc.), but we could do some of that by hand if we had concerns about false positives or other specific types of errors.
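Computing those extra metrics by hand is straightforward once you have the (label, prediction) pairs collected from the results RDD. A minimal sketch (the function name and sample pairs are my own, for illustration):

```python
def binary_metrics(pairs):
    """Precision, recall, and F1 from (actual, predicted) pairs of 0/1 values."""
    pairs = list(pairs)
    tp = sum(1 for a, p in pairs if a == 1 and p == 1)  # true positives
    fp = sum(1 for a, p in pairs if a == 0 and p == 1)  # false positives
    fn = sum(1 for a, p in pairs if a == 1 and p == 0)  # false negatives
    precision = tp / float(tp + fp) if tp + fp else 0.0
    recall = tp / float(tp + fn) if tp + fn else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)
    return precision, recall, f1

# e.g. on results collected from the test set: results_1.collect()
precision, recall, f1 = binary_metrics([(1, 1), (0, 1), (1, 0), (0, 0), (1, 1)])
# here tp=2, fp=1, fn=1, so precision = recall = f1 = 2/3
```

Precision tells us how many of the customers we'd show the ad to would actually click; recall tells us how many likely clickers we'd miss — useful when the cost of playing an ad to the wrong customer matters.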
Building on the customer behavior data we computed with Spark in the previous post, we used MLLib to classify our customers and fine-tune the experience on our platform. This enables us to make a decision quickly and opens an opportunity for a new revenue stream. From this simple example you can get an idea of what’s possible with Spark and how you can start to use classifications from Hadoop data sources to make better decisions.
I know you’re just itching to try this on your own data. All of the code in this post and the previous can be found in this github repo. Here are a few pointers and resources for getting started with Spark on MapR: