Structured self-test framework that probes 6 capabilities (tool use, multistep planning, memory read/write, self-coding, lightning econ) in round-robin. Reuses existing infra: event_log for persistence, create_task() for upgrade proposals, capture_error() for crash handling, and in-memory circuit breaker for failure tracking. - src/timmy/loop_qa.py: Capability enum, 6 async probes, orchestrator - src/dashboard/routes/loop_qa.py: JSON + HTMX health endpoints - HTMX partial polls every 30s on the health panel - Background scheduler in app.py lifespan - 25 tests covering probes, orchestrator, health snapshot, routes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
444 lines · 14 KiB · Python
"""Tests for timmy.loop_qa — capability self-test framework.

TDD: these tests are written before the implementation. They validate:

- Capability enum and status mapping
- Six self-test probes (T1–T6)
- Round-robin orchestrator with throttling
- Failure counter logic and upgrade proposal filing
- Health snapshot derivation
"""
from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
# ---------------------------------------------------------------------------
# Model tests
# ---------------------------------------------------------------------------
def test_capability_enum_has_all_members():
    """The Capability StrEnum exposes exactly the six expected members."""
    from timmy.loop_qa import Capability

    wanted = {
        "tool_use",
        "multistep_planning",
        "memory_read",
        "memory_write",
        "self_coding",
        "lightning_econ",
    }
    actual = {member.value for member in Capability}
    assert actual == wanted
def test_status_for_failures_mapping():
    """0–1 failures map to green, 2 to yellow, >= threshold (3) to red."""
    from timmy.loop_qa import LoopQAOrchestrator

    cases = [(0, "green"), (1, "green"), (2, "yellow"), (3, "red"), (10, "red")]
    for failure_count, expected_colour in cases:
        assert LoopQAOrchestrator.status_for_failures(failure_count) == expected_colour
def test_probe_registry_has_six_entries():
    """TEST_SEQUENCE lists six probes covering every capability."""
    from timmy.loop_qa import TEST_SEQUENCE, Capability

    assert len(TEST_SEQUENCE) == 6
    covered = {capability for capability, _probe in TEST_SEQUENCE}
    assert covered == set(Capability)
# ---------------------------------------------------------------------------
# Self-test probe tests (T1–T6)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_t1_tool_use_success():
    """T1 passes when shell_hand.run yields non-empty stdout."""
    from timmy.loop_qa import Capability, probe_tool_use

    fake_run_result = MagicMock(success=True, stdout="file1.py\nfile2.py\n")
    fake_hand = AsyncMock()
    fake_hand.run = AsyncMock(return_value=fake_run_result)

    with patch("timmy.loop_qa._get_shell_hand", return_value=fake_hand):
        outcome = await probe_tool_use()

    assert outcome["success"] is True
    assert outcome["capability"] == Capability.TOOL_USE
@pytest.mark.asyncio
async def test_t1_tool_use_failure():
    """T1 reports failure (with the error type) when shell_hand.run raises."""
    from timmy.loop_qa import Capability, probe_tool_use

    fake_hand = AsyncMock()
    fake_hand.run = AsyncMock(side_effect=RuntimeError("shell unavailable"))

    with patch("timmy.loop_qa._get_shell_hand", return_value=fake_hand):
        outcome = await probe_tool_use()

    assert outcome["success"] is False
    assert outcome["capability"] == Capability.TOOL_USE
    assert outcome["error_type"] == "RuntimeError"
@pytest.mark.asyncio
async def test_t2_multistep_planning(tmp_path):
    """T2 writes a vault note and verifies its marker round-trips."""
    from timmy.loop_qa import probe_multistep_planning

    note_path = tmp_path / "test_note.md"

    # The fake vault persists exactly the content the probe supplies, so the
    # probe's read-back verification of its marker can succeed.
    def record_note(name, content, folder):
        note_path.write_text(content)
        return note_path

    vault = MagicMock()
    vault.write_note = MagicMock(side_effect=record_note)

    with patch("timmy.loop_qa._get_vault", return_value=vault):
        outcome = await probe_multistep_planning()

    assert outcome["success"] is True
@pytest.mark.asyncio
async def test_t3_memory_write():
    """T3 stores a fact via brain store_fact_sync and reports success."""
    from timmy.loop_qa import probe_memory_write

    memory = MagicMock()
    memory.store_fact_sync = MagicMock(return_value=None)

    with patch("timmy.loop_qa._get_brain_memory", return_value=memory):
        outcome = await probe_memory_write()

    assert outcome["success"] is True
    # The fact must be filed under the "self_test_marker" category.
    memory.store_fact_sync.assert_called_once()
    positional_args = memory.store_fact_sync.call_args[0]
    assert positional_args[0] == "self_test_marker"
@pytest.mark.asyncio
async def test_t4_memory_read():
    """T4 succeeds when stored facts come back from the brain."""
    from timmy.loop_qa import probe_memory_read

    memory = MagicMock()
    memory.get_facts_sync = MagicMock(
        return_value=[{"content": "test_marker_123", "category": "self_test_marker"}]
    )

    with patch("timmy.loop_qa._get_brain_memory", return_value=memory):
        outcome = await probe_memory_read()

    assert outcome["success"] is True
@pytest.mark.asyncio
async def test_t4_memory_read_empty():
    """T4 fails when the brain returns no facts at all."""
    from timmy.loop_qa import probe_memory_read

    memory = MagicMock()
    memory.get_facts_sync = MagicMock(return_value=[])

    with patch("timmy.loop_qa._get_brain_memory", return_value=memory):
        outcome = await probe_memory_read()

    assert outcome["success"] is False
@pytest.mark.asyncio
async def test_t5_self_coding(tmp_path):
    """T5 produces a self-test note on disk and reports success."""
    from timmy.loop_qa import probe_self_coding

    note_path = tmp_path / "self_test_note.md"
    note_path.write_text("# Self-Test Note\n\nImprovement sketch.")

    vault = MagicMock()
    vault.write_note = MagicMock(return_value=note_path)

    with patch("timmy.loop_qa._get_vault", return_value=vault):
        outcome = await probe_self_coding()

    assert outcome["success"] is True
@pytest.mark.asyncio
async def test_t6_lightning_econ_placeholder():
    """T6 is a stub that always passes and says so in its details."""
    from timmy.loop_qa import probe_lightning_econ

    outcome = await probe_lightning_econ()

    assert outcome["success"] is True
    details = outcome["details"].lower()
    assert "pending" in details or "v2" in details
# ---------------------------------------------------------------------------
# Orchestrator tests
# ---------------------------------------------------------------------------
def _make_orchestrator():
    """Return a fresh LoopQAOrchestrator with default (zeroed) state.

    NOTE(review): the previous docstring claimed external services were
    patched here, but nothing is — callers patch probes/log_event/create_task
    themselves. The docstring is corrected to match the code.
    """
    from timmy.loop_qa import LoopQAOrchestrator

    return LoopQAOrchestrator()
@pytest.mark.asyncio
async def test_run_next_test_round_robin():
    """run_next_test cycles through all six probes in sequence order."""
    from timmy.loop_qa import TEST_SEQUENCE, LoopQAOrchestrator

    orch = LoopQAOrchestrator()
    outcomes = []

    # Stub out each probe (in sequence order) so every call succeeds quickly.
    with patch("timmy.loop_qa.log_event"):
        for capability, _probe in TEST_SEQUENCE:
            target = f"timmy.loop_qa.probe_{capability.value}"
            stub_result = {
                "success": True,
                "capability": capability,
                "details": "ok",
                "error_type": None,
            }
            with patch(target, new_callable=AsyncMock, return_value=stub_result):
                outcomes.append(await orch.run_next_test())

    # All 6 should run and none should be skipped.
    assert len(outcomes) == 6
    assert all(outcome is not None for outcome in outcomes)
@pytest.mark.asyncio
async def test_run_next_test_disabled():
    """A disabled loop_qa_enabled setting short-circuits run_next_test to None."""
    from timmy.loop_qa import LoopQAOrchestrator

    orch = LoopQAOrchestrator()

    with patch("timmy.loop_qa.settings") as fake_settings:
        fake_settings.loop_qa_enabled = False
        assert await orch.run_next_test() is None
@pytest.mark.asyncio
async def test_run_next_test_throttle():
    """Hitting the hourly cap makes run_next_test return None."""
    from timmy.loop_qa import LoopQAOrchestrator

    orch = LoopQAOrchestrator()
    # Mark the current hour and push the counter far past any plausible
    # max_per_hour setting.
    orch._hour_marker = datetime.now(UTC).hour
    orch._hourly_count = 100

    assert await orch.run_next_test() is None
@pytest.mark.asyncio
async def test_failure_counter_increments():
    """A failing probe bumps the capability's consecutive-failure count."""
    from timmy.loop_qa import Capability, LoopQAOrchestrator

    orch = LoopQAOrchestrator()
    capability = Capability.TOOL_USE
    failing_result = {
        "success": False,
        "capability": capability,
        "details": "empty stdout",
        "error_type": "AssertionError",
    }

    with (
        patch("timmy.loop_qa.log_event"),
        patch(
            "timmy.loop_qa.probe_tool_use",
            new_callable=AsyncMock,
            return_value=failing_result,
        ),
    ):
        await orch.run_next_test()

    assert orch._failure_counts[capability] == 1
@pytest.mark.asyncio
async def test_failure_counter_resets_on_success():
    """A passing probe zeroes the counter and clears the filed-proposal flag."""
    from timmy.loop_qa import Capability, LoopQAOrchestrator

    orch = LoopQAOrchestrator()
    capability = Capability.TOOL_USE
    orch._failure_counts[capability] = 5
    orch._proposal_filed.add(capability)

    passing_result = {
        "success": True,
        "capability": capability,
        "details": "ok",
        "error_type": None,
    }
    with (
        patch("timmy.loop_qa.log_event"),
        patch(
            "timmy.loop_qa.probe_tool_use",
            new_callable=AsyncMock,
            return_value=passing_result,
        ),
    ):
        await orch.run_next_test()

    assert orch._failure_counts[capability] == 0
    assert capability not in orch._proposal_filed
@pytest.mark.asyncio
async def test_upgrade_proposal_filed_at_threshold():
    """When consecutive failures reach the threshold, create_task is called.

    The counter is seeded at 2 so a single additional failure crosses the
    threshold of 3 and files an upgrade proposal.
    """
    from timmy.loop_qa import Capability, LoopQAOrchestrator

    orch = LoopQAOrchestrator()
    cap = Capability.TOOL_USE
    orch._failure_counts[cap] = 2  # One more failure hits threshold of 3

    with patch("timmy.loop_qa.log_event"):
        with patch("timmy.loop_qa.create_task") as mock_create:
            with patch(
                "timmy.loop_qa.probe_tool_use",
                new_callable=AsyncMock,
                return_value={
                    "success": False,
                    "capability": cap,
                    "details": "empty stdout",
                    "error_type": "AssertionError",
                },
            ):
                await orch.run_next_test()

    mock_create.assert_called_once()
    call_args = mock_create.call_args
    # BUG FIX: the old assertion indexed call_args[1]["title"] directly,
    # which raises KeyError (never reaching the str() fallback) whenever
    # create_task is called without a "title" keyword argument. Use .get so
    # the fallback branch is actually reachable.
    title = call_args.kwargs.get("title", "")
    assert "TOOL_USE" in title or "TOOL_USE" in str(call_args)
    assert cap in orch._proposal_filed
@pytest.mark.asyncio
async def test_upgrade_proposal_not_refiled():
    """A capability with an already-filed proposal never files a second one."""
    from timmy.loop_qa import Capability, LoopQAOrchestrator

    orch = LoopQAOrchestrator()
    capability = Capability.TOOL_USE
    orch._failure_counts[capability] = 5
    orch._proposal_filed.add(capability)  # Already filed

    still_failing = {
        "success": False,
        "capability": capability,
        "details": "still broken",
        "error_type": "RuntimeError",
    }
    with (
        patch("timmy.loop_qa.log_event"),
        patch("timmy.loop_qa.create_task") as fake_create,
        patch(
            "timmy.loop_qa.probe_tool_use",
            new_callable=AsyncMock,
            return_value=still_failing,
        ),
    ):
        await orch.run_next_test()

    fake_create.assert_not_called()
@pytest.mark.asyncio
async def test_graceful_on_probe_crash():
    """An unexpected probe exception yields a failure result, not a crash."""
    from timmy.loop_qa import LoopQAOrchestrator

    orch = LoopQAOrchestrator()

    with (
        patch("timmy.loop_qa.log_event"),
        patch("timmy.loop_qa.capture_error"),
        patch(
            "timmy.loop_qa.probe_tool_use",
            new_callable=AsyncMock,
            side_effect=Exception("probe exploded"),
        ),
    ):
        outcome = await orch.run_next_test()

    # Should return a failure result, not raise.
    assert outcome is not None
    assert outcome["success"] is False
# ---------------------------------------------------------------------------
# Health snapshot tests
# ---------------------------------------------------------------------------
def test_health_snapshot_all_green():
    """With zeroed counters, every capability and the overall status are green."""
    from timmy.loop_qa import LoopQAOrchestrator

    snapshot = LoopQAOrchestrator().get_health_snapshot()

    assert snapshot["overall_status"] == "green"
    for entry in snapshot["capabilities"]:
        assert entry["status"] == "green"
def test_health_snapshot_mixed_statuses():
    """Failure counts of 2 and 5 surface as yellow and red respectively."""
    from timmy.loop_qa import Capability, LoopQAOrchestrator

    orch = LoopQAOrchestrator()
    orch._failure_counts[Capability.TOOL_USE] = 2  # yellow
    orch._failure_counts[Capability.MEMORY_READ] = 5  # red

    statuses = {
        entry["capability"]: entry["status"]
        for entry in orch.get_health_snapshot()["capabilities"]
    }

    assert statuses[Capability.TOOL_USE] == "yellow"
    assert statuses[Capability.MEMORY_READ] == "red"
    assert statuses[Capability.LIGHTNING_ECON] == "green"
def test_health_snapshot_overall_worst():
    """overall_status reflects the worst individual capability status."""
    from timmy.loop_qa import Capability, LoopQAOrchestrator

    orch = LoopQAOrchestrator()

    orch._failure_counts[Capability.TOOL_USE] = 2  # yellow
    assert orch.get_health_snapshot()["overall_status"] == "yellow"

    orch._failure_counts[Capability.MEMORY_WRITE] = 5  # red
    assert orch.get_health_snapshot()["overall_status"] == "red"
# ---------------------------------------------------------------------------
# Dashboard route tests
# ---------------------------------------------------------------------------
def test_loop_qa_health_json(client):
    """GET /health/loop-qa returns 200 with a full six-capability snapshot."""
    response = client.get("/health/loop-qa")
    assert response.status_code == 200

    payload = response.json()
    assert "overall_status" in payload
    assert "capabilities" in payload
    assert len(payload["capabilities"]) == 6
def test_loop_qa_health_partial(client):
    """GET /health/loop-qa/partial serves an HTML fragment with a 200."""
    response = client.get("/health/loop-qa/partial")
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]