feat(kb-open): Deep Research 开放 API(start/SSE/status/cancel)#446
feat(kb-open): Deep Research 开放 API(start/SSE/status/cancel)#446ncw1992120 wants to merge 5 commits into
Conversation
|
感谢 Deep Research 开放 API 🙏 鉴权这块做得很好:path 走 但异步作业层有几个阻塞项,对一个公开且产生真实成本的端点很关键: 1. 取消并不会真正停止作业。 2. CANCELLED 会被 COMPLETED/FAILED 覆盖。 3. session registry 无界增长(内存泄漏)。 4. 每个 key 没有在跑作业的并发上限(成本/DoS)。 5. 内联全限定名: 非阻塞: V162 迁移头注释写成 V161(且与 #437 撞号,配合 P0-A 顺延);research 复用 栈底 P0-A 改好后这个 PR rebase,并把上面 1–4 的作业生命周期/成本控制补上,我们再合并 🙏 |
…urrency cap Review mateaix#446 — address all 4 job-lifecycle/cost blockers + nits: 1. Cooperative cancellation (was: cancel only flipped status, pipeline ran to completion). Cancel endpoint now calls streamTracker.requestStop(); WikiResearchService.ensureNotCancelled() checks isStopRequested at each stage boundary (plan→draft, draft→compose) and inside the parallel draft fan-out — so cancel actually halts the expensive LLM calls, not just the SSE stream. Throws ResearchCancelledException (caught locally, no error broadcast). 2. Sticky CANCELLED terminal. complete()/fail() now no-op on a CANCELLED session, so a user who cancelled never sees a COMPLETED report surface via /status. 3. Session registry TTL. Terminal sessions get an updatedAt timestamp and are evicted by a @scheduled sweep after mate.kbopen.research.session-ttl (default 30m). RUNNING sessions are never evicted. Prevents unbounded memory growth. 4. Per-key concurrency cap. startIfAllowed() rejects new research when a key already has mate.kbopen.research.max-concurrent-per-key (default 3) RUNNING sessions → 429. Stops one key from spawning ~60 parallel multi-step LLM pipelines per minute under the per-min rate limiter. 5. Inline FQN → import (controller LinkedHashMap, test List.of). Nits (inherited from P0-A rebase): - V162→V164, prefix VARCHAR(12), design doc moved to rfcs/. - Design doc: kb:search scope row now documents it covers /research/**. 31 tests pass (12 registry incl. sticky-cancel/concurrency/TTL + 13 service + 4 rate limiter + 4 controller + ...).
8d7894f to
5e216c9
Compare
|
4 个阻塞项 + nits 已全部修复。已 rebase 到 P0-A 之上(继承 V164 + FQN import + 设计文档迁移),逐项改动见 commit 5e216c9。
Nits: V162→V164 注释修正、设计文档移到 rfcs/,文档中 kb:search scope 行现在也说明它覆盖 /research/**(Deep Research 复用 search scope)。 31 个测试通过:12 个 registry(含 sticky-cancel、per-key cap、TTL eviction)+ 13 个 service + 4 个 rate limiter + 4 个 controller + fallback。请再 review 🙏 |
Implements the async Deep Research endpoint for the KB Open API (mateaix#443). Research is a multi-step LLM pipeline (plan → retrieve+draft → compose) that runs asynchronously and broadcasts progress via SSE. Endpoints: - POST /{kbId}/research start (returns sessionId + streamUrl) - GET /{kbId}/research/{id}/stream SSE progress (?token= for EventSource) - GET /{kbId}/research/{id}/status query status / final report - POST /{kbId}/research/{id}/cancel cancel running session Components: - KbOpenResearchController: 4 endpoints, @RequireKbScope("kb:search") - KbResearchSessionRegistry: in-memory session tracking with keyId ownership (a caller can only query/cancel their own sessions) Security: - R7: SSE uses ?token= query param (KbOpenApiAuthFilter already supports this fallback for EventSource which can't set Authorization headers) - Session ownership: status/cancel/stream all verify keyId match - Cancel checks session is RUNNING (409 otherwise) Reuses existing WikiResearchService.research() + ChatStreamTracker for the actual research pipeline and SSE broadcasting. Tests (6 new, all green): - KbResearchSessionRegistryTest: register/complete/fail/cancel lifecycle, cancel-on-completed no-op, unknown session returns empty Closes mateaix#443
…urrency cap Review mateaix#446 — address all 4 job-lifecycle/cost blockers + nits: 1. Cooperative cancellation (was: cancel only flipped status, pipeline ran to completion). Cancel endpoint now calls streamTracker.requestStop(); WikiResearchService.ensureNotCancelled() checks isStopRequested at each stage boundary (plan→draft, draft→compose) and inside the parallel draft fan-out — so cancel actually halts the expensive LLM calls, not just the SSE stream. Throws ResearchCancelledException (caught locally, no error broadcast). 2. Sticky CANCELLED terminal. complete()/fail() now no-op on a CANCELLED session, so a user who cancelled never sees a COMPLETED report surface via /status. 3. Session registry TTL. Terminal sessions get an updatedAt timestamp and are evicted by a @scheduled sweep after mate.kbopen.research.session-ttl (default 30m). RUNNING sessions are never evicted. Prevents unbounded memory growth. 4. Per-key concurrency cap. startIfAllowed() rejects new research when a key already has mate.kbopen.research.max-concurrent-per-key (default 3) RUNNING sessions → 429. Stops one key from spawning ~60 parallel multi-step LLM pipelines per minute under the per-min rate limiter. 5. Inline FQN → import (controller LinkedHashMap, test List.of). Nits (inherited from P0-A rebase): - V162→V164, prefix VARCHAR(12), design doc moved to rfcs/. - Design doc: kb:search scope row now documents it covers /research/**. 31 tests pass (12 registry incl. sticky-cancel/concurrency/TTL + 13 service + 4 rate limiter + 4 controller + ...).
…hFilter
R7: the SSE progress stream (/research/{id}/stream) is consumed by browser
EventSource, which cannot set an Authorization header. The filter's
extractBearerToken() never read ?token= (still a TODO), so the SSE endpoint
was unreachable from the browser — the headline use case got 401.
Fix: accept ?token= ONLY on SSE stream paths (isSseStreamPath, suffix
/stream), reject it everywhere else so the API key does not leak into
access/proxy logs for normal calls (R5). Matches the JwtAuthFilter convention
(getRequestURI logs carry no query string).
Also bypass the per-minute rate limiter on the SSE path: EventSource
reconnects/heartbeats would otherwise burn the key's window and 429 its own
POST /research start. Rate limiting belongs on the cost-producing endpoints.
Tests (6 new, KbOpenApiAuthFilterTest):
- non-SSE: header passes, ?token= rejected (no authenticate call)
- SSE: ?token= authenticates, missing token → 401
- SSE: bypasses rate limiter; non-SSE still hits it
…then-act race) startIfAllowed() did stream-and-count then put() — not atomic. Two concurrent starts for the same key could both pass the count check (both see < cap) and both put, admitting more sessions than the cap. On the virtual-thread start endpoint this is a real DoS/cost-bypass path. Fix: maintain a per-key AtomicInteger running counter (runningPerKey), incremented atomically on start (incrementAndGet + rollback on overflow) and decremented on each RUNNING→terminal transition (complete/fail/cancel). The counter is kept in lock-step with status==RUNNING; since terminal states are sticky, each session decrements exactly once. cancel() also rewritten to capture the pre-transition state cleanly (the old return check relied on Map.computeIfPresent returning the new value, which worked but read as 'before.status==CANCELLED'). Tests (+2): cancelled/failed release slot (counter consistency), and a concurrent-start test (12 virtual threads, cap=3) asserting exactly cap admits — would be flaky/fail under the old impl.
5e216c9 to
d7b27e9
Compare
register() was left over from the initial impl — it bypassed the per-key concurrency cap (no startIfAllowed check) and, after the atomic-counter fix, incremented runningPerKey without any overflow rollback. With no production caller (the start endpoint uses startIfAllowed), it only existed for tests to set up a RUNNING session. Drop it and route the tests through startIfAllowed so nothing can accidentally ship a path that ignores the cap.
|
刚做了一轮 force-push,把分支 rebase 到最新 dev 并补上了两个之前 review 没覆盖到的点。 为什么 force-pushP0-A 已经 squash merge 进 dev 了,但这个分支还叠着一份重复的 P0-A(24 文件、与 dev 冲突、 这次补的两个修复1. SSE 端点的 2. 现在的 commit
测试kbopen 全套 39 + wiki research fallback 4 = 43 全绿;registry 14(含新增竞态测试)、auth filter 6(新增)、service/rate limiter/controller 全过。合并状态 |
Closes #443 · Part of #440 · Builds on #441 (P0-A)
改动
异步 Deep Research 开放 API。Research 是多步 LLM 管线(plan → retrieve+draft → compose),异步执行并通过 SSE 推送进度。
4 个端点
/{kbId}/research/{kbId}/research/{id}/stream/{kbId}/research/{id}/status/{kbId}/research/{id}/cancel组件
KbOpenResearchController:4 个端点,@RequireKbScope("kb:search")KbResearchSessionRegistry:内存会话追踪,记录 keyId 归属(调用方只能查询/取消自己的会话)安全
?token=query param(KbOpenApiAuthFilter已支持此 fallback,EventSource 无法设 Authorization 头)keyId匹配复用
底层完全复用现有
WikiResearchService.research()+ChatStreamTracker(SSE 广播),不改动 research 管线。测试
依赖
此 PR 包含 P0-A 的 cherry-pick。若 #444(P0-A)先合并,rebase 后只剩 research 的 3 个文件。