"""System introspection tools for Timmy to query his own environment.

This provides true sovereignty - Timmy introspects his environment rather than
being told about it in the system prompt.
"""

import logging
import platform
import re
import sqlite3
import sys
from contextlib import closing
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import httpx

logger = logging.getLogger(__name__)

# Matches the "<N> passed" / "<N> failed" / "<N> error(s)" fragments that
# pytest prints; compiled once because it is applied to whole test outputs.
_PYTEST_COUNT_RE = re.compile(r"(\d+) (passed|failed|error)")


def get_system_info() -> dict[str, Any]:
    """Introspect the runtime environment to discover system information.

    Returns:
        Dict containing:
        - python_version: Python version
        - platform: OS platform
        - model: Current Ollama model (queried from API)
        - model_backend: Configured backend (ollama/grok/claude)
        - ollama_url: Ollama host URL
        - repo_root: Repository root path
        - grok_enabled: Whether GROK is enabled
        - spark_enabled: Whether Spark is enabled
        - memory_vault_exists: Whether memory vault is initialized
    """
    from config import settings

    info = {
        "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
        "platform": platform.system(),
        "model_backend": settings.timmy_model_backend,
        "ollama_url": settings.ollama_url,
        "repo_root": settings.repo_root,
        "grok_enabled": settings.grok_enabled,
        "spark_enabled": settings.spark_enabled,
    }

    # Query Ollama for current model
    info["model"] = _get_ollama_model()

    # Check if memory vault exists
    vault_path = Path(settings.repo_root) / "memory" / "self"
    info["memory_vault_exists"] = vault_path.exists()

    return info


def _is_configured_model(name: str, configured: str) -> bool:
    """Return True when *name* is exactly *configured* or its ``:latest`` tag.

    Exact comparison (rather than a prefix test) avoids collisions such as
    'qwen3:8b' vs 'qwen3:30b'.
    """
    return name in (configured, f"{configured}:latest")


def _get_ollama_model() -> str:
    """Query Ollama API to get the actual running model.

    Strategy:
    1. /api/ps — models currently loaded in memory (most accurate)
    2. /api/tags — all installed models (fallback)

    Both use exact name match to avoid prefix collisions
    (e.g. 'qwen3:8b' vs 'qwen3:30b').

    Returns:
        The name of the loaded model matching the configuration, otherwise
        the first loaded model, otherwise the configured model name. Any
        error while talking to Ollama is logged at debug level and also
        yields the configured name.
    """
    from config import settings

    configured = settings.ollama_model

    try:
        # First: check actually loaded models via /api/ps
        response = httpx.get(f"{settings.ollama_url}/api/ps", timeout=5)
        if response.status_code == 200:
            running = response.json().get("models", [])
            for model in running:
                name = model.get("name", "")
                if _is_configured_model(name, configured):
                    return name
            # Configured model not loaded — return first running model
            # so Timmy reports what's *actually* serving his requests
            if running:
                return running[0].get("name", configured)

        # Second: check installed models via /api/tags (exact match)
        response = httpx.get(f"{settings.ollama_url}/api/tags", timeout=5)
        if response.status_code == 200:
            installed = response.json().get("models", [])
            if any(_is_configured_model(m.get("name", ""), configured) for m in installed):
                return configured
    except Exception as exc:
        # Best effort: an unreachable or misbehaving Ollama must not break
        # introspection, so fall through to the configured name.
        logger.debug("Model validation failed: %s", exc)

    # Fallback to configured model
    return configured


def check_ollama_health() -> dict[str, Any]:
    """Check if Ollama is accessible and healthy.

    Returns:
        Dict with:
        - accessible: True only when /api/tags answered with HTTP 200
        - model: the configured model name
        - available_models: names reported by /api/tags (empty if not accessible)
        - error: string form of the exception raised by the request, else None
    """
    from config import settings

    result: dict[str, Any] = {
        "accessible": False,
        "model": settings.ollama_model,
        "available_models": [],
        "error": None,
    }

    try:
        # Check tags endpoint
        response = httpx.get(f"{settings.ollama_url}/api/tags", timeout=5)
        if response.status_code == 200:
            result["accessible"] = True
            models = response.json().get("models", [])
            result["available_models"] = [m.get("name", "") for m in models]
    except Exception as e:
        logger.exception("Ollama health check failed")
        result["error"] = str(e)

    return result


def _hot_memory_info(repo_root: Path) -> dict[str, Any]:
    """Tier 1: Hot memory (MEMORY.md) status.

    Args:
        repo_root: Directory expected to contain ``MEMORY.md``.

    Returns:
        Dict with ``exists``, ``path`` and ``preview`` (the first 200
        characters with whitespace collapsed, or None when the file is missing
        or empty). When the file exists, ``line_count`` and ``sections`` (the
        titles of the ``## `` headings) are included as well.
    """
    memory_md = repo_root / "MEMORY.md"
    tier1_exists = memory_md.exists()

    # Read the file once and derive both the preview and the line statistics
    # from that single snapshot. Decoding is pinned to UTF-8 (undecodable
    # bytes are replaced) so the result does not depend on the host locale.
    content = memory_md.read_text(encoding="utf-8", errors="replace") if tier1_exists else ""

    info: dict[str, Any] = {
        "exists": tier1_exists,
        "path": str(memory_md),
        "preview": " ".join(content[:200].split()) if content else None,
    }

    if tier1_exists:
        lines = content.splitlines()
        info["line_count"] = len(lines)
        # removeprefix drops only the heading marker; lstrip("# ") would also
        # eat '#' characters that belong to the title (e.g. "## #1 Goals").
        info["sections"] = [
            ln.removeprefix("## ").strip() for ln in lines if ln.startswith("## ")
        ]

    return info


def _vault_info(repo_root: Path) -> dict[str, Any]:
    """Tier 2: Vault (memory/ directory tree) status.

    Args:
        repo_root: Directory expected to contain the ``memory/`` tree.

    Returns:
        Dict with ``exists``, ``path``, ``file_count`` and ``files`` (at most
        ten file names, sorted) for ``memory/self``. When ``memory/`` is a
        directory, ``directories`` (sorted sub-directory names) and
        ``total_markdown_files`` (recursive ``*.md`` count) are added.
    """
    vault_path = repo_root / "memory" / "self"
    vault_root = repo_root / "memory"

    tier2_exists = vault_path.exists()
    # Only list when the path really is a directory; names are sorted so the
    # ten-entry sample below is stable between calls.
    tier2_files = (
        sorted(f.name for f in vault_path.iterdir() if f.is_file())
        if vault_path.is_dir()
        else []
    )

    info: dict[str, Any] = {
        "exists": tier2_exists,
        "path": str(vault_path),
        "file_count": len(tier2_files),
        "files": tier2_files[:10],
    }

    if vault_root.is_dir():
        info["directories"] = sorted(d.name for d in vault_root.iterdir() if d.is_dir())
        info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))

    return info


def _semantic_memory_info(repo_root: Path) -> dict[str, Any]:
    """Tier 3: Semantic memory (vector DB) status.

    Args:
        repo_root: Directory expected to contain ``data/memory.db``.

    Returns:
        ``{"available": False}`` unless the database file exists and has a
        ``chunks`` table; in that case ``available`` is True and
        ``vector_count`` holds the row count of ``chunks``. Query errors are
        logged at debug level and leave the result as "not available".
    """
    info: dict[str, Any] = {"available": False}
    try:
        sem_db = repo_root / "data" / "memory.db"
        if sem_db.exists():
            with closing(sqlite3.connect(str(sem_db))) as conn:
                # Probe sqlite_master first so a database without the table
                # is reported as unavailable instead of raising.
                row = conn.execute(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='chunks'"
                ).fetchone()
                if row and row[0]:
                    count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
                    info["available"] = True
                    info["vector_count"] = count[0] if count else 0
    except Exception as exc:
        logger.debug("Memory status query failed: %s", exc)
    return info


def _journal_info(repo_root: Path) -> dict[str, Any]:
    """Self-coding journal statistics.

    Args:
        repo_root: Directory expected to contain ``data/self_coding.db``.

    Returns:
        ``{"available": False}`` when the database is missing, has no journal
        rows, or cannot be queried. Otherwise a dict with ``available``,
        ``total_attempts``, ``successes``, ``failures`` and ``success_rate``
        (successes / total, rounded to two decimals).
    """
    info: dict[str, Any] = {"available": False}
    try:
        journal_db = repo_root / "data" / "self_coding.db"
        if journal_db.exists():
            with closing(sqlite3.connect(str(journal_db))) as conn:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(
                    "SELECT outcome, COUNT(*) as cnt FROM modification_journal GROUP BY outcome"
                ).fetchall()
                if rows:
                    counts = {r["outcome"]: r["cnt"] for r in rows}
                    total = sum(counts.values())
                    info = {
                        "available": True,
                        "total_attempts": total,
                        "successes": counts.get("success", 0),
                        "failures": counts.get("failure", 0),
                        "success_rate": round(counts.get("success", 0) / total, 2) if total else 0,
                    }
    except Exception as exc:
        logger.debug("Journal stats query failed: %s", exc)
    return info


def get_memory_status() -> dict[str, Any]:
    """Get the status of Timmy's memory system.

    Returns:
        Dict with memory tier information under the keys
        ``tier1_hot_memory``, ``tier2_vault``, ``tier3_semantic`` and
        ``self_coding_journal``.
    """
    from config import settings

    repo_root = Path(settings.repo_root)
    return {
        "tier1_hot_memory": _hot_memory_info(repo_root),
        "tier2_vault": _vault_info(repo_root),
        "tier3_semantic": _semantic_memory_info(repo_root),
        "self_coding_journal": _journal_info(repo_root),
    }


def get_task_queue_status() -> dict[str, Any]:
    """Get current task queue status.

    The swarm task queue was removed. This returns a stub indicating
    the subsystem is not available.
    """
    return {
        "counts": {},
        "total": 0,
        "current_task": None,
        "note": "Task queue not available (swarm module removed)",
    }


def get_agent_roster() -> dict[str, Any]:
    """Get the agent roster from agents.yaml config.

    Returns:
        Dict with agent list and summary (``agents`` and ``total``), or
        ``{"error": <message>}`` when the roster cannot be loaded.
    """
    try:
        from timmy.agents.loader import list_agents

        agents = list_agents()
        roster = [
            {
                "id": a["id"],
                "name": a["name"],
                "status": a.get("status", "available"),
                "capabilities": ", ".join(a.get("tools", [])),
                "role": a.get("role", ""),
                "model": a.get("model", ""),
            }
            for a in agents
        ]

        return {
            "agents": roster,
            "total": len(roster),
        }
    except Exception as exc:
        logger.debug("Agent roster unavailable: %s", exc)
        return {"error": str(exc)}


def get_live_system_status() -> dict[str, Any]:
    """Comprehensive live system status — Timmy's primary introspection tool.

    Combines system info, task queue, agent roster, and memory status
    into a single snapshot. Each subsystem degrades gracefully.

    Returns:
        Dict with ``system``, ``task_queue``, ``agents``, ``memory``,
        ``uptime_seconds`` (None when it cannot be computed), ``discord``
        (``{"state": ...}``, "unknown" on failure) and ``timestamp``
        (current UTC time in ISO format).
    """
    result: dict[str, Any] = {}

    # System info
    try:
        result["system"] = get_system_info()
    except Exception as exc:
        logger.exception("Failed to get system info")
        result["system"] = {"error": str(exc)}

    # Task queue
    result["task_queue"] = get_task_queue_status()

    # Agent roster
    result["agents"] = get_agent_roster()

    # Memory status
    try:
        result["memory"] = get_memory_status()
    except Exception as exc:
        logger.exception("Failed to get memory status")
        result["memory"] = {"error": str(exc)}

    # Uptime
    try:
        from config import APP_START_TIME

        uptime = (datetime.now(UTC) - APP_START_TIME).total_seconds()
        result["uptime_seconds"] = int(uptime)
    except Exception as exc:
        logger.debug("Uptime calculation failed: %s", exc)
        result["uptime_seconds"] = None

    # Discord status
    try:
        from integrations.chat_bridge.vendors.discord import discord_bot

        result["discord"] = {"state": discord_bot.state.name}
    except Exception as exc:
        logger.debug("Discord status check failed: %s", exc)
        result["discord"] = {"state": "unknown"}

    result["timestamp"] = datetime.now(UTC).isoformat()
    return result


def _build_pytest_cmd(venv_python: Path, scope: str) -> list[str]:
    """Build the pytest command list for the given scope.

    Args:
        venv_python: Interpreter used to run ``-m pytest``.
        scope: "fast" (``tests/`` minus the functional, e2e and integrations
            directories), "full" (all of ``tests/``), or any other value,
            which is passed to pytest verbatim as the test target.

    Returns:
        The argument list, ready for ``subprocess.run``.
    """
    cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]

    if scope == "fast":
        cmd.extend(
            [
                "--ignore=tests/functional",
                "--ignore=tests/e2e",
                "--ignore=tests/integrations",
                "tests/",
            ]
        )
    elif scope == "full":
        cmd.append("tests/")
    else:
        cmd.append(scope)

    return cmd


def _parse_pytest_output(output: str) -> dict[str, int]:
    """Extract passed/failed/error counts from pytest output.

    Every "<N> passed", "<N> failed" and "<N> error" fragment in *output* is
    considered in order; for each kind the last occurrence wins.

    Returns:
        Dict with integer ``passed``, ``failed`` and ``errors`` counts
        (0 for any kind that never appears).
    """
    counts = {"passed": 0, "failed": 0, "error": 0}
    for count, kind in _PYTEST_COUNT_RE.findall(output):
        counts[kind] = int(count)
    return {"passed": counts["passed"], "failed": counts["failed"], "errors": counts["error"]}
def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[str, Any]: """Run Timmy's own test suite and report results. A sovereign agent verifies his own integrity. This runs pytest on the codebase and returns a structured summary. Args: scope: Test scope — "fast" (unit tests only, ~30s timeout), "full" (all tests), or a specific path like "tests/timmy/" _repo_root: Optional repo root for testing (overrides settings) Returns: Dict with passed, failed, errors, total counts and summary text. """ import subprocess from config import settings repo = _repo_root if _repo_root else settings.repo_root venv_python = Path(repo) / ".venv" / "bin" / "python" if not venv_python.exists(): return {"success": False, "error": f"No venv found at {venv_python}"} cmd = _build_pytest_cmd(venv_python, scope) try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=120, cwd=repo) output = result.stdout + result.stderr counts = _parse_pytest_output(output) return { "success": result.returncode == 0, **counts, "total": counts["passed"] + counts["failed"] + counts["errors"], "return_code": result.returncode, "summary": output[-2000:] if len(output) > 2000 else output, } except subprocess.TimeoutExpired: return {"success": False, "error": "Test run timed out (120s limit)"} except Exception as exc: logger.exception("Self-test run failed") return {"success": False, "error": str(exc)}