From dec9736679a2b0d8f9f3e3f8bae5603404f3c3f6 Mon Sep 17 00:00:00 2001 From: "Claude (Opus 4.6)" Date: Mon, 23 Mar 2026 19:52:20 +0000 Subject: [PATCH] [claude] Sovereignty metrics emitter + SQLite store (#954) (#1164) Co-authored-by: Claude (Opus 4.6) Co-committed-by: Claude (Opus 4.6) --- src/dashboard/app.py | 2 + src/dashboard/routes/sovereignty_ws.py | 40 +++ src/timmy/sovereignty/__init__.py | 7 + src/timmy/sovereignty/metrics.py | 410 +++++++++++++++++++++++++ tests/unit/test_sovereignty_metrics.py | 271 ++++++++++++++++ 5 files changed, 730 insertions(+) create mode 100644 src/dashboard/routes/sovereignty_ws.py create mode 100644 src/timmy/sovereignty/__init__.py create mode 100644 src/timmy/sovereignty/metrics.py create mode 100644 tests/unit/test_sovereignty_metrics.py diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 27393797..02cd2093 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -47,6 +47,7 @@ from dashboard.routes.models import router as models_router from dashboard.routes.quests import router as quests_router from dashboard.routes.scorecards import router as scorecards_router from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router +from dashboard.routes.sovereignty_ws import router as sovereignty_ws_router from dashboard.routes.spark import router as spark_router from dashboard.routes.system import router as system_router from dashboard.routes.tasks import router as tasks_router @@ -672,6 +673,7 @@ app.include_router(hermes_router) app.include_router(quests_router) app.include_router(scorecards_router) app.include_router(sovereignty_metrics_router) +app.include_router(sovereignty_ws_router) @app.websocket("/ws") diff --git a/src/dashboard/routes/sovereignty_ws.py b/src/dashboard/routes/sovereignty_ws.py new file mode 100644 index 00000000..5eaefbc4 --- /dev/null +++ b/src/dashboard/routes/sovereignty_ws.py @@ -0,0 +1,40 @@ +"""WebSocket emitter for the sovereignty metrics dashboard widget. + +Streams real-time sovereignty snapshots to connected clients every +*_PUSH_INTERVAL* seconds. The snapshot includes per-layer sovereignty +percentages, API cost rate, and skill crystallisation count. + +Refs: #954, #953 +""" + +import asyncio +import json +import logging + +from fastapi import APIRouter, WebSocket + +router = APIRouter(tags=["sovereignty"]) + +logger = logging.getLogger(__name__) + +_PUSH_INTERVAL = 5 # seconds between snapshot pushes + + +@router.websocket("/ws/sovereignty") +async def sovereignty_ws(websocket: WebSocket) -> None: + """Stream sovereignty metric snapshots to the dashboard widget.""" + from timmy.sovereignty.metrics import get_metrics_store + + await websocket.accept() + logger.info("Sovereignty WS connected") + + store = get_metrics_store() + try: + # Send initial snapshot immediately + await websocket.send_text(json.dumps(store.get_snapshot())) + + while True: + await asyncio.sleep(_PUSH_INTERVAL) + await websocket.send_text(json.dumps(store.get_snapshot())) + except Exception: + logger.debug("Sovereignty WS disconnected") diff --git a/src/timmy/sovereignty/__init__.py b/src/timmy/sovereignty/__init__.py new file mode 100644 index 00000000..44ca4a45 --- /dev/null +++ b/src/timmy/sovereignty/__init__.py @@ -0,0 +1,7 @@ +"""Sovereignty metrics for the Bannerlord loop. + +Tracks how much of each AI layer (perception, decision, narration) +runs locally vs. calls out to an LLM. Feeds the sovereignty dashboard. + +Refs: #954, #953 +""" diff --git a/src/timmy/sovereignty/metrics.py b/src/timmy/sovereignty/metrics.py new file mode 100644 index 00000000..d59050ad --- /dev/null +++ b/src/timmy/sovereignty/metrics.py @@ -0,0 +1,410 @@ +"""Sovereignty metrics emitter and SQLite store. + +Tracks the sovereignty percentage for each AI layer (perception, decision, +narration) plus API cost and skill crystallisation. All data is persisted to +``data/sovereignty_metrics.db`` so the dashboard can query trends over time. + +Event types +----------- +perception layer: + ``perception_cache_hit`` — frame answered from local cache (sovereign) + ``perception_vlm_call`` — frame required a VLM inference call (non-sovereign) + +decision layer: + ``decision_rule_hit`` — action chosen by a deterministic rule (sovereign) + ``decision_llm_call`` — action required LLM reasoning (non-sovereign) + +narration layer: + ``narration_template`` — text generated from a template (sovereign) + ``narration_llm`` — text generated by an LLM (non-sovereign) + +skill layer: + ``skill_crystallized`` — a new skill was crystallised from LLM output + +cost: + ``api_call`` — any external API call was made + ``api_cost`` — monetary cost of an API call (metadata: {"usd": float}) + +Refs: #954, #953 +""" + +import asyncio +import json +import logging +import sqlite3 +import uuid +from contextlib import closing +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from config import settings + +logger = logging.getLogger(__name__) + +# ── Constants ───────────────────────────────────────────────────────────────── + +DB_PATH = Path(settings.repo_root) / "data" / "sovereignty_metrics.db" + +#: Sovereign event types for each layer (numerator of sovereignty %). +_SOVEREIGN_EVENTS: dict[str, frozenset[str]] = { + "perception": frozenset({"perception_cache_hit"}), + "decision": frozenset({"decision_rule_hit"}), + "narration": frozenset({"narration_template"}), +} + +#: All tracked event types for each layer (denominator of sovereignty %). +_LAYER_EVENTS: dict[str, frozenset[str]] = { + "perception": frozenset({"perception_cache_hit", "perception_vlm_call"}), + "decision": frozenset({"decision_rule_hit", "decision_llm_call"}), + "narration": frozenset({"narration_template", "narration_llm"}), +} + +ALL_EVENT_TYPES: frozenset[str] = frozenset( + { + "perception_cache_hit", + "perception_vlm_call", + "decision_rule_hit", + "decision_llm_call", + "narration_template", + "narration_llm", + "skill_crystallized", + "api_call", + "api_cost", + } +) + +# ── Schema ──────────────────────────────────────────────────────────────────── + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + event_type TEXT NOT NULL, + session_id TEXT NOT NULL DEFAULT '', + metadata_json TEXT NOT NULL DEFAULT '{}' +); +CREATE INDEX IF NOT EXISTS idx_ev_type ON events(event_type); +CREATE INDEX IF NOT EXISTS idx_ev_ts ON events(timestamp); +CREATE INDEX IF NOT EXISTS idx_ev_session ON events(session_id); + +CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT PRIMARY KEY, + game TEXT NOT NULL DEFAULT '', + start_time TEXT NOT NULL, + end_time TEXT +); +""" + + +# ── Data classes ────────────────────────────────────────────────────────────── + + +@dataclass +class SovereigntyEvent: + """A single sovereignty event.""" + + event_type: str + session_id: str = "" + metadata: dict[str, Any] = field(default_factory=dict) + timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + + +# ── Store ───────────────────────────────────────────────────────────────────── + + +class SovereigntyMetricsStore: + """SQLite-backed sovereignty event store. + + Thread-safe: creates a new connection per operation (WAL mode). + """ + + def __init__(self, db_path: Path | None = None) -> None: + self._db_path = db_path or DB_PATH + self._init_db() + + # ── internal ───────────────────────────────────────────────────────────── + + def _init_db(self) -> None: + try: + self._db_path.parent.mkdir(parents=True, exist_ok=True) + with closing(sqlite3.connect(str(self._db_path))) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") + conn.executescript(_SCHEMA) + conn.commit() + except Exception as exc: + logger.warning("Failed to initialise sovereignty metrics DB: %s", exc) + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(str(self._db_path)) + conn.row_factory = sqlite3.Row + conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") + return conn + + # ── public API ──────────────────────────────────────────────────────────── + + def record(self, event_type: str, metadata: dict[str, Any] | None = None, *, session_id: str = "") -> None: + """Record a sovereignty event. + + Parameters + ---------- + event_type: + One of ``ALL_EVENT_TYPES``. + metadata: + Optional dict of extra data (serialised as JSON). + session_id: + Identifier of the current game session, if known. + """ + event = SovereigntyEvent( + event_type=event_type, + session_id=session_id, + metadata=metadata or {}, + ) + try: + with closing(self._connect()) as conn: + conn.execute( + "INSERT INTO events (timestamp, event_type, session_id, metadata_json) " + "VALUES (?, ?, ?, ?)", + ( + event.timestamp, + event.event_type, + event.session_id, + json.dumps(event.metadata), + ), + ) + conn.commit() + except Exception as exc: + logger.warning("Failed to record sovereignty event: %s", exc) + + def start_session(self, game: str = "", session_id: str | None = None) -> str: + """Register a new game session. Returns the session_id.""" + sid = session_id or str(uuid.uuid4()) + try: + with closing(self._connect()) as conn: + conn.execute( + "INSERT OR IGNORE INTO sessions (session_id, game, start_time) VALUES (?, ?, ?)", + (sid, game, datetime.now(UTC).isoformat()), + ) + conn.commit() + except Exception as exc: + logger.warning("Failed to start session: %s", exc) + return sid + + def end_session(self, session_id: str) -> None: + """Mark a session as ended.""" + try: + with closing(self._connect()) as conn: + conn.execute( + "UPDATE sessions SET end_time = ? WHERE session_id = ?", + (datetime.now(UTC).isoformat(), session_id), + ) + conn.commit() + except Exception as exc: + logger.warning("Failed to end session: %s", exc) + + # ── analytics ───────────────────────────────────────────────────────────── + + def get_sovereignty_pct(self, layer: str, time_window: float | None = None) -> float: + """Return the sovereignty percentage (0.0–100.0) for *layer*. + + Parameters + ---------- + layer: + One of ``"perception"``, ``"decision"``, ``"narration"``. + time_window: + If given, only consider events from the last *time_window* seconds. + If ``None``, all events are used. + + Returns + ------- + float + Percentage of sovereign events for the layer, or 0.0 if no data. + """ + if layer not in _LAYER_EVENTS: + logger.warning("Unknown sovereignty layer: %s", layer) + return 0.0 + + sovereign = _SOVEREIGN_EVENTS[layer] + total_types = _LAYER_EVENTS[layer] + + sovereign_placeholders = ",".join("?" * len(sovereign)) + total_placeholders = ",".join("?" * len(total_types)) + + params_sov: list[Any] = list(sovereign) + params_total: list[Any] = list(total_types) + + if time_window is not None: + cutoff = _seconds_ago_iso(time_window) + where_ts = " AND timestamp >= ?" + params_sov.append(cutoff) + params_total.append(cutoff) + else: + where_ts = "" + + try: + with closing(self._connect()) as conn: + total_count = conn.execute( + f"SELECT COUNT(*) FROM events WHERE event_type IN ({total_placeholders}){where_ts}", + params_total, + ).fetchone()[0] + if total_count == 0: + return 0.0 + sov_count = conn.execute( + f"SELECT COUNT(*) FROM events WHERE event_type IN ({sovereign_placeholders}){where_ts}", + params_sov, + ).fetchone()[0] + return round(100.0 * sov_count / total_count, 2) + except Exception as exc: + logger.warning("Failed to compute sovereignty pct: %s", exc) + return 0.0 + + def get_cost_per_hour(self, time_window: float | None = None) -> float: + """Return the total API cost in USD extrapolated to a per-hour rate. + + Parameters + ---------- + time_window: + Seconds of history to consider. Defaults to 3600 (last hour). + + Returns + ------- + float + USD cost per hour, or 0.0 if no ``api_cost`` events exist. + """ + window = time_window if time_window is not None else 3600.0 + cutoff = _seconds_ago_iso(window) + + try: + with closing(self._connect()) as conn: + rows = conn.execute( + "SELECT metadata_json FROM events WHERE event_type = 'api_cost' AND timestamp >= ?", + (cutoff,), + ).fetchall() + except Exception as exc: + logger.warning("Failed to query api_cost events: %s", exc) + return 0.0 + + total_usd = 0.0 + for row in rows: + try: + meta = json.loads(row["metadata_json"] or "{}") + total_usd += float(meta.get("usd", 0.0)) + except (ValueError, TypeError, json.JSONDecodeError): + pass + + # Extrapolate: (total in window) * (3600 / window_seconds) + if window == 0: + return 0.0 + return round(total_usd * (3600.0 / window), 4) + + def get_skills_crystallized(self, session_id: str | None = None) -> int: + """Return the number of skills crystallised. + + Parameters + ---------- + session_id: + If given, count only events for that session. If ``None``, + count across all sessions. + """ + try: + with closing(self._connect()) as conn: + if session_id: + return conn.execute( + "SELECT COUNT(*) FROM events WHERE event_type = 'skill_crystallized' AND session_id = ?", + (session_id,), + ).fetchone()[0] + return conn.execute( + "SELECT COUNT(*) FROM events WHERE event_type = 'skill_crystallized'", + ).fetchone()[0] + except Exception as exc: + logger.warning("Failed to query skill_crystallized: %s", exc) + return 0 + + def get_snapshot(self) -> dict[str, Any]: + """Return a real-time metrics snapshot suitable for dashboard widgets.""" + return { + "sovereignty": { + layer: self.get_sovereignty_pct(layer, time_window=3600) + for layer in _LAYER_EVENTS + }, + "cost_per_hour": self.get_cost_per_hour(), + "skills_crystallized": self.get_skills_crystallized(), + } + + +# ── Module-level singleton ──────────────────────────────────────────────────── + +_store: SovereigntyMetricsStore | None = None + + +def get_metrics_store() -> SovereigntyMetricsStore: + """Return (or lazily create) the module-level singleton store.""" + global _store + if _store is None: + _store = SovereigntyMetricsStore() + return _store + + +# ── Convenience helpers ─────────────────────────────────────────────────────── + + +def record(event_type: str, metadata: dict[str, Any] | None = None, *, session_id: str = "") -> None: + """Module-level shortcut: ``metrics.record("perception_cache_hit")``.""" + get_metrics_store().record(event_type, metadata=metadata, session_id=session_id) + + +def get_sovereignty_pct(layer: str, time_window: float | None = None) -> float: + """Module-level shortcut for :meth:`SovereigntyMetricsStore.get_sovereignty_pct`.""" + return get_metrics_store().get_sovereignty_pct(layer, time_window) + + +def get_cost_per_hour(time_window: float | None = None) -> float: + """Module-level shortcut for :meth:`SovereigntyMetricsStore.get_cost_per_hour`.""" + return get_metrics_store().get_cost_per_hour(time_window) + + +def get_skills_crystallized(session_id: str | None = None) -> int: + """Module-level shortcut for :meth:`SovereigntyMetricsStore.get_skills_crystallized`.""" + return get_metrics_store().get_skills_crystallized(session_id) + + +async def emit_sovereignty_event( + event_type: str, + metadata: dict[str, Any] | None = None, + *, + session_id: str = "", +) -> None: + """Record an event in a thread and publish it on the event bus. + + This is the async-safe entry-point used by the agentic loop. + """ + from infrastructure.events.bus import emit + + await asyncio.to_thread( + get_metrics_store().record, + event_type, + metadata, + session_id=session_id, + ) + await emit( + f"sovereignty.event.{event_type}", + source="sovereignty_metrics", + data={ + "event_type": event_type, + "session_id": session_id, + **(metadata or {}), + }, + ) + + +# ── Private helpers ─────────────────────────────────────────────────────────── + + +def _seconds_ago_iso(seconds: float) -> str: + """Return an ISO-8601 timestamp *seconds* before now (UTC).""" + import datetime as _dt + + delta = _dt.timedelta(seconds=seconds) + return (_dt.datetime.now(UTC) - delta).isoformat() diff --git a/tests/unit/test_sovereignty_metrics.py b/tests/unit/test_sovereignty_metrics.py new file mode 100644 index 00000000..352206ed --- /dev/null +++ b/tests/unit/test_sovereignty_metrics.py @@ -0,0 +1,271 @@ +"""Unit tests for the sovereignty metrics emitter and store. + +Refs: #954 +""" + +from unittest.mock import AsyncMock, patch + +import pytest + +pytestmark = pytest.mark.unit + +from timmy.sovereignty.metrics import ( + ALL_EVENT_TYPES, + SovereigntyMetricsStore, + emit_sovereignty_event, + get_cost_per_hour, + get_metrics_store, + get_skills_crystallized, + get_sovereignty_pct, + record, +) + + +@pytest.fixture +def store(tmp_path): + """A fresh SovereigntyMetricsStore backed by a temp database.""" + return SovereigntyMetricsStore(db_path=tmp_path / "test_sov.db") + + +# ── ALL_EVENT_TYPES ─────────────────────────────────────────────────────────── + + +class TestEventTypes: + def test_all_expected_event_types_present(self): + expected = { + "perception_cache_hit", + "perception_vlm_call", + "decision_rule_hit", + "decision_llm_call", + "narration_template", + "narration_llm", + "skill_crystallized", + "api_call", + "api_cost", + } + assert ALL_EVENT_TYPES == expected + + +# ── Record & retrieval ──────────────────────────────────────────────────────── + + +class TestRecord: + def test_record_inserts_event(self, store): + store.record("perception_cache_hit") + pct = store.get_sovereignty_pct("perception") + assert pct == 100.0 + + def test_record_with_metadata(self, store): + store.record("api_cost", metadata={"usd": 0.05}) + cost = store.get_cost_per_hour() + assert cost > 0.0 + + def test_record_with_session_id(self, store): + store.record("skill_crystallized", session_id="sess-1") + assert store.get_skills_crystallized("sess-1") == 1 + + def test_record_unknown_type_does_not_raise(self, store): + """Unknown event types are silently stored (no crash).""" + store.record("totally_unknown_event") # should not raise + + +# ── Sessions ────────────────────────────────────────────────────────────────── + + +class TestSessions: + def test_start_session_returns_id(self, store): + sid = store.start_session(game="Bannerlord") + assert isinstance(sid, str) + assert len(sid) > 0 + + def test_start_session_accepts_custom_id(self, store): + sid = store.start_session(game="Bannerlord", session_id="my-session") + assert sid == "my-session" + + def test_end_session_does_not_raise(self, store): + sid = store.start_session() + store.end_session(sid) # should not raise + + def test_start_session_idempotent(self, store): + """Starting a session with the same ID twice is a no-op.""" + sid = store.start_session(session_id="dup") + sid2 = store.start_session(session_id="dup") + assert sid == sid2 + + +# ── Sovereignty percentage ──────────────────────────────────────────────────── + + +class TestGetSovereigntyPct: + def test_perception_all_cache_hits(self, store): + for _ in range(5): + store.record("perception_cache_hit") + assert store.get_sovereignty_pct("perception") == 100.0 + + def test_perception_mixed(self, store): + store.record("perception_cache_hit") + store.record("perception_vlm_call") + assert store.get_sovereignty_pct("perception") == 50.0 + + def test_decision_all_sovereign(self, store): + for _ in range(3): + store.record("decision_rule_hit") + assert store.get_sovereignty_pct("decision") == 100.0 + + def test_narration_all_sovereign(self, store): + store.record("narration_template") + store.record("narration_template") + assert store.get_sovereignty_pct("narration") == 100.0 + + def test_narration_all_llm(self, store): + store.record("narration_llm") + assert store.get_sovereignty_pct("narration") == 0.0 + + def test_no_events_returns_zero(self, store): + assert store.get_sovereignty_pct("perception") == 0.0 + + def test_unknown_layer_returns_zero(self, store): + assert store.get_sovereignty_pct("nonexistent_layer") == 0.0 + + def test_time_window_filters_old_events(self, store, tmp_path): + """Events outside the time window are excluded.""" + # Insert an event with a very old timestamp directly + import json + import sqlite3 + from contextlib import closing + + with closing(sqlite3.connect(str(store._db_path))) as conn: + conn.execute( + "INSERT INTO events (timestamp, event_type, session_id, metadata_json) VALUES (?, ?, ?, ?)", + ("2000-01-01T00:00:00+00:00", "perception_cache_hit", "", "{}"), + ) + conn.commit() + + # With a 60-second window, the old event should be excluded + pct = store.get_sovereignty_pct("perception", time_window=60) + assert pct == 0.0 + + def test_time_window_includes_recent_events(self, store): + store.record("decision_rule_hit") + pct = store.get_sovereignty_pct("decision", time_window=60) + assert pct == 100.0 + + +# ── Cost per hour ───────────────────────────────────────────────────────────── + + +class TestGetCostPerHour: + def test_no_events_returns_zero(self, store): + assert store.get_cost_per_hour() == 0.0 + + def test_single_cost_event(self, store): + # Record a cost of $1.00 within the last hour window + store.record("api_cost", metadata={"usd": 1.00}) + cost = store.get_cost_per_hour(time_window=3600) + assert cost == pytest.approx(1.00, rel=1e-3) + + def test_multiple_cost_events(self, store): + store.record("api_cost", metadata={"usd": 0.25}) + store.record("api_cost", metadata={"usd": 0.75}) + cost = store.get_cost_per_hour(time_window=3600) + assert cost == pytest.approx(1.00, rel=1e-3) + + def test_missing_usd_field_is_zero(self, store): + store.record("api_cost", metadata={"model": "gpt-4"}) + assert store.get_cost_per_hour() == 0.0 + + def test_cost_extrapolated_for_short_window(self, store): + """Cost recorded in a 1800s window is doubled to get per-hour rate.""" + store.record("api_cost", metadata={"usd": 0.5}) + cost = store.get_cost_per_hour(time_window=1800) + assert cost == pytest.approx(1.0, rel=1e-3) + + +# ── Skills crystallised ─────────────────────────────────────────────────────── + + +class TestGetSkillsCrystallized: + def test_no_skills_returns_zero(self, store): + assert store.get_skills_crystallized() == 0 + + def test_counts_all_sessions(self, store): + store.record("skill_crystallized", session_id="a") + store.record("skill_crystallized", session_id="b") + assert store.get_skills_crystallized() == 2 + + def test_filters_by_session(self, store): + store.record("skill_crystallized", session_id="sess-1") + store.record("skill_crystallized", session_id="sess-2") + assert store.get_skills_crystallized("sess-1") == 1 + + def test_session_with_no_skills(self, store): + store.record("skill_crystallized", session_id="sess-1") + assert store.get_skills_crystallized("sess-999") == 0 + + +# ── Snapshot ────────────────────────────────────────────────────────────────── + + +class TestGetSnapshot: + def test_snapshot_structure(self, store): + snap = store.get_snapshot() + assert "sovereignty" in snap + assert "cost_per_hour" in snap + assert "skills_crystallized" in snap + + def test_snapshot_sovereignty_has_all_layers(self, store): + snap = store.get_snapshot() + assert set(snap["sovereignty"].keys()) == {"perception", "decision", "narration"} + + def test_snapshot_reflects_events(self, store): + store.record("perception_cache_hit") + store.record("skill_crystallized") + snap = store.get_snapshot() + assert snap["sovereignty"]["perception"] == 100.0 + assert snap["skills_crystallized"] == 1 + + +# ── Module-level convenience functions ─────────────────────────────────────── + + +class TestModuleLevelFunctions: + def test_record_and_get_sovereignty_pct(self, tmp_path): + with patch("timmy.sovereignty.metrics._store", None), patch( + "timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test.db" + ): + record("decision_rule_hit") + pct = get_sovereignty_pct("decision") + assert pct == 100.0 + + def test_get_cost_per_hour_module_fn(self, tmp_path): + with patch("timmy.sovereignty.metrics._store", None), patch( + "timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test2.db" + ): + record("api_cost", {"usd": 0.5}) + cost = get_cost_per_hour() + assert cost > 0.0 + + def test_get_skills_crystallized_module_fn(self, tmp_path): + with patch("timmy.sovereignty.metrics._store", None), patch( + "timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test3.db" + ): + record("skill_crystallized") + count = get_skills_crystallized() + assert count == 1 + + +# ── emit_sovereignty_event ──────────────────────────────────────────────────── + + +class TestEmitSovereigntyEvent: + @pytest.mark.asyncio + async def test_emit_records_and_publishes(self, tmp_path): + with ( + patch("timmy.sovereignty.metrics._store", None), + patch("timmy.sovereignty.metrics.DB_PATH", tmp_path / "emit_test.db"), + patch("infrastructure.events.bus.emit", new_callable=AsyncMock) as mock_emit, + ): + await emit_sovereignty_event("perception_cache_hit", {"frame": 42}, session_id="s1") + mock_emit.assert_called_once() + args = mock_emit.call_args[0] + assert args[0] == "sovereignty.event.perception_cache_hit"