forked from Rockachopa/Timmy-time-dashboard
307 lines
11 KiB
Python
307 lines
11 KiB
Python
"""Sovereignty metrics collector and store.
|
|
|
|
Tracks research sovereignty progress: cache hit rate, API cost,
|
|
time-to-report, and human involvement. Persists to SQLite for
|
|
trend analysis and dashboard display.
|
|
|
|
Refs: #981
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
from contextlib import closing
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
DB_PATH = Path(settings.repo_root) / "data" / "sovereignty_metrics.db"
|
|
|
|
_SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS sovereignty_metrics (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT NOT NULL,
|
|
metric_type TEXT NOT NULL,
|
|
value REAL NOT NULL,
|
|
metadata TEXT DEFAULT '{}'
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_sm_type ON sovereignty_metrics(metric_type);
|
|
CREATE INDEX IF NOT EXISTS idx_sm_ts ON sovereignty_metrics(timestamp);
|
|
|
|
CREATE TABLE IF NOT EXISTS sovereignty_alerts (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT NOT NULL,
|
|
alert_type TEXT NOT NULL,
|
|
message TEXT NOT NULL,
|
|
value REAL NOT NULL,
|
|
threshold REAL NOT NULL,
|
|
acknowledged INTEGER DEFAULT 0
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_sa_ts ON sovereignty_alerts(timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_sa_ack ON sovereignty_alerts(acknowledged);
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class SovereigntyMetric:
|
|
"""A single sovereignty metric data point."""
|
|
|
|
metric_type: str # cache_hit_rate, api_cost, time_to_report, human_involvement
|
|
value: float
|
|
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
|
metadata: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class SovereigntyAlert:
|
|
"""An alert triggered when a metric exceeds a threshold."""
|
|
|
|
alert_type: str
|
|
message: str
|
|
value: float
|
|
threshold: float
|
|
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
|
acknowledged: bool = False
|
|
|
|
|
|
# Graduation targets from issue #981
|
|
GRADUATION_TARGETS = {
|
|
"cache_hit_rate": {"week1": 0.10, "month1": 0.40, "month3": 0.80, "graduation": 0.90},
|
|
"api_cost": {"week1": 1.50, "month1": 0.50, "month3": 0.10, "graduation": 0.01},
|
|
"time_to_report": {"week1": 180.0, "month1": 30.0, "month3": 5.0, "graduation": 1.0},
|
|
"human_involvement": {"week1": 1.0, "month1": 0.5, "month3": 0.25, "graduation": 0.0},
|
|
"local_artifacts": {"week1": 6, "month1": 30, "month3": 100, "graduation": 500},
|
|
}
|
|
|
|
|
|
class SovereigntyMetricsStore:
|
|
"""SQLite-backed sovereignty metrics store.
|
|
|
|
Thread-safe: creates a new connection per operation.
|
|
"""
|
|
|
|
def __init__(self, db_path: Path | None = None) -> None:
|
|
self._db_path = db_path or DB_PATH
|
|
self._init_db()
|
|
|
|
def _init_db(self) -> None:
|
|
"""Initialize the database schema."""
|
|
try:
|
|
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with closing(sqlite3.connect(str(self._db_path))) as conn:
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
|
conn.executescript(_SCHEMA)
|
|
conn.commit()
|
|
except Exception as exc:
|
|
logger.warning("Failed to initialize sovereignty metrics DB: %s", exc)
|
|
|
|
def _connect(self) -> sqlite3.Connection:
|
|
"""Get a new connection."""
|
|
conn = sqlite3.connect(str(self._db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
|
return conn
|
|
|
|
def record(self, metric: SovereigntyMetric) -> None:
|
|
"""Record a sovereignty metric data point."""
|
|
try:
|
|
with closing(self._connect()) as conn:
|
|
conn.execute(
|
|
"INSERT INTO sovereignty_metrics (timestamp, metric_type, value, metadata) "
|
|
"VALUES (?, ?, ?, ?)",
|
|
(
|
|
metric.timestamp,
|
|
metric.metric_type,
|
|
metric.value,
|
|
json.dumps(metric.metadata),
|
|
),
|
|
)
|
|
conn.commit()
|
|
except Exception as exc:
|
|
logger.warning("Failed to record sovereignty metric: %s", exc)
|
|
|
|
# Check thresholds for alerts
|
|
self._check_alert(metric)
|
|
|
|
def _check_alert(self, metric: SovereigntyMetric) -> None:
|
|
"""Check if a metric triggers an alert."""
|
|
threshold = settings.sovereignty_api_cost_alert_threshold
|
|
if metric.metric_type == "api_cost" and metric.value > threshold:
|
|
alert = SovereigntyAlert(
|
|
alert_type="api_cost_exceeded",
|
|
message=f"API cost ${metric.value:.2f} exceeds threshold ${threshold:.2f}",
|
|
value=metric.value,
|
|
threshold=threshold,
|
|
)
|
|
self._record_alert(alert)
|
|
|
|
def _record_alert(self, alert: SovereigntyAlert) -> None:
|
|
"""Persist an alert."""
|
|
try:
|
|
with closing(self._connect()) as conn:
|
|
conn.execute(
|
|
"INSERT INTO sovereignty_alerts "
|
|
"(timestamp, alert_type, message, value, threshold) "
|
|
"VALUES (?, ?, ?, ?, ?)",
|
|
(
|
|
alert.timestamp,
|
|
alert.alert_type,
|
|
alert.message,
|
|
alert.value,
|
|
alert.threshold,
|
|
),
|
|
)
|
|
conn.commit()
|
|
logger.warning("Sovereignty alert: %s", alert.message)
|
|
except Exception as exc:
|
|
logger.warning("Failed to record sovereignty alert: %s", exc)
|
|
|
|
def get_latest(self, metric_type: str, limit: int = 50) -> list[dict]:
|
|
"""Get the most recent metric values for a given type."""
|
|
try:
|
|
with closing(self._connect()) as conn:
|
|
rows = conn.execute(
|
|
"SELECT timestamp, value, metadata FROM sovereignty_metrics "
|
|
"WHERE metric_type = ? ORDER BY timestamp DESC LIMIT ?",
|
|
(metric_type, limit),
|
|
).fetchall()
|
|
return [
|
|
{
|
|
"timestamp": row["timestamp"],
|
|
"value": row["value"],
|
|
"metadata": json.loads(row["metadata"]) if row["metadata"] else {},
|
|
}
|
|
for row in rows
|
|
]
|
|
except Exception as exc:
|
|
logger.warning("Failed to query sovereignty metrics: %s", exc)
|
|
return []
|
|
|
|
def get_summary(self) -> dict[str, Any]:
|
|
"""Get a summary of current sovereignty metrics progress."""
|
|
summary: dict[str, Any] = {}
|
|
for metric_type in GRADUATION_TARGETS:
|
|
latest = self.get_latest(metric_type, limit=1)
|
|
history = self.get_latest(metric_type, limit=30)
|
|
|
|
current_value = latest[0]["value"] if latest else None
|
|
targets = GRADUATION_TARGETS[metric_type]
|
|
|
|
# Determine current phase based on value
|
|
phase = "pre-start"
|
|
if current_value is not None:
|
|
if metric_type in ("api_cost", "time_to_report", "human_involvement"):
|
|
# Lower is better
|
|
if current_value <= targets["graduation"]:
|
|
phase = "graduated"
|
|
elif current_value <= targets["month3"]:
|
|
phase = "month3"
|
|
elif current_value <= targets["month1"]:
|
|
phase = "month1"
|
|
elif current_value <= targets["week1"]:
|
|
phase = "week1"
|
|
else:
|
|
phase = "pre-start"
|
|
else:
|
|
# Higher is better
|
|
if current_value >= targets["graduation"]:
|
|
phase = "graduated"
|
|
elif current_value >= targets["month3"]:
|
|
phase = "month3"
|
|
elif current_value >= targets["month1"]:
|
|
phase = "month1"
|
|
elif current_value >= targets["week1"]:
|
|
phase = "week1"
|
|
else:
|
|
phase = "pre-start"
|
|
|
|
summary[metric_type] = {
|
|
"current": current_value,
|
|
"phase": phase,
|
|
"targets": targets,
|
|
"trend": [{"t": h["timestamp"], "v": h["value"]} for h in reversed(history)],
|
|
}
|
|
|
|
return summary
|
|
|
|
def get_alerts(self, unacknowledged_only: bool = True, limit: int = 20) -> list[dict]:
|
|
"""Get sovereignty alerts."""
|
|
try:
|
|
with closing(self._connect()) as conn:
|
|
if unacknowledged_only:
|
|
rows = conn.execute(
|
|
"SELECT * FROM sovereignty_alerts "
|
|
"WHERE acknowledged = 0 ORDER BY timestamp DESC LIMIT ?",
|
|
(limit,),
|
|
).fetchall()
|
|
else:
|
|
rows = conn.execute(
|
|
"SELECT * FROM sovereignty_alerts ORDER BY timestamp DESC LIMIT ?",
|
|
(limit,),
|
|
).fetchall()
|
|
return [dict(row) for row in rows]
|
|
except Exception as exc:
|
|
logger.warning("Failed to query sovereignty alerts: %s", exc)
|
|
return []
|
|
|
|
def acknowledge_alert(self, alert_id: int) -> bool:
|
|
"""Acknowledge an alert."""
|
|
try:
|
|
with closing(self._connect()) as conn:
|
|
conn.execute(
|
|
"UPDATE sovereignty_alerts SET acknowledged = 1 WHERE id = ?",
|
|
(alert_id,),
|
|
)
|
|
conn.commit()
|
|
return True
|
|
except Exception as exc:
|
|
logger.warning("Failed to acknowledge alert: %s", exc)
|
|
return False
|
|
|
|
|
|
# ── Module-level singleton ─────────────────────────────────────────────────
|
|
_store: SovereigntyMetricsStore | None = None
|
|
|
|
|
|
def get_sovereignty_store() -> SovereigntyMetricsStore:
|
|
"""Return the module-level store, creating it on first access."""
|
|
global _store
|
|
if _store is None:
|
|
_store = SovereigntyMetricsStore()
|
|
return _store
|
|
|
|
|
|
async def emit_sovereignty_metric(
|
|
metric_type: str,
|
|
value: float,
|
|
metadata: dict[str, Any] | None = None,
|
|
) -> None:
|
|
"""Convenience function to record a sovereignty metric and emit an event.
|
|
|
|
Also publishes to the event bus for real-time subscribers.
|
|
"""
|
|
import asyncio
|
|
|
|
from infrastructure.events.bus import emit
|
|
|
|
metric = SovereigntyMetric(
|
|
metric_type=metric_type,
|
|
value=value,
|
|
metadata=metadata or {},
|
|
)
|
|
# Record to SQLite in thread to avoid blocking event loop
|
|
await asyncio.to_thread(get_sovereignty_store().record, metric)
|
|
|
|
# Publish to event bus for real-time consumers
|
|
await emit(
|
|
f"sovereignty.metric.{metric_type}",
|
|
source="sovereignty_metrics",
|
|
data={"metric_type": metric_type, "value": value, **(metadata or {})},
|
|
)
|