forked from Jerry1014/FundCrawler
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_manager.py
More file actions
203 lines (165 loc) · 6.17 KB
/
process_manager.py
File metadata and controls
203 lines (165 loc) · 6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
爬取核心
对爬取过程的管理
"""
import logging
from abc import ABC, abstractmethod
from collections.abc import Generator
from datetime import datetime
from enum import StrEnum, unique
from pathlib import Path
from threading import Thread
from time import sleep
from typing import NoReturn, Optional
class NeedCrawledFundModule(ABC):
    """
    Fund-crawling task module.

    Supplies the funds that need to be crawled, one at a time, through
    ``task_generator``.
    """

    class NeedCrawledOnceFund:
        """A single fund that needs to be crawled."""

        def __init__(self, code: str, name: str):
            # Fund code and its display name.
            self.code = code
            self.name = name

    def __init__(self):
        # Total number of funds to crawl; expected to be set by init_generator().
        self.total: Optional[int] = None
        # Fixed: collections.abc.Generator must be parameterised with the
        # full (yield_type, send_type, return_type) triple before
        # Python 3.13 -- the one-argument form is invalid there.
        self.task_generator: Optional[
            Generator[NeedCrawledFundModule.NeedCrawledOnceFund, None, None]] = None
        self.init_generator()

    @abstractmethod
    def init_generator(self) -> None:
        """
        Initialise the task generator (and ``total``).

        Fixed: the return annotation was ``NoReturn``, which means the
        function never returns; a normally-returning initialiser is
        annotated ``-> None``.
        """
        return NotImplemented
class FundCrawlingResult:
"""
基金的最终爬取结果定义
"""
@unique
class Header(StrEnum):
"""
结果key
"""
FUND_CODE = '基金代码',
FUND_SIMPLE_NAME = '基金简称',
FUND_TYPE = '基金类型',
FUND_SIZE = '资产规模(亿)',
FUND_COMPANY = '基金管理人',
FUND_VALUE = '基金净值',
# 兼容带新场景,A+B -> B -> B+C,此时基金经理为时长最长的B,对应的任职时间为 这三段 B连续任职的任职时间
FUND_MANAGER = '基金经理(最近连续最长任职)',
DATE_OF_APPOINTMENT = '基金经理的上任时间',
STANDARD_DEVIATION_THREE_YEARS = '近三年标准差',
SHARPE_THREE_YEARS = '近三年夏普',
THREE_YEARS_INCREASE = '近三年涨幅',
FIVE_YEARS_INCREASE = '近五年涨幅'
def __init__(self, fund_code: str, fund_name: str):
self.fund_info_dict = {FundCrawlingResult.Header.FUND_CODE: fund_code,
FundCrawlingResult.Header.FUND_SIMPLE_NAME: fund_name}
class CrawlingDataModule(ABC):
    """
    Data-crawling module interface.

    Covers both the downloading and the cleaning of the fund data.
    Forward references in the annotations are strings, so evaluating this
    interface does not require the concrete classes to be in scope.
    """

    @abstractmethod
    def do_crawling(self, task: 'NeedCrawledFundModule.NeedCrawledOnceFund') -> None:
        """
        Submit one crawling task.

        Implementations should block when tasks pile up, so the caller's
        time slice can be yielded to result processing.

        Fixed: the return annotation was ``NoReturn`` ("never returns");
        a normally-returning method is annotated ``-> None``.
        """
        return NotImplemented

    @abstractmethod
    def has_next_result(self) -> bool:
        """
        Whether any result is still pending or not yet fetched.

        Meant to be called after shutdown(): returns False once every
        request has been processed and every result has been taken out.
        """
        return NotImplemented

    @abstractmethod
    def get_an_result(self) -> Optional['FundCrawlingResult']:
        """
        (Blocking, with a timeout) fetch one finished result.

        Crawling should try hard to succeed; on a definite failure the
        crawled data is None.  Unexpected exceptions are therefore assumed
        to come only from the parsing step, and one failed task must not
        bring down all the others.
        """
        return NotImplemented

    @abstractmethod
    def shutdown(self):
        """
        Signal that every request has been submitted.
        """
        return NotImplemented
class SaveResultModule(ABC):
    """
    Persistence module for the crawled fund data.

    Usable as a context manager; the base implementations of
    ``__enter__``/``__exit__`` do nothing, subclasses may override them to
    acquire and release their output resource.
    """

    @abstractmethod
    def save_result(self, result: 'FundCrawlingResult') -> None:
        """
        Persist one crawling result.

        Fixed: the return annotation was ``NoReturn`` ("never returns");
        a normally-returning method is annotated ``-> None``.
        """
        return NotImplemented

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None (falsy), so exceptions are never suppressed here.
        pass
class TaskManager:
    """
    Crawling core.

    Wires the three modules together: task generation -> crawling/cleaning
    -> result saving, each driven by its own thread.
    """

    # Seconds between two progress log lines.
    _PROGRESS_INTERVAL = 5
    # Polling step while waiting, so shutdown is not delayed by a whole
    # progress interval.
    _POLL_STEP = 0.1

    def __init__(self, need_crawled_fund_module: 'NeedCrawledFundModule',
                 crawling_data_module: 'CrawlingDataModule',
                 save_result_module: 'SaveResultModule', log_level=logging.DEBUG):
        """
        :param need_crawled_fund_module: supplies the fund-crawling tasks
        :param crawling_data_module: downloads and cleans the data
        :param save_result_module: persists the results
        :param log_level: level handed to logging.basicConfig
        """
        self._need_crawled_fund_module = need_crawled_fund_module
        self._crawling_data_module = crawling_data_module
        self._save_result_module = save_result_module

        # Fixed: logging.basicConfig raises FileNotFoundError when the log
        # directory does not exist, so make sure it is there first.
        Path('./log').mkdir(parents=True, exist_ok=True)
        logging.basicConfig(filename='./log/process.text', encoding='utf-8', level=log_level, filemode='w',
                            format='%(asctime)s %(message)s')
        logging.info(f"需要爬取的基金总数:{self._need_crawled_fund_module.total}")

        # Written by the save thread, read by the progress thread; used for
        # progress display only, so no lock is taken.
        self._cur_finished_task_count = 0
        self._all_task_finished = False

    def get_task_and_crawling(self) -> None:
        """Feed every task from the generator to the crawling module, then shut it down."""
        generator = self._need_crawled_fund_module.task_generator
        # Idiomatic for-loop replaces the manual next()/StopIteration loop.
        for task in generator:
            # do_crawling blocks when tasks pile up, yielding time to the
            # result-processing thread.
            self._crawling_data_module.do_crawling(task)
        self._crawling_data_module.shutdown()

    def get_result_and_save(self) -> None:
        """Drain the crawling module's results into the save module."""
        with self._save_result_module:
            while self._crawling_data_module.has_next_result():
                result: FundCrawlingResult = self._crawling_data_module.get_an_result()
                # get_an_result may time out and return None; only real
                # results are saved and counted.
                if result:
                    self._save_result_module.save_result(result)
                    self._cur_finished_task_count += 1
        self._all_task_finished = True

    def show_process(self) -> None:
        """Log the crawling progress periodically until every task is finished."""
        while not self._all_task_finished:
            logging.info(f"已爬取完成基金数:{self._cur_finished_task_count}")
            # Fixed: a flat sleep(5) kept this thread (and hence run()) alive
            # for up to 5 s after the work was done; poll the finished flag
            # in small steps while keeping the 5 s report interval.
            waited = 0.0
            while waited < self._PROGRESS_INTERVAL and not self._all_task_finished:
                sleep(self._POLL_STEP)
                waited += self._POLL_STEP

    def run(self) -> None:
        """
        Main crawling flow.

        Passes tasks from the task module to the crawling/cleaning module,
        and results from the crawling/cleaning module to the save module.
        Both hand-offs block (mostly inside the crawling module).

        Fixed: the return annotation was ``NoReturn`` ("never returns");
        run() returns normally once all worker threads have joined.
        """
        start_time = datetime.now()

        threads = [Thread(target=self.get_task_and_crawling),
                   Thread(target=self.get_result_and_save),
                   Thread(target=self.show_process)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        total_seconds = (datetime.now() - start_time).seconds
        logging.info(f"基金爬取完成 耗时{total_seconds}s")