refactor: break up get_memory_status() into helpers (#537)
Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
This commit was merged in pull request #537.
This commit is contained in:
@@ -127,54 +127,48 @@ def check_ollama_health() -> dict[str, Any]:
|
||||
return result
|
||||
|
||||
|
||||
def get_memory_status() -> dict[str, Any]:
|
||||
"""Get the status of Timmy's memory system.
|
||||
|
||||
Returns:
|
||||
Dict with memory tier information
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
repo_root = Path(settings.repo_root)
|
||||
|
||||
# Check tier 1: Hot memory
|
||||
def _hot_memory_info(repo_root: Path) -> dict[str, Any]:
|
||||
"""Tier 1: Hot memory (MEMORY.md) status."""
|
||||
memory_md = repo_root / "MEMORY.md"
|
||||
tier1_exists = memory_md.exists()
|
||||
tier1_content = ""
|
||||
if tier1_exists:
|
||||
tier1_content = memory_md.read_text()[:500] # First 500 chars
|
||||
tier1_content = memory_md.read_text()[:500]
|
||||
|
||||
# Check tier 2: Vault
|
||||
vault_path = repo_root / "memory" / "self"
|
||||
tier2_exists = vault_path.exists()
|
||||
tier2_files = []
|
||||
if tier2_exists:
|
||||
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()]
|
||||
|
||||
tier1_info: dict[str, Any] = {
|
||||
info: dict[str, Any] = {
|
||||
"exists": tier1_exists,
|
||||
"path": str(memory_md),
|
||||
"preview": " ".join(tier1_content[:200].split()) if tier1_content else None,
|
||||
}
|
||||
if tier1_exists:
|
||||
lines = memory_md.read_text().splitlines()
|
||||
tier1_info["line_count"] = len(lines)
|
||||
tier1_info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
|
||||
info["line_count"] = len(lines)
|
||||
info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
|
||||
return info
|
||||
|
||||
|
||||
def _vault_info(repo_root: Path) -> dict[str, Any]:
|
||||
"""Tier 2: Vault (memory/ directory tree) status."""
|
||||
vault_path = repo_root / "memory" / "self"
|
||||
tier2_exists = vault_path.exists()
|
||||
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()] if tier2_exists else []
|
||||
|
||||
# Vault — scan all subdirs under memory/
|
||||
vault_root = repo_root / "memory"
|
||||
vault_info: dict[str, Any] = {
|
||||
info: dict[str, Any] = {
|
||||
"exists": tier2_exists,
|
||||
"path": str(vault_path),
|
||||
"file_count": len(tier2_files),
|
||||
"files": tier2_files[:10],
|
||||
}
|
||||
if vault_root.exists():
|
||||
vault_info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
|
||||
vault_info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
|
||||
info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
|
||||
info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
|
||||
return info
|
||||
|
||||
# Tier 3: Semantic memory row count
|
||||
tier3_info: dict[str, Any] = {"available": False}
|
||||
|
||||
def _semantic_memory_info(repo_root: Path) -> dict[str, Any]:
|
||||
"""Tier 3: Semantic memory (vector DB) status."""
|
||||
info: dict[str, Any] = {"available": False}
|
||||
try:
|
||||
sem_db = repo_root / "data" / "memory.db"
|
||||
if sem_db.exists():
|
||||
@@ -184,14 +178,16 @@ def get_memory_status() -> dict[str, Any]:
|
||||
).fetchone()
|
||||
if row and row[0]:
|
||||
count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
|
||||
tier3_info["available"] = True
|
||||
tier3_info["vector_count"] = count[0] if count else 0
|
||||
info["available"] = True
|
||||
info["vector_count"] = count[0] if count else 0
|
||||
except Exception as exc:
|
||||
logger.debug("Memory status query failed: %s", exc)
|
||||
pass
|
||||
return info
|
||||
|
||||
# Self-coding journal stats
|
||||
journal_info: dict[str, Any] = {"available": False}
|
||||
|
||||
def _journal_info(repo_root: Path) -> dict[str, Any]:
|
||||
"""Self-coding journal statistics."""
|
||||
info: dict[str, Any] = {"available": False}
|
||||
try:
|
||||
journal_db = repo_root / "data" / "self_coding.db"
|
||||
if journal_db.exists():
|
||||
@@ -203,7 +199,7 @@ def get_memory_status() -> dict[str, Any]:
|
||||
if rows:
|
||||
counts = {r["outcome"]: r["cnt"] for r in rows}
|
||||
total = sum(counts.values())
|
||||
journal_info = {
|
||||
info = {
|
||||
"available": True,
|
||||
"total_attempts": total,
|
||||
"successes": counts.get("success", 0),
|
||||
@@ -212,13 +208,24 @@ def get_memory_status() -> dict[str, Any]:
|
||||
}
|
||||
except Exception as exc:
|
||||
logger.debug("Journal stats query failed: %s", exc)
|
||||
pass
|
||||
return info
|
||||
|
||||
|
||||
def get_memory_status() -> dict[str, Any]:
|
||||
"""Get the status of Timmy's memory system.
|
||||
|
||||
Returns:
|
||||
Dict with memory tier information
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
repo_root = Path(settings.repo_root)
|
||||
|
||||
return {
|
||||
"tier1_hot_memory": tier1_info,
|
||||
"tier2_vault": vault_info,
|
||||
"tier3_semantic": tier3_info,
|
||||
"self_coding_journal": journal_info,
|
||||
"tier1_hot_memory": _hot_memory_info(repo_root),
|
||||
"tier2_vault": _vault_info(repo_root),
|
||||
"tier3_semantic": _semantic_memory_info(repo_root),
|
||||
"self_coding_journal": _journal_info(repo_root),
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user