From 57e507ac4dce453f55faee65109c08099c228679 Mon Sep 17 00:00:00 2001 From: jkbennitt Date: Sat, 16 May 2026 02:30:09 -0400 Subject: [PATCH] Surface RIMAPI action errors to agents + fix load-settle race Two fixes from live-game testing that caused agents to fail and re-fail the same invalid action every tick: 1) Load-settle race in run_scenario.py: the polling loop broke on first `population > 0`, which was ~4s after load_game. RIMAPI returns 200 before Unity's main thread finishes applying the load, so the immediately-following unforbid_all_items() POST got 500'd. Now require 5 consecutive stable-population checks (10s floor) before writes. 2) Surface per-action errors back to agents via CentralPost: - ExecutionResult.outcomes now carries an ActionOutcome per dispatched action with success + a cleaned-up error message (unwrapped from RIMAPI's {"errors":[...]} JSON envelope). - game_loop broadcasts STATUS_UPDATE with failed outcomes so agents see "DO NOT REPEAT" context next tick. Removes the "researcher keeps re-proposing already-finished Electricity" loop observed in the live run. - Also fixed pre-existing position-based telemetry bug (`i < executed` assumed actions failed in order). 4 new unit tests cover the outcome capture paths. Tests: 382 pass. ruff/mypy clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/run_scenario.py | 20 ++++++-- src/rle/orchestration/action_executor.py | 59 +++++++++++++++++++-- src/rle/orchestration/game_loop.py | 36 +++++++++++-- tests/unit/test_action_executor.py | 65 ++++++++++++++++++++++++ 4 files changed, 167 insertions(+), 13 deletions(-) diff --git a/scripts/run_scenario.py b/scripts/run_scenario.py index a9f57a5..af9e099 100644 --- a/scripts/run_scenario.py +++ b/scripts/run_scenario.py @@ -153,16 +153,26 @@ async def main(args: argparse.Namespace) -> None: print(f"Loading save: {scenario.save_name}") try: await client.load_game(scenario.save_name) - # Wait for RIMAPI to respond, then poll until colonists are loaded + # Wait for RIMAPI to respond, then poll until colonists are loaded. + # Then wait ~10s of stable state before any writes: RIMAPI returns + # HTTP 200 before Unity's main thread has finished applying the load, + # and writes that race the settle window get 500'd. await wait_for_rimapi(config.rimapi_url, timeout=30.0) - for _ in range(15): + stable_count = 0 + last_population = -1 + for _ in range(30): await asyncio.sleep(2) try: colony = await client.get_colony() - if colony.population > 0: - break + if colony.population > 0 and colony.population == last_population: + stable_count += 1 + if stable_count >= 5: + break + else: + stable_count = 0 + last_population = colony.population except Exception: - pass + stable_count = 0 # Unforbid all starting items so colonists can use them unforbid_count = await client.unforbid_all_items() if unforbid_count: diff --git a/src/rle/orchestration/action_executor.py b/src/rle/orchestration/action_executor.py index 813c15b..4ffb71b 100644 --- a/src/rle/orchestration/action_executor.py +++ b/src/rle/orchestration/action_executor.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json import logging from typing import Any, cast @@ -9,7 +10,7 @@ from rle.agents.actions import Action, ActionPlan, resolve_endpoint from rle.rimapi.api_catalog import WRITE_CATALOG -from rle.rimapi.client import RimAPIClient +from rle.rimapi.client import RimAPIClient, RimAPIResponseError logger = logging.getLogger(__name__) @@ -26,6 +27,20 @@ }) +class ActionOutcome(BaseModel): + """Per-action execution result. Captures failure detail so the next + tick's deliberation context can surface it to the agent that proposed it. + """ + + model_config = ConfigDict(frozen=True) + + action_type: str + endpoint: str + target_colonist_id: str | None = None + success: bool + error: str | None = None + + class ExecutionResult(BaseModel): """Summary of action execution for one tick.""" @@ -34,6 +49,19 @@ class ExecutionResult(BaseModel): executed: int failed: int total: int + outcomes: tuple[ActionOutcome, ...] = () + + +def _extract_rimapi_error(detail: str) -> str: + """Pull the first error string out of a RIMAPI JSON envelope, else return raw detail.""" + try: + parsed = json.loads(detail) + except (json.JSONDecodeError, TypeError): + return detail + errors = parsed.get("errors") if isinstance(parsed, dict) else None + if isinstance(errors, list) and errors: + return str(errors[0]) + return detail class ActionExecutor: @@ -47,10 +75,11 @@ def __init__(self, client: RimAPIClient) -> None: self._client = client async def execute(self, plan: ActionPlan) -> ExecutionResult: - """Execute all actions in a plan, return summary.""" + """Execute all actions in a plan, return summary + per-action outcomes.""" executed = 0 failed = 0 no_action_count = 0 + outcomes: list[ActionOutcome] = [] for action in plan.actions: endpoint = resolve_endpoint(action.action_type) if endpoint == "no_action": @@ -59,13 +88,37 @@ async def execute(self, plan: ActionPlan) -> ExecutionResult: try: await self._dispatch(action, endpoint) executed += 1 - except Exception: + outcomes.append(ActionOutcome( + action_type=action.action_type, + endpoint=endpoint, + target_colonist_id=action.target_colonist_id, + success=True, + )) + except RimAPIResponseError as exc: + logger.warning("Action %s failed: %s", endpoint, exc.detail) + failed += 1 + outcomes.append(ActionOutcome( + action_type=action.action_type, + endpoint=endpoint, + target_colonist_id=action.target_colonist_id, + success=False, + error=_extract_rimapi_error(exc.detail), + )) + except Exception as exc: logger.warning("Action %s failed", endpoint, exc_info=True) failed += 1 + outcomes.append(ActionOutcome( + action_type=action.action_type, + endpoint=endpoint, + target_colonist_id=action.target_colonist_id, + success=False, + error=str(exc) or type(exc).__name__, + )) return ExecutionResult( executed=executed, failed=failed, total=len(plan.actions) - no_action_count, + outcomes=tuple(outcomes), ) async def _dispatch(self, action: Action, endpoint: str) -> None: diff --git a/src/rle/orchestration/game_loop.py b/src/rle/orchestration/game_loop.py index 471f205..7fb0508 100644 --- a/src/rle/orchestration/game_loop.py +++ b/src/rle/orchestration/game_loop.py @@ -500,13 +500,39 @@ async def run_tick(self) -> TickResult: # 7. Execute merged plan exec_result = await self._executor.execute(resolved) - for i, action in enumerate(resolved.actions): - success = i < exec_result.executed + for outcome in exec_result.outcomes: self._emit( EventType.ACTION_EXEC, tick_num, - action_type=action.action_type, - target=action.target_colonist_id, - success=success, + action_type=outcome.action_type, + target=outcome.target_colonist_id, + success=outcome.success, + error=outcome.error, + ) + + # 7b. Surface per-action errors back to agents so they can avoid + # re-proposing the same invalid action next tick (e.g. researching + # an already-finished project, setting priority for a disabled work type). + failed_outcomes = [o for o in exec_result.outcomes if not o.success and o.error] + if failed_outcomes: + error_summary = "; ".join( + f"{o.action_type}({o.target_colonist_id or '-'}): {o.error}" + for o in failed_outcomes[:10] + ) + self._spoke_manager.broadcast_message( + MessageType.STATUS_UPDATE, + { + "tick": state.colony.tick, + "summary": f"Last tick action errors — DO NOT REPEAT: {error_summary}", + "action_errors": [ + { + "action_type": o.action_type, + "target_colonist_id": o.target_colonist_id, + "error": o.error, + } + for o in failed_outcomes + ], + }, + sender_id="hub", ) # 8. Score this tick diff --git a/tests/unit/test_action_executor.py b/tests/unit/test_action_executor.py index a126808..f6307fd 100644 --- a/tests/unit/test_action_executor.py +++ b/tests/unit/test_action_executor.py @@ -74,6 +74,71 @@ async def test_pending_upstream_action_succeeds(self) -> None: assert result.failed == 0 +class TestOutcomes: + """ExecutionResult.outcomes carries per-action success/error so the + game loop can surface failures back to agents next tick.""" + + async def test_success_outcome_recorded(self) -> None: + from rle.orchestration.action_executor import ActionExecutor + client = AsyncMock() + executor = ActionExecutor(client) + plan = _make_plan( + Action(action_type="set_work_priority", + target_colonist_id="42", parameters={"Growing": 1}), + ) + result = await executor.execute(plan) + assert len(result.outcomes) == 1 + outcome = result.outcomes[0] + assert outcome.success is True + assert outcome.action_type == "set_work_priority" + assert outcome.target_colonist_id == "42" + assert outcome.error is None + + async def test_rimapi_error_extracted_into_outcome(self) -> None: + """RIMAPI's JSON error envelope is unwrapped to a clean human-readable string.""" + from rle.orchestration.action_executor import ActionExecutor + from rle.rimapi.client import RimAPIResponseError + client = AsyncMock() + client.set_research_target = AsyncMock(side_effect=RimAPIResponseError( + 500, + '{"success":false,"errors":["Research project \'Electricity\' is already finished."],' + '"warnings":[],"timestamp":"2026-05-16T06:00:00Z"}', + )) + executor = ActionExecutor(client) + plan = _make_plan( + Action(action_type="set_research_target", parameters={"project": "Electricity"}), + ) + result = await executor.execute(plan) + assert result.failed == 1 + assert len(result.outcomes) == 1 + outcome = result.outcomes[0] + assert outcome.success is False + assert outcome.error == "Research project 'Electricity' is already finished." + + async def test_generic_exception_message_captured(self) -> None: + from rle.orchestration.action_executor import ActionExecutor + client = AsyncMock() + client.set_research_target = AsyncMock(side_effect=RuntimeError("boom")) + executor = ActionExecutor(client) + plan = _make_plan(Action(action_type="set_research_target", parameters={})) + result = await executor.execute(plan) + assert result.outcomes[0].success is False + assert result.outcomes[0].error == "boom" + + async def test_outcomes_preserve_order(self) -> None: + """no_action is skipped; outcomes only cover dispatched actions in order.""" + from rle.orchestration.action_executor import ActionExecutor + client = AsyncMock() + executor = ActionExecutor(client) + plan = _make_plan( + Action(action_type="no_action"), + Action(action_type="draft_colonist", target_colonist_id="1"), + Action(action_type="draft_colonist", target_colonist_id="2"), + ) + result = await executor.execute(plan) + assert [o.target_colonist_id for o in result.outcomes] == ["1", "2"] + + class TestDispatch: async def test_set_work_priority(self) -> None: client = AsyncMock()