-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patha2.py
More file actions
60 lines (49 loc) · 1.89 KB
/
a2.py
File metadata and controls
60 lines (49 loc) · 1.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
"""
First Name: Aditya Subramanian
Last Name: Muralidaran
UBIT Name: adityasu
"""
from pyspark import SparkConf, SparkContext
import sys
def getLGraph(line):
    """Parse one edge line into both directed orientations of that edge.

    Args:
        line: Text of the form "u v", where u and v are integer vertex ids
            separated by any run of whitespace (spaces or tabs).

    Returns:
        A list ``[(u, v), (v, u)]`` so the edge can be looked up by either
        endpoint, or an empty list when the line is blank (which lets
        ``flatMap`` silently skip it).

    Raises:
        ValueError: If a token is not an integer.
        IndexError: If the line holds fewer than two tokens.
    """
    # split() with no argument collapses repeated whitespace and ignores
    # leading/trailing whitespace; split(" ") would yield empty tokens that
    # int() cannot parse.
    v = [int(x) for x in line.split()]
    if not v:
        return []
    return [(v[0], v[1]), (v[1], v[0])]
def getSGraph(LGraph):
    """Orient an edge so that its larger endpoint comes first.

    Args:
        LGraph: An edge given as a sequence of two values convertible to int.

    Returns:
        A tuple ``(larger, smaller)`` of ints; the order is kept as given
        when both endpoints are equal.
    """
    endpoints = list(map(int, LGraph))
    head = endpoints[0]
    tail = endpoints[1]
    return (head, tail) if tail <= head else (tail, head)
def smallStarOp(keyval):
    """Rewire one joined record of the small-star step.

    Args:
        keyval: A record ``(node, (neighbour, minimum))`` whose members are
            convertible to int.

    Returns:
        ``(neighbour, minimum)`` when the two differ; otherwise
        ``(node, neighbour)``, i.e. the keyed node attached to that value.
    """
    pair = keyval[1]
    neighbour = int(pair[0])
    minimum = int(pair[1])
    if neighbour == minimum:
        # The neighbour is itself the minimum: link the keyed node to it.
        return (int(keyval[0]), neighbour)
    return (neighbour, minimum)
if __name__ == "__main__":
    # Script entry point: label every vertex of an undirected graph with a
    # representative of its connected component by alternating large-star
    # and small-star steps until the edge set stops changing.
    #
    #   sys.argv[1]: input text file, one "u v" edge per line
    #   sys.argv[2]: output directory for "vertex label" lines
    if len(sys.argv) < 3:
        sys.exit("Usage: {} <input_edge_file> <output_dir>".format(sys.argv[0]))

    conf = SparkConf().setAppName("ConnectedComponents")
    sc = SparkContext(conf=conf)
    try:
        lines = sc.textFile(sys.argv[1])

        # Symmetric edge list: every edge is present in both directions so
        # each vertex is a key for all of its neighbours.
        Neig = lines.flatMap(getLGraph)
        Neig.persist()

        while True:
            # Large Star Operations
            # m: for each vertex, the minimum over itself and its neighbours.
            m = Neig.reduceByKey(min).map(lambda kv: (kv[0], min(kv[0], kv[1])))
            # Keep only (u, v) with u < v: the strictly larger neighbours of u.
            LGraphEdges = Neig.filter(lambda e: e[0] < e[1])
            # Connect each larger neighbour v of u to u's minimum.
            newEdges = (
                LGraphEdges.join(m)
                .map(lambda e: (e[1][0], e[1][1]))
                .distinct()
            )

            # Small star operation
            # Key every edge by its larger endpoint.
            SstarGraph = newEdges.map(getSGraph)
            SstarGraph.persist()
            m1 = SstarGraph.reduceByKey(min).map(
                lambda kv: (kv[0], min(kv[0], kv[1]))
            )
            SnewEdges = SstarGraph.join(m1).map(smallStarOp).distinct()

            # Symmetric form of the graph after BOTH steps. The convergence
            # test must use the small-star result (SnewEdges); comparing only
            # the large-star output can stop the loop while small-star would
            # still rewire edges, leaving vertices with a non-minimal label.
            SnewEdgeslist = SnewEdges.flatMap(
                lambda e: [(e[0], e[1]), (e[1], e[0])]
            )
            SnewEdgeslist.persist()

            unchanged = (
                SnewEdgeslist.subtract(Neig).isEmpty()
                and Neig.subtract(SnewEdgeslist).isEmpty()
            )
            if unchanged:
                break
            Neig = SnewEdgeslist

        # Final edges oriented as (vertex, label), larger endpoint first.
        SstarGraph = SnewEdges.map(getSGraph)

        # Each node of a connected component is labeled using the root of its
        # connected component; the root itself is emitted as "root root".
        vertexLabel = SstarGraph.flatMap(
            lambda v: [str(v[0]) + " " + str(v[1]), str(v[1]) + " " + str(v[1])]
        ).distinct()
        vertexLabel.saveAsTextFile(sys.argv[2])
    finally:
        # Release the Spark context even when the job fails part-way.
        sc.stop()