executor.py
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from collections import Counter, deque
from typing import Optional

import tldextract

from graph import Graph
from scraper import Scraper


class Executor(ABC):
    def __init__(self, graph: Graph, scraper: Scraper, url: str) -> None:
        super().__init__()
        self.graph = graph
        self.scraper = scraper
        self.url = self.normalize_url(url)
        self.domain = self.extract_domain(url)
        # A set gives O(1) membership checks; a list would make
        # is_node_visited O(n) per lookup.
        self.visited_nodes: set[str] = set()

    async def run(self) -> None:
        await self._execute(self.url)

    def is_node_visited(self, url: str) -> bool:
        return url in self.visited_nodes

    @abstractmethod
    async def _execute(self, url: str):
        pass

    @staticmethod
    def extract_domain(url: str) -> Optional[str]:
        # tldextract splits a URL into subdomain, domain, and suffix; only the
        # registered domain matters for keeping the crawl on one site.
        try:
            result = tldextract.extract(url)
            return result.domain
        except RuntimeError:
            return None

    @staticmethod
    def normalize_url(url: str) -> str:
        # Drop leading/trailing slashes so equivalent URLs map to one node.
        return url.strip('/')

    @staticmethod
    def types():
        return {
            RecursiveExecutor.TYPE: lambda: RecursiveExecutor,
            IterationExecutor.TYPE: lambda: IterationExecutor,
        }

    @staticmethod
    def build(_type: str, graph: Graph, scraper: Scraper, url: str) -> Executor:
        try:
            # Index the registry directly: dict.get() returns None for an
            # unknown key, so the original .get(...)() raised TypeError and
            # the KeyError handler below never fired.
            return Executor.types()[_type]()(graph, scraper, url)
        except KeyError:
            raise ValueError("Given Executor type is invalid!")
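

# Example usage (a sketch, not part of the original module; it assumes Graph
# and Scraper take no constructor arguments):
#
#     graph = Graph()
#     scraper = Scraper()
#     executor = Executor.build(IterationExecutor.TYPE, graph, scraper,
#                               'https://example.com')
#     asyncio.run(executor.run())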


class RecursiveExecutor(Executor):
    TYPE = 'recursive-executor'

    async def _execute(self, url: str):
        url = self.normalize_url(url)
        self.graph.add_node(url)
        if self.is_node_visited(url):
            return
        self.visited_nodes.add(url)
        links = await self.scraper.scrape_url(url)
        # Counter collapses duplicate links into edge weights (link -> count).
        edges = dict(Counter(map(self.normalize_url, links)))
        self.graph.set_edges(url, edges)
        # Recurse into same-domain links concurrently.
        tasks = [
            self._execute(edge_url)
            for edge_url in edges
            if self.extract_domain(edge_url) == self.domain
        ]
        await asyncio.gather(*tasks)


class IterationExecutor(Executor):
    TYPE = 'iteration-executor'

    async def _execute(self, url: str):
        url = self.normalize_url(url)
        queue = deque([url])
        while queue:
            # popleft() consumes the deque in FIFO order (breadth-first);
            # the original pop() took from the right, which is stack behavior.
            url = queue.popleft()
            self.graph.add_node(url)
            if self.is_node_visited(url):
                continue
            self.visited_nodes.add(url)
            links = await self.scraper.scrape_url(url)
            edges = dict(Counter(map(self.normalize_url, links)))
            self.graph.set_edges(url, edges)
            for edge_url in edges:
                if self.extract_domain(edge_url) != self.domain or self.is_node_visited(edge_url):
                    continue
                queue.append(edge_url)


# TODO: add a producer/consumer mechanism so pages can be scraped in parallel.
# class QueueExecutor(Executor):
#     TYPE = 'queue-executor'
#
#     async def _execute(self, url: str):
#         pass
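

# A minimal sketch of the producer/consumer idea above, not part of the
# original module. A fixed pool of asyncio workers consumes URLs from an
# asyncio.Queue and feeds newly discovered same-domain links back into it;
# the worker count is an assumption.
class QueueExecutor(Executor):
    TYPE = 'queue-executor'  # would also need registering in Executor.types()
    WORKERS = 8              # assumed pool size

    async def _execute(self, url: str):
        queue: asyncio.Queue = asyncio.Queue()
        queue.put_nowait(self.normalize_url(url))
        workers = [asyncio.create_task(self._worker(queue))
                   for _ in range(self.WORKERS)]
        await queue.join()        # returns once every queued URL is processed
        for worker in workers:    # workers otherwise wait on get() forever
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    async def _worker(self, queue: asyncio.Queue) -> None:
        while True:
            url = await queue.get()
            try:
                self.graph.add_node(url)
                if self.is_node_visited(url):
                    continue
                self.visited_nodes.add(url)
                links = await self.scraper.scrape_url(url)
                edges = dict(Counter(map(self.normalize_url, links)))
                self.graph.set_edges(url, edges)
                for edge_url in edges:
                    if (self.extract_domain(edge_url) == self.domain
                            and not self.is_node_visited(edge_url)):
                        queue.put_nowait(edge_url)
            finally:
                queue.task_done()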