Page Rank Hands-on#

import os
import sys

# Point Python at the local Spark installation's libraries.
spark_path = os.environ['SPARK_HOME']
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.9-src.zip")

# findspark locates the Spark installation via SPARK_HOME and makes pyspark importable.
import findspark
findspark.init()

import pyspark

# Run Spark in local mode with 4 worker threads and an 8 GB driver.
number_cores = 4
memory_gb = 8
conf = (pyspark.SparkConf().
        setMaster('local[{}]'.format(number_cores)).
        set('spark.driver.memory', '{}g'.format(memory_gb)))
sc = pyspark.SparkContext(conf=conf)

Load data#

!ls -l ../data/small_graph.dat
-rw-r--r--  1 lngo  staff  19 Sep 26 10:37 ../data/small_graph.dat
!cat ../data/small_graph.dat
y y
y a
a y
a m
m a
graph_data = sc.textFile("../data/small_graph.dat")
graph_data.take(10)
['y y', 'y a', 'a y', 'a m', 'm a']

This is one possible way to store matrix information. In this approach, the matrix is stored as a list of (row, column) pairs, where each pair marks a non-zero entry: the existence of a link from the node represented by the row to the node represented by the column.
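As a quick illustration (a minimal sketch in plain Python, not part of the Spark pipeline), the five pairs above correspond to the following 3x3 adjacency matrix over the nodes y, a, and m:

# Build a dense adjacency matrix from the (source, destination) pairs above.
edges = [("y", "y"), ("y", "a"), ("a", "y"), ("a", "m"), ("m", "a")]
nodes = sorted({n for pair in edges for n in pair})      # ['a', 'm', 'y']
index = {node: i for i, node in enumerate(nodes)}

matrix = [[0] * len(nodes) for _ in nodes]
for src, dst in edges:
    matrix[index[src]][index[dst]] = 1                   # row = source, column = destination

for node, row in zip(nodes, matrix):
    print(node, row)
# a [0, 1, 1]
# m [1, 0, 0]
# y [1, 0, 1]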

Matrix formulation#

# Group all destination nodes by their source node: (source, [destinations]).
links = graph_data.map(lambda line: (line.split(" ")[0], line.split(" ")[1])) \
        .groupByKey() \
        .mapValues(list)
links.take(10)
[('y', ['y', 'a']), ('a', ['y', 'm']), ('m', ['a'])]
N = links.count()
ranks = links.map(lambda line: (line[0], 1 / N))
ranks.take(N)
[('y', 0.3333333333333333),
 ('a', 0.3333333333333333),
 ('m', 0.3333333333333333)]
votes = ranks.join(links)
votes.take(N)
[('y', (0.3333333333333333, ['y', 'a'])),
 ('a', (0.3333333333333333, ['y', 'm'])),
 ('m', (0.3333333333333333, ['a']))]
def calculateVotes(t):
    # t is (source, (rank, [destinations])).
    # Split the source's rank evenly among its outgoing links.
    count = len(t[1][1])
    res = []
    for item in t[1][1]:
        res.append((item, t[1][0] / count))
    return res
calculateVotes(('y', (0.3333333333333333, ['y', 'a'])))
[('y', 0.16666666666666666), ('a', 0.16666666666666666)]
votes = ranks.join(links) \
        .flatMap(calculateVotes)
votes.collect()
[('y', 0.16666666666666666),
 ('a', 0.16666666666666666),
 ('y', 0.16666666666666666),
 ('m', 0.16666666666666666),
 ('a', 0.3333333333333333)]
# A node's new rank is the sum of all votes it receives.
ranks = votes.reduceByKey(lambda x, y: x + y)
ranks.take(N)
[('y', 0.3333333333333333), ('a', 0.5), ('m', 0.16666666666666666)]
ranks.values().sum()
1.0
%%time
# Run ten full iterations: distribute each node's votes, then re-aggregate the ranks.
for i in range(10):
    votes = ranks.join(links) \
        .flatMap(calculateVotes)
    ranks = votes.reduceByKey(lambda x, y: x + y)
    print(ranks.take(N))
[('y', 0.41666666666666663), ('a', 0.3333333333333333), ('m', 0.25)]
[('y', 0.375), ('a', 0.4583333333333333), ('m', 0.16666666666666666)]
[('y', 0.41666666666666663), ('m', 0.22916666666666666), ('a', 0.35416666666666663)]
[('y', 0.38541666666666663), ('a', 0.4375), ('m', 0.17708333333333331)]
[('m', 0.21875), ('y', 0.4114583333333333), ('a', 0.36979166666666663)]
[('y', 0.390625), ('a', 0.42447916666666663), ('m', 0.18489583333333331)]
[('m', 0.21223958333333331), ('y', 0.4075520833333333), ('a', 0.3802083333333333)]
[('m', 0.19010416666666666), ('y', 0.3938802083333333), ('a', 0.416015625)]
[('m', 0.2080078125), ('y', 0.40494791666666663), ('a', 0.3870442708333333)]
[('y', 0.39599609375), ('a', 0.4104817708333333), ('m', 0.19352213541666666)]
CPU times: user 250 ms, sys: 57.9 ms, total: 308 ms
Wall time: 7.53 s
# Iterate until the total absolute change in ranks drops below a threshold.
# Try a threshold of 0.5 first and inspect the output, then tighten it to 0.1.
total_change = 1
while total_change > 0.1:
    old_ranks = ranks
    votes = ranks.join(links) \
        .flatMap(calculateVotes)
    ranks = votes.reduceByKey(lambda x, y: x + y)
    # Total absolute difference between each node's old and new rank.
    errors = old_ranks.join(ranks).mapValues(lambda v: abs(v[0] - v[1]))
    total_change = errors.values().sum()
    print(total_change)
0.037923177083333315
ranks.take(N)
[('m', 0.20524088541666666),
 ('a', 0.39152018229166663),
 ('y', 0.40323893229166663)]

Hollins dataset#

!ls -lh ../data/hollins.dat
-rw-r--r--  1 lngo  staff   667K Sep 26 11:14 ../data/hollins.dat
raw_data = sc.textFile("../data/hollins.dat")
raw_data.take(50)
['6012 23875',
 '1 http://www1.hollins.edu/ ',
 '2 http://www.hollins.edu/ ',
 '3 http://www1.hollins.edu/Docs/CompTech/Network/webmail_faq.htm ',
 '4 http://www1.hollins.edu/Docs/Forms/GetForms.htm ',
 '5 http://www1.hollins.edu/Docs/misc/travel.htm ',
 '6 http://www1.hollins.edu/Docs/GVCalendar/gvmain.htm ',
 '7 http://www1.hollins.edu/docs/events/events.htm ',
 '8 http://www1.hollins.edu/docs/comptech/mainviruses.htm ',
 '9 http://www1.hollins.edu/Docs/Academics/acad.htm ',
 '10 http://www1.hollins.edu/Docs/CompTech/Blackboard/bb_faq.htm ',
 '11 http://www1.hollins.edu/Docs/comptech/comptech.htm ',
 '12 http://www1.hollins.edu/Docs/Academics/international_programs/index.htm ',
 '13 http://www1.hollins.edu/Docs/academics/online/cyber.htm ',
 '14 http://www1.hollins.edu/Registrar/registrar.htm ',
 '15 http://www1.hollins.edu/Docs/Academics/writingcenter/wcenter.htm ',
 '16 http://www.hollins.edu/about/map/map.htm ',
 '17 http://www1.hollins.edu/Docs/SchoolServices/FoodServ/default.htm ',
 '18 http://www1.hollins.edu/Docs/CampusLife/StudentAct/studentorgs/SGA/default.html ',
 '19 http://www1.hollins.edu/docs/admin/admin.htm ',
 '20 http://www1.hollins.edu/Docs/AlumDev/Default.htm ',
 '21 http://www1.hollins.edu/docs/athletics/athletics.htm ',
 '22 http://www1.hollins.edu/Docs/CampusLife/CampusLife.htm ',
 '23 http://www1.hollins.edu/Docs/Intercultural/default.htm ',
 '24 http://www1.hollins.edu/Docs/SchoolServices/SchoolServ.htm ',
 '25 http://www1.hollins.edu/security/Default.htm ',
 '26 http://www.hollins.edu/students/index.htm ',
 '27 http://www.hollins.edu/admissions/admissions.htm ',
 '28 http://www.hollins.edu/academics/academics.htm ',
 '29 http://www.hollins.edu/grad/coedgrad.htm ',
 '30 http://www.hollins.edu/academics/internships/real_world.htm ',
 '31 http://www.hollins.edu/campuslife/campus_life.htm ',
 '32 http://www.hollins.edu/abroad/abroad.htm ',
 '33 http://www.hollins.edu/careers/developctr/career_development.htm ',
 '34 http://www.hollins.edu/athletics/athletics.htm ',
 '35 http://www.hollins.edu/specprog/hollinsummer/holsum.htm ',
 '36 http://www.hollins.edu/news-events/news_events.htm ',
 '37 http://www.hollins.edu/admissions/visit/visit.htm ',
 '38 http://www.hollins.edu/about/about_tour.htm ',
 '39 http://www.hollins.edu/prospectives/index.htm ',
 '40 http://www.hollins.edu/alumnae/alumnae.htm ',
 '41 http://www.hollins.edu/parents/index.htm ',
 '42 http://www.hollins.edu/community/index.htm ',
 '43 http://www.hollins.edu/admissions/apply/apply.htm ',
 '44 http://www.hollins.edu/undergrad/undergraduate.htm ',
 '45 http://www.hollins.edu/undergrad/english/engcwrit.htm ',
 '46 http://www.hollins.edu/cgi-bin/htsearch ',
 '47 http://www.hollins.edu/sitemap/sitemap.htm ',
 '48 http://www.hollins.edu/contact/contact.htm ',
 '49 http://www.hollins.edu/tour/welcome.html ']

Data structure#

  • Does the data look consistent?

  • How many lines in the data file?

line_counts = raw_data.count()
print(line_counts)
29888
6012 + 23875
29887

We can infer what 6012 and 23875 mean, but how do we really split the data? Remember that the data is distributed across partitions, so we cannot simply scroll through the file. And what if this header line did not exist?

If we can’t view the entirety of the dataset, what can we do?
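One option (a minimal sketch, not from the original notebook) is to pull only the first element with first() and check its shape, instead of collecting the whole RDD:

# Inspect the first line without collecting the whole distributed dataset.
header = raw_data.first()
print(header)                                   # expected: '6012 23875'

# Verify that it really is a header: exactly two tokens, both integers.
tokens = header.split()
print(len(tokens) == 2 and all(t.isdigit() for t in tokens))   # expected: True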

raw_data.takeSample(False, 50)
['115 73',
 '1641 88',
 '583 21',
 '4335 http://www1.hollins.edu/Registrar/MAJORS-MINORS/MAJ-MIN%2001-02/COMPUTATIONAL%20SCIENCES%20MAJOR.pdf ',
 '2354 1181',
 '3608 4823',
 '1365 41',
 '2602 http://www1.hollins.edu/classes/comm232/sports.bouffard.htm ',
 '1207 67',
 '4594 http://www1.hollins.edu/registrar/MAJORS-MINORS/MAJ-MIN%2000-01/BIOLOGY%20MINOR.pdf ',
 '4025 4026',
 '341 61',
 '11 89',
 '311 2',
 '786 785',
 '5063 5065',
 '3579 http://www1.hollins.edu/registrar/Maj-Min%2003-04.htm ',
 '516 509',
 '656 28',
 '5898 http://www1.hollins.edu/faculty/clarkjm/Stat140/Outliers.htm ',
 '558 126',
 '5292 http://www1.hollins.edu/faculty/saloweyca/clas%20395/Sculpture/tsld039.htm ',
 '1819 2892',
 '3317 4403',
 '3178 http://www1.hollins.edu/classes/anth250/hbarrow/homepage.htm ',
 '837 855',
 '1225 1561',
 '1207 37',
 '373 1247',
 '308 48',
 '2668 http://www1.hollins.edu/docs/Career_Development/challenge.htm ',
 '2673 3487',
 '3171 http://www1.hollins.edu/homepages/sittonm/applied.htm ',
 '1189 57',
 '38 61',
 '449 36',
 '2381 http://www1.hollins.edu/faculty/richter/101/mt.rev.00.htm ',
 '5338 5337',
 '35 42',
 '2674 3482',
 '296 2',
 '3909 5145',
 '26 http://www.hollins.edu/students/index.htm ',
 '1094 2290',
 '143 2',
 '880 879',
 '3836 3835',
 '3238 3254',
 '3734 4965',
 '836 2006']

There seem to be two types of lines: (number, text) and (number, number). How many lines of each type are there?

def isNumber(n):
    # Return True if n can be parsed as an integer.
    try:
        tmp = int(n)
        return True
    except (ValueError, TypeError):
        return False
isNumber(1)
True
isNumber("abc")
False
num_num = raw_data.filter(lambda line: isNumber(line.split(" ")[1]))
num_num.count()
23876
num_text = raw_data.filter(lambda line: not isNumber(line.split(" ")[1]))
num_text.count()
6012

Which dataset, num_num or num_text, is the important one for the page rank computation, and what role does the other one play?

s = "a1 b2 c3"
s.split(" ")[0].upper()
'A1'
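Building on the split() demonstration above, here is a minimal sketch (the name pages is illustrative, and it assumes every num_text line has the form "id url ") that turns num_text into (id, url) pairs usable as a lookup table:

# Map each "id url " line to a (page_id, url) pair.
# split(" ", 1) splits only on the first space; strip() drops the trailing blank.
pages = num_text.map(lambda line: (line.split(" ", 1)[0], line.split(" ", 1)[1].strip()))
pages.take(3)
# expected: [('1', 'http://www1.hollins.edu/'),
#            ('2', 'http://www.hollins.edu/'),
#            ('3', 'http://www1.hollins.edu/Docs/CompTech/Network/webmail_faq.htm')]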

Hands-on#

  1. How do you apply page rank to num_num?

  2. How do you link the answers back to the actual pages in hollins.dat? (A sketch of one possible approach appears at the end of this section.)

# Same matrix formulation as before, now on the (source, destination) pairs in num_num.
links = num_num.map(lambda line: (line.split(" ")[0], line.split(" ")[1])) \
        .groupByKey() \
        .mapValues(list)
links.take(10)
[('1',
  ['2',
   '3',
   '4',
   '5',
   '6',
   '7',
   '8',
   '9',
   '10',
   '11',
   '12',
   '13',
   '14',
   '15',
   '16',
   '17',
   '18',
   '19',
   '20',
   '21',
   '22',
   '23',
   '24',
   '25']),
 ('8',
  ['2',
   '7',
   '19',
   '21',
   '85',
   '87',
   '88',
   '90',
   '91',
   '92',
   '221',
   '222',
   '223',
   '224',
   '225',
   '226',
   '227',
   '228',
   '229',
   '230',
   '340',
   '341',
   '342',
   '343']),
 ('16',
  ['2',
   '27',
   '28',
   '37',
   '38',
   '42',
   '48',
   '52',
   '61',
   '73',
   '74',
   '75',
   '76',
   '77',
   '78',
   '79']),
 ('20',
  ['2',
   '217',
   '248',
   '482',
   '483',
   '484',
   '485',
   '486',
   '487',
   '488',
   '489',
   '490',
   '491',
   '492',
   '493']),
 ('26',
  ['2', '27', '28', '37', '38', '39', '40', '41', '42', '43', '52', '61']),
 ('29',
  ['2',
   '27',
   '28',
   '37',
   '38',
   '43',
   '52',
   '61',
   '73',
   '118',
   '119',
   '120',
   '121',
   '122',
   '123',
   '124',
   '125',
   '126',
   '127',
   '128',
   '129',
   '130',
   '131']),
 ('33',
  ['2',
   '27',
   '28',
   '30',
   '37',
   '38',
   '40',
   '43',
   '52',
   '61',
   '73',
   '168',
   '169',
   '170',
   '171',
   '172',
   '173']),
 ('34',
  ['2',
   '37',
   '38',
   '43',
   '52',
   '73',
   '84',
   '148',
   '174',
   '175',
   '176',
   '177',
   '178',
   '179',
   '180',
   '181',
   '182',
   '183',
   '184',
   '185',
   '186',
   '187',
   '188',
   '189']),
 ('44',
  ['2',
   '28',
   '34',
   '37',
   '38',
   '43',
   '45',
   '52',
   '61',
   '132',
   '261',
   '262',
   '263',
   '264',
   '265',
   '266',
   '267',
   '268',
   '269',
   '270',
   '271',
   '272',
   '273',
   '274',
   '275',
   '276',
   '277',
   '278',
   '279',
   '280',
   '281',
   '282',
   '283',
   '284',
   '285',
   '286',
   '287',
   '288',
   '289',
   '290',
   '291',
   '292',
   '293']),
 ('45',
  ['2',
   '28',
   '37',
   '38',
   '43',
   '44',
   '52',
   '61',
   '123',
   '132',
   '198',
   '252',
   '294',
   '295',
   '296',
   '297',
   '298',
   '299',
   '300',
   '301',
   '302'])]
N = links.count()
ranks = links.map(lambda line: (line[0], 1 / N))
ranks.take(10)
[('1', 0.0003541076487252125),
 ('8', 0.0003541076487252125),
 ('16', 0.0003541076487252125),
 ('20', 0.0003541076487252125),
 ('26', 0.0003541076487252125),
 ('29', 0.0003541076487252125),
 ('33', 0.0003541076487252125),
 ('34', 0.0003541076487252125),
 ('44', 0.0003541076487252125),
 ('45', 0.0003541076487252125)]
def calculateVotes(t):
    # t is (source, (rank, [destinations])).
    # Split the source's rank evenly among its outgoing links.
    count = len(t[1][1])
    res = []
    for item in t[1][1]:
        res.append((item, t[1][0] / count))
    return res
calculateVotes(('y', (0.3333333333333333, ['y', 'a'])))
[('y', 0.16666666666666666), ('a', 0.16666666666666666)]
%%time
# Re-initialize the ranks for the Hollins graph and iterate until the
# total change between successive iterations drops below 0.1.
total_change = 1
N = links.count()
ranks = links.map(lambda line: (line[0], 1 / N))
ranks.take(10)

while total_change > 0.1:
    old_ranks = ranks
    votes = ranks.join(links) \
        .flatMap(calculateVotes)
    ranks = votes.reduceByKey(lambda x, y: x + y)
    errors = old_ranks.join(ranks).mapValues(lambda v: abs(v[0] - v[1]))
    total_change = errors.values().sum()
    print(total_change)
1.0289126892749134
0.6524166912065938
0.354732060058874
0.20752688756812687
0.14669618513501703
0.11380517976401004
0.09642834963258069
CPU times: user 178 ms, sys: 36.8 ms, total: 214 ms
Wall time: 5.78 s
# Ten highest-ranked page ids.
ranks.takeOrdered(10, key=lambda x: -x[1])
[('2', 0.01480246014081635),
 ('37', 0.010738970689076617),
 ('38', 0.009867983441807206),
 ('61', 0.009734731227889629),
 ('52', 0.009357107790050318),
 ('425', 0.00868217558595644),
 ('43', 0.008005883519059315),
 ('27', 0.00761747437182122),
 ('28', 0.006066068156415338),
 ('29', 0.0056368097448235835)]

How do we link the results above back to the text portion (num_text) of the Hollins dataset?
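One possible approach (a sketch, not a prescribed solution; the names pages and top_pages are illustrative): rebuild the (id, url) lookup from num_text, join it with ranks on the page id, and order by rank.

# (page_id, url) pairs from the text portion, as sketched earlier.
pages = num_text.map(lambda line: (line.split(" ", 1)[0], line.split(" ", 1)[1].strip()))

# Joining on the shared page-id key yields (page_id, (rank, url)).
# Keep the ten entries with the highest rank and print them with their URLs.
top_pages = ranks.join(pages) \
        .takeOrdered(10, key=lambda x: -x[1][0])

for page_id, (rank, url) in top_pages:
    print(page_id, rank, url)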