-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathauth_commands.py
More file actions
252 lines (214 loc) · 9 KB
/
auth_commands.py
File metadata and controls
252 lines (214 loc) · 9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
"""CLI Auth 子命令 — OAuth 登录、状态、重认证与登出."""
from __future__ import annotations
import asyncio
import inspect
import logging
from pathlib import Path
import typer
from rich.console import Console
from ..config.loader import load_config
app = typer.Typer(name="auth", help="管理 OAuth 登录凭证")
console = Console()
logger = logging.getLogger(__name__)
def _build_token_store(cfg_path: Path | None = None):
"""按配置解析 Token Store 路径并完成加载."""
from ..auth.store import TokenStoreManager
cfg = load_config(cfg_path)
store = TokenStoreManager(
store_path=Path(cfg.auth.token_store_path)
if cfg.auth.token_store_path
else None,
)
store.load()
logger.debug(
"OAuth token store loaded from config path: %s", cfg.auth.token_store_path
)
return cfg, store
# ── Auth 子命令 ─────────────────────────────────────────────
@app.command("login")
def auth_login(
provider: str | None = typer.Option(
None, "--provider", "-p", help="指定 provider (github/google)"
),
) -> None:
"""执行 OAuth 浏览器登录."""
asyncio.run(_run_auth_login(provider))
async def _run_auth_login(provider: str | None) -> None:
from ..auth.providers.github import GitHubDeviceFlowProvider
from ..auth.providers.google import GoogleOAuthProvider
cfg, store = _build_token_store()
providers = []
if provider == "github":
providers = [("github", GitHubDeviceFlowProvider())]
elif provider == "google":
providers = [
(
"google",
GoogleOAuthProvider(
client_id=cfg.auth.google_client_id,
client_secret=cfg.auth.google_client_secret,
),
)
]
elif provider is None:
providers = [
("github", GitHubDeviceFlowProvider()),
(
"google",
GoogleOAuthProvider(
client_id=cfg.auth.google_client_id,
client_secret=cfg.auth.google_client_secret,
),
),
]
else:
console.print(f"[red]未知 provider: {provider}[/red]")
raise typer.Exit(1)
for name, prov in providers:
try:
console.print(f"\n[bold cyan]登录 {name}...[/bold cyan]")
tokens = await prov.login()
store.set(name, tokens)
console.print(f"[green]{name} 登录成功[/green]")
except Exception as exc:
console.print(f"[red]{name} 登录失败: {exc}[/red]")
finally:
await prov.close()
@app.command("status")
def auth_status() -> None:
"""查看已登录的 OAuth 凭证状态."""
_, store = _build_token_store()
providers = store.list_providers()
if not providers:
console.print("[yellow]尚未登录任何 provider[/yellow]")
return
for name in providers:
tokens = store.get(name)
expired = tokens.is_expired
status_text = "[red]已过期[/red]" if expired else "[green]有效[/green]"
has_refresh = "有 refresh_token" if tokens.refresh_token else "无 refresh_token"
console.print(f" {name}: {status_text} {has_refresh}")
@app.command("reauth")
def auth_reauth(
provider: str = typer.Argument(..., help="provider 名称 (github/google)"),
port: int = typer.Option(3392, "--port", "-p", help="代理服务端口"),
) -> None:
"""触发运行中代理的 OAuth 重认证."""
import httpx as _httpx
try:
resp = _httpx.post(f"http://127.0.0.1:{port}/api/reauth/{provider}", timeout=5)
if resp.status_code == 202:
console.print(
f"[green]{provider} 重认证已触发,请在浏览器中完成登录[/green]"
)
elif resp.status_code == 404:
console.print("[red]重认证不可用(代理未启用对应后端)[/red]")
else:
console.print(f"[red]触发失败: {resp.status_code} {resp.text}[/red]")
except _httpx.ConnectError:
console.print("[red]代理服务未运行[/red]")
@app.command("logout")
def auth_logout(
provider: str | None = typer.Option(
None, "--provider", "-p", help="指定 provider(不指定则全部登出)"
),
) -> None:
"""清除已存储的 OAuth 凭证."""
_, store = _build_token_store()
if provider:
store.remove(provider)
console.print(f"[green]已登出 {provider}[/green]")
else:
for name in store.list_providers():
store.remove(name)
console.print("[green]已登出所有 provider[/green]")
# ── 自动登录辅助 ─────────────────────────────────────────────
async def auto_login_if_needed(cfg_path: Path | None) -> None:
"""检查各 Provider 是否缺少凭证,自动触发浏览器登录.
仅对已启用、且未在 config 中显式提供凭证的 Tier 做检查。
对 Google/Antigravity,若本地存在 refresh_token 且 access_token 过期,
优先执行静默刷新,避免每次启动都重新走浏览器 OAuth。
三阶段检查:
1. needs_login() — 快速本地判断(无凭证或已过期且无 refresh_token)
2. refresh() — Google access_token 过期且存在 refresh_token 时静默刷新
3. validate() — 网络验证已有凭证是否仍有效(仅在有凭证且未刷新时触发)
"""
from ..auth.providers.github import GitHubDeviceFlowProvider
from ..auth.providers.google import GoogleOAuthProvider
cfg, store = _build_token_store(cfg_path)
async def _resolve_needs_login(provider, tokens) -> bool:
result = provider.needs_login(tokens)
if inspect.isawaitable(result):
return bool(await result)
return bool(result)
# --- GitHub / Copilot ---
if cfg.copilot.enabled and not cfg.copilot.github_token:
tokens = store.get("github")
prov = GitHubDeviceFlowProvider()
needs = await _resolve_needs_login(prov, tokens)
if not needs and tokens.has_credentials:
# 有凭证但可能过期/吊销 → 网络验证
try:
if not await prov.validate(tokens):
needs = True
except Exception:
pass # 网络失败不阻塞启动
if needs:
console.print(
"[bold cyan]Copilot 层缺少有效凭证,启动 GitHub OAuth 登录...[/bold cyan]"
)
try:
tokens = await prov.login()
store.set("github", tokens)
console.print("[green]GitHub 登录成功[/green]")
except Exception as exc:
console.print(f"[red]GitHub 登录失败: {exc}[/red]")
finally:
await prov.close()
else:
await prov.close()
# --- Google / Antigravity ---
if cfg.antigravity.enabled and not cfg.antigravity.refresh_token:
tokens = store.get("google")
prov = GoogleOAuthProvider(
client_id=cfg.auth.google_client_id,
client_secret=cfg.auth.google_client_secret,
)
needs = await _resolve_needs_login(prov, tokens)
try:
if not needs and tokens.is_expired and tokens.refresh_token:
logger.info(
"Google access_token 已过期,尝试使用 refresh_token 静默刷新"
)
try:
tokens = await prov.refresh(tokens)
store.set("google", tokens)
logger.info("Google refresh_token 静默刷新成功")
except Exception as exc:
logger.warning(
"Google refresh_token 静默刷新失败,回退交互登录: %s", exc
)
console.print(
"[bold cyan]Antigravity 凭证刷新失败,启动 Google OAuth 登录...[/bold cyan]"
)
tokens = await prov.login()
store.set("google", tokens)
console.print("[green]Google 登录成功[/green]")
elif not needs and tokens.has_credentials:
try:
if not await prov.validate(tokens):
needs = True
except Exception:
pass
if needs:
console.print(
"[bold cyan]Antigravity 层缺少有效凭证,启动 Google OAuth 登录...[/bold cyan]"
)
tokens = await prov.login()
store.set("google", tokens)
console.print("[green]Google 登录成功[/green]")
except Exception as exc:
console.print(f"[red]Google 登录失败: {exc}[/red]")
finally:
await prov.close()
__all__ = ["app", "auto_login_if_needed"]