From 51b13384537f8e5fede341cf1915157ed5d63bc0 Mon Sep 17 00:00:00 2001 From: kimi Date: Mon, 23 Mar 2026 22:00:59 -0400 Subject: [PATCH] test: Add unit tests for orchestration_loop.py Add comprehensive test coverage for the VassalOrchestrator core module: - VassalCycleRecord dataclass tests (creation, health property) - VassalOrchestrator initialization tests - run_cycle() tests for backlog, agent health, house health steps - Background loop start/stop tests - Interval resolution tests - WebSocket broadcast tests (success and graceful degradation) - Module singleton tests All external dependencies (Gitea, settings, WebSocket manager) are mocked. Tests verify graceful error handling at each step. Fixes #1278 --- tests/timmy/test_orchestration_loop.py | 667 +++++++++++++++++++++++++ 1 file changed, 667 insertions(+) create mode 100644 tests/timmy/test_orchestration_loop.py diff --git a/tests/timmy/test_orchestration_loop.py b/tests/timmy/test_orchestration_loop.py new file mode 100644 index 00000000..a9ad242c --- /dev/null +++ b/tests/timmy/test_orchestration_loop.py @@ -0,0 +1,667 @@ +"""Tests for timmy.vassal.orchestration_loop — VassalOrchestrator core module. + +Refs #1278 +""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator + +# ----------------------------------------------------------------------------- +# VassalCycleRecord tests +# ----------------------------------------------------------------------------- + + +class TestVassalCycleRecord: + """Unit tests for the VassalCycleRecord dataclass.""" + + def test_creation_defaults(self): + """Test creating a cycle record with minimal fields.""" + record = VassalCycleRecord( + cycle_id=1, + started_at="2026-03-23T12:00:00+00:00", + ) + assert record.cycle_id == 1 + assert record.started_at == "2026-03-23T12:00:00+00:00" + assert record.finished_at == "" + assert record.duration_ms == 0 + assert record.issues_fetched == 0 + assert record.issues_dispatched == 0 + assert record.stuck_agents == [] + assert record.house_warnings == [] + assert record.errors == [] + + def test_healthy_property_no_issues(self): + """Record is healthy when no errors or warnings.""" + record = VassalCycleRecord( + cycle_id=1, + started_at="2026-03-23T12:00:00+00:00", + ) + assert record.healthy is True + + def test_healthy_property_with_errors(self): + """Record is unhealthy when errors exist.""" + record = VassalCycleRecord( + cycle_id=1, + started_at="2026-03-23T12:00:00+00:00", + errors=["backlog: Connection failed"], + ) + assert record.healthy is False + + def test_healthy_property_with_warnings(self): + """Record is unhealthy when house warnings exist.""" + record = VassalCycleRecord( + cycle_id=1, + started_at="2026-03-23T12:00:00+00:00", + house_warnings=["Disk: 90% used"], + ) + assert record.healthy is False + + def test_full_populated_record(self): + """Test a fully populated cycle record.""" + record = VassalCycleRecord( + cycle_id=5, + started_at="2026-03-23T12:00:00+00:00", + finished_at="2026-03-23T12:00:01+00:00", + duration_ms=1000, + issues_fetched=10, + issues_dispatched=3, + dispatched_to_claude=1, + dispatched_to_kimi=1, + dispatched_to_timmy=1, + stuck_agents=["claude"], + nudges_sent=1, + house_warnings=[], + cleanup_deleted=0, + errors=[], + ) + assert record.cycle_id == 5 + assert record.duration_ms == 1000 + assert record.healthy is True + + +# ----------------------------------------------------------------------------- +# VassalOrchestrator initialization tests +# ----------------------------------------------------------------------------- + + +class TestVassalOrchestratorInit: + """Tests for VassalOrchestrator initialization.""" + + def test_default_initialization(self): + """Test default initialization with no parameters.""" + orchestrator = VassalOrchestrator() + assert orchestrator.cycle_count == 0 + assert orchestrator.is_running is False + assert orchestrator.history == [] + assert orchestrator._max_dispatch == 10 + + def test_custom_interval(self): + """Test initialization with custom cycle interval.""" + orchestrator = VassalOrchestrator(cycle_interval=60.0) + assert orchestrator._cycle_interval == 60.0 + + def test_custom_max_dispatch(self): + """Test initialization with custom max dispatch.""" + orchestrator = VassalOrchestrator(max_dispatch_per_cycle=5) + assert orchestrator._max_dispatch == 5 + + def test_get_status_empty_history(self): + """Test get_status when no cycles have run.""" + orchestrator = VassalOrchestrator() + status = orchestrator.get_status() + assert status["running"] is False + assert status["cycle_count"] == 0 + assert status["last_cycle"] is None + + +# ----------------------------------------------------------------------------- +# Run cycle tests +# ----------------------------------------------------------------------------- + + +class TestRunCycle: + """Tests for the run_cycle method.""" + + @pytest.fixture + def orchestrator(self): + """Create a fresh orchestrator for each test.""" + return VassalOrchestrator() + + @pytest.fixture(autouse=True) + def _clear_dispatch_registry(self): + """Clear dispatch registry before each test.""" + from timmy.vassal.dispatch import clear_dispatch_registry + + clear_dispatch_registry() + yield + clear_dispatch_registry() + + @pytest.mark.asyncio + async def test_run_cycle_empty_backlog(self, orchestrator): + """Test a cycle with no issues to process.""" + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = [] + record = await orchestrator.run_cycle() + + assert record.cycle_id == 1 + assert record.issues_fetched == 0 + assert record.issues_dispatched == 0 + assert record.duration_ms >= 0 + assert record.finished_at != "" + assert orchestrator.cycle_count == 1 + assert len(orchestrator.history) == 1 + + @pytest.mark.asyncio + async def test_run_cycle_dispatches_issues(self, orchestrator): + """Test dispatching issues to agents.""" + + mock_issue = { + "number": 123, + "title": "Test issue", + "body": "Test body", + "labels": [], + "assignees": [], + "html_url": "http://test/123", + } + + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = [mock_issue] + with patch( + "timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock + ) as mock_dispatch: + mock_dispatch.return_value = MagicMock() + record = await orchestrator.run_cycle() + + assert record.cycle_id == 1 + assert record.issues_fetched == 1 + assert record.issues_dispatched == 1 + mock_dispatch.assert_awaited_once() + + @pytest.mark.asyncio + async def test_run_cycle_respects_max_dispatch(self, orchestrator): + """Test that max_dispatch_per_cycle limits dispatches.""" + mock_issues = [ + { + "number": i, + "title": f"Issue {i}", + "body": "Test", + "labels": [], + "assignees": [], + "html_url": f"http://test/{i}", + } + for i in range(1, 15) + ] + + orchestrator._max_dispatch = 3 + + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = mock_issues + with patch( + "timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock + ) as mock_dispatch: + mock_dispatch.return_value = MagicMock() + record = await orchestrator.run_cycle() + + assert record.issues_fetched == 14 + assert record.issues_dispatched == 3 + assert mock_dispatch.await_count == 3 + + @pytest.mark.asyncio + async def test_run_cycle_skips_already_dispatched(self, orchestrator): + """Test that already dispatched issues are skipped.""" + mock_issues = [ + { + "number": 1, + "title": "Issue 1", + "body": "Test", + "labels": [], + "assignees": [], + "html_url": "http://test/1", + }, + { + "number": 2, + "title": "Issue 2", + "body": "Test", + "labels": [], + "assignees": [], + "html_url": "http://test/2", + }, + ] + + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = mock_issues + with patch( + "timmy.vassal.dispatch.get_dispatch_registry" + ) as mock_registry: + # Issue 1 already dispatched + mock_registry.return_value = {1: MagicMock()} + with patch( + "timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock + ) as mock_dispatch: + mock_dispatch.return_value = MagicMock() + record = await orchestrator.run_cycle() + + assert record.issues_fetched == 2 + assert record.issues_dispatched == 1 + mock_dispatch.assert_awaited_once() + # Should be called with issue 2 + call_args = mock_dispatch.call_args[0][0] + assert call_args.number == 2 + + @pytest.mark.asyncio + async def test_run_cycle_tracks_agent_targets(self, orchestrator): + """Test that dispatch counts are tracked per agent.""" + + mock_issues = [ + { + "number": 1, + "title": "Architecture refactor", # Should route to Claude + "body": "Test", + "labels": [], + "assignees": [], + "html_url": "http://test/1", + }, + { + "number": 2, + "title": "Research analysis", # Should route to Kimi + "body": "Test", + "labels": [], + "assignees": [], + "html_url": "http://test/2", + }, + { + "number": 3, + "title": "Docs update", # Should route to Timmy + "body": "Test", + "labels": [], + "assignees": [], + "html_url": "http://test/3", + }, + ] + + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = mock_issues + with patch( + "timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock + ) as mock_dispatch: + mock_dispatch.return_value = MagicMock() + record = await orchestrator.run_cycle() + + assert record.issues_dispatched == 3 + assert record.dispatched_to_claude == 1 + assert record.dispatched_to_kimi == 1 + assert record.dispatched_to_timmy == 1 + + @pytest.mark.asyncio + async def test_run_cycle_handles_backlog_error(self, orchestrator): + """Test graceful handling of backlog step errors.""" + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.side_effect = RuntimeError("Gitea down") + record = await orchestrator.run_cycle() + + assert record.cycle_id == 1 + assert record.issues_fetched == 0 + assert len(record.errors) == 1 + assert "backlog" in record.errors[0] + assert record.healthy is False + + @pytest.mark.asyncio + async def test_run_cycle_handles_agent_health_error(self, orchestrator): + """Test graceful handling of agent health step errors.""" + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = [] + with patch( + "timmy.vassal.agent_health.get_full_health_report", + new_callable=AsyncMock, + ) as mock_health: + mock_health.side_effect = RuntimeError("Health check failed") + record = await orchestrator.run_cycle() + + assert len(record.errors) == 1 + assert "agent_health" in record.errors[0] + + @pytest.mark.asyncio + async def test_run_cycle_handles_house_health_error(self, orchestrator): + """Test graceful handling of house health step errors.""" + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = [] + with patch( + "timmy.vassal.house_health.get_system_snapshot", + new_callable=AsyncMock, + ) as mock_snapshot: + mock_snapshot.side_effect = RuntimeError("Snapshot failed") + record = await orchestrator.run_cycle() + + assert len(record.errors) == 1 + assert "house_health" in record.errors[0] + + @pytest.mark.asyncio + async def test_run_cycle_detects_stuck_agents(self, orchestrator): + """Test detection and nudging of stuck agents.""" + from dataclasses import dataclass, field + + @dataclass + class MockAgentStatus: + agent: str + is_stuck: bool = False + is_idle: bool = False + stuck_issue_numbers: list = field(default_factory=list) + + mock_report = MagicMock() + mock_report.agents = [ + MockAgentStatus(agent="claude", is_stuck=True, stuck_issue_numbers=[100]), + MockAgentStatus(agent="kimi", is_stuck=False), + ] + + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = [] + with patch( + "timmy.vassal.agent_health.get_full_health_report", + new_callable=AsyncMock, + ) as mock_health: + mock_health.return_value = mock_report + with patch( + "timmy.vassal.agent_health.nudge_stuck_agent", + new_callable=AsyncMock, + ) as mock_nudge: + mock_nudge.return_value = True + record = await orchestrator.run_cycle() + + assert "claude" in record.stuck_agents + assert record.nudges_sent == 1 + mock_nudge.assert_awaited_once_with("claude", 100) + + @pytest.mark.asyncio + async def test_run_cycle_triggers_cleanup_on_high_disk(self, orchestrator): + """Test cleanup is triggered when disk usage is high.""" + mock_snapshot = MagicMock() + mock_snapshot.disk.percent_used = 85.0 # Above 80% threshold + mock_snapshot.warnings = ["Disk: 85% used"] + + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = [] + with patch( + "timmy.vassal.house_health.get_system_snapshot", + new_callable=AsyncMock, + ) as mock_snapshot_fn: + mock_snapshot_fn.return_value = mock_snapshot + with patch( + "timmy.vassal.house_health.cleanup_stale_files", + new_callable=AsyncMock, + ) as mock_cleanup: + mock_cleanup.return_value = {"deleted_count": 5} + record = await orchestrator.run_cycle() + + assert record.cleanup_deleted == 5 + assert record.house_warnings == ["Disk: 85% used"] + + @pytest.mark.asyncio + async def test_get_status_after_cycle(self, orchestrator): + """Test get_status returns correct info after a cycle.""" + with patch( + "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast" + ) as mock_broadcast: + mock_broadcast.return_value = None + with patch( + "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = [] + await orchestrator.run_cycle() + + status = orchestrator.get_status() + assert status["running"] is False + assert status["cycle_count"] == 1 + assert status["last_cycle"] is not None + assert status["last_cycle"]["cycle_id"] == 1 + assert status["last_cycle"]["issues_fetched"] == 0 + assert status["last_cycle"]["healthy"] is True + + +# ----------------------------------------------------------------------------- +# Background loop tests +# ----------------------------------------------------------------------------- + + +class TestBackgroundLoop: + """Tests for the start/stop background loop methods.""" + + @pytest.fixture + def orchestrator(self): + """Create a fresh orchestrator for each test.""" + return VassalOrchestrator(cycle_interval=0.1) + + @pytest.mark.asyncio + async def test_start_stop_cycle(self, orchestrator): + """Test starting and stopping the background loop.""" + with patch.object(orchestrator, "run_cycle", new_callable=AsyncMock) as mock_run: + mock_run.return_value = MagicMock() + + # Start the loop + await orchestrator.start() + assert orchestrator.is_running is True + assert orchestrator._task is not None + + # Let it run for a bit + await asyncio.sleep(0.25) + + # Stop the loop + orchestrator.stop() + assert orchestrator.is_running is False + + # Should have run at least once + assert mock_run.await_count >= 1 + + @pytest.mark.asyncio + async def test_start_already_running(self, orchestrator): + """Test starting when already running is a no-op.""" + with patch.object(orchestrator, "run_cycle", new_callable=AsyncMock): + await orchestrator.start() + first_task = orchestrator._task + + # Start again should not create new task + await orchestrator.start() + assert orchestrator._task is first_task + + orchestrator.stop() + + @pytest.mark.asyncio + async def test_stop_not_running(self, orchestrator): + """Test stopping when not running is a no-op.""" + orchestrator.stop() + assert orchestrator.is_running is False + assert orchestrator._task is None + + @pytest.mark.asyncio + async def test_loop_handles_cycle_exceptions(self, orchestrator): + """Test that exceptions in run_cycle don't crash the loop.""" + with patch.object( + orchestrator, "run_cycle", new_callable=AsyncMock + ) as mock_run: + mock_run.side_effect = [RuntimeError("Boom"), MagicMock()] + + await orchestrator.start() + await asyncio.sleep(0.25) + orchestrator.stop() + + # Should have been called multiple times despite error + assert mock_run.await_count >= 2 + + +# ----------------------------------------------------------------------------- +# Interval resolution tests +# ----------------------------------------------------------------------------- + + +class TestIntervalResolution: + """Tests for the _resolve_interval method.""" + + def test_resolve_interval_explicit(self): + """Test that explicit interval is used when provided.""" + orchestrator = VassalOrchestrator(cycle_interval=60.0) + assert orchestrator._resolve_interval() == 60.0 + + def test_resolve_interval_from_settings(self): + """Test interval is read from settings when not explicitly set.""" + orchestrator = VassalOrchestrator() + mock_settings = MagicMock() + mock_settings.vassal_cycle_interval = 120.0 + + with patch("config.settings", mock_settings): + assert orchestrator._resolve_interval() == 120.0 + + def test_resolve_interval_default_fallback(self): + """Test default 300s is used when settings fails.""" + orchestrator = VassalOrchestrator() + + with patch("config.settings", None): + assert orchestrator._resolve_interval() == 300.0 + + +# ----------------------------------------------------------------------------- +# Broadcast tests +# ----------------------------------------------------------------------------- + + +class TestBroadcast: + """Tests for the _broadcast helper.""" + + @pytest.mark.asyncio + async def test_broadcast_success(self): + """Test successful WebSocket broadcast.""" + orchestrator = VassalOrchestrator() + record = VassalCycleRecord( + cycle_id=1, + started_at="2026-03-23T12:00:00+00:00", + finished_at="2026-03-23T12:00:01+00:00", + duration_ms=1000, + issues_fetched=5, + issues_dispatched=2, + ) + + mock_ws_manager = MagicMock() + mock_ws_manager.broadcast = AsyncMock() + + with patch( + "infrastructure.ws_manager.handler.ws_manager", mock_ws_manager + ): + await orchestrator._broadcast(record) + + mock_ws_manager.broadcast.assert_awaited_once() + call_args = mock_ws_manager.broadcast.call_args[0] + assert call_args[0] == "vassal.cycle" + assert call_args[1]["cycle_id"] == 1 + assert call_args[1]["healthy"] is True + + @pytest.mark.asyncio + async def test_broadcast_graceful_degradation(self): + """Test broadcast gracefully handles errors.""" + orchestrator = VassalOrchestrator() + record = VassalCycleRecord(cycle_id=1, started_at="2026-03-23T12:00:00+00:00") + + with patch( + "infrastructure.ws_manager.handler.ws_manager" + ) as mock_ws_manager: + mock_ws_manager.broadcast = AsyncMock( + side_effect=RuntimeError("WS disconnected") + ) + # Should not raise + await orchestrator._broadcast(record) + + @pytest.mark.asyncio + async def test_broadcast_import_error(self): + """Test broadcast handles missing ws_manager module.""" + orchestrator = VassalOrchestrator() + record = VassalCycleRecord(cycle_id=1, started_at="2026-03-23T12:00:00+00:00") + + with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None}): + # Should not raise + await orchestrator._broadcast(record) + + +# ----------------------------------------------------------------------------- +# Module singleton test +# ----------------------------------------------------------------------------- + + +class TestModuleSingleton: + """Tests for the module-level vassal_orchestrator singleton.""" + + def test_singleton_import(self): + """Test that the module-level singleton is available.""" + from timmy.vassal import vassal_orchestrator + + assert isinstance(vassal_orchestrator, VassalOrchestrator) + + def test_singleton_is_single_instance(self): + """Test that importing twice returns same instance.""" + from timmy.vassal import vassal_orchestrator as orch1 + from timmy.vassal import vassal_orchestrator as orch2 + + assert orch1 is orch2 + + +# Need to import asyncio for the background loop tests +import asyncio # noqa: E402 -- 2.43.0