[loop-cycle-1] refactor: break up run_agentic_loop (#531) #1084

Merged
Timmy merged 1 commits from refactor/break-up-agentic-loop into main 2026-03-23 15:07:01 +00:00
5 changed files with 138 additions and 81 deletions

View File

@@ -31,7 +31,7 @@ logger = logging.getLogger(__name__)
class ModerationVerdict(Enum): class ModerationVerdict(Enum):
"""Result of a moderation check.""" """Result of a moderation check."""
PASS = "pass" PASS = "pass" # noqa: S105
FAIL = "fail" FAIL = "fail"
ERROR = "error" ERROR = "error"
@@ -285,9 +285,7 @@ class ContentModerator:
cleaned = pattern.sub("[GAME_TERM]", cleaned) cleaned = pattern.sub("[GAME_TERM]", cleaned)
return cleaned return cleaned
async def _run_guard( async def _run_guard(self, text: str, profile: GameProfile) -> ModerationResult:
self, text: str, profile: GameProfile
) -> ModerationResult:
"""Layer 2: Run LLM guard model or fall back to regex.""" """Layer 2: Run LLM guard model or fall back to regex."""
if not settings.moderation_enabled: if not settings.moderation_enabled:
return ModerationResult( return ModerationResult(
@@ -326,8 +324,7 @@ class ContentModerator:
data = await resp.json() data = await resp.json()
models = [m.get("name", "") for m in data.get("models", [])] models = [m.get("name", "") for m in data.get("models", [])]
self._guard_available = any( self._guard_available = any(
self._guard_model in m or m.startswith(self._guard_model) self._guard_model in m or m.startswith(self._guard_model) for m in models
for m in models
) )
if not self._guard_available: if not self._guard_available:
logger.info( logger.info(

View File

@@ -242,8 +242,7 @@ class SovereigntyMetricsStore:
).fetchall() ).fetchall()
else: else:
rows = conn.execute( rows = conn.execute(
"SELECT * FROM sovereignty_alerts " "SELECT * FROM sovereignty_alerts ORDER BY timestamp DESC LIMIT ?",
"ORDER BY timestamp DESC LIMIT ?",
(limit,), (limit,),
).fetchall() ).fetchall()
return [dict(row) for row in rows] return [dict(row) for row in rows]

View File

@@ -215,6 +215,119 @@ def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) ->
result.status = "completed" result.status = "completed"
# ---------------------------------------------------------------------------
# Execution orchestrator
# ---------------------------------------------------------------------------
async def _execute_all_steps(
agent,
task: str,
task_id: str,
steps: list[str],
total_steps: int,
session_id: str,
result: AgenticResult,
on_progress: Callable | None,
) -> list[str]:
"""Execute all planned steps, handling failures with adaptation.
Appends AgenticStep objects to *result.steps* and returns the list
of completed-result strings (used as context for later steps).
"""
completed_results: list[str] = []
for i, step_desc in enumerate(steps, 1):
step_start = time.monotonic()
try:
step = await _execute_step(
agent,
task,
step_desc,
i,
total_steps,
completed_results,
session_id,
)
result.steps.append(step)
completed_results.append(f"Step {i}: {step.result[:200]}")
await _broadcast_progress(
"agentic.step_complete",
{
"task_id": task_id,
"step": i,
"total": total_steps,
"description": step_desc,
"result": step.result[:200],
},
)
if on_progress:
await on_progress(step_desc, i, total_steps)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.warning("Agentic loop step %d failed: %s", i, exc)
step = await _handle_step_failure(
agent,
step_desc,
i,
total_steps,
task_id,
exc,
step_start,
session_id,
result,
completed_results,
on_progress,
)
return completed_results
async def _handle_step_failure(
agent,
step_desc: str,
step_num: int,
total_steps: int,
task_id: str,
exc: Exception,
step_start: float,
session_id: str,
result: AgenticResult,
completed_results: list[str],
on_progress: Callable | None,
) -> None:
"""Try to adapt a failed step; record a hard failure if adaptation also fails."""
try:
step = await _adapt_step(agent, step_desc, step_num, exc, step_start, session_id)
result.steps.append(step)
completed_results.append(f"Step {step_num} (adapted): {step.result[:200]}")
await _broadcast_progress(
"agentic.step_adapted",
{
"task_id": task_id,
"step": step_num,
"total": total_steps,
"description": step_desc,
"error": str(exc),
"adaptation": step.result[:200],
},
)
if on_progress:
await on_progress(f"[Adapted] {step_desc}", step_num, total_steps)
except Exception as adapt_exc: # broad catch intentional
logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
result.steps.append(
AgenticStep(
step_num=step_num,
description=step_desc,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
)
completed_results.append(f"Step {step_num}: FAILED")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Core loop # Core loop
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -265,65 +378,9 @@ async def run_agentic_loop(
) )
# Phase 2: Execution # Phase 2: Execution
completed_results: list[str] = [] await _execute_all_steps(
for i, step_desc in enumerate(steps, 1): agent, task, task_id, steps, total_steps, session_id, result, on_progress
step_start = time.monotonic() )
try:
step = await _execute_step(
agent,
task,
step_desc,
i,
total_steps,
completed_results,
session_id,
)
result.steps.append(step)
completed_results.append(f"Step {i}: {step.result[:200]}")
await _broadcast_progress(
"agentic.step_complete",
{
"task_id": task_id,
"step": i,
"total": total_steps,
"description": step_desc,
"result": step.result[:200],
},
)
if on_progress:
await on_progress(step_desc, i, total_steps)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.warning("Agentic loop step %d failed: %s", i, exc)
try:
step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id)
result.steps.append(step)
completed_results.append(f"Step {i} (adapted): {step.result[:200]}")
await _broadcast_progress(
"agentic.step_adapted",
{
"task_id": task_id,
"step": i,
"total": total_steps,
"description": step_desc,
"error": str(exc),
"adaptation": step.result[:200],
},
)
if on_progress:
await on_progress(f"[Adapted] {step_desc}", i, total_steps)
except Exception as adapt_exc: # broad catch intentional
logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
result.steps.append(
AgenticStep(
step_num=i,
description=step_desc,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
)
completed_results.append(f"Step {i}: FAILED")
# Phase 3: Summary # Phase 3: Summary
_summarize(result, total_steps, was_truncated) _summarize(result, total_steps, was_truncated)

View File

@@ -193,9 +193,7 @@ class TestContentModerator:
layer="llama_guard", layer="llama_guard",
category=ViolationCategory.VIOLENCE_GLORIFICATION, category=ViolationCategory.VIOLENCE_GLORIFICATION,
) )
with patch.object( with patch.object(mod, "_run_guard", new_callable=AsyncMock, return_value=low_conf_result):
mod, "_run_guard", new_callable=AsyncMock, return_value=low_conf_result
):
result = await mod.check("sword fight scene", game="morrowind") result = await mod.check("sword fight scene", game="morrowind")
assert result.passed assert result.passed
assert not result.blocked assert not result.blocked
@@ -212,9 +210,7 @@ class TestContentModerator:
layer="llama_guard", layer="llama_guard",
category=ViolationCategory.REAL_WORLD_HARM, category=ViolationCategory.REAL_WORLD_HARM,
) )
with patch.object( with patch.object(mod, "_run_guard", new_callable=AsyncMock, return_value=high_conf_result):
mod, "_run_guard", new_callable=AsyncMock, return_value=high_conf_result
):
result = await mod.check("harmful content", game="morrowind") result = await mod.check("harmful content", game="morrowind")
assert result.blocked assert result.blocked
@@ -229,9 +225,7 @@ class TestContentModerator:
def test_regex_passes_game_violence(self): def test_regex_passes_game_violence(self):
"""Regex should not flag in-game violence narration.""" """Regex should not flag in-game violence narration."""
mod = self._make_moderator() mod = self._make_moderator()
result = mod._check_with_regex( result = mod._check_with_regex("The warrior slays the dragon with a mighty blow.")
"The warrior slays the dragon with a mighty blow."
)
assert result.passed assert result.passed
def test_regex_passes_normal_narration(self): def test_regex_passes_normal_narration(self):
@@ -261,10 +255,14 @@ class TestContentModerator:
async def test_guard_fallback_on_error(self): async def test_guard_fallback_on_error(self):
"""Should fall back to regex when guard model errors.""" """Should fall back to regex when guard model errors."""
mod = self._make_moderator() mod = self._make_moderator()
with patch.object( with (
mod, "_is_guard_available", new_callable=AsyncMock, return_value=True patch.object(mod, "_is_guard_available", new_callable=AsyncMock, return_value=True),
), patch.object( patch.object(
mod, "_check_with_guard", new_callable=AsyncMock, side_effect=RuntimeError("timeout") mod,
"_check_with_guard",
new_callable=AsyncMock,
side_effect=RuntimeError("timeout"),
),
): ):
result = await mod.check("safe text", game="default") result = await mod.check("safe text", game="default")
# Should fall back to regex and pass # Should fall back to regex and pass

View File

@@ -132,7 +132,13 @@ class TestSovereigntyMetricsStore:
def test_graduation_targets_complete(self): def test_graduation_targets_complete(self):
"""All expected metric types have graduation targets.""" """All expected metric types have graduation targets."""
expected = {"cache_hit_rate", "api_cost", "time_to_report", "human_involvement", "local_artifacts"} expected = {
"cache_hit_rate",
"api_cost",
"time_to_report",
"human_involvement",
"local_artifacts",
}
assert set(GRADUATION_TARGETS.keys()) == expected assert set(GRADUATION_TARGETS.keys()) == expected