Using Spark GraphFrames to Analyze Facebook Connections

Sooner or later, if you eyeball enough data sets, you will encounter some that look like a graph, or are best represented as a graph. Whether it's social media, computer networks, or machine-to-machine interactions, graph representations are often a straightforward choice for modeling relationships among entities. The practice of using graphs to model real-life phenomena in data structures has been around as long as computing itself.

GraphFrames is a new package, built for recent versions of Spark (1.4 and later), that promises to be an expressive way to process and analyze graphs for many different use cases. If you've already used DataFrames (part of Spark SQL) for row-and-column style processing, GraphFrames will look very familiar. What's most exciting is the ability to run queries and graph algorithms on the same data set, without moving or copying data between representations. After all the normal Spark wrangling, transformations, and pipelines are done, it's efficient and code-saving to get results from a single GraphFrame when multiple types of operations (such as finding patterns and running PageRank) are required. There is also a very cool query language for locating patterns (called "motifs") among the edges and vertices.

The best part? It comes with a Python API. In this post, I will show a simple example using GraphFrames that you can run using the MapR Sandbox, which is a free, downloadable virtual machine pre-loaded with MapR and Spark.
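To make that concrete: a GraphFrame is just a pair of DataFrames, a vertex DataFrame with an 'id' column and an edge DataFrame with 'src' and 'dst' columns, and the same object serves both pattern queries and algorithm calls. Here is a minimal sketch with made-up toy data (it assumes the SQLContext setup and graphframes install we'll walk through below), not part of the Facebook example yet:

from graphframes import GraphFrame

# toy vertex and edge DataFrames (made-up data, not the Facebook set)
v = sqlContext.createDataFrame([('a', 23), ('b', 27), ('c', 23)], ['id', 'age'])
e = sqlContext.createDataFrame([('a', 'b'), ('a', 'c')], ['src', 'dst'])
toy_g = GraphFrame(v, e)

# a motif query and a graph algorithm, both run against the same GraphFrame
toy_g.find("(x)-[]->(y)").filter("x.age = y.age").show()
toy_g.pageRank(resetProbability=0.15, tol=0.01).vertices.show()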

Prerequisite Steps

Follow these prerequisite steps to get your Spark 1.5 environment running. Then we'll run some code examples to get you started with a graph of Facebook connections.

  • Download the latest MapR Sandbox from mapr.com/sandbox. You can also install the MapR Converged Community Edition on a cluster for free, or start with a pre-existing MapR cluster already running Spark.
  • After logging in as the 'mapr' user, install the GraphFrames Python package on the sandbox. It's also convenient to make a link to the Python module in the same directory where you will run spark-submit:
    wget https://github.com/graphframes/graphframes/archive/master.tar.gz
    tar xvfz master.tar.gz
    ln -s graphframes-master/python/graphframes ./graphframes
    
  • Grab the Stanford Facebook data set files and move them into MapR-FS:
    wget https://snap.stanford.edu/data/facebook.tar.gz
    wget https://snap.stanford.edu/data/facebook_combined.txt.gz
    wget https://snap.stanford.edu/data/readme-Ego.txt
    tar xvfz facebook.tar.gz -C /mapr/demo.mapr.com/user/mapr --strip 1
    gunzip facebook_combined.txt.gz
    mv facebook_combined.txt /mapr/demo.mapr.com/user/mapr/
    
  • Pick up the example code from this blog post on github:
    wget https://raw.githubusercontent.com/mapr-demos/spark-graphframes/master/gframes.py
    
At this point, you should have a bunch of files that start with a node ID (like '0.feat') in the MapR-FS directory that's mounted locally over NFS: /mapr/demo.mapr.com/user/mapr/.
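As a quick sanity check (a hypothetical snippet, assuming the NFS mount path above), you can count the per-node feature files from Python before starting Spark:

# expect one .feat file per ego node in the data set (10 of them)
import glob
print len(glob.glob('/mapr/demo.mapr.com/user/mapr/*.feat'))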

A Look at the Facebook Data

Social media is becoming one of the "classic" applications of graph analysis. Our example data consists of anonymized Facebook "friends lists" from survey participants. According to the specs, we should see a total of 4039 nodes after all the processing, along with 88234 edges among them. The structure of how this particular data set represents things is a little awkward, but nothing we can't handle with a few pre-processing steps.

In the facebook.tar.gz file you downloaded above, there are several collections of files, each describing the friends list of a single user. A user is identified by a node ID. Each user has these associated files (where 'n' is the ID of the particular user):

  • n.edges - a simple space-delimited file with two node IDs on each line, indicating an undirected edge (friendship) exists between the two nodes
  • n.featnames - for this node n's friends (nodes that appear in the edges file), each feature value (such as their birthday, what school they attended, etc.) is flattened into an anonymized ID, which is listed in this file.
  • n.feat - the features for all of the nodes in the edges file. If the feature appeared in the user's profile, the value is 1, else it is 0. For each node, each value corresponds to the associated 'featnames' file above.
  • n.egofeat - the features for the user (same 1/0 value)
  • n.circles - this describes the 'circles' for the user (we won't use this one)

The feature representation is 'anonymized': this means we can see whether two nodes have the same value of something (such as a birthday) for comparisons and queries, but we don't know the actual value.

The facebook_combined.txt file contains all of the edges from the nodes, combined into one file, with the same format as the 'n' edges files.
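To make the layout concrete, here is a tiny worked example in plain Python, with made-up row values rather than actual lines from the data set, showing how a 'featnames' line and a 'feat' row combine:

# a 'featnames' line maps a feature index to an anonymized attribute value,
# e.g. "0 birthday;anonymized feature 0" -> index 0 means birthday = anonymized id 0
featnames = {0: ('birthday', '0')}
# a made-up 'feat' row: node 25, with a 1 at feature index 0
row = '25 1'.split(' ')
node_id, flags = row[0], row[1:]
profile = dict(featnames[i] for i, flag in enumerate(flags) if flag == '1')
# profile == {'birthday': '0'}: node 25's birthday is 'anonymized feature 0'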

There are a few small issues we'll have to handle here. One of them is duplicates. A few checks show that the same node ID appears in multiple edges files, each with a different set of features describing it:

[mapr@maprdemo ~]$ cat /mapr/demo.mapr.com/user/mapr/*.feat | cut -d ' ' -f 1 | sort | uniq -d | wc -l
116
Since we'll use facebook_combined.txt to build our edges, we'll just discard duplicates in this case. A quick count of all the nodes in that file shows we do indeed have 4039 unique nodes:
[mapr@maprdemo ~]$ cat facebook_combined.txt | tr ' ' '\n' | sort | uniq | wc -l
4039
Let's look at some example code to see how this works.

Loading the Facebook Graph into Spark

There are a few different ways to get data from files into Spark, especially when Spark SQL is involved. I'll use two different ways here just for tutorial purposes. Let's start with the facebook_combined.txt file and bring it into a DataFrame, using only a subset of the features that interest us:
from graphframes import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row
import re
import sys

sc = SparkContext()
sqlContext = SQLContext(sc)
peopleids = [0, 107, 1684, 1912, 3437, 348, 3980, 414, 686, 698]
featids = ["id", "birthday", "hometown_id", "work_employer_id",
    "education_school_id", "education_year_id" ]
formatter = 'com.databricks.spark.csv'
vtx = Row(*featids)

# load the entire edge and node set into a Spark DataFrame
edf = sqlContext.read.format(formatter).options(delimiter=' ', \
    header='false', inferSchema=True) \
    .load('facebook_combined.txt').withColumnRenamed( \
    'C0', 'src').withColumnRenamed('C1', 'dst')
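If you'd rather not depend on the spark-csv package, an equivalent way to build the same two-column edge DataFrame (a sketch, not part of the original code) is a plain text read:

# alternative sketch: build the src/dst edge DataFrame without spark-csv
edge_rows = sc.textFile('facebook_combined.txt') \
    .map(lambda line: line.split(' ')) \
    .map(lambda p: Row(src=int(p[0]), dst=int(p[1])))
edf_alt = sqlContext.createDataFrame(edge_rows)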
Some helper functions are in order—these will be used later in the examples:
  • featurematch helps us format strings for matching in the graph search API, with g.find()
  • fn_process uses a regex to read the feature name mappings.
  • feat_process handles reading of the features, and get_feats does the actual mapping when we are building the RDD.
The 'featnames' file contains lines that look like this:
0 birthday;anonymized feature 0
This indicates that in the corresponding 'feats' and 'egofeats' files for that node, feature index 0 corresponds to this specific value of birthday. The feature has two possible values: 1 or 0. If the feature is 1, the value of that node's birthday is equal to 'anonymized feature 0'; otherwise the feature is not in the data.
def featurematch(a, b):
    return "%s != 'None' and %s != 'None' and %s = %s" % \
            (a, b, a, b)

def fn_process(line):
    psd = re.search(
        r'\d+ (.*);anonymized feature (\d+)', line, re.IGNORECASE)
    if not psd:
        print "parse error, line: %s" % line
        sys.exit(1)
    n = psd.group(1).replace(';', '_')
    #n = re.sub('_$', '', n)
    f = psd.group(2)
    return (n, f)

def feat_process(line, selfid):
    allents = line.split(' ')
    if (selfid != -1):
        return (selfid, allents)
    else:
        return (allents[0], allents[1:])

def get_feats(vtxid, mapping, feats):
    thisfeats = {}
    vtxfeats = []
    for idx, f in enumerate(feats):
        name, value = mapping[idx]
        if (f == '1'):
            thisfeats[name] = value
        else:
            thisfeats[name] = 'None'
    for ff in featids[1:]:
        vtxfeats.append(thisfeats[ff])
    return vtx(vtxid, *vtxfeats)
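
As a quick illustration of what the first two helpers return (the 'feat' row values here are made up):

# fn_process turns the sample 'featnames' line above into a (name, anonymized id) pair
print fn_process('0 birthday;anonymized feature 0')   # ('birthday', '0')
# feat_process splits a 'feat' row into (node id, list of 1/0 flags)
print feat_process('25 1 0 1', -1)                     # ('25', ['1', '0', '1'])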

Building a GraphFrame

Recapping the structure above, we have to pull together a few things to build our GraphFrame of nodes and edges. We first put the map file ('n.featnames') into memory because it's very small, then do a map of all the connected nodes (from 'n.feat') including our own (from 'n.egofeat'). This is where the bulk of the work gets done.
# load all of the feature maps, feature files, and self features into an RDD
alledges = sc.emptyRDD()
for personid in peopleids:
        featmap_fname = "/mapr/demo.mapr.com/user/mapr/%d.featnames" % personid
        feats_fname = "%d.feat" % personid
        this_feats_fname = "%d.egofeat" % personid

        # load the feature map
        fmap = []
        with open(featmap_fname) as flines:
            for line in flines:
                fmap.append(fn_process(line))

        # load the features for all the edges, and our own
        f_rdd = sc.textFile(feats_fname).map(lambda x: feat_process(x, -1)). \
             union(sc.textFile(this_feats_fname).map(lambda x: feat_process(x, personid)))

        # add the new data to the group
        alledges = f_rdd.map(lambda x: get_feats(x[0], fmap, x[1])).union(alledges)

# count the raw rows before removing duplicate nodes
print "rdd raw count: %d" % alledges.count()

# create a GraphFrame from the result
vdf = sqlContext.createDataFrame(alledges, featids).dropDuplicates(['id'])
print "vertex count: %d" % vdf.count()
print "edge count: %d" % edf.count()
g = GraphFrame(vdf, edf)

Here's where we handle the duplicate nodes that appear in the edge files. We could do more to process these (like try to intelligently merge them), but for now in this example, we discard entries for duplicate nodes, and finally convert the merged RDD from all of the data files into a GraphFrame. After this step, we can see that the resulting number of nodes is the number we expected (4039).
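If you're curious which node IDs were duplicated before they get dropped, a quick sketch like the following (not part of the original script) lists them from the same RDD:

# a sketch: count the vertex IDs that appear more than once before deduplication
dup_ids = sqlContext.createDataFrame(alledges, featids) \
    .groupBy('id').count().filter(F.col('count') > 1)
print "duplicated vertex ids: %d" % dup_ids.count()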

Searching the Facebook Graph

Let's put the GraphFrame to work. The first thing we can do is run some simple queries that yield useful information, like finding all instances where B is a friend of A and both people have the same birthday. We do this with a powerful graph searching language (passed to 'g.find') that allows us to match both attributes of nodes as well as a relationship structure (or 'motif') that interests us.
# find all connected vertices with the same birthday identifier
print "same birthdays"
res = g.find("(a)-[]->(b)") \
         .filter(featurematch("a.birthday", "b.birthday"))
print "count: %d" % res.count()
res.select("a.id", "a.birthday", "b.id", "b.birthday").show(5)
The output is as follows:
rdd raw count: 4177                                                             
vertex count: 4039                                                              
edge count: 88234                                                               
same birthdays
count: 100                                                                      
+---+--------+---+--------+                                                     
| id|birthday| id|birthday|
+---+--------+---+--------+
|  3|       7| 85|       7|
| 75|       7| 85|       7|
| 13|       7|109|       7|
| 56|       7|109|       7|
|200|       7|274|       7|
+---+--------+---+--------+
only showing top 5 rows
The 5 rows displayed here all have a birthday of 'anonymized feature 7.' We know these pairs share the same birthday, just not the actual date. This works similarly in many anonymized data sets.

Now let's run a more complex query. We could use a query like the one below to improve recommendations, offer friend suggestions, or research ideas for new offers or campaigns.

# find "friends of friends" who are not connected to us, but graduated the same
# year from the same school
print "same class"
res = g.find("(a)-[]->(b); (b)-[]->(c); !(a)-[]->(c)") \
         .filter("%s and %s" % \
                 (featurematch("a.education_school_id", "c.education_school_id"), \
                 featurematch("a.education_year_id", "c.education_year_id")))
res = res.filter("a.id != c.id").select("a.id", "a.education_school_id", "a.education_year_id",
        "c.id", "c.education_school_id", "c.education_year_id") 
print "count: %d" % res.count()
res.show(5)
The output produced:
count: 1285                                                                     
+----+-------------------+-----------------+----+-------------------+-----------------+
|  id|education_school_id|education_year_id|  id|education_school_id|education_year_id|
+----+-------------------+-----------------+----+-------------------+-----------------+
|1008|                538|               72|1480|                538|               72|
|1271|                538|               72|1551|                538|               72|
| 953|                538|               72|1864|                538|               72|
|1003|                538|               72|1571|                538|               72|
|1480|                538|               72|1864|                538|               72|
+----+-------------------+-----------------+----+-------------------+-----------------+
only showing top 5 rows
Here we've displayed nodes that met all the conditions, and it looks like there are 1285 of them in the data.

Finally, we can run graph algorithms on the same GraphFrame. The package comes with some common ones, like shortest paths, and (of course) PageRank.

# finally do a page rank on the graph
print "page rank"
g.pageRank(resetProbability=0.15, tol=0.01).vertices.sort(
    'pagerank', ascending=False).show(5)

print "done"
And the output of this is:
+----+--------+-----------+----------------+-------------------+-----------------+------------------+
|  id|birthday|hometown_id|work_employer_id|education_school_id|education_year_id|          pagerank|
+----+--------+-----------+----------------+-------------------+-----------------+------------------+
|1911|    None|       None|            None|                538|             None|17.706611805364968|
|3434|    None|       None|            None|                538|             None| 17.68697075025074|
|2655|    None|       None|            None|               None|             None| 17.11712984943487|
|1902|    None|       None|            None|                538|               72|16.868179332524562|
|1888|    None|       None|            None|                538|             None| 12.93200439748058|
+----+--------+-----------+----------------+-------------------+-----------------+------------------+
only showing top 5 rows
Interestingly, 4 out of the 5 top-scoring nodes according to PageRank went to the same school. Either it's a very big school, or that school is simply overrepresented among the selected data and survey participants. It's probably some of both.
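The shortest paths algorithm mentioned above works the same way. Here is a brief sketch (not part of the original script) that uses two of the ego node IDs from peopleids as landmarks:

# shortest path length from every vertex to each landmark vertex
# (landmark values should match the type of the 'id' column in your vertex DataFrame)
sp = g.shortestPaths(landmarks=[0, 107])
sp.select('id', 'distances').show(5)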

Conclusions

GraphFrames, while still early in its development (version 0.1, at the time of this writing), promises to be a convenient and scalable way to query and process large graph data sets in Spark, especially when both queries and graph algorithms are required.

All of the above code is hosted in mapr-demos on github, for easy loading onto the sandbox and experimenting with your own graph structures. We'd love to hear about your experience in the comments!
