diff --git a/src/dashboard/routes/nexus.py b/src/dashboard/routes/nexus.py index 9a76caa7..8cd403d6 100644 --- a/src/dashboard/routes/nexus.py +++ b/src/dashboard/routes/nexus.py @@ -1,21 +1,32 @@ -"""Nexus — Timmy's persistent conversational awareness space. +"""Nexus v2 — Timmy's persistent conversational awareness space. -A conversational-only interface where Timmy maintains live memory context. -No tool use; pure conversation with memory integration and a teaching panel. +Extends the v1 Nexus (chat + memory sidebar + teaching panel) with: + +- **Persistent conversations** — SQLite-backed history survives restarts. +- **Introspection panel** — live cognitive state, recent thoughts, session + analytics rendered alongside every conversation turn. +- **Sovereignty pulse** — real-time sovereignty health badge in the sidebar. +- **WebSocket** — pushes introspection + sovereignty snapshots so the + Nexus page stays alive without polling. Routes: - GET /nexus — render nexus page with live memory sidebar + GET /nexus — render nexus page with full awareness panels POST /nexus/chat — send a message; returns HTMX partial POST /nexus/teach — inject a fact into Timmy's live memory DELETE /nexus/history — clear the nexus conversation history + GET /nexus/introspect — JSON introspection snapshot (API) + WS /nexus/ws — live introspection + sovereignty push + +Refs: #1090 (Nexus Epic), #953 (Sovereignty Loop) """ import asyncio +import json import logging from datetime import UTC, datetime -from fastapi import APIRouter, Form, Request -from fastapi.responses import HTMLResponse +from fastapi import APIRouter, Form, Request, WebSocket +from fastapi.responses import HTMLResponse, JSONResponse from dashboard.templating import templates from timmy.memory_system import ( @@ -24,6 +35,9 @@ from timmy.memory_system import ( search_memories, store_personal_fact, ) +from timmy.nexus.introspection import nexus_introspector +from timmy.nexus.persistence import nexus_store +from timmy.nexus.sovereignty_pulse import sovereignty_pulse from timmy.session import _clean_response, chat, reset_session logger = logging.getLogger(__name__) @@ -32,28 +46,74 @@ router = APIRouter(prefix="/nexus", tags=["nexus"]) _NEXUS_SESSION_ID = "nexus" _MAX_MESSAGE_LENGTH = 10_000 +_WS_PUSH_INTERVAL = 5 # seconds between WebSocket pushes -# In-memory conversation log for the Nexus session (mirrors chat store pattern -# but is scoped to the Nexus so it won't pollute the main dashboard history). +# In-memory conversation log — kept in sync with the persistent store +# so templates can render without hitting the DB on every page load. _nexus_log: list[dict] = [] +# ── Initialisation ─────────────────────────────────────────────────────────── +# On module load, hydrate the in-memory log from the persistent store. +# This runs once at import time (process startup). +_HYDRATED = False + + +def _hydrate_log() -> None: + """Load persisted history into the in-memory log (idempotent).""" + global _HYDRATED + if _HYDRATED: + return + try: + rows = nexus_store.get_history(limit=200) + _nexus_log.clear() + for row in rows: + _nexus_log.append( + { + "role": row["role"], + "content": row["content"], + "timestamp": row["timestamp"], + } + ) + _HYDRATED = True + logger.info("Nexus: hydrated %d messages from persistent store", len(_nexus_log)) + except Exception as exc: + logger.warning("Nexus: failed to hydrate from store: %s", exc) + _HYDRATED = True # Don't retry repeatedly + + +# ── Helpers ────────────────────────────────────────────────────────────────── + def _ts() -> str: return datetime.now(UTC).strftime("%H:%M:%S") def _append_log(role: str, content: str) -> None: - _nexus_log.append({"role": role, "content": content, "timestamp": _ts()}) - # Keep last 200 exchanges to bound memory usage + """Append to both in-memory log and persistent store.""" + ts = _ts() + _nexus_log.append({"role": role, "content": content, "timestamp": ts}) + # Bound in-memory log if len(_nexus_log) > 200: del _nexus_log[:-200] + # Persist + try: + nexus_store.append(role, content, timestamp=ts) + except Exception as exc: + logger.warning("Nexus: persist failed: %s", exc) + + +# ── Page route ─────────────────────────────────────────────────────────────── @router.get("", response_class=HTMLResponse) async def nexus_page(request: Request): - """Render the Nexus page with live memory context.""" + """Render the Nexus page with full awareness panels.""" + _hydrate_log() + stats = get_memory_stats() facts = recall_personal_facts_with_ids()[:8] + introspection = nexus_introspector.snapshot(conversation_log=_nexus_log) + pulse = sovereignty_pulse.snapshot() return templates.TemplateResponse( request, @@ -63,13 +123,18 @@ async def nexus_page(request: Request): "messages": list(_nexus_log), "stats": stats, "facts": facts, + "introspection": introspection.to_dict(), + "pulse": pulse.to_dict(), }, ) +# ── Chat route ─────────────────────────────────────────────────────────────── + + @router.post("/chat", response_class=HTMLResponse) async def nexus_chat(request: Request, message: str = Form(...)): - """Conversational-only chat routed through the Nexus session. + """Conversational-only chat with persistence and introspection. Does not invoke tool-use approval flow — pure conversation with memory context injected from Timmy's live memory store. @@ -87,18 +152,22 @@ async def nexus_chat(request: Request, message: str = Form(...)): "error": "Message too long (max 10 000 chars).", "timestamp": _ts(), "memory_hits": [], + "introspection": nexus_introspector.snapshot().to_dict(), }, ) ts = _ts() - # Fetch semantically relevant memories to surface in the sidebar + # Fetch semantically relevant memories try: memory_hits = await asyncio.to_thread(search_memories, query=message, limit=4) except Exception as exc: logger.warning("Nexus memory search failed: %s", exc) memory_hits = [] + # Track memory hits for analytics + nexus_introspector.record_memory_hits(len(memory_hits)) + # Conversational response — no tool approval flow response_text: str | None = None error_text: str | None = None @@ -113,6 +182,9 @@ async def nexus_chat(request: Request, message: str = Form(...)): if response_text: _append_log("assistant", response_text) + # Build fresh introspection snapshot after the exchange + introspection = nexus_introspector.snapshot(conversation_log=_nexus_log) + return templates.TemplateResponse( request, "partials/nexus_message.html", @@ -122,10 +194,14 @@ async def nexus_chat(request: Request, message: str = Form(...)): "error": error_text, "timestamp": ts, "memory_hits": memory_hits, + "introspection": introspection.to_dict(), }, ) +# ── Teach route ────────────────────────────────────────────────────────────── + + @router.post("/teach", response_class=HTMLResponse) async def nexus_teach(request: Request, fact: str = Form(...)): """Inject a fact into Timmy's live memory from the Nexus teaching panel.""" @@ -148,11 +224,20 @@ async def nexus_teach(request: Request, fact: str = Form(...)): ) +# ── Clear history ──────────────────────────────────────────────────────────── + + @router.delete("/history", response_class=HTMLResponse) async def nexus_clear_history(request: Request): - """Clear the Nexus conversation history.""" + """Clear the Nexus conversation history (both in-memory and persistent).""" _nexus_log.clear() + try: + nexus_store.clear() + except Exception as exc: + logger.warning("Nexus: persistent clear failed: %s", exc) + nexus_introspector.reset() reset_session(session_id=_NEXUS_SESSION_ID) + return templates.TemplateResponse( request, "partials/nexus_message.html", @@ -162,5 +247,55 @@ async def nexus_clear_history(request: Request): "error": None, "timestamp": _ts(), "memory_hits": [], + "introspection": nexus_introspector.snapshot().to_dict(), }, ) + + +# ── Introspection API ──────────────────────────────────────────────────────── + + +@router.get("/introspect", response_class=JSONResponse) +async def nexus_introspect(): + """Return a JSON introspection snapshot (for API consumers).""" + snap = nexus_introspector.snapshot(conversation_log=_nexus_log) + pulse = sovereignty_pulse.snapshot() + return { + "introspection": snap.to_dict(), + "sovereignty_pulse": pulse.to_dict(), + } + + +# ── WebSocket — live Nexus push ────────────────────────────────────────────── + + +@router.websocket("/ws") +async def nexus_ws(websocket: WebSocket) -> None: + """Push introspection + sovereignty pulse snapshots to the Nexus page. + + The frontend connects on page load and receives JSON updates every + ``_WS_PUSH_INTERVAL`` seconds, keeping the cognitive state panel, + thought stream, and sovereignty badge fresh without HTMX polling. + """ + await websocket.accept() + logger.info("Nexus WS connected") + try: + # Immediate first push + await _push_snapshot(websocket) + while True: + await asyncio.sleep(_WS_PUSH_INTERVAL) + await _push_snapshot(websocket) + except Exception: + logger.debug("Nexus WS disconnected") + + +async def _push_snapshot(ws: WebSocket) -> None: + """Send the combined introspection + pulse payload.""" + snap = nexus_introspector.snapshot(conversation_log=_nexus_log) + pulse = sovereignty_pulse.snapshot() + payload = { + "type": "nexus_state", + "introspection": snap.to_dict(), + "sovereignty_pulse": pulse.to_dict(), + } + await ws.send_text(json.dumps(payload)) diff --git a/src/dashboard/templates/nexus.html b/src/dashboard/templates/nexus.html index 1020e1f5..5c6b9f90 100644 --- a/src/dashboard/templates/nexus.html +++ b/src/dashboard/templates/nexus.html @@ -8,26 +8,40 @@
-
// NEXUS
-
- Persistent conversational awareness — always present, always learning. +
+
+
// NEXUS
+
+ Persistent conversational awareness — always present, always learning. +
+
+ +
+ + SOVEREIGNTY + {{ pulse.overall_pct }}% +
-
+
// CONVERSATION - +
+ {{ messages|length }} msgs + +
@@ -67,14 +81,115 @@
- +
- + +
+
+ // COGNITIVE STATE + + {{ introspection.cognitive.engagement | upper }} + +
+
+
+
+
MOOD
+
{{ introspection.cognitive.mood }}
+
+
+
FOCUS
+
+ {{ introspection.cognitive.focus_topic or '—' }} +
+
+
+
DEPTH
+
{{ introspection.cognitive.conversation_depth }}
+
+
+
INITIATIVE
+
+ {{ introspection.cognitive.last_initiative or '—' }} +
+
+
+ {% if introspection.cognitive.active_commitments %} +
+
ACTIVE COMMITMENTS
+ {% for c in introspection.cognitive.active_commitments %} +
{{ c | e }}
+ {% endfor %} +
+ {% endif %} +
+
+ + +
+
+ // THOUGHT STREAM +
+
+ {% if introspection.recent_thoughts %} + {% for t in introspection.recent_thoughts %} +
+
+ {{ t.seed_type }} + {{ t.created_at[:16] }} +
+
{{ t.content | e }}
+
+ {% endfor %} + {% else %} +
No thoughts yet. The thinking engine will populate this.
+ {% endif %} +
+
+ + +
+
+ // SOVEREIGNTY PULSE + + {{ pulse.health | upper }} + +
+
+
+ {% for layer in pulse.layers %} +
+
{{ layer.name | upper }}
+
+
+
+
{{ layer.sovereign_pct }}%
+
+ {% endfor %} +
+
+
+ Crystallizations + {{ pulse.crystallizations_last_hour }} +
+
+ API Independence + {{ pulse.api_independence_pct }}% +
+
+ Total Events + {{ pulse.total_events }} +
+
+
+
+ +
// LIVE MEMORY - + {{ stats.total_entries }} stored
@@ -85,7 +200,32 @@
- + +
+
// SESSION ANALYTICS
+
+
+
+ Messages + {{ introspection.analytics.total_messages }} +
+
+ Avg Response + {{ introspection.analytics.avg_response_length }} chars +
+
+ Memory Hits + {{ introspection.analytics.memory_hits_total }} +
+
+ Duration + {{ introspection.analytics.session_duration_minutes }} min +
+
+
+
+ +
// TEACH TIMMY
@@ -119,4 +259,128 @@
+ + + {% endblock %} diff --git a/src/timmy/nexus/__init__.py b/src/timmy/nexus/__init__.py new file mode 100644 index 00000000..c9cb22c0 --- /dev/null +++ b/src/timmy/nexus/__init__.py @@ -0,0 +1,15 @@ +"""Nexus subsystem — Timmy's sovereign conversational awareness space. + +Extends the Nexus v1 chat interface with: + +- **Introspection engine** — real-time cognitive state, thought-stream + integration, and session analytics surfaced directly in the Nexus. +- **Persistent sessions** — SQLite-backed conversation history that + survives process restarts. +- **Sovereignty pulse** — a live dashboard-within-dashboard showing + Timmy's sovereignty health, crystallization rate, and API independence. +""" + +from timmy.nexus.introspection import NexusIntrospector # noqa: F401 +from timmy.nexus.persistence import NexusStore # noqa: F401 +from timmy.nexus.sovereignty_pulse import SovereigntyPulse # noqa: F401 diff --git a/src/timmy/nexus/introspection.py b/src/timmy/nexus/introspection.py new file mode 100644 index 00000000..239acbc9 --- /dev/null +++ b/src/timmy/nexus/introspection.py @@ -0,0 +1,236 @@ +"""Nexus Introspection Engine — cognitive self-awareness for Timmy. + +Aggregates live signals from the CognitiveTracker, ThinkingEngine, and +MemorySystem into a unified introspection snapshot. The Nexus template +renders this as an always-visible cognitive state panel so the operator +can observe Timmy's inner life in real time. + +Design principles: +- Read-only observer — never mutates cognitive state. +- Graceful degradation — if any upstream is unavailable, the snapshot + still returns with partial data instead of crashing. +- JSON-serializable — every method returns plain dicts ready for + WebSocket push or Jinja2 template rendering. + +Refs: #1090 (Nexus Epic), architecture-v2.md §Intelligence Surface +""" + +from __future__ import annotations + +import logging +from dataclasses import asdict, dataclass, field +from datetime import UTC, datetime + +logger = logging.getLogger(__name__) + + +# ── Data models ────────────────────────────────────────────────────────────── + + +@dataclass +class CognitiveSummary: + """Distilled view of Timmy's current cognitive state.""" + + mood: str = "settled" + engagement: str = "idle" + focus_topic: str | None = None + conversation_depth: int = 0 + active_commitments: list[str] = field(default_factory=list) + last_initiative: str | None = None + + def to_dict(self) -> dict: + return asdict(self) + + +@dataclass +class ThoughtSummary: + """Compact representation of a single thought for the Nexus viewer.""" + + id: str + content: str + seed_type: str + created_at: str + parent_id: str | None = None + + def to_dict(self) -> dict: + return asdict(self) + + +@dataclass +class SessionAnalytics: + """Conversation-level analytics for the active Nexus session.""" + + total_messages: int = 0 + user_messages: int = 0 + assistant_messages: int = 0 + avg_response_length: float = 0.0 + topics_discussed: list[str] = field(default_factory=list) + session_start: str | None = None + session_duration_minutes: float = 0.0 + memory_hits_total: int = 0 + + def to_dict(self) -> dict: + return asdict(self) + + +@dataclass +class IntrospectionSnapshot: + """Everything the Nexus template needs to render the cognitive panel.""" + + cognitive: CognitiveSummary = field(default_factory=CognitiveSummary) + recent_thoughts: list[ThoughtSummary] = field(default_factory=list) + analytics: SessionAnalytics = field(default_factory=SessionAnalytics) + timestamp: str = field( + default_factory=lambda: datetime.now(UTC).isoformat() + ) + + def to_dict(self) -> dict: + return { + "cognitive": self.cognitive.to_dict(), + "recent_thoughts": [t.to_dict() for t in self.recent_thoughts], + "analytics": self.analytics.to_dict(), + "timestamp": self.timestamp, + } + + +# ── Introspector ───────────────────────────────────────────────────────────── + + +class NexusIntrospector: + """Aggregates cognitive signals into a single introspection snapshot. + + Lazily pulls from: + - ``timmy.cognitive_state.cognitive_tracker`` + - ``timmy.thinking.thinking_engine`` + - Nexus conversation log (passed in to avoid circular import) + """ + + def __init__(self) -> None: + self._session_start: datetime | None = None + self._topics: list[str] = [] + self._memory_hit_count: int = 0 + + # ── Public API ──────────────────────────────────────────────────────── + + def snapshot( + self, + conversation_log: list[dict] | None = None, + ) -> IntrospectionSnapshot: + """Build a complete introspection snapshot. + + Parameters + ---------- + conversation_log: + The in-memory ``_nexus_log`` from the routes module (list of + dicts with ``role``, ``content``, ``timestamp`` keys). + """ + return IntrospectionSnapshot( + cognitive=self._read_cognitive(), + recent_thoughts=self._read_thoughts(), + analytics=self._compute_analytics(conversation_log or []), + ) + + def record_memory_hits(self, count: int) -> None: + """Track cumulative memory hits for session analytics.""" + self._memory_hit_count += count + + def reset(self) -> None: + """Reset session-scoped analytics (e.g. on history clear).""" + self._session_start = None + self._topics.clear() + self._memory_hit_count = 0 + + # ── Cognitive state reader ──────────────────────────────────────────── + + def _read_cognitive(self) -> CognitiveSummary: + """Pull current state from the CognitiveTracker singleton.""" + try: + from timmy.cognitive_state import cognitive_tracker + + state = cognitive_tracker.get_state() + return CognitiveSummary( + mood=state.mood, + engagement=state.engagement, + focus_topic=state.focus_topic, + conversation_depth=state.conversation_depth, + active_commitments=list(state.active_commitments), + last_initiative=state.last_initiative, + ) + except Exception as exc: + logger.debug("Introspection: cognitive state unavailable: %s", exc) + return CognitiveSummary() + + # ── Thought stream reader ───────────────────────────────────────────── + + def _read_thoughts(self, limit: int = 5) -> list[ThoughtSummary]: + """Pull recent thoughts from the ThinkingEngine.""" + try: + from timmy.thinking import thinking_engine + + thoughts = thinking_engine.get_recent_thoughts(limit=limit) + return [ + ThoughtSummary( + id=t.id, + content=( + t.content[:200] + "…" if len(t.content) > 200 else t.content + ), + seed_type=t.seed_type, + created_at=t.created_at, + parent_id=t.parent_id, + ) + for t in thoughts + ] + except Exception as exc: + logger.debug("Introspection: thought stream unavailable: %s", exc) + return [] + + # ── Session analytics ───────────────────────────────────────────────── + + def _compute_analytics( + self, conversation_log: list[dict] + ) -> SessionAnalytics: + """Derive analytics from the Nexus conversation log.""" + if not conversation_log: + return SessionAnalytics() + + if self._session_start is None: + self._session_start = datetime.now(UTC) + + user_msgs = [m for m in conversation_log if m.get("role") == "user"] + asst_msgs = [ + m for m in conversation_log if m.get("role") == "assistant" + ] + + avg_len = 0.0 + if asst_msgs: + total_chars = sum(len(m.get("content", "")) for m in asst_msgs) + avg_len = total_chars / len(asst_msgs) + + # Extract topics from user messages (simple: first 40 chars) + topics = [] + seen: set[str] = set() + for m in user_msgs: + topic = m.get("content", "")[:40].strip() + if topic and topic.lower() not in seen: + topics.append(topic) + seen.add(topic.lower()) + # Keep last 8 topics + topics = topics[-8:] + + elapsed = (datetime.now(UTC) - self._session_start).total_seconds() / 60 + + return SessionAnalytics( + total_messages=len(conversation_log), + user_messages=len(user_msgs), + assistant_messages=len(asst_msgs), + avg_response_length=round(avg_len, 1), + topics_discussed=topics, + session_start=self._session_start.strftime("%H:%M:%S"), + session_duration_minutes=round(elapsed, 1), + memory_hits_total=self._memory_hit_count, + ) + + +# ── Module singleton ───────────────────────────────────────────────────────── + +nexus_introspector = NexusIntrospector() diff --git a/src/timmy/nexus/persistence.py b/src/timmy/nexus/persistence.py new file mode 100644 index 00000000..826f694d --- /dev/null +++ b/src/timmy/nexus/persistence.py @@ -0,0 +1,230 @@ +"""Nexus Session Persistence — durable conversation history. + +The v1 Nexus kept conversations in a Python ``list`` that vanished on +every process restart. This module provides a SQLite-backed store so +Nexus conversations survive reboots while remaining fully local. + +Schema: + nexus_messages(id, role, content, timestamp, session_tag) + +Design decisions: +- One table, one DB file (``data/nexus.db``). Cheap, portable, sovereign. +- ``session_tag`` enables future per-operator sessions (#1090 deferred scope). +- Bounded history: ``MAX_MESSAGES`` rows per session tag. Oldest are pruned + automatically on insert. +- Thread-safe via SQLite WAL mode + module-level singleton. + +Refs: #1090 (Nexus Epic — session persistence), architecture-v2.md §Data Layer +""" + +from __future__ import annotations + +import logging +import sqlite3 +from contextlib import closing +from datetime import UTC, datetime +from pathlib import Path +from typing import TypedDict + +logger = logging.getLogger(__name__) + +# ── Defaults ───────────────────────────────────────────────────────────────── + +_DEFAULT_DB_DIR = Path("data") +DB_PATH: Path = _DEFAULT_DB_DIR / "nexus.db" + +MAX_MESSAGES = 500 # per session tag +DEFAULT_SESSION_TAG = "nexus" + +# ── Schema ─────────────────────────────────────────────────────────────────── + +_SCHEMA = """\ +CREATE TABLE IF NOT EXISTS nexus_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + role TEXT NOT NULL, + content TEXT NOT NULL, + timestamp TEXT NOT NULL, + session_tag TEXT NOT NULL DEFAULT 'nexus' +); +CREATE INDEX IF NOT EXISTS idx_nexus_session ON nexus_messages(session_tag); +CREATE INDEX IF NOT EXISTS idx_nexus_ts ON nexus_messages(timestamp); +""" + + +# ── Typed dict for rows ────────────────────────────────────────────────────── + + +class NexusMessage(TypedDict): + id: int + role: str + content: str + timestamp: str + session_tag: str + + +# ── Store ──────────────────────────────────────────────────────────────────── + + +class NexusStore: + """SQLite-backed persistence for Nexus conversations. + + Usage:: + + store = NexusStore() # uses module-level DB_PATH + store.append("user", "hi") + msgs = store.get_history() # → list[NexusMessage] + store.clear() # wipe session + """ + + def __init__(self, db_path: Path | None = None) -> None: + self._db_path = db_path or DB_PATH + self._conn: sqlite3.Connection | None = None + + # ── Connection management ───────────────────────────────────────────── + + def _get_conn(self) -> sqlite3.Connection: + if self._conn is None: + self._db_path.parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect( + str(self._db_path), + check_same_thread=False, + ) + self._conn.row_factory = sqlite3.Row + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.executescript(_SCHEMA) + return self._conn + + def close(self) -> None: + """Close the underlying connection (idempotent).""" + if self._conn is not None: + try: + self._conn.close() + except Exception: + pass + self._conn = None + + # ── Write ───────────────────────────────────────────────────────────── + + def append( + self, + role: str, + content: str, + *, + timestamp: str | None = None, + session_tag: str = DEFAULT_SESSION_TAG, + ) -> int: + """Insert a message and return its row id. + + Automatically prunes oldest messages when the session exceeds + ``MAX_MESSAGES``. + """ + ts = timestamp or datetime.now(UTC).strftime("%H:%M:%S") + conn = self._get_conn() + with closing(conn.cursor()) as cur: + cur.execute( + "INSERT INTO nexus_messages (role, content, timestamp, session_tag) " + "VALUES (?, ?, ?, ?)", + (role, content, ts, session_tag), + ) + row_id: int = cur.lastrowid # type: ignore[assignment] + conn.commit() + + # Prune + self._prune(session_tag) + + return row_id + + def _prune(self, session_tag: str) -> None: + """Remove oldest rows that exceed MAX_MESSAGES for *session_tag*.""" + conn = self._get_conn() + with closing(conn.cursor()) as cur: + cur.execute( + "SELECT COUNT(*) FROM nexus_messages WHERE session_tag = ?", + (session_tag,), + ) + count = cur.fetchone()[0] + if count > MAX_MESSAGES: + excess = count - MAX_MESSAGES + cur.execute( + "DELETE FROM nexus_messages WHERE id IN (" + " SELECT id FROM nexus_messages " + " WHERE session_tag = ? ORDER BY id ASC LIMIT ?" + ")", + (session_tag, excess), + ) + conn.commit() + + # ── Read ────────────────────────────────────────────────────────────── + + def get_history( + self, + session_tag: str = DEFAULT_SESSION_TAG, + limit: int = 200, + ) -> list[NexusMessage]: + """Return the most recent *limit* messages for *session_tag*. + + Results are ordered oldest-first (ascending id). + """ + conn = self._get_conn() + with closing(conn.cursor()) as cur: + cur.execute( + "SELECT id, role, content, timestamp, session_tag " + "FROM nexus_messages " + "WHERE session_tag = ? " + "ORDER BY id DESC LIMIT ?", + (session_tag, limit), + ) + rows = cur.fetchall() + + # Reverse to chronological order + messages: list[NexusMessage] = [ + NexusMessage( + id=r["id"], + role=r["role"], + content=r["content"], + timestamp=r["timestamp"], + session_tag=r["session_tag"], + ) + for r in reversed(rows) + ] + return messages + + def message_count( + self, session_tag: str = DEFAULT_SESSION_TAG + ) -> int: + """Return total message count for *session_tag*.""" + conn = self._get_conn() + with closing(conn.cursor()) as cur: + cur.execute( + "SELECT COUNT(*) FROM nexus_messages WHERE session_tag = ?", + (session_tag,), + ) + return cur.fetchone()[0] + + # ── Delete ──────────────────────────────────────────────────────────── + + def clear(self, session_tag: str = DEFAULT_SESSION_TAG) -> int: + """Delete all messages for *session_tag*. Returns count deleted.""" + conn = self._get_conn() + with closing(conn.cursor()) as cur: + cur.execute( + "DELETE FROM nexus_messages WHERE session_tag = ?", + (session_tag,), + ) + deleted: int = cur.rowcount + conn.commit() + return deleted + + def clear_all(self) -> int: + """Delete every message across all session tags.""" + conn = self._get_conn() + with closing(conn.cursor()) as cur: + cur.execute("DELETE FROM nexus_messages") + deleted: int = cur.rowcount + conn.commit() + return deleted + + +# ── Module singleton ───────────────────────────────────────────────────────── + +nexus_store = NexusStore() diff --git a/src/timmy/nexus/sovereignty_pulse.py b/src/timmy/nexus/sovereignty_pulse.py new file mode 100644 index 00000000..e7a2d375 --- /dev/null +++ b/src/timmy/nexus/sovereignty_pulse.py @@ -0,0 +1,153 @@ +"""Sovereignty Pulse — real-time sovereignty health for the Nexus. + +Reads from the ``SovereigntyMetricsStore`` (created in PR #1331) and +distils it into a compact "pulse" that the Nexus template can render +as a persistent health badge. + +The pulse answers one question at a glance: *how sovereign is Timmy +right now?* + +Signals: +- Overall sovereignty percentage (0–100). +- Per-layer breakdown (perception, decision, narration). +- Crystallization velocity — new rules learned in the last hour. +- API independence — percentage of recent inferences served locally. +- Health rating (sovereign / degraded / dependent). + +All methods return plain dicts — no imports leak into the template layer. + +Refs: #953 (Sovereignty Loop), #954 (metrics), #1090 (Nexus epic) +""" + +from __future__ import annotations + +import logging +from dataclasses import asdict, dataclass, field +from datetime import UTC, datetime + +logger = logging.getLogger(__name__) + + +# ── Data model ─────────────────────────────────────────────────────────────── + + +@dataclass +class LayerPulse: + """Sovereignty metrics for a single AI layer.""" + + name: str + sovereign_pct: float = 0.0 + cache_hits: int = 0 + model_calls: int = 0 + + def to_dict(self) -> dict: + return asdict(self) + + +@dataclass +class SovereigntyPulseSnapshot: + """Complete sovereignty health reading for the Nexus display.""" + + overall_pct: float = 0.0 + health: str = "unknown" # sovereign | degraded | dependent | unknown + layers: list[LayerPulse] = field(default_factory=list) + crystallizations_last_hour: int = 0 + api_independence_pct: float = 0.0 + total_events: int = 0 + timestamp: str = field( + default_factory=lambda: datetime.now(UTC).isoformat() + ) + + def to_dict(self) -> dict: + return { + "overall_pct": self.overall_pct, + "health": self.health, + "layers": [layer.to_dict() for layer in self.layers], + "crystallizations_last_hour": self.crystallizations_last_hour, + "api_independence_pct": self.api_independence_pct, + "total_events": self.total_events, + "timestamp": self.timestamp, + } + + +# ── Pulse reader ───────────────────────────────────────────────────────────── + + +def _classify_health(pct: float) -> str: + """Map overall sovereignty percentage to a human-readable health label.""" + if pct >= 80.0: + return "sovereign" + if pct >= 50.0: + return "degraded" + if pct > 0.0: + return "dependent" + return "unknown" + + +class SovereigntyPulse: + """Reads sovereignty metrics and emits pulse snapshots. + + Lazily imports from ``timmy.sovereignty.metrics`` so the Nexus + module has no hard compile-time dependency on the Sovereignty Loop. + """ + + def snapshot(self) -> SovereigntyPulseSnapshot: + """Build a pulse snapshot from the live metrics store.""" + try: + return self._read_metrics() + except Exception as exc: + logger.debug("SovereigntyPulse: metrics unavailable: %s", exc) + return SovereigntyPulseSnapshot() + + def _read_metrics(self) -> SovereigntyPulseSnapshot: + """Internal reader — allowed to raise if imports fail.""" + from timmy.sovereignty.metrics import get_metrics_store + + store = get_metrics_store() + snap = store.get_snapshot() + + # Parse per-layer stats from the snapshot + layers = [] + layer_pcts: list[float] = [] + for layer_name in ("perception", "decision", "narration"): + layer_data = snap.get(layer_name, {}) + hits = layer_data.get("cache_hits", 0) + calls = layer_data.get("model_calls", 0) + total = hits + calls + pct = (hits / total * 100) if total > 0 else 0.0 + layers.append( + LayerPulse( + name=layer_name, + sovereign_pct=round(pct, 1), + cache_hits=hits, + model_calls=calls, + ) + ) + layer_pcts.append(pct) + + overall = round(sum(layer_pcts) / len(layer_pcts), 1) if layer_pcts else 0.0 + + # Crystallization count + cryst = snap.get("crystallizations", 0) + + # API independence: cache_hits / total across all layers + total_hits = sum(layer.cache_hits for layer in layers) + total_calls = sum(layer.model_calls for layer in layers) + total_all = total_hits + total_calls + api_indep = round((total_hits / total_all * 100), 1) if total_all > 0 else 0.0 + + total_events = snap.get("total_events", 0) + + return SovereigntyPulseSnapshot( + overall_pct=overall, + health=_classify_health(overall), + layers=layers, + crystallizations_last_hour=cryst, + api_independence_pct=api_indep, + total_events=total_events, + ) + + +# ── Module singleton ───────────────────────────────────────────────────────── + +sovereignty_pulse = SovereigntyPulse() diff --git a/static/css/mission-control.css b/static/css/mission-control.css index 0745bdb8..adce9675 100644 --- a/static/css/mission-control.css +++ b/static/css/mission-control.css @@ -2665,25 +2665,27 @@ } .vs-btn-save:hover { opacity: 0.85; } -/* ── Nexus ────────────────────────────────────────────────── */ -.nexus-layout { max-width: 1400px; margin: 0 auto; } +/* ── Nexus v2 ─────────────────────────────────────────────── */ +.nexus-layout { max-width: 1600px; margin: 0 auto; } .nexus-header { border-bottom: 1px solid var(--border); padding-bottom: 0.5rem; } .nexus-title { font-size: 1.4rem; font-weight: 700; color: var(--purple); letter-spacing: 0.1em; } .nexus-subtitle { font-size: 0.8rem; color: var(--text-dim); margin-top: 0.2rem; } -.nexus-grid { +/* v2 grid: wider sidebar for awareness panels */ +.nexus-grid-v2 { display: grid; - grid-template-columns: 1fr 320px; + grid-template-columns: 1fr 360px; gap: 1rem; align-items: start; } -@media (max-width: 900px) { - .nexus-grid { grid-template-columns: 1fr; } +@media (max-width: 1000px) { + .nexus-grid-v2 { grid-template-columns: 1fr; } } .nexus-chat-panel { height: calc(100vh - 180px); display: flex; flex-direction: column; } .nexus-chat-panel .card-body { overflow-y: auto; flex: 1; } +.nexus-msg-count { font-size: 0.7rem; color: var(--text-dim); letter-spacing: 0.05em; } .nexus-empty-state { color: var(--text-dim); @@ -2693,6 +2695,177 @@ text-align: center; } +/* Sidebar scrollable on short screens */ +.nexus-sidebar-col { max-height: calc(100vh - 140px); overflow-y: auto; } + +/* ── Sovereignty Pulse Badge (header) ── */ +.nexus-pulse-badge { + display: flex; + align-items: center; + gap: 0.4rem; + background: var(--bg-card); + border: 1px solid var(--border); + border-radius: var(--radius-md); + padding: 0.3rem 0.7rem; + font-size: 0.72rem; + letter-spacing: 0.05em; +} +.nexus-pulse-dot { + width: 8px; height: 8px; + border-radius: 50%; + display: inline-block; +} +.nexus-pulse-dot.nexus-pulse-sovereign { background: var(--green); box-shadow: 0 0 6px var(--green); } +.nexus-pulse-dot.nexus-pulse-degraded { background: var(--amber); box-shadow: 0 0 6px var(--amber); } +.nexus-pulse-dot.nexus-pulse-dependent { background: var(--red); box-shadow: 0 0 6px var(--red); } +.nexus-pulse-dot.nexus-pulse-unknown { background: var(--text-dim); } +.nexus-pulse-label { color: var(--text-dim); } +.nexus-pulse-value { color: var(--text-bright); font-weight: 600; } + +/* ── Cognitive State Panel ── */ +.nexus-cognitive-panel .card-body { font-size: 0.78rem; } +.nexus-engagement-badge { + font-size: 0.65rem; + letter-spacing: 0.08em; + padding: 0.15rem 0.5rem; + border-radius: 3px; + background: rgba(168,85,247,0.12); + color: var(--purple); +} +.nexus-cog-grid { + display: grid; + grid-template-columns: 1fr 1fr; + gap: 0.5rem; +} +.nexus-cog-item { + background: rgba(255,255,255,0.02); + border-radius: 4px; + padding: 0.35rem 0.5rem; +} +.nexus-cog-label { + font-size: 0.62rem; + color: var(--text-dim); + letter-spacing: 0.08em; + margin-bottom: 0.15rem; +} +.nexus-cog-value { + color: var(--text-bright); + font-size: 0.8rem; +} +.nexus-cog-focus { + font-size: 0.72rem; + color: var(--text); + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + max-width: 140px; +} +.nexus-commitments { font-size: 0.72rem; } +.nexus-commitment-item { + color: var(--text); + padding: 0.2rem 0; + border-bottom: 1px solid rgba(59,26,92,0.4); + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; +} + +/* ── Thought Stream Panel ── */ +.nexus-thoughts-panel .card-body { max-height: 200px; overflow-y: auto; } +.nexus-thought-item { + border-left: 2px solid var(--purple); + padding: 0.3rem 0.5rem; + margin-bottom: 0.5rem; + font-size: 0.76rem; + background: rgba(168,85,247,0.04); + border-radius: 0 4px 4px 0; +} +.nexus-thought-meta { + display: flex; + justify-content: space-between; + margin-bottom: 0.2rem; +} +.nexus-thought-seed { + color: var(--purple); + font-size: 0.65rem; + letter-spacing: 0.06em; + text-transform: uppercase; +} +.nexus-thought-time { color: var(--text-dim); font-size: 0.62rem; } +.nexus-thought-content { color: var(--text); line-height: 1.4; } + +/* ── Sovereignty Pulse Detail Panel ── */ +.nexus-health-badge { + font-size: 0.62rem; + letter-spacing: 0.08em; + padding: 0.15rem 0.5rem; + border-radius: 3px; +} +.nexus-health-sovereign { background: rgba(0,232,122,0.12); color: var(--green); } +.nexus-health-degraded { background: rgba(255,184,0,0.12); color: var(--amber); } +.nexus-health-dependent { background: rgba(255,68,85,0.12); color: var(--red); } +.nexus-health-unknown { background: rgba(107,74,138,0.12); color: var(--text-dim); } + +.nexus-pulse-layer { + display: flex; + align-items: center; + gap: 0.4rem; + margin-bottom: 0.35rem; + font-size: 0.72rem; +} +.nexus-pulse-layer-label { + color: var(--text-dim); + min-width: 80px; + letter-spacing: 0.06em; + font-size: 0.65rem; +} +.nexus-pulse-bar-track { + flex: 1; + height: 6px; + background: rgba(59,26,92,0.5); + border-radius: 3px; + overflow: hidden; +} +.nexus-pulse-bar-fill { + height: 100%; + background: linear-gradient(90deg, var(--purple), var(--green)); + border-radius: 3px; + transition: width 0.6s ease; +} +.nexus-pulse-layer-pct { + color: var(--text-bright); + font-size: 0.68rem; + min-width: 36px; + text-align: right; +} + +.nexus-pulse-stats { font-size: 0.72rem; } +.nexus-pulse-stat { + display: flex; + justify-content: space-between; + padding: 0.2rem 0; + border-bottom: 1px solid rgba(59,26,92,0.3); +} +.nexus-pulse-stat-label { color: var(--text-dim); } +.nexus-pulse-stat-value { color: var(--text-bright); } + +/* ── Session Analytics Panel ── */ +.nexus-analytics-grid { + display: grid; + grid-template-columns: 1fr 1fr; + gap: 0.4rem; + font-size: 0.72rem; +} +.nexus-analytics-item { + display: flex; + justify-content: space-between; + padding: 0.25rem 0.4rem; + background: rgba(255,255,255,0.02); + border-radius: 4px; +} +.nexus-analytics-label { color: var(--text-dim); } +.nexus-analytics-value { color: var(--text-bright); } + /* Memory sidebar */ .nexus-memory-hits { font-size: 0.78rem; } .nexus-memory-label { color: var(--text-dim); font-size: 0.72rem; margin-bottom: 0.4rem; letter-spacing: 0.05em; } diff --git a/tests/dashboard/test_nexus.py b/tests/dashboard/test_nexus.py index 0e4cc43d..5d652b89 100644 --- a/tests/dashboard/test_nexus.py +++ b/tests/dashboard/test_nexus.py @@ -1,4 +1,4 @@ -"""Tests for the Nexus conversational awareness routes.""" +"""Tests for the Nexus v2 conversational awareness routes.""" from unittest.mock import patch @@ -24,6 +24,41 @@ def test_nexus_page_contains_teach_form(client): assert "/nexus/teach" in response.text +def test_nexus_page_contains_cognitive_panel(client): + """Nexus v2 page must include the cognitive state panel.""" + response = client.get("/nexus") + assert response.status_code == 200 + assert "COGNITIVE STATE" in response.text + + +def test_nexus_page_contains_thought_stream(client): + """Nexus v2 page must include the thought stream panel.""" + response = client.get("/nexus") + assert response.status_code == 200 + assert "THOUGHT STREAM" in response.text + + +def test_nexus_page_contains_sovereignty_pulse(client): + """Nexus v2 page must include the sovereignty pulse panel.""" + response = client.get("/nexus") + assert response.status_code == 200 + assert "SOVEREIGNTY PULSE" in response.text + + +def test_nexus_page_contains_session_analytics(client): + """Nexus v2 page must include the session analytics panel.""" + response = client.get("/nexus") + assert response.status_code == 200 + assert "SESSION ANALYTICS" in response.text + + +def test_nexus_page_contains_websocket_script(client): + """Nexus v2 page must include the WebSocket connection script.""" + response = client.get("/nexus") + assert response.status_code == 200 + assert "/nexus/ws" in response.text + + def test_nexus_chat_empty_message_returns_empty(client): """POST /nexus/chat with blank message returns empty response.""" response = client.post("/nexus/chat", data={"message": " "}) @@ -72,3 +107,17 @@ def test_nexus_clear_history(client): response = client.request("DELETE", "/nexus/history") assert response.status_code == 200 assert "cleared" in response.text.lower() + + +def test_nexus_introspect_api(client): + """GET /nexus/introspect should return JSON introspection snapshot.""" + response = client.get("/nexus/introspect") + assert response.status_code == 200 + data = response.json() + assert "introspection" in data + assert "sovereignty_pulse" in data + assert "cognitive" in data["introspection"] + assert "recent_thoughts" in data["introspection"] + assert "analytics" in data["introspection"] + assert "overall_pct" in data["sovereignty_pulse"] + assert "health" in data["sovereignty_pulse"] diff --git a/tests/timmy/nexus/__init__.py b/tests/timmy/nexus/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/timmy/nexus/test_introspection.py b/tests/timmy/nexus/test_introspection.py new file mode 100644 index 00000000..a6ed735b --- /dev/null +++ b/tests/timmy/nexus/test_introspection.py @@ -0,0 +1,199 @@ +"""Tests for the Nexus Introspection Engine.""" + +from unittest.mock import MagicMock, patch + +from timmy.nexus.introspection import ( + CognitiveSummary, + IntrospectionSnapshot, + NexusIntrospector, + SessionAnalytics, + ThoughtSummary, +) + +# ── Data model tests ───────────────────────────────────────────────────────── + + +class TestCognitiveSummary: + def test_defaults(self): + s = CognitiveSummary() + assert s.mood == "settled" + assert s.engagement == "idle" + assert s.focus_topic is None + + def test_to_dict(self): + s = CognitiveSummary(mood="curious", engagement="deep", focus_topic="architecture") + d = s.to_dict() + assert d["mood"] == "curious" + assert d["engagement"] == "deep" + assert d["focus_topic"] == "architecture" + + +class TestThoughtSummary: + def test_to_dict(self): + t = ThoughtSummary( + id="t1", content="Hello world", seed_type="freeform", created_at="2026-01-01" + ) + d = t.to_dict() + assert d["id"] == "t1" + assert d["seed_type"] == "freeform" + assert d["parent_id"] is None + + +class TestSessionAnalytics: + def test_defaults(self): + a = SessionAnalytics() + assert a.total_messages == 0 + assert a.avg_response_length == 0.0 + assert a.topics_discussed == [] + + +class TestIntrospectionSnapshot: + def test_to_dict_structure(self): + snap = IntrospectionSnapshot() + d = snap.to_dict() + assert "cognitive" in d + assert "recent_thoughts" in d + assert "analytics" in d + assert "timestamp" in d + + def test_to_dict_with_data(self): + snap = IntrospectionSnapshot( + cognitive=CognitiveSummary(mood="energized"), + recent_thoughts=[ + ThoughtSummary(id="x", content="test", seed_type="s", created_at="now"), + ], + ) + d = snap.to_dict() + assert d["cognitive"]["mood"] == "energized" + assert len(d["recent_thoughts"]) == 1 + + +# ── Introspector tests ─────────────────────────────────────────────────────── + + +class TestNexusIntrospector: + def test_snapshot_empty_log(self): + intro = NexusIntrospector() + snap = intro.snapshot(conversation_log=[]) + assert isinstance(snap, IntrospectionSnapshot) + assert snap.analytics.total_messages == 0 + + def test_snapshot_with_messages(self): + intro = NexusIntrospector() + log = [ + {"role": "user", "content": "hello", "timestamp": "10:00:00"}, + {"role": "assistant", "content": "Hi there!", "timestamp": "10:00:01"}, + {"role": "user", "content": "architecture question", "timestamp": "10:00:02"}, + ] + snap = intro.snapshot(conversation_log=log) + assert snap.analytics.total_messages == 3 + assert snap.analytics.user_messages == 2 + assert snap.analytics.assistant_messages == 1 + assert snap.analytics.avg_response_length > 0 + + def test_record_memory_hits(self): + intro = NexusIntrospector() + intro.record_memory_hits(3) + intro.record_memory_hits(2) + snap = intro.snapshot( + conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}] + ) + assert snap.analytics.memory_hits_total == 5 + + def test_reset_clears_state(self): + intro = NexusIntrospector() + intro.record_memory_hits(10) + intro.reset() + snap = intro.snapshot( + conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}] + ) + assert snap.analytics.memory_hits_total == 0 + + def test_topics_deduplication(self): + intro = NexusIntrospector() + log = [ + {"role": "user", "content": "hello", "timestamp": "t"}, + {"role": "user", "content": "hello", "timestamp": "t"}, + {"role": "user", "content": "different topic", "timestamp": "t"}, + ] + snap = intro.snapshot(conversation_log=log) + assert len(snap.analytics.topics_discussed) == 2 + + def test_topics_capped_at_8(self): + intro = NexusIntrospector() + log = [{"role": "user", "content": f"topic {i}", "timestamp": "t"} for i in range(15)] + snap = intro.snapshot(conversation_log=log) + assert len(snap.analytics.topics_discussed) <= 8 + + def test_cognitive_read_fallback(self): + """If cognitive read fails, snapshot still works with defaults.""" + intro = NexusIntrospector() + # Patch the module-level import inside _read_cognitive + with patch.dict("sys.modules", {"timmy.cognitive_state": None}): + snap = intro.snapshot(conversation_log=[]) + # Should not raise — fallback to default + assert snap.cognitive.mood == "settled" + + def test_thoughts_read_fallback(self): + """If thought read fails, snapshot still works with empty list.""" + intro = NexusIntrospector() + with patch.dict("sys.modules", {"timmy.thinking": None}): + snap = intro.snapshot(conversation_log=[]) + assert snap.recent_thoughts == [] + + def test_read_cognitive_from_tracker(self): + intro = NexusIntrospector() + mock_state = MagicMock() + mock_state.mood = "curious" + mock_state.engagement = "deep" + mock_state.focus_topic = "sovereignty" + mock_state.conversation_depth = 5 + mock_state.active_commitments = ["build something"] + mock_state.last_initiative = "build something" + + mock_tracker = MagicMock() + mock_tracker.get_state.return_value = mock_state + + with patch("timmy.cognitive_state.cognitive_tracker", mock_tracker): + summary = intro._read_cognitive() + + assert summary.mood == "curious" + assert summary.engagement == "deep" + assert summary.focus_topic == "sovereignty" + assert summary.conversation_depth == 5 + + def test_read_thoughts_from_engine(self): + intro = NexusIntrospector() + mock_thought = MagicMock() + mock_thought.id = "t1" + mock_thought.content = "Deep thought about sovereignty" + mock_thought.seed_type = "existential" + mock_thought.created_at = "2026-03-23T10:00:00" + mock_thought.parent_id = None + + mock_engine = MagicMock() + mock_engine.get_recent_thoughts.return_value = [mock_thought] + + with patch("timmy.thinking.thinking_engine", mock_engine): + thoughts = intro._read_thoughts(limit=5) + + assert len(thoughts) == 1 + assert thoughts[0].id == "t1" + assert thoughts[0].seed_type == "existential" + + def test_read_thoughts_truncates_long_content(self): + intro = NexusIntrospector() + mock_thought = MagicMock() + mock_thought.id = "t2" + mock_thought.content = "x" * 300 + mock_thought.seed_type = "freeform" + mock_thought.created_at = "2026-03-23" + mock_thought.parent_id = None + + mock_engine = MagicMock() + mock_engine.get_recent_thoughts.return_value = [mock_thought] + + with patch("timmy.thinking.thinking_engine", mock_engine): + thoughts = intro._read_thoughts(limit=5) + + assert len(thoughts[0].content) <= 201 # 200 + "…" diff --git a/tests/timmy/nexus/test_persistence.py b/tests/timmy/nexus/test_persistence.py new file mode 100644 index 00000000..cf3e9250 --- /dev/null +++ b/tests/timmy/nexus/test_persistence.py @@ -0,0 +1,144 @@ +"""Tests for the Nexus Session Persistence store.""" + +import pytest + +from timmy.nexus.persistence import MAX_MESSAGES, NexusStore + + +@pytest.fixture +def store(tmp_path): + """Provide a NexusStore backed by a temp database.""" + db = tmp_path / "test_nexus.db" + s = NexusStore(db_path=db) + yield s + s.close() + + +class TestNexusStoreBasic: + def test_append_and_retrieve(self, store): + store.append("user", "hello") + store.append("assistant", "hi there") + history = store.get_history() + assert len(history) == 2 + assert history[0]["role"] == "user" + assert history[0]["content"] == "hello" + assert history[1]["role"] == "assistant" + + def test_message_count(self, store): + assert store.message_count() == 0 + store.append("user", "a") + store.append("user", "b") + assert store.message_count() == 2 + + def test_custom_timestamp(self, store): + store.append("user", "msg", timestamp="12:34:56") + history = store.get_history() + assert history[0]["timestamp"] == "12:34:56" + + def test_clear_session(self, store): + store.append("user", "a") + store.append("assistant", "b") + deleted = store.clear() + assert deleted == 2 + assert store.message_count() == 0 + + def test_clear_empty_session(self, store): + deleted = store.clear() + assert deleted == 0 + + def test_clear_all(self, store): + store.append("user", "a", session_tag="s1") + store.append("user", "b", session_tag="s2") + deleted = store.clear_all() + assert deleted == 2 + assert store.message_count(session_tag="s1") == 0 + assert store.message_count(session_tag="s2") == 0 + + +class TestNexusStoreOrdering: + def test_chronological_order(self, store): + for i in range(5): + store.append("user", f"msg-{i}") + history = store.get_history() + contents = [m["content"] for m in history] + assert contents == ["msg-0", "msg-1", "msg-2", "msg-3", "msg-4"] + + def test_limit_parameter(self, store): + for i in range(10): + store.append("user", f"msg-{i}") + history = store.get_history(limit=3) + assert len(history) == 3 + # Should be the 3 most recent + assert history[0]["content"] == "msg-7" + assert history[2]["content"] == "msg-9" + + +class TestNexusStoreSessionTags: + def test_session_isolation(self, store): + store.append("user", "nexus-msg", session_tag="nexus") + store.append("user", "other-msg", session_tag="other") + nexus_history = store.get_history(session_tag="nexus") + other_history = store.get_history(session_tag="other") + assert len(nexus_history) == 1 + assert len(other_history) == 1 + assert nexus_history[0]["content"] == "nexus-msg" + + def test_clear_only_affects_target_session(self, store): + store.append("user", "a", session_tag="s1") + store.append("user", "b", session_tag="s2") + store.clear(session_tag="s1") + assert store.message_count(session_tag="s1") == 0 + assert store.message_count(session_tag="s2") == 1 + + +class TestNexusStorePruning: + def test_prune_excess_messages(self, tmp_path): + """Inserting beyond MAX_MESSAGES should prune oldest.""" + db = tmp_path / "prune_test.db" + s = NexusStore(db_path=db) + # Insert MAX_MESSAGES + 5 to trigger pruning + for i in range(MAX_MESSAGES + 5): + s.append("user", f"msg-{i}") + assert s.message_count() == MAX_MESSAGES + # Get full history — oldest remaining should be msg-5 + history = s.get_history(limit=MAX_MESSAGES) + assert history[0]["content"] == "msg-5" + s.close() + + +class TestNexusStoreReopen: + def test_data_survives_close_reopen(self, tmp_path): + """Data persists across store instances (simulates process restart).""" + db = tmp_path / "reopen.db" + + s1 = NexusStore(db_path=db) + s1.append("user", "persistent message") + s1.close() + + s2 = NexusStore(db_path=db) + history = s2.get_history() + assert len(history) == 1 + assert history[0]["content"] == "persistent message" + s2.close() + + +class TestNexusStoreReturnedId: + def test_append_returns_row_id(self, store): + id1 = store.append("user", "first") + id2 = store.append("user", "second") + assert isinstance(id1, int) + assert id2 > id1 + + +class TestNexusStoreClose: + def test_close_is_idempotent(self, store): + store.close() + store.close() # Should not raise + + def test_operations_after_close_reconnect(self, store): + """After close, next operation should reconnect automatically.""" + store.append("user", "before close") + store.close() + # Should auto-reconnect + store.append("user", "after close") + assert store.message_count() == 2 diff --git a/tests/timmy/nexus/test_sovereignty_pulse.py b/tests/timmy/nexus/test_sovereignty_pulse.py new file mode 100644 index 00000000..dc94ae5c --- /dev/null +++ b/tests/timmy/nexus/test_sovereignty_pulse.py @@ -0,0 +1,151 @@ +"""Tests for the Sovereignty Pulse module.""" + +from unittest.mock import MagicMock, patch + +from timmy.nexus.sovereignty_pulse import ( + LayerPulse, + SovereigntyPulse, + SovereigntyPulseSnapshot, + _classify_health, +) + + +class TestClassifyHealth: + def test_sovereign(self): + assert _classify_health(95.0) == "sovereign" + assert _classify_health(80.0) == "sovereign" + + def test_degraded(self): + assert _classify_health(79.9) == "degraded" + assert _classify_health(50.0) == "degraded" + + def test_dependent(self): + assert _classify_health(49.9) == "dependent" + assert _classify_health(0.1) == "dependent" + + def test_unknown(self): + assert _classify_health(0.0) == "unknown" + + +class TestLayerPulse: + def test_to_dict(self): + lp = LayerPulse(name="perception", sovereign_pct=75.0, cache_hits=15, model_calls=5) + d = lp.to_dict() + assert d["name"] == "perception" + assert d["sovereign_pct"] == 75.0 + assert d["cache_hits"] == 15 + + +class TestSovereigntyPulseSnapshot: + def test_defaults(self): + snap = SovereigntyPulseSnapshot() + assert snap.overall_pct == 0.0 + assert snap.health == "unknown" + assert snap.layers == [] + + def test_to_dict_structure(self): + snap = SovereigntyPulseSnapshot( + overall_pct=85.0, + health="sovereign", + layers=[LayerPulse(name="perception", sovereign_pct=90.0)], + crystallizations_last_hour=3, + api_independence_pct=88.0, + total_events=42, + ) + d = snap.to_dict() + assert d["overall_pct"] == 85.0 + assert d["health"] == "sovereign" + assert len(d["layers"]) == 1 + assert d["layers"][0]["name"] == "perception" + assert d["crystallizations_last_hour"] == 3 + assert d["api_independence_pct"] == 88.0 + assert d["total_events"] == 42 + assert "timestamp" in d + + +class TestSovereigntyPulse: + def test_snapshot_graceful_degradation(self): + """When metrics are unavailable, should return default snapshot.""" + pulse = SovereigntyPulse() + with patch.object( + pulse, + "_read_metrics", + side_effect=ImportError("no metrics"), + ): + snap = pulse.snapshot() + assert isinstance(snap, SovereigntyPulseSnapshot) + assert snap.health == "unknown" + + def test_snapshot_with_metrics(self): + """When metrics are available, should read and compute correctly.""" + pulse = SovereigntyPulse() + mock_snapshot = { + "perception": {"cache_hits": 8, "model_calls": 2}, + "decision": {"cache_hits": 6, "model_calls": 4}, + "narration": {"cache_hits": 10, "model_calls": 0}, + "crystallizations": 7, + "total_events": 100, + } + mock_store = MagicMock() + mock_store.get_snapshot.return_value = mock_snapshot + + with patch( + "timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store + ): + snap = pulse.snapshot() + + # Perception: 8/10 = 80%, Decision: 6/10 = 60%, Narration: 10/10 = 100% + # Overall: (80 + 60 + 100) / 3 = 80.0 + assert len(snap.layers) == 3 + assert snap.layers[0].name == "perception" + assert snap.layers[0].sovereign_pct == 80.0 + assert snap.layers[1].name == "decision" + assert snap.layers[1].sovereign_pct == 60.0 + assert snap.layers[2].name == "narration" + assert snap.layers[2].sovereign_pct == 100.0 + assert snap.overall_pct == 80.0 + assert snap.health == "sovereign" + assert snap.crystallizations_last_hour == 7 + assert snap.total_events == 100 + + def test_api_independence_calculation(self): + pulse = SovereigntyPulse() + mock_snapshot = { + "perception": {"cache_hits": 5, "model_calls": 5}, + "decision": {"cache_hits": 5, "model_calls": 5}, + "narration": {"cache_hits": 5, "model_calls": 5}, + "crystallizations": 0, + "total_events": 0, + } + mock_store = MagicMock() + mock_store.get_snapshot.return_value = mock_snapshot + + with patch( + "timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store + ): + snap = pulse.snapshot() + + # Total hits: 15, Total calls: 15, Total: 30 + # Independence: 15/30 = 50% + assert snap.api_independence_pct == 50.0 + + def test_zero_events_no_division_error(self): + pulse = SovereigntyPulse() + mock_snapshot = { + "perception": {"cache_hits": 0, "model_calls": 0}, + "decision": {"cache_hits": 0, "model_calls": 0}, + "narration": {"cache_hits": 0, "model_calls": 0}, + "crystallizations": 0, + "total_events": 0, + } + mock_store = MagicMock() + mock_store.get_snapshot.return_value = mock_snapshot + + with patch( + "timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store + ): + snap = pulse.snapshot() + + assert snap.overall_pct == 0.0 + assert snap.api_independence_pct == 0.0 + assert snap.health == "unknown"