
[mirror] fix(backend/copilot): sync safety net for Redis-induced zombie sessions#2

Open
yashwant86 wants to merge 18 commits into mm-base-12886 from mm-pr-12886

Conversation


@yashwant86 yashwant86 commented Apr 26, 2026

Mirror of upstream Significant-Gravitas#12886 for benchmark. Do not merge.


Summary by MergeMonkey

  • The Fine Print:
    • Expanded cleanup() docstring with detailed 5-step shutdown sequence and zombie-session mitigation explanation.
    • Added comprehensive comments to sync_fail_close_session explaining cross-thread loop reuse and atomic CAS safety.
    • Documented Redis socket timeout configuration and health-check interval with deployment notes.
    • Updated RabbitMQ consumer-timeout comments with policy application instructions for rolling deploys.
  • Enhancements:
    • Sync safety net (sync_fail_close_session) ensures sessions marked failed even if async Redis path fails.
    • Bounded timeouts on Redis operations and event-loop waits prevent wedged endpoints from blocking worker threads.
    • Health-check interval and socket keepalive on Redis connections detect half-open sockets early.
  • Resolved Issues:
    • Fixed zombie sessions by guaranteeing mark_session_completed runs in execute() finally block regardless of async failures.
    • Fixed cluster-lock assignment race by moving lock storage after logging (not before) to prevent stale references.
    • Fixed unbounded blocking on wedged Redis by adding socket timeouts and bounded wait_for on async calls.
  • Under the Hood:
    • Refactored execute() into thin wrapper + _execute() helper to isolate sync/async boundary.
    • Renamed _clear_pending_messages_unsafe to clear_pending_messages_unsafe (public API).
    • Increased COPILOT_CONSUMER_TIMEOUT_SECONDS from 1 hour to 6 hours to match pod graceful-shutdown window.
    • Extracted timeout constants (_CANCEL_GRACE_SECONDS, _FAIL_CLOSE_REDIS_TIMEOUT) for clarity and testability.

majdyz added 18 commits April 22, 2026 22:15
On SIGTERM the executor's cleanup() waited passively for active_tasks to
drain but never signalled cancel_event to the running turns. Anything
longer than the grace window got SIGKILL'd; the processor's finally never
ran, so mark_session_completed never fired and the Redis session stayed
status=running until the 65-minute stale watchdog eventually reaped it.
While the session was a zombie, pending-message pushes succeeded (HTTP
side only needs status=running) but no consumer ever drained them, so the
frontend showed "Queued" bubbles forever.

Changes:
- cleanup() now sets each active task's cancel_event first, letting the
  turn break out of its stream loop and hit its own finally.
- If a turn still hasn't drained after the grace window, cleanup()
  synchronously marks the session failed and clears its pending buffer
  so the UI transitions to a terminal state immediately.
- Added sync_fail_close_session, invoked from CoPilotProcessor.execute's
  finally as a last line of defence — covers the case where the async
  event loop dies before its own finally can run. mark_session_completed
  is a CAS on status==running so the happy path is a cheap no-op.
- Raised COPILOT_CONSUMER_TIMEOUT_SECONDS and
  GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS to 6h so a rolling deploy can let the
  longest legitimate turn finish. Matches the helm chart
  terminationGracePeriodSeconds change.
- Renamed _clear_pending_messages_unsafe to clear_pending_messages_unsafe
  (still carries the _unsafe suffix) so it's importable without the
  leading-underscore private marker; docs now call out the legitimate
  cleanup fail-close use case.
- Refactor cleanup() into four self-documenting phases with a single
  _ActiveTask dataclass instead of three parallel dicts.
- Reject RabbitMQ delivery during fail-close so orphan messages are not
  redelivered and double-executed on another pod (addresses critical bug
  in previous patch where active_tasks held (future, event) tuple but
  the ack callback was only reachable through on_run_done).
- Gate sync_fail_close_session to only fire when the future didn't reach
  done — avoids racing _execute_async's own finally and overwriting its
  accurate terminal-state message with the generic shutdown one.
- Give the async finally a short grace window (5s) after future.cancel()
  so it has time to publish the terminal state before the sync safety
  net fires.
- clear_pending_messages_unsafe now drops BOTH the primary buffer and
  the persist queue — otherwise stale follow-ups in
  copilot:pending-persist:* could ride a later unrelated tool result on
  the same session.
- Extract SHUTDOWN_ERROR_MESSAGE constant shared between processor and
  manager (no more duplicated string).
- utils.py: document the RabbitMQ policy-based migration needed when
  deploying the new x-consumer-timeout value (existing durable queue
  would otherwise fail redeclaration with PRECONDITION_FAILED).
- pr-polish skill: guard against empty PR inference, filter bots +
  author from baseline issue comments, treat all non-success terminal
  CI conclusions as blockers (not just "failure"), refresh the baseline
  after each /pr-address so already-addressed items don't stay "new"
  forever.
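Several of the bullets above lean on mark_session_completed being a compare-and-swap on status == "running". A minimal dict-backed sketch of that semantic (the key layout here is illustrative, not the PR's actual Redis schema — the real version would need a Lua script or WATCH/MULTI to be atomic against Redis):

```python
def mark_session_completed(store: dict, session_id: str,
                           status: str = "completed") -> bool:
    """CAS on status == "running": only the first terminal transition
    wins, so the async finally and the sync safety net can both call
    this and the second call is a cheap no-op."""
    key = f"copilot:session:{session_id}"  # hypothetical key layout
    if store.get(key) == "running":
        store[key] = status
        return True
    return False
```

This is why the safety net can fire unconditionally on every exit path: if the async path already wrote a terminal status, the sync call simply loses the CAS and changes nothing.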
Phase 1 used to both flag the event AND call channel.stop_consuming()
via add_callback_threadsafe. Once start_consuming() returned, pika's
ioloop exited — and phase 3's basic_nack threadsafe callbacks silently
never fired. That meant orphan-session deliveries went unacked, got
redelivered on channel close, and could re-execute an already-failed
turn on another pod.

Split the phase: now phase 1 only sets the Python event that gates
_handle_run_message (new deliveries are nacked there), and the actual
channel.stop_consuming() is deferred to phase 4's _stop_message_consumers.
Because both phase 3's nacks and phase 4's stop are queued via
add_callback_threadsafe, pika processes them FIFO — nacks fire first,
then stop_consuming exits the ioloop.
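The FIFO guarantee is what makes the split safe. A toy model of that ordering (FakeIOLoop is invented for illustration — in real pika the ordering comes from BlockingConnection.add_callback_threadsafe draining callbacks in submission order):

```python
from collections import deque

class FakeIOLoop:
    """Stand-in for pika's ioloop: add_callback_threadsafe queues
    callbacks and process() runs them strictly FIFO."""
    def __init__(self):
        self._callbacks = deque()
        self.stopped = False

    def add_callback_threadsafe(self, cb):
        self._callbacks.append(cb)

    def process(self):
        while self._callbacks and not self.stopped:
            self._callbacks.popleft()()

events = []
loop = FakeIOLoop()
# Phase 3 queues the nacks first...
loop.add_callback_threadsafe(lambda: events.append("nack"))
# ...then phase 4 queues stop_consuming, which exits the ioloop.
def stop():
    events.append("stop")
    loop.stopped = True
loop.add_callback_threadsafe(stop)
loop.process()  # events == ["nack", "stop"]: nacks fire before the stop
```

Flipping the submission order would reproduce the original bug: stop fires first and the nacks are never processed.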
…TERM

The previous patch signalled cancel_event to every in-flight turn as
soon as SIGTERM arrived, which interrupted turns that were running
perfectly fine. That violates the "let in-flight work finish naturally"
principle.

The zombie-session bug this PR targets was never caused by SIGTERM
killing the turn — it was caused by a transient Redis failure
mid-turn whose reporting path (_execute_async.finally →
mark_session_completed) also hit the failure and silently swallowed it.
That case is already handled by sync_fail_close_session in execute()'s
finally (fresh asyncio loop + fresh Redis client on every turn exit),
so cancelling healthy turns on shutdown adds no value and hurts UX.

Reverts phase 2 to a passive wait. Phase 3's fail-close still handles
the much rarer case where a turn genuinely cannot finish inside the
grace window. Removed the now-unused _signal_cancel_to_active helper.
…attern

Dropped the phased cleanup + fail-close mechanism. That was solving a
different problem (truly-stuck turns at grace-window expiry) than what
this PR actually needed to fix (transient Redis failure mid-turn leaving
session status=running forever).

- cleanup() is now a single method mirroring backend.executor.manager:
  stop consumer (flag + broker), passive wait for active_tasks to drain,
  worker/executor/lock teardown. Same shape as the proven agent-executor
  cleanup. No pre-emptive cancel of healthy in-flight turns.
- Removed _ActiveTask dataclass, reverted active_tasks to tuple-of-
  (future, cancel_event) + _task_locks side dict (same as agent-executor
  and pre-refactor copilot).
- Removed all phase helpers, fail-close redis state, orphan delivery
  rejection, and the deferred channel.stop_consuming workaround they
  required.
- Reverted clear_pending_messages_unsafe to single-key delete (the
  persist-queue extension was only needed by the dropped fail-close
  path).
- Reverted _FakeRedis.delete to single-arg and dropped the persist-queue
  coverage test.

The actual zombie-bug fix stays: sync_fail_close_session in
processor.execute's finally. Added asyncio.wait_for(10s) around its
mark_session_completed call so a broken Redis can't hang the safety
net indefinitely — if the Redis CAS doesn't respond in 10s, we log
and let the pod exit (stale-session watchdog eventually reaps it,
matching the pre-PR failure mode rather than blocking shutdown).
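The bounded wait described above can be sketched like this (helper names are illustrative and the timeout is shrunk for the demo; the PR uses 10s):

```python
import asyncio

async def _mark_failed_with_timeout(mark_coro, timeout):
    # Bound the Redis CAS so a wedged endpoint can't hang the safety net.
    try:
        await asyncio.wait_for(mark_coro, timeout=timeout)
        return "marked"
    except asyncio.TimeoutError:
        # Log and let the pod exit; the stale-session watchdog reaps it,
        # matching the pre-PR failure mode rather than blocking shutdown.
        return "timed-out"

async def hung_redis():
    await asyncio.sleep(60)  # simulates a wedged Redis that never responds

result = asyncio.run(_mark_failed_with_timeout(hung_redis(), timeout=0.05))
```

asyncio.wait_for cancels the inner coroutine on expiry, so the hung call doesn't leak past the safety net.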
… is obvious

Moves the sync/async bridging (run_coroutine_threadsafe, cancel check,
cluster_lock refresh, bounded waits) into a private _execute helper.
The public execute() is now a tight try/finally:

    try:
        self._execute(entry, cancel, cluster_lock, log)
    finally:
        sync_fail_close_session(entry.session_id, log)

The gate (``if future is None or not future.done()``) is gone — since
mark_session_completed is CAS on status=="running", calling it after
the async path succeeded is a harmless no-op Redis roundtrip. Trading
one Redis call per turn for a much cleaner control flow and a
guarantee that the safety net runs on every exit path.
Multiple agents have been stalling on /pr-test because a finished run
keeps the .ign.testing.lock held "for manual exploration." The lock
guards test EXECUTION, not the app lifecycle — the app can keep running
on :3000/:8006 for manual poking while the lock is gone, and sibling
agents will kill the stray processes themselves via Step 3c/3e's
port-clear preamble.

Adds explicit guidance + the end-of-test release snippet so the common
path is: report → PR comment → release lock → optionally leave the app
running with a note to the user.
A recent /pr-test run posted a PR comment with a `Report: /Users/.../
test-report.md` trailing line — useless to every reviewer. Adds an
explicit prohibition with a grep-based sanity check run before the
gh api call, and the reminder that the PR comment is a github.com
artifact that must be self-contained (copy the CONTENT of local
files in, not the path).
E2E testing of SIGTERM mid-turn surfaced a latent bug: the second turn
on the same pool-worker thread logs

    sync fail-close mark_session_completed failed: Event loop is closed

Root cause: get_redis_async is @thread_cached. The cache is keyed by
thread, not by event loop. sync_fail_close_session uses asyncio.run()
to open a temporary loop, runs the CAS, then closes the loop. The
AsyncRedis client cached on the first call is bound (via its connection
pool's streams) to that now-closed loop, so turn 2+ on the same worker
thread pulls the stale cached client and can't do any I/O.

Fix: call get_redis_async.clear_cache() before each asyncio.run() in
the safety net. That forces connect_async() to create a fresh
AsyncRedis with an empty connection pool; its first I/O then binds
connections to the CURRENT live loop.

The happy path in the E2E test still worked because the async path
already CAS'd status=completed, so the sync CAS would have been a
no-op anyway. But the whole point of the safety net is to save the
case where the async path failed — and without this fix it was
broken from turn 2 onwards.
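The loop-binding trap generalises beyond redis-py. A self-contained demonstration of the failure mode (the cache and "client" here are stand-ins — the real client is the AsyncRedis returned by the @thread_cached get_redis_async):

```python
import asyncio

_cache = {}

async def _make_client():
    # Stand-in for connect_async(): the "client" captures the loop it was
    # created on, like redis-py's connection-pool streams do.
    return asyncio.get_running_loop()

def get_cached_client():
    # Stand-in for @thread_cached get_redis_async: keyed per thread,
    # NOT per event loop — the source of the bug.
    if "client" not in _cache:
        _cache["client"] = asyncio.run(_make_client())
    return _cache["client"]

# Turn 1: the client is created on a temporary loop that asyncio.run closes.
loop1 = get_cached_client()
# Turn 2: the cache hands back the client bound to the now-closed loop.
loop2 = get_cached_client()
stale = loop2.is_closed()  # True — any I/O raises "Event loop is closed"
# The commit's fix: clear the cache before each asyncio.run so the next
# connect re-binds to the current live loop (modelled here as a drop).
_cache.clear()
```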

Also adds processor_test.py coverage for:
- TestSyncFailCloseSession::test_bounded_timeout_when_redis_hangs
- TestExecuteSafetyNet::test_happy_path_invokes_safety_net
- TestExecuteSafetyNet::test_sigterm_mid_turn_invokes_safety_net
- TestExecuteSafetyNet::test_zombie_redis_async_path_still_marks_session_failed

manager.py has one linter-driven whitespace tweak from poetry run format.
Round-5 review feedback: the previous asyncio.run() + clear_cache()
approach created a fresh AsyncRedis client per turn exit (TCP connect
+ PING + CAS + loop close) and thrashed the @thread_cached cache. That
matches the pattern agent-executor never needs because its state-update
path is sync. We can't go fully sync without duplicating
mark_session_completed's CAS+publish — but we can submit the coro to
the processor's long-lived execution_loop, matching the pattern at
backend.executor.manager.ExecutionProcessor.on_graph_execution (line
881-892 where it uses `node_execution_loop` the same way).

Changes:
- sync_fail_close_session now takes an explicit execution_loop arg and
  submits via asyncio.run_coroutine_threadsafe (exactly as
  node_execution_loop is used). No new loop per call, no clear_cache(),
  no per-turn AsyncRedis churn.
- execute() passes self.execution_loop in.
- RuntimeError on a closed loop is handled (cleanup already ran the
  per-worker teardown — let the stale-session watchdog handle it).
- Outer future.result(timeout=_FAIL_CLOSE_REDIS_TIMEOUT + 2) is a
  belt-and-braces cap on the cross-thread wait.
- Tests: added `exec_loop` fixture that spins up a daemon-thread loop
  mirroring the processor's execution_loop layout, updated all
  sync_fail_close_session callers to pass it in, and TestExecuteSafetyNet
  sets proc.execution_loop before exercising execute().

Dropped the get_redis_async import since clear_cache() is gone.
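The submit-to-a-long-lived-loop pattern can be sketched end to end (the coroutine body and names are placeholders for the real CAS+publish; the threading layout mirrors the processor's daemon-thread execution_loop):

```python
import asyncio
import threading

# Long-lived loop on a daemon thread, mirroring the processor's
# execution_loop.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def mark_session_completed(session_id):
    # Placeholder for the real CAS+publish coroutine.
    return f"{session_id}:failed"

def sync_fail_close_session(session_id, execution_loop, timeout=5):
    try:
        fut = asyncio.run_coroutine_threadsafe(
            mark_session_completed(session_id), execution_loop
        )
        # Belt-and-braces cap on the cross-thread wait.
        return fut.result(timeout=timeout)
    except RuntimeError:
        # Loop already closed: cleanup ran the per-worker teardown;
        # let the stale-session watchdog handle the session.
        return None

result = sync_fail_close_session("sess-1", loop)
loop.call_soon_threadsafe(loop.stop)
```

No loop is created or torn down per call, and the cached async Redis client stays bound to the one loop it will always run on.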
Tested empirically on dev's RabbitMQ 4.1.4: queue_declare with a
different x-consumer-timeout value is accepted without error
(unlike x-queue-type which triggers PRECONDITION_FAILED on mismatch).
So the deploy doesn't need a queue-delete step — just the
rabbitmqctl set_policy command to make the effective timeout 6h for
running consumers mid-shutdown. Updated the utils.py comment to
match.
Default redis-py clients have socket_timeout=None, which means a wedged
Redis endpoint (accepts TCP but doesn't respond) hangs callers
indefinitely. That's bad for cluster_lock.refresh() in particular —
it's called from long-running code paths, and a stall there lets the
lock TTL expire silently without anyone noticing.

Adds sensible defaults to both the sync and async factories:

- socket_timeout=5 — fail-fast on read/write stalls
- socket_connect_timeout=5 — fail-fast on initial connect
- socket_keepalive=True — let the kernel detect dead connections
- health_check_interval=30 — redis-py PINGs idle connections every
  30s so half-open sockets get flagged well before the OS's default
  ~2h TCP keepalive fires

Each is env-overridable (REDIS_SOCKET_TIMEOUT /
REDIS_SOCKET_CONNECT_TIMEOUT / REDIS_HEALTH_CHECK_INTERVAL) so a
specific deployment can tune the budget without a code change.

Surface area: this touches the shared clients used across the whole
backend (not just copilot). A surprising slow Redis query that
previously blocked for 30s+ will now raise a TimeoutError instead —
callers that relied on unbounded waits need to handle that. In
practice no legitimate query should run past 5s; retry-on-timeout
is already built into the conn_retry wrapper above.
5s was racing the xread(block=5000) blocking read in stream_registry
and was tighter than the 30s convention established in rabbitmq.py.
30s gives ~6x headroom over the largest BLOCK wait while still bounding
the wedged-Redis stall window. Connect timeout stays at 5s (fast-fail
on initial connection — a slow connect means the endpoint is
unreachable).
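Put together, the factory settings look roughly like this (a sketch, not the PR's actual redis_client.py — the helper names are invented, though the four kwargs are real redis-py connection options accepted by both redis.Redis and redis.asyncio.Redis):

```python
import os

def _env_float(name, default):
    # Env-overridable knobs, as described in the commit message.
    raw = os.environ.get(name)
    return float(raw) if raw else default

REDIS_SOCKET_TIMEOUT = _env_float("REDIS_SOCKET_TIMEOUT", 30.0)
REDIS_SOCKET_CONNECT_TIMEOUT = _env_float("REDIS_SOCKET_CONNECT_TIMEOUT", 5.0)
REDIS_HEALTH_CHECK_INTERVAL = _env_float("REDIS_HEALTH_CHECK_INTERVAL", 30.0)

def redis_client_kwargs():
    # Shared by the sync and async client factories.
    return dict(
        socket_timeout=REDIS_SOCKET_TIMEOUT,                  # bound read/write stalls
        socket_connect_timeout=REDIS_SOCKET_CONNECT_TIMEOUT,  # fast-fail connect
        socket_keepalive=True,                                # kernel detects dead peers
        health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,    # PING idle connections
    )
```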

bot-mergemonkey Bot commented Apr 26, 2026

Risk Assessment: CRITICAL · ~45 min review

Focus areas: sync_fail_close_session safety net and finally-block guarantee · Redis socket timeouts and health-check configuration · 6-hour consumer timeout alignment with pod graceful-shutdown window · Cluster-lock assignment race fix and cleanup() sequence

Assessment: Hardens session lifecycle with sync safety net to prevent zombie sessions from wedged Redis.

Walkthrough

When a CoPilot turn executes, the processor submits _execute_async to a long-lived event loop and waits for completion with bounded timeouts. If the async path fails to mark the session completed (e.g., Redis wedged), the outer execute() method's finally block invokes sync_fail_close_session, which submits mark_session_completed to the same event loop via run_coroutine_threadsafe. This ensures the session is marked failed even if the async path's Redis call failed, preventing zombie sessions. Redis connections now have socket timeouts and health checks to fail fast instead of blocking indefinitely.

Changes

Files · Summary

  • Sync Safety Net for Zombie Sessions
    autogpt_platform/backend/backend/copilot/executor/processor.py, processor_test.py
    Adds sync_fail_close_session() function and refactors execute() to guarantee mark_session_completed runs in finally block even if async path fails. Includes comprehensive test coverage for four deploy-time scenarios (SIGTERM, happy path, zombie Redis, timeout).
  • Graceful Shutdown & Lock Management
    autogpt_platform/backend/backend/copilot/executor/manager.py
    Expands cleanup() docstring with 5-step shutdown sequence. Moves cluster-lock assignment after logging to prevent race. Adds timeout-exceeded warning when active tasks remain after graceful-shutdown window.
  • Redis Connection Resilience
    autogpt_platform/backend/backend/data/redis_client.py
    Adds socket timeouts (30s default), connect timeout (5s), and health-check interval (30s) to both sync and async Redis connections to prevent wedged endpoints from blocking indefinitely.
  • Consumer Timeout & Deployment Configuration
    autogpt_platform/backend/backend/copilot/executor/utils.py
    Increases COPILOT_CONSUMER_TIMEOUT_SECONDS from 1 hour to 6 hours to match pod graceful-shutdown window. Ties GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS to consumer timeout. Adds RabbitMQ policy instructions for rolling deploys.
  • API Naming & Documentation
    autogpt_platform/backend/backend/copilot/pending_messages.py, pending_messages_test.py
    Renames _clear_pending_messages_unsafe to clear_pending_messages_unsafe (public API). Clarifies docstring that function is debug/operator escape hatch only.
  • Stream Registry & Session Lifecycle
    autogpt_platform/backend/backend/copilot/stream_registry.py
    Updates stale-session watchdog comment to reference 6-hour consumer timeout instead of 1-hour assumption.
  • Skill Documentation
    .claude/skills/pr-polish/SKILL.md, .claude/skills/pr-test/SKILL.md
    Adds new PR-polish skill and extends PR-test skill with lock-release and path-sanitization guidance.

Sequence Diagram

sequenceDiagram
    participant Worker as Pool Worker
    participant Execute as execute()
    participant AsyncPath as _execute_async()
    participant ExecLoop as execution_loop
    participant Redis as Redis
    participant SafetyNet as sync_fail_close_session
    
    Worker->>Execute: call execute(entry, cancel, lock)
    Execute->>Execute: start_time = now
    Execute->>AsyncPath: submit _execute_async to ExecLoop
    AsyncPath->>ExecLoop: run in event loop
    ExecLoop->>Redis: mark_session_completed (async)
    alt Async succeeds
        Redis-->>ExecLoop: OK
        ExecLoop-->>AsyncPath: return
    else Async fails (wedged Redis)
        Redis-->>ExecLoop: timeout/error
        ExecLoop-->>AsyncPath: raise exception
    end
    AsyncPath-->>Execute: future completes/fails
    Execute->>SafetyNet: finally block invokes
    SafetyNet->>ExecLoop: run_coroutine_threadsafe(mark_session_completed)
    ExecLoop->>Redis: mark_session_completed (sync safety net)
    Redis-->>ExecLoop: OK or timeout
    ExecLoop-->>SafetyNet: result/timeout
    SafetyNet->>SafetyNet: swallow errors, log warnings
    SafetyNet-->>Execute: return
    Execute-->>Worker: done

Dig Deeper With Commands

  • /review <file-path> <function-optional>
  • /chat <file-path> "<question>"
  • /roast <file-path>

Runs only when explicitly triggered.

@bot-mergemonkey

Actionable Comments Posted: 0

🧹 Nitpick comments (1)
Silent BaseException swallow in cancel grace path - autogpt_platform/backend/backend/copilot/executor/processor.py (389, 391)
After future.cancel() the except BaseException: pass at line 391 swallows everything (CancelledError, SystemExit, KeyboardInterrupt) without even a debug log. CancelledError is the expected case here, but if SystemExit ever propagates out of the coroutine you'd never know. A log.debug("cancel grace exited: %r", exc) would help post-mortem debugging without changing behavior.

Add a log.debug("sync fail-close cancel grace ended: %r", exc) inside the except so cancel-path exits are observable.
🧾 Coverage Summary
✔️ Covered (10 files)
- .claude/skills/pr-polish/SKILL.md
- .claude/skills/pr-test/SKILL.md
- autogpt_platform/backend/backend/copilot/executor/manager.py
- autogpt_platform/backend/backend/copilot/executor/processor.py
- autogpt_platform/backend/backend/copilot/executor/processor_test.py
- autogpt_platform/backend/backend/copilot/executor/utils.py
- autogpt_platform/backend/backend/copilot/pending_messages.py
- autogpt_platform/backend/backend/copilot/pending_messages_test.py
- autogpt_platform/backend/backend/copilot/stream_registry.py
- autogpt_platform/backend/backend/data/redis_client.py

