[claude] Sovereignty metrics emitter + SQLite store (#954) (#1164)

Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
This commit is contained in:
2026-03-23 19:52:20 +00:00
committed by rockachopa
parent 08d337e03d
commit dec9736679
5 changed files with 730 additions and 0 deletions

View File

@@ -47,6 +47,7 @@ from dashboard.routes.models import router as models_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router
from dashboard.routes.sovereignty_ws import router as sovereignty_ws_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -672,6 +673,7 @@ app.include_router(hermes_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
app.include_router(sovereignty_ws_router)
@app.websocket("/ws")

View File

@@ -0,0 +1,40 @@
"""WebSocket emitter for the sovereignty metrics dashboard widget.
Streams real-time sovereignty snapshots to connected clients every
*_PUSH_INTERVAL* seconds. The snapshot includes per-layer sovereignty
percentages, API cost rate, and skill crystallisation count.
Refs: #954, #953
"""
import asyncio
import json
import logging
from fastapi import APIRouter, WebSocket
router = APIRouter(tags=["sovereignty"])
logger = logging.getLogger(__name__)
_PUSH_INTERVAL = 5 # seconds between snapshot pushes
@router.websocket("/ws/sovereignty")
async def sovereignty_ws(websocket: WebSocket) -> None:
"""Stream sovereignty metric snapshots to the dashboard widget."""
from timmy.sovereignty.metrics import get_metrics_store
await websocket.accept()
logger.info("Sovereignty WS connected")
store = get_metrics_store()
try:
# Send initial snapshot immediately
await websocket.send_text(json.dumps(store.get_snapshot()))
while True:
await asyncio.sleep(_PUSH_INTERVAL)
await websocket.send_text(json.dumps(store.get_snapshot()))
except Exception:
logger.debug("Sovereignty WS disconnected")

View File

@@ -0,0 +1,7 @@
"""Sovereignty metrics for the Bannerlord loop.
Tracks how much of each AI layer (perception, decision, narration)
runs locally vs. calls out to an LLM. Feeds the sovereignty dashboard.
Refs: #954, #953
"""

View File

@@ -0,0 +1,410 @@
"""Sovereignty metrics emitter and SQLite store.
Tracks the sovereignty percentage for each AI layer (perception, decision,
narration) plus API cost and skill crystallisation. All data is persisted to
``data/sovereignty_metrics.db`` so the dashboard can query trends over time.
Event types
-----------
perception layer:
``perception_cache_hit`` — frame answered from local cache (sovereign)
``perception_vlm_call`` — frame required a VLM inference call (non-sovereign)
decision layer:
``decision_rule_hit`` — action chosen by a deterministic rule (sovereign)
``decision_llm_call`` — action required LLM reasoning (non-sovereign)
narration layer:
``narration_template`` — text generated from a template (sovereign)
``narration_llm`` — text generated by an LLM (non-sovereign)
skill layer:
``skill_crystallized`` — a new skill was crystallised from LLM output
cost:
``api_call`` — any external API call was made
``api_cost`` — monetary cost of an API call (metadata: {"usd": float})
Refs: #954, #953
"""
import asyncio
import json
import logging
import sqlite3
import uuid
from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Constants ─────────────────────────────────────────────────────────────────
DB_PATH = Path(settings.repo_root) / "data" / "sovereignty_metrics.db"
#: Sovereign event types for each layer (numerator of sovereignty %).
_SOVEREIGN_EVENTS: dict[str, frozenset[str]] = {
"perception": frozenset({"perception_cache_hit"}),
"decision": frozenset({"decision_rule_hit"}),
"narration": frozenset({"narration_template"}),
}
#: All tracked event types for each layer (denominator of sovereignty %).
_LAYER_EVENTS: dict[str, frozenset[str]] = {
"perception": frozenset({"perception_cache_hit", "perception_vlm_call"}),
"decision": frozenset({"decision_rule_hit", "decision_llm_call"}),
"narration": frozenset({"narration_template", "narration_llm"}),
}
ALL_EVENT_TYPES: frozenset[str] = frozenset(
{
"perception_cache_hit",
"perception_vlm_call",
"decision_rule_hit",
"decision_llm_call",
"narration_template",
"narration_llm",
"skill_crystallized",
"api_call",
"api_cost",
}
)
# ── Schema ────────────────────────────────────────────────────────────────────
_SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
event_type TEXT NOT NULL,
session_id TEXT NOT NULL DEFAULT '',
metadata_json TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_ev_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_ev_ts ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_ev_session ON events(session_id);
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
game TEXT NOT NULL DEFAULT '',
start_time TEXT NOT NULL,
end_time TEXT
);
"""
# ── Data classes ──────────────────────────────────────────────────────────────
@dataclass
class SovereigntyEvent:
"""A single sovereignty event."""
event_type: str
session_id: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
# ── Store ─────────────────────────────────────────────────────────────────────
class SovereigntyMetricsStore:
"""SQLite-backed sovereignty event store.
Thread-safe: creates a new connection per operation (WAL mode).
"""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._init_db()
# ── internal ─────────────────────────────────────────────────────────────
def _init_db(self) -> None:
try:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self._db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.executescript(_SCHEMA)
conn.commit()
except Exception as exc:
logger.warning("Failed to initialise sovereignty metrics DB: %s", exc)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
return conn
# ── public API ────────────────────────────────────────────────────────────
def record(self, event_type: str, metadata: dict[str, Any] | None = None, *, session_id: str = "") -> None:
"""Record a sovereignty event.
Parameters
----------
event_type:
One of ``ALL_EVENT_TYPES``.
metadata:
Optional dict of extra data (serialised as JSON).
session_id:
Identifier of the current game session, if known.
"""
event = SovereigntyEvent(
event_type=event_type,
session_id=session_id,
metadata=metadata or {},
)
try:
with closing(self._connect()) as conn:
conn.execute(
"INSERT INTO events (timestamp, event_type, session_id, metadata_json) "
"VALUES (?, ?, ?, ?)",
(
event.timestamp,
event.event_type,
event.session_id,
json.dumps(event.metadata),
),
)
conn.commit()
except Exception as exc:
logger.warning("Failed to record sovereignty event: %s", exc)
def start_session(self, game: str = "", session_id: str | None = None) -> str:
"""Register a new game session. Returns the session_id."""
sid = session_id or str(uuid.uuid4())
try:
with closing(self._connect()) as conn:
conn.execute(
"INSERT OR IGNORE INTO sessions (session_id, game, start_time) VALUES (?, ?, ?)",
(sid, game, datetime.now(UTC).isoformat()),
)
conn.commit()
except Exception as exc:
logger.warning("Failed to start session: %s", exc)
return sid
def end_session(self, session_id: str) -> None:
"""Mark a session as ended."""
try:
with closing(self._connect()) as conn:
conn.execute(
"UPDATE sessions SET end_time = ? WHERE session_id = ?",
(datetime.now(UTC).isoformat(), session_id),
)
conn.commit()
except Exception as exc:
logger.warning("Failed to end session: %s", exc)
# ── analytics ─────────────────────────────────────────────────────────────
def get_sovereignty_pct(self, layer: str, time_window: float | None = None) -> float:
"""Return the sovereignty percentage (0.0100.0) for *layer*.
Parameters
----------
layer:
One of ``"perception"``, ``"decision"``, ``"narration"``.
time_window:
If given, only consider events from the last *time_window* seconds.
If ``None``, all events are used.
Returns
-------
float
Percentage of sovereign events for the layer, or 0.0 if no data.
"""
if layer not in _LAYER_EVENTS:
logger.warning("Unknown sovereignty layer: %s", layer)
return 0.0
sovereign = _SOVEREIGN_EVENTS[layer]
total_types = _LAYER_EVENTS[layer]
sovereign_placeholders = ",".join("?" * len(sovereign))
total_placeholders = ",".join("?" * len(total_types))
params_sov: list[Any] = list(sovereign)
params_total: list[Any] = list(total_types)
if time_window is not None:
cutoff = _seconds_ago_iso(time_window)
where_ts = " AND timestamp >= ?"
params_sov.append(cutoff)
params_total.append(cutoff)
else:
where_ts = ""
try:
with closing(self._connect()) as conn:
total_count = conn.execute(
f"SELECT COUNT(*) FROM events WHERE event_type IN ({total_placeholders}){where_ts}",
params_total,
).fetchone()[0]
if total_count == 0:
return 0.0
sov_count = conn.execute(
f"SELECT COUNT(*) FROM events WHERE event_type IN ({sovereign_placeholders}){where_ts}",
params_sov,
).fetchone()[0]
return round(100.0 * sov_count / total_count, 2)
except Exception as exc:
logger.warning("Failed to compute sovereignty pct: %s", exc)
return 0.0
def get_cost_per_hour(self, time_window: float | None = None) -> float:
"""Return the total API cost in USD extrapolated to a per-hour rate.
Parameters
----------
time_window:
Seconds of history to consider. Defaults to 3600 (last hour).
Returns
-------
float
USD cost per hour, or 0.0 if no ``api_cost`` events exist.
"""
window = time_window if time_window is not None else 3600.0
cutoff = _seconds_ago_iso(window)
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT metadata_json FROM events WHERE event_type = 'api_cost' AND timestamp >= ?",
(cutoff,),
).fetchall()
except Exception as exc:
logger.warning("Failed to query api_cost events: %s", exc)
return 0.0
total_usd = 0.0
for row in rows:
try:
meta = json.loads(row["metadata_json"] or "{}")
total_usd += float(meta.get("usd", 0.0))
except (ValueError, TypeError, json.JSONDecodeError):
pass
# Extrapolate: (total in window) * (3600 / window_seconds)
if window == 0:
return 0.0
return round(total_usd * (3600.0 / window), 4)
def get_skills_crystallized(self, session_id: str | None = None) -> int:
"""Return the number of skills crystallised.
Parameters
----------
session_id:
If given, count only events for that session. If ``None``,
count across all sessions.
"""
try:
with closing(self._connect()) as conn:
if session_id:
return conn.execute(
"SELECT COUNT(*) FROM events WHERE event_type = 'skill_crystallized' AND session_id = ?",
(session_id,),
).fetchone()[0]
return conn.execute(
"SELECT COUNT(*) FROM events WHERE event_type = 'skill_crystallized'",
).fetchone()[0]
except Exception as exc:
logger.warning("Failed to query skill_crystallized: %s", exc)
return 0
def get_snapshot(self) -> dict[str, Any]:
"""Return a real-time metrics snapshot suitable for dashboard widgets."""
return {
"sovereignty": {
layer: self.get_sovereignty_pct(layer, time_window=3600)
for layer in _LAYER_EVENTS
},
"cost_per_hour": self.get_cost_per_hour(),
"skills_crystallized": self.get_skills_crystallized(),
}
# ── Module-level singleton ────────────────────────────────────────────────────
_store: SovereigntyMetricsStore | None = None
def get_metrics_store() -> SovereigntyMetricsStore:
"""Return (or lazily create) the module-level singleton store."""
global _store
if _store is None:
_store = SovereigntyMetricsStore()
return _store
# ── Convenience helpers ───────────────────────────────────────────────────────
def record(event_type: str, metadata: dict[str, Any] | None = None, *, session_id: str = "") -> None:
"""Module-level shortcut: ``metrics.record("perception_cache_hit")``."""
get_metrics_store().record(event_type, metadata=metadata, session_id=session_id)
def get_sovereignty_pct(layer: str, time_window: float | None = None) -> float:
"""Module-level shortcut for :meth:`SovereigntyMetricsStore.get_sovereignty_pct`."""
return get_metrics_store().get_sovereignty_pct(layer, time_window)
def get_cost_per_hour(time_window: float | None = None) -> float:
"""Module-level shortcut for :meth:`SovereigntyMetricsStore.get_cost_per_hour`."""
return get_metrics_store().get_cost_per_hour(time_window)
def get_skills_crystallized(session_id: str | None = None) -> int:
"""Module-level shortcut for :meth:`SovereigntyMetricsStore.get_skills_crystallized`."""
return get_metrics_store().get_skills_crystallized(session_id)
async def emit_sovereignty_event(
event_type: str,
metadata: dict[str, Any] | None = None,
*,
session_id: str = "",
) -> None:
"""Record an event in a thread and publish it on the event bus.
This is the async-safe entry-point used by the agentic loop.
"""
from infrastructure.events.bus import emit
await asyncio.to_thread(
get_metrics_store().record,
event_type,
metadata,
session_id=session_id,
)
await emit(
f"sovereignty.event.{event_type}",
source="sovereignty_metrics",
data={
"event_type": event_type,
"session_id": session_id,
**(metadata or {}),
},
)
# ── Private helpers ───────────────────────────────────────────────────────────
def _seconds_ago_iso(seconds: float) -> str:
"""Return an ISO-8601 timestamp *seconds* before now (UTC)."""
import datetime as _dt
delta = _dt.timedelta(seconds=seconds)
return (_dt.datetime.now(UTC) - delta).isoformat()