diff --git a/src/config.py b/src/config.py index c213e563..192c44e7 100644 --- a/src/config.py +++ b/src/config.py @@ -152,6 +152,10 @@ class Settings(BaseSettings): # Default is False (telemetry disabled) to align with sovereign AI vision. telemetry_enabled: bool = False + # ── Sovereignty Metrics ────────────────────────────────────────────── + # Alert when API cost per research task exceeds this threshold (USD). + sovereignty_api_cost_alert_threshold: float = 1.00 + # CORS allowed origins for the web chat interface (Gitea Pages, etc.) # Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com" cors_origins: list[str] = [ diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 7e1ccba9..042b9965 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -45,6 +45,7 @@ from dashboard.routes.models import api_router as models_api_router from dashboard.routes.models import router as models_router from dashboard.routes.quests import router as quests_router from dashboard.routes.scorecards import router as scorecards_router +from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router from dashboard.routes.spark import router as spark_router from dashboard.routes.system import router as system_router from dashboard.routes.tasks import router as tasks_router @@ -631,6 +632,7 @@ app.include_router(tower_router) app.include_router(daily_run_router) app.include_router(quests_router) app.include_router(scorecards_router) +app.include_router(sovereignty_metrics_router) @app.websocket("/ws") diff --git a/src/dashboard/routes/sovereignty_metrics.py b/src/dashboard/routes/sovereignty_metrics.py new file mode 100644 index 00000000..3bffe95f --- /dev/null +++ b/src/dashboard/routes/sovereignty_metrics.py @@ -0,0 +1,74 @@ +"""Sovereignty metrics dashboard routes. + +Provides API endpoints and HTMX partials for tracking research +sovereignty progress against graduation targets. + +Refs: #981 +""" + +import logging +from typing import Any + +from fastapi import APIRouter, Request +from fastapi.responses import HTMLResponse + +from config import settings +from dashboard.templating import templates +from infrastructure.sovereignty_metrics import ( + GRADUATION_TARGETS, + get_sovereignty_store, +) + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/sovereignty", tags=["sovereignty"]) + + +@router.get("/metrics") +async def sovereignty_metrics_api() -> dict[str, Any]: + """JSON API: full sovereignty metrics summary with trends.""" + store = get_sovereignty_store() + summary = store.get_summary() + alerts = store.get_alerts(unacknowledged_only=True) + return { + "metrics": summary, + "alerts": alerts, + "targets": GRADUATION_TARGETS, + "cost_threshold": settings.sovereignty_api_cost_alert_threshold, + } + + +@router.get("/metrics/panel", response_class=HTMLResponse) +async def sovereignty_metrics_panel(request: Request) -> HTMLResponse: + """HTMX partial: sovereignty metrics progress panel.""" + store = get_sovereignty_store() + summary = store.get_summary() + alerts = store.get_alerts(unacknowledged_only=True) + + return templates.TemplateResponse( + request, + "partials/sovereignty_metrics.html", + { + "metrics": summary, + "alerts": alerts, + "targets": GRADUATION_TARGETS, + }, + ) + + +@router.get("/alerts") +async def sovereignty_alerts_api() -> dict[str, Any]: + """JSON API: sovereignty alerts.""" + store = get_sovereignty_store() + return { + "alerts": store.get_alerts(unacknowledged_only=False), + "unacknowledged": store.get_alerts(unacknowledged_only=True), + } + + +@router.post("/alerts/{alert_id}/acknowledge") +async def acknowledge_alert(alert_id: int) -> dict[str, bool]: + """Acknowledge a sovereignty alert.""" + store = get_sovereignty_store() + success = store.acknowledge_alert(alert_id) + return {"success": success} diff --git a/src/dashboard/templates/mission_control.html b/src/dashboard/templates/mission_control.html index 27acbd15..a090ff5b 100644 --- a/src/dashboard/templates/mission_control.html +++ b/src/dashboard/templates/mission_control.html @@ -179,6 +179,13 @@ + +{% call panel("SOVEREIGNTY METRICS", id="sovereignty-metrics-panel", + hx_get="/sovereignty/metrics/panel", + hx_trigger="load, every 30s") %} +

Loading sovereignty metrics...

+{% endcall %} +
diff --git a/src/dashboard/templates/partials/sovereignty_metrics.html b/src/dashboard/templates/partials/sovereignty_metrics.html new file mode 100644 index 00000000..3ef004fc --- /dev/null +++ b/src/dashboard/templates/partials/sovereignty_metrics.html @@ -0,0 +1,63 @@ +{# HTMX partial: Sovereignty Metrics Progress Panel + Loaded via hx-get="/sovereignty/metrics/panel" + Refs: #981 +#} +{% set phase_labels = {"pre-start": "Pre-start", "week1": "Week 1", "month1": "Month 1", "month3": "Month 3", "graduated": "Graduated"} %} +{% set phase_colors = {"pre-start": "var(--text-dim)", "week1": "var(--red)", "month1": "var(--amber)", "month3": "var(--green)", "graduated": "var(--purple)"} %} + +{% set metric_labels = { + "cache_hit_rate": "Cache Hit Rate", + "api_cost": "API Cost / Task", + "time_to_report": "Time to Report", + "human_involvement": "Human Involvement", + "local_artifacts": "Local Artifacts" +} %} + +{% set metric_units = { + "cache_hit_rate": "%", + "api_cost": "$", + "time_to_report": "min", + "human_involvement": "%", + "local_artifacts": "" +} %} + +{% if alerts %} +
+ {% for alert in alerts %} +
+ ! + {{ alert.message }} +
+ {% endfor %} +
+{% endif %} + +
+{% for key, data in metrics.items() %} + {% set label = metric_labels.get(key, key) %} + {% set unit = metric_units.get(key, "") %} + {% set phase = data.phase %} + {% set color = phase_colors.get(phase, "var(--text-dim)") %} +
+
+ {% if data.current is not none %} + {% if key == "cache_hit_rate" or key == "human_involvement" %} + {{ "%.0f"|format(data.current * 100) }}{{ unit }} + {% elif key == "api_cost" %} + {{ unit }}{{ "%.2f"|format(data.current) }} + {% elif key == "time_to_report" %} + {{ "%.1f"|format(data.current) }}{{ unit }} + {% else %} + {{ data.current|int }} + {% endif %} + {% else %} + -- + {% endif %} +
+
{{ label }}
+
+ {{ phase_labels.get(phase, phase) }} +
+
+{% endfor %} +
diff --git a/src/infrastructure/sovereignty_metrics.py b/src/infrastructure/sovereignty_metrics.py new file mode 100644 index 00000000..a305fa65 --- /dev/null +++ b/src/infrastructure/sovereignty_metrics.py @@ -0,0 +1,307 @@ +"""Sovereignty metrics collector and store. + +Tracks research sovereignty progress: cache hit rate, API cost, +time-to-report, and human involvement. Persists to SQLite for +trend analysis and dashboard display. + +Refs: #981 +""" + +import json +import logging +import sqlite3 +from contextlib import closing +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from config import settings + +logger = logging.getLogger(__name__) + +DB_PATH = Path(settings.repo_root) / "data" / "sovereignty_metrics.db" + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS sovereignty_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + metric_type TEXT NOT NULL, + value REAL NOT NULL, + metadata TEXT DEFAULT '{}' +); +CREATE INDEX IF NOT EXISTS idx_sm_type ON sovereignty_metrics(metric_type); +CREATE INDEX IF NOT EXISTS idx_sm_ts ON sovereignty_metrics(timestamp); + +CREATE TABLE IF NOT EXISTS sovereignty_alerts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + alert_type TEXT NOT NULL, + message TEXT NOT NULL, + value REAL NOT NULL, + threshold REAL NOT NULL, + acknowledged INTEGER DEFAULT 0 +); +CREATE INDEX IF NOT EXISTS idx_sa_ts ON sovereignty_alerts(timestamp); +CREATE INDEX IF NOT EXISTS idx_sa_ack ON sovereignty_alerts(acknowledged); +""" + + +@dataclass +class SovereigntyMetric: + """A single sovereignty metric data point.""" + + metric_type: str # cache_hit_rate, api_cost, time_to_report, human_involvement + value: float + timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + metadata: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class SovereigntyAlert: + """An alert triggered when a metric exceeds a threshold.""" + + alert_type: str + message: str + value: float + threshold: float + timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + acknowledged: bool = False + + +# Graduation targets from issue #981 +GRADUATION_TARGETS = { + "cache_hit_rate": {"week1": 0.10, "month1": 0.40, "month3": 0.80, "graduation": 0.90}, + "api_cost": {"week1": 1.50, "month1": 0.50, "month3": 0.10, "graduation": 0.01}, + "time_to_report": {"week1": 180.0, "month1": 30.0, "month3": 5.0, "graduation": 1.0}, + "human_involvement": {"week1": 1.0, "month1": 0.5, "month3": 0.25, "graduation": 0.0}, + "local_artifacts": {"week1": 6, "month1": 30, "month3": 100, "graduation": 500}, +} + + +class SovereigntyMetricsStore: + """SQLite-backed sovereignty metrics store. + + Thread-safe: creates a new connection per operation. + """ + + def __init__(self, db_path: Path | None = None) -> None: + self._db_path = db_path or DB_PATH + self._init_db() + + def _init_db(self) -> None: + """Initialize the database schema.""" + try: + self._db_path.parent.mkdir(parents=True, exist_ok=True) + with closing(sqlite3.connect(str(self._db_path))) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") + conn.executescript(_SCHEMA) + conn.commit() + except Exception as exc: + logger.warning("Failed to initialize sovereignty metrics DB: %s", exc) + + def _connect(self) -> sqlite3.Connection: + """Get a new connection.""" + conn = sqlite3.connect(str(self._db_path)) + conn.row_factory = sqlite3.Row + conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") + return conn + + def record(self, metric: SovereigntyMetric) -> None: + """Record a sovereignty metric data point.""" + try: + with closing(self._connect()) as conn: + conn.execute( + "INSERT INTO sovereignty_metrics (timestamp, metric_type, value, metadata) " + "VALUES (?, ?, ?, ?)", + ( + metric.timestamp, + metric.metric_type, + metric.value, + json.dumps(metric.metadata), + ), + ) + conn.commit() + except Exception as exc: + logger.warning("Failed to record sovereignty metric: %s", exc) + + # Check thresholds for alerts + self._check_alert(metric) + + def _check_alert(self, metric: SovereigntyMetric) -> None: + """Check if a metric triggers an alert.""" + threshold = settings.sovereignty_api_cost_alert_threshold + if metric.metric_type == "api_cost" and metric.value > threshold: + alert = SovereigntyAlert( + alert_type="api_cost_exceeded", + message=f"API cost ${metric.value:.2f} exceeds threshold ${threshold:.2f}", + value=metric.value, + threshold=threshold, + ) + self._record_alert(alert) + + def _record_alert(self, alert: SovereigntyAlert) -> None: + """Persist an alert.""" + try: + with closing(self._connect()) as conn: + conn.execute( + "INSERT INTO sovereignty_alerts " + "(timestamp, alert_type, message, value, threshold) " + "VALUES (?, ?, ?, ?, ?)", + ( + alert.timestamp, + alert.alert_type, + alert.message, + alert.value, + alert.threshold, + ), + ) + conn.commit() + logger.warning("Sovereignty alert: %s", alert.message) + except Exception as exc: + logger.warning("Failed to record sovereignty alert: %s", exc) + + def get_latest(self, metric_type: str, limit: int = 50) -> list[dict]: + """Get the most recent metric values for a given type.""" + try: + with closing(self._connect()) as conn: + rows = conn.execute( + "SELECT timestamp, value, metadata FROM sovereignty_metrics " + "WHERE metric_type = ? ORDER BY timestamp DESC LIMIT ?", + (metric_type, limit), + ).fetchall() + return [ + { + "timestamp": row["timestamp"], + "value": row["value"], + "metadata": json.loads(row["metadata"]) if row["metadata"] else {}, + } + for row in rows + ] + except Exception as exc: + logger.warning("Failed to query sovereignty metrics: %s", exc) + return [] + + def get_summary(self) -> dict[str, Any]: + """Get a summary of current sovereignty metrics progress.""" + summary: dict[str, Any] = {} + for metric_type in GRADUATION_TARGETS: + latest = self.get_latest(metric_type, limit=1) + history = self.get_latest(metric_type, limit=30) + + current_value = latest[0]["value"] if latest else None + targets = GRADUATION_TARGETS[metric_type] + + # Determine current phase based on value + phase = "pre-start" + if current_value is not None: + if metric_type in ("api_cost", "time_to_report", "human_involvement"): + # Lower is better + if current_value <= targets["graduation"]: + phase = "graduated" + elif current_value <= targets["month3"]: + phase = "month3" + elif current_value <= targets["month1"]: + phase = "month1" + elif current_value <= targets["week1"]: + phase = "week1" + else: + phase = "pre-start" + else: + # Higher is better + if current_value >= targets["graduation"]: + phase = "graduated" + elif current_value >= targets["month3"]: + phase = "month3" + elif current_value >= targets["month1"]: + phase = "month1" + elif current_value >= targets["week1"]: + phase = "week1" + else: + phase = "pre-start" + + summary[metric_type] = { + "current": current_value, + "phase": phase, + "targets": targets, + "trend": [{"t": h["timestamp"], "v": h["value"]} for h in reversed(history)], + } + + return summary + + def get_alerts(self, unacknowledged_only: bool = True, limit: int = 20) -> list[dict]: + """Get sovereignty alerts.""" + try: + with closing(self._connect()) as conn: + if unacknowledged_only: + rows = conn.execute( + "SELECT * FROM sovereignty_alerts " + "WHERE acknowledged = 0 ORDER BY timestamp DESC LIMIT ?", + (limit,), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM sovereignty_alerts " + "ORDER BY timestamp DESC LIMIT ?", + (limit,), + ).fetchall() + return [dict(row) for row in rows] + except Exception as exc: + logger.warning("Failed to query sovereignty alerts: %s", exc) + return [] + + def acknowledge_alert(self, alert_id: int) -> bool: + """Acknowledge an alert.""" + try: + with closing(self._connect()) as conn: + conn.execute( + "UPDATE sovereignty_alerts SET acknowledged = 1 WHERE id = ?", + (alert_id,), + ) + conn.commit() + return True + except Exception as exc: + logger.warning("Failed to acknowledge alert: %s", exc) + return False + + +# ── Module-level singleton ───────────────────────────────────────────────── +_store: SovereigntyMetricsStore | None = None + + +def get_sovereignty_store() -> SovereigntyMetricsStore: + """Return the module-level store, creating it on first access.""" + global _store + if _store is None: + _store = SovereigntyMetricsStore() + return _store + + +async def emit_sovereignty_metric( + metric_type: str, + value: float, + metadata: dict[str, Any] | None = None, +) -> None: + """Convenience function to record a sovereignty metric and emit an event. + + Also publishes to the event bus for real-time subscribers. + """ + import asyncio + + from infrastructure.events.bus import emit + + metric = SovereigntyMetric( + metric_type=metric_type, + value=value, + metadata=metadata or {}, + ) + # Record to SQLite in thread to avoid blocking event loop + await asyncio.to_thread(get_sovereignty_store().record, metric) + + # Publish to event bus for real-time consumers + await emit( + f"sovereignty.metric.{metric_type}", + source="sovereignty_metrics", + data={"metric_type": metric_type, "value": value, **(metadata or {})}, + ) diff --git a/tests/conftest.py b/tests/conftest.py index 3db5de56..bf684f69 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -147,10 +147,12 @@ def clean_database(tmp_path): # IMPORTANT: swarm.task_queue.models also has a DB_PATH that writes to # tasks.db — it MUST be patched too, or error_capture.capture_error() # will write test data to the production database. + tmp_sovereignty_db = tmp_path / "sovereignty_metrics.db" for mod_name, tmp_db in [ ("dashboard.routes.tasks", tmp_tasks_db), ("dashboard.routes.work_orders", tmp_work_orders_db), ("swarm.task_queue.models", tmp_tasks_db), + ("infrastructure.sovereignty_metrics", tmp_sovereignty_db), ]: try: mod = __import__(mod_name, fromlist=["DB_PATH"]) diff --git a/tests/infrastructure/test_moderation.py b/tests/infrastructure/test_moderation.py index add8c1b5..9ac59129 100644 --- a/tests/infrastructure/test_moderation.py +++ b/tests/infrastructure/test_moderation.py @@ -14,7 +14,6 @@ from infrastructure.guards.moderation import ( get_moderator, ) - # ── Unit tests for data types ──────────────────────────────────────────────── diff --git a/tests/infrastructure/test_sovereignty_metrics.py b/tests/infrastructure/test_sovereignty_metrics.py new file mode 100644 index 00000000..8acb4a0a --- /dev/null +++ b/tests/infrastructure/test_sovereignty_metrics.py @@ -0,0 +1,177 @@ +"""Tests for the sovereignty metrics store and API routes. + +Refs: #981 +""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from infrastructure.sovereignty_metrics import ( + GRADUATION_TARGETS, + SovereigntyMetric, + SovereigntyMetricsStore, + emit_sovereignty_metric, +) + + +@pytest.fixture +def store(tmp_path): + """Create a fresh sovereignty metrics store with a temp DB.""" + return SovereigntyMetricsStore(db_path=tmp_path / "test_sov.db") + + +class TestSovereigntyMetricsStore: + def test_record_and_get_latest(self, store): + metric = SovereigntyMetric(metric_type="cache_hit_rate", value=0.42) + store.record(metric) + + results = store.get_latest("cache_hit_rate", limit=10) + assert len(results) == 1 + assert results[0]["value"] == 0.42 + + def test_get_latest_returns_most_recent_first(self, store): + for val in [0.1, 0.2, 0.3]: + store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=val)) + + results = store.get_latest("cache_hit_rate", limit=10) + assert len(results) == 3 + assert results[0]["value"] == 0.3 # most recent first + + def test_get_latest_respects_limit(self, store): + for i in range(10): + store.record(SovereigntyMetric(metric_type="api_cost", value=float(i))) + + results = store.get_latest("api_cost", limit=3) + assert len(results) == 3 + + def test_get_latest_filters_by_type(self, store): + store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.5)) + store.record(SovereigntyMetric(metric_type="api_cost", value=1.20)) + + results = store.get_latest("cache_hit_rate") + assert len(results) == 1 + assert results[0]["value"] == 0.5 + + def test_get_summary_empty(self, store): + summary = store.get_summary() + assert "cache_hit_rate" in summary + assert summary["cache_hit_rate"]["current"] is None + assert summary["cache_hit_rate"]["phase"] == "pre-start" + + def test_get_summary_with_data(self, store): + store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.85)) + store.record(SovereigntyMetric(metric_type="api_cost", value=0.08)) + + summary = store.get_summary() + assert summary["cache_hit_rate"]["current"] == 0.85 + assert summary["cache_hit_rate"]["phase"] == "month3" + assert summary["api_cost"]["current"] == 0.08 + assert summary["api_cost"]["phase"] == "month3" + + def test_get_summary_graduation(self, store): + store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.95)) + summary = store.get_summary() + assert summary["cache_hit_rate"]["phase"] == "graduated" + + def test_alert_on_high_api_cost(self, store): + """API cost above threshold triggers an alert.""" + with patch("infrastructure.sovereignty_metrics.settings") as mock_settings: + mock_settings.sovereignty_api_cost_alert_threshold = 1.00 + mock_settings.db_busy_timeout_ms = 5000 + store.record(SovereigntyMetric(metric_type="api_cost", value=2.50)) + + alerts = store.get_alerts(unacknowledged_only=True) + assert len(alerts) == 1 + assert alerts[0]["alert_type"] == "api_cost_exceeded" + assert alerts[0]["value"] == 2.50 + + def test_no_alert_below_threshold(self, store): + """API cost below threshold does not trigger an alert.""" + with patch("infrastructure.sovereignty_metrics.settings") as mock_settings: + mock_settings.sovereignty_api_cost_alert_threshold = 1.00 + mock_settings.db_busy_timeout_ms = 5000 + store.record(SovereigntyMetric(metric_type="api_cost", value=0.50)) + + alerts = store.get_alerts(unacknowledged_only=True) + assert len(alerts) == 0 + + def test_acknowledge_alert(self, store): + with patch("infrastructure.sovereignty_metrics.settings") as mock_settings: + mock_settings.sovereignty_api_cost_alert_threshold = 0.50 + mock_settings.db_busy_timeout_ms = 5000 + store.record(SovereigntyMetric(metric_type="api_cost", value=1.00)) + + alerts = store.get_alerts(unacknowledged_only=True) + assert len(alerts) == 1 + + store.acknowledge_alert(alerts[0]["id"]) + assert len(store.get_alerts(unacknowledged_only=True)) == 0 + assert len(store.get_alerts(unacknowledged_only=False)) == 1 + + def test_metadata_preserved(self, store): + store.record( + SovereigntyMetric( + metric_type="cache_hit_rate", + value=0.5, + metadata={"source": "research_orchestrator"}, + ) + ) + results = store.get_latest("cache_hit_rate") + assert results[0]["metadata"]["source"] == "research_orchestrator" + + def test_summary_trend_data(self, store): + for v in [0.1, 0.2, 0.3]: + store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=v)) + + summary = store.get_summary() + trend = summary["cache_hit_rate"]["trend"] + assert len(trend) == 3 + assert trend[0]["v"] == 0.1 # oldest first (reversed) + assert trend[-1]["v"] == 0.3 + + def test_graduation_targets_complete(self): + """All expected metric types have graduation targets.""" + expected = {"cache_hit_rate", "api_cost", "time_to_report", "human_involvement", "local_artifacts"} + assert set(GRADUATION_TARGETS.keys()) == expected + + +class TestEmitSovereigntyMetric: + @pytest.mark.asyncio + async def test_emit_records_and_publishes(self, tmp_path): + """emit_sovereignty_metric records to store and publishes event.""" + with ( + patch("infrastructure.sovereignty_metrics._store", None), + patch( + "infrastructure.sovereignty_metrics.DB_PATH", + tmp_path / "emit_test.db", + ), + patch("infrastructure.events.bus.emit", new_callable=AsyncMock) as mock_emit, + ): + await emit_sovereignty_metric("cache_hit_rate", 0.75, {"source": "test"}) + + mock_emit.assert_called_once() + call_args = mock_emit.call_args + assert call_args[0][0] == "sovereignty.metric.cache_hit_rate" + + +class TestSovereigntyMetricsRoutes: + def test_metrics_api_returns_200(self, client): + response = client.get("/sovereignty/metrics") + assert response.status_code == 200 + data = response.json() + assert "metrics" in data + assert "alerts" in data + assert "targets" in data + + def test_metrics_panel_returns_html(self, client): + response = client.get("/sovereignty/metrics/panel") + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + def test_alerts_api_returns_200(self, client): + response = client.get("/sovereignty/alerts") + assert response.status_code == 200 + data = response.json() + assert "alerts" in data + assert "unacknowledged" in data