Mahout on Spark: What’s New in Recommenders

This blog was originally posted August 11, 2014 on Occam's Machete: Machine Learning. We are sharing it with permission from the author.

There are big changes happening in Apache Mahout. For years it has been the go-to machine learning library for Hadoop, containing most of the best-in-class algorithms for scalable machine learning: clustering, classification, and recommendation. But it was written for Hadoop and MapReduce. Today a number of new parallel execution engines (Spark, H2O, Flink) show great promise in speeding up calculations by as much as 10-100x. That means instead of buying ten computers for a cluster, a single one may do. That should get your manager’s attention.

After releasing Mahout 0.9, the team decided to begin an aggressive retool on Spark, while building in the flexibility to support other engines; both H2O and Flink have shown active interest.

My story is about moving the heart of Mahout’s item-based collaborative filtering recommender to Spark.

Where we are

Mahout is currently on the 1.0 snapshot version, meaning we are working on what will be released as 1.0. For more than a year some of the Mahout team has been working on a Spark port and a Scala-based DSL (Domain Specific Language), which looks like Scala with R-like algebraic expressions added. Since Scala supports not only operator overloading but also functional programming, it is a natural fit for building distributed code with rich linear algebra expressions. Scala has an interactive shell, and Mahout has customized it to run Spark as well as the Mahout DSL. Think of it as an R-like version of Scala supporting truly huge data in a completely distributed way using Spark.

Many algorithms—the ones that can be expressed as simple linear algebra equations—are implemented with relative ease (SSVD, PCA). Scala also has lazy evaluation, which allows Mahout to slide a modern optimizer underneath the DSL. When the end product of a calculation is needed, the optimizer figures out the best path to follow and spins off the most efficient Spark jobs to accomplish the whole.


Starting from all this, one of the first things we wanted to implement was the popular item-based recommender. But here too we’ll introduce many innovations. It still starts from some linear algebra. Let’s take the case of recommending purchases on an e-commerce site. The problem can be defined as:

rp = recommendations for a given user
hp = history of purchases for a given user
A = the matrix of all purchases by all users
rp = [AtA]hp

AtA is the matrix A transposed then multiplied by A. This is the core cooccurrence indicator matrix and defines the base training data in this style of recommender. Using the Mahout Scala DSL we could write the recommender as:

val recs = (A.t %*% A) %*% userHist
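For intuition, the same computation can be sketched outside Mahout with plain NumPy. This is my own illustrative example, not Mahout code; the toy purchase matrix and user history below are made up:

```python
import numpy as np

# Toy purchase matrix A: rows are users, columns are items
# (iphone, ipad, nexus, galaxy, surface).
A = np.array([
    [1, 1, 0, 0, 0],   # user 1 bought an iphone and an ipad
    [0, 0, 1, 1, 0],   # user 2 bought a nexus and a galaxy
    [0, 0, 0, 0, 1],   # user 3 bought a surface
    [1, 0, 0, 1, 0],   # user 4 bought an iphone and a galaxy
])

AtA = A.T @ A                      # item-item cooccurrence counts
h_p = np.array([1, 0, 0, 0, 0])    # current user's history: one iphone

r_p = AtA @ h_p                    # raw recommendation scores, one per item
print(r_p)                         # -> [2 1 0 1 0]
```

Each entry of AtA counts how many users bought both the row item and the column item, so the ipad and galaxy (each co-purchased once with an iphone) score above the nexus and surface.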

This would produce reasonable recommendations, but is subject to skewed results due to the dominance of popular items. To avoid that, we can apply a weighting called the log likelihood ratio (LLR), which is a probabilistic measure of the importance of a cooccurrence.

In general, when you see something like AtA it can be replaced with a similarity comparison of each row with every other row. This produces a matrix whose rows are items and whose columns are the same items. The magnitude of each value in the matrix indicates the strength of similarity between the row item and the column item. We can use the LLR weights as a similarity measure that is nicely immune to unimportant similarities.
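Mahout implements this weighting in its math library, but the LLR score itself is easy to sketch. Here is a plain-Python version of Ted Dunning’s G² formulation (my own illustrative code, not Mahout’s):

```python
from math import log

def llr(k11, k12, k21, k22):
    """Log-likelihood ratio for a 2x2 cooccurrence table.

    k11 = users who interacted with both items
    k12 = users who interacted with item A only
    k21 = users who interacted with item B only
    k22 = users who interacted with neither
    """
    def entropy(*counts):
        # Unnormalized Shannon entropy: sum of k * ln(k / total)
        total = sum(counts)
        return sum(k * log(k / total) for k in counts if k > 0)

    row = entropy(k11 + k12, k21 + k22)
    col = entropy(k11 + k21, k12 + k22)
    mat = entropy(k11, k12, k21, k22)
    return 2.0 * (mat - row - col)

# A cooccurrence seen in 1 of 4 users, with no counter-evidence,
# stands out ...
print(llr(1, 1, 0, 2))   # ≈ 1.726
# ... while a cooccurrence that is exactly what chance predicts scores 0.
print(llr(1, 1, 1, 1))   # 0.0
```

The score grows with how surprising the cooccurrence is relative to the items’ individual popularities, which is what damps the influence of items that everyone buys.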

The new Mahout Scala DSL includes a cooccurrence calculator that performs this comparison for us with LLR so the above line of linear algebra code is replaced by:

val recs = CooccurrenceAnalysis.cooccurence(A) %*% userHist

But this would take too long to calculate for each user using Mahout’s big-data jobs, so we’ll handle that step outside of Mahout. First, let’s talk about data preparation.


Creating the indicator matrix [AtA] is the core of this type of recommender. We now have a quick, flexible way to create it from text log files, producing output in an easy-to-digest form. The job of data prep is greatly streamlined in the Mahout 1.0 snapshot. In the past, users had to do all the data prep themselves: translating their own user and item ids into Mahout ids, putting the data into text files with one element per line, and feeding them to the recommender. Out the other end you’d get a Hadoop binary format called a sequence file, and you’d have to translate the Mahout ids back into something your application could understand. No more.

To make this process simpler we created the spark-itemsimilarity command line tool (also usable as a library). After installing Mahout, Hadoop, and Spark, and assuming you have logged user purchases to some directories in HDFS, we can read them in directly, calculate the indicator matrix, and write it out with no other prep required. The spark-itemsimilarity job takes in text-delimited files, extracts the user and item ids, runs the cooccurrence analysis, and outputs a text file with your application’s ids restored.

In the sample input file below we have something a web app could produce as a log file, a simple comma-delimited format where certain fields hold the user id, the item id, and a filter word—in this case the word “purchase”:

Thu Jul 10 10:52:10.996,u1,purchase,iphone
Fri Jul 11 13:52:51.018,u1,purchase,ipad
Fri Jul 11 21:42:26.232,u2,purchase,nexus
Sat Jul 12 09:43:12.434,u2,purchase,galaxy
Sat Jul 12 19:58:09.975,u3,purchase,surface
Sat Jul 12 23:08:03.457,u4,purchase,iphone
Sun Jul 13 14:43:38.363,u4,purchase,galaxy
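Conceptually, the first step of the job is just extracting (user, item) pairs from the lines that carry the filter word. A few lines of illustrative Python (my own sketch, not the job’s actual code) show the idea:

```python
import csv
from io import StringIO

log_lines = """\
Thu Jul 10 10:52:10.996,u1,purchase,iphone
Fri Jul 11 13:52:51.018,u1,purchase,ipad
Fri Jul 11 21:42:26.232,u2,purchase,nexus
Sat Jul 12 09:43:12.434,u2,purchase,galaxy
Sat Jul 12 19:58:09.975,u3,purchase,surface
Sat Jul 12 23:08:03.457,u4,purchase,iphone
Sun Jul 13 14:43:38.363,u4,purchase,galaxy
"""

# Keep only rows whose action field matches the filter word.
pairs = []
for timestamp, user, action, item in csv.reader(StringIO(log_lines)):
    if action == "purchase":
        pairs.append((user, item))

# Group the pairs into rows of the user-by-item matrix A.
history = {}
for user, item in pairs:
    history.setdefault(user, []).append(item)
print(history)
# {'u1': ['iphone', 'ipad'], 'u2': ['nexus', 'galaxy'],
#  'u3': ['surface'], 'u4': ['iphone', 'galaxy']}
```

The real job does this in parallel across HDFS files, with configurable column positions and delimiters.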

spark-itemsimilarity creates a Spark resilient distributed dataset (RDD) to back the Mahout DRM (distributed row matrix) that holds this data:

u1: [iphone, ipad]
u2: [nexus, galaxy]
u3: [surface]
u4: [iphone, galaxy]

The output of the job is the LLR-computed indicator matrix, which for this input contains this data (strengths rounded):

         iphone   ipad     nexus    galaxy   surface
iphone   0        1.726    0        0        0
ipad     1.726    0        0        0        0
nexus    0        0        0        1.726    0
galaxy   0        0        1.726    0        0
surface  0        0        0        0        0

Notice that the self-similarities have been removed, so the diagonal is all zeros. The iPhone is similar to the iPad, and the Galaxy is similar to the Nexus. The output of the spark-itemsimilarity job can be formatted in various ways, but by default it looks like this:

galaxy	nexus:1.7260924347106847
ipad	iphone:1.7260924347106847
nexus	galaxy:1.7260924347106847
iphone	ipad:1.7260924347106847
surface

On the e-commerce site’s page displaying the Nexus, we can now show that the Galaxy was purchased by the same people. The preservation of application-specific ids makes the text file very easy to use, and the tab-delimited format is easy to parse. The numeric values are the strengths of similarity, and where an item has many similar products the job sorts them by strength.
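Consuming that text output from an application takes only a few lines. Here is a small illustrative parser (my own sketch; it assumes each line is an item id, a tab, then space-separated item:strength pairs, so check your job’s actual output format):

```python
def parse_indicator_line(line):
    """Parse one line of tab-delimited indicator output.

    Assumed layout: item<TAB>similar1:strength1 similar2:strength2 ...
    An item with no similar items is a bare item id.
    """
    parts = line.rstrip("\n").split("\t")
    item = parts[0]
    similar = []
    if len(parts) > 1 and parts[1]:
        for token in parts[1].split(" "):
            name, strength = token.rsplit(":", 1)
            similar.append((name, float(strength)))
    # Strongest similarities first, mirroring the job's sorted output.
    similar.sort(key=lambda pair: -pair[1])
    return item, similar

item, similar = parse_indicator_line("nexus\tgalaxy:1.7260924347106847")
print(item, similar)   # nexus [('galaxy', 1.7260924347106847)]
```

In a web app you would load these rows into a key-value store keyed by item id and look up the similar-items list when rendering a product page.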

We are still only part way there: we’ve done the [AtA] part, and now we need to do the * hp part. Using the current user’s purchase history will personalize the recommendations.

The next post will talk about how to use a search engine to take the last step.

