Compare commits

...

1 Commits

Author SHA1 Message Date
kimi
51b1338453 test: Add unit tests for orchestration_loop.py
Some checks failed
Tests / lint (pull_request) Failing after 41s
Tests / test (pull_request) Has been skipped
Add comprehensive test coverage for the VassalOrchestrator core module:

- VassalCycleRecord dataclass tests (creation, health property)
- VassalOrchestrator initialization tests
- run_cycle() tests for backlog, agent health, house health steps
- Background loop start/stop tests
- Interval resolution tests
- WebSocket broadcast tests (success and graceful degradation)
- Module singleton tests

All external dependencies (Gitea, settings, WebSocket manager) are mocked.
Tests verify graceful error handling at each step.

Fixes #1278
2026-03-23 22:00:59 -04:00

View File

@@ -0,0 +1,667 @@
"""Tests for timmy.vassal.orchestration_loop — VassalOrchestrator core module.
Refs #1278
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
# -----------------------------------------------------------------------------
# VassalCycleRecord tests
# -----------------------------------------------------------------------------
class TestVassalCycleRecord:
"""Unit tests for the VassalCycleRecord dataclass."""
def test_creation_defaults(self):
"""Test creating a cycle record with minimal fields."""
record = VassalCycleRecord(
cycle_id=1,
started_at="2026-03-23T12:00:00+00:00",
)
assert record.cycle_id == 1
assert record.started_at == "2026-03-23T12:00:00+00:00"
assert record.finished_at == ""
assert record.duration_ms == 0
assert record.issues_fetched == 0
assert record.issues_dispatched == 0
assert record.stuck_agents == []
assert record.house_warnings == []
assert record.errors == []
def test_healthy_property_no_issues(self):
"""Record is healthy when no errors or warnings."""
record = VassalCycleRecord(
cycle_id=1,
started_at="2026-03-23T12:00:00+00:00",
)
assert record.healthy is True
def test_healthy_property_with_errors(self):
"""Record is unhealthy when errors exist."""
record = VassalCycleRecord(
cycle_id=1,
started_at="2026-03-23T12:00:00+00:00",
errors=["backlog: Connection failed"],
)
assert record.healthy is False
def test_healthy_property_with_warnings(self):
"""Record is unhealthy when house warnings exist."""
record = VassalCycleRecord(
cycle_id=1,
started_at="2026-03-23T12:00:00+00:00",
house_warnings=["Disk: 90% used"],
)
assert record.healthy is False
def test_full_populated_record(self):
"""Test a fully populated cycle record."""
record = VassalCycleRecord(
cycle_id=5,
started_at="2026-03-23T12:00:00+00:00",
finished_at="2026-03-23T12:00:01+00:00",
duration_ms=1000,
issues_fetched=10,
issues_dispatched=3,
dispatched_to_claude=1,
dispatched_to_kimi=1,
dispatched_to_timmy=1,
stuck_agents=["claude"],
nudges_sent=1,
house_warnings=[],
cleanup_deleted=0,
errors=[],
)
assert record.cycle_id == 5
assert record.duration_ms == 1000
assert record.healthy is True
# -----------------------------------------------------------------------------
# VassalOrchestrator initialization tests
# -----------------------------------------------------------------------------
class TestVassalOrchestratorInit:
"""Tests for VassalOrchestrator initialization."""
def test_default_initialization(self):
"""Test default initialization with no parameters."""
orchestrator = VassalOrchestrator()
assert orchestrator.cycle_count == 0
assert orchestrator.is_running is False
assert orchestrator.history == []
assert orchestrator._max_dispatch == 10
def test_custom_interval(self):
"""Test initialization with custom cycle interval."""
orchestrator = VassalOrchestrator(cycle_interval=60.0)
assert orchestrator._cycle_interval == 60.0
def test_custom_max_dispatch(self):
"""Test initialization with custom max dispatch."""
orchestrator = VassalOrchestrator(max_dispatch_per_cycle=5)
assert orchestrator._max_dispatch == 5
def test_get_status_empty_history(self):
"""Test get_status when no cycles have run."""
orchestrator = VassalOrchestrator()
status = orchestrator.get_status()
assert status["running"] is False
assert status["cycle_count"] == 0
assert status["last_cycle"] is None
# -----------------------------------------------------------------------------
# Run cycle tests
# -----------------------------------------------------------------------------
class TestRunCycle:
"""Tests for the run_cycle method."""
@pytest.fixture
def orchestrator(self):
"""Create a fresh orchestrator for each test."""
return VassalOrchestrator()
@pytest.fixture(autouse=True)
def _clear_dispatch_registry(self):
"""Clear dispatch registry before each test."""
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
yield
clear_dispatch_registry()
@pytest.mark.asyncio
async def test_run_cycle_empty_backlog(self, orchestrator):
"""Test a cycle with no issues to process."""
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = []
record = await orchestrator.run_cycle()
assert record.cycle_id == 1
assert record.issues_fetched == 0
assert record.issues_dispatched == 0
assert record.duration_ms >= 0
assert record.finished_at != ""
assert orchestrator.cycle_count == 1
assert len(orchestrator.history) == 1
@pytest.mark.asyncio
async def test_run_cycle_dispatches_issues(self, orchestrator):
"""Test dispatching issues to agents."""
mock_issue = {
"number": 123,
"title": "Test issue",
"body": "Test body",
"labels": [],
"assignees": [],
"html_url": "http://test/123",
}
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = [mock_issue]
with patch(
"timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock
) as mock_dispatch:
mock_dispatch.return_value = MagicMock()
record = await orchestrator.run_cycle()
assert record.cycle_id == 1
assert record.issues_fetched == 1
assert record.issues_dispatched == 1
mock_dispatch.assert_awaited_once()
@pytest.mark.asyncio
async def test_run_cycle_respects_max_dispatch(self, orchestrator):
"""Test that max_dispatch_per_cycle limits dispatches."""
mock_issues = [
{
"number": i,
"title": f"Issue {i}",
"body": "Test",
"labels": [],
"assignees": [],
"html_url": f"http://test/{i}",
}
for i in range(1, 15)
]
orchestrator._max_dispatch = 3
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = mock_issues
with patch(
"timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock
) as mock_dispatch:
mock_dispatch.return_value = MagicMock()
record = await orchestrator.run_cycle()
assert record.issues_fetched == 14
assert record.issues_dispatched == 3
assert mock_dispatch.await_count == 3
@pytest.mark.asyncio
async def test_run_cycle_skips_already_dispatched(self, orchestrator):
"""Test that already dispatched issues are skipped."""
mock_issues = [
{
"number": 1,
"title": "Issue 1",
"body": "Test",
"labels": [],
"assignees": [],
"html_url": "http://test/1",
},
{
"number": 2,
"title": "Issue 2",
"body": "Test",
"labels": [],
"assignees": [],
"html_url": "http://test/2",
},
]
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = mock_issues
with patch(
"timmy.vassal.dispatch.get_dispatch_registry"
) as mock_registry:
# Issue 1 already dispatched
mock_registry.return_value = {1: MagicMock()}
with patch(
"timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock
) as mock_dispatch:
mock_dispatch.return_value = MagicMock()
record = await orchestrator.run_cycle()
assert record.issues_fetched == 2
assert record.issues_dispatched == 1
mock_dispatch.assert_awaited_once()
# Should be called with issue 2
call_args = mock_dispatch.call_args[0][0]
assert call_args.number == 2
@pytest.mark.asyncio
async def test_run_cycle_tracks_agent_targets(self, orchestrator):
"""Test that dispatch counts are tracked per agent."""
mock_issues = [
{
"number": 1,
"title": "Architecture refactor", # Should route to Claude
"body": "Test",
"labels": [],
"assignees": [],
"html_url": "http://test/1",
},
{
"number": 2,
"title": "Research analysis", # Should route to Kimi
"body": "Test",
"labels": [],
"assignees": [],
"html_url": "http://test/2",
},
{
"number": 3,
"title": "Docs update", # Should route to Timmy
"body": "Test",
"labels": [],
"assignees": [],
"html_url": "http://test/3",
},
]
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = mock_issues
with patch(
"timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock
) as mock_dispatch:
mock_dispatch.return_value = MagicMock()
record = await orchestrator.run_cycle()
assert record.issues_dispatched == 3
assert record.dispatched_to_claude == 1
assert record.dispatched_to_kimi == 1
assert record.dispatched_to_timmy == 1
@pytest.mark.asyncio
async def test_run_cycle_handles_backlog_error(self, orchestrator):
"""Test graceful handling of backlog step errors."""
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.side_effect = RuntimeError("Gitea down")
record = await orchestrator.run_cycle()
assert record.cycle_id == 1
assert record.issues_fetched == 0
assert len(record.errors) == 1
assert "backlog" in record.errors[0]
assert record.healthy is False
@pytest.mark.asyncio
async def test_run_cycle_handles_agent_health_error(self, orchestrator):
"""Test graceful handling of agent health step errors."""
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = []
with patch(
"timmy.vassal.agent_health.get_full_health_report",
new_callable=AsyncMock,
) as mock_health:
mock_health.side_effect = RuntimeError("Health check failed")
record = await orchestrator.run_cycle()
assert len(record.errors) == 1
assert "agent_health" in record.errors[0]
@pytest.mark.asyncio
async def test_run_cycle_handles_house_health_error(self, orchestrator):
"""Test graceful handling of house health step errors."""
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = []
with patch(
"timmy.vassal.house_health.get_system_snapshot",
new_callable=AsyncMock,
) as mock_snapshot:
mock_snapshot.side_effect = RuntimeError("Snapshot failed")
record = await orchestrator.run_cycle()
assert len(record.errors) == 1
assert "house_health" in record.errors[0]
@pytest.mark.asyncio
async def test_run_cycle_detects_stuck_agents(self, orchestrator):
"""Test detection and nudging of stuck agents."""
from dataclasses import dataclass, field
@dataclass
class MockAgentStatus:
agent: str
is_stuck: bool = False
is_idle: bool = False
stuck_issue_numbers: list = field(default_factory=list)
mock_report = MagicMock()
mock_report.agents = [
MockAgentStatus(agent="claude", is_stuck=True, stuck_issue_numbers=[100]),
MockAgentStatus(agent="kimi", is_stuck=False),
]
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = []
with patch(
"timmy.vassal.agent_health.get_full_health_report",
new_callable=AsyncMock,
) as mock_health:
mock_health.return_value = mock_report
with patch(
"timmy.vassal.agent_health.nudge_stuck_agent",
new_callable=AsyncMock,
) as mock_nudge:
mock_nudge.return_value = True
record = await orchestrator.run_cycle()
assert "claude" in record.stuck_agents
assert record.nudges_sent == 1
mock_nudge.assert_awaited_once_with("claude", 100)
@pytest.mark.asyncio
async def test_run_cycle_triggers_cleanup_on_high_disk(self, orchestrator):
"""Test cleanup is triggered when disk usage is high."""
mock_snapshot = MagicMock()
mock_snapshot.disk.percent_used = 85.0 # Above 80% threshold
mock_snapshot.warnings = ["Disk: 85% used"]
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = []
with patch(
"timmy.vassal.house_health.get_system_snapshot",
new_callable=AsyncMock,
) as mock_snapshot_fn:
mock_snapshot_fn.return_value = mock_snapshot
with patch(
"timmy.vassal.house_health.cleanup_stale_files",
new_callable=AsyncMock,
) as mock_cleanup:
mock_cleanup.return_value = {"deleted_count": 5}
record = await orchestrator.run_cycle()
assert record.cleanup_deleted == 5
assert record.house_warnings == ["Disk: 85% used"]
@pytest.mark.asyncio
async def test_get_status_after_cycle(self, orchestrator):
"""Test get_status returns correct info after a cycle."""
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
) as mock_broadcast:
mock_broadcast.return_value = None
with patch(
"timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
) as mock_fetch:
mock_fetch.return_value = []
await orchestrator.run_cycle()
status = orchestrator.get_status()
assert status["running"] is False
assert status["cycle_count"] == 1
assert status["last_cycle"] is not None
assert status["last_cycle"]["cycle_id"] == 1
assert status["last_cycle"]["issues_fetched"] == 0
assert status["last_cycle"]["healthy"] is True
# -----------------------------------------------------------------------------
# Background loop tests
# -----------------------------------------------------------------------------
class TestBackgroundLoop:
"""Tests for the start/stop background loop methods."""
@pytest.fixture
def orchestrator(self):
"""Create a fresh orchestrator for each test."""
return VassalOrchestrator(cycle_interval=0.1)
@pytest.mark.asyncio
async def test_start_stop_cycle(self, orchestrator):
"""Test starting and stopping the background loop."""
with patch.object(orchestrator, "run_cycle", new_callable=AsyncMock) as mock_run:
mock_run.return_value = MagicMock()
# Start the loop
await orchestrator.start()
assert orchestrator.is_running is True
assert orchestrator._task is not None
# Let it run for a bit
await asyncio.sleep(0.25)
# Stop the loop
orchestrator.stop()
assert orchestrator.is_running is False
# Should have run at least once
assert mock_run.await_count >= 1
@pytest.mark.asyncio
async def test_start_already_running(self, orchestrator):
"""Test starting when already running is a no-op."""
with patch.object(orchestrator, "run_cycle", new_callable=AsyncMock):
await orchestrator.start()
first_task = orchestrator._task
# Start again should not create new task
await orchestrator.start()
assert orchestrator._task is first_task
orchestrator.stop()
@pytest.mark.asyncio
async def test_stop_not_running(self, orchestrator):
"""Test stopping when not running is a no-op."""
orchestrator.stop()
assert orchestrator.is_running is False
assert orchestrator._task is None
@pytest.mark.asyncio
async def test_loop_handles_cycle_exceptions(self, orchestrator):
"""Test that exceptions in run_cycle don't crash the loop."""
with patch.object(
orchestrator, "run_cycle", new_callable=AsyncMock
) as mock_run:
mock_run.side_effect = [RuntimeError("Boom"), MagicMock()]
await orchestrator.start()
await asyncio.sleep(0.25)
orchestrator.stop()
# Should have been called multiple times despite error
assert mock_run.await_count >= 2
# -----------------------------------------------------------------------------
# Interval resolution tests
# -----------------------------------------------------------------------------
class TestIntervalResolution:
"""Tests for the _resolve_interval method."""
def test_resolve_interval_explicit(self):
"""Test that explicit interval is used when provided."""
orchestrator = VassalOrchestrator(cycle_interval=60.0)
assert orchestrator._resolve_interval() == 60.0
def test_resolve_interval_from_settings(self):
"""Test interval is read from settings when not explicitly set."""
orchestrator = VassalOrchestrator()
mock_settings = MagicMock()
mock_settings.vassal_cycle_interval = 120.0
with patch("config.settings", mock_settings):
assert orchestrator._resolve_interval() == 120.0
def test_resolve_interval_default_fallback(self):
"""Test default 300s is used when settings fails."""
orchestrator = VassalOrchestrator()
with patch("config.settings", None):
assert orchestrator._resolve_interval() == 300.0
# -----------------------------------------------------------------------------
# Broadcast tests
# -----------------------------------------------------------------------------
class TestBroadcast:
"""Tests for the _broadcast helper."""
@pytest.mark.asyncio
async def test_broadcast_success(self):
"""Test successful WebSocket broadcast."""
orchestrator = VassalOrchestrator()
record = VassalCycleRecord(
cycle_id=1,
started_at="2026-03-23T12:00:00+00:00",
finished_at="2026-03-23T12:00:01+00:00",
duration_ms=1000,
issues_fetched=5,
issues_dispatched=2,
)
mock_ws_manager = MagicMock()
mock_ws_manager.broadcast = AsyncMock()
with patch(
"infrastructure.ws_manager.handler.ws_manager", mock_ws_manager
):
await orchestrator._broadcast(record)
mock_ws_manager.broadcast.assert_awaited_once()
call_args = mock_ws_manager.broadcast.call_args[0]
assert call_args[0] == "vassal.cycle"
assert call_args[1]["cycle_id"] == 1
assert call_args[1]["healthy"] is True
@pytest.mark.asyncio
async def test_broadcast_graceful_degradation(self):
"""Test broadcast gracefully handles errors."""
orchestrator = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="2026-03-23T12:00:00+00:00")
with patch(
"infrastructure.ws_manager.handler.ws_manager"
) as mock_ws_manager:
mock_ws_manager.broadcast = AsyncMock(
side_effect=RuntimeError("WS disconnected")
)
# Should not raise
await orchestrator._broadcast(record)
@pytest.mark.asyncio
async def test_broadcast_import_error(self):
"""Test broadcast handles missing ws_manager module."""
orchestrator = VassalOrchestrator()
record = VassalCycleRecord(cycle_id=1, started_at="2026-03-23T12:00:00+00:00")
with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None}):
# Should not raise
await orchestrator._broadcast(record)
# -----------------------------------------------------------------------------
# Module singleton test
# -----------------------------------------------------------------------------
class TestModuleSingleton:
"""Tests for the module-level vassal_orchestrator singleton."""
def test_singleton_import(self):
"""Test that the module-level singleton is available."""
from timmy.vassal import vassal_orchestrator
assert isinstance(vassal_orchestrator, VassalOrchestrator)
def test_singleton_is_single_instance(self):
"""Test that importing twice returns same instance."""
from timmy.vassal import vassal_orchestrator as orch1
from timmy.vassal import vassal_orchestrator as orch2
assert orch1 is orch2
# Need to import asyncio for the background loop tests
import asyncio # noqa: E402