feat: add sovereignty metrics tracking and dashboard panel (#981)
Some checks failed
Tests / lint (pull_request) Failing after 13s
Tests / test (pull_request) Has been skipped

Add SQLite-backed sovereignty metrics store that tracks research
sovereignty progress (cache hit rate, API cost, time-to-report,
human involvement, local artifacts) against graduation targets.

- infrastructure/sovereignty_metrics.py: metrics store with alert
  thresholds, trend data, and event bus integration
- dashboard/routes/sovereignty_metrics.py: JSON API + HTMX partial
- Sovereignty metrics panel on Mission Control dashboard
- Config setting for API cost alert threshold
- 17 tests covering store, alerts, emit, and route endpoints

Fixes #981

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-03-23 10:08:24 -04:00
parent fc53a33361
commit 66ed9b7787
9 changed files with 636 additions and 1 deletions

View File

@@ -152,6 +152,10 @@ class Settings(BaseSettings):
# Default is False (telemetry disabled) to align with sovereign AI vision.
telemetry_enabled: bool = False
# ── Sovereignty Metrics ──────────────────────────────────────────────
# Alert when API cost per research task exceeds this threshold (USD).
sovereignty_api_cost_alert_threshold: float = 1.00
# CORS allowed origins for the web chat interface (Gitea Pages, etc.)
# Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com"
cors_origins: list[str] = [

View File

@@ -45,6 +45,7 @@ from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -631,6 +632,7 @@ app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
@app.websocket("/ws")

View File

@@ -0,0 +1,74 @@
"""Sovereignty metrics dashboard routes.
Provides API endpoints and HTMX partials for tracking research
sovereignty progress against graduation targets.
Refs: #981
"""
import logging
from typing import Any
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from config import settings
from dashboard.templating import templates
from infrastructure.sovereignty_metrics import (
GRADUATION_TARGETS,
get_sovereignty_store,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/sovereignty", tags=["sovereignty"])
@router.get("/metrics")
async def sovereignty_metrics_api() -> dict[str, Any]:
"""JSON API: full sovereignty metrics summary with trends."""
store = get_sovereignty_store()
summary = store.get_summary()
alerts = store.get_alerts(unacknowledged_only=True)
return {
"metrics": summary,
"alerts": alerts,
"targets": GRADUATION_TARGETS,
"cost_threshold": settings.sovereignty_api_cost_alert_threshold,
}
@router.get("/metrics/panel", response_class=HTMLResponse)
async def sovereignty_metrics_panel(request: Request) -> HTMLResponse:
"""HTMX partial: sovereignty metrics progress panel."""
store = get_sovereignty_store()
summary = store.get_summary()
alerts = store.get_alerts(unacknowledged_only=True)
return templates.TemplateResponse(
request,
"partials/sovereignty_metrics.html",
{
"metrics": summary,
"alerts": alerts,
"targets": GRADUATION_TARGETS,
},
)
@router.get("/alerts")
async def sovereignty_alerts_api() -> dict[str, Any]:
"""JSON API: sovereignty alerts."""
store = get_sovereignty_store()
return {
"alerts": store.get_alerts(unacknowledged_only=False),
"unacknowledged": store.get_alerts(unacknowledged_only=True),
}
@router.post("/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: int) -> dict[str, bool]:
"""Acknowledge a sovereignty alert."""
store = get_sovereignty_store()
success = store.acknowledge_alert(alert_id)
return {"success": success}

View File

@@ -179,6 +179,13 @@
</div>
</div>
<!-- Sovereignty Metrics -->
{% call panel("SOVEREIGNTY METRICS", id="sovereignty-metrics-panel",
hx_get="/sovereignty/metrics/panel",
hx_trigger="load, every 30s") %}
<p class="chat-history-placeholder">Loading sovereignty metrics...</p>
{% endcall %}
<!-- Chat History -->
<div class="card mc-card-spaced">
<div class="card-header">

View File

@@ -0,0 +1,63 @@
{# HTMX partial: Sovereignty Metrics Progress Panel
Loaded via hx-get="/sovereignty/metrics/panel"
Refs: #981
#}
{% set phase_labels = {"pre-start": "Pre-start", "week1": "Week 1", "month1": "Month 1", "month3": "Month 3", "graduated": "Graduated"} %}
{% set phase_colors = {"pre-start": "var(--text-dim)", "week1": "var(--red)", "month1": "var(--amber)", "month3": "var(--green)", "graduated": "var(--purple)"} %}
{% set metric_labels = {
"cache_hit_rate": "Cache Hit Rate",
"api_cost": "API Cost / Task",
"time_to_report": "Time to Report",
"human_involvement": "Human Involvement",
"local_artifacts": "Local Artifacts"
} %}
{% set metric_units = {
"cache_hit_rate": "%",
"api_cost": "$",
"time_to_report": "min",
"human_involvement": "%",
"local_artifacts": ""
} %}
{% if alerts %}
<div class="sov-alerts">
{% for alert in alerts %}
<div class="sov-alert-item">
<span class="sov-alert-icon">!</span>
<span>{{ alert.message }}</span>
</div>
{% endfor %}
</div>
{% endif %}
<div class="grid grid-3">
{% for key, data in metrics.items() %}
{% set label = metric_labels.get(key, key) %}
{% set unit = metric_units.get(key, "") %}
{% set phase = data.phase %}
{% set color = phase_colors.get(phase, "var(--text-dim)") %}
<div class="stat">
<div class="stat-value" style="color: {{ color }}">
{% if data.current is not none %}
{% if key == "cache_hit_rate" or key == "human_involvement" %}
{{ "%.0f"|format(data.current * 100) }}{{ unit }}
{% elif key == "api_cost" %}
{{ unit }}{{ "%.2f"|format(data.current) }}
{% elif key == "time_to_report" %}
{{ "%.1f"|format(data.current) }}{{ unit }}
{% else %}
{{ data.current|int }}
{% endif %}
{% else %}
--
{% endif %}
</div>
<div class="stat-label">{{ label }}</div>
<div class="stat-label" style="font-size: 0.7rem; color: {{ color }}">
{{ phase_labels.get(phase, phase) }}
</div>
</div>
{% endfor %}
</div>

View File

@@ -0,0 +1,307 @@
"""Sovereignty metrics collector and store.
Tracks research sovereignty progress: cache hit rate, API cost,
time-to-report, and human involvement. Persists to SQLite for
trend analysis and dashboard display.
Refs: #981
"""
import json
import logging
import sqlite3
from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
DB_PATH = Path(settings.repo_root) / "data" / "sovereignty_metrics.db"
_SCHEMA = """
CREATE TABLE IF NOT EXISTS sovereignty_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
metric_type TEXT NOT NULL,
value REAL NOT NULL,
metadata TEXT DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_sm_type ON sovereignty_metrics(metric_type);
CREATE INDEX IF NOT EXISTS idx_sm_ts ON sovereignty_metrics(timestamp);
CREATE TABLE IF NOT EXISTS sovereignty_alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
alert_type TEXT NOT NULL,
message TEXT NOT NULL,
value REAL NOT NULL,
threshold REAL NOT NULL,
acknowledged INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sa_ts ON sovereignty_alerts(timestamp);
CREATE INDEX IF NOT EXISTS idx_sa_ack ON sovereignty_alerts(acknowledged);
"""
@dataclass
class SovereigntyMetric:
"""A single sovereignty metric data point."""
metric_type: str # cache_hit_rate, api_cost, time_to_report, human_involvement
value: float
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class SovereigntyAlert:
"""An alert triggered when a metric exceeds a threshold."""
alert_type: str
message: str
value: float
threshold: float
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
acknowledged: bool = False
# Graduation targets from issue #981
GRADUATION_TARGETS = {
"cache_hit_rate": {"week1": 0.10, "month1": 0.40, "month3": 0.80, "graduation": 0.90},
"api_cost": {"week1": 1.50, "month1": 0.50, "month3": 0.10, "graduation": 0.01},
"time_to_report": {"week1": 180.0, "month1": 30.0, "month3": 5.0, "graduation": 1.0},
"human_involvement": {"week1": 1.0, "month1": 0.5, "month3": 0.25, "graduation": 0.0},
"local_artifacts": {"week1": 6, "month1": 30, "month3": 100, "graduation": 500},
}
class SovereigntyMetricsStore:
"""SQLite-backed sovereignty metrics store.
Thread-safe: creates a new connection per operation.
"""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._init_db()
def _init_db(self) -> None:
"""Initialize the database schema."""
try:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self._db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.executescript(_SCHEMA)
conn.commit()
except Exception as exc:
logger.warning("Failed to initialize sovereignty metrics DB: %s", exc)
def _connect(self) -> sqlite3.Connection:
"""Get a new connection."""
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
return conn
def record(self, metric: SovereigntyMetric) -> None:
"""Record a sovereignty metric data point."""
try:
with closing(self._connect()) as conn:
conn.execute(
"INSERT INTO sovereignty_metrics (timestamp, metric_type, value, metadata) "
"VALUES (?, ?, ?, ?)",
(
metric.timestamp,
metric.metric_type,
metric.value,
json.dumps(metric.metadata),
),
)
conn.commit()
except Exception as exc:
logger.warning("Failed to record sovereignty metric: %s", exc)
# Check thresholds for alerts
self._check_alert(metric)
def _check_alert(self, metric: SovereigntyMetric) -> None:
"""Check if a metric triggers an alert."""
threshold = settings.sovereignty_api_cost_alert_threshold
if metric.metric_type == "api_cost" and metric.value > threshold:
alert = SovereigntyAlert(
alert_type="api_cost_exceeded",
message=f"API cost ${metric.value:.2f} exceeds threshold ${threshold:.2f}",
value=metric.value,
threshold=threshold,
)
self._record_alert(alert)
def _record_alert(self, alert: SovereigntyAlert) -> None:
"""Persist an alert."""
try:
with closing(self._connect()) as conn:
conn.execute(
"INSERT INTO sovereignty_alerts "
"(timestamp, alert_type, message, value, threshold) "
"VALUES (?, ?, ?, ?, ?)",
(
alert.timestamp,
alert.alert_type,
alert.message,
alert.value,
alert.threshold,
),
)
conn.commit()
logger.warning("Sovereignty alert: %s", alert.message)
except Exception as exc:
logger.warning("Failed to record sovereignty alert: %s", exc)
def get_latest(self, metric_type: str, limit: int = 50) -> list[dict]:
"""Get the most recent metric values for a given type."""
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT timestamp, value, metadata FROM sovereignty_metrics "
"WHERE metric_type = ? ORDER BY timestamp DESC LIMIT ?",
(metric_type, limit),
).fetchall()
return [
{
"timestamp": row["timestamp"],
"value": row["value"],
"metadata": json.loads(row["metadata"]) if row["metadata"] else {},
}
for row in rows
]
except Exception as exc:
logger.warning("Failed to query sovereignty metrics: %s", exc)
return []
def get_summary(self) -> dict[str, Any]:
"""Get a summary of current sovereignty metrics progress."""
summary: dict[str, Any] = {}
for metric_type in GRADUATION_TARGETS:
latest = self.get_latest(metric_type, limit=1)
history = self.get_latest(metric_type, limit=30)
current_value = latest[0]["value"] if latest else None
targets = GRADUATION_TARGETS[metric_type]
# Determine current phase based on value
phase = "pre-start"
if current_value is not None:
if metric_type in ("api_cost", "time_to_report", "human_involvement"):
# Lower is better
if current_value <= targets["graduation"]:
phase = "graduated"
elif current_value <= targets["month3"]:
phase = "month3"
elif current_value <= targets["month1"]:
phase = "month1"
elif current_value <= targets["week1"]:
phase = "week1"
else:
phase = "pre-start"
else:
# Higher is better
if current_value >= targets["graduation"]:
phase = "graduated"
elif current_value >= targets["month3"]:
phase = "month3"
elif current_value >= targets["month1"]:
phase = "month1"
elif current_value >= targets["week1"]:
phase = "week1"
else:
phase = "pre-start"
summary[metric_type] = {
"current": current_value,
"phase": phase,
"targets": targets,
"trend": [{"t": h["timestamp"], "v": h["value"]} for h in reversed(history)],
}
return summary
def get_alerts(self, unacknowledged_only: bool = True, limit: int = 20) -> list[dict]:
"""Get sovereignty alerts."""
try:
with closing(self._connect()) as conn:
if unacknowledged_only:
rows = conn.execute(
"SELECT * FROM sovereignty_alerts "
"WHERE acknowledged = 0 ORDER BY timestamp DESC LIMIT ?",
(limit,),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM sovereignty_alerts "
"ORDER BY timestamp DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(row) for row in rows]
except Exception as exc:
logger.warning("Failed to query sovereignty alerts: %s", exc)
return []
def acknowledge_alert(self, alert_id: int) -> bool:
"""Acknowledge an alert."""
try:
with closing(self._connect()) as conn:
conn.execute(
"UPDATE sovereignty_alerts SET acknowledged = 1 WHERE id = ?",
(alert_id,),
)
conn.commit()
return True
except Exception as exc:
logger.warning("Failed to acknowledge alert: %s", exc)
return False
# ── Module-level singleton ─────────────────────────────────────────────────
_store: SovereigntyMetricsStore | None = None
def get_sovereignty_store() -> SovereigntyMetricsStore:
"""Return the module-level store, creating it on first access."""
global _store
if _store is None:
_store = SovereigntyMetricsStore()
return _store
async def emit_sovereignty_metric(
metric_type: str,
value: float,
metadata: dict[str, Any] | None = None,
) -> None:
"""Convenience function to record a sovereignty metric and emit an event.
Also publishes to the event bus for real-time subscribers.
"""
import asyncio
from infrastructure.events.bus import emit
metric = SovereigntyMetric(
metric_type=metric_type,
value=value,
metadata=metadata or {},
)
# Record to SQLite in thread to avoid blocking event loop
await asyncio.to_thread(get_sovereignty_store().record, metric)
# Publish to event bus for real-time consumers
await emit(
f"sovereignty.metric.{metric_type}",
source="sovereignty_metrics",
data={"metric_type": metric_type, "value": value, **(metadata or {})},
)

View File

@@ -147,10 +147,12 @@ def clean_database(tmp_path):
# IMPORTANT: swarm.task_queue.models also has a DB_PATH that writes to
# tasks.db — it MUST be patched too, or error_capture.capture_error()
# will write test data to the production database.
tmp_sovereignty_db = tmp_path / "sovereignty_metrics.db"
for mod_name, tmp_db in [
("dashboard.routes.tasks", tmp_tasks_db),
("dashboard.routes.work_orders", tmp_work_orders_db),
("swarm.task_queue.models", tmp_tasks_db),
("infrastructure.sovereignty_metrics", tmp_sovereignty_db),
]:
try:
mod = __import__(mod_name, fromlist=["DB_PATH"])

View File

@@ -14,7 +14,6 @@ from infrastructure.guards.moderation import (
get_moderator,
)
# ── Unit tests for data types ────────────────────────────────────────────────

View File

@@ -0,0 +1,177 @@
"""Tests for the sovereignty metrics store and API routes.
Refs: #981
"""
from unittest.mock import AsyncMock, patch
import pytest
from infrastructure.sovereignty_metrics import (
GRADUATION_TARGETS,
SovereigntyMetric,
SovereigntyMetricsStore,
emit_sovereignty_metric,
)
@pytest.fixture
def store(tmp_path):
"""Create a fresh sovereignty metrics store with a temp DB."""
return SovereigntyMetricsStore(db_path=tmp_path / "test_sov.db")
class TestSovereigntyMetricsStore:
def test_record_and_get_latest(self, store):
metric = SovereigntyMetric(metric_type="cache_hit_rate", value=0.42)
store.record(metric)
results = store.get_latest("cache_hit_rate", limit=10)
assert len(results) == 1
assert results[0]["value"] == 0.42
def test_get_latest_returns_most_recent_first(self, store):
for val in [0.1, 0.2, 0.3]:
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=val))
results = store.get_latest("cache_hit_rate", limit=10)
assert len(results) == 3
assert results[0]["value"] == 0.3 # most recent first
def test_get_latest_respects_limit(self, store):
for i in range(10):
store.record(SovereigntyMetric(metric_type="api_cost", value=float(i)))
results = store.get_latest("api_cost", limit=3)
assert len(results) == 3
def test_get_latest_filters_by_type(self, store):
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.5))
store.record(SovereigntyMetric(metric_type="api_cost", value=1.20))
results = store.get_latest("cache_hit_rate")
assert len(results) == 1
assert results[0]["value"] == 0.5
def test_get_summary_empty(self, store):
summary = store.get_summary()
assert "cache_hit_rate" in summary
assert summary["cache_hit_rate"]["current"] is None
assert summary["cache_hit_rate"]["phase"] == "pre-start"
def test_get_summary_with_data(self, store):
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.85))
store.record(SovereigntyMetric(metric_type="api_cost", value=0.08))
summary = store.get_summary()
assert summary["cache_hit_rate"]["current"] == 0.85
assert summary["cache_hit_rate"]["phase"] == "month3"
assert summary["api_cost"]["current"] == 0.08
assert summary["api_cost"]["phase"] == "month3"
def test_get_summary_graduation(self, store):
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.95))
summary = store.get_summary()
assert summary["cache_hit_rate"]["phase"] == "graduated"
def test_alert_on_high_api_cost(self, store):
"""API cost above threshold triggers an alert."""
with patch("infrastructure.sovereignty_metrics.settings") as mock_settings:
mock_settings.sovereignty_api_cost_alert_threshold = 1.00
mock_settings.db_busy_timeout_ms = 5000
store.record(SovereigntyMetric(metric_type="api_cost", value=2.50))
alerts = store.get_alerts(unacknowledged_only=True)
assert len(alerts) == 1
assert alerts[0]["alert_type"] == "api_cost_exceeded"
assert alerts[0]["value"] == 2.50
def test_no_alert_below_threshold(self, store):
"""API cost below threshold does not trigger an alert."""
with patch("infrastructure.sovereignty_metrics.settings") as mock_settings:
mock_settings.sovereignty_api_cost_alert_threshold = 1.00
mock_settings.db_busy_timeout_ms = 5000
store.record(SovereigntyMetric(metric_type="api_cost", value=0.50))
alerts = store.get_alerts(unacknowledged_only=True)
assert len(alerts) == 0
def test_acknowledge_alert(self, store):
with patch("infrastructure.sovereignty_metrics.settings") as mock_settings:
mock_settings.sovereignty_api_cost_alert_threshold = 0.50
mock_settings.db_busy_timeout_ms = 5000
store.record(SovereigntyMetric(metric_type="api_cost", value=1.00))
alerts = store.get_alerts(unacknowledged_only=True)
assert len(alerts) == 1
store.acknowledge_alert(alerts[0]["id"])
assert len(store.get_alerts(unacknowledged_only=True)) == 0
assert len(store.get_alerts(unacknowledged_only=False)) == 1
def test_metadata_preserved(self, store):
store.record(
SovereigntyMetric(
metric_type="cache_hit_rate",
value=0.5,
metadata={"source": "research_orchestrator"},
)
)
results = store.get_latest("cache_hit_rate")
assert results[0]["metadata"]["source"] == "research_orchestrator"
def test_summary_trend_data(self, store):
for v in [0.1, 0.2, 0.3]:
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=v))
summary = store.get_summary()
trend = summary["cache_hit_rate"]["trend"]
assert len(trend) == 3
assert trend[0]["v"] == 0.1 # oldest first (reversed)
assert trend[-1]["v"] == 0.3
def test_graduation_targets_complete(self):
"""All expected metric types have graduation targets."""
expected = {"cache_hit_rate", "api_cost", "time_to_report", "human_involvement", "local_artifacts"}
assert set(GRADUATION_TARGETS.keys()) == expected
class TestEmitSovereigntyMetric:
@pytest.mark.asyncio
async def test_emit_records_and_publishes(self, tmp_path):
"""emit_sovereignty_metric records to store and publishes event."""
with (
patch("infrastructure.sovereignty_metrics._store", None),
patch(
"infrastructure.sovereignty_metrics.DB_PATH",
tmp_path / "emit_test.db",
),
patch("infrastructure.events.bus.emit", new_callable=AsyncMock) as mock_emit,
):
await emit_sovereignty_metric("cache_hit_rate", 0.75, {"source": "test"})
mock_emit.assert_called_once()
call_args = mock_emit.call_args
assert call_args[0][0] == "sovereignty.metric.cache_hit_rate"
class TestSovereigntyMetricsRoutes:
def test_metrics_api_returns_200(self, client):
response = client.get("/sovereignty/metrics")
assert response.status_code == 200
data = response.json()
assert "metrics" in data
assert "alerts" in data
assert "targets" in data
def test_metrics_panel_returns_html(self, client):
response = client.get("/sovereignty/metrics/panel")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
def test_alerts_api_returns_200(self, client):
response = client.get("/sovereignty/alerts")
assert response.status_code == 200
data = response.json()
assert "alerts" in data
assert "unacknowledged" in data