# Hands-on: Locality Sensitive Hashing

In [1]:
import os
import sys

spark_path = os.environ['SPARK_HOME']
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.9-src.zip")

import findspark
findspark.init()

import pyspark

number_cores = 4
memory_gb = 8
conf = (pyspark.SparkConf().setMaster('local[{}]'.
                                      format(number_cores)).
        set('spark.driver.memory', '{}g'.format(memory_gb)))
sc = pyspark.SparkContext(conf=conf)

In [2]:
!ls ../data/

[34mcrypto[m[m                 [34mml-latest-small[m[m        small_graph.dat
hollins.dat            ml-latest.zip          [34myelp[m[m
inaugural_speeches.zip [34moutput[m[m
[34mml-latest[m[m              [34mshakespeare[m[m


In [3]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
spark = pyspark.sql.SparkSession(sc)

In [5]:
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),),
         (3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]

In [6]:
dataA

[(0, SparseVector(6, {0: 1.0, 1: 1.0, 2: 1.0})),
 (1, SparseVector(6, {2: 1.0, 3: 1.0, 4: 1.0})),
 (2, SparseVector(6, {0: 1.0, 2: 1.0, 4: 1.0})),
 (3, SparseVector(6, {1: 1.0, 3: 1.0, 5: 1.0})),
 (4, SparseVector(6, {2: 1.0, 3: 1.0, 5: 1.0})),
 (5, SparseVector(6, {1: 1.0, 2: 1.0, 4: 1.0}))]

In [7]:
dfA = spark.createDataFrame(dataA, ["id", "features"])

In [10]:
dfA.take(6)

[Row(id=0, features=SparseVector(6, {0: 1.0, 1: 1.0, 2: 1.0})),
 Row(id=1, features=SparseVector(6, {2: 1.0, 3: 1.0, 4: 1.0})),
 Row(id=2, features=SparseVector(6, {0: 1.0, 2: 1.0, 4: 1.0})),
 Row(id=3, features=SparseVector(6, {1: 1.0, 3: 1.0, 5: 1.0})),
 Row(id=4, features=SparseVector(6, {2: 1.0, 3: 1.0, 5: 1.0})),
 Row(id=5, features=SparseVector(6, {1: 1.0, 2: 1.0, 4: 1.0}))]

In [11]:
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)

In [12]:
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|(6,[0,1,2],[1.0,1...|[[3.05647612E8], ...|
|  1|(6,[2,3,4],[1.0,1...|[[3.05647612E8], ...|
|  2|(6,[0,2,4],[1.0,1...|[[3.05647612E8], ...|
|  3|(6,[1,3,5],[1.0,1...|[[9.2332134E7], [...|
|  4|(6,[2,3,5],[1.0,1...|[[9.2332134E7], [...|
|  5|(6,[1,2,4],[1.0,1...|[[3.05647612E8], ...|
+---+--------------------+--------------------+



In [13]:
final = model.approxSimilarityJoin(dfA, dfA, 1.0, distCol="distance")

final.createOrReplaceTempView("final")
final.printSchema()

root
 |-- datasetA: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- features: vector (nullable = true)
 |    |-- hashes: array (nullable = true)
 |    |    |-- element: vector (containsNull = true)
 |-- datasetB: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- features: vector (nullable = true)
 |    |-- hashes: array (nullable = true)
 |    |    |-- element: vector (containsNull = true)
 |-- distance: double (nullable = false)



### Important

MinHashLSH provides the Jaccard distance  (see https://en.wikipedia.org/wiki/Jaccard_index#Probability_Jaccard_similarity_and_distance), which is 1 - Jaccard similarity

In [14]:
sql = """
select datasetA.id, datasetA.features, datasetB.id, datasetB.features, distance
from final where datasetA.id != datasetB.id order by datasetA.id, datasetB.id
"""
spark.sql(sql).show()

+---+--------------------+---+--------------------+--------+
| id|            features| id|            features|distance|
+---+--------------------+---+--------------------+--------+
|  0|(6,[0,1,2],[1.0,1...|  1|(6,[2,3,4],[1.0,1...|     0.8|
|  0|(6,[0,1,2],[1.0,1...|  2|(6,[0,2,4],[1.0,1...|     0.5|
|  0|(6,[0,1,2],[1.0,1...|  3|(6,[1,3,5],[1.0,1...|     0.8|
|  0|(6,[0,1,2],[1.0,1...|  5|(6,[1,2,4],[1.0,1...|     0.5|
|  1|(6,[2,3,4],[1.0,1...|  0|(6,[0,1,2],[1.0,1...|     0.8|
|  1|(6,[2,3,4],[1.0,1...|  2|(6,[0,2,4],[1.0,1...|     0.5|
|  1|(6,[2,3,4],[1.0,1...|  4|(6,[2,3,5],[1.0,1...|     0.5|
|  1|(6,[2,3,4],[1.0,1...|  5|(6,[1,2,4],[1.0,1...|     0.5|
|  2|(6,[0,2,4],[1.0,1...|  0|(6,[0,1,2],[1.0,1...|     0.5|
|  2|(6,[0,2,4],[1.0,1...|  1|(6,[2,3,4],[1.0,1...|     0.5|
|  2|(6,[0,2,4],[1.0,1...|  5|(6,[1,2,4],[1.0,1...|     0.5|
|  3|(6,[1,3,5],[1.0,1...|  0|(6,[0,1,2],[1.0,1...|     0.8|
|  3|(6,[1,3,5],[1.0,1...|  4|(6,[2,3,5],[1.0,1...|     0.5|
|  4|(6,[2,3,5],[1.0,1..

Now let's do the example in the lecture slides:

| C1  | C2  | C3  | C4 |
| --- | --- | --- | ---| 
| 1   | 0   |  1  |  0 | 
| 1   | 0   |  0  |  1 | 
| 0   | 1   |  0  |  1 | 
| 0   | 1   |  0  |  1 | 
| 0   | 1   |  0  |  1 | 
| 1   | 0   |  1  |  0 | 
| 1   | 0   |  1  |  0 | 

In [16]:
dataB = [("C1", Vectors.sparse(7, [0, 1, 5, 6], [1.0, 1.0, 1.0, 1.0]),),
         ("C2", Vectors.sparse(7, [2, 3, 4], [1.0, 1.0, 1.0]),),
         ("C3", Vectors.sparse(7, [0, 5, 6], [1.0, 1.0, 1.0]),),
         ("C4", Vectors.sparse(7, [1, 2, 3, 4], [1.0, 1.0, 1.0, 1.0]),)]

In [27]:
dfB = spark.createDataFrame(dataB, ["id", "features"])
mhB = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=100)
modelB = mhB.fit(dfB)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
modelB.transform(dfB).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
| C1|(7,[0,1,5,6],[1.0...|[[9.2332134E7], [...|
| C2|(7,[2,3,4],[1.0,1...|[[3.05647612E8], ...|
| C3|(7,[0,5,6],[1.0,1...|[[9.2332134E7], [...|
| C4|(7,[1,2,3,4],[1.0...|[[3.05647612E8], ...|
+---+--------------------+--------------------+



In [28]:
finalB = model.approxSimilarityJoin(dfB, dfB, 1.0, distCol="distance")
finalB.createOrReplaceTempView("finalB")

In [29]:
sql = """
select datasetA.id, datasetA.features, datasetB.id, datasetB.features, distance
from finalB where datasetA.id != datasetB.id order by datasetA.id, datasetB.id
"""
spark.sql(sql).show()

+---+--------------------+---+--------------------+--------+
| id|            features| id|            features|distance|
+---+--------------------+---+--------------------+--------+
| C1|(7,[0,1,5,6],[1.0...| C3|(7,[0,5,6],[1.0,1...|    0.25|
| C2|(7,[2,3,4],[1.0,1...| C4|(7,[1,2,3,4],[1.0...|    0.25|
| C3|(7,[0,5,6],[1.0,1...| C1|(7,[0,1,5,6],[1.0...|    0.25|
| C4|(7,[1,2,3,4],[1.0...| C2|(7,[2,3,4],[1.0,1...|    0.25|
+---+--------------------+---+--------------------+--------+



Does it need to be sorted? (Yes!)

In [20]:
dataB = [("C1", Vectors.sparse(7, [0, 5, 1, 6], [1.0, 1.0, 1.0, 1.0]),),
         ("C2", Vectors.sparse(7, [2, 3, 4], [1.0, 1.0, 1.0]),),
         ("C3", Vectors.sparse(7, [0, 5, 6], [1.0, 1.0, 1.0]),),
         ("C4", Vectors.sparse(7, [1, 3, 2, 4], [1.0, 1.0, 1.0, 1.0]),)]

dfB = spark.createDataFrame(dataB, ["id", "features"])
mhB = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
modelB = mhB.fit(dfB)

finalB = model.approxSimilarityJoin(dfB, dfB, 1.0, distCol="distance")
finalB.createOrReplaceTempView("finalB")

sql = """
select datasetA.id, datasetA.features, datasetB.id, datasetB.features, distance
from finalB where datasetA.id != datasetB.id order by datasetA.id, datasetB.id
"""
spark.sql(sql).show()

TypeError: Indices 5 and 1 are not strictly increasing

### Note:

Linux environment with true Hadoop support `wholeTextFiles`, but we don't have it in Windows, 
so we will try a different way:

```
raw_data = sc.wholeTextFiles("../data/inaugural_speeches").cache()
```

In [33]:
import glob

data_path = "../data/inaugural_speeches"
all_files = glob.glob("../data/inaugural_speeches/*.txt")


file_names = sc.parallelize(all_files)
file_names.collect()

['../data/inaugural_speeches\\10_adams_john_quincy_1825.txt',
 '../data/inaugural_speeches\\11_jackson_1829.txt',
 '../data/inaugural_speeches\\12_jackson_1833.txt',
 '../data/inaugural_speeches\\13_van_buren_1837.txt',
 '../data/inaugural_speeches\\14_harrison_1841.txt',
 '../data/inaugural_speeches\\15_polk_1845.txt',
 '../data/inaugural_speeches\\16_taylor_1849.txt',
 '../data/inaugural_speeches\\17_pierce_1853.txt',
 '../data/inaugural_speeches\\18_buchanan_1857.txt',
 '../data/inaugural_speeches\\19_lincoln_1861.txt',
 '../data/inaugural_speeches\\1_washington_1789.txt',
 '../data/inaugural_speeches\\20_lincoln_1865.txt',
 '../data/inaugural_speeches\\21_grant_1869.txt',
 '../data/inaugural_speeches\\22_grant_1873.txt',
 '../data/inaugural_speeches\\23_hayes_1877.txt',
 '../data/inaugural_speeches\\24_garfield_1881.txt',
 '../data/inaugural_speeches\\25_cleveland_1885.txt',
 '../data/inaugural_speeches\\26_harrison_1889.txt',
 '../data/inaugural_speeches\\27_cleveland_1893.txt',
 

In [34]:
raw_data = file_names.map(lambda f: (f.split("speeches")[1][2:], open(f,mode='r').read()))
raw_data.take(5)

[('0_adams_john_quincy_1825.txt',
  'John Quincy Adams\t1825-03-04\tIn compliance with an usage coeval with the existence of our Federal Constitution, and sanctioned by the example of my predecessors in the career upon which I am about to enter, I appear, my fellow-citizens, in your presence and in that of Heaven to bind myself by the solemnities of religious obligation to the faithful performance of the duties allotted to me in the station to which I have been called In unfolding to my countrymen the principles by which I shall be governed in the fulfillment of those duties my first resort will be to that Constitution which I shall swear to the best of my ability to preserve, protect, and defend. That revered instrument enumerates the powers and prescribes the duties of the Executive Magistrate, and in its first words declares the purposes to which these and the whole action of the Government instituted by it should be invariably and sacredly devoted--to form a more perfect union, est

In [35]:
%%time
raw_data.count()

Wall time: 4.32 s


57

In [36]:
raw_data.take(5)

[('0_adams_john_quincy_1825.txt',
  'John Quincy Adams\t1825-03-04\tIn compliance with an usage coeval with the existence of our Federal Constitution, and sanctioned by the example of my predecessors in the career upon which I am about to enter, I appear, my fellow-citizens, in your presence and in that of Heaven to bind myself by the solemnities of religious obligation to the faithful performance of the duties allotted to me in the station to which I have been called In unfolding to my countrymen the principles by which I shall be governed in the fulfillment of those duties my first resort will be to that Constitution which I shall swear to the best of my ability to preserve, protect, and defend. That revered instrument enumerates the powers and prescribes the duties of the Executive Magistrate, and in its first words declares the purposes to which these and the whole action of the Government instituted by it should be invariably and sacredly devoted--to form a more perfect union, est

In [50]:
import string
translator = str.maketrans('', '', string.punctuation)

tokenized_data = raw_data.map(lambda p: (p[0], sorted(list(set((p[1].replace('-', ' ').translate(translator).lower().split(" ")))))))
tokenized_data.take(5)

[('0_adams_john_quincy_1825.txt',
  ['',
   '03',
   '04\tin',
   'a',
   'ability',
   'aboriginal',
   'about',
   'abroad',
   'abuse',
   'accomplished',
   'accountability',
   'acknowledged',
   'acquainted',
   'acquired',
   'action',
   'adams\t1825',
   'addressing',
   'administered',
   'administration',
   'administrative',
   'admiration',
   'admit',
   'admitted',
   'admonition',
   'adoption',
   'advance',
   'advancing',
   'adverse',
   'affairs',
   'affording',
   'african',
   'after',
   'against',
   'age',
   'aged',
   'ages',
   'aggravated',
   'aggravation',
   'agreed',
   'alike',
   'all',
   'allayed',
   'alleviate',
   'allotted',
   'alluring',
   'alone',
   'am',
   'amity',
   'among',
   'an',
   'ancient',
   'and',
   'animation',
   'animosities',
   'annals',
   'annually',
   'another',
   'antipathies',
   'anxiety',
   'any',
   'appear',
   'appeared',
   'application',
   'approximated',
   'apt',
   'aqueducts',
   'ardent',
   'arduo

In [51]:
all_words = tokenized_data.flatMap(lambda p: p[1]) \
                        .map(lambda w: (w, 1)) \
                        .reduceByKey(lambda x, y: x + y) \
                        .map(lambda p: p[0]).collect()
all_words = sorted(all_words)

In [52]:
print(all_words[0:20])
print(len(all_words))

['', '03', '04', '04\tabout', '04\tcalled', '04\tcitizens', '04\tfellow', '04\tfriends', '04\ti', '04\tin', '04\tmy', '04\tproceeding', '04\tthe', '04\tunwilling', '04\twhen', '05\telected', '05\tfellow', '1', '100000000', '120000000']
9347


In [54]:
total_words = len(all_words)

def buildVector(words):
    indexList = []
    checkList = []
    
    for w in words:
        indexList.append(all_words.index(w))
        checkList.append(1.0)
    return Vectors.sparse(total_words, indexList, checkList)

In [55]:
dataC = tokenized_data.map(lambda p: (p[0], buildVector(p[1])))

In [61]:
dfC = spark.createDataFrame(dataC, ["id", "features"])
mhC = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
modelC = mhC.fit(dfC)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
modelC.transform(dfC).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+--------------------+--------------------+--------------------+
|                  id|            features|              hashes|
+--------------------+--------------------+--------------------+
|0_adams_john_quin...|(9347,[0,1,9,100,...|[[247760.0], [141...|
|  1_jackson_1829.txt|(9347,[0,1,6,100,...|[[247760.0], [1.2...|
|  2_jackson_1833.txt|(9347,[0,1,12,100...|[[247760.0], [141...|
|3_van_buren_1837.txt|(9347,[0,1,6,100,...|[[78254.0], [5679...|
| 4_harrison_1841.txt|(9347,[0,1,4,100,...|[[78254.0], [1302...|
|     5_polk_1845.txt|(9347,[0,1,6,35,1...|[[247760.0], [630...|
|   6_taylor_1849.txt|(9347,[0,1,15,100...|[[247760.0], [1.2...|
|   7_pierce_1853.txt|(9347,[0,1,10,41,...|[[247760.0], [130...|
| 8_buchanan_1857.txt|(9347,[0,1,6,100,...|[[78254.0], [7138...|
|  9_lincoln_1861.txt|(9347,[0,1,6,26,2...|[[247760.0], [163...|
|_washington_1789.txt|(9347,[0,2,22,77,...|[[247760.0], [579...|
|  0_lincoln_186

In [72]:
finalC = model.approxSimilarityJoin(dfC, dfC, 0.8, distCol="distance")
finalC.createOrReplaceTempView("finalC")

In [73]:
sql = """
select datasetA.id, datasetA.features, datasetB.id, datasetB.features, distance
from finalC where datasetA.id != datasetB.id order by datasetA.id, datasetB.id
"""
spark.sql(sql).show()

+--------------------+--------------------+--------------------+--------------------+------------------+
|                  id|            features|                  id|            features|          distance|
+--------------------+--------------------+--------------------+--------------------+------------------+
|0_adams_john_quin...|(9347,[0,1,9,100,...|    3_hayes_1877.txt|(9347,[0,1,16,44,...| 0.786593707250342|
|0_adams_john_quin...|(9347,[0,1,9,100,...|     5_polk_1845.txt|(9347,[0,1,6,35,1...|0.7608213096559379|
|0_adams_john_quin...|(9347,[0,1,9,100,...|   7_pierce_1853.txt|(9347,[0,1,10,41,...|0.7754982415005862|
|0_adams_john_quin...|(9347,[0,1,9,100,...| 8_mckinley_1897.txt|(9347,[0,1,6,24,4...|0.7909446618222471|
|0_adams_john_quin...|(9347,[0,1,9,100,...|  9_lincoln_1861.txt|(9347,[0,1,6,26,2...|0.7923264311814859|
|0_adams_john_quin...|(9347,[0,1,9,100,...|_adams_john_1797.txt|(9347,[1,14,23,10...|0.7783149171270718|
|0_adams_john_quin...|(9347,[0,1,9,100,...| _jefferson_