[claude] Add unit tests for orchestration_loop.py (#1278) (#1311)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled

This commit was merged in pull request #1311.
This commit is contained in:
2026-03-24 02:01:31 +00:00
parent 0b4ed1b756
commit 24f4fd9188

View File

@@ -0,0 +1,485 @@
"""Unit tests for timmy.vassal.orchestration_loop — VassalOrchestrator."""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
# ---------------------------------------------------------------------------
# VassalCycleRecord
# ---------------------------------------------------------------------------
class TestVassalCycleRecord:
def test_defaults(self):
record = VassalCycleRecord(cycle_id=1, started_at="2026-01-01T00:00:00")
assert record.issues_fetched == 0
assert record.issues_dispatched == 0
assert record.errors == []
assert record.stuck_agents == []
assert record.house_warnings == []
assert record.finished_at == ""
assert record.duration_ms == 0
def test_healthy_no_errors_no_warnings(self):
record = VassalCycleRecord(cycle_id=1, started_at="2026-01-01T00:00:00")
assert record.healthy is True
def test_unhealthy_with_errors(self):
record = VassalCycleRecord(cycle_id=1, started_at="2026-01-01T00:00:00")
record.errors.append("something broke")
assert record.healthy is False
def test_unhealthy_with_house_warnings(self):
record = VassalCycleRecord(cycle_id=1, started_at="2026-01-01T00:00:00")
record.house_warnings.append("disk nearly full")
assert record.healthy is False
def test_dispatch_counters(self):
record = VassalCycleRecord(cycle_id=2, started_at="2026-01-01T00:00:00")
record.dispatched_to_claude = 3
record.dispatched_to_kimi = 1
record.dispatched_to_timmy = 2
assert record.dispatched_to_claude + record.dispatched_to_kimi + record.dispatched_to_timmy == 6
# ---------------------------------------------------------------------------
# VassalOrchestrator — properties and get_status
# ---------------------------------------------------------------------------
class TestVassalOrchestratorProperties:
def test_initial_state(self):
orch = VassalOrchestrator()
assert orch.cycle_count == 0
assert orch.is_running is False
assert orch.history == []
def test_get_status_no_cycles(self):
orch = VassalOrchestrator()
status = orch.get_status()
assert status["running"] is False
assert status["cycle_count"] == 0
assert status["last_cycle"] is None
def test_get_status_after_cycle(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="2026-01-01T00:00:00")
record.issues_fetched = 5
record.issues_dispatched = 3
orch._history.append(record)
orch._cycle_count = 1
status = orch.get_status()
assert status["cycle_count"] == 1
lc = status["last_cycle"]
assert lc["cycle_id"] == 1
assert lc["issues_fetched"] == 5
assert lc["issues_dispatched"] == 3
assert lc["healthy"] is True
def test_history_returns_copy(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
orch._history.append(record)
h = orch.history
h.clear()
assert len(orch._history) == 1 # original unmodified
# ---------------------------------------------------------------------------
# _resolve_interval
# ---------------------------------------------------------------------------
class TestResolveInterval:
def test_explicit_interval_used(self):
orch = VassalOrchestrator(cycle_interval=42.0)
assert orch._resolve_interval() == 42.0
def test_falls_back_to_settings(self):
orch = VassalOrchestrator()
mock_settings = MagicMock()
mock_settings.vassal_cycle_interval = 120
with patch("timmy.vassal.orchestration_loop.logger"):
with patch("config.settings", mock_settings):
interval = orch._resolve_interval()
assert interval == 120.0
def test_falls_back_to_default_on_exception(self):
orch = VassalOrchestrator()
with patch("builtins.__import__", side_effect=ImportError("no config")):
# _resolve_interval catches all exceptions and returns 300
interval = orch._resolve_interval()
assert interval == 300.0
# ---------------------------------------------------------------------------
# run_cycle — happy path and graceful degradation
# ---------------------------------------------------------------------------
def _make_backlog_mocks():
"""Return patched versions of all sub-step dependencies."""
mock_fetch = AsyncMock(return_value=[])
mock_triage = MagicMock(return_value=[])
mock_registry = MagicMock(return_value={})
mock_dispatch = AsyncMock()
return mock_fetch, mock_triage, mock_registry, mock_dispatch
class TestRunCycle:
@pytest.mark.asyncio
async def test_increments_cycle_count(self):
orch = VassalOrchestrator(cycle_interval=0)
with (
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast", new_callable=AsyncMock),
):
await orch.run_cycle()
await orch.run_cycle()
assert orch.cycle_count == 2
@pytest.mark.asyncio
async def test_record_appended_to_history(self):
orch = VassalOrchestrator(cycle_interval=0)
with (
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast", new_callable=AsyncMock),
):
record = await orch.run_cycle()
assert len(orch.history) == 1
assert orch.history[0].cycle_id == 1
assert record.finished_at != ""
assert record.duration_ms >= 0
@pytest.mark.asyncio
async def test_backlog_step_failure_recorded(self):
orch = VassalOrchestrator(cycle_interval=0)
async def bad_backlog(record):
raise RuntimeError("gitea down")
with (
patch.object(orch, "_step_backlog", side_effect=bad_backlog),
patch.object(orch, "_step_agent_health", new_callable=AsyncMock),
patch.object(orch, "_step_house_health", new_callable=AsyncMock),
patch.object(orch, "_broadcast", new_callable=AsyncMock),
):
record = await orch.run_cycle()
# Errors from step failures bubble up through the step itself;
# the test verifies the cycle still completes.
assert record.cycle_id == 1
@pytest.mark.asyncio
async def test_broadcast_called(self):
orch = VassalOrchestrator(cycle_interval=0)
broadcast_mock = AsyncMock()
with (
patch.object(orch, "_step_backlog", new_callable=AsyncMock),
patch.object(orch, "_step_agent_health", new_callable=AsyncMock),
patch.object(orch, "_step_house_health", new_callable=AsyncMock),
patch.object(orch, "_broadcast", broadcast_mock),
):
await orch.run_cycle()
broadcast_mock.assert_awaited_once()
# ---------------------------------------------------------------------------
# _step_backlog
# ---------------------------------------------------------------------------
class TestStepBacklog:
@pytest.mark.asyncio
async def test_no_issues_returns_early(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
mock_fetch = AsyncMock(return_value=[])
mock_triage = MagicMock(return_value=[])
with (
patch("timmy.vassal.backlog.fetch_open_issues", mock_fetch),
patch("timmy.vassal.backlog.triage_issues", mock_triage),
patch("timmy.vassal.dispatch.get_dispatch_registry", MagicMock(return_value={})),
):
await orch._step_backlog(record)
assert record.issues_fetched == 0
assert record.issues_dispatched == 0
@pytest.mark.asyncio
async def test_exception_adds_to_errors(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
with patch(
"timmy.vassal.orchestration_loop.__import__",
side_effect=ImportError("no backlog"),
):
# Trigger failure by making fetch_open_issues raise
with patch(
"timmy.vassal.backlog.fetch_open_issues",
AsyncMock(side_effect=RuntimeError("fetch failed")),
):
await orch._step_backlog(record)
assert any("backlog" in e for e in record.errors)
@pytest.mark.asyncio
async def test_dispatches_up_to_max(self):
from timmy.vassal.backlog import AgentTarget
orch = VassalOrchestrator(max_dispatch_per_cycle=2)
record = VassalCycleRecord(cycle_id=1, started_at="now")
issues = []
for i in range(5):
issue = MagicMock()
issue.number = i + 1
issue.agent_target = AgentTarget.CLAUDE
issues.append(issue)
mock_fetch = AsyncMock(return_value=issues)
mock_triage = MagicMock(return_value=issues)
mock_registry = MagicMock(return_value={})
mock_dispatch = AsyncMock()
with (
patch("timmy.vassal.backlog.fetch_open_issues", mock_fetch),
patch("timmy.vassal.backlog.triage_issues", mock_triage),
patch("timmy.vassal.dispatch.get_dispatch_registry", mock_registry),
patch("timmy.vassal.dispatch.dispatch_issue", mock_dispatch),
):
await orch._step_backlog(record)
assert record.issues_dispatched == 2
assert record.issues_fetched == 5
@pytest.mark.asyncio
async def test_already_dispatched_skipped(self):
from timmy.vassal.backlog import AgentTarget
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
issue = MagicMock()
issue.number = 42
issue.agent_target = AgentTarget.TIMMY
mock_fetch = AsyncMock(return_value=[issue])
mock_triage = MagicMock(return_value=[issue])
mock_registry = MagicMock(return_value={42: "already done"})
mock_dispatch = AsyncMock()
with (
patch("timmy.vassal.backlog.fetch_open_issues", mock_fetch),
patch("timmy.vassal.backlog.triage_issues", mock_triage),
patch("timmy.vassal.dispatch.get_dispatch_registry", mock_registry),
patch("timmy.vassal.dispatch.dispatch_issue", mock_dispatch),
):
await orch._step_backlog(record)
mock_dispatch.assert_not_awaited()
assert record.issues_dispatched == 0
# ---------------------------------------------------------------------------
# _step_agent_health
# ---------------------------------------------------------------------------
class TestStepAgentHealth:
@pytest.mark.asyncio
async def test_stuck_agents_recorded(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
stuck = MagicMock()
stuck.is_stuck = True
stuck.agent = "claude"
stuck.stuck_issue_numbers = [101, 102]
not_stuck = MagicMock()
not_stuck.is_stuck = False
health_report = MagicMock()
health_report.agents = [stuck, not_stuck]
mock_get_report = AsyncMock(return_value=health_report)
mock_nudge = AsyncMock(return_value=True)
mock_settings = MagicMock()
mock_settings.vassal_stuck_threshold_minutes = 60
with (
patch("timmy.vassal.agent_health.get_full_health_report", mock_get_report),
patch("timmy.vassal.agent_health.nudge_stuck_agent", mock_nudge),
patch("config.settings", mock_settings),
):
await orch._step_agent_health(record)
assert "claude" in record.stuck_agents
assert record.nudges_sent == 2
@pytest.mark.asyncio
async def test_exception_adds_to_errors(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
with patch(
"timmy.vassal.agent_health.get_full_health_report",
AsyncMock(side_effect=RuntimeError("health check failed")),
):
await orch._step_agent_health(record)
assert any("agent_health" in e for e in record.errors)
# ---------------------------------------------------------------------------
# _step_house_health
# ---------------------------------------------------------------------------
class TestStepHouseHealth:
@pytest.mark.asyncio
async def test_warnings_recorded(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
snapshot = MagicMock()
snapshot.warnings = ["low disk", "high cpu"]
snapshot.disk = MagicMock()
snapshot.disk.percent_used = 50.0
with patch("timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)):
await orch._step_house_health(record)
assert record.house_warnings == ["low disk", "high cpu"]
@pytest.mark.asyncio
async def test_cleanup_triggered_above_80_percent(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
snapshot = MagicMock()
snapshot.warnings = []
snapshot.disk = MagicMock()
snapshot.disk.percent_used = 85.0
mock_cleanup = AsyncMock(return_value={"deleted_count": 7})
with (
patch("timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)),
patch("timmy.vassal.house_health.cleanup_stale_files", mock_cleanup),
):
await orch._step_house_health(record)
assert record.cleanup_deleted == 7
mock_cleanup.assert_awaited_once()
@pytest.mark.asyncio
async def test_exception_adds_to_errors(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="now")
with patch(
"timmy.vassal.house_health.get_system_snapshot",
AsyncMock(side_effect=OSError("no disk info")),
):
await orch._step_house_health(record)
assert any("house_health" in e for e in record.errors)
# ---------------------------------------------------------------------------
# _broadcast — best-effort, swallows errors
# ---------------------------------------------------------------------------
class TestBroadcast:
@pytest.mark.asyncio
async def test_successful_broadcast(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="2026-01-01T00:00:00")
record.finished_at = "2026-01-01T00:00:01"
record.duration_ms = 100
mock_ws = MagicMock()
mock_ws.broadcast = AsyncMock()
mock_module = MagicMock()
mock_module.ws_manager = mock_ws
with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": mock_module}):
await orch._broadcast(record)
mock_ws.broadcast.assert_awaited_once()
call_args = mock_ws.broadcast.call_args
assert call_args[0][0] == "vassal.cycle"
payload = call_args[0][1]
assert payload["cycle_id"] == 1
@pytest.mark.asyncio
async def test_import_error_swallowed(self):
orch = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="2026-01-01T00:00:00")
record.finished_at = "now"
with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None}):
# Should not raise
await orch._broadcast(record)
# ---------------------------------------------------------------------------
# start / stop
# ---------------------------------------------------------------------------
class TestStartStop:
@pytest.mark.asyncio
async def test_start_sets_running(self):
orch = VassalOrchestrator(cycle_interval=9999)
with patch.object(orch, "run_cycle", new_callable=AsyncMock):
await orch.start()
assert orch.is_running is True
orch.stop()
if orch._task and not orch._task.done():
orch._task.cancel()
@pytest.mark.asyncio
async def test_double_start_ignored(self):
orch = VassalOrchestrator(cycle_interval=9999)
with patch.object(orch, "run_cycle", new_callable=AsyncMock):
await orch.start()
task1 = orch._task
await orch.start() # second call — should be ignored
assert orch._task is task1
orch.stop()
if orch._task and not orch._task.done():
orch._task.cancel()
def test_stop_sets_not_running(self):
orch = VassalOrchestrator()
orch._running = True
orch.stop()
assert orch.is_running is False