本文档按当前模块结构说明代码职责。monitor.py 只是兼容入口,实际实现位于 wgmm_monitor/。
monitor.py
-> wgmm_monitor.cli
-> wgmm_monitor.app
-> services
-> clients
-> stores
-> wgmm
-> utils
-> models
核心边界:
clients:外部系统 I/O。stores:本地文件读写。services:业务流程编排。wgmm:纯算法层。models:跨模块数据结构。
保留历史运行方式:
python monitor.py
python monitor.py --dev
python monitor.py --wgmm-core-only文件只导入并调用 wgmm_monitor.cli.main()。
职责:
- 解析
--dev和--wgmm-core-only。 - 调用
load_env_file("data/.env")。 - 创建
Application。 - 根据模式调用
run_forever()、run_dev_once()或run_wgmm_core_only()。
Application 是运行期依赖装配器。
装配对象:
RuntimePathsAppConfigRuntimeLoggerBarkClientGistClientYtDlpClientBilibiliApiClientConfigStoreUrlStoreHistoryStoreBilibiliServiceHistoryServiceFrequencyServiceMonitorServiceNotificationService
启动校验:
- 必需环境变量缺失时退出。
data/cookies.txt缺失、为空或不可读时记录严重错误并退出。- 注册
SIGTERM和SIGINT信号处理器。
load_env_file():读取简单KEY=VALUE格式的data/.env。load_app_config():从环境变量生成AppConfig。
主要数据模型:
RuntimePaths:所有运行时路径。AppConfig:Gist、Bark、Bilibili 配置。WgmmConfig:WGMM 持久化状态,未知字段保存在extra并写回。YtDlpResult:yt-dlp执行结果。FrequencyDecision:一次调频决策结果。
YtDlpClient 只负责执行 yt-dlp:
- 首次调用时用
shutil.which("yt-dlp")查找可执行文件并缓存路径。 - 使用
subprocess.run()执行命令。 - 记录
last_duration。 - 对成功调用更新
normal_duration指数滑动均值。 - 返回
YtDlpResult,不抛出业务异常。
GistClient 封装 GitHub Gist API:
fetch_urls()读取 Gist 中的urls.txt。write_new_urls()写入 Gist 中的new.txt。- HTTP 或 JSON 错误以
(success, data, error)形式返回给服务层处理。
BarkClient 构造 Bark URL 并发送 GET 请求。通知语义不在 client 层处理,由 NotificationService 决定。
BilibiliApiClient 封装 B站 view API(/x/web-interface/view),获取视频真实投稿时间 ctime:
extract_bvid():从 URL 或 yt-dlp id 中提取 BV 号(模块级纯函数)。fetch_view():按 bvid 缓存(含失败的None),串行请求 API(两次实际请求间留REQUEST_INTERVAL间隔,避免封控),同一 BV 的多 P 只请求一次。get_ctime()/get_part_ctimes():单 P 用data.ctime,多 P 按page匹配data.pages[].ctime。
管理 data/wgmm_config.json:
- 缺失时使用
WgmmConfig()默认值。 - JSON 损坏时记录 warning 并使用默认配置。
- dev mode 下
save()不写磁盘。 ensure_manual_flag()保证生产模式首次运行时存在is_manual_run。
管理历史事件:
load_positive_events()读取data/mtime.txt。load_miss_history()读取data/miss_history.txt。save_miss_history()保存一次负向事件;手动运行和 dev mode 不写真实文件。append_upload_timestamps()追加真实上传时间。prune_old_data()根据指数衰减权重剪枝低权重历史。
管理 data/local_known.txt:
load()返回本地已知 URL 集合。save()写入排序后的 URL。- dev mode 下写入内存沙盒。
MonitorService 编排主监控流程。
关键状态:
memory_urls:从 Gist 读取的云端 URL。known_urls:本地完整已知 URL 集合。
关键方法:
sync_urls_from_gist():读取 Gist 并合并本地状态。run_monitor():执行一次完整检测和调频。wait_for_next_check():根据FrequencyService.get_next_check_time()等待。adjust_check_frequency():兼容入口,传入yt-dlp耗时给调频服务。cleanup():dev mode 下清理临时目录。
run_monitor() 的重要分支:
- Gist 失败且没有基准 URL:跳过本轮。
- 两层预检查都无变化:直接按未发现新内容调频。
- 完整视频列表首次失败:等待 30 秒重试一次。
- 分片扩展返回空:跳过本轮检测,避免基础 URL 误判。
- 新 URL 存在:保存真实上传时间、更新本地状态、写 Gist、发送通知、按发现新内容调频。
BilibiliService 封装 B站相关业务:
check_potential_new_parts():第一层,多分片预检查。quick_precheck():第二层,获取最新视频 ID,并同时检查memory_urls与known_urls。fetch_video_list():第三层完整扫描入口。get_video_parts():获取单个视频的所有分片 URL。get_all_videos_parallel():最多 5 个线程并行展开分片。get_video_upload_time():通过 B站 view API 获取真实投稿时间ctime(单 P 用data.ctime,多 P 按page匹配data.pages[].ctime)。注意 yt-dlp 的timestamp/upload_date对应可被 UP 主伪造的pubdate,不作为训练数据。get_bvid_part_ctimes():透传 view API,返回某 BV 全部分 P 的真实 ctime(供批量重建使用)。
HistoryService 维护 mtime.txt:
save_real_upload_timestamps():为真正新 URL 获取真实上传时间。获取失败时跳过,不使用当前时间伪造。generate_mtime_file():保证mtime.txt可用,最多尝试 3 次。create_mtime_from_info_json():用yt-dlp --write-info-json枚举该 UP 主全部视频,再按 bvid 串行调用 view API 取真实 ctime(不再使用 info.json 中可伪造的 pubdate)。
FrequencyService 是服务层与纯算法层的边界:
- 读取正向/负向事件。
- 过滤异常值。
- 必要时剪枝旧数据。
- 调用
decide_next_frequency()。 - 保存 miss history 和 WGMM 配置。
- dev mode 下不写真实配置。
负责通知内容:
notify_new_videos()notify_error()notify_critical_error()
默认值:
DEFAULT_DIMENSION_WEIGHTSDEFAULT_SIGMASLAMBDA_BASEMAPPING_CURVEMIN_HISTORY_COUNTPRUNE_THRESHOLDLOOKAHEAD_DAYSFALLBACK_INTERVAL
vectorized_time_features_numpy():生成 sin/cos 周期特征。get_raw_time_components():生成离散维度值,用于权重和 sigma 学习。
固定维度:
dayweekmonth_weekyear_month
附加维度:
custom_0、custom_1、custom_2,来自discovered_periods。
aggregate_publish_events():链式合并 600 秒内的连续时间戳,把视频粒度折成 UP 主行为粒度。仅用于正向事件。filter_outliers():IQR 过滤异常间隔。仅用于负向事件(正向数据有批量发布,IQR 短端失效)。calculate_adaptive_lambda():根据间隔方差和变异系数学习遗忘速度。discover_periods():自相关发现非日历周期。sync_discovered_periods():稳定custom_N映射,避免索引漂移。initialize_wgmm_dimensions():同步 custom 权重和 sigma。learn_dimension_weights():按维度分布集中度学习权重。learn_adaptive_sigmas():按离散度学习 sigma。
calculate_point_score():计算单个时间点得分。batch_calculate_scores():批量计算扫描窗口得分。
得分由正向事件贡献减去负向事件惩罚,并裁剪到 [0, 1]。
scan_future_peak():扫描未来LOOKAHEAD_DAYS天,取首个得分高于扫描均值的局部峰作为"下一次发布"预估(首峰解码,ADR 006);无显著峰时退回最高 raw peak,再退回全局最高分。estimate_hazard_cap():风险率间隔上限(间隔 ∝ h(tau)^-0.5,幸存间隔 m 近邻估计 + Pareto 尾退化),防止低分时段把间隔拉到峰值距离(ADR 008)。decide_next_frequency():完整调频决策。
决策步骤:
- 初始化固定维度和 custom 维度。
- 数据不足时进入学习期,使用历史间隔中位数或 1 小时回退。
- 学习 lambda、周期、维度权重和 sigma。
- 计算当前得分。
- 扫描未来峰值。
- 将相对得分映射为检查间隔。
- 如果峰值足够强,根据
yt-dlp正常耗时提前检查。 - 用
estimate_hazard_cap()给间隔套上风险率上限(ADR 008)。 - 当最近
yt-dlp耗时异常增大时加入阻抗因子。 - 更新
WgmmConfig并返回FrequencyDecision。
limit_file_lines():限制文本文件最大行数,可保留文件头部若干行(供日志和历史文件复用)。
get_jst_datetime_str():日志时间字符串(JST 时区)。get_local_timezone_offset():本地时区偏移秒数,day/week特征依赖它。format_frequency_interval():把秒数格式化为"X 天 X 小时 X 分钟 X 秒"。
log_message():控制台输出;非 dev mode 写urls.log。log_error():可选普通错误通知。log_critical_error():写critical_errors.log,非 dev mode 可发送严重通知。limit_file_lines():限制日志和历史文件行数。
严重错误不会自动代表程序退出,是否退出由 Application 或服务层决定。
修改 wgmm_monitor/wgmm/constants.py:
MAPPING_CURVE = 2.0值越大,高分时越容易靠近最小检查间隔;值越小,检查间隔更保守。
修改 DEFAULT_SIGMAS:
DEFAULT_SIGMAS = {
"day": 0.8,
"week": 1.0,
"month_week": 1.5,
"year_month": 2.0,
}运行后 sigma 会随历史数据自适应更新。
修改 LAMBDA_BASE:
LAMBDA_BASE = 0.0001实际 lambda 会由 calculate_adaptive_lambda() 根据间隔方差动态计算。
- 预检查和完整扫描:
wgmm_monitor/services/bilibili.py - 主分支和降级行为:
wgmm_monitor/services/monitor.py - 上传时间保存策略:
wgmm_monitor/services/history.py
source .venv/bin/activate
ruff check monitor.py wgmm_monitor tests
ruff format --check monitor.py wgmm_monitor tests
python -m unittest discover -s tests
python -m coverage run -m unittest discover -s tests && python -m coverage report
python monitor.py --wgmm-core-only
python monitor.py --dev