This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/timmy/tools_intro/__init__.py
Kimi Agent fdc5b861ca fix: replace 59 bare except clauses with proper logging (#25)
All `except Exception:` now catch as `except Exception as exc:` with
appropriate logging (warning for critical paths, debug for graceful degradation).

Added logger setup to 4 files that lacked it:
- src/timmy/memory/vector_store.py
- src/dashboard/middleware/csrf.py
- src/dashboard/middleware/security_headers.py
- src/spark/memory.py

31 files changed across timmy core, dashboard, infrastructure, integrations.
Zero bare excepts remain. 1340 tests passing.
2026-03-14 19:07:14 -04:00

324 lines
10 KiB
Python

"""System introspection tools for Timmy to query his own environment.
This provides true sovereignty - Timmy introspects his environment rather than
being told about it in the system prompt.
"""
import logging
import platform
import sys
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import httpx
logger = logging.getLogger(__name__)
def get_system_info() -> dict[str, Any]:
    """Collect a snapshot of the runtime environment Timmy is running in.

    Interpreter and OS facts are read directly; configuration values come
    from ``config.settings``; the model name is resolved by
    ``_get_ollama_model()``; the vault flag is a filesystem check.

    Returns:
        Dict containing:
        - python_version: "major.minor.micro" of the running interpreter
        - platform: OS platform (``platform.system()``)
        - model_backend: Configured backend (``settings.timmy_model_backend``)
        - ollama_url: Ollama host URL
        - repo_root: Repository root path
        - grok_enabled: Whether GROK is enabled
        - spark_enabled: Whether Spark is enabled
        - model: Model name returned by ``_get_ollama_model()``
        - memory_vault_exists: Whether ``<repo_root>/memory/self`` exists
    """
    from config import settings

    major, minor, micro = sys.version_info[:3]

    # Values are evaluated top to bottom, so the Ollama query still happens
    # after the settings have been read and before the vault check.
    return {
        "python_version": f"{major}.{minor}.{micro}",
        "platform": platform.system(),
        "model_backend": settings.timmy_model_backend,
        "ollama_url": settings.ollama_url,
        "repo_root": settings.repo_root,
        "grok_enabled": settings.grok_enabled,
        "spark_enabled": settings.spark_enabled,
        "model": _get_ollama_model(),
        "memory_vault_exists": Path(settings.repo_root, "memory", "self").exists(),
    }
def _get_ollama_model() -> str:
    """Query the Ollama API for the model that is actually in use.

    Strategy:
        1. /api/ps — models currently loaded in memory (most accurate)
        2. /api/tags — all installed models (fallback)

    Both steps use an exact name match (``<configured>`` or
    ``<configured>:latest``) to avoid prefix collisions
    (e.g. 'qwen3:30b' vs 'qwen3.5:latest').

    Returns:
        - the loaded model's name when it matches the configured model;
        - otherwise the name of the first loaded model, if any;
        - otherwise the configured model name. This is also the result when
          a request fails or a response cannot be interpreted.
    """
    from config import settings

    configured = settings.ollama_model
    # The only two spellings that count as "the configured model".
    accepted = (configured, f"{configured}:latest")

    try:
        # First: check actually loaded models via /api/ps
        response = httpx.get(f"{settings.ollama_url}/api/ps", timeout=5)
        if response.status_code == 200:
            # ``or []``: .get("models", []) would return None when the key is
            # present with a JSON null, and iterating None would raise and
            # skip the /api/tags fallback below.
            running = response.json().get("models") or []
            for model in running:
                name = model.get("name", "")
                if name in accepted:
                    return name
            # Configured model not loaded — return first running model
            # so Timmy reports what's *actually* serving his requests
            if running:
                return running[0].get("name", configured)

        # Second: check installed models via /api/tags (exact match)
        response = httpx.get(f"{settings.ollama_url}/api/tags", timeout=5)
        if response.status_code == 200:
            installed = response.json().get("models") or []
            if any(model.get("name", "") in accepted for model in installed):
                return configured
    except Exception as exc:
        # Best-effort probe: any failure degrades to the configured name.
        logger.debug("Model validation failed: %s", exc)

    # Fallback to configured model
    return configured
def check_ollama_health() -> dict[str, Any]:
    """Check whether Ollama answers on its /api/tags endpoint.

    Returns:
        Dict with:
        - accessible: True when /api/tags responded with HTTP 200
        - model: the configured model name (``settings.ollama_model``)
        - available_models: names of the models listed in the response
        - error: None when nothing went wrong; otherwise the exception text
          or a note about the unexpected HTTP status
    """
    from config import settings

    result: dict[str, Any] = {
        "accessible": False,
        "model": settings.ollama_model,
        "available_models": [],
        "error": None,
    }
    try:
        # Check tags endpoint
        response = httpx.get(f"{settings.ollama_url}/api/tags", timeout=5)
        if response.status_code == 200:
            result["accessible"] = True
            # ``or []``: a JSON null under "models" would come back from
            # .get() as None and break the comprehension below.
            models = response.json().get("models") or []
            result["available_models"] = [m.get("name", "") for m in models]
        else:
            # Without this, a non-200 answer produced accessible=False with
            # error=None, leaving the caller no hint about what went wrong.
            result["error"] = f"Unexpected HTTP status {response.status_code} from /api/tags"
    except Exception as e:
        result["error"] = str(e)
    return result
def _hot_memory_status(repo_root: Path) -> dict[str, Any]:
    """Describe tier 1: the MEMORY.md file at the repository root."""
    memory_md = repo_root / "MEMORY.md"
    exists = memory_md.exists()
    info: dict[str, Any] = {
        "exists": exists,
        "path": str(memory_md),
        "preview": None,
    }
    if not exists:
        return info

    # Read the file once; preview, line count and sections all derive from it.
    text = memory_md.read_text()
    if text:
        # First 200 characters with runs of whitespace collapsed to one space.
        info["preview"] = " ".join(text[:200].split())
    lines = text.splitlines()
    info["line_count"] = len(lines)
    info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
    return info


def _vault_status(repo_root: Path) -> dict[str, Any]:
    """Describe tier 2: files in memory/self plus totals for all of memory/."""
    vault_root = repo_root / "memory"
    vault_path = vault_root / "self"
    exists = vault_path.exists()
    files = [f.name for f in vault_path.iterdir() if f.is_file()] if exists else []
    info: dict[str, Any] = {
        "exists": exists,
        "path": str(vault_path),
        "file_count": len(files),
        "files": files[:10],  # cap the listing; file_count carries the total
    }
    # Vault — scan all subdirs under memory/
    if vault_root.exists():
        info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
        info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
    return info


def _semantic_memory_status(repo_root: Path) -> dict[str, Any]:
    """Describe tier 3: row count of the ``chunks`` table in data/memory.db."""
    info: dict[str, Any] = {"available": False}
    try:
        import sqlite3
        from contextlib import closing

        sem_db = repo_root / "data" / "memory.db"
        if sem_db.exists():
            # closing() guarantees the connection is released even when a
            # query raises; the connection's own context manager only
            # handles transactions and does not close it.
            with closing(sqlite3.connect(str(sem_db))) as conn:
                has_table = conn.execute(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='chunks'"
                ).fetchone()
                if has_table and has_table[0]:
                    count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
                    info["available"] = True
                    info["vector_count"] = count[0] if count else 0
    except Exception as exc:
        # Graceful degradation: report the tier as unavailable.
        logger.debug("Memory status query failed: %s", exc)
    return info


def _journal_status(repo_root: Path) -> dict[str, Any]:
    """Summarise outcome counts from the journal in data/self_coding.db."""
    info: dict[str, Any] = {"available": False}
    try:
        import sqlite3
        from contextlib import closing

        journal_db = repo_root / "data" / "self_coding.db"
        if journal_db.exists():
            with closing(sqlite3.connect(str(journal_db))) as conn:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(
                    "SELECT outcome, COUNT(*) as cnt FROM modification_journal GROUP BY outcome"
                ).fetchall()
                if rows:
                    counts = {r["outcome"]: r["cnt"] for r in rows}
                    total = sum(counts.values())
                    successes = counts.get("success", 0)
                    info = {
                        "available": True,
                        "total_attempts": total,
                        "successes": successes,
                        "failures": counts.get("failure", 0),
                        "success_rate": round(successes / total, 2) if total else 0,
                    }
    except Exception as exc:
        # Graceful degradation: report the journal as unavailable.
        logger.debug("Journal stats query failed: %s", exc)
    return info


def get_memory_status() -> dict[str, Any]:
    """Get the status of Timmy's memory system.

    Filesystem errors while inspecting MEMORY.md or the memory/ directory
    propagate to the caller; failures while querying either SQLite database
    are logged at debug level and reported as ``{"available": False}``.

    Returns:
        Dict with memory tier information:
        - tier1_hot_memory: existence, path, preview, line count and "## "
          section titles of ``<repo_root>/MEMORY.md``
        - tier2_vault: existence and (up to 10) file names of
          ``<repo_root>/memory/self``, plus directory names and the markdown
          file count for ``<repo_root>/memory``
        - tier3_semantic: row count of the ``chunks`` table in
          ``<repo_root>/data/memory.db``
        - self_coding_journal: attempt/success/failure counts from
          ``<repo_root>/data/self_coding.db``
    """
    from config import settings

    repo_root = Path(settings.repo_root)
    return {
        "tier1_hot_memory": _hot_memory_status(repo_root),
        "tier2_vault": _vault_status(repo_root),
        "tier3_semantic": _semantic_memory_status(repo_root),
        "self_coding_journal": _journal_status(repo_root),
    }
def get_task_queue_status() -> dict[str, Any]:
    """Report the task queue status.

    The swarm task queue no longer exists, so this always produces the same
    placeholder: empty counts, a total of zero, no current task, and a note
    saying the subsystem is unavailable.
    """
    placeholder: dict[str, Any] = {"counts": {}, "total": 0, "current_task": None}
    placeholder["note"] = "Task queue not available (swarm module removed)"
    return placeholder
def get_agent_roster() -> dict[str, Any]:
    """Build the agent roster from whatever ``list_agents()`` reports.

    Returns:
        On success, a dict with ``agents`` (one summary dict per agent) and
        ``total`` (the number of agents). If loading or summarising fails
        for any reason, a dict with a single ``error`` key holding the
        exception text.
    """
    try:
        from timmy.agents.loader import list_agents

        roster: list[dict[str, Any]] = []
        for agent in list_agents():
            roster.append(
                {
                    "id": agent["id"],
                    "name": agent["name"],
                    "status": agent.get("status", "available"),
                    "capabilities": ", ".join(agent.get("tools", [])),
                    "role": agent.get("role", ""),
                    "model": agent.get("model", ""),
                }
            )
    except Exception as exc:
        logger.debug("Agent roster unavailable: %s", exc)
        return {"error": str(exc)}

    return {"agents": roster, "total": len(roster)}
def get_live_system_status() -> dict[str, Any]:
    """Assemble a single live snapshot of every subsystem Timmy can inspect.

    Combines system info, task queue, agent roster and memory status, then
    adds uptime, Discord state and a timestamp. A failing subsystem does not
    abort the snapshot: its section is replaced by an error/placeholder value.

    Returns:
        Dict with the keys system, task_queue, agents, memory,
        uptime_seconds, discord and timestamp (UTC, ISO 8601).
    """

    def _or_error(probe) -> dict[str, Any]:
        # Run one subsystem probe; on failure report the exception text.
        try:
            return probe()
        except Exception as exc:
            return {"error": str(exc)}

    # Literal order fixes both the call order and the key order.
    snapshot: dict[str, Any] = {
        "system": _or_error(get_system_info),
        "task_queue": get_task_queue_status(),
        "agents": get_agent_roster(),
        "memory": _or_error(get_memory_status),
    }

    # Uptime, measured from the dashboard's recorded start time
    try:
        from dashboard.routes.health import _START_TIME

        elapsed = datetime.now(UTC) - _START_TIME
        snapshot["uptime_seconds"] = int(elapsed.total_seconds())
    except Exception as exc:
        logger.debug("Uptime calculation failed: %s", exc)
        snapshot["uptime_seconds"] = None

    # Discord bridge state
    try:
        from integrations.chat_bridge.vendors.discord import discord_bot

        snapshot["discord"] = {"state": discord_bot.state.name}
    except Exception as exc:
        logger.debug("Discord status check failed: %s", exc)
        snapshot["discord"] = {"state": "unknown"}

    snapshot["timestamp"] = datetime.now(UTC).isoformat()
    return snapshot