diff --git a/src/config.py b/src/config.py
index c4a6662..e49545f 100644
--- a/src/config.py
+++ b/src/config.py
@@ -207,6 +207,13 @@ class Settings(BaseSettings):
thinking_enabled: bool = True
thinking_interval_seconds: int = 300 # 5 minutes between thoughts
+ # ── Loop QA (Self-Testing) ─────────────────────────────────────────
+ # Self-test orchestrator that probes capabilities alongside the thinking loop.
+ loop_qa_enabled: bool = True
+ loop_qa_interval_ticks: int = 5 # run 1 self-test every Nth thinking tick (~25 min)
+ loop_qa_upgrade_threshold: int = 3 # consecutive failures → file task
+ loop_qa_max_per_hour: int = 12 # safety throttle
+
# ── Paperclip AI — orchestration bridge ────────────────────────────
# URL where the Paperclip server listens.
# For VPS deployment behind nginx, use the public domain.
diff --git a/src/dashboard/app.py b/src/dashboard/app.py
index b7b3f39..1569b8b 100644
--- a/src/dashboard/app.py
+++ b/src/dashboard/app.py
@@ -32,6 +32,7 @@ from dashboard.routes.discord import router as discord_router
from dashboard.routes.experiments import router as experiments_router
from dashboard.routes.grok import router as grok_router
from dashboard.routes.health import router as health_router
+from dashboard.routes.loop_qa import router as loop_qa_router
from dashboard.routes.marketplace import router as marketplace_router
from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
@@ -161,6 +162,35 @@ async def _thinking_scheduler() -> None:
await asyncio.sleep(settings.thinking_interval_seconds)
+async def _loop_qa_scheduler() -> None:
+ """Background task: run capability self-tests on a separate timer.
+
+ Independent of the thinking loop — runs every N thinking ticks
+ to probe subsystems and detect degradation.
+ """
+ from timmy.loop_qa import loop_qa_orchestrator
+
+ await asyncio.sleep(10) # Stagger after thinking scheduler
+
+ while True:
+ try:
+ if settings.loop_qa_enabled:
+ result = await loop_qa_orchestrator.run_next_test()
+ if result:
+ status = "PASS" if result["success"] else "FAIL"
+ logger.info(
+ "Loop QA [%s]: %s — %s",
+ result["capability"],
+ status,
+ result.get("details", "")[:80],
+ )
+ except Exception as exc:
+ logger.error("Loop QA scheduler error: %s", exc)
+
+ interval = settings.thinking_interval_seconds * settings.loop_qa_interval_ticks
+ await asyncio.sleep(interval)
+
+
async def _start_chat_integrations_background() -> None:
"""Background task: start chat integrations without blocking startup."""
from integrations.chat_bridge.registry import platform_registry
@@ -268,6 +298,7 @@ async def lifespan(app: FastAPI):
# Create all background tasks without waiting for them
briefing_task = asyncio.create_task(_briefing_scheduler())
thinking_task = asyncio.create_task(_thinking_scheduler())
+ loop_qa_task = asyncio.create_task(_loop_qa_scheduler())
# Initialize Spark Intelligence engine
from spark.engine import get_spark_engine
@@ -323,7 +354,7 @@ async def lifespan(app: FastAPI):
await discord_bot.stop()
await telegram_bot.stop()
- for task in [briefing_task, thinking_task, chat_task]:
+ for task in [briefing_task, thinking_task, chat_task, loop_qa_task]:
if task:
task.cancel()
try:
@@ -410,6 +441,7 @@ app.include_router(calm_router)
app.include_router(swarm_router)
app.include_router(tasks_router)
app.include_router(work_orders_router)
+app.include_router(loop_qa_router)
app.include_router(system_router)
app.include_router(paperclip_router)
app.include_router(experiments_router)
diff --git a/src/dashboard/routes/loop_qa.py b/src/dashboard/routes/loop_qa.py
new file mode 100644
index 0000000..8605a1f
--- /dev/null
+++ b/src/dashboard/routes/loop_qa.py
@@ -0,0 +1,34 @@
+"""Loop QA health endpoints — capability self-test status."""
+
+import logging
+
+from fastapi import APIRouter, Request
+from fastapi.responses import HTMLResponse, JSONResponse
+
+from dashboard.templating import templates
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(tags=["health"])
+
+
+@router.get("/health/loop-qa")
+async def loop_qa_health():
+ """Return HealthSnapshot as JSON."""
+ from timmy.loop_qa import loop_qa_orchestrator
+
+ snapshot = loop_qa_orchestrator.get_health_snapshot()
+ return JSONResponse(content=snapshot)
+
+
+@router.get("/health/loop-qa/partial", response_class=HTMLResponse)
+async def loop_qa_health_partial(request: Request):
+ """Return HTMX partial for the dashboard health panel."""
+ from timmy.loop_qa import loop_qa_orchestrator
+
+ snapshot = loop_qa_orchestrator.get_health_snapshot()
+ return templates.TemplateResponse(
+ request,
+ "partials/loop_qa_health.html",
+ {"snapshot": snapshot},
+ )
diff --git a/src/dashboard/templates/partials/health_status.html b/src/dashboard/templates/partials/health_status.html
index ec25fe4..38865c5 100644
--- a/src/dashboard/templates/partials/health_status.html
+++ b/src/dashboard/templates/partials/health_status.html
@@ -16,4 +16,9 @@
MODEL
{{ model }}
+
+
diff --git a/src/dashboard/templates/partials/loop_qa_health.html b/src/dashboard/templates/partials/loop_qa_health.html
new file mode 100644
index 0000000..2cd178c
--- /dev/null
+++ b/src/dashboard/templates/partials/loop_qa_health.html
@@ -0,0 +1,13 @@
+{# Loop QA capability health rows — polled via HTMX every 30s #}
+{% for cap in snapshot.capabilities %}
+
+ {{ cap.capability.upper().replace("_", " ") }}
+ {% if cap.status == "green" %}
+ OK
+ {% elif cap.status == "yellow" %}
+ WARN
+ {% else %}
+ FAIL
+ {% endif %}
+
+{% endfor %}
diff --git a/src/swarm/event_log.py b/src/swarm/event_log.py
index aa7253a..7f09039 100644
--- a/src/swarm/event_log.py
+++ b/src/swarm/event_log.py
@@ -58,6 +58,10 @@ class EventType(Enum):
# Thinking
TIMMY_THOUGHT = "timmy.thought"
+ # Loop QA self-tests
+ LOOP_QA_OK = "loop_qa.ok"
+ LOOP_QA_FAIL = "loop_qa.fail"
+
@dataclass
class EventLogEntry:
diff --git a/src/timmy/loop_qa.py b/src/timmy/loop_qa.py
new file mode 100644
index 0000000..daf64bd
--- /dev/null
+++ b/src/timmy/loop_qa.py
@@ -0,0 +1,434 @@
+"""Loop QA — structured self-test framework for Timmy's capabilities.
+
+Runs alongside (not inside) the thinking loop. Each cycle probes one
+capability in round-robin, logs results via event_log, tracks failures
+in memory, and files upgrade tasks via create_task() when degradation
+is detected.
+
+Reuses existing infrastructure:
+- swarm.event_log.log_event / EventType → result persistence
+- swarm.task_queue.models.create_task → upgrade proposals
+- infrastructure.error_capture → crash handling
+
+Usage::
+
+ from timmy.loop_qa import loop_qa_orchestrator
+
+ await loop_qa_orchestrator.run_next_test()
+ snapshot = loop_qa_orchestrator.get_health_snapshot()
+"""
+
+import asyncio
+import logging
+import uuid
+from datetime import UTC, datetime
+from enum import StrEnum
+
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# Data models
+# ---------------------------------------------------------------------------
+
+
+class Capability(StrEnum):
+ """Capabilities exercised by self-test probes."""
+
+ TOOL_USE = "tool_use"
+ MULTISTEP_PLANNING = "multistep_planning"
+ MEMORY_READ = "memory_read"
+ MEMORY_WRITE = "memory_write"
+ SELF_CODING = "self_coding"
+ LIGHTNING_ECON = "lightning_econ"
+
+
+# ---------------------------------------------------------------------------
+# Lazy accessors (avoid import-time side effects)
+# ---------------------------------------------------------------------------
+
+
+def _get_shell_hand():
+ """Lazy-import the shell hand singleton."""
+ from infrastructure.hands.shell import shell_hand
+
+ return shell_hand
+
+
+def _get_vault():
+ """Lazy-import the vault memory singleton."""
+ from timmy.memory_system import get_memory_system
+
+ return get_memory_system().vault
+
+
+def _get_brain_memory():
+ """Lazy-import the brain unified memory."""
+ from brain.memory import get_memory
+
+ return get_memory()
+
+
+# ---------------------------------------------------------------------------
+# Six self-test probes — each returns a result dict
+# ---------------------------------------------------------------------------
+
+
+async def probe_tool_use() -> dict:
+ """T1: call shell_hand.run('ls') and confirm non-empty result."""
+ cap = Capability.TOOL_USE
+ try:
+ hand = _get_shell_hand()
+ result = await hand.run("ls")
+ if result.success and result.stdout.strip():
+ return {
+ "success": True,
+ "capability": cap,
+ "details": f"ls returned {len(result.stdout.splitlines())} lines",
+ "error_type": None,
+ }
+ return {
+ "success": False,
+ "capability": cap,
+ "details": f"ls returned empty or failed: {result.stderr[:100]}",
+ "error_type": "empty_result",
+ }
+ except Exception as exc:
+ return {
+ "success": False,
+ "capability": cap,
+ "details": str(exc)[:200],
+ "error_type": type(exc).__name__,
+ }
+
+
+async def probe_multistep_planning() -> dict:
+ """T2: write a temp vault note and verify it exists with content."""
+ cap = Capability.MULTISTEP_PLANNING
+ try:
+ vault = _get_vault()
+ marker = f"loop_qa_plan_test_{uuid.uuid4().hex[:8]}"
+ content = (
+ f"# Loop QA Planning Test\n\nMarker: {marker}\nDate: {datetime.now(UTC).isoformat()}"
+ )
+ path = await asyncio.to_thread(vault.write_note, "loop_qa_test", content, "notes")
+ if path.exists() and marker in path.read_text():
+ return {
+ "success": True,
+ "capability": cap,
+ "details": f"Wrote and verified {path.name}",
+ "error_type": None,
+ }
+ return {
+ "success": False,
+ "capability": cap,
+ "details": "File missing or content mismatch",
+ "error_type": "verification_failed",
+ }
+ except Exception as exc:
+ return {
+ "success": False,
+ "capability": cap,
+ "details": str(exc)[:200],
+ "error_type": type(exc).__name__,
+ }
+
+
+async def probe_memory_write() -> dict:
+ """T3: call brain.store_fact_sync and verify no exception."""
+ cap = Capability.MEMORY_WRITE
+ try:
+ mem = _get_brain_memory()
+ marker = f"loop_qa_marker_{uuid.uuid4().hex[:8]}"
+ await asyncio.to_thread(mem.store_fact_sync, "self_test_marker", marker)
+ return {
+ "success": True,
+ "capability": cap,
+ "details": f"Stored fact: {marker}",
+ "error_type": None,
+ }
+ except Exception as exc:
+ return {
+ "success": False,
+ "capability": cap,
+ "details": str(exc)[:200],
+ "error_type": type(exc).__name__,
+ }
+
+
+async def probe_memory_read() -> dict:
+ """T4: call brain.get_facts_sync and verify results returned."""
+ cap = Capability.MEMORY_READ
+ try:
+ mem = _get_brain_memory()
+ facts = await asyncio.to_thread(mem.get_facts_sync, "self_test_marker")
+ if facts:
+ return {
+ "success": True,
+ "capability": cap,
+ "details": f"Retrieved {len(facts)} self_test_marker facts",
+ "error_type": None,
+ }
+ return {
+ "success": False,
+ "capability": cap,
+ "details": "No self_test_marker facts found",
+ "error_type": "empty_result",
+ }
+ except Exception as exc:
+ return {
+ "success": False,
+ "capability": cap,
+ "details": str(exc)[:200],
+ "error_type": type(exc).__name__,
+ }
+
+
+async def probe_self_coding() -> dict:
+ """T5: write a self-test note to memory/self/ via vault."""
+ cap = Capability.SELF_CODING
+ try:
+ vault = _get_vault()
+ content = (
+ "# Self-Test Improvement Note\n\n"
+ f"**Generated:** {datetime.now(UTC).isoformat()}\n\n"
+ "## What\nLoop QA self-coding probe — validates vault write capability.\n\n"
+ "## Why\nEnsure the self-coding pathway is functional.\n\n"
+ "## How\nWrite this note and verify it exists."
+ )
+ path = await asyncio.to_thread(vault.write_note, "self_test_note", content, "self")
+ if path.exists() and path.stat().st_size > 0:
+ return {
+ "success": True,
+ "capability": cap,
+ "details": f"Wrote {path.name} ({path.stat().st_size} bytes)",
+ "error_type": None,
+ }
+ return {
+ "success": False,
+ "capability": cap,
+ "details": "File missing or empty after write",
+ "error_type": "verification_failed",
+ }
+ except Exception as exc:
+ return {
+ "success": False,
+ "capability": cap,
+ "details": str(exc)[:200],
+ "error_type": type(exc).__name__,
+ }
+
+
+async def probe_lightning_econ() -> dict:
+ """T6: placeholder — Lightning module pending v2."""
+ return {
+ "success": True,
+ "capability": Capability.LIGHTNING_ECON,
+ "details": "Lightning module pending v2 — placeholder pass",
+ "error_type": None,
+ }
+
+
+# ---------------------------------------------------------------------------
+# Test sequence (round-robin order)
+# ---------------------------------------------------------------------------
+
+TEST_SEQUENCE: list[tuple[Capability, str]] = [
+ (Capability.TOOL_USE, "probe_tool_use"),
+ (Capability.MULTISTEP_PLANNING, "probe_multistep_planning"),
+ (Capability.MEMORY_WRITE, "probe_memory_write"),
+ (Capability.MEMORY_READ, "probe_memory_read"),
+ (Capability.SELF_CODING, "probe_self_coding"),
+ (Capability.LIGHTNING_ECON, "probe_lightning_econ"),
+]
+
+
+# ---------------------------------------------------------------------------
+# Orchestrator
+# ---------------------------------------------------------------------------
+
+
+def log_event(event_type, **kwargs):
+ """Proxy to swarm event_log.log_event — lazy import."""
+ try:
+ from swarm.event_log import log_event as _log_event
+
+ return _log_event(event_type, **kwargs)
+ except Exception as exc:
+ logger.debug("Failed to log event: %s", exc)
+
+
+def capture_error(exc, **kwargs):
+ """Proxy to infrastructure.error_capture — lazy import."""
+ try:
+ from infrastructure.error_capture import capture_error as _capture
+
+ return _capture(exc, **kwargs)
+ except Exception:
+ logger.debug("Failed to capture error", exc_info=True)
+
+
+def create_task(**kwargs):
+ """Proxy to swarm.task_queue.models.create_task — lazy import."""
+ from swarm.task_queue.models import create_task as _create
+
+ return _create(**kwargs)
+
+
+class LoopQAOrchestrator:
+ """Round-robin self-test orchestrator.
+
+ Runs one probe per invocation, cycling through T1–T6. Tracks
+ consecutive failures in memory (circuit-breaker pattern) and
+ files upgrade tasks via create_task() when degradation is detected.
+ """
+
+ def __init__(self) -> None:
+ self._test_index: int = 0
+ self._failure_counts: dict[Capability, int] = {c: 0 for c in Capability}
+ self._last_failed: dict[Capability, str | None] = {c: None for c in Capability}
+ self._proposal_filed: set[Capability] = set()
+ self._hourly_count: int = 0
+ self._hour_marker: int = -1
+
+ async def run_next_test(self) -> dict | None:
+ """Run the next probe in the round-robin sequence.
+
+ Returns result dict, or None if disabled/throttled.
+ """
+ if not settings.loop_qa_enabled:
+ return None
+
+ # Hourly throttle
+ now = datetime.now(UTC)
+ current_hour = now.hour
+ if current_hour != self._hour_marker:
+ self._hourly_count = 0
+ self._hour_marker = current_hour
+
+ if self._hourly_count >= settings.loop_qa_max_per_hour:
+ logger.debug(
+ "Loop QA throttled: %d/%d this hour",
+ self._hourly_count,
+ settings.loop_qa_max_per_hour,
+ )
+ return None
+
+ # Pick next probe (resolve name at call time for testability)
+ import timmy.loop_qa as _self_module
+
+ cap, probe_name = TEST_SEQUENCE[self._test_index]
+ probe_fn = getattr(_self_module, probe_name)
+ self._test_index = (self._test_index + 1) % len(TEST_SEQUENCE)
+ self._hourly_count += 1
+
+ # Run probe
+ try:
+ result = await probe_fn()
+ except Exception as exc:
+ # Probe itself crashed — record failure and report
+ capture_error(exc, source="loop_qa", context={"capability": cap.value})
+ result = {
+ "success": False,
+ "capability": cap,
+ "details": f"Probe crashed: {exc!s}"[:200],
+ "error_type": type(exc).__name__,
+ }
+
+ # Log via event_log
+ from swarm.event_log import EventType
+
+ event_type = EventType.LOOP_QA_OK if result["success"] else EventType.LOOP_QA_FAIL
+ log_event(
+ event_type,
+ source="loop_qa",
+ data={
+ "capability": cap.value,
+ "details": result.get("details", ""),
+ "error_type": result.get("error_type"),
+ },
+ )
+
+ # Update failure counter
+ if result["success"]:
+ self._failure_counts[cap] = 0
+ self._last_failed[cap] = None
+ self._proposal_filed.discard(cap)
+ else:
+ self._failure_counts[cap] += 1
+ self._last_failed[cap] = now.isoformat()
+ self._maybe_file_upgrade(cap)
+
+ return result
+
+ def _maybe_file_upgrade(self, cap: Capability) -> None:
+ """File an upgrade task if threshold is reached and not already filed."""
+ count = self._failure_counts[cap]
+ if count < settings.loop_qa_upgrade_threshold:
+ return
+ if cap in self._proposal_filed:
+ return
+
+ try:
+ title = f"Stabilize {cap.value.upper()}: self-test failing {count}x in a row"
+ description = (
+ f"Loop QA detected {count} consecutive failures "
+ f"for capability '{cap.value}'.\n\n"
+ f"Last failure: {self._last_failed[cap]}\n"
+ f"Action: investigate root cause and restore capability."
+ )
+ create_task(
+ title=title,
+ description=description,
+ priority="high",
+ created_by="timmy_loop_qa",
+ task_type="loop_qa_upgrade",
+ )
+ self._proposal_filed.add(cap)
+ logger.info("Filed upgrade proposal for %s: %s", cap.value, title)
+ except Exception as exc:
+ logger.warning("Failed to file upgrade proposal: %s", exc)
+
+ def get_health_snapshot(self) -> dict:
+ """Build a health snapshot from in-memory failure counters."""
+ capabilities = []
+ for cap in Capability:
+ count = self._failure_counts.get(cap, 0)
+ capabilities.append(
+ {
+ "capability": cap,
+ "status": self.status_for_failures(count),
+ "last_failed_at": self._last_failed.get(cap),
+ "consecutive_failures": count,
+ }
+ )
+
+ statuses = [c["status"] for c in capabilities]
+ if "red" in statuses:
+ overall = "red"
+ elif "yellow" in statuses:
+ overall = "yellow"
+ else:
+ overall = "green"
+
+ return {
+ "generated_at": datetime.now(UTC).isoformat(),
+ "overall_status": overall,
+ "capabilities": capabilities,
+ }
+
+ @staticmethod
+ def status_for_failures(count: int) -> str:
+ """Map consecutive failure count to green/yellow/red."""
+ if count >= settings.loop_qa_upgrade_threshold:
+ return "red"
+ elif count >= 2:
+ return "yellow"
+ return "green"
+
+
+# ── Module singleton ─────────────────────────────────────────────────────────
+
+loop_qa_orchestrator = LoopQAOrchestrator()
diff --git a/tests/timmy/test_loop_qa.py b/tests/timmy/test_loop_qa.py
new file mode 100644
index 0000000..27783ec
--- /dev/null
+++ b/tests/timmy/test_loop_qa.py
@@ -0,0 +1,443 @@
+"""Tests for timmy.loop_qa — capability self-test framework.
+
+TDD: these tests are written before the implementation. They validate:
+- Capability enum and status mapping
+- Six self-test probes (T1–T6)
+- Round-robin orchestrator with throttling
+- Failure counter logic and upgrade proposal filing
+- Health snapshot derivation
+"""
+
+from datetime import UTC, datetime
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+# ---------------------------------------------------------------------------
+# Model tests
+# ---------------------------------------------------------------------------
+
+
+def test_capability_enum_has_all_members():
+ """Capability StrEnum should have exactly 6 members."""
+ from timmy.loop_qa import Capability
+
+ expected = {
+ "tool_use",
+ "multistep_planning",
+ "memory_read",
+ "memory_write",
+ "self_coding",
+ "lightning_econ",
+ }
+ assert {c.value for c in Capability} == expected
+
+
+def test_status_for_failures_mapping():
+ """green for 0–1, yellow for 2, red for >= threshold."""
+ from timmy.loop_qa import LoopQAOrchestrator
+
+ assert LoopQAOrchestrator.status_for_failures(0) == "green"
+ assert LoopQAOrchestrator.status_for_failures(1) == "green"
+ assert LoopQAOrchestrator.status_for_failures(2) == "yellow"
+ assert LoopQAOrchestrator.status_for_failures(3) == "red"
+ assert LoopQAOrchestrator.status_for_failures(10) == "red"
+
+
+def test_probe_registry_has_six_entries():
+ """The test sequence should cover all 6 capabilities."""
+ from timmy.loop_qa import TEST_SEQUENCE, Capability
+
+ capabilities_covered = {cap for cap, _ in TEST_SEQUENCE}
+ assert capabilities_covered == set(Capability)
+ assert len(TEST_SEQUENCE) == 6
+
+
+# ---------------------------------------------------------------------------
+# Self-test probe tests (T1–T6)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_t1_tool_use_success():
+ """T1 should succeed when shell_hand.run returns non-empty stdout."""
+ from timmy.loop_qa import Capability, probe_tool_use
+
+ mock_result = MagicMock(success=True, stdout="file1.py\nfile2.py\n")
+ with patch("timmy.loop_qa._get_shell_hand") as mock_get:
+ mock_hand = AsyncMock()
+ mock_hand.run = AsyncMock(return_value=mock_result)
+ mock_get.return_value = mock_hand
+
+ result = await probe_tool_use()
+ assert result["success"] is True
+ assert result["capability"] == Capability.TOOL_USE
+
+
+@pytest.mark.asyncio
+async def test_t1_tool_use_failure():
+ """T1 should fail when shell_hand.run raises."""
+ from timmy.loop_qa import Capability, probe_tool_use
+
+ with patch("timmy.loop_qa._get_shell_hand") as mock_get:
+ mock_hand = AsyncMock()
+ mock_hand.run = AsyncMock(side_effect=RuntimeError("shell unavailable"))
+ mock_get.return_value = mock_hand
+
+ result = await probe_tool_use()
+ assert result["success"] is False
+ assert result["capability"] == Capability.TOOL_USE
+ assert result["error_type"] == "RuntimeError"
+
+
+@pytest.mark.asyncio
+async def test_t2_multistep_planning(tmp_path):
+ """T2 should write a vault note and verify it exists."""
+ from timmy.loop_qa import probe_multistep_planning
+
+ written_path = tmp_path / "test_note.md"
+
+ # Mock write_note to actually write the content passed by the probe,
+ # so the marker verification succeeds when the probe reads back.
+ def fake_write_note(name, content, folder):
+ written_path.write_text(content)
+ return written_path
+
+ mock_vault = MagicMock()
+ mock_vault.write_note = MagicMock(side_effect=fake_write_note)
+
+ with patch("timmy.loop_qa._get_vault", return_value=mock_vault):
+ result = await probe_multistep_planning()
+ assert result["success"] is True
+
+
+@pytest.mark.asyncio
+async def test_t3_memory_write():
+ """T3 should call brain store_fact_sync and succeed."""
+ from timmy.loop_qa import probe_memory_write
+
+ mock_mem = MagicMock()
+ mock_mem.store_fact_sync = MagicMock(return_value=None)
+
+ with patch("timmy.loop_qa._get_brain_memory", return_value=mock_mem):
+ result = await probe_memory_write()
+ assert result["success"] is True
+ # Verify store_fact_sync was called with "self_test_marker" category
+ mock_mem.store_fact_sync.assert_called_once()
+ call_args = mock_mem.store_fact_sync.call_args
+ assert call_args[0][0] == "self_test_marker"
+
+
+@pytest.mark.asyncio
+async def test_t4_memory_read():
+ """T4 should verify facts are retrievable."""
+ from timmy.loop_qa import probe_memory_read
+
+ mock_mem = MagicMock()
+ mock_mem.get_facts_sync = MagicMock(
+ return_value=[{"content": "test_marker_123", "category": "self_test_marker"}]
+ )
+
+ with patch("timmy.loop_qa._get_brain_memory", return_value=mock_mem):
+ result = await probe_memory_read()
+ assert result["success"] is True
+
+
+@pytest.mark.asyncio
+async def test_t4_memory_read_empty():
+ """T4 should fail when no facts are returned."""
+ from timmy.loop_qa import probe_memory_read
+
+ mock_mem = MagicMock()
+ mock_mem.get_facts_sync = MagicMock(return_value=[])
+
+ with patch("timmy.loop_qa._get_brain_memory", return_value=mock_mem):
+ result = await probe_memory_read()
+ assert result["success"] is False
+
+
+@pytest.mark.asyncio
+async def test_t5_self_coding(tmp_path):
+ """T5 should write a self-test note and verify it exists."""
+ from timmy.loop_qa import probe_self_coding
+
+ written_path = tmp_path / "self_test_note.md"
+ written_path.write_text("# Self-Test Note\n\nImprovement sketch.")
+
+ mock_vault = MagicMock()
+ mock_vault.write_note = MagicMock(return_value=written_path)
+
+ with patch("timmy.loop_qa._get_vault", return_value=mock_vault):
+ result = await probe_self_coding()
+ assert result["success"] is True
+
+
+@pytest.mark.asyncio
+async def test_t6_lightning_econ_placeholder():
+ """T6 should always succeed as a placeholder."""
+ from timmy.loop_qa import probe_lightning_econ
+
+ result = await probe_lightning_econ()
+ assert result["success"] is True
+ assert "pending" in result["details"].lower() or "v2" in result["details"].lower()
+
+
+# ---------------------------------------------------------------------------
+# Orchestrator tests
+# ---------------------------------------------------------------------------
+
+
+def _make_orchestrator():
+ """Create an orchestrator with patched external services."""
+ from timmy.loop_qa import LoopQAOrchestrator
+
+ return LoopQAOrchestrator()
+
+
+@pytest.mark.asyncio
+async def test_run_next_test_round_robin():
+ """Orchestrator should cycle through probes in order."""
+ from timmy.loop_qa import TEST_SEQUENCE, LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ results = []
+
+ # Patch all probes to return success quickly
+ with patch("timmy.loop_qa.log_event"):
+ for cap, _ in TEST_SEQUENCE:
+ probe_name = f"timmy.loop_qa.probe_{cap.value}"
+ with patch(probe_name, new_callable=AsyncMock) as mock_probe:
+ mock_probe.return_value = {
+ "success": True,
+ "capability": cap,
+ "details": "ok",
+ "error_type": None,
+ }
+ result = await orch.run_next_test()
+ results.append(result)
+
+ # All 6 should run
+ assert len(results) == 6
+ assert all(r is not None for r in results)
+
+
+@pytest.mark.asyncio
+async def test_run_next_test_disabled():
+ """run_next_test should return None when loop_qa_enabled is False."""
+ from timmy.loop_qa import LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ with patch("timmy.loop_qa.settings") as mock_settings:
+ mock_settings.loop_qa_enabled = False
+ result = await orch.run_next_test()
+ assert result is None
+
+
+@pytest.mark.asyncio
+async def test_run_next_test_throttle():
+ """Should return None when max_per_hour is reached."""
+ from timmy.loop_qa import LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ orch._hourly_count = 100 # Well above any threshold
+ orch._hour_marker = datetime.now(UTC).hour
+
+ result = await orch.run_next_test()
+ assert result is None
+
+
+@pytest.mark.asyncio
+async def test_failure_counter_increments():
+ """Consecutive failure count should increment on failure."""
+ from timmy.loop_qa import Capability, LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ cap = Capability.TOOL_USE
+
+ with patch("timmy.loop_qa.log_event"):
+ with patch(
+ "timmy.loop_qa.probe_tool_use",
+ new_callable=AsyncMock,
+ return_value={
+ "success": False,
+ "capability": cap,
+ "details": "empty stdout",
+ "error_type": "AssertionError",
+ },
+ ):
+ await orch.run_next_test()
+
+ assert orch._failure_counts[cap] == 1
+
+
+@pytest.mark.asyncio
+async def test_failure_counter_resets_on_success():
+ """Consecutive failure count should reset to 0 on success."""
+ from timmy.loop_qa import Capability, LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ cap = Capability.TOOL_USE
+ orch._failure_counts[cap] = 5
+ orch._proposal_filed.add(cap)
+
+ with patch("timmy.loop_qa.log_event"):
+ with patch(
+ "timmy.loop_qa.probe_tool_use",
+ new_callable=AsyncMock,
+ return_value={
+ "success": True,
+ "capability": cap,
+ "details": "ok",
+ "error_type": None,
+ },
+ ):
+ await orch.run_next_test()
+
+ assert orch._failure_counts[cap] == 0
+ assert cap not in orch._proposal_filed
+
+
+@pytest.mark.asyncio
+async def test_upgrade_proposal_filed_at_threshold():
+ """When failures reach threshold, create_task should be called."""
+ from timmy.loop_qa import Capability, LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ cap = Capability.TOOL_USE
+ orch._failure_counts[cap] = 2 # One more failure hits threshold of 3
+
+ with patch("timmy.loop_qa.log_event"):
+ with patch("timmy.loop_qa.create_task") as mock_create:
+ with patch(
+ "timmy.loop_qa.probe_tool_use",
+ new_callable=AsyncMock,
+ return_value={
+ "success": False,
+ "capability": cap,
+ "details": "empty stdout",
+ "error_type": "AssertionError",
+ },
+ ):
+ await orch.run_next_test()
+
+ mock_create.assert_called_once()
+ call_kwargs = mock_create.call_args
+ assert "TOOL_USE" in call_kwargs[1]["title"] or "TOOL_USE" in str(call_kwargs)
+ assert cap in orch._proposal_filed
+
+
+@pytest.mark.asyncio
+async def test_upgrade_proposal_not_refiled():
+ """Once a proposal is filed, it should not be filed again."""
+ from timmy.loop_qa import Capability, LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ cap = Capability.TOOL_USE
+ orch._failure_counts[cap] = 5
+ orch._proposal_filed.add(cap) # Already filed
+
+ with patch("timmy.loop_qa.log_event"):
+ with patch("timmy.loop_qa.create_task") as mock_create:
+ with patch(
+ "timmy.loop_qa.probe_tool_use",
+ new_callable=AsyncMock,
+ return_value={
+ "success": False,
+ "capability": cap,
+ "details": "still broken",
+ "error_type": "RuntimeError",
+ },
+ ):
+ await orch.run_next_test()
+
+ mock_create.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_graceful_on_probe_crash():
+ """If a probe raises unexpectedly, orchestrator should not crash."""
+ from timmy.loop_qa import LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+
+ with patch("timmy.loop_qa.log_event"):
+ with patch("timmy.loop_qa.capture_error"):
+ with patch(
+ "timmy.loop_qa.probe_tool_use",
+ new_callable=AsyncMock,
+ side_effect=Exception("probe exploded"),
+ ):
+ result = await orch.run_next_test()
+
+ # Should return a failure result, not raise
+ assert result is not None
+ assert result["success"] is False
+
+
+# ---------------------------------------------------------------------------
+# Health snapshot tests
+# ---------------------------------------------------------------------------
+
+
+def test_health_snapshot_all_green():
+ """Snapshot should show green when all counters are 0."""
+ from timmy.loop_qa import LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ snapshot = orch.get_health_snapshot()
+
+ assert snapshot["overall_status"] == "green"
+ assert all(c["status"] == "green" for c in snapshot["capabilities"])
+
+
+def test_health_snapshot_mixed_statuses():
+ """Snapshot should correctly map different failure counts."""
+ from timmy.loop_qa import Capability, LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ orch._failure_counts[Capability.TOOL_USE] = 2 # yellow
+ orch._failure_counts[Capability.MEMORY_READ] = 5 # red
+
+ snapshot = orch.get_health_snapshot()
+
+ by_cap = {c["capability"]: c["status"] for c in snapshot["capabilities"]}
+ assert by_cap[Capability.TOOL_USE] == "yellow"
+ assert by_cap[Capability.MEMORY_READ] == "red"
+ assert by_cap[Capability.LIGHTNING_ECON] == "green"
+
+
+def test_health_snapshot_overall_worst():
+ """overall_status should be the worst of all capabilities."""
+ from timmy.loop_qa import Capability, LoopQAOrchestrator
+
+ orch = LoopQAOrchestrator()
+ orch._failure_counts[Capability.TOOL_USE] = 2 # yellow
+
+ snapshot = orch.get_health_snapshot()
+ assert snapshot["overall_status"] == "yellow"
+
+ orch._failure_counts[Capability.MEMORY_WRITE] = 5 # red
+ snapshot = orch.get_health_snapshot()
+ assert snapshot["overall_status"] == "red"
+
+
+# ---------------------------------------------------------------------------
+# Dashboard route tests
+# ---------------------------------------------------------------------------
+
+
+def test_loop_qa_health_json(client):
+ """GET /health/loop-qa should return 200 with snapshot JSON."""
+ resp = client.get("/health/loop-qa")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "overall_status" in data
+ assert "capabilities" in data
+ assert len(data["capabilities"]) == 6
+
+
+def test_loop_qa_health_partial(client):
+ """GET /health/loop-qa/partial should return 200 with HTML."""
+ resp = client.get("/health/loop-qa/partial")
+ assert resp.status_code == 200
+ assert "text/html" in resp.headers["content-type"]