Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions scripts/run_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,26 @@ async def main(args: argparse.Namespace) -> None:
print(f"Loading save: {scenario.save_name}")
try:
await client.load_game(scenario.save_name)
# Wait for RIMAPI to respond, then poll until colonists are loaded
# Wait for RIMAPI to respond, then poll until colonists are loaded.
# Then wait ~10s of stable state before any writes: RIMAPI returns
# HTTP 200 before Unity's main thread has finished applying the load,
# and writes that race the settle window get 500'd.
await wait_for_rimapi(config.rimapi_url, timeout=30.0)
for _ in range(15):
stable_count = 0
last_population = -1
for _ in range(30):
await asyncio.sleep(2)
try:
colony = await client.get_colony()
if colony.population > 0:
break
if colony.population > 0 and colony.population == last_population:
stable_count += 1
if stable_count >= 5:
break
else:
stable_count = 0
last_population = colony.population
except Exception:
pass
stable_count = 0
# Unforbid all starting items so colonists can use them
unforbid_count = await client.unforbid_all_items()
if unforbid_count:
Expand Down
59 changes: 56 additions & 3 deletions src/rle/orchestration/action_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

from __future__ import annotations

import json
import logging
from typing import Any, cast

from pydantic import BaseModel, ConfigDict

from rle.agents.actions import Action, ActionPlan, resolve_endpoint
from rle.rimapi.api_catalog import WRITE_CATALOG
from rle.rimapi.client import RimAPIClient
from rle.rimapi.client import RimAPIClient, RimAPIResponseError

logger = logging.getLogger(__name__)

Expand All @@ -26,6 +27,20 @@
})


class ActionOutcome(BaseModel):
"""Per-action execution result. Captures failure detail so the next
tick's deliberation context can surface it to the agent that proposed it.
"""

model_config = ConfigDict(frozen=True)

action_type: str
endpoint: str
target_colonist_id: str | None = None
success: bool
error: str | None = None


class ExecutionResult(BaseModel):
"""Summary of action execution for one tick."""

Expand All @@ -34,6 +49,19 @@ class ExecutionResult(BaseModel):
executed: int
failed: int
total: int
outcomes: tuple[ActionOutcome, ...] = ()


def _extract_rimapi_error(detail: str) -> str:
"""Pull the first error string out of a RIMAPI JSON envelope, else return raw detail."""
try:
parsed = json.loads(detail)
except (json.JSONDecodeError, TypeError):
return detail
errors = parsed.get("errors") if isinstance(parsed, dict) else None
if isinstance(errors, list) and errors:
return str(errors[0])
return detail


class ActionExecutor:
Expand All @@ -47,10 +75,11 @@ def __init__(self, client: RimAPIClient) -> None:
self._client = client

async def execute(self, plan: ActionPlan) -> ExecutionResult:
"""Execute all actions in a plan, return summary."""
"""Execute all actions in a plan, return summary + per-action outcomes."""
executed = 0
failed = 0
no_action_count = 0
outcomes: list[ActionOutcome] = []
for action in plan.actions:
endpoint = resolve_endpoint(action.action_type)
if endpoint == "no_action":
Expand All @@ -59,13 +88,37 @@ async def execute(self, plan: ActionPlan) -> ExecutionResult:
try:
await self._dispatch(action, endpoint)
executed += 1
except Exception:
outcomes.append(ActionOutcome(
action_type=action.action_type,
endpoint=endpoint,
target_colonist_id=action.target_colonist_id,
success=True,
))
except RimAPIResponseError as exc:
logger.warning("Action %s failed: %s", endpoint, exc.detail)
failed += 1
outcomes.append(ActionOutcome(
action_type=action.action_type,
endpoint=endpoint,
target_colonist_id=action.target_colonist_id,
success=False,
error=_extract_rimapi_error(exc.detail),
))
except Exception as exc:
logger.warning("Action %s failed", endpoint, exc_info=True)
failed += 1
outcomes.append(ActionOutcome(
action_type=action.action_type,
endpoint=endpoint,
target_colonist_id=action.target_colonist_id,
success=False,
error=str(exc) or type(exc).__name__,
))
return ExecutionResult(
executed=executed,
failed=failed,
total=len(plan.actions) - no_action_count,
outcomes=tuple(outcomes),
)

async def _dispatch(self, action: Action, endpoint: str) -> None:
Expand Down
36 changes: 31 additions & 5 deletions src/rle/orchestration/game_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,39 @@ async def run_tick(self) -> TickResult:

# 7. Execute merged plan
exec_result = await self._executor.execute(resolved)
for i, action in enumerate(resolved.actions):
success = i < exec_result.executed
for outcome in exec_result.outcomes:
self._emit(
EventType.ACTION_EXEC, tick_num,
action_type=action.action_type,
target=action.target_colonist_id,
success=success,
action_type=outcome.action_type,
target=outcome.target_colonist_id,
success=outcome.success,
error=outcome.error,
)

# 7b. Surface per-action errors back to agents so they can avoid
# re-proposing the same invalid action next tick (e.g. researching
# an already-finished project, setting priority for a disabled work type).
failed_outcomes = [o for o in exec_result.outcomes if not o.success and o.error]
if failed_outcomes:
error_summary = "; ".join(
f"{o.action_type}({o.target_colonist_id or '-'}): {o.error}"
for o in failed_outcomes[:10]
)
self._spoke_manager.broadcast_message(
MessageType.STATUS_UPDATE,
{
"tick": state.colony.tick,
"summary": f"Last tick action errors — DO NOT REPEAT: {error_summary}",
"action_errors": [
{
"action_type": o.action_type,
"target_colonist_id": o.target_colonist_id,
"error": o.error,
}
for o in failed_outcomes
],
},
sender_id="hub",
)

# 8. Score this tick
Expand Down
65 changes: 65 additions & 0 deletions tests/unit/test_action_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,71 @@ async def test_pending_upstream_action_succeeds(self) -> None:
assert result.failed == 0


class TestOutcomes:
"""ExecutionResult.outcomes carries per-action success/error so the
game loop can surface failures back to agents next tick."""

async def test_success_outcome_recorded(self) -> None:
from rle.orchestration.action_executor import ActionExecutor
client = AsyncMock()
executor = ActionExecutor(client)
plan = _make_plan(
Action(action_type="set_work_priority",
target_colonist_id="42", parameters={"Growing": 1}),
)
result = await executor.execute(plan)
assert len(result.outcomes) == 1
outcome = result.outcomes[0]
assert outcome.success is True
assert outcome.action_type == "set_work_priority"
assert outcome.target_colonist_id == "42"
assert outcome.error is None

async def test_rimapi_error_extracted_into_outcome(self) -> None:
"""RIMAPI's JSON error envelope is unwrapped to a clean human-readable string."""
from rle.orchestration.action_executor import ActionExecutor
from rle.rimapi.client import RimAPIResponseError
client = AsyncMock()
client.set_research_target = AsyncMock(side_effect=RimAPIResponseError(
500,
'{"success":false,"errors":["Research project \'Electricity\' is already finished."],'
'"warnings":[],"timestamp":"2026-05-16T06:00:00Z"}',
))
executor = ActionExecutor(client)
plan = _make_plan(
Action(action_type="set_research_target", parameters={"project": "Electricity"}),
)
result = await executor.execute(plan)
assert result.failed == 1
assert len(result.outcomes) == 1
outcome = result.outcomes[0]
assert outcome.success is False
assert outcome.error == "Research project 'Electricity' is already finished."

async def test_generic_exception_message_captured(self) -> None:
from rle.orchestration.action_executor import ActionExecutor
client = AsyncMock()
client.set_research_target = AsyncMock(side_effect=RuntimeError("boom"))
executor = ActionExecutor(client)
plan = _make_plan(Action(action_type="set_research_target", parameters={}))
result = await executor.execute(plan)
assert result.outcomes[0].success is False
assert result.outcomes[0].error == "boom"

async def test_outcomes_preserve_order(self) -> None:
"""no_action is skipped; outcomes only cover dispatched actions in order."""
from rle.orchestration.action_executor import ActionExecutor
client = AsyncMock()
executor = ActionExecutor(client)
plan = _make_plan(
Action(action_type="no_action"),
Action(action_type="draft_colonist", target_colonist_id="1"),
Action(action_type="draft_colonist", target_colonist_id="2"),
)
result = await executor.execute(plan)
assert [o.target_colonist_id for o in result.outcomes] == ["1", "2"]


class TestDispatch:
async def test_set_work_priority(self) -> None:
client = AsyncMock()
Expand Down
Loading