1
0

Compare commits

...

14 Commits

Author SHA1 Message Date
46b5bf96cc [loop-cycle-3] refactor: split app.py into schedulers.py and startup.py (#1363) (#1455) 2026-03-24 19:07:18 +00:00
79bc2d6790 [loop-cycle-2] refactor: split world.py into focused submodules (#1360) (#1449) 2026-03-24 18:55:13 +00:00
8a7a34499c [loop-cycle-1] refactor: split cascade.py into focused modules (#1342) (#1448) 2026-03-24 18:39:06 +00:00
008663ae58 [loop-cycle-31] fix: create missing kimi-loop.sh script (#1415)
fix: create missing kimi-loop.sh script with efficient Gitea filtering (#1415)
2026-03-24 14:39:52 +00:00
002ace5b3c [loop-cycle-7] fix: Configure mypy with explicit-package-bases for proper src/ layout (#1346) (#1359) 2026-03-24 09:39:12 +00:00
91d06eeb49 [kimi] Add unit tests for memory/crud.py (#1344) (#1358) 2026-03-24 03:08:36 +00:00
9e9dd5309a [kimi] Fix: stub cv2 in tests to prevent timeout (#1336) (#1356)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-24 02:59:52 +00:00
36f3f1b3a7 [claude] Add unit tests for tools/system_tools.py (#1345) (#1354) 2026-03-24 02:56:35 +00:00
6a2a0377d2 [loop-cycle-1] fix: thread timeout method for xdist compatibility (#1336) (#1355) 2026-03-24 02:56:19 +00:00
cd0f718d6b [claude] fix: restore live timestamp to HotMemory.read() (#1339) (#1353) 2026-03-24 02:55:48 +00:00
cddfd09c01 [claude] Add unit tests for spark/engine.py (#1343) (#1352) 2026-03-24 02:52:15 +00:00
d0b6d87eb1 [perplexity] feat: Nexus v2 — Cognitive Awareness & Introspection Engine (#1090) (#1348)
Co-authored-by: Perplexity Computer <perplexity@tower.local>
Co-committed-by: Perplexity Computer <perplexity@tower.local>
2026-03-24 02:50:40 +00:00
9e8e0f8552 [claude] Placeholder research artifact for issue #1341 (#1350) 2026-03-24 02:49:37 +00:00
e09082a8a8 [kimi] Add comprehensive unit tests for models/budget.py (#1316) (#1347) 2026-03-24 02:48:51 +00:00
38 changed files with 6589 additions and 2102 deletions

35
memory/research/task.md Normal file
View File

@@ -0,0 +1,35 @@
# Research Report: Task #1341
**Date:** 2026-03-23
**Issue:** [#1341](http://143.198.27.163:3000/Rockachopa/Timmy-time-dashboard/issues/1341)
**Priority:** normal
**Delegated by:** Timmy via Kimi delegation pipeline
---
## Summary
This issue was submitted as a placeholder via the Kimi delegation pipeline with unfilled template fields:
- **Research Question:** `Q?` (template default — no actual question provided)
- **Background / Context:** `ctx` (template default — no context provided)
- **Task:** `Task` (template default — no task specified)
## Findings
No actionable research question was specified. The issue appears to be a test or
accidental submission of an unfilled delegation template.
## Recommendations
1. **Re-open with a real question** if there is a specific topic to research.
2. **Review the delegation pipeline** to add validation that prevents empty/template-default
submissions from reaching the backlog (e.g. reject issues where the body contains
literal placeholder strings like `Q?` or `ctx`).
3. **Add a pipeline guard** in the Kimi delegation script to require non-empty, non-default
values for `Research Question` and `Background / Context` before creating an issue.
## Next Steps
- [ ] Add input validation to Kimi delegation pipeline
- [ ] Re-file with a concrete research question if needed

View File

@@ -99,8 +99,8 @@ pythonpath = ["src", "tests"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
timeout = 30
timeout_method = "signal"
timeout_func_only = false
timeout_method = "thread"
timeout_func_only = true
addopts = "-v --tb=short --strict-markers --disable-warnings --durations=10 --cov-fail-under=60"
markers = [
"unit: Unit tests (fast, no I/O)",
@@ -167,3 +167,29 @@ directory = "htmlcov"
[tool.coverage.xml]
output = "coverage.xml"
[tool.mypy]
python_version = "3.11"
mypy_path = "src"
explicit_package_bases = true
namespace_packages = true
check_untyped_defs = true
warn_unused_ignores = true
warn_redundant_casts = true
warn_unreachable = true
strict_optional = true
[[tool.mypy.overrides]]
module = [
"airllm.*",
"pymumble.*",
"pyttsx3.*",
"serpapi.*",
"discord.*",
"psutil.*",
"health_snapshot.*",
"swarm.*",
"lightning.*",
"mcp.*",
]
ignore_missing_imports = true

74
scripts/kimi-loop.sh Executable file
View File

@@ -0,0 +1,74 @@
#!/bin/bash
# kimi-loop.sh — Efficient Gitea issue polling for Kimi agent
#
# Fetches only Kimi-assigned issues using proper query parameters,
# avoiding the need to pull all unassigned tickets and filter in Python.
#
# Usage:
# ./scripts/kimi-loop.sh
#
# Exit codes:
# 0 — Found work for Kimi
# 1 — No work available
set -euo pipefail
# Configuration
GITEA_API="${TIMMY_GITEA_API:-${GITEA_API:-http://143.198.27.163:3000/api/v1}}"
REPO_SLUG="${REPO_SLUG:-rockachopa/Timmy-time-dashboard}"
TOKEN_FILE="${HOME}/.hermes/gitea_token"
WORKTREE_DIR="${HOME}/worktrees"
# Ensure token exists
if [[ ! -f "$TOKEN_FILE" ]]; then
echo "ERROR: Gitea token not found at $TOKEN_FILE" >&2
exit 1
fi
TOKEN=$(cat "$TOKEN_FILE")
# Function to make authenticated Gitea API calls
gitea_api() {
local endpoint="$1"
local method="${2:-GET}"
curl -s -X "$method" \
-H "Authorization: token $TOKEN" \
-H "Content-Type: application/json" \
"$GITEA_API/repos/$REPO_SLUG/$endpoint"
}
# Efficiently fetch only Kimi-assigned issues (fixes the filter bug)
# Uses assignee parameter to filter server-side instead of pulling all issues
get_kimi_issues() {
gitea_api "issues?state=open&assignee=kimi&sort=created&order=asc&limit=10"
}
# Main execution
main() {
echo "🤖 Kimi loop: Checking for assigned work..."
# Fetch Kimi's issues efficiently (server-side filtering)
issues=$(get_kimi_issues)
# Count issues using jq
count=$(echo "$issues" | jq '. | length')
if [[ "$count" -eq 0 ]]; then
echo "📭 No issues assigned to Kimi. Idle."
exit 1
fi
echo "📝 Found $count issue(s) assigned to Kimi:"
echo "$issues" | jq -r '.[] | " #\(.number): \(.title)"'
# TODO: Process each issue (create worktree, run task, create PR)
# For now, just report availability
echo "✅ Kimi has work available."
exit 0
}
# Handle script being sourced vs executed
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
main "$@"
fi

View File

@@ -7,11 +7,8 @@ Key improvements:
4. Security and logging handled by dedicated middleware
"""
import asyncio
import json
import logging
import re
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, Request, WebSocket
@@ -40,6 +37,7 @@ from dashboard.routes.experiments import router as experiments_router
from dashboard.routes.grok import router as grok_router
from dashboard.routes.health import router as health_router
from dashboard.routes.hermes import router as hermes_router
from dashboard.routes.legal import router as legal_router
from dashboard.routes.loop_qa import router as loop_qa_router
from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
@@ -49,8 +47,8 @@ from dashboard.routes.monitoring import router as monitoring_router
from dashboard.routes.nexus import router as nexus_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.legal import router as legal_router
from dashboard.routes.self_correction import router as self_correction_router
from dashboard.routes.seo import router as seo_router
from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router
from dashboard.routes.sovereignty_ws import router as sovereignty_ws_router
from dashboard.routes.spark import router as spark_router
@@ -63,10 +61,13 @@ from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router
from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.seo import router as seo_router
from dashboard.routes.world import matrix_router
from dashboard.routes.world import router as world_router
from timmy.workshop_state import PRESENCE_FILE
from dashboard.schedulers import ( # noqa: F401 — re-export for backward compat
_SYNTHESIZED_STATE,
_presence_watcher,
)
from dashboard.startup import lifespan
class _ColorFormatter(logging.Formatter):
@@ -139,444 +140,6 @@ logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent
PROJECT_ROOT = BASE_DIR.parent.parent
_BRIEFING_INTERVAL_HOURS = 6
async def _briefing_scheduler() -> None:
"""Background task: regenerate Timmy's briefing every 6 hours."""
from infrastructure.notifications.push import notify_briefing_ready
from timmy.briefing import engine as briefing_engine
await asyncio.sleep(2)
while True:
try:
if briefing_engine.needs_refresh():
logger.info("Generating morning briefing…")
briefing = briefing_engine.generate()
await notify_briefing_ready(briefing)
else:
logger.info("Briefing is fresh; skipping generation.")
except Exception as exc:
logger.error("Briefing scheduler error: %s", exc)
await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600)
async def _thinking_scheduler() -> None:
"""Background task: execute Timmy's thinking cycle every N seconds."""
from timmy.thinking import thinking_engine
await asyncio.sleep(5) # Stagger after briefing scheduler
while True:
try:
if settings.thinking_enabled:
await asyncio.wait_for(
thinking_engine.think_once(),
timeout=settings.thinking_timeout_seconds,
)
except TimeoutError:
logger.warning(
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Thinking scheduler error: %s", exc)
await asyncio.sleep(settings.thinking_interval_seconds)
async def _hermes_scheduler() -> None:
"""Background task: Hermes system health monitor, runs every 5 minutes.
Checks memory, disk, Ollama, processes, and network.
Auto-resolves what it can; fires push notifications when human help is needed.
"""
from infrastructure.hermes.monitor import hermes_monitor
await asyncio.sleep(20) # Stagger after other schedulers
while True:
try:
if settings.hermes_enabled:
report = await hermes_monitor.run_cycle()
if report.has_issues:
logger.warning(
"Hermes health issues detected — overall: %s",
report.overall.value,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Hermes scheduler error: %s", exc)
await asyncio.sleep(settings.hermes_interval_seconds)
async def _loop_qa_scheduler() -> None:
"""Background task: run capability self-tests on a separate timer.
Independent of the thinking loop — runs every N thinking ticks
to probe subsystems and detect degradation.
"""
from timmy.loop_qa import loop_qa_orchestrator
await asyncio.sleep(10) # Stagger after thinking scheduler
while True:
try:
if settings.loop_qa_enabled:
result = await asyncio.wait_for(
loop_qa_orchestrator.run_next_test(),
timeout=settings.thinking_timeout_seconds,
)
if result:
status = "PASS" if result["success"] else "FAIL"
logger.info(
"Loop QA [%s]: %s%s",
result["capability"],
status,
result.get("details", "")[:80],
)
except TimeoutError:
logger.warning(
"Loop QA test timed out after %ds",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Loop QA scheduler error: %s", exc)
interval = settings.thinking_interval_seconds * settings.loop_qa_interval_ticks
await asyncio.sleep(interval)
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
_SYNTHESIZED_STATE: dict = {
"version": 1,
"liveness": None,
"current_focus": "",
"mood": "idle",
"active_threads": [],
"recent_events": [],
"concerns": [],
}
async def _presence_watcher() -> None:
"""Background task: watch ~/.timmy/presence.json and broadcast changes via WS.
Polls the file every 30 seconds (matching Timmy's write cadence).
If the file doesn't exist, broadcasts a synthesised idle state.
"""
from infrastructure.ws_manager.handler import ws_manager as ws_mgr
await asyncio.sleep(_PRESENCE_INITIAL_DELAY) # Stagger after other schedulers
last_mtime: float = 0.0
while True:
try:
if PRESENCE_FILE.exists():
mtime = PRESENCE_FILE.stat().st_mtime
if mtime != last_mtime:
last_mtime = mtime
raw = await asyncio.to_thread(PRESENCE_FILE.read_text)
state = json.loads(raw)
await ws_mgr.broadcast("timmy_state", state)
else:
# File absent — broadcast synthesised state once per cycle
if last_mtime != -1.0:
last_mtime = -1.0
await ws_mgr.broadcast("timmy_state", _SYNTHESIZED_STATE)
except json.JSONDecodeError as exc:
logger.warning("presence.json parse error: %s", exc)
except Exception as exc:
logger.warning("Presence watcher error: %s", exc)
await asyncio.sleep(_PRESENCE_POLL_SECONDS)
async def _start_chat_integrations_background() -> None:
"""Background task: start chat integrations without blocking startup."""
from integrations.chat_bridge.registry import platform_registry
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
await asyncio.sleep(0.5)
# Register Discord in the platform registry
platform_registry.register(discord_bot)
if settings.telegram_token:
try:
await telegram_bot.start()
logger.info("Telegram bot started")
except Exception as exc:
logger.warning("Failed to start Telegram bot: %s", exc)
else:
logger.debug("Telegram: no token configured, skipping")
if settings.discord_token or discord_bot.load_token():
try:
await discord_bot.start()
logger.info("Discord bot started")
except Exception as exc:
logger.warning("Failed to start Discord bot: %s", exc)
else:
logger.debug("Discord: no token configured, skipping")
# If Discord isn't connected yet, start a watcher that polls for the
# token to appear in the environment or .env file.
if discord_bot.state.name != "CONNECTED":
asyncio.create_task(_discord_token_watcher())
async def _discord_token_watcher() -> None:
"""Poll for DISCORD_TOKEN appearing in env or .env and auto-start Discord bot."""
from integrations.chat_bridge.vendors.discord import discord_bot
# Don't poll if discord.py isn't even installed
try:
import discord as _discord_check # noqa: F401
except ImportError:
logger.debug("discord.py not installed — token watcher exiting")
return
while True:
await asyncio.sleep(30)
if discord_bot.state.name == "CONNECTED":
return # Already running — stop watching
# 1. Check settings (pydantic-settings reads env on instantiation;
# hot-reload is handled by re-reading .env below)
token = settings.discord_token
# 2. Re-read .env file for hot-reload
if not token:
try:
from dotenv import dotenv_values
env_path = Path(settings.repo_root) / ".env"
if env_path.exists():
vals = dotenv_values(env_path)
token = vals.get("DISCORD_TOKEN", "")
except ImportError:
pass # python-dotenv not installed
# 3. Check state file (written by /discord/setup)
if not token:
token = discord_bot.load_token() or ""
if token:
try:
logger.info(
"Discord watcher: token found, attempting start (state=%s)",
discord_bot.state.name,
)
success = await discord_bot.start(token=token)
if success:
logger.info("Discord bot auto-started (token detected)")
return # Done — stop watching
logger.warning(
"Discord watcher: start() returned False (state=%s)",
discord_bot.state.name,
)
except Exception as exc:
logger.warning("Discord auto-start failed: %s", exc)
def _startup_init() -> None:
"""Validate config and enable event persistence."""
from config import validate_startup
validate_startup()
from infrastructure.events.bus import init_event_bus_persistence
init_event_bus_persistence()
from spark.engine import get_spark_engine
if get_spark_engine().enabled:
logger.info("Spark Intelligence active — event capture enabled")
def _startup_background_tasks() -> list[asyncio.Task]:
"""Spawn all recurring background tasks (non-blocking)."""
bg_tasks = [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
asyncio.create_task(_hermes_scheduler()),
]
try:
from timmy.paperclip import start_paperclip_poller
bg_tasks.append(asyncio.create_task(start_paperclip_poller()))
logger.info("Paperclip poller started")
except ImportError:
logger.debug("Paperclip module not found, skipping poller")
return bg_tasks
def _try_prune(label: str, prune_fn, days: int) -> None:
"""Run a prune function, log results, swallow errors."""
try:
pruned = prune_fn()
if pruned:
logger.info(
"%s auto-prune: removed %d entries older than %d days",
label,
pruned,
days,
)
except Exception as exc:
logger.debug("%s auto-prune skipped: %s", label, exc)
def _check_vault_size() -> None:
"""Warn if the memory vault exceeds the configured size limit."""
try:
vault_path = Path(settings.repo_root) / "memory" / "notes"
if vault_path.exists():
total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file())
total_mb = total_bytes / (1024 * 1024)
if total_mb > settings.memory_vault_max_mb:
logger.warning(
"Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes",
total_mb,
settings.memory_vault_max_mb,
)
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
def _startup_pruning() -> None:
"""Auto-prune old memories, thoughts, and events on startup."""
if settings.memory_prune_days > 0:
from timmy.memory_system import prune_memories
_try_prune(
"Memory",
lambda: prune_memories(
older_than_days=settings.memory_prune_days,
keep_facts=settings.memory_prune_keep_facts,
),
settings.memory_prune_days,
)
if settings.thoughts_prune_days > 0:
from timmy.thinking import thinking_engine
_try_prune(
"Thought",
lambda: thinking_engine.prune_old_thoughts(
keep_days=settings.thoughts_prune_days,
keep_min=settings.thoughts_prune_keep_min,
),
settings.thoughts_prune_days,
)
if settings.events_prune_days > 0:
from swarm.event_log import prune_old_events
_try_prune(
"Event",
lambda: prune_old_events(
keep_days=settings.events_prune_days,
keep_min=settings.events_prune_keep_min,
),
settings.events_prune_days,
)
if settings.memory_vault_max_mb > 0:
_check_vault_size()
async def _shutdown_cleanup(
bg_tasks: list[asyncio.Task],
workshop_heartbeat,
) -> None:
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
await discord_bot.stop()
await telegram_bot.stop()
try:
from timmy.mcp_tools import close_mcp_sessions
await close_mcp_sessions()
except Exception as exc:
logger.debug("MCP shutdown: %s", exc)
await workshop_heartbeat.stop()
for task in bg_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
_startup_init()
bg_tasks = _startup_background_tasks()
_startup_pruning()
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Register session logger with error capture
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
register_error_recorder(get_session_logger().record_error)
except Exception:
logger.debug("Failed to register error recorder")
# Mark session start for sovereignty duration tracking
try:
from timmy.sovereignty import mark_session_start
mark_session_start()
except Exception:
logger.debug("Failed to mark sovereignty session start")
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Generate and commit sovereignty session report
try:
from timmy.sovereignty import generate_and_commit_report
await generate_and_commit_report()
except Exception as exc:
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
app = FastAPI(
title="Mission Control",

View File

@@ -1,21 +1,32 @@
"""Nexus — Timmy's persistent conversational awareness space.
"""Nexus v2 — Timmy's persistent conversational awareness space.
A conversational-only interface where Timmy maintains live memory context.
No tool use; pure conversation with memory integration and a teaching panel.
Extends the v1 Nexus (chat + memory sidebar + teaching panel) with:
- **Persistent conversations** — SQLite-backed history survives restarts.
- **Introspection panel** — live cognitive state, recent thoughts, session
analytics rendered alongside every conversation turn.
- **Sovereignty pulse** — real-time sovereignty health badge in the sidebar.
- **WebSocket** — pushes introspection + sovereignty snapshots so the
Nexus page stays alive without polling.
Routes:
GET /nexus — render nexus page with live memory sidebar
GET /nexus — render nexus page with full awareness panels
POST /nexus/chat — send a message; returns HTMX partial
POST /nexus/teach — inject a fact into Timmy's live memory
DELETE /nexus/history — clear the nexus conversation history
GET /nexus/introspect — JSON introspection snapshot (API)
WS /nexus/ws — live introspection + sovereignty push
Refs: #1090 (Nexus Epic), #953 (Sovereignty Loop)
"""
import asyncio
import json
import logging
from datetime import UTC, datetime
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse
from fastapi import APIRouter, Form, Request, WebSocket
from fastapi.responses import HTMLResponse, JSONResponse
from dashboard.templating import templates
from timmy.memory_system import (
@@ -24,6 +35,9 @@ from timmy.memory_system import (
search_memories,
store_personal_fact,
)
from timmy.nexus.introspection import nexus_introspector
from timmy.nexus.persistence import nexus_store
from timmy.nexus.sovereignty_pulse import sovereignty_pulse
from timmy.session import _clean_response, chat, reset_session
logger = logging.getLogger(__name__)
@@ -32,28 +46,74 @@ router = APIRouter(prefix="/nexus", tags=["nexus"])
_NEXUS_SESSION_ID = "nexus"
_MAX_MESSAGE_LENGTH = 10_000
_WS_PUSH_INTERVAL = 5 # seconds between WebSocket pushes
# In-memory conversation log for the Nexus session (mirrors chat store pattern
# but is scoped to the Nexus so it won't pollute the main dashboard history).
# In-memory conversation log — kept in sync with the persistent store
# so templates can render without hitting the DB on every page load.
_nexus_log: list[dict] = []
# ── Initialisation ───────────────────────────────────────────────────────────
# On module load, hydrate the in-memory log from the persistent store.
# This runs once at import time (process startup).
_HYDRATED = False
def _hydrate_log() -> None:
"""Load persisted history into the in-memory log (idempotent)."""
global _HYDRATED
if _HYDRATED:
return
try:
rows = nexus_store.get_history(limit=200)
_nexus_log.clear()
for row in rows:
_nexus_log.append(
{
"role": row["role"],
"content": row["content"],
"timestamp": row["timestamp"],
}
)
_HYDRATED = True
logger.info("Nexus: hydrated %d messages from persistent store", len(_nexus_log))
except Exception as exc:
logger.warning("Nexus: failed to hydrate from store: %s", exc)
_HYDRATED = True # Don't retry repeatedly
# ── Helpers ──────────────────────────────────────────────────────────────────
def _ts() -> str:
return datetime.now(UTC).strftime("%H:%M:%S")
def _append_log(role: str, content: str) -> None:
_nexus_log.append({"role": role, "content": content, "timestamp": _ts()})
# Keep last 200 exchanges to bound memory usage
"""Append to both in-memory log and persistent store."""
ts = _ts()
_nexus_log.append({"role": role, "content": content, "timestamp": ts})
# Bound in-memory log
if len(_nexus_log) > 200:
del _nexus_log[:-200]
# Persist
try:
nexus_store.append(role, content, timestamp=ts)
except Exception as exc:
logger.warning("Nexus: persist failed: %s", exc)
# ── Page route ───────────────────────────────────────────────────────────────
@router.get("", response_class=HTMLResponse)
async def nexus_page(request: Request):
"""Render the Nexus page with live memory context."""
"""Render the Nexus page with full awareness panels."""
_hydrate_log()
stats = get_memory_stats()
facts = recall_personal_facts_with_ids()[:8]
introspection = nexus_introspector.snapshot(conversation_log=_nexus_log)
pulse = sovereignty_pulse.snapshot()
return templates.TemplateResponse(
request,
@@ -63,13 +123,18 @@ async def nexus_page(request: Request):
"messages": list(_nexus_log),
"stats": stats,
"facts": facts,
"introspection": introspection.to_dict(),
"pulse": pulse.to_dict(),
},
)
# ── Chat route ───────────────────────────────────────────────────────────────
@router.post("/chat", response_class=HTMLResponse)
async def nexus_chat(request: Request, message: str = Form(...)):
"""Conversational-only chat routed through the Nexus session.
"""Conversational-only chat with persistence and introspection.
Does not invoke tool-use approval flow — pure conversation with memory
context injected from Timmy's live memory store.
@@ -87,18 +152,22 @@ async def nexus_chat(request: Request, message: str = Form(...)):
"error": "Message too long (max 10 000 chars).",
"timestamp": _ts(),
"memory_hits": [],
"introspection": nexus_introspector.snapshot().to_dict(),
},
)
ts = _ts()
# Fetch semantically relevant memories to surface in the sidebar
# Fetch semantically relevant memories
try:
memory_hits = await asyncio.to_thread(search_memories, query=message, limit=4)
except Exception as exc:
logger.warning("Nexus memory search failed: %s", exc)
memory_hits = []
# Track memory hits for analytics
nexus_introspector.record_memory_hits(len(memory_hits))
# Conversational response — no tool approval flow
response_text: str | None = None
error_text: str | None = None
@@ -113,6 +182,9 @@ async def nexus_chat(request: Request, message: str = Form(...)):
if response_text:
_append_log("assistant", response_text)
# Build fresh introspection snapshot after the exchange
introspection = nexus_introspector.snapshot(conversation_log=_nexus_log)
return templates.TemplateResponse(
request,
"partials/nexus_message.html",
@@ -122,10 +194,14 @@ async def nexus_chat(request: Request, message: str = Form(...)):
"error": error_text,
"timestamp": ts,
"memory_hits": memory_hits,
"introspection": introspection.to_dict(),
},
)
# ── Teach route ──────────────────────────────────────────────────────────────
@router.post("/teach", response_class=HTMLResponse)
async def nexus_teach(request: Request, fact: str = Form(...)):
"""Inject a fact into Timmy's live memory from the Nexus teaching panel."""
@@ -148,11 +224,20 @@ async def nexus_teach(request: Request, fact: str = Form(...)):
)
# ── Clear history ────────────────────────────────────────────────────────────
@router.delete("/history", response_class=HTMLResponse)
async def nexus_clear_history(request: Request):
"""Clear the Nexus conversation history."""
"""Clear the Nexus conversation history (both in-memory and persistent)."""
_nexus_log.clear()
try:
nexus_store.clear()
except Exception as exc:
logger.warning("Nexus: persistent clear failed: %s", exc)
nexus_introspector.reset()
reset_session(session_id=_NEXUS_SESSION_ID)
return templates.TemplateResponse(
request,
"partials/nexus_message.html",
@@ -162,5 +247,55 @@ async def nexus_clear_history(request: Request):
"error": None,
"timestamp": _ts(),
"memory_hits": [],
"introspection": nexus_introspector.snapshot().to_dict(),
},
)
# ── Introspection API ────────────────────────────────────────────────────────
@router.get("/introspect", response_class=JSONResponse)
async def nexus_introspect():
"""Return a JSON introspection snapshot (for API consumers)."""
snap = nexus_introspector.snapshot(conversation_log=_nexus_log)
pulse = sovereignty_pulse.snapshot()
return {
"introspection": snap.to_dict(),
"sovereignty_pulse": pulse.to_dict(),
}
# ── WebSocket — live Nexus push ──────────────────────────────────────────────
@router.websocket("/ws")
async def nexus_ws(websocket: WebSocket) -> None:
"""Push introspection + sovereignty pulse snapshots to the Nexus page.
The frontend connects on page load and receives JSON updates every
``_WS_PUSH_INTERVAL`` seconds, keeping the cognitive state panel,
thought stream, and sovereignty badge fresh without HTMX polling.
"""
await websocket.accept()
logger.info("Nexus WS connected")
try:
# Immediate first push
await _push_snapshot(websocket)
while True:
await asyncio.sleep(_WS_PUSH_INTERVAL)
await _push_snapshot(websocket)
except Exception:
logger.debug("Nexus WS disconnected")
async def _push_snapshot(ws: WebSocket) -> None:
"""Send the combined introspection + pulse payload."""
snap = nexus_introspector.snapshot(conversation_log=_nexus_log)
pulse = sovereignty_pulse.snapshot()
payload = {
"type": "nexus_state",
"introspection": snap.to_dict(),
"sovereignty_pulse": pulse.to_dict(),
}
await ws.send_text(json.dumps(payload))

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,124 @@
"""Workshop world state API and WebSocket relay.
Serves Timmy's current presence state to the Workshop 3D renderer.
The primary consumer is the browser on first load — before any
WebSocket events arrive, the client needs a full state snapshot.
The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever
the heartbeat detects a state change. It also accepts ``visitor_message``
frames from the 3D client and responds with ``timmy_speech`` barks.
Source of truth: ``~/.timmy/presence.json`` written by
:class:`~timmy.workshop_state.WorkshopHeartbeat`.
Falls back to a live ``get_state_dict()`` call if the file is stale
or missing.
"""
from fastapi import APIRouter
# Import submodule routers
from .bark import matrix_router as _bark_matrix_router
from .matrix import matrix_router as _matrix_matrix_router
from .state import router as _state_router
from .websocket import router as _ws_router
# ---------------------------------------------------------------------------
# Combine sub-routers into the two top-level routers that app.py expects
# ---------------------------------------------------------------------------
router = APIRouter(prefix="/api/world", tags=["world"])
# Include state routes (GET /state)
for route in _state_router.routes:
router.routes.append(route)
# Include websocket routes (WS /ws)
for route in _ws_router.routes:
router.routes.append(route)
# Combine matrix sub-routers
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
for route in _bark_matrix_router.routes:
matrix_router.routes.append(route)
for route in _matrix_matrix_router.routes:
matrix_router.routes.append(route)
# ---------------------------------------------------------------------------
# Re-export public API for backward compatibility
# ---------------------------------------------------------------------------
# Used by src/dashboard/app.py
from .websocket import broadcast_world_state # noqa: E402, F401
# Used by src/infrastructure/presence.py
from .websocket import _ws_clients # noqa: E402, F401
# Used by tests
from .bark import ( # noqa: E402, F401
BarkRequest,
_BARK_RATE_LIMIT_SECONDS,
_GROUND_TTL,
_MAX_EXCHANGES,
_bark_and_broadcast,
_bark_last_request,
_conversation,
_generate_bark,
_handle_client_message,
_log_bark_failure,
_refresh_ground,
post_matrix_bark,
reset_conversation_ground,
)
from .commitments import ( # noqa: E402, F401
_COMMITMENT_PATTERNS,
_MAX_COMMITMENTS,
_REMIND_AFTER,
_build_commitment_context,
_commitments,
_extract_commitments,
_record_commitments,
_tick_commitments,
close_commitment,
get_commitments,
reset_commitments,
)
from .matrix import ( # noqa: E402, F401
_DEFAULT_MATRIX_CONFIG,
_build_matrix_agents_response,
_build_matrix_health_response,
_build_matrix_memory_response,
_build_matrix_thoughts_response,
_check_capability_bark,
_check_capability_familiar,
_check_capability_lightning,
_check_capability_memory,
_check_capability_thinking,
_load_matrix_config,
_memory_search_last_request,
get_matrix_agents,
get_matrix_config,
get_matrix_health,
get_matrix_memory_search,
get_matrix_thoughts,
)
from .state import ( # noqa: E402, F401
_STALE_THRESHOLD,
_build_world_state,
_get_current_state,
_read_presence_file,
get_world_state,
)
from .utils import ( # noqa: E402, F401
_compute_circular_positions,
_get_agent_color,
_get_agent_shape,
_get_client_ip,
)
from .websocket import ( # noqa: E402, F401
_authenticate_ws,
_broadcast,
_heartbeat,
world_ws,
)

View File

@@ -0,0 +1,212 @@
"""Bark/conversation — visitor chat engine and Matrix bark endpoint."""
import asyncio
import json
import logging
import time
from collections import deque
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from infrastructure.presence import produce_bark
from .commitments import (
_build_commitment_context,
_record_commitments,
_tick_commitments,
)
logger = logging.getLogger(__name__)
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
# Rate limiting: 1 request per 3 seconds per visitor_id
_BARK_RATE_LIMIT_SECONDS = 3
_bark_last_request: dict[str, float] = {}
# Recent conversation buffer — kept in memory for the Workshop overlay.
# Stores the last _MAX_EXCHANGES (visitor_text, timmy_text) pairs.
_MAX_EXCHANGES = 3
_conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES)
_WORKSHOP_SESSION_ID = "workshop"
# Conversation grounding — anchor to opening topic so Timmy doesn't drift.
_ground_topic: str | None = None
_ground_set_at: float = 0.0
_GROUND_TTL = 300 # seconds of inactivity before the anchor expires
class BarkRequest(BaseModel):
"""Request body for POST /api/matrix/bark."""
text: str
visitor_id: str
@matrix_router.post("/bark")
async def post_matrix_bark(request: BarkRequest) -> JSONResponse:
"""Generate a bark response for a visitor message.
HTTP fallback for when WebSocket isn't available. The Matrix frontend
can POST a message and get Timmy's bark response back as JSON.
Rate limited to 1 request per 3 seconds per visitor_id.
Request body:
- text: The visitor's message text
- visitor_id: Unique identifier for the visitor (used for rate limiting)
Returns:
- 200: Bark message in produce_bark() format
- 429: Rate limit exceeded (try again later)
- 422: Invalid request (missing/invalid fields)
"""
# Validate inputs
text = request.text.strip() if request.text else ""
visitor_id = request.visitor_id.strip() if request.visitor_id else ""
if not text:
return JSONResponse(
status_code=422,
content={"error": "text is required"},
)
if not visitor_id:
return JSONResponse(
status_code=422,
content={"error": "visitor_id is required"},
)
# Rate limiting check
now = time.time()
last_request = _bark_last_request.get(visitor_id, 0)
time_since_last = now - last_request
if time_since_last < _BARK_RATE_LIMIT_SECONDS:
retry_after = _BARK_RATE_LIMIT_SECONDS - time_since_last
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded. Try again later."},
headers={"Retry-After": str(int(retry_after) + 1)},
)
# Record this request
_bark_last_request[visitor_id] = now
# Generate bark response
try:
reply = await _generate_bark(text)
except Exception as exc:
logger.warning("Bark generation failed: %s", exc)
reply = "Hmm, my thoughts are a bit tangled right now."
# Build bark response using produce_bark format
bark = produce_bark(agent_id="timmy", text=reply, style="speech")
return JSONResponse(
content=bark,
headers={"Cache-Control": "no-cache, no-store"},
)
def reset_conversation_ground() -> None:
"""Clear the conversation grounding anchor (e.g. after inactivity)."""
global _ground_topic, _ground_set_at
_ground_topic = None
_ground_set_at = 0.0
def _refresh_ground(visitor_text: str) -> None:
"""Set or refresh the conversation grounding anchor.
The first visitor message in a session (or after the TTL expires)
becomes the anchor topic. Subsequent messages are grounded against it.
"""
global _ground_topic, _ground_set_at
now = time.time()
if _ground_topic is None or (now - _ground_set_at) > _GROUND_TTL:
_ground_topic = visitor_text[:120]
logger.debug("Ground topic set: %s", _ground_topic)
_ground_set_at = now
async def _bark_and_broadcast(visitor_text: str) -> None:
"""Generate a bark response and broadcast it to all Workshop clients."""
from .websocket import _broadcast
await _broadcast(json.dumps({"type": "timmy_thinking"}))
# Notify Pip that a visitor spoke
try:
from timmy.familiar import pip_familiar
pip_familiar.on_event("visitor_spoke")
except Exception:
logger.debug("Pip familiar notification failed (optional)")
_refresh_ground(visitor_text)
_tick_commitments()
reply = await _generate_bark(visitor_text)
_record_commitments(reply)
_conversation.append({"visitor": visitor_text, "timmy": reply})
await _broadcast(
json.dumps(
{
"type": "timmy_speech",
"text": reply,
"recentExchanges": list(_conversation),
}
)
)
async def _generate_bark(visitor_text: str) -> str:
"""Generate a short in-character bark response.
Uses the existing Timmy session with a dedicated workshop session ID.
When a grounding anchor exists, the opening topic is prepended so the
model stays on-topic across long sessions.
Gracefully degrades to a canned response if inference fails.
"""
try:
from timmy import session as _session
grounded = visitor_text
commitment_ctx = _build_commitment_context()
if commitment_ctx:
grounded = f"{commitment_ctx}\n{grounded}"
if _ground_topic and visitor_text != _ground_topic:
grounded = f"[Workshop conversation topic: {_ground_topic}]\n{grounded}"
response = await _session.chat(grounded, session_id=_WORKSHOP_SESSION_ID)
return response
except Exception as exc:
logger.warning("Bark generation failed: %s", exc)
return "Hmm, my thoughts are a bit tangled right now."
def _log_bark_failure(task: asyncio.Task) -> None:
"""Log unhandled exceptions from fire-and-forget bark tasks."""
if task.cancelled():
return
exc = task.exception()
if exc is not None:
logger.error("Bark task failed: %s", exc)
async def _handle_client_message(raw: str) -> None:
"""Dispatch an incoming WebSocket frame from the Workshop client."""
try:
data = json.loads(raw)
except (json.JSONDecodeError, TypeError):
return # ignore non-JSON keep-alive pings
if data.get("type") == "visitor_message":
text = (data.get("text") or "").strip()
if text:
task = asyncio.create_task(_bark_and_broadcast(text))
task.add_done_callback(_log_bark_failure)

View File

@@ -0,0 +1,77 @@
"""Conversation grounding — commitment tracking (rescued from PR #408)."""
import re
import time
# Patterns that indicate Timmy is committing to an action.
_COMMITMENT_PATTERNS: list[re.Pattern[str]] = [
re.compile(r"I'll (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
re.compile(r"I will (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
re.compile(r"[Ll]et me (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
]
# After this many messages without follow-up, surface open commitments.
_REMIND_AFTER = 5
_MAX_COMMITMENTS = 10
# In-memory list of open commitments.
# Each entry: {"text": str, "created_at": float, "messages_since": int}
_commitments: list[dict] = []
def _extract_commitments(text: str) -> list[str]:
"""Pull commitment phrases from Timmy's reply text."""
found: list[str] = []
for pattern in _COMMITMENT_PATTERNS:
for match in pattern.finditer(text):
phrase = match.group(1).strip()
if len(phrase) > 5: # skip trivially short matches
found.append(phrase[:120])
return found
def _record_commitments(reply: str) -> None:
"""Scan a Timmy reply for commitments and store them."""
for phrase in _extract_commitments(reply):
# Avoid near-duplicate commitments
if any(c["text"] == phrase for c in _commitments):
continue
_commitments.append({"text": phrase, "created_at": time.time(), "messages_since": 0})
if len(_commitments) > _MAX_COMMITMENTS:
_commitments.pop(0)
def _tick_commitments() -> None:
"""Increment messages_since for every open commitment."""
for c in _commitments:
c["messages_since"] += 1
def _build_commitment_context() -> str:
"""Return a grounding note if any commitments are overdue for follow-up."""
overdue = [c for c in _commitments if c["messages_since"] >= _REMIND_AFTER]
if not overdue:
return ""
lines = [f"- {c['text']}" for c in overdue]
return (
"[Open commitments Timmy made earlier — "
"weave awareness naturally, don't list robotically]\n" + "\n".join(lines)
)
def close_commitment(index: int) -> bool:
"""Remove a commitment by index. Returns True if removed."""
if 0 <= index < len(_commitments):
_commitments.pop(index)
return True
return False
def get_commitments() -> list[dict]:
"""Return a copy of open commitments (for testing / API)."""
return list(_commitments)
def reset_commitments() -> None:
"""Clear all commitments (for testing / session reset)."""
_commitments.clear()

View File

@@ -0,0 +1,397 @@
"""Matrix API endpoints — config, agents, health, thoughts, memory search."""
import logging
import time
from pathlib import Path
from typing import Any
import yaml
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from config import settings
from timmy.memory_system import search_memories
from .utils import (
_DEFAULT_STATUS,
_compute_circular_positions,
_get_agent_color,
_get_agent_shape,
_get_client_ip,
)
logger = logging.getLogger(__name__)
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
# Default Matrix configuration (fallback when matrix.yaml is missing/corrupt)
_DEFAULT_MATRIX_CONFIG: dict[str, Any] = {
"lighting": {
"ambient_color": "#1a1a2e",
"ambient_intensity": 0.4,
"point_lights": [
{"color": "#FFD700", "intensity": 1.2, "position": {"x": 0, "y": 5, "z": 0}},
{"color": "#3B82F6", "intensity": 0.8, "position": {"x": -5, "y": 3, "z": -5}},
{"color": "#A855F7", "intensity": 0.6, "position": {"x": 5, "y": 3, "z": 5}},
],
},
"environment": {
"rain_enabled": False,
"starfield_enabled": True,
"fog_color": "#0f0f23",
"fog_density": 0.02,
},
"features": {
"chat_enabled": True,
"visitor_avatars": True,
"pip_familiar": True,
"workshop_portal": True,
},
}
def _load_matrix_config() -> dict[str, Any]:
"""Load Matrix world configuration from matrix.yaml with fallback to defaults.
Returns a dict with sections: lighting, environment, features.
If the config file is missing or invalid, returns sensible defaults.
"""
try:
config_path = Path(settings.repo_root) / "config" / "matrix.yaml"
if not config_path.exists():
logger.debug("matrix.yaml not found, using default config")
return _DEFAULT_MATRIX_CONFIG.copy()
raw = config_path.read_text()
config = yaml.safe_load(raw)
if not isinstance(config, dict):
logger.warning("matrix.yaml invalid format, using defaults")
return _DEFAULT_MATRIX_CONFIG.copy()
# Merge with defaults to ensure all required fields exist
result: dict[str, Any] = {
"lighting": {
**_DEFAULT_MATRIX_CONFIG["lighting"],
**config.get("lighting", {}),
},
"environment": {
**_DEFAULT_MATRIX_CONFIG["environment"],
**config.get("environment", {}),
},
"features": {
**_DEFAULT_MATRIX_CONFIG["features"],
**config.get("features", {}),
},
}
# Ensure point_lights is a list
if "point_lights" in config.get("lighting", {}):
result["lighting"]["point_lights"] = config["lighting"]["point_lights"]
else:
result["lighting"]["point_lights"] = _DEFAULT_MATRIX_CONFIG["lighting"]["point_lights"]
return result
except Exception as exc:
logger.warning("Failed to load matrix config: %s, using defaults", exc)
return _DEFAULT_MATRIX_CONFIG.copy()
@matrix_router.get("/config")
async def get_matrix_config() -> JSONResponse:
"""Return Matrix world configuration.
Serves lighting presets, environment settings, and feature flags
to the Matrix frontend so it can be config-driven rather than
hardcoded. Reads from config/matrix.yaml with sensible defaults.
"""
config = _load_matrix_config()
return JSONResponse(
content=config,
headers={"Cache-Control": "no-cache, no-store"},
)
def _build_matrix_agents_response() -> list[dict[str, Any]]:
"""Build the Matrix agent registry response.
Reads from agents.yaml and returns agents with Matrix-compatible
formatting including colors, shapes, and positions.
"""
try:
from timmy.agents.loader import list_agents
agents = list_agents()
if not agents:
return []
positions = _compute_circular_positions(len(agents))
result = []
for i, agent in enumerate(agents):
agent_id = agent.get("id", "")
result.append(
{
"id": agent_id,
"display_name": agent.get("name", agent_id.title()),
"role": agent.get("role", "general"),
"color": _get_agent_color(agent_id),
"position": positions[i],
"shape": _get_agent_shape(agent_id),
"status": agent.get("status", _DEFAULT_STATUS),
}
)
return result
except Exception as exc:
logger.warning("Failed to load agents for Matrix: %s", exc)
return []
@matrix_router.get("/agents")
async def get_matrix_agents() -> JSONResponse:
"""Return the agent registry for Matrix visualization.
Serves agents from agents.yaml with Matrix-compatible formatting.
Returns 200 with empty list if no agents configured.
"""
agents = _build_matrix_agents_response()
return JSONResponse(
content=agents,
headers={"Cache-Control": "no-cache, no-store"},
)
_MAX_THOUGHT_LIMIT = 50 # Maximum thoughts allowed per request
_DEFAULT_THOUGHT_LIMIT = 10 # Default number of thoughts to return
_MAX_THOUGHT_TEXT_LEN = 500 # Max characters for thought text
def _build_matrix_thoughts_response(limit: int = _DEFAULT_THOUGHT_LIMIT) -> list[dict[str, Any]]:
"""Build the Matrix thoughts response from the thinking engine.
Returns recent thoughts formatted for Matrix display:
- id: thought UUID
- text: thought content (truncated to 500 chars)
- created_at: ISO-8601 timestamp
- chain_id: parent thought ID (or null if root thought)
Returns empty list if thinking engine is disabled or fails.
"""
try:
from timmy.thinking import thinking_engine
thoughts = thinking_engine.get_recent_thoughts(limit=limit)
return [
{
"id": t.id,
"text": t.content[:_MAX_THOUGHT_TEXT_LEN],
"created_at": t.created_at,
"chain_id": t.parent_id,
}
for t in thoughts
]
except Exception as exc:
logger.warning("Failed to load thoughts for Matrix: %s", exc)
return []
@matrix_router.get("/thoughts")
async def get_matrix_thoughts(limit: int = _DEFAULT_THOUGHT_LIMIT) -> JSONResponse:
"""Return Timmy's recent thoughts formatted for Matrix display.
Query params:
- limit: Number of thoughts to return (default 10, max 50)
Returns empty array if thinking engine is disabled or fails.
"""
# Clamp limit to valid range
if limit < 1:
limit = 1
elif limit > _MAX_THOUGHT_LIMIT:
limit = _MAX_THOUGHT_LIMIT
thoughts = _build_matrix_thoughts_response(limit=limit)
return JSONResponse(
content=thoughts,
headers={"Cache-Control": "no-cache, no-store"},
)
# Health check cache (5-second TTL for capability checks)
_health_cache: dict | None = None
_health_cache_ts: float = 0.0
_HEALTH_CACHE_TTL = 5.0
def _check_capability_thinking() -> bool:
"""Check if thinking engine is available."""
try:
from timmy.thinking import thinking_engine
# Check if the engine has been initialized (has a db path)
return hasattr(thinking_engine, "_db") and thinking_engine._db is not None
except Exception:
return False
def _check_capability_memory() -> bool:
"""Check if memory system is available."""
try:
from timmy.memory_system import HOT_MEMORY_PATH
return HOT_MEMORY_PATH.exists()
except Exception:
return False
def _check_capability_bark() -> bool:
"""Check if bark production is available."""
try:
from infrastructure.presence import produce_bark
return callable(produce_bark)
except Exception:
return False
def _check_capability_familiar() -> bool:
"""Check if familiar (Pip) is available."""
try:
from timmy.familiar import pip_familiar
return pip_familiar is not None
except Exception:
return False
def _check_capability_lightning() -> bool:
"""Check if Lightning payments are available."""
# Lightning is currently disabled per health.py
# Returns False until properly re-implemented
return False
def _build_matrix_health_response() -> dict[str, Any]:
"""Build the Matrix health response with capability checks.
Performs lightweight checks (<100ms total) to determine which features
are available. Returns 200 even if some capabilities are degraded.
"""
capabilities = {
"thinking": _check_capability_thinking(),
"memory": _check_capability_memory(),
"bark": _check_capability_bark(),
"familiar": _check_capability_familiar(),
"lightning": _check_capability_lightning(),
}
# Status is ok if core capabilities (thinking, memory, bark) are available
core_caps = ["thinking", "memory", "bark"]
core_available = all(capabilities[c] for c in core_caps)
status = "ok" if core_available else "degraded"
return {
"status": status,
"version": "1.0.0",
"capabilities": capabilities,
}
@matrix_router.get("/health")
async def get_matrix_health() -> JSONResponse:
"""Return health status and capability availability for Matrix frontend.
Returns 200 even if some capabilities are degraded.
"""
response = _build_matrix_health_response()
status_code = 200 # Always 200, even if degraded
return JSONResponse(
content=response,
status_code=status_code,
headers={"Cache-Control": "no-cache, no-store"},
)
# Rate limiting: 1 search per 5 seconds per IP
_MEMORY_SEARCH_RATE_LIMIT_SECONDS = 5
_memory_search_last_request: dict[str, float] = {}
_MAX_MEMORY_RESULTS = 5
_MAX_MEMORY_TEXT_LENGTH = 200
def _build_matrix_memory_response(
memories: list,
) -> list[dict[str, Any]]:
"""Build the Matrix memory search response.
Formats memory entries for Matrix display:
- text: truncated to 200 characters
- relevance: 0-1 score from relevance_score
- created_at: ISO-8601 timestamp
- context_type: the memory type
Results are capped at _MAX_MEMORY_RESULTS.
"""
results = []
for mem in memories[:_MAX_MEMORY_RESULTS]:
text = mem.content
if len(text) > _MAX_MEMORY_TEXT_LENGTH:
text = text[:_MAX_MEMORY_TEXT_LENGTH] + "..."
results.append(
{
"text": text,
"relevance": round(mem.relevance_score or 0.0, 4),
"created_at": mem.timestamp,
"context_type": mem.context_type,
}
)
return results
@matrix_router.get("/memory/search")
async def get_matrix_memory_search(request: Request, q: str | None = None) -> JSONResponse:
"""Search Timmy's memory for relevant snippets.
Rate limited to 1 search per 5 seconds per IP.
Returns 200 with results, 400 if missing query, or 429 if rate limited.
"""
# Validate query parameter
query = q.strip() if q else ""
if not query:
return JSONResponse(
status_code=400,
content={"error": "Query parameter 'q' is required"},
)
# Rate limiting check by IP
client_ip = _get_client_ip(request)
now = time.time()
last_request = _memory_search_last_request.get(client_ip, 0)
time_since_last = now - last_request
if time_since_last < _MEMORY_SEARCH_RATE_LIMIT_SECONDS:
retry_after = _MEMORY_SEARCH_RATE_LIMIT_SECONDS - time_since_last
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded. Try again later."},
headers={"Retry-After": str(int(retry_after) + 1)},
)
# Record this request
_memory_search_last_request[client_ip] = now
# Search memories
try:
memories = search_memories(query, limit=_MAX_MEMORY_RESULTS)
results = _build_matrix_memory_response(memories)
except Exception as exc:
logger.warning("Memory search failed: %s", exc)
results = []
return JSONResponse(
content=results,
headers={"Cache-Control": "no-cache, no-store"},
)

View File

@@ -0,0 +1,75 @@
"""World state functions — presence file reading and state API."""
import json
import logging
import time
from datetime import UTC, datetime
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from infrastructure.presence import serialize_presence
from timmy.workshop_state import PRESENCE_FILE
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/world", tags=["world"])
_STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild
def _read_presence_file() -> dict | None:
"""Read presence.json if it exists and is fresh enough."""
try:
if not PRESENCE_FILE.exists():
return None
age = time.time() - PRESENCE_FILE.stat().st_mtime
if age > _STALE_THRESHOLD:
logger.debug("presence.json is stale (%.0fs old)", age)
return None
return json.loads(PRESENCE_FILE.read_text())
except (OSError, json.JSONDecodeError) as exc:
logger.warning("Failed to read presence.json: %s", exc)
return None
def _build_world_state(presence: dict) -> dict:
"""Transform presence dict into the world/state API response."""
return serialize_presence(presence)
def _get_current_state() -> dict:
"""Build the current world-state dict from best available source."""
presence = _read_presence_file()
if presence is None:
try:
from timmy.workshop_state import get_state_dict
presence = get_state_dict()
except Exception as exc:
logger.warning("Live state build failed: %s", exc)
presence = {
"version": 1,
"liveness": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"),
"mood": "calm",
"current_focus": "",
"active_threads": [],
"recent_events": [],
"concerns": [],
}
return _build_world_state(presence)
@router.get("/state")
async def get_world_state() -> JSONResponse:
"""Return Timmy's current world state for Workshop bootstrap.
Reads from ``~/.timmy/presence.json`` if fresh, otherwise
rebuilds live from cognitive state.
"""
return JSONResponse(
content=_get_current_state(),
headers={"Cache-Control": "no-cache, no-store"},
)

View File

@@ -0,0 +1,85 @@
"""Shared utilities for the world route submodules."""
import math
# Agent color mapping — consistent with Matrix visual identity
_AGENT_COLORS: dict[str, str] = {
"timmy": "#FFD700", # Gold
"orchestrator": "#FFD700", # Gold
"perplexity": "#3B82F6", # Blue
"replit": "#F97316", # Orange
"kimi": "#06B6D4", # Cyan
"claude": "#A855F7", # Purple
"researcher": "#10B981", # Emerald
"coder": "#EF4444", # Red
"writer": "#EC4899", # Pink
"memory": "#8B5CF6", # Violet
"experimenter": "#14B8A6", # Teal
"forge": "#EF4444", # Red (coder alias)
"seer": "#10B981", # Emerald (researcher alias)
"quill": "#EC4899", # Pink (writer alias)
"echo": "#8B5CF6", # Violet (memory alias)
"lab": "#14B8A6", # Teal (experimenter alias)
}
# Agent shape mapping for 3D visualization
_AGENT_SHAPES: dict[str, str] = {
"timmy": "sphere",
"orchestrator": "sphere",
"perplexity": "cube",
"replit": "cylinder",
"kimi": "dodecahedron",
"claude": "octahedron",
"researcher": "icosahedron",
"coder": "cube",
"writer": "cone",
"memory": "torus",
"experimenter": "tetrahedron",
"forge": "cube",
"seer": "icosahedron",
"quill": "cone",
"echo": "torus",
"lab": "tetrahedron",
}
# Default fallback values
_DEFAULT_COLOR = "#9CA3AF" # Gray
_DEFAULT_SHAPE = "sphere"
_DEFAULT_STATUS = "available"
def _get_agent_color(agent_id: str) -> str:
"""Get the Matrix color for an agent."""
return _AGENT_COLORS.get(agent_id.lower(), _DEFAULT_COLOR)
def _get_agent_shape(agent_id: str) -> str:
"""Get the Matrix shape for an agent."""
return _AGENT_SHAPES.get(agent_id.lower(), _DEFAULT_SHAPE)
def _compute_circular_positions(count: int, radius: float = 3.0) -> list[dict[str, float]]:
"""Compute circular positions for agents in the Matrix.
Agents are arranged in a circle on the XZ plane at y=0.
"""
positions = []
for i in range(count):
angle = (2 * math.pi * i) / count
x = radius * math.cos(angle)
z = radius * math.sin(angle)
positions.append({"x": round(x, 2), "y": 0.0, "z": round(z, 2)})
return positions
def _get_client_ip(request) -> str:
"""Extract client IP from request, respecting X-Forwarded-For header."""
# Check for forwarded IP (when behind proxy)
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
# Take the first IP in the chain
return forwarded.split(",")[0].strip()
# Fall back to direct client IP
if request.client:
return request.client.host
return "unknown"

View File

@@ -0,0 +1,160 @@
"""WebSocket relay for live state changes."""
import asyncio
import json
import logging
from fastapi import APIRouter, WebSocket
from config import settings
from .bark import _handle_client_message
from .state import _get_current_state
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/world", tags=["world"])
_ws_clients: list[WebSocket] = []
_HEARTBEAT_INTERVAL = 15 # seconds — ping to detect dead iPad/Safari connections
async def _heartbeat(websocket: WebSocket) -> None:
"""Send periodic pings to detect dead connections (iPad resilience).
Safari suspends background tabs, killing the TCP socket silently.
A 15-second ping ensures we notice within one interval.
Rescued from stale PR #399.
"""
try:
while True:
await asyncio.sleep(_HEARTBEAT_INTERVAL)
await websocket.send_text(json.dumps({"type": "ping"}))
except Exception:
logger.debug("Heartbeat stopped — connection gone")
async def _authenticate_ws(websocket: WebSocket) -> bool:
"""Authenticate WebSocket connection using matrix_ws_token.
Checks for token in query param ?token=<token>. If no query param,
accepts the connection and waits for first message with
{"type": "auth", "token": "<token>"}.
Returns True if authenticated (or if auth is disabled).
Returns False and closes connection with code 4001 if invalid.
"""
token_setting = settings.matrix_ws_token
# Auth disabled in dev mode (empty/unset token)
if not token_setting:
return True
# Check query param first (can validate before accept)
query_token = websocket.query_params.get("token", "")
if query_token:
if query_token == token_setting:
return True
# Invalid token in query param - we need to accept to close properly
await websocket.accept()
await websocket.close(code=4001, reason="Invalid token")
return False
# No query token - accept and wait for auth message
await websocket.accept()
# Wait for auth message as first message
try:
raw = await websocket.receive_text()
data = json.loads(raw)
if data.get("type") == "auth" and data.get("token") == token_setting:
return True
# Invalid auth message
await websocket.close(code=4001, reason="Invalid token")
return False
except (json.JSONDecodeError, TypeError):
# Non-JSON first message without valid token
await websocket.close(code=4001, reason="Authentication required")
return False
@router.websocket("/ws")
async def world_ws(websocket: WebSocket) -> None:
"""Accept a Workshop client and keep it alive for state broadcasts.
Sends a full ``world_state`` snapshot immediately on connect so the
client never starts from a blank slate. Incoming frames are parsed
as JSON — ``visitor_message`` triggers a bark response. A background
heartbeat ping runs every 15 s to detect dead connections early.
Authentication:
- If matrix_ws_token is configured, clients must provide it via
?token=<token> param or in the first message as
{"type": "auth", "token": "<token>"}.
- Invalid token results in close code 4001.
- Valid token receives a connection_ack message.
"""
# Authenticate (may accept connection internally)
is_authed = await _authenticate_ws(websocket)
if not is_authed:
logger.info("World WS connection rejected — invalid token")
return
# Auth passed - accept if not already accepted
if websocket.client_state.name != "CONNECTED":
await websocket.accept()
# Send connection_ack if auth was required
if settings.matrix_ws_token:
await websocket.send_text(json.dumps({"type": "connection_ack"}))
_ws_clients.append(websocket)
logger.info("World WS connected — %d clients", len(_ws_clients))
# Send full world-state snapshot so client bootstraps instantly
try:
snapshot = _get_current_state()
await websocket.send_text(json.dumps({"type": "world_state", **snapshot}))
except Exception as exc:
logger.warning("Failed to send WS snapshot: %s", exc)
ping_task = asyncio.create_task(_heartbeat(websocket))
try:
while True:
raw = await websocket.receive_text()
await _handle_client_message(raw)
except Exception:
logger.debug("WebSocket receive loop ended")
finally:
ping_task.cancel()
if websocket in _ws_clients:
_ws_clients.remove(websocket)
logger.info("World WS disconnected — %d clients", len(_ws_clients))
async def _broadcast(message: str) -> None:
"""Send *message* to every connected Workshop client, pruning dead ones."""
dead: list[WebSocket] = []
for ws in _ws_clients:
try:
await ws.send_text(message)
except Exception:
logger.debug("Pruning dead WebSocket client")
dead.append(ws)
for ws in dead:
if ws in _ws_clients:
_ws_clients.remove(ws)
async def broadcast_world_state(presence: dict) -> None:
"""Broadcast a ``timmy_state`` message to all connected Workshop clients.
Called by :class:`~timmy.workshop_state.WorkshopHeartbeat` via its
``on_change`` callback.
"""
from .state import _build_world_state
state = _build_world_state(presence)
await _broadcast(json.dumps({"type": "timmy_state", **state["timmyState"]}))

278
src/dashboard/schedulers.py Normal file
View File

@@ -0,0 +1,278 @@
"""Background scheduler coroutines for the Timmy dashboard."""
import asyncio
import json
import logging
from pathlib import Path
from config import settings
from timmy.workshop_state import PRESENCE_FILE
logger = logging.getLogger(__name__)
__all__ = [
"_BRIEFING_INTERVAL_HOURS",
"_briefing_scheduler",
"_thinking_scheduler",
"_hermes_scheduler",
"_loop_qa_scheduler",
"_PRESENCE_POLL_SECONDS",
"_PRESENCE_INITIAL_DELAY",
"_SYNTHESIZED_STATE",
"_presence_watcher",
"_start_chat_integrations_background",
"_discord_token_watcher",
]
_BRIEFING_INTERVAL_HOURS = 6
async def _briefing_scheduler() -> None:
"""Background task: regenerate Timmy's briefing every 6 hours."""
from infrastructure.notifications.push import notify_briefing_ready
from timmy.briefing import engine as briefing_engine
await asyncio.sleep(2)
while True:
try:
if briefing_engine.needs_refresh():
logger.info("Generating morning briefing…")
briefing = briefing_engine.generate()
await notify_briefing_ready(briefing)
else:
logger.info("Briefing is fresh; skipping generation.")
except Exception as exc:
logger.error("Briefing scheduler error: %s", exc)
await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600)
async def _thinking_scheduler() -> None:
"""Background task: execute Timmy's thinking cycle every N seconds."""
from timmy.thinking import thinking_engine
await asyncio.sleep(5) # Stagger after briefing scheduler
while True:
try:
if settings.thinking_enabled:
await asyncio.wait_for(
thinking_engine.think_once(),
timeout=settings.thinking_timeout_seconds,
)
except TimeoutError:
logger.warning(
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Thinking scheduler error: %s", exc)
await asyncio.sleep(settings.thinking_interval_seconds)
async def _hermes_scheduler() -> None:
"""Background task: Hermes system health monitor, runs every 5 minutes.
Checks memory, disk, Ollama, processes, and network.
Auto-resolves what it can; fires push notifications when human help is needed.
"""
from infrastructure.hermes.monitor import hermes_monitor
await asyncio.sleep(20) # Stagger after other schedulers
while True:
try:
if settings.hermes_enabled:
report = await hermes_monitor.run_cycle()
if report.has_issues:
logger.warning(
"Hermes health issues detected — overall: %s",
report.overall.value,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Hermes scheduler error: %s", exc)
await asyncio.sleep(settings.hermes_interval_seconds)
async def _loop_qa_scheduler() -> None:
"""Background task: run capability self-tests on a separate timer.
Independent of the thinking loop — runs every N thinking ticks
to probe subsystems and detect degradation.
"""
from timmy.loop_qa import loop_qa_orchestrator
await asyncio.sleep(10) # Stagger after thinking scheduler
while True:
try:
if settings.loop_qa_enabled:
result = await asyncio.wait_for(
loop_qa_orchestrator.run_next_test(),
timeout=settings.thinking_timeout_seconds,
)
if result:
status = "PASS" if result["success"] else "FAIL"
logger.info(
"Loop QA [%s]: %s%s",
result["capability"],
status,
result.get("details", "")[:80],
)
except TimeoutError:
logger.warning(
"Loop QA test timed out after %ds",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Loop QA scheduler error: %s", exc)
interval = settings.thinking_interval_seconds * settings.loop_qa_interval_ticks
await asyncio.sleep(interval)
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
_SYNTHESIZED_STATE: dict = {
"version": 1,
"liveness": None,
"current_focus": "",
"mood": "idle",
"active_threads": [],
"recent_events": [],
"concerns": [],
}
async def _presence_watcher() -> None:
"""Background task: watch ~/.timmy/presence.json and broadcast changes via WS.
Polls the file every 30 seconds (matching Timmy's write cadence).
If the file doesn't exist, broadcasts a synthesised idle state.
"""
from infrastructure.ws_manager.handler import ws_manager as ws_mgr
await asyncio.sleep(_PRESENCE_INITIAL_DELAY) # Stagger after other schedulers
last_mtime: float = 0.0
while True:
try:
if PRESENCE_FILE.exists():
mtime = PRESENCE_FILE.stat().st_mtime
if mtime != last_mtime:
last_mtime = mtime
raw = await asyncio.to_thread(PRESENCE_FILE.read_text)
state = json.loads(raw)
await ws_mgr.broadcast("timmy_state", state)
else:
# File absent — broadcast synthesised state once per cycle
if last_mtime != -1.0:
last_mtime = -1.0
await ws_mgr.broadcast("timmy_state", _SYNTHESIZED_STATE)
except json.JSONDecodeError as exc:
logger.warning("presence.json parse error: %s", exc)
except Exception as exc:
logger.warning("Presence watcher error: %s", exc)
await asyncio.sleep(_PRESENCE_POLL_SECONDS)
async def _start_chat_integrations_background() -> None:
"""Background task: start chat integrations without blocking startup."""
from integrations.chat_bridge.registry import platform_registry
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
await asyncio.sleep(0.5)
# Register Discord in the platform registry
platform_registry.register(discord_bot)
if settings.telegram_token:
try:
await telegram_bot.start()
logger.info("Telegram bot started")
except Exception as exc:
logger.warning("Failed to start Telegram bot: %s", exc)
else:
logger.debug("Telegram: no token configured, skipping")
if settings.discord_token or discord_bot.load_token():
try:
await discord_bot.start()
logger.info("Discord bot started")
except Exception as exc:
logger.warning("Failed to start Discord bot: %s", exc)
else:
logger.debug("Discord: no token configured, skipping")
# If Discord isn't connected yet, start a watcher that polls for the
# token to appear in the environment or .env file.
if discord_bot.state.name != "CONNECTED":
asyncio.create_task(_discord_token_watcher())
async def _discord_token_watcher() -> None:
"""Poll for DISCORD_TOKEN appearing in env or .env and auto-start Discord bot."""
from integrations.chat_bridge.vendors.discord import discord_bot
# Don't poll if discord.py isn't even installed
try:
import discord as _discord_check # noqa: F401
except ImportError:
logger.debug("discord.py not installed — token watcher exiting")
return
while True:
await asyncio.sleep(30)
if discord_bot.state.name == "CONNECTED":
return # Already running — stop watching
# 1. Check settings (pydantic-settings reads env on instantiation;
# hot-reload is handled by re-reading .env below)
token = settings.discord_token
# 2. Re-read .env file for hot-reload
if not token:
try:
from dotenv import dotenv_values
env_path = Path(settings.repo_root) / ".env"
if env_path.exists():
vals = dotenv_values(env_path)
token = vals.get("DISCORD_TOKEN", "")
except ImportError:
pass # python-dotenv not installed
# 3. Check state file (written by /discord/setup)
if not token:
token = discord_bot.load_token() or ""
if token:
try:
logger.info(
"Discord watcher: token found, attempting start (state=%s)",
discord_bot.state.name,
)
success = await discord_bot.start(token=token)
if success:
logger.info("Discord bot auto-started (token detected)")
return # Done — stop watching
logger.warning(
"Discord watcher: start() returned False (state=%s)",
discord_bot.state.name,
)
except Exception as exc:
logger.warning("Discord auto-start failed: %s", exc)

205
src/dashboard/startup.py Normal file
View File

@@ -0,0 +1,205 @@
"""Application lifecycle management — startup, shutdown, and background task orchestration."""
import asyncio
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI
from config import settings
from dashboard.schedulers import (
_briefing_scheduler,
_hermes_scheduler,
_loop_qa_scheduler,
_presence_watcher,
_start_chat_integrations_background,
_thinking_scheduler,
)
logger = logging.getLogger(__name__)
def _startup_init() -> None:
"""Validate config and enable event persistence."""
from config import validate_startup
validate_startup()
from infrastructure.events.bus import init_event_bus_persistence
init_event_bus_persistence()
from spark.engine import get_spark_engine
if get_spark_engine().enabled:
logger.info("Spark Intelligence active — event capture enabled")
def _startup_background_tasks() -> list[asyncio.Task]:
"""Spawn all recurring background tasks (non-blocking)."""
bg_tasks = [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
asyncio.create_task(_hermes_scheduler()),
]
try:
from timmy.paperclip import start_paperclip_poller
bg_tasks.append(asyncio.create_task(start_paperclip_poller()))
logger.info("Paperclip poller started")
except ImportError:
logger.debug("Paperclip module not found, skipping poller")
return bg_tasks
def _try_prune(label: str, prune_fn, days: int) -> None:
"""Run a prune function, log results, swallow errors."""
try:
pruned = prune_fn()
if pruned:
logger.info(
"%s auto-prune: removed %d entries older than %d days",
label,
pruned,
days,
)
except Exception as exc:
logger.debug("%s auto-prune skipped: %s", label, exc)
def _check_vault_size() -> None:
"""Warn if the memory vault exceeds the configured size limit."""
try:
vault_path = Path(settings.repo_root) / "memory" / "notes"
if vault_path.exists():
total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file())
total_mb = total_bytes / (1024 * 1024)
if total_mb > settings.memory_vault_max_mb:
logger.warning(
"Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes",
total_mb,
settings.memory_vault_max_mb,
)
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
def _startup_pruning() -> None:
"""Auto-prune old memories, thoughts, and events on startup."""
if settings.memory_prune_days > 0:
from timmy.memory_system import prune_memories
_try_prune(
"Memory",
lambda: prune_memories(
older_than_days=settings.memory_prune_days,
keep_facts=settings.memory_prune_keep_facts,
),
settings.memory_prune_days,
)
if settings.thoughts_prune_days > 0:
from timmy.thinking import thinking_engine
_try_prune(
"Thought",
lambda: thinking_engine.prune_old_thoughts(
keep_days=settings.thoughts_prune_days,
keep_min=settings.thoughts_prune_keep_min,
),
settings.thoughts_prune_days,
)
if settings.events_prune_days > 0:
from swarm.event_log import prune_old_events
_try_prune(
"Event",
lambda: prune_old_events(
keep_days=settings.events_prune_days,
keep_min=settings.events_prune_keep_min,
),
settings.events_prune_days,
)
if settings.memory_vault_max_mb > 0:
_check_vault_size()
async def _shutdown_cleanup(
bg_tasks: list[asyncio.Task],
workshop_heartbeat,
) -> None:
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
await discord_bot.stop()
await telegram_bot.stop()
try:
from timmy.mcp_tools import close_mcp_sessions
await close_mcp_sessions()
except Exception as exc:
logger.debug("MCP shutdown: %s", exc)
await workshop_heartbeat.stop()
for task in bg_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
_startup_init()
bg_tasks = _startup_background_tasks()
_startup_pruning()
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Register session logger with error capture
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
register_error_recorder(get_session_logger().record_error)
except Exception:
logger.debug("Failed to register error recorder")
# Mark session start for sovereignty duration tracking
try:
from timmy.sovereignty import mark_session_start
mark_session_start()
except Exception:
logger.debug("Failed to mark sovereignty session start")
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Generate and commit sovereignty session report
try:
from timmy.sovereignty import generate_and_commit_report
await generate_and_commit_report()
except Exception as exc:
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)

View File

@@ -8,26 +8,40 @@
<div class="container-fluid nexus-layout py-3">
<div class="nexus-header mb-3">
<div class="nexus-title">// NEXUS</div>
<div class="nexus-subtitle">
Persistent conversational awareness &mdash; always present, always learning.
<div class="d-flex justify-content-between align-items-center">
<div>
<div class="nexus-title">// NEXUS</div>
<div class="nexus-subtitle">
Persistent conversational awareness &mdash; always present, always learning.
</div>
</div>
<!-- Sovereignty Pulse badge -->
<div class="nexus-pulse-badge" id="nexus-pulse-badge">
<span class="nexus-pulse-dot nexus-pulse-{{ pulse.health }}"></span>
<span class="nexus-pulse-label">SOVEREIGNTY</span>
<span class="nexus-pulse-value" id="pulse-overall">{{ pulse.overall_pct }}%</span>
</div>
</div>
</div>
<div class="nexus-grid">
<div class="nexus-grid-v2">
<!-- ── LEFT: Conversation ────────────────────────────────── -->
<div class="nexus-chat-col">
<div class="card mc-panel nexus-chat-panel">
<div class="card-header mc-panel-header d-flex justify-content-between align-items-center">
<span>// CONVERSATION</span>
<button class="mc-btn mc-btn-sm"
hx-delete="/nexus/history"
hx-target="#nexus-chat-log"
hx-swap="beforeend"
hx-confirm="Clear nexus conversation?">
CLEAR
</button>
<div class="d-flex align-items-center gap-2">
<span class="nexus-msg-count" id="nexus-msg-count"
title="Messages in this session">{{ messages|length }} msgs</span>
<button class="mc-btn mc-btn-sm"
hx-delete="/nexus/history"
hx-target="#nexus-chat-log"
hx-swap="beforeend"
hx-confirm="Clear nexus conversation?">
CLEAR
</button>
</div>
</div>
<div class="card-body p-2" id="nexus-chat-log">
@@ -67,14 +81,115 @@
</div>
</div>
<!-- ── RIGHT: Memory sidebar ─────────────────────────────── -->
<!-- ── RIGHT: Awareness sidebar ──────────────────────────── -->
<div class="nexus-sidebar-col">
<!-- Live memory context (updated with each response) -->
<!-- Cognitive State Panel -->
<div class="card mc-panel nexus-cognitive-panel mb-3">
<div class="card-header mc-panel-header">
<span>// COGNITIVE STATE</span>
<span class="nexus-engagement-badge" id="cog-engagement">
{{ introspection.cognitive.engagement | upper }}
</span>
</div>
<div class="card-body p-2">
<div class="nexus-cog-grid">
<div class="nexus-cog-item">
<div class="nexus-cog-label">MOOD</div>
<div class="nexus-cog-value" id="cog-mood">{{ introspection.cognitive.mood }}</div>
</div>
<div class="nexus-cog-item">
<div class="nexus-cog-label">FOCUS</div>
<div class="nexus-cog-value nexus-cog-focus" id="cog-focus">
{{ introspection.cognitive.focus_topic or '—' }}
</div>
</div>
<div class="nexus-cog-item">
<div class="nexus-cog-label">DEPTH</div>
<div class="nexus-cog-value" id="cog-depth">{{ introspection.cognitive.conversation_depth }}</div>
</div>
<div class="nexus-cog-item">
<div class="nexus-cog-label">INITIATIVE</div>
<div class="nexus-cog-value nexus-cog-focus" id="cog-initiative">
{{ introspection.cognitive.last_initiative or '—' }}
</div>
</div>
</div>
{% if introspection.cognitive.active_commitments %}
<div class="nexus-commitments mt-2">
<div class="nexus-cog-label">ACTIVE COMMITMENTS</div>
{% for c in introspection.cognitive.active_commitments %}
<div class="nexus-commitment-item">{{ c | e }}</div>
{% endfor %}
</div>
{% endif %}
</div>
</div>
<!-- Recent Thoughts Panel -->
<div class="card mc-panel nexus-thoughts-panel mb-3">
<div class="card-header mc-panel-header">
<span>// THOUGHT STREAM</span>
</div>
<div class="card-body p-2" id="nexus-thoughts-body">
{% if introspection.recent_thoughts %}
{% for t in introspection.recent_thoughts %}
<div class="nexus-thought-item">
<div class="nexus-thought-meta">
<span class="nexus-thought-seed">{{ t.seed_type }}</span>
<span class="nexus-thought-time">{{ t.created_at[:16] }}</span>
</div>
<div class="nexus-thought-content">{{ t.content | e }}</div>
</div>
{% endfor %}
{% else %}
<div class="nexus-empty-state">No thoughts yet. The thinking engine will populate this.</div>
{% endif %}
</div>
</div>
<!-- Sovereignty Pulse Detail -->
<div class="card mc-panel nexus-sovereignty-panel mb-3">
<div class="card-header mc-panel-header">
<span>// SOVEREIGNTY PULSE</span>
<span class="nexus-health-badge nexus-health-{{ pulse.health }}" id="pulse-health">
{{ pulse.health | upper }}
</span>
</div>
<div class="card-body p-2">
<div class="nexus-pulse-meters" id="nexus-pulse-meters">
{% for layer in pulse.layers %}
<div class="nexus-pulse-layer">
<div class="nexus-pulse-layer-label">{{ layer.name | upper }}</div>
<div class="nexus-pulse-bar-track">
<div class="nexus-pulse-bar-fill" style="width: {{ layer.sovereign_pct }}%"></div>
</div>
<div class="nexus-pulse-layer-pct">{{ layer.sovereign_pct }}%</div>
</div>
{% endfor %}
</div>
<div class="nexus-pulse-stats mt-2">
<div class="nexus-pulse-stat">
<span class="nexus-pulse-stat-label">Crystallizations</span>
<span class="nexus-pulse-stat-value" id="pulse-cryst">{{ pulse.crystallizations_last_hour }}</span>
</div>
<div class="nexus-pulse-stat">
<span class="nexus-pulse-stat-label">API Independence</span>
<span class="nexus-pulse-stat-value" id="pulse-api-indep">{{ pulse.api_independence_pct }}%</span>
</div>
<div class="nexus-pulse-stat">
<span class="nexus-pulse-stat-label">Total Events</span>
<span class="nexus-pulse-stat-value" id="pulse-events">{{ pulse.total_events }}</span>
</div>
</div>
</div>
</div>
<!-- Live Memory Context -->
<div class="card mc-panel nexus-memory-panel mb-3">
<div class="card-header mc-panel-header">
<span>// LIVE MEMORY</span>
<span class="badge ms-2" style="background:var(--purple-dim); color:var(--purple);">
<span class="badge ms-2" style="background:var(--purple-dim, rgba(168,85,247,0.15)); color:var(--purple);">
{{ stats.total_entries }} stored
</span>
</div>
@@ -85,7 +200,32 @@
</div>
</div>
<!-- Teaching panel -->
<!-- Session Analytics -->
<div class="card mc-panel nexus-analytics-panel mb-3">
<div class="card-header mc-panel-header">// SESSION ANALYTICS</div>
<div class="card-body p-2">
<div class="nexus-analytics-grid" id="nexus-analytics">
<div class="nexus-analytics-item">
<span class="nexus-analytics-label">Messages</span>
<span class="nexus-analytics-value" id="analytics-msgs">{{ introspection.analytics.total_messages }}</span>
</div>
<div class="nexus-analytics-item">
<span class="nexus-analytics-label">Avg Response</span>
<span class="nexus-analytics-value" id="analytics-avg">{{ introspection.analytics.avg_response_length }} chars</span>
</div>
<div class="nexus-analytics-item">
<span class="nexus-analytics-label">Memory Hits</span>
<span class="nexus-analytics-value" id="analytics-mem">{{ introspection.analytics.memory_hits_total }}</span>
</div>
<div class="nexus-analytics-item">
<span class="nexus-analytics-label">Duration</span>
<span class="nexus-analytics-value" id="analytics-dur">{{ introspection.analytics.session_duration_minutes }} min</span>
</div>
</div>
</div>
</div>
<!-- Teaching Panel -->
<div class="card mc-panel nexus-teach-panel">
<div class="card-header mc-panel-header">// TEACH TIMMY</div>
<div class="card-body p-2">
@@ -119,4 +259,128 @@
</div><!-- /nexus-grid -->
</div>
<!-- WebSocket for live Nexus updates -->
<script>
(function() {
var wsProto = location.protocol === 'https:' ? 'wss:' : 'ws:';
var wsUrl = wsProto + '//' + location.host + '/nexus/ws';
var ws = null;
var reconnectDelay = 2000;
function connect() {
ws = new WebSocket(wsUrl);
ws.onmessage = function(e) {
try {
var data = JSON.parse(e.data);
if (data.type === 'nexus_state') {
updateCognitive(data.introspection.cognitive);
updateThoughts(data.introspection.recent_thoughts);
updateAnalytics(data.introspection.analytics);
updatePulse(data.sovereignty_pulse);
}
} catch(err) { /* ignore parse errors */ }
};
ws.onclose = function() {
setTimeout(connect, reconnectDelay);
};
ws.onerror = function() { ws.close(); };
}
function updateCognitive(c) {
var el;
el = document.getElementById('cog-mood');
if (el) el.textContent = c.mood;
el = document.getElementById('cog-engagement');
if (el) el.textContent = c.engagement.toUpperCase();
el = document.getElementById('cog-focus');
if (el) el.textContent = c.focus_topic || '\u2014';
el = document.getElementById('cog-depth');
if (el) el.textContent = c.conversation_depth;
el = document.getElementById('cog-initiative');
if (el) el.textContent = c.last_initiative || '\u2014';
}
function updateThoughts(thoughts) {
var container = document.getElementById('nexus-thoughts-body');
if (!container || !thoughts || thoughts.length === 0) return;
var html = '';
for (var i = 0; i < thoughts.length; i++) {
var t = thoughts[i];
html += '<div class="nexus-thought-item">'
+ '<div class="nexus-thought-meta">'
+ '<span class="nexus-thought-seed">' + escHtml(t.seed_type) + '</span>'
+ '<span class="nexus-thought-time">' + escHtml((t.created_at || '').substring(0,16)) + '</span>'
+ '</div>'
+ '<div class="nexus-thought-content">' + escHtml(t.content) + '</div>'
+ '</div>';
}
container.innerHTML = html;
}
function updateAnalytics(a) {
var el;
el = document.getElementById('analytics-msgs');
if (el) el.textContent = a.total_messages;
el = document.getElementById('analytics-avg');
if (el) el.textContent = a.avg_response_length + ' chars';
el = document.getElementById('analytics-mem');
if (el) el.textContent = a.memory_hits_total;
el = document.getElementById('analytics-dur');
if (el) el.textContent = a.session_duration_minutes + ' min';
}
function updatePulse(p) {
var el;
el = document.getElementById('pulse-overall');
if (el) el.textContent = p.overall_pct + '%';
el = document.getElementById('pulse-health');
if (el) {
el.textContent = p.health.toUpperCase();
el.className = 'nexus-health-badge nexus-health-' + p.health;
}
el = document.getElementById('pulse-cryst');
if (el) el.textContent = p.crystallizations_last_hour;
el = document.getElementById('pulse-api-indep');
if (el) el.textContent = p.api_independence_pct + '%';
el = document.getElementById('pulse-events');
if (el) el.textContent = p.total_events;
// Update pulse badge dot
var badge = document.getElementById('nexus-pulse-badge');
if (badge) {
var dot = badge.querySelector('.nexus-pulse-dot');
if (dot) {
dot.className = 'nexus-pulse-dot nexus-pulse-' + p.health;
}
}
// Update layer bars
var meters = document.getElementById('nexus-pulse-meters');
if (meters && p.layers) {
var html = '';
for (var i = 0; i < p.layers.length; i++) {
var l = p.layers[i];
html += '<div class="nexus-pulse-layer">'
+ '<div class="nexus-pulse-layer-label">' + escHtml(l.name.toUpperCase()) + '</div>'
+ '<div class="nexus-pulse-bar-track">'
+ '<div class="nexus-pulse-bar-fill" style="width:' + l.sovereign_pct + '%"></div>'
+ '</div>'
+ '<div class="nexus-pulse-layer-pct">' + l.sovereign_pct + '%</div>'
+ '</div>';
}
meters.innerHTML = html;
}
}
function escHtml(s) {
if (!s) return '';
var d = document.createElement('div');
d.textContent = s;
return d.innerHTML;
}
connect();
})();
</script>
{% endblock %}

View File

@@ -9,12 +9,7 @@ models for image inputs and falls back through capability chains.
"""
import asyncio
import base64
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any
@@ -33,148 +28,25 @@ try:
except ImportError:
requests = None # type: ignore
# Re-export data models so existing ``from …cascade import X`` keeps working.
from .models import ( # noqa: F401 re-exports
CircuitState,
ContentType,
ModelCapability,
Provider,
ProviderMetrics,
ProviderStatus,
RouterConfig,
)
# Mixins
from .health import HealthMixin
from .providers import ProviderCallsMixin
logger = logging.getLogger(__name__)
# Quota monitor — optional, degrades gracefully if unavailable
try:
from infrastructure.claude_quota import QuotaMonitor, get_quota_monitor
_quota_monitor: "QuotaMonitor | None" = get_quota_monitor()
except Exception as _exc: # pragma: no cover
logger.debug("Quota monitor not available: %s", _exc)
_quota_monitor = None
class ProviderStatus(Enum):
"""Health status of a provider."""
HEALTHY = "healthy"
DEGRADED = "degraded" # Working but slow or occasional errors
UNHEALTHY = "unhealthy" # Circuit breaker open
DISABLED = "disabled"
class CircuitState(Enum):
"""Circuit breaker state."""
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, rejecting requests
HALF_OPEN = "half_open" # Testing if recovered
class ContentType(Enum):
"""Type of content in the request."""
TEXT = "text"
VISION = "vision" # Contains images
AUDIO = "audio" # Contains audio
MULTIMODAL = "multimodal" # Multiple content types
@dataclass
class ProviderMetrics:
"""Metrics for a single provider."""
total_requests: int = 0
successful_requests: int = 0
failed_requests: int = 0
total_latency_ms: float = 0.0
last_request_time: str | None = None
last_error_time: str | None = None
consecutive_failures: int = 0
@property
def avg_latency_ms(self) -> float:
if self.total_requests == 0:
return 0.0
return self.total_latency_ms / self.total_requests
@property
def error_rate(self) -> float:
if self.total_requests == 0:
return 0.0
return self.failed_requests / self.total_requests
@dataclass
class ModelCapability:
"""Capabilities a model supports."""
name: str
supports_vision: bool = False
supports_audio: bool = False
supports_tools: bool = False
supports_json: bool = False
supports_streaming: bool = True
context_window: int = 4096
@dataclass
class Provider:
"""LLM provider configuration and state."""
name: str
type: str # ollama, openai, anthropic
enabled: bool
priority: int
tier: str | None = None # e.g., "local", "standard_cloud", "frontier"
url: str | None = None
api_key: str | None = None
base_url: str | None = None
models: list[dict] = field(default_factory=list)
# Runtime state
status: ProviderStatus = ProviderStatus.HEALTHY
metrics: ProviderMetrics = field(default_factory=ProviderMetrics)
circuit_state: CircuitState = CircuitState.CLOSED
circuit_opened_at: float | None = None
half_open_calls: int = 0
def get_default_model(self) -> str | None:
"""Get the default model for this provider."""
for model in self.models:
if model.get("default"):
return model["name"]
if self.models:
return self.models[0]["name"]
return None
def get_model_with_capability(self, capability: str) -> str | None:
"""Get a model that supports the given capability."""
for model in self.models:
capabilities = model.get("capabilities", [])
if capability in capabilities:
return model["name"]
# Fall back to default
return self.get_default_model()
def model_has_capability(self, model_name: str, capability: str) -> bool:
"""Check if a specific model has a capability."""
for model in self.models:
if model["name"] == model_name:
capabilities = model.get("capabilities", [])
return capability in capabilities
return False
@dataclass
class RouterConfig:
"""Cascade router configuration."""
timeout_seconds: int = 30
max_retries_per_provider: int = 2
retry_delay_seconds: int = 1
circuit_breaker_failure_threshold: int = 5
circuit_breaker_recovery_timeout: int = 60
circuit_breaker_half_open_max_calls: int = 2
cost_tracking_enabled: bool = True
budget_daily_usd: float = 10.0
# Multi-modal settings
auto_pull_models: bool = True
fallback_chains: dict = field(default_factory=dict)
class CascadeRouter:
class CascadeRouter(HealthMixin, ProviderCallsMixin):
"""Routes LLM requests with automatic failover.
Now with multi-modal support:
@@ -487,50 +359,6 @@ class CascadeRouter:
raise RuntimeError("; ".join(errors))
def _quota_allows_cloud(self, provider: Provider) -> bool:
"""Check quota before routing to a cloud provider.
Uses the metabolic protocol via select_model(): cloud calls are only
allowed when the quota monitor recommends a cloud model (BURST tier).
Returns True (allow cloud) if quota monitor is unavailable or returns None.
"""
if _quota_monitor is None:
return True
try:
suggested = _quota_monitor.select_model("high")
# Cloud is allowed only when select_model recommends the cloud model
allows = suggested == "claude-sonnet-4-6"
if not allows:
status = _quota_monitor.check()
tier = status.recommended_tier.value if status else "unknown"
logger.info(
"Metabolic protocol: %s tier — downshifting %s to local (%s)",
tier,
provider.name,
suggested,
)
return allows
except Exception as exc:
logger.warning("Quota check failed, allowing cloud: %s", exc)
return True
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
return False
if provider.status == ProviderStatus.UNHEALTHY:
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
return False
return True
def _filter_providers(self, cascade_tier: str | None) -> list["Provider"]:
"""Return the provider list filtered by tier.
@@ -641,9 +469,9 @@ class CascadeRouter:
- Supports image URLs, paths, and base64 encoding
Complexity-based routing (issue #1065):
- ``complexity_hint="simple"`` routes to Qwen3-8B (low-latency)
- ``complexity_hint="complex"`` routes to Qwen3-14B (quality)
- ``complexity_hint=None`` (default) auto-classifies from messages
- ``complexity_hint="simple"`` -> routes to Qwen3-8B (low-latency)
- ``complexity_hint="complex"`` -> routes to Qwen3-14B (quality)
- ``complexity_hint=None`` (default) -> auto-classifies from messages
Args:
messages: List of message dicts with role and content
@@ -668,7 +496,7 @@ class CascadeRouter:
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
# Resolve task complexity ─────────────────────────────────────────────
# Resolve task complexity
# Skip complexity routing when caller explicitly specifies a model.
complexity: TaskComplexity | None = None
if model is None:
@@ -698,7 +526,7 @@ class CascadeRouter:
)
continue
# Complexity-based model selection (only when no explicit model) ──
# Complexity-based model selection (only when no explicit model)
effective_model = model
if effective_model is None and complexity is not None:
effective_model = self._get_model_for_complexity(provider, complexity)
@@ -740,357 +568,6 @@ class CascadeRouter:
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
async def _try_provider(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
content_type: ContentType = ContentType.TEXT,
) -> dict:
"""Try a single provider request."""
start_time = time.time()
if provider.type == "ollama":
result = await self._call_ollama(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
elif provider.type == "openai":
result = await self._call_openai(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
elif provider.type == "anthropic":
result = await self._call_anthropic(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
elif provider.type == "grok":
result = await self._call_grok(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
elif provider.type == "vllm_mlx":
result = await self._call_vllm_mlx(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
else:
raise ValueError(f"Unknown provider type: {provider.type}")
latency_ms = (time.time() - start_time) * 1000
result["latency_ms"] = latency_ms
return result
async def _call_ollama(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None = None,
content_type: ContentType = ContentType.TEXT,
) -> dict:
"""Call Ollama API with multi-modal support."""
import aiohttp
url = f"{provider.url or settings.ollama_url}/api/chat"
# Transform messages for Ollama format (including images)
transformed_messages = self._transform_messages_for_ollama(messages)
options = {"temperature": temperature}
if max_tokens:
options["num_predict"] = max_tokens
payload = {
"model": model,
"messages": transformed_messages,
"stream": False,
"options": options,
}
timeout = aiohttp.ClientTimeout(total=self.config.timeout_seconds)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(url, json=payload) as response:
if response.status != 200:
text = await response.text()
raise RuntimeError(f"Ollama error {response.status}: {text}")
data = await response.json()
return {
"content": data["message"]["content"],
"model": model,
}
def _transform_messages_for_ollama(self, messages: list[dict]) -> list[dict]:
"""Transform messages to Ollama format, handling images."""
transformed = []
for msg in messages:
new_msg = {
"role": msg.get("role", "user"),
"content": msg.get("content", ""),
}
# Handle images
images = msg.get("images", [])
if images:
new_msg["images"] = []
for img in images:
if isinstance(img, str):
if img.startswith("data:image/"):
# Base64 encoded image
new_msg["images"].append(img.split(",")[1])
elif img.startswith("http://") or img.startswith("https://"):
# URL - would need to download, skip for now
logger.warning("Image URLs not yet supported, skipping: %s", img)
elif Path(img).exists():
# Local file path - read and encode
try:
with open(img, "rb") as f:
img_data = base64.b64encode(f.read()).decode()
new_msg["images"].append(img_data)
except Exception as exc:
logger.error("Failed to read image %s: %s", img, exc)
transformed.append(new_msg)
return transformed
async def _call_openai(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call OpenAI API."""
import openai
client = openai.AsyncOpenAI(
api_key=provider.api_key,
base_url=provider.base_url,
timeout=self.config.timeout_seconds,
)
kwargs = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
response = await client.chat.completions.create(**kwargs)
return {
"content": response.choices[0].message.content,
"model": response.model,
}
async def _call_anthropic(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call Anthropic API."""
import anthropic
client = anthropic.AsyncAnthropic(
api_key=provider.api_key,
timeout=self.config.timeout_seconds,
)
# Convert messages to Anthropic format
system_msg = None
conversation = []
for msg in messages:
if msg["role"] == "system":
system_msg = msg["content"]
else:
conversation.append(
{
"role": msg["role"],
"content": msg["content"],
}
)
kwargs = {
"model": model,
"messages": conversation,
"temperature": temperature,
"max_tokens": max_tokens or 1024,
}
if system_msg:
kwargs["system"] = system_msg
response = await client.messages.create(**kwargs)
return {
"content": response.content[0].text,
"model": response.model,
}
async def _call_grok(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call xAI Grok API via OpenAI-compatible SDK."""
import httpx
import openai
client = openai.AsyncOpenAI(
api_key=provider.api_key,
base_url=provider.base_url or settings.xai_base_url,
timeout=httpx.Timeout(300.0),
)
kwargs = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
response = await client.chat.completions.create(**kwargs)
return {
"content": response.choices[0].message.content,
"model": response.model,
}
async def _call_vllm_mlx(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call vllm-mlx via its OpenAI-compatible API.
vllm-mlx exposes the same /v1/chat/completions endpoint as OpenAI,
so we reuse the OpenAI client pointed at the local server.
No API key is required for local deployments.
"""
import openai
base_url = provider.base_url or provider.url or "http://localhost:8000"
# Ensure the base_url ends with /v1 as expected by the OpenAI client
if not base_url.rstrip("/").endswith("/v1"):
base_url = base_url.rstrip("/") + "/v1"
client = openai.AsyncOpenAI(
api_key=provider.api_key or "no-key-required",
base_url=base_url,
timeout=self.config.timeout_seconds,
)
kwargs: dict = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
response = await client.chat.completions.create(**kwargs)
return {
"content": response.choices[0].message.content,
"model": response.model,
}
def _record_success(self, provider: Provider, latency_ms: float) -> None:
"""Record a successful request."""
provider.metrics.total_requests += 1
provider.metrics.successful_requests += 1
provider.metrics.total_latency_ms += latency_ms
provider.metrics.last_request_time = datetime.now(UTC).isoformat()
provider.metrics.consecutive_failures = 0
# Close circuit breaker if half-open
if provider.circuit_state == CircuitState.HALF_OPEN:
provider.half_open_calls += 1
if provider.half_open_calls >= self.config.circuit_breaker_half_open_max_calls:
self._close_circuit(provider)
# Update status based on error rate
if provider.metrics.error_rate < 0.1:
provider.status = ProviderStatus.HEALTHY
elif provider.metrics.error_rate < 0.3:
provider.status = ProviderStatus.DEGRADED
def _record_failure(self, provider: Provider) -> None:
"""Record a failed request."""
provider.metrics.total_requests += 1
provider.metrics.failed_requests += 1
provider.metrics.last_error_time = datetime.now(UTC).isoformat()
provider.metrics.consecutive_failures += 1
# Check if we should open circuit breaker
if provider.metrics.consecutive_failures >= self.config.circuit_breaker_failure_threshold:
self._open_circuit(provider)
# Update status
if provider.metrics.error_rate > 0.3:
provider.status = ProviderStatus.DEGRADED
if provider.metrics.error_rate > 0.5:
provider.status = ProviderStatus.UNHEALTHY
def _open_circuit(self, provider: Provider) -> None:
"""Open the circuit breaker for a provider."""
provider.circuit_state = CircuitState.OPEN
provider.circuit_opened_at = time.time()
provider.status = ProviderStatus.UNHEALTHY
logger.warning("Circuit breaker OPEN for %s", provider.name)
def _can_close_circuit(self, provider: Provider) -> bool:
"""Check if circuit breaker can transition to half-open."""
if provider.circuit_opened_at is None:
return False
elapsed = time.time() - provider.circuit_opened_at
return elapsed >= self.config.circuit_breaker_recovery_timeout
def _close_circuit(self, provider: Provider) -> None:
"""Close the circuit breaker (provider healthy again)."""
provider.circuit_state = CircuitState.CLOSED
provider.circuit_opened_at = None
provider.half_open_calls = 0
provider.metrics.consecutive_failures = 0
provider.status = ProviderStatus.HEALTHY
logger.info("Circuit breaker CLOSED for %s", provider.name)
def reload_config(self) -> dict:
"""Hot-reload providers.yaml, preserving runtime state.

View File

@@ -0,0 +1,137 @@
"""Health monitoring and circuit breaker mixin for the Cascade Router.
Provides failure tracking, circuit breaker state transitions,
and quota-based cloud provider gating.
"""
from __future__ import annotations
import logging
import time
from datetime import UTC, datetime
from .models import CircuitState, Provider, ProviderMetrics, ProviderStatus
logger = logging.getLogger(__name__)
# Quota monitor — optional, degrades gracefully if unavailable
try:
from infrastructure.claude_quota import QuotaMonitor, get_quota_monitor
_quota_monitor: "QuotaMonitor | None" = get_quota_monitor()
except Exception as _exc: # pragma: no cover
logger.debug("Quota monitor not available: %s", _exc)
_quota_monitor = None
class HealthMixin:
"""Mixin providing health tracking, circuit breaker, and quota checks.
Expects the consuming class to have:
- self.config: RouterConfig
- self.providers: list[Provider]
"""
def _record_success(self, provider: Provider, latency_ms: float) -> None:
"""Record a successful request."""
provider.metrics.total_requests += 1
provider.metrics.successful_requests += 1
provider.metrics.total_latency_ms += latency_ms
provider.metrics.last_request_time = datetime.now(UTC).isoformat()
provider.metrics.consecutive_failures = 0
# Close circuit breaker if half-open
if provider.circuit_state == CircuitState.HALF_OPEN:
provider.half_open_calls += 1
if provider.half_open_calls >= self.config.circuit_breaker_half_open_max_calls:
self._close_circuit(provider)
# Update status based on error rate
if provider.metrics.error_rate < 0.1:
provider.status = ProviderStatus.HEALTHY
elif provider.metrics.error_rate < 0.3:
provider.status = ProviderStatus.DEGRADED
def _record_failure(self, provider: Provider) -> None:
"""Record a failed request."""
provider.metrics.total_requests += 1
provider.metrics.failed_requests += 1
provider.metrics.last_error_time = datetime.now(UTC).isoformat()
provider.metrics.consecutive_failures += 1
# Check if we should open circuit breaker
if provider.metrics.consecutive_failures >= self.config.circuit_breaker_failure_threshold:
self._open_circuit(provider)
# Update status
if provider.metrics.error_rate > 0.3:
provider.status = ProviderStatus.DEGRADED
if provider.metrics.error_rate > 0.5:
provider.status = ProviderStatus.UNHEALTHY
def _open_circuit(self, provider: Provider) -> None:
"""Open the circuit breaker for a provider."""
provider.circuit_state = CircuitState.OPEN
provider.circuit_opened_at = time.time()
provider.status = ProviderStatus.UNHEALTHY
logger.warning("Circuit breaker OPEN for %s", provider.name)
def _can_close_circuit(self, provider: Provider) -> bool:
"""Check if circuit breaker can transition to half-open."""
if provider.circuit_opened_at is None:
return False
elapsed = time.time() - provider.circuit_opened_at
return elapsed >= self.config.circuit_breaker_recovery_timeout
def _close_circuit(self, provider: Provider) -> None:
"""Close the circuit breaker (provider healthy again)."""
provider.circuit_state = CircuitState.CLOSED
provider.circuit_opened_at = None
provider.half_open_calls = 0
provider.metrics.consecutive_failures = 0
provider.status = ProviderStatus.HEALTHY
logger.info("Circuit breaker CLOSED for %s", provider.name)
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
return False
if provider.status == ProviderStatus.UNHEALTHY:
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
return False
return True
def _quota_allows_cloud(self, provider: Provider) -> bool:
"""Check quota before routing to a cloud provider.
Uses the metabolic protocol via select_model(): cloud calls are only
allowed when the quota monitor recommends a cloud model (BURST tier).
Returns True (allow cloud) if quota monitor is unavailable or returns None.
"""
if _quota_monitor is None:
return True
try:
suggested = _quota_monitor.select_model("high")
# Cloud is allowed only when select_model recommends the cloud model
allows = suggested == "claude-sonnet-4-6"
if not allows:
status = _quota_monitor.check()
tier = status.recommended_tier.value if status else "unknown"
logger.info(
"Metabolic protocol: %s tier — downshifting %s to local (%s)",
tier,
provider.name,
suggested,
)
return allows
except Exception as exc:
logger.warning("Quota check failed, allowing cloud: %s", exc)
return True

View File

@@ -0,0 +1,138 @@
"""Data models for the Cascade LLM Router.
Enums, dataclasses, and configuration objects shared across router modules.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
class ProviderStatus(Enum):
"""Health status of a provider."""
HEALTHY = "healthy"
DEGRADED = "degraded" # Working but slow or occasional errors
UNHEALTHY = "unhealthy" # Circuit breaker open
DISABLED = "disabled"
class CircuitState(Enum):
"""Circuit breaker state."""
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, rejecting requests
HALF_OPEN = "half_open" # Testing if recovered
class ContentType(Enum):
"""Type of content in the request."""
TEXT = "text"
VISION = "vision" # Contains images
AUDIO = "audio" # Contains audio
MULTIMODAL = "multimodal" # Multiple content types
@dataclass
class ProviderMetrics:
"""Metrics for a single provider."""
total_requests: int = 0
successful_requests: int = 0
failed_requests: int = 0
total_latency_ms: float = 0.0
last_request_time: str | None = None
last_error_time: str | None = None
consecutive_failures: int = 0
@property
def avg_latency_ms(self) -> float:
if self.total_requests == 0:
return 0.0
return self.total_latency_ms / self.total_requests
@property
def error_rate(self) -> float:
if self.total_requests == 0:
return 0.0
return self.failed_requests / self.total_requests
@dataclass
class ModelCapability:
"""Capabilities a model supports."""
name: str
supports_vision: bool = False
supports_audio: bool = False
supports_tools: bool = False
supports_json: bool = False
supports_streaming: bool = True
context_window: int = 4096
@dataclass
class Provider:
"""LLM provider configuration and state."""
name: str
type: str # ollama, openai, anthropic
enabled: bool
priority: int
tier: str | None = None # e.g., "local", "standard_cloud", "frontier"
url: str | None = None
api_key: str | None = None
base_url: str | None = None
models: list[dict] = field(default_factory=list)
# Runtime state
status: ProviderStatus = ProviderStatus.HEALTHY
metrics: ProviderMetrics = field(default_factory=ProviderMetrics)
circuit_state: CircuitState = CircuitState.CLOSED
circuit_opened_at: float | None = None
half_open_calls: int = 0
def get_default_model(self) -> str | None:
"""Get the default model for this provider."""
for model in self.models:
if model.get("default"):
return model["name"]
if self.models:
return self.models[0]["name"]
return None
def get_model_with_capability(self, capability: str) -> str | None:
"""Get a model that supports the given capability."""
for model in self.models:
capabilities = model.get("capabilities", [])
if capability in capabilities:
return model["name"]
# Fall back to default
return self.get_default_model()
def model_has_capability(self, model_name: str, capability: str) -> bool:
"""Check if a specific model has a capability."""
for model in self.models:
if model["name"] == model_name:
capabilities = model.get("capabilities", [])
return capability in capabilities
return False
@dataclass
class RouterConfig:
"""Cascade router configuration."""
timeout_seconds: int = 30
max_retries_per_provider: int = 2
retry_delay_seconds: int = 1
circuit_breaker_failure_threshold: int = 5
circuit_breaker_recovery_timeout: int = 60
circuit_breaker_half_open_max_calls: int = 2
cost_tracking_enabled: bool = True
budget_daily_usd: float = 10.0
# Multi-modal settings
auto_pull_models: bool = True
fallback_chains: dict = field(default_factory=dict)

View File

@@ -0,0 +1,318 @@
"""Provider API call mixin for the Cascade Router.
Contains methods for calling individual LLM provider APIs
(Ollama, OpenAI, Anthropic, Grok, vllm-mlx).
"""
from __future__ import annotations
import base64
import logging
import time
from pathlib import Path
from typing import Any
from config import settings
from .models import ContentType, Provider
logger = logging.getLogger(__name__)
class ProviderCallsMixin:
"""Mixin providing LLM provider API call methods.
Expects the consuming class to have:
- self.config: RouterConfig
"""
async def _try_provider(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
content_type: ContentType = ContentType.TEXT,
) -> dict:
"""Try a single provider request."""
start_time = time.time()
if provider.type == "ollama":
result = await self._call_ollama(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
elif provider.type == "openai":
result = await self._call_openai(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
elif provider.type == "anthropic":
result = await self._call_anthropic(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
elif provider.type == "grok":
result = await self._call_grok(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
elif provider.type == "vllm_mlx":
result = await self._call_vllm_mlx(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
else:
raise ValueError(f"Unknown provider type: {provider.type}")
latency_ms = (time.time() - start_time) * 1000
result["latency_ms"] = latency_ms
return result
async def _call_ollama(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None = None,
content_type: ContentType = ContentType.TEXT,
) -> dict:
"""Call Ollama API with multi-modal support."""
import aiohttp
url = f"{provider.url or settings.ollama_url}/api/chat"
# Transform messages for Ollama format (including images)
transformed_messages = self._transform_messages_for_ollama(messages)
options: dict[str, Any] = {"temperature": temperature}
if max_tokens:
options["num_predict"] = max_tokens
payload = {
"model": model,
"messages": transformed_messages,
"stream": False,
"options": options,
}
timeout = aiohttp.ClientTimeout(total=self.config.timeout_seconds)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(url, json=payload) as response:
if response.status != 200:
text = await response.text()
raise RuntimeError(f"Ollama error {response.status}: {text}")
data = await response.json()
return {
"content": data["message"]["content"],
"model": model,
}
def _transform_messages_for_ollama(self, messages: list[dict]) -> list[dict]:
"""Transform messages to Ollama format, handling images."""
transformed = []
for msg in messages:
new_msg: dict[str, Any] = {
"role": msg.get("role", "user"),
"content": msg.get("content", ""),
}
# Handle images
images = msg.get("images", [])
if images:
new_msg["images"] = []
for img in images:
if isinstance(img, str):
if img.startswith("data:image/"):
# Base64 encoded image
new_msg["images"].append(img.split(",")[1])
elif img.startswith("http://") or img.startswith("https://"):
# URL - would need to download, skip for now
logger.warning("Image URLs not yet supported, skipping: %s", img)
elif Path(img).exists():
# Local file path - read and encode
try:
with open(img, "rb") as f:
img_data = base64.b64encode(f.read()).decode()
new_msg["images"].append(img_data)
except Exception as exc:
logger.error("Failed to read image %s: %s", img, exc)
transformed.append(new_msg)
return transformed
async def _call_openai(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call OpenAI API."""
import openai
client = openai.AsyncOpenAI(
api_key=provider.api_key,
base_url=provider.base_url,
timeout=self.config.timeout_seconds,
)
kwargs: dict[str, Any] = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
response = await client.chat.completions.create(**kwargs)
return {
"content": response.choices[0].message.content,
"model": response.model,
}
async def _call_anthropic(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call Anthropic API."""
import anthropic
client = anthropic.AsyncAnthropic(
api_key=provider.api_key,
timeout=self.config.timeout_seconds,
)
# Convert messages to Anthropic format
system_msg = None
conversation = []
for msg in messages:
if msg["role"] == "system":
system_msg = msg["content"]
else:
conversation.append(
{
"role": msg["role"],
"content": msg["content"],
}
)
kwargs: dict[str, Any] = {
"model": model,
"messages": conversation,
"temperature": temperature,
"max_tokens": max_tokens or 1024,
}
if system_msg:
kwargs["system"] = system_msg
response = await client.messages.create(**kwargs)
return {
"content": response.content[0].text,
"model": response.model,
}
async def _call_grok(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call xAI Grok API via OpenAI-compatible SDK."""
import httpx
import openai
client = openai.AsyncOpenAI(
api_key=provider.api_key,
base_url=provider.base_url or settings.xai_base_url,
timeout=httpx.Timeout(300.0),
)
kwargs: dict[str, Any] = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
response = await client.chat.completions.create(**kwargs)
return {
"content": response.choices[0].message.content,
"model": response.model,
}
async def _call_vllm_mlx(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call vllm-mlx via its OpenAI-compatible API.
vllm-mlx exposes the same /v1/chat/completions endpoint as OpenAI,
so we reuse the OpenAI client pointed at the local server.
No API key is required for local deployments.
"""
import openai
base_url = provider.base_url or provider.url or "http://localhost:8000"
# Ensure the base_url ends with /v1 as expected by the OpenAI client
if not base_url.rstrip("/").endswith("/v1"):
base_url = base_url.rstrip("/") + "/v1"
client = openai.AsyncOpenAI(
api_key=provider.api_key or "no-key-required",
base_url=base_url,
timeout=self.config.timeout_seconds,
)
kwargs: dict[str, Any] = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
response = await client.chat.completions.create(**kwargs)
return {
"content": response.choices[0].message.content,
"model": response.model,
}

View File

@@ -89,7 +89,12 @@ class HotMemory:
"""Read hot memory — computed view of top facts + last reflection from DB."""
try:
facts = recall_personal_facts()
lines = ["# Timmy Hot Memory\n"]
now = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
lines = [
"# Timmy Hot Memory\n",
"> Working RAM — always loaded, ~300 lines max, pruned monthly",
f"> Last updated: {now}\n",
]
if facts:
lines.append("## Known Facts\n")

View File

@@ -0,0 +1,15 @@
"""Nexus subsystem — Timmy's sovereign conversational awareness space.
Extends the Nexus v1 chat interface with:
- **Introspection engine** — real-time cognitive state, thought-stream
integration, and session analytics surfaced directly in the Nexus.
- **Persistent sessions** — SQLite-backed conversation history that
survives process restarts.
- **Sovereignty pulse** — a live dashboard-within-dashboard showing
Timmy's sovereignty health, crystallization rate, and API independence.
"""
from timmy.nexus.introspection import NexusIntrospector # noqa: F401
from timmy.nexus.persistence import NexusStore # noqa: F401
from timmy.nexus.sovereignty_pulse import SovereigntyPulse # noqa: F401

View File

@@ -0,0 +1,236 @@
"""Nexus Introspection Engine — cognitive self-awareness for Timmy.
Aggregates live signals from the CognitiveTracker, ThinkingEngine, and
MemorySystem into a unified introspection snapshot. The Nexus template
renders this as an always-visible cognitive state panel so the operator
can observe Timmy's inner life in real time.
Design principles:
- Read-only observer — never mutates cognitive state.
- Graceful degradation — if any upstream is unavailable, the snapshot
still returns with partial data instead of crashing.
- JSON-serializable — every method returns plain dicts ready for
WebSocket push or Jinja2 template rendering.
Refs: #1090 (Nexus Epic), architecture-v2.md §Intelligence Surface
"""
from __future__ import annotations
import logging
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
logger = logging.getLogger(__name__)
# ── Data models ──────────────────────────────────────────────────────────────
@dataclass
class CognitiveSummary:
"""Distilled view of Timmy's current cognitive state."""
mood: str = "settled"
engagement: str = "idle"
focus_topic: str | None = None
conversation_depth: int = 0
active_commitments: list[str] = field(default_factory=list)
last_initiative: str | None = None
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class ThoughtSummary:
"""Compact representation of a single thought for the Nexus viewer."""
id: str
content: str
seed_type: str
created_at: str
parent_id: str | None = None
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class SessionAnalytics:
"""Conversation-level analytics for the active Nexus session."""
total_messages: int = 0
user_messages: int = 0
assistant_messages: int = 0
avg_response_length: float = 0.0
topics_discussed: list[str] = field(default_factory=list)
session_start: str | None = None
session_duration_minutes: float = 0.0
memory_hits_total: int = 0
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class IntrospectionSnapshot:
"""Everything the Nexus template needs to render the cognitive panel."""
cognitive: CognitiveSummary = field(default_factory=CognitiveSummary)
recent_thoughts: list[ThoughtSummary] = field(default_factory=list)
analytics: SessionAnalytics = field(default_factory=SessionAnalytics)
timestamp: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
def to_dict(self) -> dict:
return {
"cognitive": self.cognitive.to_dict(),
"recent_thoughts": [t.to_dict() for t in self.recent_thoughts],
"analytics": self.analytics.to_dict(),
"timestamp": self.timestamp,
}
# ── Introspector ─────────────────────────────────────────────────────────────
class NexusIntrospector:
"""Aggregates cognitive signals into a single introspection snapshot.
Lazily pulls from:
- ``timmy.cognitive_state.cognitive_tracker``
- ``timmy.thinking.thinking_engine``
- Nexus conversation log (passed in to avoid circular import)
"""
def __init__(self) -> None:
self._session_start: datetime | None = None
self._topics: list[str] = []
self._memory_hit_count: int = 0
# ── Public API ────────────────────────────────────────────────────────
def snapshot(
self,
conversation_log: list[dict] | None = None,
) -> IntrospectionSnapshot:
"""Build a complete introspection snapshot.
Parameters
----------
conversation_log:
The in-memory ``_nexus_log`` from the routes module (list of
dicts with ``role``, ``content``, ``timestamp`` keys).
"""
return IntrospectionSnapshot(
cognitive=self._read_cognitive(),
recent_thoughts=self._read_thoughts(),
analytics=self._compute_analytics(conversation_log or []),
)
def record_memory_hits(self, count: int) -> None:
"""Track cumulative memory hits for session analytics."""
self._memory_hit_count += count
def reset(self) -> None:
"""Reset session-scoped analytics (e.g. on history clear)."""
self._session_start = None
self._topics.clear()
self._memory_hit_count = 0
# ── Cognitive state reader ────────────────────────────────────────────
def _read_cognitive(self) -> CognitiveSummary:
"""Pull current state from the CognitiveTracker singleton."""
try:
from timmy.cognitive_state import cognitive_tracker
state = cognitive_tracker.get_state()
return CognitiveSummary(
mood=state.mood,
engagement=state.engagement,
focus_topic=state.focus_topic,
conversation_depth=state.conversation_depth,
active_commitments=list(state.active_commitments),
last_initiative=state.last_initiative,
)
except Exception as exc:
logger.debug("Introspection: cognitive state unavailable: %s", exc)
return CognitiveSummary()
# ── Thought stream reader ─────────────────────────────────────────────
def _read_thoughts(self, limit: int = 5) -> list[ThoughtSummary]:
"""Pull recent thoughts from the ThinkingEngine."""
try:
from timmy.thinking import thinking_engine
thoughts = thinking_engine.get_recent_thoughts(limit=limit)
return [
ThoughtSummary(
id=t.id,
content=(
t.content[:200] + "" if len(t.content) > 200 else t.content
),
seed_type=t.seed_type,
created_at=t.created_at,
parent_id=t.parent_id,
)
for t in thoughts
]
except Exception as exc:
logger.debug("Introspection: thought stream unavailable: %s", exc)
return []
# ── Session analytics ─────────────────────────────────────────────────
def _compute_analytics(
self, conversation_log: list[dict]
) -> SessionAnalytics:
"""Derive analytics from the Nexus conversation log."""
if not conversation_log:
return SessionAnalytics()
if self._session_start is None:
self._session_start = datetime.now(UTC)
user_msgs = [m for m in conversation_log if m.get("role") == "user"]
asst_msgs = [
m for m in conversation_log if m.get("role") == "assistant"
]
avg_len = 0.0
if asst_msgs:
total_chars = sum(len(m.get("content", "")) for m in asst_msgs)
avg_len = total_chars / len(asst_msgs)
# Extract topics from user messages (simple: first 40 chars)
topics = []
seen: set[str] = set()
for m in user_msgs:
topic = m.get("content", "")[:40].strip()
if topic and topic.lower() not in seen:
topics.append(topic)
seen.add(topic.lower())
# Keep last 8 topics
topics = topics[-8:]
elapsed = (datetime.now(UTC) - self._session_start).total_seconds() / 60
return SessionAnalytics(
total_messages=len(conversation_log),
user_messages=len(user_msgs),
assistant_messages=len(asst_msgs),
avg_response_length=round(avg_len, 1),
topics_discussed=topics,
session_start=self._session_start.strftime("%H:%M:%S"),
session_duration_minutes=round(elapsed, 1),
memory_hits_total=self._memory_hit_count,
)
# ── Module singleton ─────────────────────────────────────────────────────────
nexus_introspector = NexusIntrospector()

View File

@@ -0,0 +1,230 @@
"""Nexus Session Persistence — durable conversation history.
The v1 Nexus kept conversations in a Python ``list`` that vanished on
every process restart. This module provides a SQLite-backed store so
Nexus conversations survive reboots while remaining fully local.
Schema:
nexus_messages(id, role, content, timestamp, session_tag)
Design decisions:
- One table, one DB file (``data/nexus.db``). Cheap, portable, sovereign.
- ``session_tag`` enables future per-operator sessions (#1090 deferred scope).
- Bounded history: ``MAX_MESSAGES`` rows per session tag. Oldest are pruned
automatically on insert.
- Thread-safe via SQLite WAL mode + module-level singleton.
Refs: #1090 (Nexus Epic — session persistence), architecture-v2.md §Data Layer
"""
from __future__ import annotations
import logging
import sqlite3
from contextlib import closing
from datetime import UTC, datetime
from pathlib import Path
from typing import TypedDict
logger = logging.getLogger(__name__)
# ── Defaults ─────────────────────────────────────────────────────────────────
_DEFAULT_DB_DIR = Path("data")
DB_PATH: Path = _DEFAULT_DB_DIR / "nexus.db"
MAX_MESSAGES = 500 # per session tag
DEFAULT_SESSION_TAG = "nexus"
# ── Schema ───────────────────────────────────────────────────────────────────
_SCHEMA = """\
CREATE TABLE IF NOT EXISTS nexus_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
session_tag TEXT NOT NULL DEFAULT 'nexus'
);
CREATE INDEX IF NOT EXISTS idx_nexus_session ON nexus_messages(session_tag);
CREATE INDEX IF NOT EXISTS idx_nexus_ts ON nexus_messages(timestamp);
"""
# ── Typed dict for rows ──────────────────────────────────────────────────────
class NexusMessage(TypedDict):
id: int
role: str
content: str
timestamp: str
session_tag: str
# ── Store ────────────────────────────────────────────────────────────────────
class NexusStore:
"""SQLite-backed persistence for Nexus conversations.
Usage::
store = NexusStore() # uses module-level DB_PATH
store.append("user", "hi")
msgs = store.get_history() # → list[NexusMessage]
store.clear() # wipe session
"""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._conn: sqlite3.Connection | None = None
# ── Connection management ─────────────────────────────────────────────
def _get_conn(self) -> sqlite3.Connection:
if self._conn is None:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(
str(self._db_path),
check_same_thread=False,
)
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.executescript(_SCHEMA)
return self._conn
def close(self) -> None:
"""Close the underlying connection (idempotent)."""
if self._conn is not None:
try:
self._conn.close()
except Exception:
pass
self._conn = None
# ── Write ─────────────────────────────────────────────────────────────
def append(
self,
role: str,
content: str,
*,
timestamp: str | None = None,
session_tag: str = DEFAULT_SESSION_TAG,
) -> int:
"""Insert a message and return its row id.
Automatically prunes oldest messages when the session exceeds
``MAX_MESSAGES``.
"""
ts = timestamp or datetime.now(UTC).strftime("%H:%M:%S")
conn = self._get_conn()
with closing(conn.cursor()) as cur:
cur.execute(
"INSERT INTO nexus_messages (role, content, timestamp, session_tag) "
"VALUES (?, ?, ?, ?)",
(role, content, ts, session_tag),
)
row_id: int = cur.lastrowid # type: ignore[assignment]
conn.commit()
# Prune
self._prune(session_tag)
return row_id
def _prune(self, session_tag: str) -> None:
"""Remove oldest rows that exceed MAX_MESSAGES for *session_tag*."""
conn = self._get_conn()
with closing(conn.cursor()) as cur:
cur.execute(
"SELECT COUNT(*) FROM nexus_messages WHERE session_tag = ?",
(session_tag,),
)
count = cur.fetchone()[0]
if count > MAX_MESSAGES:
excess = count - MAX_MESSAGES
cur.execute(
"DELETE FROM nexus_messages WHERE id IN ("
" SELECT id FROM nexus_messages "
" WHERE session_tag = ? ORDER BY id ASC LIMIT ?"
")",
(session_tag, excess),
)
conn.commit()
# ── Read ──────────────────────────────────────────────────────────────
def get_history(
self,
session_tag: str = DEFAULT_SESSION_TAG,
limit: int = 200,
) -> list[NexusMessage]:
"""Return the most recent *limit* messages for *session_tag*.
Results are ordered oldest-first (ascending id).
"""
conn = self._get_conn()
with closing(conn.cursor()) as cur:
cur.execute(
"SELECT id, role, content, timestamp, session_tag "
"FROM nexus_messages "
"WHERE session_tag = ? "
"ORDER BY id DESC LIMIT ?",
(session_tag, limit),
)
rows = cur.fetchall()
# Reverse to chronological order
messages: list[NexusMessage] = [
NexusMessage(
id=r["id"],
role=r["role"],
content=r["content"],
timestamp=r["timestamp"],
session_tag=r["session_tag"],
)
for r in reversed(rows)
]
return messages
def message_count(
self, session_tag: str = DEFAULT_SESSION_TAG
) -> int:
"""Return total message count for *session_tag*."""
conn = self._get_conn()
with closing(conn.cursor()) as cur:
cur.execute(
"SELECT COUNT(*) FROM nexus_messages WHERE session_tag = ?",
(session_tag,),
)
return cur.fetchone()[0]
# ── Delete ────────────────────────────────────────────────────────────
def clear(self, session_tag: str = DEFAULT_SESSION_TAG) -> int:
"""Delete all messages for *session_tag*. Returns count deleted."""
conn = self._get_conn()
with closing(conn.cursor()) as cur:
cur.execute(
"DELETE FROM nexus_messages WHERE session_tag = ?",
(session_tag,),
)
deleted: int = cur.rowcount
conn.commit()
return deleted
def clear_all(self) -> int:
"""Delete every message across all session tags."""
conn = self._get_conn()
with closing(conn.cursor()) as cur:
cur.execute("DELETE FROM nexus_messages")
deleted: int = cur.rowcount
conn.commit()
return deleted
# ── Module singleton ─────────────────────────────────────────────────────────
nexus_store = NexusStore()

View File

@@ -0,0 +1,153 @@
"""Sovereignty Pulse — real-time sovereignty health for the Nexus.
Reads from the ``SovereigntyMetricsStore`` (created in PR #1331) and
distils it into a compact "pulse" that the Nexus template can render
as a persistent health badge.
The pulse answers one question at a glance: *how sovereign is Timmy
right now?*
Signals:
- Overall sovereignty percentage (0100).
- Per-layer breakdown (perception, decision, narration).
- Crystallization velocity — new rules learned in the last hour.
- API independence — percentage of recent inferences served locally.
- Health rating (sovereign / degraded / dependent).
All methods return plain dicts — no imports leak into the template layer.
Refs: #953 (Sovereignty Loop), #954 (metrics), #1090 (Nexus epic)
"""
from __future__ import annotations
import logging
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
logger = logging.getLogger(__name__)
# ── Data model ───────────────────────────────────────────────────────────────
@dataclass
class LayerPulse:
"""Sovereignty metrics for a single AI layer."""
name: str
sovereign_pct: float = 0.0
cache_hits: int = 0
model_calls: int = 0
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class SovereigntyPulseSnapshot:
"""Complete sovereignty health reading for the Nexus display."""
overall_pct: float = 0.0
health: str = "unknown" # sovereign | degraded | dependent | unknown
layers: list[LayerPulse] = field(default_factory=list)
crystallizations_last_hour: int = 0
api_independence_pct: float = 0.0
total_events: int = 0
timestamp: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
def to_dict(self) -> dict:
return {
"overall_pct": self.overall_pct,
"health": self.health,
"layers": [layer.to_dict() for layer in self.layers],
"crystallizations_last_hour": self.crystallizations_last_hour,
"api_independence_pct": self.api_independence_pct,
"total_events": self.total_events,
"timestamp": self.timestamp,
}
# ── Pulse reader ─────────────────────────────────────────────────────────────
def _classify_health(pct: float) -> str:
"""Map overall sovereignty percentage to a human-readable health label."""
if pct >= 80.0:
return "sovereign"
if pct >= 50.0:
return "degraded"
if pct > 0.0:
return "dependent"
return "unknown"
class SovereigntyPulse:
"""Reads sovereignty metrics and emits pulse snapshots.
Lazily imports from ``timmy.sovereignty.metrics`` so the Nexus
module has no hard compile-time dependency on the Sovereignty Loop.
"""
def snapshot(self) -> SovereigntyPulseSnapshot:
"""Build a pulse snapshot from the live metrics store."""
try:
return self._read_metrics()
except Exception as exc:
logger.debug("SovereigntyPulse: metrics unavailable: %s", exc)
return SovereigntyPulseSnapshot()
def _read_metrics(self) -> SovereigntyPulseSnapshot:
"""Internal reader — allowed to raise if imports fail."""
from timmy.sovereignty.metrics import get_metrics_store
store = get_metrics_store()
snap = store.get_snapshot()
# Parse per-layer stats from the snapshot
layers = []
layer_pcts: list[float] = []
for layer_name in ("perception", "decision", "narration"):
layer_data = snap.get(layer_name, {})
hits = layer_data.get("cache_hits", 0)
calls = layer_data.get("model_calls", 0)
total = hits + calls
pct = (hits / total * 100) if total > 0 else 0.0
layers.append(
LayerPulse(
name=layer_name,
sovereign_pct=round(pct, 1),
cache_hits=hits,
model_calls=calls,
)
)
layer_pcts.append(pct)
overall = round(sum(layer_pcts) / len(layer_pcts), 1) if layer_pcts else 0.0
# Crystallization count
cryst = snap.get("crystallizations", 0)
# API independence: cache_hits / total across all layers
total_hits = sum(layer.cache_hits for layer in layers)
total_calls = sum(layer.model_calls for layer in layers)
total_all = total_hits + total_calls
api_indep = round((total_hits / total_all * 100), 1) if total_all > 0 else 0.0
total_events = snap.get("total_events", 0)
return SovereigntyPulseSnapshot(
overall_pct=overall,
health=_classify_health(overall),
layers=layers,
crystallizations_last_hour=cryst,
api_independence_pct=api_indep,
total_events=total_events,
)
# ── Module singleton ─────────────────────────────────────────────────────────
sovereignty_pulse = SovereigntyPulse()

View File

@@ -2665,25 +2665,27 @@
}
.vs-btn-save:hover { opacity: 0.85; }
/* ── Nexus ────────────────────────────────────────────────── */
.nexus-layout { max-width: 1400px; margin: 0 auto; }
/* ── Nexus v2 ─────────────────────────────────────────────── */
.nexus-layout { max-width: 1600px; margin: 0 auto; }
.nexus-header { border-bottom: 1px solid var(--border); padding-bottom: 0.5rem; }
.nexus-title { font-size: 1.4rem; font-weight: 700; color: var(--purple); letter-spacing: 0.1em; }
.nexus-subtitle { font-size: 0.8rem; color: var(--text-dim); margin-top: 0.2rem; }
.nexus-grid {
/* v2 grid: wider sidebar for awareness panels */
.nexus-grid-v2 {
display: grid;
grid-template-columns: 1fr 320px;
grid-template-columns: 1fr 360px;
gap: 1rem;
align-items: start;
}
@media (max-width: 900px) {
.nexus-grid { grid-template-columns: 1fr; }
@media (max-width: 1000px) {
.nexus-grid-v2 { grid-template-columns: 1fr; }
}
.nexus-chat-panel { height: calc(100vh - 180px); display: flex; flex-direction: column; }
.nexus-chat-panel .card-body { overflow-y: auto; flex: 1; }
.nexus-msg-count { font-size: 0.7rem; color: var(--text-dim); letter-spacing: 0.05em; }
.nexus-empty-state {
color: var(--text-dim);
@@ -2693,6 +2695,177 @@
text-align: center;
}
/* Sidebar scrollable on short screens */
.nexus-sidebar-col { max-height: calc(100vh - 140px); overflow-y: auto; }
/* ── Sovereignty Pulse Badge (header) ── */
.nexus-pulse-badge {
display: flex;
align-items: center;
gap: 0.4rem;
background: var(--bg-card);
border: 1px solid var(--border);
border-radius: var(--radius-md);
padding: 0.3rem 0.7rem;
font-size: 0.72rem;
letter-spacing: 0.05em;
}
.nexus-pulse-dot {
width: 8px; height: 8px;
border-radius: 50%;
display: inline-block;
}
.nexus-pulse-dot.nexus-pulse-sovereign { background: var(--green); box-shadow: 0 0 6px var(--green); }
.nexus-pulse-dot.nexus-pulse-degraded { background: var(--amber); box-shadow: 0 0 6px var(--amber); }
.nexus-pulse-dot.nexus-pulse-dependent { background: var(--red); box-shadow: 0 0 6px var(--red); }
.nexus-pulse-dot.nexus-pulse-unknown { background: var(--text-dim); }
.nexus-pulse-label { color: var(--text-dim); }
.nexus-pulse-value { color: var(--text-bright); font-weight: 600; }
/* ── Cognitive State Panel ── */
.nexus-cognitive-panel .card-body { font-size: 0.78rem; }
.nexus-engagement-badge {
font-size: 0.65rem;
letter-spacing: 0.08em;
padding: 0.15rem 0.5rem;
border-radius: 3px;
background: rgba(168,85,247,0.12);
color: var(--purple);
}
.nexus-cog-grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 0.5rem;
}
.nexus-cog-item {
background: rgba(255,255,255,0.02);
border-radius: 4px;
padding: 0.35rem 0.5rem;
}
.nexus-cog-label {
font-size: 0.62rem;
color: var(--text-dim);
letter-spacing: 0.08em;
margin-bottom: 0.15rem;
}
.nexus-cog-value {
color: var(--text-bright);
font-size: 0.8rem;
}
.nexus-cog-focus {
font-size: 0.72rem;
color: var(--text);
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
max-width: 140px;
}
.nexus-commitments { font-size: 0.72rem; }
.nexus-commitment-item {
color: var(--text);
padding: 0.2rem 0;
border-bottom: 1px solid rgba(59,26,92,0.4);
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
/* ── Thought Stream Panel ── */
.nexus-thoughts-panel .card-body { max-height: 200px; overflow-y: auto; }
.nexus-thought-item {
border-left: 2px solid var(--purple);
padding: 0.3rem 0.5rem;
margin-bottom: 0.5rem;
font-size: 0.76rem;
background: rgba(168,85,247,0.04);
border-radius: 0 4px 4px 0;
}
.nexus-thought-meta {
display: flex;
justify-content: space-between;
margin-bottom: 0.2rem;
}
.nexus-thought-seed {
color: var(--purple);
font-size: 0.65rem;
letter-spacing: 0.06em;
text-transform: uppercase;
}
.nexus-thought-time { color: var(--text-dim); font-size: 0.62rem; }
.nexus-thought-content { color: var(--text); line-height: 1.4; }
/* ── Sovereignty Pulse Detail Panel ── */
.nexus-health-badge {
font-size: 0.62rem;
letter-spacing: 0.08em;
padding: 0.15rem 0.5rem;
border-radius: 3px;
}
.nexus-health-sovereign { background: rgba(0,232,122,0.12); color: var(--green); }
.nexus-health-degraded { background: rgba(255,184,0,0.12); color: var(--amber); }
.nexus-health-dependent { background: rgba(255,68,85,0.12); color: var(--red); }
.nexus-health-unknown { background: rgba(107,74,138,0.12); color: var(--text-dim); }
.nexus-pulse-layer {
display: flex;
align-items: center;
gap: 0.4rem;
margin-bottom: 0.35rem;
font-size: 0.72rem;
}
.nexus-pulse-layer-label {
color: var(--text-dim);
min-width: 80px;
letter-spacing: 0.06em;
font-size: 0.65rem;
}
.nexus-pulse-bar-track {
flex: 1;
height: 6px;
background: rgba(59,26,92,0.5);
border-radius: 3px;
overflow: hidden;
}
.nexus-pulse-bar-fill {
height: 100%;
background: linear-gradient(90deg, var(--purple), var(--green));
border-radius: 3px;
transition: width 0.6s ease;
}
.nexus-pulse-layer-pct {
color: var(--text-bright);
font-size: 0.68rem;
min-width: 36px;
text-align: right;
}
.nexus-pulse-stats { font-size: 0.72rem; }
.nexus-pulse-stat {
display: flex;
justify-content: space-between;
padding: 0.2rem 0;
border-bottom: 1px solid rgba(59,26,92,0.3);
}
.nexus-pulse-stat-label { color: var(--text-dim); }
.nexus-pulse-stat-value { color: var(--text-bright); }
/* ── Session Analytics Panel ── */
.nexus-analytics-grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 0.4rem;
font-size: 0.72rem;
}
.nexus-analytics-item {
display: flex;
justify-content: space-between;
padding: 0.25rem 0.4rem;
background: rgba(255,255,255,0.02);
border-radius: 4px;
}
.nexus-analytics-label { color: var(--text-dim); }
.nexus-analytics-value { color: var(--text-bright); }
/* Memory sidebar */
.nexus-memory-hits { font-size: 0.78rem; }
.nexus-memory-label { color: var(--text-dim); font-size: 0.72rem; margin-bottom: 0.4rem; letter-spacing: 0.05em; }

View File

@@ -33,6 +33,7 @@ for _mod in [
"sentence_transformers",
"swarm",
"swarm.event_log",
"cv2", # OpenCV import can hang under pytest-xdist parallel workers
]:
sys.modules.setdefault(_mod, MagicMock())

View File

@@ -1,4 +1,4 @@
"""Tests for the Nexus conversational awareness routes."""
"""Tests for the Nexus v2 conversational awareness routes."""
from unittest.mock import patch
@@ -24,6 +24,41 @@ def test_nexus_page_contains_teach_form(client):
assert "/nexus/teach" in response.text
def test_nexus_page_contains_cognitive_panel(client):
"""Nexus v2 page must include the cognitive state panel."""
response = client.get("/nexus")
assert response.status_code == 200
assert "COGNITIVE STATE" in response.text
def test_nexus_page_contains_thought_stream(client):
"""Nexus v2 page must include the thought stream panel."""
response = client.get("/nexus")
assert response.status_code == 200
assert "THOUGHT STREAM" in response.text
def test_nexus_page_contains_sovereignty_pulse(client):
"""Nexus v2 page must include the sovereignty pulse panel."""
response = client.get("/nexus")
assert response.status_code == 200
assert "SOVEREIGNTY PULSE" in response.text
def test_nexus_page_contains_session_analytics(client):
"""Nexus v2 page must include the session analytics panel."""
response = client.get("/nexus")
assert response.status_code == 200
assert "SESSION ANALYTICS" in response.text
def test_nexus_page_contains_websocket_script(client):
"""Nexus v2 page must include the WebSocket connection script."""
response = client.get("/nexus")
assert response.status_code == 200
assert "/nexus/ws" in response.text
def test_nexus_chat_empty_message_returns_empty(client):
"""POST /nexus/chat with blank message returns empty response."""
response = client.post("/nexus/chat", data={"message": " "})
@@ -72,3 +107,17 @@ def test_nexus_clear_history(client):
response = client.request("DELETE", "/nexus/history")
assert response.status_code == 200
assert "cleared" in response.text.lower()
def test_nexus_introspect_api(client):
"""GET /nexus/introspect should return JSON introspection snapshot."""
response = client.get("/nexus/introspect")
assert response.status_code == 200
data = response.json()
assert "introspection" in data
assert "sovereignty_pulse" in data
assert "cognitive" in data["introspection"]
assert "recent_thoughts" in data["introspection"]
assert "analytics" in data["introspection"]
assert "overall_pct" in data["sovereignty_pulse"]
assert "health" in data["sovereignty_pulse"]

View File

@@ -0,0 +1,598 @@
"""Unit tests for models/budget.py — comprehensive coverage for budget management.
Tests budget allocation, tracking, limit enforcement, and edge cases including:
- Zero budget scenarios
- Over-budget handling
- Budget reset behavior
- In-memory fallback when DB is unavailable
"""
import threading
import time
from datetime import UTC, date, datetime, timedelta
from unittest.mock import patch
import pytest
from infrastructure.models.budget import (
BudgetTracker,
SpendRecord,
estimate_cost_usd,
get_budget_tracker,
)
pytestmark = pytest.mark.unit
# ── Test SpendRecord dataclass ────────────────────────────────────────────────
class TestSpendRecord:
"""Tests for the SpendRecord dataclass."""
def test_spend_record_creation(self):
"""Test creating a SpendRecord with all fields."""
ts = time.time()
record = SpendRecord(
ts=ts,
provider="anthropic",
model="claude-haiku-4-5",
tokens_in=100,
tokens_out=200,
cost_usd=0.001,
tier="cloud",
)
assert record.ts == ts
assert record.provider == "anthropic"
assert record.model == "claude-haiku-4-5"
assert record.tokens_in == 100
assert record.tokens_out == 200
assert record.cost_usd == 0.001
assert record.tier == "cloud"
def test_spend_record_with_zero_tokens(self):
"""Test SpendRecord with zero tokens."""
ts = time.time()
record = SpendRecord(ts=ts, provider="openai", model="gpt-4o", tokens_in=0, tokens_out=0, cost_usd=0.0, tier="cloud")
assert record.tokens_in == 0
assert record.tokens_out == 0
# ── Test estimate_cost_usd function ───────────────────────────────────────────
class TestEstimateCostUsd:
"""Tests for the estimate_cost_usd function."""
def test_haiku_cheaper_than_sonnet(self):
"""Haiku should be cheaper than Sonnet for same tokens."""
haiku_cost = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
sonnet_cost = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
assert haiku_cost < sonnet_cost
def test_zero_tokens_is_zero_cost(self):
"""Zero tokens should result in zero cost."""
assert estimate_cost_usd("gpt-4o", 0, 0) == 0.0
def test_only_input_tokens(self):
"""Cost calculation with only input tokens."""
cost = estimate_cost_usd("gpt-4o", 1000, 0)
expected = (1000 * 0.0025) / 1000.0 # $0.0025 per 1K input tokens
assert cost == pytest.approx(expected)
def test_only_output_tokens(self):
"""Cost calculation with only output tokens."""
cost = estimate_cost_usd("gpt-4o", 0, 1000)
expected = (1000 * 0.01) / 1000.0 # $0.01 per 1K output tokens
assert cost == pytest.approx(expected)
def test_unknown_model_uses_default(self):
"""Unknown model should use conservative default cost."""
cost = estimate_cost_usd("some-unknown-model-xyz", 1000, 1000)
assert cost > 0 # Uses conservative default, not zero
# Default is 0.003 input, 0.015 output per 1K
expected = (1000 * 0.003 + 1000 * 0.015) / 1000.0
assert cost == pytest.approx(expected)
def test_versioned_model_name_matches(self):
"""Versioned model names should match base model rates."""
cost1 = estimate_cost_usd("claude-haiku-4-5-20251001", 1000, 0)
cost2 = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
assert cost1 == cost2
def test_gpt4o_mini_cheaper_than_gpt4o(self):
"""GPT-4o mini should be cheaper than GPT-4o."""
mini = estimate_cost_usd("gpt-4o-mini", 1000, 1000)
full = estimate_cost_usd("gpt-4o", 1000, 1000)
assert mini < full
def test_opus_most_expensive_claude(self):
"""Opus should be the most expensive Claude model."""
opus = estimate_cost_usd("claude-opus-4-5", 1000, 1000)
sonnet = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
haiku = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
assert opus > sonnet > haiku
def test_grok_variants(self):
"""Test Grok model cost estimation."""
cost = estimate_cost_usd("grok-3", 1000, 1000)
assert cost > 0
cost_fast = estimate_cost_usd("grok-3-fast", 1000, 1000)
assert cost_fast > 0
def test_case_insensitive_matching(self):
"""Model name matching should be case insensitive."""
cost_lower = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
cost_upper = estimate_cost_usd("CLAUDE-HAIKU-4-5", 1000, 0)
cost_mixed = estimate_cost_usd("Claude-Haiku-4-5", 1000, 0)
assert cost_lower == cost_upper == cost_mixed
def test_returns_float(self):
"""Function should always return a float."""
assert isinstance(estimate_cost_usd("haiku", 100, 200), float)
assert isinstance(estimate_cost_usd("unknown-model", 100, 200), float)
assert isinstance(estimate_cost_usd("haiku", 0, 0), float)
# ── Test BudgetTracker initialization ─────────────────────────────────────────
class TestBudgetTrackerInit:
"""Tests for BudgetTracker initialization."""
def test_creates_with_memory_db(self):
"""Tracker should initialize with in-memory database."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker._db_ok is True
def test_in_memory_fallback_empty_on_creation(self):
"""In-memory fallback should start empty."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker._in_memory == []
def test_custom_db_path(self, tmp_path):
"""Tracker should use custom database path."""
db_file = tmp_path / "custom_budget.db"
tracker = BudgetTracker(db_path=str(db_file))
assert tracker._db_ok is True
assert tracker._db_path == str(db_file)
assert db_file.exists()
def test_db_path_directory_creation(self, tmp_path):
"""Tracker should create parent directories if needed."""
db_file = tmp_path / "nested" / "dirs" / "budget.db"
tracker = BudgetTracker(db_path=str(db_file))
assert tracker._db_ok is True
assert db_file.parent.exists()
def test_invalid_db_path_fallback(self):
"""Tracker should fallback to in-memory on invalid path."""
# Use a path that cannot be created (e.g., permission denied simulation)
tracker = BudgetTracker.__new__(BudgetTracker)
tracker._db_path = "/nonexistent/invalid/path/budget.db"
tracker._lock = threading.Lock()
tracker._in_memory = []
tracker._db_ok = False
# Should still work with in-memory fallback
cost = tracker.record_spend("test", "model", cost_usd=0.01)
assert cost == 0.01
# ── Test BudgetTracker record_spend ───────────────────────────────────────────
class TestBudgetTrackerRecordSpend:
"""Tests for recording spend events."""
def test_record_spend_returns_cost(self):
"""record_spend should return the calculated cost."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)
assert cost > 0
def test_record_spend_explicit_cost(self):
"""record_spend should use explicit cost when provided."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "model", cost_usd=1.23)
assert cost == pytest.approx(1.23)
def test_record_spend_accumulates(self):
"""Multiple spend records should accumulate correctly."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
tracker.record_spend("openai", "gpt-4o", cost_usd=0.02)
assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)
def test_record_spend_with_tier_label(self):
"""record_spend should accept custom tier labels."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "haiku", tier="cloud_api")
assert cost >= 0
def test_record_spend_with_provider(self):
"""record_spend should track provider correctly."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
tracker.record_spend("anthropic", "claude-haiku", cost_usd=0.02)
assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)
def test_record_zero_cost(self):
"""Recording zero cost should work correctly."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("test", "model", cost_usd=0.0)
assert cost == 0.0
assert tracker.get_daily_spend() == 0.0
def test_record_negative_cost(self):
"""Recording negative cost (refund) should work."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("test", "model", cost_usd=-0.50)
assert cost == -0.50
assert tracker.get_daily_spend() == -0.50
# ── Test BudgetTracker daily/monthly spend queries ────────────────────────────
class TestBudgetTrackerSpendQueries:
"""Tests for daily and monthly spend queries."""
def test_monthly_spend_includes_daily(self):
"""Monthly spend should be >= daily spend."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=5.00)
assert tracker.get_monthly_spend() >= tracker.get_daily_spend()
def test_get_daily_spend_empty(self):
"""Daily spend should be zero when no records."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.get_daily_spend() == 0.0
def test_get_monthly_spend_empty(self):
"""Monthly spend should be zero when no records."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.get_monthly_spend() == 0.0
def test_daily_spend_isolation(self):
"""Daily spend should only include today's records, not old ones."""
tracker = BudgetTracker(db_path=":memory:")
# Force use of in-memory fallback
tracker._db_ok = False
# Add record for today
today_ts = datetime.combine(date.today(), datetime.min.time(), tzinfo=UTC).timestamp()
tracker._in_memory.append(
SpendRecord(today_ts + 3600, "test", "model", 0, 0, 1.0, "cloud")
)
# Add old record (2 days ago)
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "old_model", 0, 0, 2.0, "cloud")
)
# Daily should only include today's 1.0
assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)
# Monthly should include both (both are in current month)
assert tracker.get_monthly_spend() == pytest.approx(3.0, abs=1e-9)
# ── Test BudgetTracker cloud_allowed ──────────────────────────────────────────
class TestBudgetTrackerCloudAllowed:
"""Tests for cloud budget limit enforcement."""
def test_allowed_when_no_spend(self):
"""Cloud should be allowed when no spend recorded."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.cloud_allowed() is True
def test_blocked_when_daily_limit_exceeded(self):
"""Cloud should be blocked when daily limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
# With default daily limit of 5.0, 999 should block
assert tracker.cloud_allowed() is False
def test_allowed_when_daily_limit_zero(self):
"""Cloud should be allowed when daily limit is 0 (disabled)."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # disabled
mock_settings.tier_cloud_monthly_budget_usd = 0 # disabled
assert tracker.cloud_allowed() is True
def test_blocked_when_monthly_limit_exceeded(self):
"""Cloud should be blocked when monthly limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # daily disabled
mock_settings.tier_cloud_monthly_budget_usd = 10.0
assert tracker.cloud_allowed() is False
def test_allowed_at_exact_daily_limit(self):
"""Cloud should be allowed when exactly at daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 5.0
mock_settings.tier_cloud_monthly_budget_usd = 0
# Record exactly at limit
tracker.record_spend("test", "model", cost_usd=5.0)
# At exactly the limit, it should return False (blocked)
# because spend >= limit
assert tracker.cloud_allowed() is False
def test_allowed_below_daily_limit(self):
"""Cloud should be allowed when below daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 5.0
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=4.99)
assert tracker.cloud_allowed() is True
def test_zero_budget_blocks_all(self):
"""Zero budget should block all cloud usage."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0.01 # Very small budget
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=0.02)
# Over the tiny budget, should be blocked
assert tracker.cloud_allowed() is False
def test_both_limits_checked(self):
"""Both daily and monthly limits should be checked."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 100.0
mock_settings.tier_cloud_monthly_budget_usd = 10.0
tracker.record_spend("test", "model", cost_usd=15.0)
# Under daily but over monthly
assert tracker.cloud_allowed() is False
# ── Test BudgetTracker summary ────────────────────────────────────────────────
class TestBudgetTrackerSummary:
"""Tests for budget summary functionality."""
def test_summary_keys_present(self):
"""Summary should contain all expected keys."""
tracker = BudgetTracker(db_path=":memory:")
summary = tracker.get_summary()
assert "daily_usd" in summary
assert "monthly_usd" in summary
assert "daily_limit_usd" in summary
assert "monthly_limit_usd" in summary
assert "daily_ok" in summary
assert "monthly_ok" in summary
def test_summary_daily_ok_true_on_empty(self):
"""daily_ok and monthly_ok should be True when empty."""
tracker = BudgetTracker(db_path=":memory:")
summary = tracker.get_summary()
assert summary["daily_ok"] is True
assert summary["monthly_ok"] is True
def test_summary_daily_ok_false_when_exceeded(self):
"""daily_ok should be False when daily limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=999.0)
summary = tracker.get_summary()
assert summary["daily_ok"] is False
def test_summary_monthly_ok_false_when_exceeded(self):
"""monthly_ok should be False when monthly limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0
mock_settings.tier_cloud_monthly_budget_usd = 10.0
tracker.record_spend("openai", "gpt-4o", cost_usd=15.0)
summary = tracker.get_summary()
assert summary["monthly_ok"] is False
def test_summary_values_rounded(self):
"""Summary values should be rounded appropriately."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=1.123456789)
summary = tracker.get_summary()
# daily_usd should be rounded to 6 decimal places
assert summary["daily_usd"] == 1.123457
def test_summary_with_disabled_limits(self):
"""Summary should handle disabled limits (0)."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=100.0)
summary = tracker.get_summary()
assert summary["daily_limit_usd"] == 0
assert summary["monthly_limit_usd"] == 0
assert summary["daily_ok"] is True
assert summary["monthly_ok"] is True
# ── Test BudgetTracker in-memory fallback ─────────────────────────────────────
class TestBudgetTrackerInMemoryFallback:
"""Tests for in-memory fallback when DB is unavailable."""
def test_in_memory_records_persisted(self):
"""Records should be stored in memory when DB is unavailable."""
tracker = BudgetTracker(db_path=":memory:")
# Force DB to appear unavailable
tracker._db_ok = False
tracker.record_spend("test", "model", cost_usd=0.01)
assert len(tracker._in_memory) == 1
assert tracker._in_memory[0].cost_usd == 0.01
def test_in_memory_query_spend(self):
"""Query spend should work with in-memory fallback."""
tracker = BudgetTracker(db_path=":memory:")
tracker._db_ok = False
tracker.record_spend("test", "model", cost_usd=0.01)
# Query should work from in-memory
since_ts = (datetime.now(UTC) - timedelta(hours=1)).timestamp()
result = tracker._query_spend(since_ts)
assert result == 0.01
def test_in_memory_older_records_not_counted(self):
"""In-memory records older than since_ts should not be counted."""
tracker = BudgetTracker(db_path=":memory:")
tracker._db_ok = False
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "model", 0, 0, 1.0, "cloud")
)
# Query for records in last day
since_ts = (datetime.now(UTC) - timedelta(days=1)).timestamp()
result = tracker._query_spend(since_ts)
assert result == 0.0
# ── Test BudgetTracker thread safety ──────────────────────────────────────────
class TestBudgetTrackerThreadSafety:
"""Tests for thread-safe operations."""
def test_concurrent_record_spend(self):
"""Multiple threads should safely record spend concurrently."""
tracker = BudgetTracker(db_path=":memory:")
results = []
errors = []
def record_spends():
try:
for _ in range(10):
cost = tracker.record_spend("test", "model", cost_usd=0.01)
results.append(cost)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=record_spends) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
assert len(results) == 50
assert tracker.get_daily_spend() == pytest.approx(0.50, abs=1e-9)
# ── Test BudgetTracker edge cases ─────────────────────────────────────────────
class TestBudgetTrackerEdgeCases:
"""Tests for edge cases and boundary conditions."""
def test_very_small_cost(self):
"""Tracker should handle very small costs."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=0.000001)
assert tracker.get_daily_spend() == pytest.approx(0.000001, abs=1e-9)
def test_very_large_cost(self):
"""Tracker should handle very large costs."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=1_000_000.0)
assert tracker.get_daily_spend() == pytest.approx(1_000_000.0, abs=1e-9)
def test_many_records(self):
"""Tracker should handle many records efficiently."""
tracker = BudgetTracker(db_path=":memory:")
for i in range(100):
tracker.record_spend(f"provider_{i}", f"model_{i}", cost_usd=0.01)
assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)
def test_empty_provider_name(self):
"""Tracker should handle empty provider name."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("", "model", cost_usd=0.01)
assert cost == 0.01
def test_empty_model_name(self):
"""Tracker should handle empty model name."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("provider", "", cost_usd=0.01)
assert cost == 0.01
# ── Test get_budget_tracker singleton ─────────────────────────────────────────
class TestGetBudgetTrackerSingleton:
"""Tests for the module-level BudgetTracker singleton."""
def test_returns_budget_tracker(self):
"""Singleton should return a BudgetTracker instance."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker = get_budget_tracker()
assert isinstance(tracker, BudgetTracker)
def test_returns_same_instance(self):
"""Singleton should return the same instance."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
t1 = get_budget_tracker()
t2 = get_budget_tracker()
assert t1 is t2
def test_singleton_persists_state(self):
"""Singleton should persist state across calls."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker1 = get_budget_tracker()
# Record spend
tracker1.record_spend("test", "model", cost_usd=1.0)
# Get singleton again
tracker2 = get_budget_tracker()
assert tracker1 is tracker2
# ── Test BudgetTracker with mocked settings ───────────────────────────────────
class TestBudgetTrackerWithMockedSettings:
"""Tests using mocked settings for different scenarios."""
def test_high_daily_limit(self):
"""Test with high daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 1000.0
mock_settings.tier_cloud_monthly_budget_usd = 10000.0
tracker.record_spend("test", "model", cost_usd=500.0)
assert tracker.cloud_allowed() is True
def test_low_daily_limit(self):
"""Test with low daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 1.0
mock_settings.tier_cloud_monthly_budget_usd = 100.0
tracker.record_spend("test", "model", cost_usd=2.0)
assert tracker.cloud_allowed() is False
def test_only_monthly_limit_enabled(self):
"""Test with only monthly limit enabled."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # Disabled
mock_settings.tier_cloud_monthly_budget_usd = 50.0
tracker.record_spend("test", "model", cost_usd=30.0)
assert tracker.cloud_allowed() is True
tracker.record_spend("test", "model", cost_usd=25.0)
assert tracker.cloud_allowed() is False

View File

@@ -677,7 +677,7 @@ class TestVllmMlxProvider:
router.providers = [provider]
# Quota monitor downshifts to local (ACTIVE tier) — vllm_mlx should still be tried
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
with patch("infrastructure.router.health._quota_monitor") as mock_qm:
mock_qm.select_model.return_value = "qwen3:14b"
mock_qm.check.return_value = None
@@ -713,7 +713,7 @@ class TestMetabolicProtocol:
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [self._make_anthropic_provider()]
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
with patch("infrastructure.router.health._quota_monitor") as mock_qm:
# select_model returns cloud model → BURST tier
mock_qm.select_model.return_value = "claude-sonnet-4-6"
mock_qm.check.return_value = None
@@ -732,7 +732,7 @@ class TestMetabolicProtocol:
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [self._make_anthropic_provider()]
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
with patch("infrastructure.router.health._quota_monitor") as mock_qm:
# select_model returns local 14B → ACTIVE tier
mock_qm.select_model.return_value = "qwen3:14b"
mock_qm.check.return_value = None
@@ -750,7 +750,7 @@ class TestMetabolicProtocol:
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [self._make_anthropic_provider()]
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
with patch("infrastructure.router.health._quota_monitor") as mock_qm:
# select_model returns local 8B → RESTING tier
mock_qm.select_model.return_value = "qwen3:8b"
mock_qm.check.return_value = None
@@ -776,7 +776,7 @@ class TestMetabolicProtocol:
)
router.providers = [provider]
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
with patch("infrastructure.router.health._quota_monitor") as mock_qm:
mock_qm.select_model.return_value = "qwen3:8b" # RESTING tier
with patch.object(router, "_call_ollama") as mock_call:
@@ -793,7 +793,7 @@ class TestMetabolicProtocol:
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [self._make_anthropic_provider()]
with patch("infrastructure.router.cascade._quota_monitor", None):
with patch("infrastructure.router.health._quota_monitor", None):
with patch.object(router, "_call_anthropic") as mock_call:
mock_call.return_value = {"content": "Cloud response", "model": "claude-sonnet-4-6"}
result = await router.complete(
@@ -1200,7 +1200,7 @@ class TestCascadeTierFiltering:
async def test_frontier_required_uses_anthropic(self):
router = self._make_router()
with patch("infrastructure.router.cascade._quota_monitor", None):
with patch("infrastructure.router.health._quota_monitor", None):
with patch.object(router, "_call_anthropic") as mock_call:
mock_call.return_value = {
"content": "frontier response",
@@ -1464,7 +1464,7 @@ class TestTrySingleProvider:
router = self._router()
provider = self._provider(ptype="anthropic")
errors: list[str] = []
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
with patch("infrastructure.router.health._quota_monitor") as mock_qm:
mock_qm.select_model.return_value = "qwen3:14b" # non-cloud → ACTIVE tier
mock_qm.check.return_value = None
result = await router._try_single_provider(

View File

@@ -1,7 +1,6 @@
from unittest.mock import MagicMock, patch
from unittest.mock import patch
import pytest
from scripts.llm_triage import (
get_context,
get_prompt,
@@ -9,6 +8,7 @@ from scripts.llm_triage import (
run_triage,
)
# ── Mocks ──────────────────────────────────────────────────────────────────
@pytest.fixture
def mock_files(tmp_path):

313
tests/spark/test_engine.py Normal file
View File

@@ -0,0 +1,313 @@
"""Unit tests for spark/engine.py.
Covers the public API and internal helpers not exercised in other test files:
- get_memories / get_predictions query methods
- get_spark_engine singleton lifecycle and reset_spark_engine
- Module-level __getattr__ lazy access
- on_task_posted without candidate agents (no EIDOS call)
- on_task_completed with winning_bid parameter
- _maybe_consolidate early-return paths (<5 events, <3 outcomes)
- Disabled-engine guard for every mutating method
"""
from unittest.mock import MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def tmp_spark_db(tmp_path, monkeypatch):
"""Redirect all Spark SQLite writes to a temp directory."""
db_path = tmp_path / "spark.db"
monkeypatch.setattr("spark.memory.DB_PATH", db_path)
monkeypatch.setattr("spark.eidos.DB_PATH", db_path)
yield db_path
@pytest.fixture(autouse=True)
def reset_engine():
"""Ensure the engine singleton is cleared between tests."""
from spark.engine import reset_spark_engine
reset_spark_engine()
yield
reset_spark_engine()
# ── Query methods ─────────────────────────────────────────────────────────────
@pytest.mark.unit
class TestGetMemories:
def test_returns_empty_list_initially(self):
from spark.engine import SparkEngine
engine = SparkEngine(enabled=True)
assert engine.get_memories() == []
def test_returns_stored_memories(self):
from spark.engine import SparkEngine
from spark.memory import store_memory
store_memory("pattern", "agent-x", "Reliable performer", confidence=0.8)
engine = SparkEngine(enabled=True)
memories = engine.get_memories()
assert len(memories) == 1
assert memories[0].subject == "agent-x"
def test_limit_parameter(self):
from spark.engine import SparkEngine
from spark.memory import store_memory
for i in range(5):
store_memory("pattern", f"agent-{i}", f"Content {i}")
engine = SparkEngine(enabled=True)
assert len(engine.get_memories(limit=3)) == 3
def test_works_when_disabled(self):
"""get_memories is not gated by enabled — it always reads."""
from spark.engine import SparkEngine
from spark.memory import store_memory
store_memory("anomaly", "agent-z", "Bad actor")
engine = SparkEngine(enabled=False)
assert len(engine.get_memories()) == 1
@pytest.mark.unit
class TestGetPredictions:
def test_returns_empty_list_initially(self):
from spark.engine import SparkEngine
engine = SparkEngine(enabled=True)
assert engine.get_predictions() == []
def test_returns_predictions_after_task_posted(self):
from spark.engine import SparkEngine
engine = SparkEngine(enabled=True)
engine.on_task_posted("t1", "Deploy service", ["agent-a", "agent-b"])
preds = engine.get_predictions()
assert len(preds) >= 1
def test_limit_parameter(self):
from spark.engine import SparkEngine
engine = SparkEngine(enabled=True)
for i in range(5):
engine.on_task_posted(f"t{i}", f"Task {i}", ["agent-a"])
assert len(engine.get_predictions(limit=2)) == 2
# ── Singleton lifecycle ───────────────────────────────────────────────────────
@pytest.mark.unit
class TestGetSparkEngineSingleton:
def test_returns_spark_engine_instance(self):
from spark.engine import SparkEngine, get_spark_engine
engine = get_spark_engine()
assert isinstance(engine, SparkEngine)
def test_same_instance_on_repeated_calls(self):
from spark.engine import get_spark_engine
e1 = get_spark_engine()
e2 = get_spark_engine()
assert e1 is e2
def test_reset_clears_singleton(self):
from spark.engine import get_spark_engine, reset_spark_engine
e1 = get_spark_engine()
reset_spark_engine()
e2 = get_spark_engine()
assert e1 is not e2
def test_get_spark_engine_uses_settings(self, monkeypatch):
"""get_spark_engine respects spark_enabled from config."""
mock_settings = MagicMock()
mock_settings.spark_enabled = False
with patch("spark.engine.settings", mock_settings, create=True):
from spark.engine import reset_spark_engine
reset_spark_engine()
# Patch at import time by mocking the config module in engine
import spark.engine as engine_module
def patched_get():
global _spark_engine
try:
engine_module._spark_engine = engine_module.SparkEngine(
enabled=mock_settings.spark_enabled
)
except Exception:
engine_module._spark_engine = engine_module.SparkEngine(enabled=True)
return engine_module._spark_engine
reset_spark_engine()
def test_get_spark_engine_falls_back_on_settings_error(self, monkeypatch):
"""get_spark_engine creates enabled engine when settings import fails."""
from spark.engine import get_spark_engine, reset_spark_engine
reset_spark_engine()
# Patch config to raise on import
with patch.dict("sys.modules", {"config": None}):
# The engine catches the exception and defaults to enabled=True
engine = get_spark_engine()
# May or may not succeed depending on import cache, just ensure no crash
assert engine is not None
@pytest.mark.unit
class TestModuleLevelGetattr:
def test_spark_engine_attribute_returns_engine(self):
import spark.engine as engine_module
engine = engine_module.spark_engine
assert isinstance(engine, engine_module.SparkEngine)
def test_unknown_attribute_raises(self):
import spark.engine as engine_module
with pytest.raises(AttributeError):
_ = engine_module.nonexistent_attribute_xyz
# ── Event capture edge cases ──────────────────────────────────────────────────
@pytest.mark.unit
class TestOnTaskPostedWithoutCandidates:
def test_no_eidos_prediction_when_no_candidates(self):
"""When candidate_agents is empty, no EIDOS prediction should be stored."""
from spark.eidos import get_predictions
from spark.engine import SparkEngine
engine = SparkEngine(enabled=True)
eid = engine.on_task_posted("t1", "Background task", candidate_agents=[])
assert eid is not None
# No candidates → no prediction
preds = get_predictions(task_id="t1")
assert len(preds) == 0
def test_no_candidates_defaults_to_none(self):
"""on_task_posted with no candidate_agents kwarg still records event."""
from spark.engine import SparkEngine
from spark.memory import get_events
engine = SparkEngine(enabled=True)
eid = engine.on_task_posted("t2", "Orphan task")
assert eid is not None
events = get_events(task_id="t2")
assert len(events) == 1
@pytest.mark.unit
class TestOnTaskCompletedWithBid:
def test_winning_bid_stored_in_data(self):
"""winning_bid is serialised into the event data field."""
import json
from spark.engine import SparkEngine
from spark.memory import get_events
engine = SparkEngine(enabled=True)
engine.on_task_completed("t1", "agent-a", "All done", winning_bid=42)
events = get_events(event_type="task_completed")
assert len(events) == 1
data = json.loads(events[0].data)
assert data["winning_bid"] == 42
def test_without_winning_bid_is_none(self):
import json
from spark.engine import SparkEngine
from spark.memory import get_events
engine = SparkEngine(enabled=True)
engine.on_task_completed("t2", "agent-b", "Done")
events = get_events(event_type="task_completed")
data = json.loads(events[0].data)
assert data["winning_bid"] is None
@pytest.mark.unit
class TestDisabledEngineGuards:
"""Every method that mutates state should return None when disabled."""
def setup_method(self):
from spark.engine import SparkEngine
self.engine = SparkEngine(enabled=False)
def test_on_task_posted_disabled(self):
assert self.engine.on_task_posted("t", "x") is None
def test_on_bid_submitted_disabled(self):
assert self.engine.on_bid_submitted("t", "a", 10) is None
def test_on_task_assigned_disabled(self):
assert self.engine.on_task_assigned("t", "a") is None
def test_on_task_completed_disabled(self):
assert self.engine.on_task_completed("t", "a", "r") is None
def test_on_task_failed_disabled(self):
assert self.engine.on_task_failed("t", "a", "reason") is None
def test_on_agent_joined_disabled(self):
assert self.engine.on_agent_joined("a", "Echo") is None
def test_on_tool_executed_disabled(self):
assert self.engine.on_tool_executed("a", "git_push") is None
def test_on_creative_step_disabled(self):
assert self.engine.on_creative_step("p", "storyboard", "pixel") is None
def test_get_advisories_disabled_returns_empty(self):
assert self.engine.get_advisories() == []
# ── _maybe_consolidate early-return paths ─────────────────────────────────────
@pytest.mark.unit
class TestMaybeConsolidateEarlyReturns:
"""Test the guard conditions at the top of _maybe_consolidate."""
@patch("spark.engine.spark_memory")
def test_fewer_than_5_events_skips(self, mock_memory):
"""With fewer than 5 events, consolidation is skipped immediately."""
from spark.engine import SparkEngine
mock_memory.get_events.return_value = [MagicMock(event_type="task_completed")] * 3
engine = SparkEngine(enabled=True)
engine._maybe_consolidate("agent-x")
mock_memory.store_memory.assert_not_called()
@patch("spark.engine.spark_memory")
def test_fewer_than_3_outcomes_skips(self, mock_memory):
"""With 5+ events but fewer than 3 completion/failure outcomes, skip."""
from spark.engine import SparkEngine
# 6 events but only 2 are outcomes (completions + failures)
events = [MagicMock(event_type="task_posted")] * 4
events += [MagicMock(event_type="task_completed")] * 2
mock_memory.get_events.return_value = events
engine = SparkEngine(enabled=True)
engine._maybe_consolidate("agent-x")
mock_memory.store_memory.assert_not_called()
mock_memory.get_memories.assert_not_called()
@patch("spark.engine.spark_memory")
def test_neutral_success_rate_skips(self, mock_memory):
"""Success rate between 0.3 and 0.8 triggers no memory."""
from spark.engine import SparkEngine
events = [MagicMock(event_type="task_posted")] * 2
events += [MagicMock(event_type="task_completed")] * 2
events += [MagicMock(event_type="task_failed")] * 2
mock_memory.get_events.return_value = events
engine = SparkEngine(enabled=True)
engine._maybe_consolidate("agent-x")
mock_memory.store_memory.assert_not_called()

View File

View File

@@ -0,0 +1,199 @@
"""Tests for the Nexus Introspection Engine."""
from unittest.mock import MagicMock, patch
from timmy.nexus.introspection import (
CognitiveSummary,
IntrospectionSnapshot,
NexusIntrospector,
SessionAnalytics,
ThoughtSummary,
)
# ── Data model tests ─────────────────────────────────────────────────────────
class TestCognitiveSummary:
def test_defaults(self):
s = CognitiveSummary()
assert s.mood == "settled"
assert s.engagement == "idle"
assert s.focus_topic is None
def test_to_dict(self):
s = CognitiveSummary(mood="curious", engagement="deep", focus_topic="architecture")
d = s.to_dict()
assert d["mood"] == "curious"
assert d["engagement"] == "deep"
assert d["focus_topic"] == "architecture"
class TestThoughtSummary:
def test_to_dict(self):
t = ThoughtSummary(
id="t1", content="Hello world", seed_type="freeform", created_at="2026-01-01"
)
d = t.to_dict()
assert d["id"] == "t1"
assert d["seed_type"] == "freeform"
assert d["parent_id"] is None
class TestSessionAnalytics:
def test_defaults(self):
a = SessionAnalytics()
assert a.total_messages == 0
assert a.avg_response_length == 0.0
assert a.topics_discussed == []
class TestIntrospectionSnapshot:
def test_to_dict_structure(self):
snap = IntrospectionSnapshot()
d = snap.to_dict()
assert "cognitive" in d
assert "recent_thoughts" in d
assert "analytics" in d
assert "timestamp" in d
def test_to_dict_with_data(self):
snap = IntrospectionSnapshot(
cognitive=CognitiveSummary(mood="energized"),
recent_thoughts=[
ThoughtSummary(id="x", content="test", seed_type="s", created_at="now"),
],
)
d = snap.to_dict()
assert d["cognitive"]["mood"] == "energized"
assert len(d["recent_thoughts"]) == 1
# ── Introspector tests ───────────────────────────────────────────────────────
class TestNexusIntrospector:
def test_snapshot_empty_log(self):
intro = NexusIntrospector()
snap = intro.snapshot(conversation_log=[])
assert isinstance(snap, IntrospectionSnapshot)
assert snap.analytics.total_messages == 0
def test_snapshot_with_messages(self):
intro = NexusIntrospector()
log = [
{"role": "user", "content": "hello", "timestamp": "10:00:00"},
{"role": "assistant", "content": "Hi there!", "timestamp": "10:00:01"},
{"role": "user", "content": "architecture question", "timestamp": "10:00:02"},
]
snap = intro.snapshot(conversation_log=log)
assert snap.analytics.total_messages == 3
assert snap.analytics.user_messages == 2
assert snap.analytics.assistant_messages == 1
assert snap.analytics.avg_response_length > 0
def test_record_memory_hits(self):
intro = NexusIntrospector()
intro.record_memory_hits(3)
intro.record_memory_hits(2)
snap = intro.snapshot(
conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}]
)
assert snap.analytics.memory_hits_total == 5
def test_reset_clears_state(self):
intro = NexusIntrospector()
intro.record_memory_hits(10)
intro.reset()
snap = intro.snapshot(
conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}]
)
assert snap.analytics.memory_hits_total == 0
def test_topics_deduplication(self):
intro = NexusIntrospector()
log = [
{"role": "user", "content": "hello", "timestamp": "t"},
{"role": "user", "content": "hello", "timestamp": "t"},
{"role": "user", "content": "different topic", "timestamp": "t"},
]
snap = intro.snapshot(conversation_log=log)
assert len(snap.analytics.topics_discussed) == 2
def test_topics_capped_at_8(self):
intro = NexusIntrospector()
log = [{"role": "user", "content": f"topic {i}", "timestamp": "t"} for i in range(15)]
snap = intro.snapshot(conversation_log=log)
assert len(snap.analytics.topics_discussed) <= 8
def test_cognitive_read_fallback(self):
"""If cognitive read fails, snapshot still works with defaults."""
intro = NexusIntrospector()
# Patch the module-level import inside _read_cognitive
with patch.dict("sys.modules", {"timmy.cognitive_state": None}):
snap = intro.snapshot(conversation_log=[])
# Should not raise — fallback to default
assert snap.cognitive.mood == "settled"
def test_thoughts_read_fallback(self):
"""If thought read fails, snapshot still works with empty list."""
intro = NexusIntrospector()
with patch.dict("sys.modules", {"timmy.thinking": None}):
snap = intro.snapshot(conversation_log=[])
assert snap.recent_thoughts == []
def test_read_cognitive_from_tracker(self):
intro = NexusIntrospector()
mock_state = MagicMock()
mock_state.mood = "curious"
mock_state.engagement = "deep"
mock_state.focus_topic = "sovereignty"
mock_state.conversation_depth = 5
mock_state.active_commitments = ["build something"]
mock_state.last_initiative = "build something"
mock_tracker = MagicMock()
mock_tracker.get_state.return_value = mock_state
with patch("timmy.cognitive_state.cognitive_tracker", mock_tracker):
summary = intro._read_cognitive()
assert summary.mood == "curious"
assert summary.engagement == "deep"
assert summary.focus_topic == "sovereignty"
assert summary.conversation_depth == 5
def test_read_thoughts_from_engine(self):
intro = NexusIntrospector()
mock_thought = MagicMock()
mock_thought.id = "t1"
mock_thought.content = "Deep thought about sovereignty"
mock_thought.seed_type = "existential"
mock_thought.created_at = "2026-03-23T10:00:00"
mock_thought.parent_id = None
mock_engine = MagicMock()
mock_engine.get_recent_thoughts.return_value = [mock_thought]
with patch("timmy.thinking.thinking_engine", mock_engine):
thoughts = intro._read_thoughts(limit=5)
assert len(thoughts) == 1
assert thoughts[0].id == "t1"
assert thoughts[0].seed_type == "existential"
def test_read_thoughts_truncates_long_content(self):
intro = NexusIntrospector()
mock_thought = MagicMock()
mock_thought.id = "t2"
mock_thought.content = "x" * 300
mock_thought.seed_type = "freeform"
mock_thought.created_at = "2026-03-23"
mock_thought.parent_id = None
mock_engine = MagicMock()
mock_engine.get_recent_thoughts.return_value = [mock_thought]
with patch("timmy.thinking.thinking_engine", mock_engine):
thoughts = intro._read_thoughts(limit=5)
assert len(thoughts[0].content) <= 201 # 200 + "…"

View File

@@ -0,0 +1,144 @@
"""Tests for the Nexus Session Persistence store."""
import pytest
from timmy.nexus.persistence import MAX_MESSAGES, NexusStore
@pytest.fixture
def store(tmp_path):
"""Provide a NexusStore backed by a temp database."""
db = tmp_path / "test_nexus.db"
s = NexusStore(db_path=db)
yield s
s.close()
class TestNexusStoreBasic:
def test_append_and_retrieve(self, store):
store.append("user", "hello")
store.append("assistant", "hi there")
history = store.get_history()
assert len(history) == 2
assert history[0]["role"] == "user"
assert history[0]["content"] == "hello"
assert history[1]["role"] == "assistant"
def test_message_count(self, store):
assert store.message_count() == 0
store.append("user", "a")
store.append("user", "b")
assert store.message_count() == 2
def test_custom_timestamp(self, store):
store.append("user", "msg", timestamp="12:34:56")
history = store.get_history()
assert history[0]["timestamp"] == "12:34:56"
def test_clear_session(self, store):
store.append("user", "a")
store.append("assistant", "b")
deleted = store.clear()
assert deleted == 2
assert store.message_count() == 0
def test_clear_empty_session(self, store):
deleted = store.clear()
assert deleted == 0
def test_clear_all(self, store):
store.append("user", "a", session_tag="s1")
store.append("user", "b", session_tag="s2")
deleted = store.clear_all()
assert deleted == 2
assert store.message_count(session_tag="s1") == 0
assert store.message_count(session_tag="s2") == 0
class TestNexusStoreOrdering:
def test_chronological_order(self, store):
for i in range(5):
store.append("user", f"msg-{i}")
history = store.get_history()
contents = [m["content"] for m in history]
assert contents == ["msg-0", "msg-1", "msg-2", "msg-3", "msg-4"]
def test_limit_parameter(self, store):
for i in range(10):
store.append("user", f"msg-{i}")
history = store.get_history(limit=3)
assert len(history) == 3
# Should be the 3 most recent
assert history[0]["content"] == "msg-7"
assert history[2]["content"] == "msg-9"
class TestNexusStoreSessionTags:
def test_session_isolation(self, store):
store.append("user", "nexus-msg", session_tag="nexus")
store.append("user", "other-msg", session_tag="other")
nexus_history = store.get_history(session_tag="nexus")
other_history = store.get_history(session_tag="other")
assert len(nexus_history) == 1
assert len(other_history) == 1
assert nexus_history[0]["content"] == "nexus-msg"
def test_clear_only_affects_target_session(self, store):
store.append("user", "a", session_tag="s1")
store.append("user", "b", session_tag="s2")
store.clear(session_tag="s1")
assert store.message_count(session_tag="s1") == 0
assert store.message_count(session_tag="s2") == 1
class TestNexusStorePruning:
def test_prune_excess_messages(self, tmp_path):
"""Inserting beyond MAX_MESSAGES should prune oldest."""
db = tmp_path / "prune_test.db"
s = NexusStore(db_path=db)
# Insert MAX_MESSAGES + 5 to trigger pruning
for i in range(MAX_MESSAGES + 5):
s.append("user", f"msg-{i}")
assert s.message_count() == MAX_MESSAGES
# Get full history — oldest remaining should be msg-5
history = s.get_history(limit=MAX_MESSAGES)
assert history[0]["content"] == "msg-5"
s.close()
class TestNexusStoreReopen:
def test_data_survives_close_reopen(self, tmp_path):
"""Data persists across store instances (simulates process restart)."""
db = tmp_path / "reopen.db"
s1 = NexusStore(db_path=db)
s1.append("user", "persistent message")
s1.close()
s2 = NexusStore(db_path=db)
history = s2.get_history()
assert len(history) == 1
assert history[0]["content"] == "persistent message"
s2.close()
class TestNexusStoreReturnedId:
def test_append_returns_row_id(self, store):
id1 = store.append("user", "first")
id2 = store.append("user", "second")
assert isinstance(id1, int)
assert id2 > id1
class TestNexusStoreClose:
def test_close_is_idempotent(self, store):
store.close()
store.close() # Should not raise
def test_operations_after_close_reconnect(self, store):
"""After close, next operation should reconnect automatically."""
store.append("user", "before close")
store.close()
# Should auto-reconnect
store.append("user", "after close")
assert store.message_count() == 2

View File

@@ -0,0 +1,151 @@
"""Tests for the Sovereignty Pulse module."""
from unittest.mock import MagicMock, patch
from timmy.nexus.sovereignty_pulse import (
LayerPulse,
SovereigntyPulse,
SovereigntyPulseSnapshot,
_classify_health,
)
class TestClassifyHealth:
def test_sovereign(self):
assert _classify_health(95.0) == "sovereign"
assert _classify_health(80.0) == "sovereign"
def test_degraded(self):
assert _classify_health(79.9) == "degraded"
assert _classify_health(50.0) == "degraded"
def test_dependent(self):
assert _classify_health(49.9) == "dependent"
assert _classify_health(0.1) == "dependent"
def test_unknown(self):
assert _classify_health(0.0) == "unknown"
class TestLayerPulse:
def test_to_dict(self):
lp = LayerPulse(name="perception", sovereign_pct=75.0, cache_hits=15, model_calls=5)
d = lp.to_dict()
assert d["name"] == "perception"
assert d["sovereign_pct"] == 75.0
assert d["cache_hits"] == 15
class TestSovereigntyPulseSnapshot:
def test_defaults(self):
snap = SovereigntyPulseSnapshot()
assert snap.overall_pct == 0.0
assert snap.health == "unknown"
assert snap.layers == []
def test_to_dict_structure(self):
snap = SovereigntyPulseSnapshot(
overall_pct=85.0,
health="sovereign",
layers=[LayerPulse(name="perception", sovereign_pct=90.0)],
crystallizations_last_hour=3,
api_independence_pct=88.0,
total_events=42,
)
d = snap.to_dict()
assert d["overall_pct"] == 85.0
assert d["health"] == "sovereign"
assert len(d["layers"]) == 1
assert d["layers"][0]["name"] == "perception"
assert d["crystallizations_last_hour"] == 3
assert d["api_independence_pct"] == 88.0
assert d["total_events"] == 42
assert "timestamp" in d
class TestSovereigntyPulse:
def test_snapshot_graceful_degradation(self):
"""When metrics are unavailable, should return default snapshot."""
pulse = SovereigntyPulse()
with patch.object(
pulse,
"_read_metrics",
side_effect=ImportError("no metrics"),
):
snap = pulse.snapshot()
assert isinstance(snap, SovereigntyPulseSnapshot)
assert snap.health == "unknown"
def test_snapshot_with_metrics(self):
"""When metrics are available, should read and compute correctly."""
pulse = SovereigntyPulse()
mock_snapshot = {
"perception": {"cache_hits": 8, "model_calls": 2},
"decision": {"cache_hits": 6, "model_calls": 4},
"narration": {"cache_hits": 10, "model_calls": 0},
"crystallizations": 7,
"total_events": 100,
}
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
snap = pulse.snapshot()
# Perception: 8/10 = 80%, Decision: 6/10 = 60%, Narration: 10/10 = 100%
# Overall: (80 + 60 + 100) / 3 = 80.0
assert len(snap.layers) == 3
assert snap.layers[0].name == "perception"
assert snap.layers[0].sovereign_pct == 80.0
assert snap.layers[1].name == "decision"
assert snap.layers[1].sovereign_pct == 60.0
assert snap.layers[2].name == "narration"
assert snap.layers[2].sovereign_pct == 100.0
assert snap.overall_pct == 80.0
assert snap.health == "sovereign"
assert snap.crystallizations_last_hour == 7
assert snap.total_events == 100
def test_api_independence_calculation(self):
pulse = SovereigntyPulse()
mock_snapshot = {
"perception": {"cache_hits": 5, "model_calls": 5},
"decision": {"cache_hits": 5, "model_calls": 5},
"narration": {"cache_hits": 5, "model_calls": 5},
"crystallizations": 0,
"total_events": 0,
}
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
snap = pulse.snapshot()
# Total hits: 15, Total calls: 15, Total: 30
# Independence: 15/30 = 50%
assert snap.api_independence_pct == 50.0
def test_zero_events_no_division_error(self):
pulse = SovereigntyPulse()
mock_snapshot = {
"perception": {"cache_hits": 0, "model_calls": 0},
"decision": {"cache_hits": 0, "model_calls": 0},
"narration": {"cache_hits": 0, "model_calls": 0},
"crystallizations": 0,
"total_events": 0,
}
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
snap = pulse.snapshot()
assert snap.overall_pct == 0.0
assert snap.api_independence_pct == 0.0
assert snap.health == "unknown"

View File

@@ -0,0 +1,889 @@
"""Unit tests for timmy.memory.crud — Memory CRUD operations."""
import json
from datetime import UTC, datetime, timedelta
from unittest.mock import patch
import pytest
from timmy.memory.crud import (
_build_search_filters,
_fetch_memory_candidates,
_row_to_entry,
_score_and_filter,
delete_memory,
get_memory_context,
get_memory_stats,
prune_memories,
recall_last_reflection,
recall_personal_facts,
recall_personal_facts_with_ids,
search_memories,
store_last_reflection,
store_memory,
store_personal_fact,
update_personal_fact,
)
from timmy.memory.db import MemoryEntry, get_connection
@pytest.fixture
def tmp_db_path(tmp_path):
"""Provide a temporary database path."""
return tmp_path / "test_memory.db"
@pytest.fixture
def patched_db(tmp_db_path):
"""Patch DB_PATH to use temporary database for tests."""
with patch("timmy.memory.db.DB_PATH", tmp_db_path):
# Create schema
with get_connection():
pass # Schema is created by get_connection
yield tmp_db_path
@pytest.fixture
def sample_entry():
"""Create a sample MemoryEntry for testing."""
return MemoryEntry(
id="test-id-123",
content="Test memory content",
source="test",
context_type="conversation",
agent_id="agent-1",
task_id="task-1",
session_id="session-1",
metadata={"key": "value"},
embedding=[0.1, 0.2, 0.3],
timestamp=datetime.now(UTC).isoformat(),
)
@pytest.mark.unit
class TestStoreMemory:
"""Tests for store_memory function."""
def test_store_memory_basic(self, patched_db):
"""Test storing a basic memory entry."""
entry = store_memory(
content="Hello world",
source="test",
context_type="conversation",
)
assert isinstance(entry, MemoryEntry)
assert entry.content == "Hello world"
assert entry.source == "test"
assert entry.context_type == "conversation"
assert entry.id is not None
def test_store_memory_with_all_fields(self, patched_db):
"""Test storing memory with all optional fields."""
entry = store_memory(
content="Full test content",
source="user",
context_type="fact",
agent_id="agent-42",
task_id="task-99",
session_id="session-xyz",
metadata={"priority": "high", "tags": ["test"]},
compute_embedding=False,
)
assert entry.content == "Full test content"
assert entry.agent_id == "agent-42"
assert entry.task_id == "task-99"
assert entry.session_id == "session-xyz"
assert entry.metadata == {"priority": "high", "tags": ["test"]}
assert entry.embedding is None
def test_store_memory_with_embedding(self, patched_db):
"""Test storing memory with embedding computation."""
# TIMMY_SKIP_EMBEDDINGS=1 in conftest, so uses hash fallback
entry = store_memory(
content="Test content for embedding",
source="system",
compute_embedding=True,
)
assert entry.embedding is not None
assert isinstance(entry.embedding, list)
assert len(entry.embedding) == 128 # Hash embedding dimension
def test_store_memory_without_embedding(self, patched_db):
"""Test storing memory without embedding."""
entry = store_memory(
content="No embedding needed",
source="test",
compute_embedding=False,
)
assert entry.embedding is None
def test_store_memory_persists_to_db(self, patched_db):
"""Test that stored memory is actually written to database."""
entry = store_memory(
content="Persisted content",
source="db_test",
context_type="document",
)
# Verify directly in DB
with get_connection() as conn:
row = conn.execute(
"SELECT * FROM memories WHERE id = ?",
(entry.id,),
).fetchone()
assert row is not None
assert row["content"] == "Persisted content"
assert row["memory_type"] == "document"
assert row["source"] == "db_test"
@pytest.mark.unit
class TestBuildSearchFilters:
"""Tests for _build_search_filters helper function."""
def test_no_filters(self):
"""Test building filters with no criteria."""
where_clause, params = _build_search_filters(None, None, None)
assert where_clause == ""
assert params == []
def test_context_type_filter(self):
"""Test filter by context_type only."""
where_clause, params = _build_search_filters("fact", None, None)
assert where_clause == "WHERE memory_type = ?"
assert params == ["fact"]
def test_agent_id_filter(self):
"""Test filter by agent_id only."""
where_clause, params = _build_search_filters(None, "agent-1", None)
assert where_clause == "WHERE agent_id = ?"
assert params == ["agent-1"]
def test_session_id_filter(self):
"""Test filter by session_id only."""
where_clause, params = _build_search_filters(None, None, "session-1")
assert where_clause == "WHERE session_id = ?"
assert params == ["session-1"]
def test_multiple_filters(self):
"""Test combining multiple filters."""
where_clause, params = _build_search_filters("conversation", "agent-1", "session-1")
assert where_clause == "WHERE memory_type = ? AND agent_id = ? AND session_id = ?"
assert params == ["conversation", "agent-1", "session-1"]
@pytest.mark.unit
class TestFetchMemoryCandidates:
"""Tests for _fetch_memory_candidates helper function."""
def test_fetch_with_data(self, patched_db):
"""Test fetching candidates when data exists."""
# Store some test data
for i in range(5):
store_memory(
content=f"Test content {i}",
source="fetch_test",
compute_embedding=False,
)
rows = _fetch_memory_candidates("", [], 10)
assert len(rows) == 5
def test_fetch_with_limit(self, patched_db):
"""Test that limit is respected."""
for i in range(10):
store_memory(
content=f"Test content {i}",
source="limit_test",
compute_embedding=False,
)
rows = _fetch_memory_candidates("", [], 3)
assert len(rows) == 3
def test_fetch_with_where_clause(self, patched_db):
"""Test fetching with WHERE clause."""
store_memory(
content="Fact content",
source="test",
context_type="fact",
compute_embedding=False,
)
store_memory(
content="Conversation content",
source="test",
context_type="conversation",
compute_embedding=False,
)
where_clause, params = _build_search_filters("fact", None, None)
rows = _fetch_memory_candidates(where_clause, params, 10)
assert len(rows) == 1
assert rows[0]["content"] == "Fact content"
@pytest.mark.unit
class TestRowToEntry:
"""Tests for _row_to_entry conversion function."""
def test_convert_basic_row(self):
"""Test converting a basic sqlite Row to MemoryEntry."""
# Create mock row
row_data = {
"id": "row-1",
"content": "Row content",
"memory_type": "conversation",
"source": "test",
"agent_id": "agent-1",
"task_id": "task-1",
"session_id": "session-1",
"metadata": None,
"embedding": None,
"created_at": "2026-03-23T10:00:00",
}
# Mock sqlite3.Row behavior
class MockRow:
def __getitem__(self, key):
return row_data.get(key)
entry = _row_to_entry(MockRow())
assert entry.id == "row-1"
assert entry.content == "Row content"
assert entry.context_type == "conversation" # memory_type -> context_type
assert entry.agent_id == "agent-1"
def test_convert_with_metadata(self):
"""Test converting row with JSON metadata."""
row_data = {
"id": "row-2",
"content": "Content with metadata",
"memory_type": "fact",
"source": "test",
"agent_id": None,
"task_id": None,
"session_id": None,
"metadata": '{"key": "value", "num": 42}',
"embedding": None,
"created_at": "2026-03-23T10:00:00",
}
class MockRow:
def __getitem__(self, key):
return row_data.get(key)
entry = _row_to_entry(MockRow())
assert entry.metadata == {"key": "value", "num": 42}
def test_convert_with_embedding(self):
"""Test converting row with JSON embedding."""
row_data = {
"id": "row-3",
"content": "Content with embedding",
"memory_type": "conversation",
"source": "test",
"agent_id": None,
"task_id": None,
"session_id": None,
"metadata": None,
"embedding": "[0.1, 0.2, 0.3]",
"created_at": "2026-03-23T10:00:00",
}
class MockRow:
def __getitem__(self, key):
return row_data.get(key)
entry = _row_to_entry(MockRow())
assert entry.embedding == [0.1, 0.2, 0.3]
@pytest.mark.unit
class TestScoreAndFilter:
"""Tests for _score_and_filter function."""
def test_empty_rows(self):
"""Test filtering empty rows list."""
results = _score_and_filter([], "query", [0.1, 0.2], 0.5)
assert results == []
def test_filter_by_min_relevance(self, patched_db):
"""Test filtering by minimum relevance score."""
# Create rows with embeddings
rows = []
for i in range(3):
entry = store_memory(
content=f"Content {i}",
source="test",
context_type="conversation",
compute_embedding=True, # Get actual embeddings
)
# Fetch row back
with get_connection() as conn:
row = conn.execute(
"SELECT * FROM memories WHERE id = ?",
(entry.id,),
).fetchone()
rows.append(row)
query_embedding = [0.1] * 128
results = _score_and_filter(rows, "query", query_embedding, 0.99) # High threshold
# Should filter out all results with high threshold
assert len(results) <= len(rows)
def test_keyword_fallback_no_embedding(self, patched_db):
"""Test keyword overlap fallback when row has no embedding."""
# Store without embedding
entry = store_memory(
content="hello world test",
source="test",
compute_embedding=False,
)
with get_connection() as conn:
row = conn.execute(
"SELECT * FROM memories WHERE id = ?",
(entry.id,),
).fetchone()
results = _score_and_filter([row], "hello world", [0.1] * 128, 0.0)
assert len(results) == 1
assert results[0].relevance_score is not None
@pytest.mark.unit
class TestSearchMemories:
"""Tests for search_memories function."""
def test_search_empty_db(self, patched_db):
"""Test searching empty database."""
results = search_memories("anything")
assert results == []
def test_search_returns_results(self, patched_db):
"""Test search returns matching results."""
store_memory(
content="Python programming is fun",
source="test",
context_type="conversation",
compute_embedding=True,
)
store_memory(
content="JavaScript is different",
source="test",
context_type="conversation",
compute_embedding=True,
)
results = search_memories("python programming", limit=5)
assert len(results) > 0
def test_search_with_filters(self, patched_db):
"""Test search with context_type filter."""
store_memory(
content="Fact about Python",
source="test",
context_type="fact",
compute_embedding=False,
)
store_memory(
content="Conversation about Python",
source="test",
context_type="conversation",
compute_embedding=False,
)
results = search_memories("python", context_type="fact")
assert len(results) == 1
assert results[0].context_type == "fact"
def test_search_with_agent_filter(self, patched_db):
"""Test search with agent_id filter."""
store_memory(
content="Agent 1 memory",
source="test",
agent_id="agent-1",
compute_embedding=False,
)
store_memory(
content="Agent 2 memory",
source="test",
agent_id="agent-2",
compute_embedding=False,
)
results = search_memories("memory", agent_id="agent-1")
assert len(results) == 1
assert results[0].agent_id == "agent-1"
def test_search_respects_limit(self, patched_db):
"""Test that limit parameter is respected."""
for i in range(10):
store_memory(
content=f"Memory {i}",
source="test",
compute_embedding=False,
)
results = search_memories("memory", limit=3)
assert len(results) <= 3
def test_search_with_min_relevance(self, patched_db):
"""Test search with min_relevance threshold."""
for i in range(5):
store_memory(
content=f"Unique content xyz{i}",
source="test",
compute_embedding=False,
)
# High threshold should return fewer results
results = search_memories("xyz", min_relevance=0.9, limit=10)
# With hash embeddings, high threshold may filter everything
assert isinstance(results, list)
@pytest.mark.unit
class TestDeleteMemory:
"""Tests for delete_memory function."""
def test_delete_existing_memory(self, patched_db):
"""Test deleting an existing memory."""
entry = store_memory(
content="To be deleted",
source="test",
compute_embedding=False,
)
result = delete_memory(entry.id)
assert result is True
# Verify deletion
with get_connection() as conn:
row = conn.execute(
"SELECT * FROM memories WHERE id = ?",
(entry.id,),
).fetchone()
assert row is None
def test_delete_nonexistent_memory(self, patched_db):
"""Test deleting a non-existent memory."""
result = delete_memory("nonexistent-id-12345")
assert result is False
def test_delete_multiple_memories(self, patched_db):
"""Test deleting multiple memories one by one."""
entries = []
for i in range(3):
entry = store_memory(
content=f"Delete me {i}",
source="test",
compute_embedding=False,
)
entries.append(entry)
for entry in entries:
result = delete_memory(entry.id)
assert result is True
@pytest.mark.unit
class TestGetMemoryStats:
"""Tests for get_memory_stats function."""
def test_stats_empty_db(self, patched_db):
"""Test stats on empty database."""
stats = get_memory_stats()
assert stats["total_entries"] == 0
assert stats["by_type"] == {}
assert stats["with_embeddings"] == 0
def test_stats_with_entries(self, patched_db):
"""Test stats with various entries."""
store_memory(
content="Fact 1",
source="test",
context_type="fact",
compute_embedding=False,
)
store_memory(
content="Fact 2",
source="test",
context_type="fact",
compute_embedding=True,
)
store_memory(
content="Conversation 1",
source="test",
context_type="conversation",
compute_embedding=False,
)
stats = get_memory_stats()
assert stats["total_entries"] == 3
assert stats["by_type"]["fact"] == 2
assert stats["by_type"]["conversation"] == 1
assert stats["with_embeddings"] == 1
def test_stats_embedding_model_status(self, patched_db):
"""Test that stats reports embedding model status."""
stats = get_memory_stats()
assert "has_embedding_model" in stats
# In test mode, embeddings are skipped
assert stats["has_embedding_model"] is False
@pytest.mark.unit
class TestPruneMemories:
"""Tests for prune_memories function."""
def test_prune_empty_db(self, patched_db):
"""Test pruning empty database."""
deleted = prune_memories(older_than_days=30)
assert deleted == 0
def test_prune_old_memories(self, patched_db):
"""Test pruning old memories."""
# Store a memory
store_memory(
content="Recent memory",
source="test",
compute_embedding=False,
)
# Prune shouldn't delete recent memories
deleted = prune_memories(older_than_days=30)
assert deleted == 0
def test_prune_keeps_facts(self, patched_db):
"""Test that prune keeps fact-type memories when keep_facts=True."""
# Insert an old fact directly
old_time = (datetime.now(UTC) - timedelta(days=100)).isoformat()
with get_connection() as conn:
conn.execute(
"""
INSERT INTO memories
(id, content, memory_type, source, created_at)
VALUES (?, ?, 'fact', 'test', ?)
""",
("old-fact-1", "Old fact", old_time),
)
conn.commit()
# Prune with keep_facts=True should not delete facts
deleted = prune_memories(older_than_days=30, keep_facts=True)
assert deleted == 0
# Verify fact still exists
with get_connection() as conn:
row = conn.execute(
"SELECT * FROM memories WHERE id = ?",
("old-fact-1",),
).fetchone()
assert row is not None
def test_prune_deletes_facts_when_keep_false(self, patched_db):
"""Test that prune deletes facts when keep_facts=False."""
# Insert an old fact directly
old_time = (datetime.now(UTC) - timedelta(days=100)).isoformat()
with get_connection() as conn:
conn.execute(
"""
INSERT INTO memories
(id, content, memory_type, source, created_at)
VALUES (?, ?, 'fact', 'test', ?)
""",
("old-fact-2", "Old fact to delete", old_time),
)
conn.commit()
# Prune with keep_facts=False should delete facts
deleted = prune_memories(older_than_days=30, keep_facts=False)
assert deleted == 1
def test_prune_non_fact_memories(self, patched_db):
"""Test pruning non-fact memories."""
# Insert old non-fact memory
old_time = (datetime.now(UTC) - timedelta(days=100)).isoformat()
with get_connection() as conn:
conn.execute(
"""
INSERT INTO memories
(id, content, memory_type, source, created_at)
VALUES (?, ?, 'conversation', 'test', ?)
""",
("old-conv-1", "Old conversation", old_time),
)
conn.commit()
deleted = prune_memories(older_than_days=30, keep_facts=True)
assert deleted == 1
@pytest.mark.unit
class TestGetMemoryContext:
"""Tests for get_memory_context function."""
def test_empty_context(self, patched_db):
"""Test getting context from empty database."""
context = get_memory_context("query")
assert context == ""
def test_context_with_results(self, patched_db):
"""Test getting context with matching results."""
store_memory(
content="Python is a programming language",
source="user",
compute_embedding=False,
)
context = get_memory_context("python programming")
# May or may not match depending on search
assert isinstance(context, str)
def test_context_respects_max_tokens(self, patched_db):
"""Test that max_tokens limits context size."""
for i in range(20):
store_memory(
content=f"This is memory number {i} with some content",
source="test",
compute_embedding=False,
)
context = get_memory_context("memory", max_tokens=100)
# Rough approximation: 100 tokens * 4 chars = 400 chars
assert len(context) <= 500 or context == ""
def test_context_formatting(self, patched_db):
"""Test that context is properly formatted."""
store_memory(
content="Important information",
source="system",
compute_embedding=False,
)
context = get_memory_context("important")
if context:
assert "Relevant context from memory:" in context or context == ""
@pytest.mark.unit
class TestPersonalFacts:
"""Tests for personal facts functions."""
def test_recall_personal_facts_empty(self, patched_db):
"""Test recalling facts when none exist."""
facts = recall_personal_facts()
assert facts == []
def test_store_and_recall_personal_fact(self, patched_db):
"""Test storing and recalling a personal fact."""
entry = store_personal_fact("User likes Python", agent_id="agent-1")
assert entry.context_type == "fact"
assert entry.content == "User likes Python"
assert entry.agent_id == "agent-1"
facts = recall_personal_facts()
assert "User likes Python" in facts
def test_recall_personal_facts_with_agent_filter(self, patched_db):
"""Test recalling facts filtered by agent_id."""
store_personal_fact("Fact for agent 1", agent_id="agent-1")
store_personal_fact("Fact for agent 2", agent_id="agent-2")
facts = recall_personal_facts(agent_id="agent-1")
assert len(facts) == 1
assert "Fact for agent 1" in facts
def test_recall_personal_facts_with_ids(self, patched_db):
"""Test recalling facts with their IDs."""
entry = store_personal_fact("Fact with ID", agent_id="agent-1")
facts_with_ids = recall_personal_facts_with_ids()
assert len(facts_with_ids) == 1
assert facts_with_ids[0]["id"] == entry.id
assert facts_with_ids[0]["content"] == "Fact with ID"
def test_update_personal_fact(self, patched_db):
"""Test updating a personal fact."""
entry = store_personal_fact("Original fact", agent_id="agent-1")
result = update_personal_fact(entry.id, "Updated fact")
assert result is True
facts = recall_personal_facts()
assert "Updated fact" in facts
assert "Original fact" not in facts
def test_update_nonexistent_fact(self, patched_db):
"""Test updating a non-existent fact."""
result = update_personal_fact("nonexistent-id", "New content")
assert result is False
def test_update_only_affects_facts(self, patched_db):
"""Test that update only affects fact-type memories."""
# Store a non-fact memory
entry = store_memory(
content="Not a fact",
source="test",
context_type="conversation",
compute_embedding=False,
)
# Try to update it as if it were a fact
result = update_personal_fact(entry.id, "Updated content")
assert result is False
@pytest.mark.unit
class TestReflections:
"""Tests for reflection storage and recall."""
def test_store_and_recall_reflection(self, patched_db):
"""Test storing and recalling a reflection."""
store_last_reflection("This is my reflection")
result = recall_last_reflection()
assert result == "This is my reflection"
def test_reflection_replaces_previous(self, patched_db):
"""Test that storing reflection replaces the previous one."""
store_last_reflection("First reflection")
store_last_reflection("Second reflection")
result = recall_last_reflection()
assert result == "Second reflection"
# Verify only one reflection in DB
with get_connection() as conn:
count = conn.execute(
"SELECT COUNT(*) FROM memories WHERE memory_type = 'reflection'"
).fetchone()[0]
assert count == 1
def test_store_empty_reflection(self, patched_db):
"""Test that empty reflection is not stored."""
store_last_reflection("")
store_last_reflection(" ")
store_last_reflection(None)
result = recall_last_reflection()
assert result is None
def test_recall_no_reflection(self, patched_db):
"""Test recalling when no reflection exists."""
result = recall_last_reflection()
assert result is None
def test_reflection_strips_whitespace(self, patched_db):
"""Test that reflection content is stripped."""
store_last_reflection(" Reflection with whitespace ")
result = recall_last_reflection()
assert result == "Reflection with whitespace"
@pytest.mark.unit
class TestEdgeCases:
"""Edge cases and error handling."""
def test_unicode_content(self, patched_db):
"""Test handling of unicode content."""
entry = store_memory(
content="Unicode: 你好世界 🎉 café naïve",
source="test",
compute_embedding=False,
)
assert entry.content == "Unicode: 你好世界 🎉 café naïve"
# Verify in DB
with get_connection() as conn:
row = conn.execute(
"SELECT content FROM memories WHERE id = ?",
(entry.id,),
).fetchone()
assert "你好世界" in row["content"]
def test_special_characters_in_content(self, patched_db):
"""Test handling of special characters."""
content = """<script>alert('xss')</script>
SQL: SELECT * FROM users
JSON: {"key": "value"}
Escapes: \\n \\t"""
entry = store_memory(
content=content,
source="test",
compute_embedding=False,
)
assert entry.content == content
def test_very_long_content(self, patched_db):
"""Test handling of very long content."""
long_content = "Word " * 1000
entry = store_memory(
content=long_content,
source="test",
compute_embedding=False,
)
assert len(entry.content) == len(long_content)
def test_metadata_with_nested_structure(self, patched_db):
"""Test storing metadata with nested structure."""
metadata = {
"level1": {
"level2": {
"level3": ["item1", "item2"]
}
},
"number": 42,
"boolean": True,
"null": None,
}
entry = store_memory(
content="Nested metadata test",
source="test",
metadata=metadata,
compute_embedding=False,
)
# Verify metadata round-trips correctly
with get_connection() as conn:
row = conn.execute(
"SELECT metadata FROM memories WHERE id = ?",
(entry.id,),
).fetchone()
loaded = json.loads(row["metadata"])
assert loaded["level1"]["level2"]["level3"] == ["item1", "item2"]
assert loaded["number"] == 42
assert loaded["boolean"] is True
def test_duplicate_keys_not_prevented(self, patched_db):
"""Test that duplicate content is allowed."""
entry1 = store_memory(
content="Duplicate content",
source="test",
compute_embedding=False,
)
entry2 = store_memory(
content="Duplicate content",
source="test",
compute_embedding=False,
)
assert entry1.id != entry2.id
stats = get_memory_stats()
assert stats["total_entries"] == 2

View File

@@ -0,0 +1,616 @@
"""Unit tests for timmy/tools/system_tools.py.
Covers: _safe_eval, calculator, consult_grok, web_fetch,
create_aider_tool (AiderTool), create_code_tools,
create_security_tools, create_devops_tools.
"""
from __future__ import annotations
import ast
import math
import subprocess
from unittest.mock import MagicMock, patch
import pytest
from timmy.tools.system_tools import (
_safe_eval,
calculator,
consult_grok,
create_aider_tool,
web_fetch,
)
pytestmark = pytest.mark.unit
# ── _safe_eval ────────────────────────────────────────────────────────────────
def _parse_eval(expr: str):
allowed = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed["math"] = math
allowed["abs"] = abs
allowed["round"] = round
allowed["min"] = min
allowed["max"] = max
tree = ast.parse(expr, mode="eval")
return _safe_eval(tree, allowed)
class TestSafeEval:
@pytest.mark.unit
def test_integer_constant(self):
assert _parse_eval("42") == 42
@pytest.mark.unit
def test_float_constant(self):
assert _parse_eval("3.14") == pytest.approx(3.14)
@pytest.mark.unit
def test_addition(self):
assert _parse_eval("1 + 2") == 3
@pytest.mark.unit
def test_subtraction(self):
assert _parse_eval("10 - 4") == 6
@pytest.mark.unit
def test_multiplication(self):
assert _parse_eval("3 * 7") == 21
@pytest.mark.unit
def test_division(self):
assert _parse_eval("10 / 4") == 2.5
@pytest.mark.unit
def test_floor_division(self):
assert _parse_eval("10 // 3") == 3
@pytest.mark.unit
def test_modulo(self):
assert _parse_eval("10 % 3") == 1
@pytest.mark.unit
def test_power(self):
assert _parse_eval("2 ** 8") == 256
@pytest.mark.unit
def test_unary_minus(self):
assert _parse_eval("-5") == -5
@pytest.mark.unit
def test_unary_plus(self):
assert _parse_eval("+5") == 5
@pytest.mark.unit
def test_math_attribute(self):
assert _parse_eval("math.pi") == pytest.approx(math.pi)
@pytest.mark.unit
def test_math_function_call(self):
assert _parse_eval("math.sqrt(16)") == pytest.approx(4.0)
@pytest.mark.unit
def test_allowed_name_abs(self):
assert _parse_eval("abs(-10)") == 10
@pytest.mark.unit
def test_allowed_name_round(self):
assert _parse_eval("round(3.7)") == 4
@pytest.mark.unit
def test_allowed_name_min(self):
assert _parse_eval("min(5, 2, 8)") == 2
@pytest.mark.unit
def test_allowed_name_max(self):
assert _parse_eval("max(5, 2, 8)") == 8
@pytest.mark.unit
def test_string_constant_rejected(self):
with pytest.raises(ValueError, match="Unsupported constant"):
_parse_eval("'hello'")
@pytest.mark.unit
def test_unknown_name_rejected(self):
with pytest.raises(ValueError, match="Unknown name"):
_parse_eval("xyz")
@pytest.mark.unit
def test_unsupported_binary_op(self):
with pytest.raises(ValueError, match="Unsupported"):
_parse_eval("3 & 5")
@pytest.mark.unit
def test_unsupported_unary_op(self):
with pytest.raises(ValueError, match="Unsupported"):
_parse_eval("~5")
@pytest.mark.unit
def test_attribute_on_non_math_rejected(self):
with pytest.raises(ValueError, match="Attribute access not allowed"):
_parse_eval("abs.__class__")
@pytest.mark.unit
def test_invalid_math_attr_rejected(self):
with pytest.raises(ValueError, match="Attribute access not allowed"):
_parse_eval("math.__builtins__")
@pytest.mark.unit
def test_unsupported_syntax_subscript(self):
with pytest.raises(ValueError, match="Unsupported syntax"):
_parse_eval("[1, 2][0]")
@pytest.mark.unit
def test_expression_wrapper(self):
"""ast.Expression node is unwrapped correctly."""
allowed = {"abs": abs}
tree = ast.parse("abs(-1)", mode="eval")
assert isinstance(tree, ast.Expression)
assert _safe_eval(tree, allowed) == 1
# ── calculator ────────────────────────────────────────────────────────────────
class TestCalculator:
@pytest.mark.unit
def test_basic_addition(self):
assert calculator("2 + 3") == "5"
@pytest.mark.unit
def test_multiplication(self):
assert calculator("6 * 7") == "42"
@pytest.mark.unit
def test_math_function(self):
assert calculator("math.sqrt(9)") == "3.0"
@pytest.mark.unit
def test_exponent(self):
assert calculator("2**10") == "1024"
@pytest.mark.unit
def test_error_on_syntax(self):
result = calculator("2 +")
assert "Error" in result
@pytest.mark.unit
def test_error_on_empty(self):
result = calculator("")
assert "Error" in result
@pytest.mark.unit
def test_error_on_division_by_zero(self):
result = calculator("1 / 0")
assert "Error" in result
@pytest.mark.unit
def test_error_message_contains_expression(self):
result = calculator("bad expr!!!")
assert "bad expr!!!" in result
@pytest.mark.unit
def test_injection_import(self):
result = calculator("__import__('os').system('echo hi')")
assert "Error" in result
@pytest.mark.unit
def test_injection_builtins(self):
result = calculator("__builtins__")
assert "Error" in result
@pytest.mark.unit
def test_string_literal_rejected(self):
result = calculator("'hello'")
assert "Error" in result
# ── consult_grok ──────────────────────────────────────────────────────────────
class TestConsultGrok:
@pytest.mark.unit
def test_grok_not_available(self):
with patch("timmy.backends.grok_available", return_value=False):
result = consult_grok("test query")
assert "not available" in result.lower()
@pytest.mark.unit
def test_grok_free_mode(self):
mock_backend = MagicMock()
mock_backend.run.return_value = MagicMock(content="Answer text")
mock_settings = MagicMock()
mock_settings.grok_free = True
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings):
result = consult_grok("What is 2+2?")
assert result == "Answer text"
mock_backend.run.assert_called_once_with("What is 2+2?")
@pytest.mark.unit
def test_grok_spark_logging_failure_is_silent(self):
"""Spark logging failure should not crash consult_grok."""
mock_backend = MagicMock()
mock_backend.run.return_value = MagicMock(content="ok")
mock_settings = MagicMock()
mock_settings.grok_free = True
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings), \
patch.dict("sys.modules", {"spark.engine": None}):
result = consult_grok("hello")
assert result == "ok"
@pytest.mark.unit
def test_grok_paid_mode_lightning_failure(self):
"""When Lightning invoice creation fails, return an error message."""
mock_backend = MagicMock()
mock_settings = MagicMock()
mock_settings.grok_free = False
mock_settings.grok_max_sats_per_query = 10
mock_settings.grok_sats_hard_cap = 100
mock_lightning = MagicMock()
mock_ln_backend = MagicMock()
mock_ln_backend.create_invoice.side_effect = OSError("LN down")
mock_lightning.get_backend.return_value = mock_ln_backend
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings), \
patch.dict("sys.modules", {"lightning.factory": mock_lightning}):
result = consult_grok("expensive query")
assert "Error" in result
# ── web_fetch ─────────────────────────────────────────────────────────────────
class TestWebFetch:
@pytest.mark.unit
def test_invalid_scheme_ftp(self):
result = web_fetch("ftp://example.com")
assert "Error: invalid URL" in result
@pytest.mark.unit
def test_empty_url(self):
result = web_fetch("")
assert "Error: invalid URL" in result
@pytest.mark.unit
def test_no_scheme(self):
result = web_fetch("example.com/page")
assert "Error: invalid URL" in result
@pytest.mark.unit
def test_missing_requests_package(self):
with patch.dict("sys.modules", {"requests": None}):
result = web_fetch("https://example.com")
assert "requests" in result and "not installed" in result
@pytest.mark.unit
def test_missing_trafilatura_package(self):
mock_requests = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": None}):
result = web_fetch("https://example.com")
assert "trafilatura" in result and "not installed" in result
@pytest.mark.unit
def test_extraction_returns_none(self):
mock_requests = MagicMock()
mock_trafilatura = MagicMock()
mock_resp = MagicMock()
mock_resp.text = "<html></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = None
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
result = web_fetch("https://example.com")
assert "Error: could not extract" in result
@pytest.mark.unit
def test_truncation_applied(self):
mock_requests = MagicMock()
mock_trafilatura = MagicMock()
long_text = "x" * 10000
mock_resp = MagicMock()
mock_resp.text = "<html><body>" + long_text + "</body></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = long_text
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
result = web_fetch("https://example.com", max_tokens=100)
assert "[…truncated" in result
assert len(result) < 600
@pytest.mark.unit
def test_successful_fetch(self):
mock_requests = MagicMock()
mock_trafilatura = MagicMock()
mock_resp = MagicMock()
mock_resp.text = "<html><body><p>Hello</p></body></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = "Hello"
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
result = web_fetch("https://example.com")
assert result == "Hello"
@pytest.mark.unit
def test_timeout_error(self):
exc_mod = _make_request_exceptions()
mock_requests = MagicMock()
mock_requests.exceptions = exc_mod
mock_requests.get.side_effect = exc_mod.Timeout("timed out")
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
result = web_fetch("https://example.com")
assert "timed out" in result
@pytest.mark.unit
def test_http_error_404(self):
exc_mod = _make_request_exceptions()
mock_requests = MagicMock()
mock_requests.exceptions = exc_mod
mock_response = MagicMock()
mock_response.status_code = 404
mock_requests.get.return_value.raise_for_status.side_effect = exc_mod.HTTPError(
response=mock_response
)
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
result = web_fetch("https://example.com/nope")
assert "404" in result
@pytest.mark.unit
def test_request_exception(self):
exc_mod = _make_request_exceptions()
mock_requests = MagicMock()
mock_requests.exceptions = exc_mod
mock_requests.get.side_effect = exc_mod.RequestException("connection refused")
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
result = web_fetch("https://example.com")
assert "Error" in result
@pytest.mark.unit
def test_http_url_accepted(self):
"""http:// URLs should pass the scheme check."""
mock_requests = MagicMock()
mock_trafilatura = MagicMock()
mock_resp = MagicMock()
mock_resp.text = "<html><body><p>content</p></body></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = "content"
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
result = web_fetch("http://example.com")
assert result == "content"
# ── create_aider_tool / AiderTool ─────────────────────────────────────────────
class TestAiderTool:
@pytest.mark.unit
def test_factory_returns_tool(self, tmp_path):
tool = create_aider_tool(tmp_path)
assert hasattr(tool, "run_aider")
@pytest.mark.unit
def test_base_dir_set(self, tmp_path):
tool = create_aider_tool(tmp_path)
assert tool.base_dir == tmp_path
@pytest.mark.unit
@patch("subprocess.run")
def test_run_aider_success(self, mock_run, tmp_path):
mock_run.return_value = MagicMock(returncode=0, stdout="code generated")
tool = create_aider_tool(tmp_path)
result = tool.run_aider("add a function")
assert result == "code generated"
@pytest.mark.unit
@patch("subprocess.run")
def test_run_aider_success_empty_stdout(self, mock_run, tmp_path):
mock_run.return_value = MagicMock(returncode=0, stdout="")
tool = create_aider_tool(tmp_path)
result = tool.run_aider("do something")
assert "successfully" in result.lower()
@pytest.mark.unit
@patch("subprocess.run")
def test_run_aider_failure(self, mock_run, tmp_path):
mock_run.return_value = MagicMock(returncode=1, stderr="fatal error")
tool = create_aider_tool(tmp_path)
result = tool.run_aider("bad prompt")
assert "error" in result.lower()
assert "fatal error" in result
@pytest.mark.unit
@patch("subprocess.run")
def test_run_aider_not_installed(self, mock_run, tmp_path):
mock_run.side_effect = FileNotFoundError
tool = create_aider_tool(tmp_path)
result = tool.run_aider("task")
assert "not installed" in result.lower()
@pytest.mark.unit
@patch("subprocess.run")
def test_run_aider_timeout(self, mock_run, tmp_path):
mock_run.side_effect = subprocess.TimeoutExpired(cmd="aider", timeout=120)
tool = create_aider_tool(tmp_path)
result = tool.run_aider("long task")
assert "timed out" in result.lower()
@pytest.mark.unit
@patch("subprocess.run")
def test_run_aider_os_error(self, mock_run, tmp_path):
mock_run.side_effect = OSError("permission denied")
tool = create_aider_tool(tmp_path)
result = tool.run_aider("task")
assert "error" in result.lower()
@pytest.mark.unit
@patch("subprocess.run")
def test_custom_model_passed_to_subprocess(self, mock_run, tmp_path):
mock_run.return_value = MagicMock(returncode=0, stdout="ok")
tool = create_aider_tool(tmp_path)
tool.run_aider("task", model="mistral:7b")
call_args = mock_run.call_args[0][0]
assert "ollama/mistral:7b" in call_args
@pytest.mark.unit
@patch("subprocess.run")
def test_default_model_is_passed(self, mock_run, tmp_path):
mock_run.return_value = MagicMock(returncode=0, stdout="ok")
tool = create_aider_tool(tmp_path)
tool.run_aider("task")
call_args = mock_run.call_args[0][0]
assert "--model" in call_args
@pytest.mark.unit
@patch("subprocess.run")
def test_no_git_flag_present(self, mock_run, tmp_path):
mock_run.return_value = MagicMock(returncode=0, stdout="ok")
tool = create_aider_tool(tmp_path)
tool.run_aider("task")
call_args = mock_run.call_args[0][0]
assert "--no-git" in call_args
@pytest.mark.unit
@patch("subprocess.run")
def test_cwd_is_base_dir(self, mock_run, tmp_path):
mock_run.return_value = MagicMock(returncode=0, stdout="ok")
tool = create_aider_tool(tmp_path)
tool.run_aider("task")
assert mock_run.call_args[1]["cwd"] == str(tmp_path)
# ── create_code_tools / create_security_tools / create_devops_tools ───────────
class TestToolkitFactories:
@pytest.mark.unit
def test_create_code_tools_requires_agno(self):
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
if _AGNO_TOOLS_AVAILABLE:
pytest.skip("Agno is available — ImportError path not testable")
from timmy.tools.system_tools import create_code_tools
with pytest.raises(ImportError):
create_code_tools()
@pytest.mark.unit
def test_create_security_tools_requires_agno(self):
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
if _AGNO_TOOLS_AVAILABLE:
pytest.skip("Agno is available — ImportError path not testable")
from timmy.tools.system_tools import create_security_tools
with pytest.raises(ImportError):
create_security_tools()
@pytest.mark.unit
def test_create_devops_tools_requires_agno(self):
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
if _AGNO_TOOLS_AVAILABLE:
pytest.skip("Agno is available — ImportError path not testable")
from timmy.tools.system_tools import create_devops_tools
with pytest.raises(ImportError):
create_devops_tools()
@pytest.mark.unit
def test_create_code_tools_with_agno(self, tmp_path):
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
if not _AGNO_TOOLS_AVAILABLE:
pytest.skip("Agno not available")
from timmy.tools.system_tools import create_code_tools
mock_settings = MagicMock()
mock_settings.repo_root = str(tmp_path)
with patch("config.settings", mock_settings):
toolkit = create_code_tools(base_dir=tmp_path)
assert toolkit is not None
assert toolkit.name == "code"
@pytest.mark.unit
def test_create_security_tools_with_agno(self, tmp_path):
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
if not _AGNO_TOOLS_AVAILABLE:
pytest.skip("Agno not available")
from timmy.tools.system_tools import create_security_tools
mock_settings = MagicMock()
mock_settings.repo_root = str(tmp_path)
with patch("config.settings", mock_settings):
toolkit = create_security_tools(base_dir=tmp_path)
assert toolkit is not None
assert toolkit.name == "security"
@pytest.mark.unit
def test_create_devops_tools_with_agno(self, tmp_path):
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
if not _AGNO_TOOLS_AVAILABLE:
pytest.skip("Agno not available")
from timmy.tools.system_tools import create_devops_tools
mock_settings = MagicMock()
mock_settings.repo_root = str(tmp_path)
with patch("config.settings", mock_settings):
toolkit = create_devops_tools(base_dir=tmp_path)
assert toolkit is not None
assert toolkit.name == "devops"
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_request_exceptions():
"""Create a mock requests.exceptions module with real exception classes."""
class Timeout(Exception):
pass
class HTTPError(Exception):
def __init__(self, *args, response=None, **kwargs):
super().__init__(*args, **kwargs)
self.response = response
class RequestException(Exception):
pass
mod = MagicMock()
mod.Timeout = Timeout
mod.HTTPError = HTTPError
mod.RequestException = RequestException
return mod