[mirror] fix(backend/copilot): sync safety net for Redis-induced zombie sessions#2
yashwant86 wants to merge 18 commits into mm-base-12886
Conversation
On SIGTERM the executor's cleanup() waited passively for active_tasks to drain but never signalled cancel_event to the running turns. Anything longer than the grace window got SIGKILL'd; the processor's finally never ran, so mark_session_completed never fired and the Redis session stayed status=running until the 65-minute stale watchdog eventually reaped it. While the session was a zombie, pending-message pushes succeeded (the HTTP side only needs status=running) but no consumer ever drained them, so the frontend showed "Queued" bubbles forever.

Changes:
- cleanup() now sets each active task's cancel_event first, letting the turn break out of its stream loop and hit its own finally.
- If a turn still hasn't drained after the grace window, cleanup() synchronously marks the session failed and clears its pending buffer so the UI transitions to a terminal state immediately.
- Added sync_fail_close_session, invoked from CoPilotProcessor.execute's finally as a last line of defence — covers the case where the async event loop dies before its own finally can run. mark_session_completed is a CAS on status==running, so the happy path is a cheap no-op.
- Raised COPILOT_CONSUMER_TIMEOUT_SECONDS and GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS to 6h so a rolling deploy can let the longest legitimate turn finish. Matches the helm chart terminationGracePeriodSeconds change.
- Renamed _clear_pending_messages_unsafe to clear_pending_messages_unsafe (still carries the _unsafe suffix) so it's importable without the leading-underscore private marker; docs now call out the legitimate cleanup fail-close use case.
- Refactor cleanup() into four self-documenting phases with a single _ActiveTask dataclass instead of three parallel dicts.
- Reject RabbitMQ delivery during fail-close so orphan messages are not redelivered and double-executed on another pod (addresses a critical bug in the previous patch where active_tasks held a (future, event) tuple but the ack callback was only reachable through on_run_done).
- Gate sync_fail_close_session to only fire when the future didn't reach done — avoids racing _execute_async's own finally and overwriting its accurate terminal-state message with the generic shutdown one.
- Give the async finally a short grace window (5s) after future.cancel() so it has time to publish the terminal state before the sync safety net fires.
- clear_pending_messages_unsafe now drops BOTH the primary buffer and the persist queue — otherwise stale follow-ups in copilot:pending-persist:* could ride a later unrelated tool result on the same session.
- Extract SHUTDOWN_ERROR_MESSAGE constant shared between processor and manager (no more duplicated string).
- utils.py: document the RabbitMQ policy-based migration needed when deploying the new x-consumer-timeout value (the existing durable queue would otherwise fail redeclaration with PRECONDITION_FAILED).
- pr-polish skill: guard against empty PR inference, filter bots + author from baseline issue comments, treat all non-success terminal CI conclusions as blockers (not just "failure"), refresh the baseline after each /pr-address so already-addressed items don't stay "new" forever.
Phase 1 used to both flag the event AND call channel.stop_consuming() via add_callback_threadsafe. Once start_consuming() returned, pika's ioloop exited — and phase 3's basic_nack threadsafe callbacks silently never fired. That meant orphan-session deliveries went unacked, got redelivered on channel close, and could re-execute an already-failed turn on another pod. Split the phase: now phase 1 only sets the Python event that gates _handle_run_message (new deliveries are nacked there), and the actual channel.stop_consuming() is deferred to phase 4's _stop_message_consumers. Because both phase 3's nacks and phase 4's stop are queued via add_callback_threadsafe, pika processes them FIFO — nacks fire first, then stop_consuming exits the ioloop.
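The FIFO argument above can be sketched without a broker: a plain deque stands in for pika's ioloop callback queue, and the drain order shows why nacks queued in phase 3 always run before phase 4's stop. All names here are illustrative, not pika's internals.

```python
from collections import deque

# Stand-in for pika's ioloop callback queue: add_callback_threadsafe
# enqueues, and the ioloop drains strictly FIFO.
ioloop_callbacks = deque()
processed = []

def add_callback_threadsafe(cb):
    ioloop_callbacks.append(cb)

# Phase 3: reject orphan-session deliveries first...
add_callback_threadsafe(lambda: processed.append("basic_nack delivery 1"))
add_callback_threadsafe(lambda: processed.append("basic_nack delivery 2"))
# Phase 4: ...and only then exit the consume loop.
add_callback_threadsafe(lambda: processed.append("stop_consuming"))

while ioloop_callbacks:
    ioloop_callbacks.popleft()()  # FIFO drain, as pika's ioloop does

print(processed)  # nacks run before stop_consuming
```

The key property is that both phases funnel through the same single-consumer queue, so ordering is guaranteed by construction rather than by timing.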
…TERM

The previous patch signalled cancel_event to every in-flight turn as soon as SIGTERM arrived, which interrupted turns that were running perfectly fine. That violates the "let in-flight work finish naturally" principle. The zombie-session bug this PR targets was never caused by SIGTERM killing the turn — it was caused by a transient Redis failure mid-turn whose reporting path (_execute_async.finally → mark_session_completed) also hit the failure and silently swallowed it. That case is already handled by sync_fail_close_session in execute()'s finally (fresh asyncio loop + fresh Redis client on every turn exit), so cancelling healthy turns on shutdown adds no value and hurts UX.

Reverts phase 2 to a passive wait. Phase 3's fail-close still handles the much rarer case where a turn genuinely cannot finish inside the grace window. Removed the now-unused _signal_cancel_to_active helper.
…attern

Dropped the phased cleanup + fail-close mechanism. That was solving a different problem (truly-stuck turns at grace-window expiry) than what this PR actually needed to fix (transient Redis failure mid-turn leaving session status=running forever).

- cleanup() is now a single method mirroring backend.executor.manager: stop consumer (flag + broker), passive wait for active_tasks to drain, worker/executor/lock teardown. Same shape as the proven agent-executor cleanup. No pre-emptive cancel of healthy in-flight turns.
- Removed the _ActiveTask dataclass, reverted active_tasks to a tuple of (future, cancel_event) + _task_locks side dict (same as agent-executor and pre-refactor copilot).
- Removed all phase helpers, fail-close redis state, orphan delivery rejection, and the deferred channel.stop_consuming workaround they required.
- Reverted clear_pending_messages_unsafe to a single-key delete (the persist-queue extension was only needed by the dropped fail-close path).
- Reverted _FakeRedis.delete to single-arg and dropped the persist-queue coverage test.

The actual zombie-bug fix stays: sync_fail_close_session in processor.execute's finally. Added asyncio.wait_for(10s) around its mark_session_completed call so a broken Redis can't hang the safety net indefinitely — if the Redis CAS doesn't respond in 10s, we log and let the pod exit (the stale-session watchdog eventually reaps it, matching the pre-PR failure mode rather than blocking shutdown).
… is obvious
Moves the sync/async bridging (run_coroutine_threadsafe, cancel check,
cluster_lock refresh, bounded waits) into a private _execute helper.
The public execute() is now a tight try/finally:
try:
self._execute(entry, cancel, cluster_lock, log)
finally:
sync_fail_close_session(entry.session_id, log)
The gate (``if future is None or not future.done()``) is gone — since
mark_session_completed is CAS on status=="running", calling it after
the async path succeeded is a harmless no-op Redis roundtrip. Trading
one Redis call per turn for a much cleaner control flow and a
guarantee that the safety net runs on every exit path.
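A minimal sketch of why the unconditional safety-net call is safe, using an in-memory dict as a stand-in for Redis (the function shape is illustrative, not the platform's actual signature): the CAS only transitions sessions still in "running", so a second call after the async path has already published a terminal state does nothing.

```python
# In-memory stand-in for the Redis session hash (illustrative only).
sessions = {"sess-1": {"status": "running"}}

def mark_session_completed(session_id: str, status: str) -> bool:
    """CAS: flip the session to a terminal status only if it is still running."""
    session = sessions.get(session_id)
    if session is None or session["status"] != "running":
        return False  # no-op: a terminal state was already published
    session["status"] = status
    return True

first = mark_session_completed("sess-1", "completed")  # async happy path wins
second = mark_session_completed("sess-1", "failed")    # safety net: harmless no-op
print(first, second, sessions["sess-1"]["status"])
```

This is the idempotence property that lets the finally block call the safety net on every exit path, at the cost of one extra Redis roundtrip on the happy path.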
Multiple agents have been stalling on /pr-test because a finished run keeps the .ign.testing.lock held "for manual exploration." The lock guards test EXECUTION, not the app lifecycle — the app can keep running on :3000/:8006 for manual poking while the lock is gone, and sibling agents will kill the stray processes themselves via Step 3c/3e's port-clear preamble. Adds explicit guidance + the end-of-test release snippet so the common path is: report → PR comment → release lock → optionally leave the app running with a note to the user.
A recent /pr-test run posted a PR comment with a `Report: /Users/.../test-report.md` trailing line — useless to every reviewer. Adds an explicit prohibition with a grep-based sanity check run before the gh api call, and the reminder that the PR comment is a github.com artifact that must be self-contained (copy the CONTENT of local files in, not the path).
E2E testing of SIGTERM mid-turn surfaced a latent bug: the second turn
on the same pool-worker thread logs
sync fail-close mark_session_completed failed: Event loop is closed
Root cause: get_redis_async is @thread_cached. The cache is keyed by
thread, not by event loop. sync_fail_close_session uses asyncio.run()
to open a temporary loop, runs the CAS, then closes the loop. The
AsyncRedis client cached on the first call is bound (via its connection
pool's streams) to that now-closed loop, so turn 2+ on the same worker
thread pulls the stale cached client and can't do any I/O.
Fix: call get_redis_async.clear_cache() before each asyncio.run() in
the safety net. That forces connect_async() to create a fresh
AsyncRedis with an empty connection pool; its first I/O then binds
connections to the CURRENT live loop.
The happy path in the E2E test still worked because the async path
already CAS'd status=completed, so the sync CAS would have been a
no-op anyway. But the whole point of the safety net is to save the
case where the async path failed — and without this fix it was
broken from turn 2 onwards.
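The failure mode can be reproduced with a few lines of stdlib code; the decorator and "client" below are stand-ins for @thread_cached and AsyncRedis, not the platform's actual implementations.

```python
import asyncio
import functools
import threading

def thread_cached(fn):
    # Minimal stand-in for the platform's @thread_cached decorator.
    local = threading.local()
    @functools.wraps(fn)
    def wrapper():
        if not hasattr(local, "value"):
            local.value = fn()
        return local.value
    wrapper.clear_cache = lambda: local.__dict__.clear()
    return wrapper

@thread_cached
def get_fake_redis():
    # Like an AsyncRedis connection pool, the cached object ends up
    # bound to the loop that was running when it was first created.
    return {"loop": asyncio.get_running_loop()}

async def cas():
    client = get_fake_redis()
    client["loop"].call_soon(lambda: None)  # any I/O goes to the ORIGINAL loop
    return "ok"

results = [asyncio.run(cas())]      # turn 1: fresh client, live loop
try:
    asyncio.run(cas())              # turn 2: cached client, closed loop
except RuntimeError as exc:
    results.append(str(exc))
get_fake_redis.clear_cache()        # the fix: drop the stale client
results.append(asyncio.run(cas()))  # turn 3: fresh client again
print(results)  # → ['ok', 'Event loop is closed', 'ok']
```

Turn 2 fails exactly as in the log line above because asyncio.run() closes its loop on exit, while the per-thread cache outlives it.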
Also adds processor_test.py coverage for:
- TestSyncFailCloseSession::test_bounded_timeout_when_redis_hangs
- TestExecuteSafetyNet::test_happy_path_invokes_safety_net
- TestExecuteSafetyNet::test_sigterm_mid_turn_invokes_safety_net
- TestExecuteSafetyNet::test_zombie_redis_async_path_still_marks_session_failed
manager.py has one linter-driven whitespace tweak from poetry run format.
Round-5 review feedback: the previous asyncio.run() + clear_cache() approach created a fresh AsyncRedis client per turn exit (TCP connect + PING + CAS + loop close) and thrashed the @thread_cached cache. That matches a pattern agent-executor never needs because its state-update path is sync. We can't go fully sync without duplicating mark_session_completed's CAS+publish — but we can submit the coro to the processor's long-lived execution_loop, matching the pattern at backend.executor.manager.ExecutionProcessor.on_graph_execution (line 881-892, where it uses `node_execution_loop` the same way).

Changes:
- sync_fail_close_session now takes an explicit execution_loop arg and submits via asyncio.run_coroutine_threadsafe (exactly as node_execution_loop is used). No new loop per call, no clear_cache(), no per-turn AsyncRedis churn.
- execute() passes self.execution_loop in.
- RuntimeError on a closed loop is handled (cleanup already ran the per-worker teardown — let the stale-session watchdog handle it).
- Outer future.result(timeout=_FAIL_CLOSE_REDIS_TIMEOUT + 2) is a belt-and-braces cap on the cross-thread wait.
- Tests: added an `exec_loop` fixture that spins up a daemon-thread loop mirroring the processor's execution_loop layout, updated all sync_fail_close_session callers to pass it in, and TestExecuteSafetyNet sets proc.execution_loop before exercising execute(). Dropped the get_redis_async import since clear_cache() is gone.
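Under those assumptions, the resulting pattern looks roughly like this. Names mirror the description above; the CAS coroutine is a stub, the loop wiring is illustrative, and _FAIL_CLOSE_REDIS_TIMEOUT is the 10s bound from the earlier commit.

```python
import asyncio
import threading
from typing import Optional

_FAIL_CLOSE_REDIS_TIMEOUT = 10

# Long-lived loop on a daemon thread, mirroring the processor's
# execution_loop layout (illustrative, not the platform's wiring).
execution_loop = asyncio.new_event_loop()
threading.Thread(target=execution_loop.run_forever, daemon=True).start()

async def mark_session_completed(session_id: str) -> str:
    # Stub for the real Redis CAS + publish.
    await asyncio.sleep(0)
    return f"{session_id}: terminal state published"

def sync_fail_close_session(session_id: str, loop) -> Optional[str]:
    try:
        future = asyncio.run_coroutine_threadsafe(
            asyncio.wait_for(
                mark_session_completed(session_id),
                timeout=_FAIL_CLOSE_REDIS_TIMEOUT,  # bound on the Redis CAS
            ),
            loop,
        )
        # Belt-and-braces cap on the cross-thread wait itself.
        return future.result(timeout=_FAIL_CLOSE_REDIS_TIMEOUT + 2)
    except RuntimeError:
        # Loop already closed: per-worker teardown ran; let the
        # stale-session watchdog reap the session.
        return None

result = sync_fail_close_session("sess-1", execution_loop)
print(result)
```

Reusing the long-lived loop is what removes the per-turn client churn: the cached AsyncRedis stays bound to a loop that never closes between turns.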
Tested empirically on dev's RabbitMQ 4.1.4: queue_declare with a different x-consumer-timeout value is accepted without error (unlike x-queue-type which triggers PRECONDITION_FAILED on mismatch). So the deploy doesn't need a queue-delete step — just the rabbitmqctl set_policy command to make the effective timeout 6h for running consumers mid-shutdown. Updated the utils.py comment to match.
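For concreteness, a hedged sketch of the queue argument involved: x-consumer-timeout is expressed in milliseconds, so the 6h value works out as below. The queue name and the pika call are illustrative (shown as a comment because they need a live channel).

```python
# The 6h consumer timeout as the per-queue argument RabbitMQ expects
# (milliseconds).
COPILOT_CONSUMER_TIMEOUT_SECONDS = 6 * 60 * 60
X_CONSUMER_TIMEOUT_MS = COPILOT_CONSUMER_TIMEOUT_SECONDS * 1000

QUEUE_ARGUMENTS = {"x-consumer-timeout": X_CONSUMER_TIMEOUT_MS}

# With pika, redeclaration on RabbitMQ 4.x accepts the changed value
# in place (per the empirical test above):
#   channel.queue_declare(queue="copilot_runs", durable=True,
#                         arguments=QUEUE_ARGUMENTS)
# For consumers already running mid-shutdown, a broker policy
# (rabbitmqctl set_policy with a consumer_timeout definition) makes
# the 6h value effective without touching the queue arguments.

print(X_CONSUMER_TIMEOUT_MS)  # → 21600000
```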
Default redis-py clients have socket_timeout=None, which means a wedged Redis endpoint (accepts TCP but doesn't respond) hangs callers indefinitely. That's bad for cluster_lock.refresh() in particular — it's called from long-running code paths, and a stall there lets the lock TTL expire silently without anyone noticing.

Adds sensible defaults to both the sync and async factories:
- socket_timeout=5 — fail fast on read/write stalls
- socket_connect_timeout=5 — fail fast on the initial connect
- socket_keepalive=True — let the kernel detect dead connections
- health_check_interval=30 — redis-py PINGs idle connections every 30s, so half-open sockets get flagged well before the OS's default ~2h TCP keepalive fires

Each is env-overridable (REDIS_SOCKET_TIMEOUT / REDIS_SOCKET_CONNECT_TIMEOUT / REDIS_HEALTH_CHECK_INTERVAL) so a specific deployment can tune the budget without a code change.

Surface area: this touches the shared clients used across the whole backend (not just copilot). A surprisingly slow Redis query that previously blocked for 30s+ will now raise a TimeoutError instead — callers that relied on unbounded waits need to handle that. In practice no legitimate query should run past 5s; retry-on-timeout is already built into the conn_retry wrapper above.
5s was racing the xread(block=5000) blocking read in stream_registry and was tighter than the 30s convention established in rabbitmq.py. 30s gives ~6x headroom over the largest BLOCK wait while still bounding the wedged-Redis stall window. Connect timeout stays at 5s (fast-fail on initial connection — a slow connect means the endpoint is unreachable).
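A sketch of the resulting connection defaults after that revision (30s read timeout, 5s connect timeout), assuming the env-var names given above; the factory wiring shown in comments is illustrative rather than the module's actual code.

```python
import os

def _env_float(name: str, default: float) -> float:
    """Read an env override, falling back to the built-in default."""
    return float(os.environ.get(name, default))

REDIS_CONN_KWARGS = dict(
    # 30s: ~6x headroom over the largest xread BLOCK wait.
    socket_timeout=_env_float("REDIS_SOCKET_TIMEOUT", 30.0),
    # 5s: a slow connect means the endpoint is unreachable.
    socket_connect_timeout=_env_float("REDIS_SOCKET_CONNECT_TIMEOUT", 5.0),
    socket_keepalive=True,
    health_check_interval=_env_float("REDIS_HEALTH_CHECK_INTERVAL", 30.0),
)

# Both factories would thread these through, e.g.:
#   redis.Redis(host=..., **REDIS_CONN_KWARGS)
#   redis.asyncio.Redis(host=..., **REDIS_CONN_KWARGS)

print(REDIS_CONN_KWARGS["socket_timeout"])
```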
⚡ Risk Assessment
| Files | Summary |
|---|---|
| **Sync Safety Net for Zombie Sessions**: autogpt_platform/backend/backend/copilot/executor/processor.py, processor_test.py | Adds sync_fail_close_session() function and refactors execute() to guarantee mark_session_completed runs in the finally block even if the async path fails. Includes comprehensive test coverage for four deploy-time scenarios (SIGTERM, happy path, zombie Redis, timeout). |
| **Graceful Shutdown & Lock Management**: autogpt_platform/backend/backend/copilot/executor/manager.py | Expands cleanup() docstring with the 5-step shutdown sequence. Moves cluster-lock assignment after logging to prevent a race. Adds a timeout-exceeded warning when active tasks remain after the graceful-shutdown window. |
| **Redis Connection Resilience**: autogpt_platform/backend/backend/data/redis_client.py | Adds socket timeouts (30s default), connect timeout (5s), and health-check interval (30s) to both sync and async Redis connections to prevent wedged endpoints from blocking indefinitely. |
| **Consumer Timeout & Deployment Configuration**: autogpt_platform/backend/backend/copilot/executor/utils.py | Increases COPILOT_CONSUMER_TIMEOUT_SECONDS from 1 hour to 6 hours to match the pod graceful-shutdown window. Ties GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS to the consumer timeout. Adds RabbitMQ policy instructions for rolling deploys. |
| **API Naming & Documentation**: autogpt_platform/backend/backend/copilot/pending_messages.py, pending_messages_test.py | Renames _clear_pending_messages_unsafe to clear_pending_messages_unsafe (public API). Clarifies in the docstring that the function is a debug/operator escape hatch only. |
| **Stream Registry & Session Lifecycle**: autogpt_platform/backend/backend/copilot/stream_registry.py | Updates the stale-session watchdog comment to reference the 6-hour consumer timeout instead of the 1-hour assumption. |
| **Skill Documentation**: .claude/skills/pr-polish/SKILL.md, .claude/skills/pr-test/SKILL.md | Adds new PR-polish skill and extends PR-test skill with lock-release and path-sanitization guidance. |
Sequence Diagram
sequenceDiagram
participant Worker as Pool Worker
participant Execute as execute()
participant AsyncPath as _execute_async()
participant ExecLoop as execution_loop
participant Redis as Redis
participant SafetyNet as sync_fail_close_session
Worker->>Execute: call execute(entry, cancel, lock)
Execute->>Execute: start_time = now
Execute->>AsyncPath: submit _execute_async to ExecLoop
AsyncPath->>ExecLoop: run in event loop
ExecLoop->>Redis: mark_session_completed (async)
alt Async succeeds
Redis-->>ExecLoop: OK
ExecLoop-->>AsyncPath: return
else Async fails (wedged Redis)
Redis-->>ExecLoop: timeout/error
ExecLoop-->>AsyncPath: raise exception
end
AsyncPath-->>Execute: future completes/fails
Execute->>SafetyNet: finally block invokes
SafetyNet->>ExecLoop: run_coroutine_threadsafe(mark_session_completed)
ExecLoop->>Redis: mark_session_completed (sync safety net)
Redis-->>ExecLoop: OK or timeout
ExecLoop-->>SafetyNet: result/timeout
SafetyNet->>SafetyNet: swallow errors, log warnings
SafetyNet-->>Execute: return
Execute-->>Worker: done
Actionable Comments Posted: 0

🧹 Nitpick comments (1)
- Silent BaseException swallow in cancel grace path - autogpt_platform/backend/backend/copilot/executor/processor.py (389, 391)

🧾 Coverage Summary
✔️ Covered (10 files)
Mirror of upstream Significant-Gravitas#12886 for benchmark. Do not merge.
Summary by MergeMonkey