Skip to content

feat(kb-open): Deep Research 开放 API(start/SSE/status/cancel)#446

Open
ncw1992120 wants to merge 5 commits into
mateaix:devfrom
ncw1992120:feat/kb-open-research
Open

feat(kb-open): Deep Research 开放 API(start/SSE/status/cancel)#446
ncw1992120 wants to merge 5 commits into
mateaix:devfrom
ncw1992120:feat/kb-open-research

Conversation

@ncw1992120

Copy link
Copy Markdown
Contributor

Closes #443 · Part of #440 · Builds on #441 (P0-A)

改动

异步 Deep Research 开放 API。Research 是多步 LLM 管线(plan → retrieve+draft → compose),异步执行并通过 SSE 推送进度。

4 个端点

方法 路径 说明
POST /{kbId}/research 启动(返回 sessionId + streamUrl)
GET /{kbId}/research/{id}/stream SSE 进度流(?token= 给 EventSource)
GET /{kbId}/research/{id}/status 查询状态 / 最终报告
POST /{kbId}/research/{id}/cancel 取消运行中的会话

组件

  • KbOpenResearchController:4 个端点,@RequireKbScope("kb:search")
  • KbResearchSessionRegistry:内存会话追踪,记录 keyId 归属(调用方只能查询/取消自己的会话)

安全

  • R7(SSE 鉴权)?token= query param(KbOpenApiAuthFilter 已支持此 fallback,EventSource 无法设 Authorization 头)
  • 会话归属:status/cancel/stream 均校验 keyId 匹配
  • 取消校验:会话必须 RUNNING(否则 409)

复用

底层完全复用现有 WikiResearchService.research() + ChatStreamTracker(SSE 广播),不改动 research 管线。

测试

Tests run: 23, Failures: 0, Errors: 0 (P0-A 17 + research 6)
BUILD SUCCESS
  • KbResearchSessionRegistryTest(6):register/complete/fail/cancel 生命周期、cancel-on-completed no-op、unknown session 返回 empty

依赖

此 PR 包含 P0-A 的 cherry-pick。若 #444(P0-A)先合并,rebase 后只剩 research 的 3 个文件。

@mateaix

mateaix commented Jun 28, 2026

Copy link
Copy Markdown
Owner

感谢 Deep Research 开放 API 🙏 鉴权这块做得很好:path 走 permitAll + KbOpenApiAuthFilter 单点 fail-closed,四个端点都带 @RequireKbScope("kb:search"),并且 jobId 的 IDOR 已经堵住——requireSessionOwnership 校验 session.keyId().equals(ctx.keyId()),A 既看不到也取消不了 B 的 session。SSE 管线(Utf8SseEmitter + 10min 超时 + onCompletion/onTimeout/onError detach)也正确。

但异步作业层有几个阻塞项,对一个公开且产生真实成本的端点很关键:

1. 取消并不会真正停止作业。
WikiResearchService.research() 是一条直通的同步流水线(plan → 并行 draft → compose),全程不查 streamTracker.isStopRequested(...)、也没有中断标志。cancel 端点只把 registry 状态翻成 CANCELLED 并广播 SSE 关闭,后台虚拟线程仍把 LLM/web 调用跑到底——「取消」既不省成本也不停算力。需要引入协作式取消信号(在各 research 阶段间检查停止标志,或持有 Future 并 interrupt)。

2. CANCELLED 会被 COMPLETED/FAILED 覆盖。
作业跑完后 sessionRegistry.complete(...)computeIfPresent)无条件改写状态,用户取消后再查 /status 会看到 completed 和完整报告。请让 CANCELLED 成为「粘性」终态,complete/fail 在已 CANCELLED 时 no-op。

3. session registry 无界增长(内存泄漏)。
KbResearchSessionRegistry.sessions 从不清理——无 TTL、无定时清理、完成也不移除,每个 session 活到 JVM 退出。请加 TTL/定时清理或容量上限。

4. 每个 key 没有在跑作业的并发上限(成本/DoS)。
filter 只限了「每分钟请求数」(默认 60/min,start 也被计入),但没限「同时在跑的 research 作业数」。一个 key 每分钟能拉起约 60 个多步 LLM 流水线、各自再 fan-out 子问题,全跑在无界的 newVirtualThreadPerTaskExecutor 上——这是公开端点最主要的成本爆炸/DoS 路径。建议加 per-key 在跑并发上限,超了返回 429。设计文档 §9/§10 自己也写了 research 要「带限流+计费」,token 计费(TokenUsageService)这里也还缺。

5. 内联全限定名SecurityConfig.javaWebMvcConfig.javaKbOpenResearchController.javanew java.util.LinkedHashMap<>())、测试里的 java.util.List.of(...),按规范改成 import + 简单名。

非阻塞: V162 迁移头注释写成 V161(且与 #437 撞号,配合 P0-A 顺延);research 复用 kb:search scope 但设计文档没给它分配 scope,建议补一行说明;kb-open-api-design.md 同样建议移出仓库根目录。

栈底 P0-A 改好后这个 PR rebase,并把上面 1–4 的作业生命周期/成本控制补上,我们再合并 🙏

ncw1992120 added a commit to ncw1992120/mateclaw that referenced this pull request Jun 28, 2026
…urrency cap

Review mateaix#446 — address all 4 job-lifecycle/cost blockers + nits:

1. Cooperative cancellation (was: cancel only flipped status, pipeline ran
   to completion). Cancel endpoint now calls streamTracker.requestStop();
   WikiResearchService.ensureNotCancelled() checks isStopRequested at each
   stage boundary (plan→draft, draft→compose) and inside the parallel draft
   fan-out — so cancel actually halts the expensive LLM calls, not just the
   SSE stream. Throws ResearchCancelledException (caught locally, no error
   broadcast).

2. Sticky CANCELLED terminal. complete()/fail() now no-op on a CANCELLED
   session, so a user who cancelled never sees a COMPLETED report surface
   via /status.

3. Session registry TTL. Terminal sessions get an updatedAt timestamp and
   are evicted by a @scheduled sweep after
   mate.kbopen.research.session-ttl (default 30m). RUNNING sessions are
   never evicted. Prevents unbounded memory growth.

4. Per-key concurrency cap. startIfAllowed() rejects new research when a
   key already has mate.kbopen.research.max-concurrent-per-key (default 3)
   RUNNING sessions → 429. Stops one key from spawning ~60 parallel
   multi-step LLM pipelines per minute under the per-min rate limiter.

5. Inline FQN → import (controller LinkedHashMap, test List.of).

Nits (inherited from P0-A rebase):
- V162→V164, prefix VARCHAR(12), design doc moved to rfcs/.
- Design doc: kb:search scope row now documents it covers /research/**.

31 tests pass (12 registry incl. sticky-cancel/concurrency/TTL +
13 service + 4 rate limiter + 4 controller + ...).
@ncw1992120 ncw1992120 force-pushed the feat/kb-open-research branch from 8d7894f to 5e216c9 Compare June 28, 2026 19:24
@ncw1992120

ncw1992120 commented Jun 28, 2026

Copy link
Copy Markdown
Contributor Author

4 个阻塞项 + nits 已全部修复。已 rebase 到 P0-A 之上(继承 V164 + FQN import + 设计文档迁移),逐项改动见 commit 5e216c9

  1. 取消现在会真正停掉作业(协作式取消)。 cancel 会调用 streamTracker.requestStop(sessionId)。WikiResearchService 新增了 ensureNotCancelled(sessionId),在三个位置检查 isStopRequested:plan 之后、draft 之后(compose 之前)、以及并行 draftStage fan-out 中每次 draftOneSection LLM 调用之前。触发取消时抛出 ResearchCancelledException(在本地 catch,不广播 error 事件,因为是用户主动取消)。这样昂贵的 retrieve/draft/compose LLM 调用会被跳过,而不是跑完整个流程。

  2. CANCELLED 现在是粘性终态。 complete() / fail() 在 status == CANCELLED 时为 no-op。异步线程延迟到达的 completion 无法覆盖取消状态——/status 持续返回 cancelled。

  3. Session registry 的 TTL 清理。 每个 session 带有 updatedAt 时间戳;@scheduled 定时任务(每 5 分钟)清理超过 mate.kbopen.research.session-ttl(默认 30 分钟)的终态 session。RUNNING 状态的 session 永远不会被清理,map 不再无界增长。

  4. Per-key 并发上限。 startIfAllowed(keyId, ...) 会统计每个 key 的 RUNNING session 数量,达到 mate.kbopen.research.max-concurrent-per-key(默认 3)时抛出 TooManyConcurrentException → controller 映射为 429。它在每分钟限流器之上再加一层,确保单个 key 无法拉起约 60 条并行的多步流水线。(通过 TokenUsageService 的 per-key token 计费仍属 P1——已在设计文档中标注。)

  5. 内联 FQN → import。 Controller 的 java.util.LinkedHashMap → import java.util.LinkedHashMap;测试里的 java.util.List.of → import java.util.List(两项都通过 rebase 继承了 V164 / VARCHAR(12) 的修复)。

Nits: V162→V164 注释修正、设计文档移到 rfcs/,文档中 kb:search scope 行现在也说明它覆盖 /research/**(Deep Research 复用 search scope)。

31 个测试通过:12 个 registry(含 sticky-cancel、per-key cap、TTL eviction)+ 13 个 service + 4 个 rate limiter + 4 个 controller + fallback。请再 review 🙏

@ncw1992120

Copy link
Copy Markdown
Contributor Author

补充一下 #446 的 rebase 计划:

review 的 4 个阻塞项 + nits 已在上一个 commit 5e216c9 全部改好(协作式取消、CANCELLED 粘性终态、TTL 清理、per-key 并发上限、FQN import),31 个测试通过。

但目前分支还建在旧的 P0-A 栈上。按你的建议,等 #445(P0-B)合并进 dev 后,我会把 #446 也 rebase 到 dev(同样用 --onto 丢掉 P0-A/P0-B 的旧 commit),届时分支会变回 mergeable。合并 #445 后告诉我一声,我立刻 rebase 🙏

Implements the async Deep Research endpoint for the KB Open API (mateaix#443).
Research is a multi-step LLM pipeline (plan → retrieve+draft → compose)
that runs asynchronously and broadcasts progress via SSE.

Endpoints:
- POST /{kbId}/research                      start (returns sessionId + streamUrl)
- GET  /{kbId}/research/{id}/stream          SSE progress (?token= for EventSource)
- GET  /{kbId}/research/{id}/status          query status / final report
- POST /{kbId}/research/{id}/cancel          cancel running session

Components:
- KbOpenResearchController: 4 endpoints, @RequireKbScope("kb:search")
- KbResearchSessionRegistry: in-memory session tracking with keyId
  ownership (a caller can only query/cancel their own sessions)

Security:
- R7: SSE uses ?token= query param (KbOpenApiAuthFilter already supports
  this fallback for EventSource which can't set Authorization headers)
- Session ownership: status/cancel/stream all verify keyId match
- Cancel checks session is RUNNING (409 otherwise)

Reuses existing WikiResearchService.research() + ChatStreamTracker for
the actual research pipeline and SSE broadcasting.

Tests (6 new, all green):
- KbResearchSessionRegistryTest: register/complete/fail/cancel lifecycle,
  cancel-on-completed no-op, unknown session returns empty

Closes mateaix#443
…urrency cap

Review mateaix#446 — address all 4 job-lifecycle/cost blockers + nits:

1. Cooperative cancellation (was: cancel only flipped status, pipeline ran
   to completion). Cancel endpoint now calls streamTracker.requestStop();
   WikiResearchService.ensureNotCancelled() checks isStopRequested at each
   stage boundary (plan→draft, draft→compose) and inside the parallel draft
   fan-out — so cancel actually halts the expensive LLM calls, not just the
   SSE stream. Throws ResearchCancelledException (caught locally, no error
   broadcast).

2. Sticky CANCELLED terminal. complete()/fail() now no-op on a CANCELLED
   session, so a user who cancelled never sees a COMPLETED report surface
   via /status.

3. Session registry TTL. Terminal sessions get an updatedAt timestamp and
   are evicted by a @scheduled sweep after
   mate.kbopen.research.session-ttl (default 30m). RUNNING sessions are
   never evicted. Prevents unbounded memory growth.

4. Per-key concurrency cap. startIfAllowed() rejects new research when a
   key already has mate.kbopen.research.max-concurrent-per-key (default 3)
   RUNNING sessions → 429. Stops one key from spawning ~60 parallel
   multi-step LLM pipelines per minute under the per-min rate limiter.

5. Inline FQN → import (controller LinkedHashMap, test List.of).

Nits (inherited from P0-A rebase):
- V162→V164, prefix VARCHAR(12), design doc moved to rfcs/.
- Design doc: kb:search scope row now documents it covers /research/**.

31 tests pass (12 registry incl. sticky-cancel/concurrency/TTL +
13 service + 4 rate limiter + 4 controller + ...).
…hFilter

R7: the SSE progress stream (/research/{id}/stream) is consumed by browser
EventSource, which cannot set an Authorization header. The filter's
extractBearerToken() never read ?token= (still a TODO), so the SSE endpoint
was unreachable from the browser — the headline use case got 401.

Fix: accept ?token= ONLY on SSE stream paths (isSseStreamPath, suffix
/stream), reject it everywhere else so the API key does not leak into
access/proxy logs for normal calls (R5). Matches the JwtAuthFilter convention
(getRequestURI logs carry no query string).

Also bypass the per-minute rate limiter on the SSE path: EventSource
reconnects/heartbeats would otherwise burn the key's window and 429 its own
POST /research start. Rate limiting belongs on the cost-producing endpoints.

Tests (6 new, KbOpenApiAuthFilterTest):
- non-SSE: header passes, ?token= rejected (no authenticate call)
- SSE:     ?token= authenticates, missing token → 401
- SSE:     bypasses rate limiter; non-SSE still hits it
…then-act race)

startIfAllowed() did stream-and-count then put() — not atomic. Two
concurrent starts for the same key could both pass the count check (both
see < cap) and both put, admitting more sessions than the cap. On the
virtual-thread start endpoint this is a real DoS/cost-bypass path.

Fix: maintain a per-key AtomicInteger running counter (runningPerKey),
incremented atomically on start (incrementAndGet + rollback on overflow)
and decremented on each RUNNING→terminal transition (complete/fail/cancel).
The counter is kept in lock-step with status==RUNNING; since terminal
states are sticky, each session decrements exactly once.

cancel() also rewritten to capture the pre-transition state cleanly (the
old return check relied on Map.computeIfPresent returning the new value,
which worked but read as 'before.status==CANCELLED').

Tests (+2): cancelled/failed release slot (counter consistency), and a
concurrent-start test (12 virtual threads, cap=3) asserting exactly cap
admits — would be flaky/fail under the old impl.
@ncw1992120 ncw1992120 force-pushed the feat/kb-open-research branch from 5e216c9 to d7b27e9 Compare June 30, 2026 03:30
register() was left over from the initial impl — it bypassed the per-key
concurrency cap (no startIfAllowed check) and, after the atomic-counter fix,
incremented runningPerKey without any overflow rollback. With no production
caller (the start endpoint uses startIfAllowed), it only existed for tests to
set up a RUNNING session. Drop it and route the tests through startIfAllowed
so nothing can accidentally ship a path that ignores the cap.
@ncw1992120

Copy link
Copy Markdown
Contributor Author

刚做了一轮 force-push,把分支 rebase 到最新 dev 并补上了两个之前 review 没覆盖到的点。

为什么 force-push

P0-A 已经 squash merge 进 dev 了,但这个分支还叠着一份重复的 P0-A(24 文件、与 dev 冲突、DIRTY)。我把 2 个 research commit cherry-pick 到 dev 之上、丢掉 P0-A 之后,分支变成 7 文件、MERGEABLE/CLEAN,diff 现在精确反映 research 本身的改动(不再混 P0-A 的 auth/迁移/配置文件)。

这次补的两个修复

1. SSE 端点的 ?token= 鉴权其实没实现。
之前文档和 controller Javadoc 都说 SSE 用 ?token= 鉴权,但 KbOpenApiAuthFilter.extractBearerToken() 还是那条 TODO,没读 query param——结果 EventSource 在浏览器里 100% 拿 401,整个 stream 端点对主要场景不可用。
修法:新增 isSseStreamPath(),只在 SSE 流路径(.../stream)接受 ?token=,其它路径一律拒绝(R5:key 不进 access/proxy log)。顺带把 SSE 路径从每分钟限流里豁免,免得 EventSource 重连/心跳把 key 自己 POST /research start 的配额挤掉。

2. startIfAllowed 的并发上限竞态。
原来的 stream-count-then-put 不是原子的,同一 key 并发 start 能双双过 cap。改成 per-key AtomicInteger 计数器(incrementAndGet + 超限回滚;每次 RUNNING→终态 decrement),register() 这个绕过 cap 的 back-compat 方法也删了(没生产调用方,只剩测试)。新增了一个 12 虚线程并发抢 cap=3 槽位的测试,断言恰好 3 个 admitted(老实现下会 flaky)。

现在的 commit

  • feat: Deep Research open API
  • fix: cooperative cancel / TTL / sticky terminal(原 review 4 blocker)
  • fix: ?token= SSE auth fallback(本轮)
  • fix: atomic per-key concurrency cap(本轮)
  • refactor: drop unused register()(本轮)

测试

kbopen 全套 39 + wiki research fallback 4 = 43 全绿;registry 14(含新增竞态测试)、auth filter 6(新增)、service/rate limiter/controller 全过。合并状态 MERGEABLE/CLEAN。旧 PR head 5e216c97 在本地留了 backup ref,需要回滚随时能找回。请再 review 🙏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(kb-open): P1.5 Deep Research 开放(独立设计)

2 participants