diff --git a/src/dashboard/app.py b/src/dashboard/app.py index ff7e3e62..25252942 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -45,6 +45,7 @@ from dashboard.routes.memory import router as memory_router from dashboard.routes.mobile import router as mobile_router from dashboard.routes.models import api_router as models_api_router from dashboard.routes.models import router as models_router +from dashboard.routes.monitoring import router as monitoring_router from dashboard.routes.nexus import router as nexus_router from dashboard.routes.quests import router as quests_router from dashboard.routes.scorecards import router as scorecards_router @@ -57,7 +58,6 @@ from dashboard.routes.tasks import router as tasks_router from dashboard.routes.telegram import router as telegram_router from dashboard.routes.thinking import router as thinking_router from dashboard.routes.three_strike import router as three_strike_router -from dashboard.routes.monitoring import router as monitoring_router from dashboard.routes.tools import router as tools_router from dashboard.routes.tower import router as tower_router from dashboard.routes.voice import router as voice_router diff --git a/src/dashboard/routes/monitoring.py b/src/dashboard/routes/monitoring.py index 56dd9294..8d1490b6 100644 --- a/src/dashboard/routes/monitoring.py +++ b/src/dashboard/routes/monitoring.py @@ -18,7 +18,7 @@ import logging from datetime import UTC, datetime from fastapi import APIRouter, Request -from fastapi.responses import HTMLResponse, JSONResponse +from fastapi.responses import HTMLResponse from config import APP_START_TIME as _START_TIME from config import settings diff --git a/src/infrastructure/world/adapters/threejs.py b/src/infrastructure/world/adapters/threejs.py index aaee4d0b..dbfd614b 100644 --- a/src/infrastructure/world/adapters/threejs.py +++ b/src/infrastructure/world/adapters/threejs.py @@ -25,7 +25,7 @@ from __future__ import annotations import logging from 
infrastructure.world.interface import WorldInterface -from infrastructure.world.types import ActionResult, ActionStatus, CommandInput, PerceptionOutput +from infrastructure.world.types import ActionResult, CommandInput, PerceptionOutput logger = logging.getLogger(__name__) diff --git a/src/timmy/memory/consolidation.py b/src/timmy/memory/consolidation.py new file mode 100644 index 00000000..d3068cbc --- /dev/null +++ b/src/timmy/memory/consolidation.py @@ -0,0 +1,301 @@ +"""HotMemory and VaultMemory classes — file-based memory tiers. + +HotMemory: Tier 1 — computed view of top facts from the DB (+ MEMORY.md fallback). +VaultMemory: Tier 2 — structured vault (memory/) with append-only markdown. +""" + +import logging +import re +from datetime import UTC, datetime +from pathlib import Path + +from timmy.memory.crud import recall_last_reflection, recall_personal_facts +from timmy.memory.db import HOT_MEMORY_PATH, VAULT_PATH + +logger = logging.getLogger(__name__) + +# ── Default template ───────────────────────────────────────────────────────── + +_DEFAULT_HOT_MEMORY_TEMPLATE = """\ +# Timmy Hot Memory + +> Working RAM — always loaded, ~300 lines max, pruned monthly +> Last updated: {date} + +--- + +## Current Status + +**Agent State:** Operational +**Mode:** Development +**Active Tasks:** 0 +**Pending Decisions:** None + +--- + +## Standing Rules + +1. **Sovereignty First** — No cloud dependencies +2. **Local-Only Inference** — Ollama on localhost +3. **Privacy by Design** — Telemetry disabled +4. **Tool Minimalism** — Use tools only when necessary +5. 
**Memory Discipline** — Write handoffs at session end + +--- + +## Agent Roster + +| Agent | Role | Status | +|-------|------|--------| +| Timmy | Core | Active | + +--- + +## User Profile + +**Name:** (not set) +**Interests:** (to be learned) + +--- + +## Key Decisions + +(none yet) + +--- + +## Pending Actions + +- [ ] Learn user's name + +--- + +*Prune date: {prune_date}* +""" + + +# ── HotMemory ──────────────────────────────────────────────────────────────── + + +class HotMemory: + """Tier 1: Hot memory — computed view of top facts from DB.""" + + def __init__(self) -> None: + self.path = HOT_MEMORY_PATH + self._content: str | None = None + self._last_modified: float | None = None + + def read(self, force_refresh: bool = False) -> str: + """Read hot memory — computed view of top facts + last reflection from DB.""" + try: + facts = recall_personal_facts() + lines = ["# Timmy Hot Memory\n"] + + if facts: + lines.append("## Known Facts\n") + for f in facts[:15]: + lines.append(f"- {f}") + + # Include the last reflection if available + reflection = recall_last_reflection() + if reflection: + lines.append("\n## Last Reflection\n") + lines.append(reflection) + + if len(lines) > 1: + return "\n".join(lines) + except Exception: + logger.debug("DB context read failed, falling back to file") + + # Fallback to file if DB unavailable + if self.path.exists(): + return self.path.read_text() + + return "# Timmy Hot Memory\n\nNo memories stored yet.\n" + + def update_section(self, section: str, content: str) -> None: + """Update a specific section in MEMORY.md. + + DEPRECATED: Hot memory is now computed from the database. + This method is kept for backward compatibility during transition. + Use memory_write() to store facts in the database. + """ + logger.warning( + "HotMemory.update_section() is deprecated. " + "Use memory_write() to store facts in the database." 
+ ) + + # Keep file-writing for backward compatibility during transition + # Guard against empty or excessively large writes + if not content or not content.strip(): + logger.warning("HotMemory: Refusing empty write to section '%s'", section) + return + if len(content) > 2000: + logger.warning("HotMemory: Truncating oversized write to section '%s'", section) + content = content[:2000] + "\n... [truncated]" + + if not self.path.exists(): + self._create_default() + + full_content = self.read() + + # Find section + pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)" + match = re.search(pattern, full_content, re.DOTALL) + + if match: + # Replace section + new_section = f"## {section}\n\n{content}\n\n" + full_content = full_content[: match.start()] + new_section + full_content[match.end() :] + else: + # Append section — guard against missing prune marker + insert_point = full_content.rfind("*Prune date:") + new_section = f"## {section}\n\n{content}\n\n" + if insert_point < 0: + # No prune marker — just append at end + full_content = full_content.rstrip() + "\n\n" + new_section + else: + full_content = ( + full_content[:insert_point] + new_section + "\n" + full_content[insert_point:] + ) + + self.path.write_text(full_content) + self._content = full_content + self._last_modified = self.path.stat().st_mtime + logger.info("HotMemory: Updated section '%s'", section) + + def _create_default(self) -> None: + """Create default MEMORY.md if missing. + + DEPRECATED: Hot memory is now computed from the database. + This method is kept for backward compatibility during transition. 
+ """ + logger.debug( + "HotMemory._create_default() - creating default MEMORY.md for backward compatibility" + ) + now = datetime.now(UTC) + content = _DEFAULT_HOT_MEMORY_TEMPLATE.format( + date=now.strftime("%Y-%m-%d"), + prune_date=now.replace(day=25).strftime("%Y-%m-%d"), + ) + self.path.write_text(content) + logger.info("HotMemory: Created default MEMORY.md") + + +# ── VaultMemory ────────────────────────────────────────────────────────────── + + +class VaultMemory: + """Tier 2: Structured vault (memory/) — append-only markdown.""" + + def __init__(self) -> None: + self.path = VAULT_PATH + self._ensure_structure() + + def _ensure_structure(self) -> None: + """Ensure vault directory structure exists.""" + (self.path / "self").mkdir(parents=True, exist_ok=True) + (self.path / "notes").mkdir(parents=True, exist_ok=True) + (self.path / "aar").mkdir(parents=True, exist_ok=True) + + def write_note(self, name: str, content: str, namespace: str = "notes") -> Path: + """Write a note to the vault.""" + # Add timestamp to filename + timestamp = datetime.now(UTC).strftime("%Y%m%d") + filename = f"{timestamp}_{name}.md" + filepath = self.path / namespace / filename + + # Add header + full_content = f"""# {name.replace("_", " ").title()} + +> Created: {datetime.now(UTC).isoformat()} +> Namespace: {namespace} + +--- + +{content} + +--- + +*Auto-generated by Timmy Memory System* +""" + + filepath.write_text(full_content) + logger.info("VaultMemory: Wrote %s", filepath) + return filepath + + def read_file(self, filepath: Path) -> str: + """Read a file from the vault.""" + if not filepath.exists(): + return "" + return filepath.read_text() + + def update_user_profile(self, key: str, value: str) -> None: + """Update a field in user_profile.md. + + DEPRECATED: User profile updates should now use memory_write() to store + facts in the database. This method is kept for backward compatibility. + """ + logger.warning( + "VaultMemory.update_user_profile() is deprecated. 
"
+            "Use memory_write() to store user facts in the database."
+        )
+        # Still update the file for backward compatibility during transition
+        profile_path = self.path / "self" / "user_profile.md"
+
+        if not profile_path.exists():
+            self._create_default_profile()
+
+        content = profile_path.read_text()
+
+        # Normalise once so BOTH branches persist the same stripped value —
+        # previously the append branch wrote the raw (unstripped) value while
+        # the in-place-update branch wrote value.strip().
+        safe_value = value.strip()
+        pattern = rf"(\*\*{re.escape(key)}:\*\*).*"
+        if re.search(pattern, content):
+            # Replace the existing "**Key:** ..." line in place
+            content = re.sub(pattern, lambda m: f"{m.group(1)} {safe_value}", content)
+        else:
+            # No matching field line — append under "## Important Facts"
+            facts_section = "## Important Facts"
+            if facts_section in content:
+                insert_point = content.find(facts_section) + len(facts_section)
+                content = content[:insert_point] + f"\n- {key}: {safe_value}" + content[insert_point:]
+
+        # Refresh the "*Last updated: ...*" footer stamp
+        content = re.sub(
+            r"\*Last updated:.*\*",
+            f"*Last updated: {datetime.now(UTC).strftime('%Y-%m-%d')}*",
+            content,
+        )
+
+        profile_path.write_text(content)
+        logger.info("VaultMemory: Updated user profile: %s = %s", key, value)
+
+    def _create_default_profile(self) -> None:
+        """Create default user profile."""
+        profile_path = self.path / "self" / "user_profile.md"
+        default = """# User Profile
+
+> Learned information about the user.
+ +## Basic Information + +**Name:** (unknown) +**Location:** (unknown) +**Occupation:** (unknown) + +## Interests & Expertise + +- (to be learned) + +## Preferences + +- Response style: concise, technical +- Tool usage: minimal + +## Important Facts + +- (to be extracted) + +--- + +*Last updated: {date}* +""".format(date=datetime.now(UTC).strftime("%Y-%m-%d")) + + profile_path.write_text(default) diff --git a/src/timmy/memory/crud.py b/src/timmy/memory/crud.py new file mode 100644 index 00000000..ddd740dd --- /dev/null +++ b/src/timmy/memory/crud.py @@ -0,0 +1,395 @@ +"""CRUD operations, personal facts, and reflections for Timmy's memory system.""" + +import json +import logging +import sqlite3 +import uuid +from datetime import UTC, datetime, timedelta + +from timmy.memory.db import MemoryEntry, get_connection +from timmy.memory.embeddings import ( + _get_embedding_model, + _keyword_overlap, + cosine_similarity, + embed_text, +) + +logger = logging.getLogger(__name__) + + +def store_memory( + content: str, + source: str, + context_type: str = "conversation", + agent_id: str | None = None, + task_id: str | None = None, + session_id: str | None = None, + metadata: dict | None = None, + compute_embedding: bool = True, +) -> MemoryEntry: + """Store a memory entry with optional embedding.""" + embedding = None + if compute_embedding: + embedding = embed_text(content) + + entry = MemoryEntry( + content=content, + source=source, + context_type=context_type, + agent_id=agent_id, + task_id=task_id, + session_id=session_id, + metadata=metadata, + embedding=embedding, + ) + + with get_connection() as conn: + conn.execute( + """ + INSERT INTO memories + (id, content, memory_type, source, agent_id, task_id, session_id, + metadata, embedding, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + entry.id, + entry.content, + entry.context_type, # DB column is memory_type + entry.source, + entry.agent_id, + entry.task_id, + entry.session_id, + json.dumps(metadata) if metadata else None, + json.dumps(embedding) if embedding else None, + entry.timestamp, + ), + ) + conn.commit() + + return entry + + +def _build_search_filters( + context_type: str | None, + agent_id: str | None, + session_id: str | None, +) -> tuple[str, list]: + """Build SQL WHERE clause and params from search filters.""" + conditions: list[str] = [] + params: list = [] + + if context_type: + conditions.append("memory_type = ?") + params.append(context_type) + if agent_id: + conditions.append("agent_id = ?") + params.append(agent_id) + if session_id: + conditions.append("session_id = ?") + params.append(session_id) + + where_clause = "WHERE " + " AND ".join(conditions) if conditions else "" + return where_clause, params + + +def _fetch_memory_candidates( + where_clause: str, params: list, candidate_limit: int +) -> list[sqlite3.Row]: + """Fetch candidate memory rows from the database.""" + query_sql = f""" + SELECT * FROM memories + {where_clause} + ORDER BY created_at DESC + LIMIT ? 
+ """ + params.append(candidate_limit) + + with get_connection() as conn: + return conn.execute(query_sql, params).fetchall() + + +def _row_to_entry(row: sqlite3.Row) -> MemoryEntry: + """Convert a database row to a MemoryEntry.""" + return MemoryEntry( + id=row["id"], + content=row["content"], + source=row["source"], + context_type=row["memory_type"], # DB column -> API field + agent_id=row["agent_id"], + task_id=row["task_id"], + session_id=row["session_id"], + metadata=json.loads(row["metadata"]) if row["metadata"] else None, + embedding=json.loads(row["embedding"]) if row["embedding"] else None, + timestamp=row["created_at"], + ) + + +def _score_and_filter( + rows: list[sqlite3.Row], + query: str, + query_embedding: list[float], + min_relevance: float, +) -> list[MemoryEntry]: + """Score candidate rows by similarity and filter by min_relevance.""" + results = [] + for row in rows: + entry = _row_to_entry(row) + + if entry.embedding: + score = cosine_similarity(query_embedding, entry.embedding) + else: + score = _keyword_overlap(query, entry.content) + + entry.relevance_score = score + if score >= min_relevance: + results.append(entry) + + results.sort(key=lambda x: x.relevance_score or 0, reverse=True) + return results + + +def search_memories( + query: str, + limit: int = 10, + context_type: str | None = None, + agent_id: str | None = None, + session_id: str | None = None, + min_relevance: float = 0.0, +) -> list[MemoryEntry]: + """Search for memories by semantic similarity. 
+ + Args: + query: Search query text + limit: Maximum results + context_type: Filter by memory type (maps to DB memory_type column) + agent_id: Filter by agent + session_id: Filter by session + min_relevance: Minimum similarity score (0-1) + + Returns: + List of MemoryEntry objects sorted by relevance + """ + query_embedding = embed_text(query) + where_clause, params = _build_search_filters(context_type, agent_id, session_id) + rows = _fetch_memory_candidates(where_clause, params, limit * 3) + results = _score_and_filter(rows, query, query_embedding, min_relevance) + return results[:limit] + + +def delete_memory(memory_id: str) -> bool: + """Delete a memory entry by ID. + + Returns: + True if deleted, False if not found + """ + with get_connection() as conn: + cursor = conn.execute( + "DELETE FROM memories WHERE id = ?", + (memory_id,), + ) + conn.commit() + return cursor.rowcount > 0 + + +def get_memory_stats() -> dict: + """Get statistics about the memory store. + + Returns: + Dict with counts by type, total entries, etc. + """ + with get_connection() as conn: + total = conn.execute("SELECT COUNT(*) as count FROM memories").fetchone()["count"] + + by_type = {} + rows = conn.execute( + "SELECT memory_type, COUNT(*) as count FROM memories GROUP BY memory_type" + ).fetchall() + for row in rows: + by_type[row["memory_type"]] = row["count"] + + with_embeddings = conn.execute( + "SELECT COUNT(*) as count FROM memories WHERE embedding IS NOT NULL" + ).fetchone()["count"] + + return { + "total_entries": total, + "by_type": by_type, + "with_embeddings": with_embeddings, + "has_embedding_model": _get_embedding_model() is not False, + } + + +def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int: + """Delete old memories to manage storage. 
+ + Args: + older_than_days: Delete memories older than this + keep_facts: Whether to preserve fact-type memories + + Returns: + Number of entries deleted + """ + cutoff = (datetime.now(UTC) - timedelta(days=older_than_days)).isoformat() + + with get_connection() as conn: + if keep_facts: + cursor = conn.execute( + """ + DELETE FROM memories + WHERE created_at < ? AND memory_type != 'fact' + """, + (cutoff,), + ) + else: + cursor = conn.execute( + "DELETE FROM memories WHERE created_at < ?", + (cutoff,), + ) + + deleted = cursor.rowcount + conn.commit() + + return deleted + + +def get_memory_context(query: str, max_tokens: int = 2000, **filters) -> str: + """Get relevant memory context as formatted text for LLM prompts. + + Args: + query: Search query + max_tokens: Approximate maximum tokens to return + **filters: Additional filters (agent_id, session_id, etc.) + + Returns: + Formatted context string for inclusion in prompts + """ + memories = search_memories(query, limit=20, **filters) + + context_parts = [] + total_chars = 0 + max_chars = max_tokens * 4 # Rough approximation + + for mem in memories: + formatted = f"[{mem.source}]: {mem.content}" + if total_chars + len(formatted) > max_chars: + break + context_parts.append(formatted) + total_chars += len(formatted) + + if not context_parts: + return "" + + return "Relevant context from memory:\n" + "\n\n".join(context_parts) + + +# ── Personal facts & reflections ───────────────────────────────────────────── + + +def recall_personal_facts(agent_id: str | None = None) -> list[str]: + """Recall personal facts about the user or system. + + Args: + agent_id: Optional agent filter + + Returns: + List of fact strings + """ + with get_connection() as conn: + if agent_id: + rows = conn.execute( + """ + SELECT content FROM memories + WHERE memory_type = 'fact' AND agent_id = ? 
+ ORDER BY created_at DESC + LIMIT 100 + """, + (agent_id,), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT content FROM memories + WHERE memory_type = 'fact' + ORDER BY created_at DESC + LIMIT 100 + """, + ).fetchall() + + return [r["content"] for r in rows] + + +def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]: + """Recall personal facts with their IDs for edit/delete operations.""" + with get_connection() as conn: + if agent_id: + rows = conn.execute( + "SELECT id, content FROM memories WHERE memory_type = 'fact' AND agent_id = ? ORDER BY created_at DESC LIMIT 100", + (agent_id,), + ).fetchall() + else: + rows = conn.execute( + "SELECT id, content FROM memories WHERE memory_type = 'fact' ORDER BY created_at DESC LIMIT 100", + ).fetchall() + return [{"id": r["id"], "content": r["content"]} for r in rows] + + +def update_personal_fact(memory_id: str, new_content: str) -> bool: + """Update a personal fact's content.""" + with get_connection() as conn: + cursor = conn.execute( + "UPDATE memories SET content = ? WHERE id = ? AND memory_type = 'fact'", + (new_content, memory_id), + ) + conn.commit() + return cursor.rowcount > 0 + + +def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry: + """Store a personal fact about the user or system. + + Args: + fact: The fact to store + agent_id: Associated agent + + Returns: + The stored MemoryEntry + """ + return store_memory( + content=fact, + source="system", + context_type="fact", + agent_id=agent_id, + metadata={"auto_extracted": False}, + ) + + +def store_last_reflection(reflection: str) -> None: + """Store the last reflection, replacing any previous one. + + Uses a single row with memory_type='reflection' to avoid accumulation. 
+ """ + if not reflection or not reflection.strip(): + return + with get_connection() as conn: + # Delete previous reflections — only the latest matters + conn.execute("DELETE FROM memories WHERE memory_type = 'reflection'") + conn.execute( + """ + INSERT INTO memories + (id, content, memory_type, source, created_at) + VALUES (?, ?, 'reflection', 'system', ?) + """, + (str(uuid.uuid4()), reflection.strip(), datetime.now(UTC).isoformat()), + ) + conn.commit() + logger.debug("Stored last reflection in DB") + + +def recall_last_reflection() -> str | None: + """Recall the most recent reflection, or None if absent.""" + with get_connection() as conn: + row = conn.execute( + "SELECT content FROM memories WHERE memory_type = 'reflection' " + "ORDER BY created_at DESC LIMIT 1" + ).fetchone() + return row["content"] if row else None diff --git a/src/timmy/memory/db.py b/src/timmy/memory/db.py new file mode 100644 index 00000000..f7948941 --- /dev/null +++ b/src/timmy/memory/db.py @@ -0,0 +1,212 @@ +"""Database connection, schema, migrations, path constants, and data classes. + +This module contains the lowest-level database primitives for Timmy's +memory system — connection management, schema creation / migration, +path constants, and the core data classes (MemoryEntry, MemoryChunk). 
+""" + +import logging +import sqlite3 +import uuid +from collections.abc import Generator +from contextlib import closing, contextmanager +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path + +from config import settings + +logger = logging.getLogger(__name__) + +# ── Path constants ─────────────────────────────────────────────────────────── +PROJECT_ROOT = Path(__file__).parent.parent.parent.parent +HOT_MEMORY_PATH = PROJECT_ROOT / "MEMORY.md" +VAULT_PATH = PROJECT_ROOT / "memory" +SOUL_PATH = VAULT_PATH / "self" / "soul.md" +DB_PATH = PROJECT_ROOT / "data" / "memory.db" + +# ── Database connection ────────────────────────────────────────────────────── + + +@contextmanager +def get_connection() -> Generator[sqlite3.Connection, None, None]: + """Get database connection to unified memory database.""" + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + with closing(sqlite3.connect(str(DB_PATH))) as conn: + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") + _ensure_schema(conn) + yield conn + + +def _ensure_schema(conn: sqlite3.Connection) -> None: + """Create the unified memories table and indexes if they don't exist.""" + conn.execute(""" + CREATE TABLE IF NOT EXISTS memories ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + memory_type TEXT NOT NULL DEFAULT 'fact', + source TEXT NOT NULL DEFAULT 'agent', + embedding TEXT, + metadata TEXT, + source_hash TEXT, + agent_id TEXT, + task_id TEXT, + session_id TEXT, + confidence REAL NOT NULL DEFAULT 0.8, + tags TEXT NOT NULL DEFAULT '[]', + created_at TEXT NOT NULL, + last_accessed TEXT, + access_count INTEGER NOT NULL DEFAULT 0 + ) + """) + + # Create indexes for efficient querying + conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)") + 
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_session ON memories(session_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") + conn.commit() + + # Run migration if needed + _migrate_schema(conn) + + +def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]: + """Get the column names for a table.""" + cursor = conn.execute(f"PRAGMA table_info({table_name})") + return {row[1] for row in cursor.fetchall()} + + +def _migrate_episodes(conn: sqlite3.Connection) -> None: + """Migrate episodes table rows into the unified memories table.""" + logger.info("Migration: Converting episodes table to memories") + try: + cols = _get_table_columns(conn, "episodes") + context_type_col = "context_type" if "context_type" in cols else "'conversation'" + + conn.execute(f""" + INSERT INTO memories ( + id, content, memory_type, source, embedding, + metadata, agent_id, task_id, session_id, + created_at, access_count, last_accessed + ) + SELECT + id, content, + COALESCE({context_type_col}, 'conversation'), + COALESCE(source, 'agent'), + embedding, + metadata, agent_id, task_id, session_id, + COALESCE(timestamp, datetime('now')), 0, NULL + FROM episodes + """) + conn.execute("DROP TABLE episodes") + logger.info("Migration: Migrated episodes to memories") + except sqlite3.Error as exc: + logger.warning("Migration: Failed to migrate episodes: %s", exc) + + +def _migrate_chunks(conn: sqlite3.Connection) -> None: + """Migrate chunks table rows into the unified memories table.""" + logger.info("Migration: Converting chunks table to memories") + try: + cols = _get_table_columns(conn, "chunks") + + id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)" + content_col = "content" if "content" in cols else "text" + source_col = ( + "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'") + ) + embedding_col = "embedding" 
if "embedding" in cols else "NULL" + created_col = "created_at" if "created_at" in cols else "datetime('now')" + + conn.execute(f""" + INSERT INTO memories ( + id, content, memory_type, source, embedding, + created_at, access_count + ) + SELECT + {id_col}, {content_col}, 'vault_chunk', {source_col}, + {embedding_col}, {created_col}, 0 + FROM chunks + """) + conn.execute("DROP TABLE chunks") + logger.info("Migration: Migrated chunks to memories") + except sqlite3.Error as exc: + logger.warning("Migration: Failed to migrate chunks: %s", exc) + + +def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None: + """Drop a legacy table if it exists.""" + try: + conn.execute(f"DROP TABLE {table}") # noqa: S608 + logger.info("Migration: Dropped old %s table", table) + except sqlite3.Error as exc: + logger.warning("Migration: Failed to drop %s: %s", table, exc) + + +def _migrate_schema(conn: sqlite3.Connection) -> None: + """Migrate from old three-table schema to unified memories table. + + Migration paths: + - episodes table -> memories (context_type -> memory_type) + - chunks table -> memories with memory_type='vault_chunk' + - facts table -> dropped (unused, 0 rows expected) + """ + cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'") + tables = {row[0] for row in cursor.fetchall()} + + has_memories = "memories" in tables + + if not has_memories and (tables & {"episodes", "chunks", "facts"}): + logger.info("Migration: Creating unified memories table") + + if "episodes" in tables and has_memories: + _migrate_episodes(conn) + if "chunks" in tables and has_memories: + _migrate_chunks(conn) + if "facts" in tables: + _drop_legacy_table(conn, "facts") + + conn.commit() + + +# Alias for backward compatibility +get_conn = get_connection + + +# ── Data classes ───────────────────────────────────────────────────────────── + + +@dataclass +class MemoryEntry: + """A memory entry with vector embedding. 
+ + Note: The DB column is `memory_type` but this field is named `context_type` + for backward API compatibility. + """ + + id: str = field(default_factory=lambda: str(uuid.uuid4())) + content: str = "" # The actual text content + source: str = "" # Where it came from (agent, user, system) + context_type: str = "conversation" # API field name; DB column is memory_type + agent_id: str | None = None + task_id: str | None = None + session_id: str | None = None + metadata: dict | None = None + embedding: list[float] | None = None + timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + relevance_score: float | None = None # Set during search + + +@dataclass +class MemoryChunk: + """A searchable chunk of memory.""" + + id: str + source: str # filepath + content: str + embedding: list[float] + created_at: str diff --git a/src/timmy/memory/semantic.py b/src/timmy/memory/semantic.py new file mode 100644 index 00000000..ddba971e --- /dev/null +++ b/src/timmy/memory/semantic.py @@ -0,0 +1,300 @@ +"""SemanticMemory and MemorySearcher — vector-based search over vault content. + +SemanticMemory: indexes markdown files into chunks with embeddings, supports search. +MemorySearcher: high-level multi-tier search interface. 
+""" + +import hashlib +import json +import logging +import sqlite3 +from collections.abc import Generator +from contextlib import closing, contextmanager +from datetime import UTC, datetime +from pathlib import Path + +from config import settings +from timmy.memory.db import DB_PATH, VAULT_PATH, get_connection +from timmy.memory.embeddings import ( + EMBEDDING_DIM, + _get_embedding_model, + cosine_similarity, + embed_text, +) + +logger = logging.getLogger(__name__) + + +class SemanticMemory: + """Vector-based semantic search over vault content.""" + + def __init__(self) -> None: + self.db_path = DB_PATH + self.vault_path = VAULT_PATH + + @contextmanager + def _get_conn(self) -> Generator[sqlite3.Connection, None, None]: + """Get connection to the instance's db_path (backward compatibility). + + Uses self.db_path if set differently from global DB_PATH, + otherwise uses the global get_connection(). + """ + if self.db_path == DB_PATH: + # Use global connection (normal production path) + with get_connection() as conn: + yield conn + else: + # Use instance-specific db_path (test path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + with closing(sqlite3.connect(str(self.db_path))) as conn: + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") + # Ensure schema exists + conn.execute(""" + CREATE TABLE IF NOT EXISTS memories ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + memory_type TEXT NOT NULL DEFAULT 'fact', + source TEXT NOT NULL DEFAULT 'agent', + embedding TEXT, + metadata TEXT, + source_hash TEXT, + agent_id TEXT, + task_id TEXT, + session_id TEXT, + confidence REAL NOT NULL DEFAULT 0.8, + tags TEXT NOT NULL DEFAULT '[]', + created_at TEXT NOT NULL, + last_accessed TEXT, + access_count INTEGER NOT NULL DEFAULT 0 + ) + """) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)" + ) + conn.execute("CREATE INDEX IF NOT EXISTS 
idx_memories_time ON memories(created_at)")
+                conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)")
+                conn.commit()
+                yield conn

+    def _init_db(self) -> None:
+        """Initialize database at self.db_path (backward compatibility).
+
+        This method is kept for backward compatibility with existing code and tests.
+        Schema creation is handled by _get_conn.
+        """
+        # Trigger schema creation via _get_conn
+        with self._get_conn():
+            pass
+
+    def index_file(self, filepath: Path) -> int:
+        """Index a single file into semantic memory.
+
+        Splits the file into paragraph-sized chunks, embeds each chunk, and
+        stores them as memory_type='vault_chunk' rows keyed by a per-file id
+        prefix. Returns the number of chunks produced (0 if the file is
+        missing or already indexed with an identical content hash).
+        """
+        if not filepath.exists():
+            return 0
+
+        content = filepath.read_text()
+        # md5 is used only as a cheap content fingerprint, not for security
+        file_hash = hashlib.md5(content.encode()).hexdigest()
+
+        with self._get_conn() as conn:
+            # Check if already indexed with same hash
+            cursor = conn.execute(
+                "SELECT metadata FROM memories WHERE source = ? AND memory_type = 'vault_chunk' LIMIT 1",
+                (str(filepath),),
+            )
+            existing = cursor.fetchone()
+            if existing and existing[0]:
+                try:
+                    meta = json.loads(existing[0])
+                    if meta.get("source_hash") == file_hash:
+                        return 0  # Already indexed
+                except json.JSONDecodeError:
+                    pass
+
+            # Delete old chunks for this file
+            conn.execute(
+                "DELETE FROM memories WHERE source = ? AND memory_type = 'vault_chunk'",
+                (str(filepath),),
+            )
+
+            # Split into chunks (paragraphs)
+            chunks = self._split_into_chunks(content)
+
+            # BUGFIX: ids built from the stem alone ("{stem}_{i}") collide when
+            # two vault files in different directories share a filename, which
+            # violates the id PRIMARY KEY on insert. Mix a digest of the full
+            # path into the id so each file gets a distinct key space. Old
+            # rows for this file were already deleted above, so the id scheme
+            # change is safe.
+            path_prefix = hashlib.md5(str(filepath).encode()).hexdigest()[:8]
+
+            # Index each chunk
+            now = datetime.now(UTC).isoformat()
+            for i, chunk_text in enumerate(chunks):
+                if len(chunk_text.strip()) < 20:  # Skip tiny chunks
+                    continue
+
+                chunk_id = f"{filepath.stem}_{path_prefix}_{i}"
+                chunk_embedding = embed_text(chunk_text)
+
+                conn.execute(
+                    """INSERT INTO memories
+                    (id, content, memory_type, source, metadata, embedding, created_at)
+                    VALUES (?, ?, ?, ?, ?, ?, ?)""",
+                    (
+                        chunk_id,
+                        chunk_text,
+                        "vault_chunk",
+                        str(filepath),
+                        json.dumps({"source_hash": file_hash, "chunk_index": i}),
+                        json.dumps(chunk_embedding),
+                        now,
+                    ),
+                )
+
+            conn.commit()
+
+        logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks))
+        return len(chunks)
+
+    def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]:
+        """Split text into semantic chunks (paragraphs, then sentences)."""
+        # Split by paragraphs first
+        paragraphs = text.split("\n\n")
+        chunks = []
+
+        for para in paragraphs:
+            para = para.strip()
+            if not para:
+                continue
+
+            # If paragraph is small enough, keep as one chunk
+            if len(para) <= max_chunk_size:
+                chunks.append(para)
+            else:
+                # Split long paragraphs by sentences
+                sentences = para.replace(". ", ".\n").split("\n")
+                current_chunk = ""
+
+                for sent in sentences:
+                    if len(current_chunk) + len(sent) < max_chunk_size:
+                        current_chunk += " " + sent if current_chunk else sent
+                    else:
+                        if current_chunk:
+                            chunks.append(current_chunk.strip())
+                        current_chunk = sent
+
+                if current_chunk:
+                    chunks.append(current_chunk.strip())
+
+        return chunks
+
+    def index_vault(self) -> int:
+        """Index entire vault directory (skips the session-handoff file)."""
+        total_chunks = 0
+
+        for md_file in self.vault_path.rglob("*.md"):
+            # Skip handoff file (handled separately)
+            if "last-session-handoff" in md_file.name:
+                continue
+            total_chunks += self.index_file(md_file)
+
+        logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks)
+        return total_chunks
+
+    def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]:
+        """Search for relevant memory chunks.
+
+        Returns up to top_k (content, score) pairs sorted by similarity,
+        highest first. Rows without a usable embedding are skipped.
+        """
+        query_embedding = embed_text(query)
+
+        with self._get_conn() as conn:
+            conn.row_factory = sqlite3.Row
+
+            # Get all vault chunks
+            rows = conn.execute(
+                "SELECT source, content, embedding FROM memories WHERE memory_type = 'vault_chunk'"
+            ).fetchall()
+
+            # Calculate similarities
+            scored = []
+            for row in rows:
+                # BUGFIX: the embedding column is nullable — json.loads(None)
+                # raises TypeError, and a stored "null" decodes to None and
+                # would crash cosine_similarity. Skip rows without a usable
+                # vector instead of failing the whole search.
+                raw = row["embedding"]
+                if not raw:
+                    continue
+                embedding = json.loads(raw)
+                if not embedding:
+                    continue
+                score = cosine_similarity(query_embedding, embedding)
+                scored.append((row["source"], row["content"], score))
+
+            # Sort by score descending
+            scored.sort(key=lambda x: x[2], reverse=True)
+
+            # Return top_k
+            return [(content, score) for _, content, score in scored[:top_k]]
+
+    def get_relevant_context(self, query: str, max_chars: int = 2000) -> str:
+        """Get formatted context string for a query."""
+        results = self.search(query, top_k=3)
+
+        if not results:
+            return ""
+
+        parts = []
+        total_chars = 0
+
+        for content, score in results:
+            if score < 0.3:  # Similarity threshold
+                continue
+
+            chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..."
+ if total_chars + len(chunk) > max_chars: + break + + parts.append(chunk) + total_chars += len(chunk) + + return "\n\n".join(parts) if parts else "" + + def stats(self) -> dict: + """Get indexing statistics.""" + with self._get_conn() as conn: + cursor = conn.execute( + "SELECT COUNT(*), COUNT(DISTINCT source) FROM memories WHERE memory_type = 'vault_chunk'" + ) + total_chunks, total_files = cursor.fetchone() + + return { + "total_chunks": total_chunks, + "total_files": total_files, + "embedding_dim": EMBEDDING_DIM if _get_embedding_model() else 128, + } + + +class MemorySearcher: + """High-level interface for memory search.""" + + def __init__(self) -> None: + self.semantic = SemanticMemory() + + def search(self, query: str, tiers: list[str] = None) -> dict: + """Search across memory tiers. + + Args: + query: Search query + tiers: List of tiers to search ["hot", "vault", "semantic"] + + Returns: + Dict with results from each tier + """ + tiers = tiers or ["semantic"] # Default to semantic only + results = {} + + if "semantic" in tiers: + semantic_results = self.semantic.search(query, top_k=5) + results["semantic"] = [ + {"content": content, "score": score} for content, score in semantic_results + ] + + return results + + def get_context_for_query(self, query: str) -> str: + """Get comprehensive context for a user query.""" + # Get semantic context + semantic_context = self.semantic.get_relevant_context(query) + + if semantic_context: + return f"## Relevant Past Context\n\n{semantic_context}" + + return "" + + +# Module-level singletons +semantic_memory = SemanticMemory() +memory_searcher = MemorySearcher() diff --git a/src/timmy/memory/tools.py b/src/timmy/memory/tools.py new file mode 100644 index 00000000..a9e63ef9 --- /dev/null +++ b/src/timmy/memory/tools.py @@ -0,0 +1,253 @@ +"""Tool functions for Timmy's memory system. + +memory_search, memory_read, memory_store, memory_forget — runtime tool wrappers. +jot_note, log_decision — artifact production tools. 
+""" + +import logging +import re +from datetime import UTC, datetime +from pathlib import Path + +from timmy.memory.crud import delete_memory, search_memories, store_memory +from timmy.memory.semantic import semantic_memory + +logger = logging.getLogger(__name__) + + +def memory_search(query: str, limit: int = 10) -> str: + """Search past conversations, notes, and stored facts for relevant context. + + Searches across both the vault (indexed markdown files) and the + runtime memory store (facts and conversation fragments stored via + memory_write). + + Args: + query: What to search for (e.g. "Bitcoin strategy", "server setup"). + limit: Number of results to return (default 10). + + Returns: + Formatted string of relevant memory results. + """ + # Guard: model sometimes passes None for limit + if limit is None: + limit = 10 + + parts: list[str] = [] + + # 1. Search semantic vault (indexed markdown files) + vault_results = semantic_memory.search(query, limit) + for content, score in vault_results: + if score < 0.2: + continue + parts.append(f"[vault score {score:.2f}] {content[:300]}") + + # 2. Search runtime vector store (stored facts/conversations) + try: + runtime_results = search_memories(query, limit=limit, min_relevance=0.2) + for entry in runtime_results: + label = entry.context_type or "memory" + parts.append(f"[{label}] {entry.content[:300]}") + except Exception as exc: + logger.debug("Vector store search unavailable: %s", exc) + + if not parts: + return "No relevant memories found." + return "\n\n".join(parts) + + +def memory_read(query: str = "", top_k: int = 5) -> str: + """Read from persistent memory — search facts, notes, and past conversations. + + This is the primary tool for recalling stored information. If no query + is given, returns the most recent personal facts. With a query, it + searches semantically across all stored memories. + + Args: + query: Optional search term. Leave empty to list recent facts. 
+ top_k: Maximum results to return (default 5). + + Returns: + Formatted string of memory contents. + """ + if top_k is None: + top_k = 5 + + parts: list[str] = [] + + # Always include personal facts first + try: + facts = search_memories(query or "", limit=top_k, min_relevance=0.0) + fact_entries = [e for e in facts if (e.context_type or "") == "fact"] + if fact_entries: + parts.append("## Personal Facts") + for entry in fact_entries[:top_k]: + parts.append(f"- {entry.content[:300]}") + except Exception as exc: + logger.debug("Vector store unavailable for memory_read: %s", exc) + + # If a query was provided, also do semantic search + if query: + search_result = memory_search(query, top_k) + if search_result and search_result != "No relevant memories found.": + parts.append("\n## Search Results") + parts.append(search_result) + + if not parts: + return "No memories stored yet. Use memory_write to store information." + return "\n".join(parts) + + +def memory_store(topic: str, report: str, type: str = "research") -> str: + """Store a piece of information in persistent memory, particularly for research outputs. + + Use this tool to store structured research findings or other important documents. + Stored memories are searchable via memory_search across all channels. + + Args: + topic: A concise title or topic for the research output. + report: The detailed content of the research output or document. + type: Type of memory — "research" for research outputs (default), + "fact" for permanent facts, "conversation" for conversation context, + "document" for other document fragments. + + Returns: + Confirmation that the memory was stored. + """ + if not report or not report.strip(): + return "Nothing to store — report is empty." 
+ + # Combine topic and report for embedding and storage content + full_content = f"Topic: {topic.strip()}\n\nReport: {report.strip()}" + + valid_types = ("fact", "conversation", "document", "research") + if type not in valid_types: + type = "research" + + try: + # Dedup check for facts and research — skip if similar exists + if type in ("fact", "research"): + existing = search_memories(full_content, limit=3, context_type=type, min_relevance=0.75) + if existing: + return ( + f"Similar {type} already stored (id={existing[0].id[:8]}). Skipping duplicate." + ) + + entry = store_memory( + content=full_content, + source="agent", + context_type=type, + metadata={"topic": topic}, + ) + return f"Stored in memory (type={type}, id={entry.id[:8]}). This is now searchable across all channels." + except Exception as exc: + logger.error("Failed to write memory: %s", exc) + return f"Failed to store memory: {exc}" + + +def memory_forget(query: str) -> str: + """Remove a stored memory that is outdated, incorrect, or no longer relevant. + + Searches for memories matching the query and deletes the closest match. + Use this when the user says to forget something or when stored information + has changed. + + Args: + query: Description of the memory to forget (e.g. "my phone number", + "the old server address"). + + Returns: + Confirmation of what was forgotten, or a message if nothing matched. + """ + if not query or not query.strip(): + return "Nothing to forget — query is empty." + + try: + results = search_memories(query.strip(), limit=3, min_relevance=0.3) + if not results: + return "No matching memories found to forget." + + # Delete the closest match + best = results[0] + deleted = delete_memory(best.id) + if deleted: + return f'Forgotten: "{best.content[:80]}" (type={best.context_type})' + return "Memory not found (may have already been deleted)." 
+ except Exception as exc: + logger.error("Failed to forget memory: %s", exc) + return f"Failed to forget: {exc}" + + +# ── Artifact tools ─────────────────────────────────────────────────────────── + +NOTES_DIR = Path.home() / ".timmy" / "notes" +DECISION_LOG = Path.home() / ".timmy" / "decisions.md" + + +def jot_note(title: str, body: str) -> str: + """Write a markdown note to Timmy's workspace (~/.timmy/notes/). + + Use this tool to capture ideas, drafts, summaries, or any artifact that + should persist beyond the conversation. Each note is saved as a + timestamped markdown file. + + Args: + title: Short descriptive title (used as filename slug). + body: Markdown content of the note. + + Returns: + Confirmation with the file path of the saved note. + """ + if not title or not title.strip(): + return "Cannot jot — title is empty." + if not body or not body.strip(): + return "Cannot jot — body is empty." + + NOTES_DIR.mkdir(parents=True, exist_ok=True) + + slug = re.sub(r"[^a-z0-9]+", "-", title.strip().lower()).strip("-")[:60] + timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") + filename = f"{timestamp}_{slug}.md" + filepath = NOTES_DIR / filename + + content = f"# {title.strip()}\n\n> Created: {datetime.now(UTC).isoformat()}\n\n{body.strip()}\n" + filepath.write_text(content) + logger.info("jot_note: wrote %s", filepath) + return f"Note saved: {filepath}" + + +def log_decision(decision: str, rationale: str = "") -> str: + """Append an architectural or design decision to the running decision log. + + Use this tool when a significant decision is made during conversation — + technology choices, design trade-offs, scope changes, etc. + + Args: + decision: One-line summary of the decision. + rationale: Why this decision was made (optional but encouraged). + + Returns: + Confirmation that the decision was logged. + """ + if not decision or not decision.strip(): + return "Cannot log — decision is empty." 
+ + DECISION_LOG.parent.mkdir(parents=True, exist_ok=True) + + # Create file with header if it doesn't exist + if not DECISION_LOG.exists(): + DECISION_LOG.write_text( + "# Decision Log\n\nRunning log of architectural and design decisions.\n\n" + ) + + stamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") + entry = f"## {stamp} — {decision.strip()}\n\n" + if rationale and rationale.strip(): + entry += f"{rationale.strip()}\n\n" + entry += "---\n\n" + + with open(DECISION_LOG, "a") as f: + f.write(entry) + + logger.info("log_decision: %s", decision.strip()[:80]) + return f"Decision logged: {decision.strip()}" diff --git a/src/timmy/memory_system.py b/src/timmy/memory_system.py index 3f37a48a..bab814b9 100644 --- a/src/timmy/memory_system.py +++ b/src/timmy/memory_system.py @@ -8,1447 +8,82 @@ Architecture: - Classes: HotMemory, VaultMemory, MemorySystem, SemanticMemory, MemorySearcher """ -import hashlib -import json import logging -import re -import sqlite3 -import uuid -from collections.abc import Generator -from contextlib import closing, contextmanager -from dataclasses import dataclass, field -from datetime import UTC, datetime, timedelta -from pathlib import Path -from config import settings -from timmy.memory.embeddings import ( +# ── Re-exports from submodules ─────────────────────────────────────────────── +# Every public name that used to live here is re-exported so that existing +# ``from timmy.memory_system import X`` statements keep working. 
+from timmy.memory.consolidation import ( # noqa: F401 + _DEFAULT_HOT_MEMORY_TEMPLATE, + HotMemory, + VaultMemory, +) +from timmy.memory.crud import ( # noqa: F401 + _build_search_filters, + _fetch_memory_candidates, + _row_to_entry, + _score_and_filter, + delete_memory, + get_memory_context, + get_memory_stats, + prune_memories, + recall_last_reflection, + recall_personal_facts, + recall_personal_facts_with_ids, + search_memories, + store_last_reflection, + store_memory, + store_personal_fact, + update_personal_fact, +) +from timmy.memory.db import ( # noqa: F401 + DB_PATH, + HOT_MEMORY_PATH, + PROJECT_ROOT, + SOUL_PATH, + VAULT_PATH, + MemoryChunk, + MemoryEntry, + _drop_legacy_table, + _ensure_schema, + _get_table_columns, + _migrate_chunks, + _migrate_episodes, + _migrate_schema, + get_conn, + get_connection, +) +from timmy.memory.embeddings import ( # noqa: F401 EMBEDDING_DIM, - EMBEDDING_MODEL, # noqa: F401 — re-exported for backward compatibility - _cosine_similarity, # noqa: F401 — re-exported for backward compatibility + EMBEDDING_MODEL, + _cosine_similarity, _get_embedding_model, _keyword_overlap, - _simple_hash_embedding, # noqa: F401 — re-exported for backward compatibility + _simple_hash_embedding, cosine_similarity, embed_text, ) +from timmy.memory.semantic import ( # noqa: F401 + MemorySearcher, + SemanticMemory, + memory_searcher, + semantic_memory, +) +from timmy.memory.tools import ( # noqa: F401 + DECISION_LOG, + NOTES_DIR, + jot_note, + log_decision, + memory_forget, + memory_read, + memory_search, + memory_store, +) logger = logging.getLogger(__name__) -# Paths -PROJECT_ROOT = Path(__file__).parent.parent.parent -HOT_MEMORY_PATH = PROJECT_ROOT / "MEMORY.md" -VAULT_PATH = PROJECT_ROOT / "memory" -SOUL_PATH = VAULT_PATH / "self" / "soul.md" -DB_PATH = PROJECT_ROOT / "data" / "memory.db" - -# ─────────────────────────────────────────────────────────────────────────────── -# Database Connection -# 
─────────────────────────────────────────────────────────────────────────────── - - -_DEFAULT_HOT_MEMORY_TEMPLATE = """\ -# Timmy Hot Memory - -> Working RAM — always loaded, ~300 lines max, pruned monthly -> Last updated: {date} - ---- - -## Current Status - -**Agent State:** Operational -**Mode:** Development -**Active Tasks:** 0 -**Pending Decisions:** None - ---- - -## Standing Rules - -1. **Sovereignty First** — No cloud dependencies -2. **Local-Only Inference** — Ollama on localhost -3. **Privacy by Design** — Telemetry disabled -4. **Tool Minimalism** — Use tools only when necessary -5. **Memory Discipline** — Write handoffs at session end - ---- - -## Agent Roster - -| Agent | Role | Status | -|-------|------|--------| -| Timmy | Core | Active | - ---- - -## User Profile - -**Name:** (not set) -**Interests:** (to be learned) - ---- - -## Key Decisions - -(none yet) - ---- - -## Pending Actions - -- [ ] Learn user's name - ---- - -*Prune date: {prune_date}* -""" - - -@contextmanager -def get_connection() -> Generator[sqlite3.Connection, None, None]: - """Get database connection to unified memory database.""" - DB_PATH.parent.mkdir(parents=True, exist_ok=True) - with closing(sqlite3.connect(str(DB_PATH))) as conn: - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") - _ensure_schema(conn) - yield conn - - -def _ensure_schema(conn: sqlite3.Connection) -> None: - """Create the unified memories table and indexes if they don't exist.""" - conn.execute(""" - CREATE TABLE IF NOT EXISTS memories ( - id TEXT PRIMARY KEY, - content TEXT NOT NULL, - memory_type TEXT NOT NULL DEFAULT 'fact', - source TEXT NOT NULL DEFAULT 'agent', - embedding TEXT, - metadata TEXT, - source_hash TEXT, - agent_id TEXT, - task_id TEXT, - session_id TEXT, - confidence REAL NOT NULL DEFAULT 0.8, - tags TEXT NOT NULL DEFAULT '[]', - created_at TEXT NOT NULL, - last_accessed TEXT, - access_count INTEGER 
NOT NULL DEFAULT 0 - ) - """) - - # Create indexes for efficient querying - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_session ON memories(session_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") - conn.commit() - - # Run migration if needed - _migrate_schema(conn) - - -def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]: - """Get the column names for a table.""" - cursor = conn.execute(f"PRAGMA table_info({table_name})") - return {row[1] for row in cursor.fetchall()} - - -def _migrate_episodes(conn: sqlite3.Connection) -> None: - """Migrate episodes table rows into the unified memories table.""" - logger.info("Migration: Converting episodes table to memories") - try: - cols = _get_table_columns(conn, "episodes") - context_type_col = "context_type" if "context_type" in cols else "'conversation'" - - conn.execute(f""" - INSERT INTO memories ( - id, content, memory_type, source, embedding, - metadata, agent_id, task_id, session_id, - created_at, access_count, last_accessed - ) - SELECT - id, content, - COALESCE({context_type_col}, 'conversation'), - COALESCE(source, 'agent'), - embedding, - metadata, agent_id, task_id, session_id, - COALESCE(timestamp, datetime('now')), 0, NULL - FROM episodes - """) - conn.execute("DROP TABLE episodes") - logger.info("Migration: Migrated episodes to memories") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to migrate episodes: %s", exc) - - -def _migrate_chunks(conn: sqlite3.Connection) -> None: - """Migrate chunks table rows into the unified memories table.""" - logger.info("Migration: Converting chunks table to memories") - try: - cols = _get_table_columns(conn, "chunks") - - 
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)" - content_col = "content" if "content" in cols else "text" - source_col = ( - "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'") - ) - embedding_col = "embedding" if "embedding" in cols else "NULL" - created_col = "created_at" if "created_at" in cols else "datetime('now')" - - conn.execute(f""" - INSERT INTO memories ( - id, content, memory_type, source, embedding, - created_at, access_count - ) - SELECT - {id_col}, {content_col}, 'vault_chunk', {source_col}, - {embedding_col}, {created_col}, 0 - FROM chunks - """) - conn.execute("DROP TABLE chunks") - logger.info("Migration: Migrated chunks to memories") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to migrate chunks: %s", exc) - - -def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None: - """Drop a legacy table if it exists.""" - try: - conn.execute(f"DROP TABLE {table}") # noqa: S608 - logger.info("Migration: Dropped old %s table", table) - except sqlite3.Error as exc: - logger.warning("Migration: Failed to drop %s: %s", table, exc) - - -def _migrate_schema(conn: sqlite3.Connection) -> None: - """Migrate from old three-table schema to unified memories table. 
- - Migration paths: - - episodes table -> memories (context_type -> memory_type) - - chunks table -> memories with memory_type='vault_chunk' - - facts table -> dropped (unused, 0 rows expected) - """ - cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'") - tables = {row[0] for row in cursor.fetchall()} - - has_memories = "memories" in tables - - if not has_memories and (tables & {"episodes", "chunks", "facts"}): - logger.info("Migration: Creating unified memories table") - - if "episodes" in tables and has_memories: - _migrate_episodes(conn) - if "chunks" in tables and has_memories: - _migrate_chunks(conn) - if "facts" in tables: - _drop_legacy_table(conn, "facts") - - conn.commit() - - -# Alias for backward compatibility -get_conn = get_connection - - -# ─────────────────────────────────────────────────────────────────────────────── -# Data Classes -# ─────────────────────────────────────────────────────────────────────────────── - - -@dataclass -class MemoryEntry: - """A memory entry with vector embedding. - - Note: The DB column is `memory_type` but this field is named `context_type` - for backward API compatibility. 
- """ - - id: str = field(default_factory=lambda: str(uuid.uuid4())) - content: str = "" # The actual text content - source: str = "" # Where it came from (agent, user, system) - context_type: str = "conversation" # API field name; DB column is memory_type - agent_id: str | None = None - task_id: str | None = None - session_id: str | None = None - metadata: dict | None = None - embedding: list[float] | None = None - timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) - relevance_score: float | None = None # Set during search - - -@dataclass -class MemoryChunk: - """A searchable chunk of memory.""" - - id: str - source: str # filepath - content: str - embedding: list[float] - created_at: str - - -# ─────────────────────────────────────────────────────────────────────────────── -# CRUD Operations -# ─────────────────────────────────────────────────────────────────────────────── - - -def store_memory( - content: str, - source: str, - context_type: str = "conversation", - agent_id: str | None = None, - task_id: str | None = None, - session_id: str | None = None, - metadata: dict | None = None, - compute_embedding: bool = True, -) -> MemoryEntry: - """Store a memory entry with optional embedding. 
- - Args: - content: The text content to store - source: Source of the memory (agent name, user, system) - context_type: Type of context (conversation, document, fact, vault_chunk) - agent_id: Associated agent ID - task_id: Associated task ID - session_id: Session identifier - metadata: Additional structured data - compute_embedding: Whether to compute vector embedding - - Returns: - The stored MemoryEntry - """ - embedding = None - if compute_embedding: - embedding = embed_text(content) - - entry = MemoryEntry( - content=content, - source=source, - context_type=context_type, - agent_id=agent_id, - task_id=task_id, - session_id=session_id, - metadata=metadata, - embedding=embedding, - ) - - with get_connection() as conn: - conn.execute( - """ - INSERT INTO memories - (id, content, memory_type, source, agent_id, task_id, session_id, - metadata, embedding, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - entry.id, - entry.content, - entry.context_type, # DB column is memory_type - entry.source, - entry.agent_id, - entry.task_id, - entry.session_id, - json.dumps(metadata) if metadata else None, - json.dumps(embedding) if embedding else None, - entry.timestamp, - ), - ) - conn.commit() - - return entry - - -def _build_search_filters( - context_type: str | None, - agent_id: str | None, - session_id: str | None, -) -> tuple[str, list]: - """Build SQL WHERE clause and params from search filters.""" - conditions: list[str] = [] - params: list = [] - - if context_type: - conditions.append("memory_type = ?") - params.append(context_type) - if agent_id: - conditions.append("agent_id = ?") - params.append(agent_id) - if session_id: - conditions.append("session_id = ?") - params.append(session_id) - - where_clause = "WHERE " + " AND ".join(conditions) if conditions else "" - return where_clause, params - - -def _fetch_memory_candidates( - where_clause: str, params: list, candidate_limit: int -) -> list[sqlite3.Row]: - """Fetch candidate memory rows from the 
database.""" - query_sql = f""" - SELECT * FROM memories - {where_clause} - ORDER BY created_at DESC - LIMIT ? - """ - params.append(candidate_limit) - - with get_connection() as conn: - return conn.execute(query_sql, params).fetchall() - - -def _row_to_entry(row: sqlite3.Row) -> MemoryEntry: - """Convert a database row to a MemoryEntry.""" - return MemoryEntry( - id=row["id"], - content=row["content"], - source=row["source"], - context_type=row["memory_type"], # DB column -> API field - agent_id=row["agent_id"], - task_id=row["task_id"], - session_id=row["session_id"], - metadata=json.loads(row["metadata"]) if row["metadata"] else None, - embedding=json.loads(row["embedding"]) if row["embedding"] else None, - timestamp=row["created_at"], - ) - - -def _score_and_filter( - rows: list[sqlite3.Row], - query: str, - query_embedding: list[float], - min_relevance: float, -) -> list[MemoryEntry]: - """Score candidate rows by similarity and filter by min_relevance.""" - results = [] - for row in rows: - entry = _row_to_entry(row) - - if entry.embedding: - score = cosine_similarity(query_embedding, entry.embedding) - else: - score = _keyword_overlap(query, entry.content) - - entry.relevance_score = score - if score >= min_relevance: - results.append(entry) - - results.sort(key=lambda x: x.relevance_score or 0, reverse=True) - return results - - -def search_memories( - query: str, - limit: int = 10, - context_type: str | None = None, - agent_id: str | None = None, - session_id: str | None = None, - min_relevance: float = 0.0, -) -> list[MemoryEntry]: - """Search for memories by semantic similarity. 
- - Args: - query: Search query text - limit: Maximum results - context_type: Filter by memory type (maps to DB memory_type column) - agent_id: Filter by agent - session_id: Filter by session - min_relevance: Minimum similarity score (0-1) - - Returns: - List of MemoryEntry objects sorted by relevance - """ - query_embedding = embed_text(query) - where_clause, params = _build_search_filters(context_type, agent_id, session_id) - rows = _fetch_memory_candidates(where_clause, params, limit * 3) - results = _score_and_filter(rows, query, query_embedding, min_relevance) - return results[:limit] - - -def delete_memory(memory_id: str) -> bool: - """Delete a memory entry by ID. - - Returns: - True if deleted, False if not found - """ - with get_connection() as conn: - cursor = conn.execute( - "DELETE FROM memories WHERE id = ?", - (memory_id,), - ) - conn.commit() - return cursor.rowcount > 0 - - -def get_memory_stats() -> dict: - """Get statistics about the memory store. - - Returns: - Dict with counts by type, total entries, etc. - """ - with get_connection() as conn: - total = conn.execute("SELECT COUNT(*) as count FROM memories").fetchone()["count"] - - by_type = {} - rows = conn.execute( - "SELECT memory_type, COUNT(*) as count FROM memories GROUP BY memory_type" - ).fetchall() - for row in rows: - by_type[row["memory_type"]] = row["count"] - - with_embeddings = conn.execute( - "SELECT COUNT(*) as count FROM memories WHERE embedding IS NOT NULL" - ).fetchone()["count"] - - return { - "total_entries": total, - "by_type": by_type, - "with_embeddings": with_embeddings, - "has_embedding_model": _get_embedding_model() is not False, - } - - -def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int: - """Delete old memories to manage storage. 
- - Args: - older_than_days: Delete memories older than this - keep_facts: Whether to preserve fact-type memories - - Returns: - Number of entries deleted - """ - cutoff = (datetime.now(UTC) - timedelta(days=older_than_days)).isoformat() - - with get_connection() as conn: - if keep_facts: - cursor = conn.execute( - """ - DELETE FROM memories - WHERE created_at < ? AND memory_type != 'fact' - """, - (cutoff,), - ) - else: - cursor = conn.execute( - "DELETE FROM memories WHERE created_at < ?", - (cutoff,), - ) - - deleted = cursor.rowcount - conn.commit() - - return deleted - - -def get_memory_context(query: str, max_tokens: int = 2000, **filters) -> str: - """Get relevant memory context as formatted text for LLM prompts. - - Args: - query: Search query - max_tokens: Approximate maximum tokens to return - **filters: Additional filters (agent_id, session_id, etc.) - - Returns: - Formatted context string for inclusion in prompts - """ - memories = search_memories(query, limit=20, **filters) - - context_parts = [] - total_chars = 0 - max_chars = max_tokens * 4 # Rough approximation - - for mem in memories: - formatted = f"[{mem.source}]: {mem.content}" - if total_chars + len(formatted) > max_chars: - break - context_parts.append(formatted) - total_chars += len(formatted) - - if not context_parts: - return "" - - return "Relevant context from memory:\n" + "\n\n".join(context_parts) - - -# ─────────────────────────────────────────────────────────────────────────────── -# Personal Facts Functions -# ─────────────────────────────────────────────────────────────────────────────── - - -def recall_personal_facts(agent_id: str | None = None) -> list[str]: - """Recall personal facts about the user or system. - - Args: - agent_id: Optional agent filter - - Returns: - List of fact strings - """ - with get_connection() as conn: - if agent_id: - rows = conn.execute( - """ - SELECT content FROM memories - WHERE memory_type = 'fact' AND agent_id = ? 
- ORDER BY created_at DESC - LIMIT 100 - """, - (agent_id,), - ).fetchall() - else: - rows = conn.execute( - """ - SELECT content FROM memories - WHERE memory_type = 'fact' - ORDER BY created_at DESC - LIMIT 100 - """, - ).fetchall() - - return [r["content"] for r in rows] - - -def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]: - """Recall personal facts with their IDs for edit/delete operations.""" - with get_connection() as conn: - if agent_id: - rows = conn.execute( - "SELECT id, content FROM memories WHERE memory_type = 'fact' AND agent_id = ? ORDER BY created_at DESC LIMIT 100", - (agent_id,), - ).fetchall() - else: - rows = conn.execute( - "SELECT id, content FROM memories WHERE memory_type = 'fact' ORDER BY created_at DESC LIMIT 100", - ).fetchall() - return [{"id": r["id"], "content": r["content"]} for r in rows] - - -def update_personal_fact(memory_id: str, new_content: str) -> bool: - """Update a personal fact's content.""" - with get_connection() as conn: - cursor = conn.execute( - "UPDATE memories SET content = ? WHERE id = ? AND memory_type = 'fact'", - (new_content, memory_id), - ) - conn.commit() - return cursor.rowcount > 0 - - -def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry: - """Store a personal fact about the user or system. - - Args: - fact: The fact to store - agent_id: Associated agent - - Returns: - The stored MemoryEntry - """ - return store_memory( - content=fact, - source="system", - context_type="fact", - agent_id=agent_id, - metadata={"auto_extracted": False}, - ) - - -def store_last_reflection(reflection: str) -> None: - """Store the last reflection, replacing any previous one. - - Uses a single row with memory_type='reflection' to avoid accumulation. 
- """ - if not reflection or not reflection.strip(): - return - with get_connection() as conn: - # Delete previous reflections — only the latest matters - conn.execute("DELETE FROM memories WHERE memory_type = 'reflection'") - conn.execute( - """ - INSERT INTO memories - (id, content, memory_type, source, created_at) - VALUES (?, ?, 'reflection', 'system', ?) - """, - (str(uuid.uuid4()), reflection.strip(), datetime.now(UTC).isoformat()), - ) - conn.commit() - logger.debug("Stored last reflection in DB") - - -def recall_last_reflection() -> str | None: - """Recall the most recent reflection, or None if absent.""" - with get_connection() as conn: - row = conn.execute( - "SELECT content FROM memories WHERE memory_type = 'reflection' " - "ORDER BY created_at DESC LIMIT 1" - ).fetchone() - return row["content"] if row else None - - -# ─────────────────────────────────────────────────────────────────────────────── -# Hot Memory (computed from DB instead of MEMORY.md) -# ─────────────────────────────────────────────────────────────────────────────── - - -class HotMemory: - """Tier 1: Hot memory — computed view of top facts from DB.""" - - def __init__(self) -> None: - self.path = HOT_MEMORY_PATH - self._content: str | None = None - self._last_modified: float | None = None - - def read(self, force_refresh: bool = False) -> str: - """Read hot memory — computed view of top facts + last reflection from DB.""" - try: - facts = recall_personal_facts() - lines = ["# Timmy Hot Memory\n"] - - if facts: - lines.append("## Known Facts\n") - for f in facts[:15]: - lines.append(f"- {f}") - - # Include the last reflection if available - reflection = recall_last_reflection() - if reflection: - lines.append("\n## Last Reflection\n") - lines.append(reflection) - - if len(lines) > 1: - return "\n".join(lines) - except Exception: - logger.debug("DB context read failed, falling back to file") - - # Fallback to file if DB unavailable - if self.path.exists(): - return self.path.read_text() - 
- return "# Timmy Hot Memory\n\nNo memories stored yet.\n" - - def update_section(self, section: str, content: str) -> None: - """Update a specific section in MEMORY.md. - - DEPRECATED: Hot memory is now computed from the database. - This method is kept for backward compatibility during transition. - Use memory_write() to store facts in the database. - """ - logger.warning( - "HotMemory.update_section() is deprecated. " - "Use memory_write() to store facts in the database." - ) - - # Keep file-writing for backward compatibility during transition - # Guard against empty or excessively large writes - if not content or not content.strip(): - logger.warning("HotMemory: Refusing empty write to section '%s'", section) - return - if len(content) > 2000: - logger.warning("HotMemory: Truncating oversized write to section '%s'", section) - content = content[:2000] + "\n... [truncated]" - - if not self.path.exists(): - self._create_default() - - full_content = self.read() - - # Find section - pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)" - match = re.search(pattern, full_content, re.DOTALL) - - if match: - # Replace section - new_section = f"## {section}\n\n{content}\n\n" - full_content = full_content[: match.start()] + new_section + full_content[match.end() :] - else: - # Append section — guard against missing prune marker - insert_point = full_content.rfind("*Prune date:") - new_section = f"## {section}\n\n{content}\n\n" - if insert_point < 0: - # No prune marker — just append at end - full_content = full_content.rstrip() + "\n\n" + new_section - else: - full_content = ( - full_content[:insert_point] + new_section + "\n" + full_content[insert_point:] - ) - - self.path.write_text(full_content) - self._content = full_content - self._last_modified = self.path.stat().st_mtime - logger.info("HotMemory: Updated section '%s'", section) - - def _create_default(self) -> None: - """Create default MEMORY.md if missing. 
- - DEPRECATED: Hot memory is now computed from the database. - This method is kept for backward compatibility during transition. - """ - logger.debug( - "HotMemory._create_default() - creating default MEMORY.md for backward compatibility" - ) - now = datetime.now(UTC) - content = _DEFAULT_HOT_MEMORY_TEMPLATE.format( - date=now.strftime("%Y-%m-%d"), - prune_date=now.replace(day=25).strftime("%Y-%m-%d"), - ) - self.path.write_text(content) - logger.info("HotMemory: Created default MEMORY.md") - - -# ─────────────────────────────────────────────────────────────────────────────── -# Vault Memory (memory/ directory) -# ─────────────────────────────────────────────────────────────────────────────── - - -class VaultMemory: - """Tier 2: Structured vault (memory/) — append-only markdown.""" - - def __init__(self) -> None: - self.path = VAULT_PATH - self._ensure_structure() - - def _ensure_structure(self) -> None: - """Ensure vault directory structure exists.""" - (self.path / "self").mkdir(parents=True, exist_ok=True) - (self.path / "notes").mkdir(parents=True, exist_ok=True) - (self.path / "aar").mkdir(parents=True, exist_ok=True) - - def write_note(self, name: str, content: str, namespace: str = "notes") -> Path: - """Write a note to the vault.""" - # Add timestamp to filename - timestamp = datetime.now(UTC).strftime("%Y%m%d") - filename = f"{timestamp}_{name}.md" - filepath = self.path / namespace / filename - - # Add header - full_content = f"""# {name.replace("_", " ").title()} - -> Created: {datetime.now(UTC).isoformat()} -> Namespace: {namespace} - ---- - -{content} - ---- - -*Auto-generated by Timmy Memory System* -""" - - filepath.write_text(full_content) - logger.info("VaultMemory: Wrote %s", filepath) - return filepath - - def read_file(self, filepath: Path) -> str: - """Read a file from the vault.""" - if not filepath.exists(): - return "" - return filepath.read_text() - - def update_user_profile(self, key: str, value: str) -> None: - """Update a field in 
user_profile.md. - - DEPRECATED: User profile updates should now use memory_write() to store - facts in the database. This method is kept for backward compatibility. - """ - logger.warning( - "VaultMemory.update_user_profile() is deprecated. " - "Use memory_write() to store user facts in the database." - ) - # Still update the file for backward compatibility during transition - profile_path = self.path / "self" / "user_profile.md" - - if not profile_path.exists(): - self._create_default_profile() - - content = profile_path.read_text() - - pattern = rf"(\*\*{re.escape(key)}:\*\*).*" - if re.search(pattern, content): - safe_value = value.strip() - content = re.sub(pattern, lambda m: f"{m.group(1)} {safe_value}", content) - else: - facts_section = "## Important Facts" - if facts_section in content: - insert_point = content.find(facts_section) + len(facts_section) - content = content[:insert_point] + f"\n- {key}: {value}" + content[insert_point:] - - content = re.sub( - r"\*Last updated:.*\*", - f"*Last updated: {datetime.now(UTC).strftime('%Y-%m-%d')}*", - content, - ) - - profile_path.write_text(content) - logger.info("VaultMemory: Updated user profile: %s = %s", key, value) - - def _create_default_profile(self) -> None: - """Create default user profile.""" - profile_path = self.path / "self" / "user_profile.md" - default = """# User Profile - -> Learned information about the user. 
- -## Basic Information - -**Name:** (unknown) -**Location:** (unknown) -**Occupation:** (unknown) - -## Interests & Expertise - -- (to be learned) - -## Preferences - -- Response style: concise, technical -- Tool usage: minimal - -## Important Facts - -- (to be extracted) - ---- - -*Last updated: {date}* -""".format(date=datetime.now(UTC).strftime("%Y-%m-%d")) - - profile_path.write_text(default) - - -# ─────────────────────────────────────────────────────────────────────────────── -# Semantic Memory (Vector search) -# ─────────────────────────────────────────────────────────────────────────────── - - -class SemanticMemory: - """Vector-based semantic search over vault content.""" - - def __init__(self) -> None: - self.db_path = DB_PATH - self.vault_path = VAULT_PATH - - @contextmanager - def _get_conn(self) -> Generator[sqlite3.Connection, None, None]: - """Get connection to the instance's db_path (backward compatibility). - - Uses self.db_path if set differently from global DB_PATH, - otherwise uses the global get_connection(). 
- """ - if self.db_path == DB_PATH: - # Use global connection (normal production path) - with get_connection() as conn: - yield conn - else: - # Use instance-specific db_path (test path) - self.db_path.parent.mkdir(parents=True, exist_ok=True) - with closing(sqlite3.connect(str(self.db_path))) as conn: - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") - # Ensure schema exists - conn.execute(""" - CREATE TABLE IF NOT EXISTS memories ( - id TEXT PRIMARY KEY, - content TEXT NOT NULL, - memory_type TEXT NOT NULL DEFAULT 'fact', - source TEXT NOT NULL DEFAULT 'agent', - embedding TEXT, - metadata TEXT, - source_hash TEXT, - agent_id TEXT, - task_id TEXT, - session_id TEXT, - confidence REAL NOT NULL DEFAULT 0.8, - tags TEXT NOT NULL DEFAULT '[]', - created_at TEXT NOT NULL, - last_accessed TEXT, - access_count INTEGER NOT NULL DEFAULT 0 - ) - """) - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)" - ) - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") - conn.commit() - yield conn - - def _init_db(self) -> None: - """Initialize database at self.db_path (backward compatibility). - - This method is kept for backward compatibility with existing code and tests. - Schema creation is handled by _get_conn. - """ - # Trigger schema creation via _get_conn - with self._get_conn(): - pass - - def index_file(self, filepath: Path) -> int: - """Index a single file into semantic memory.""" - if not filepath.exists(): - return 0 - - content = filepath.read_text() - file_hash = hashlib.md5(content.encode()).hexdigest() - - with self._get_conn() as conn: - # Check if already indexed with same hash - cursor = conn.execute( - "SELECT metadata FROM memories WHERE source = ? 
AND memory_type = 'vault_chunk' LIMIT 1", - (str(filepath),), - ) - existing = cursor.fetchone() - if existing and existing[0]: - try: - meta = json.loads(existing[0]) - if meta.get("source_hash") == file_hash: - return 0 # Already indexed - except json.JSONDecodeError: - pass - - # Delete old chunks for this file - conn.execute( - "DELETE FROM memories WHERE source = ? AND memory_type = 'vault_chunk'", - (str(filepath),), - ) - - # Split into chunks (paragraphs) - chunks = self._split_into_chunks(content) - - # Index each chunk - now = datetime.now(UTC).isoformat() - for i, chunk_text in enumerate(chunks): - if len(chunk_text.strip()) < 20: # Skip tiny chunks - continue - - chunk_id = f"{filepath.stem}_{i}" - chunk_embedding = embed_text(chunk_text) - - conn.execute( - """INSERT INTO memories - (id, content, memory_type, source, metadata, embedding, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?)""", - ( - chunk_id, - chunk_text, - "vault_chunk", - str(filepath), - json.dumps({"source_hash": file_hash, "chunk_index": i}), - json.dumps(chunk_embedding), - now, - ), - ) - - conn.commit() - - logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks)) - return len(chunks) - - def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]: - """Split text into semantic chunks.""" - # Split by paragraphs first - paragraphs = text.split("\n\n") - chunks = [] - - for para in paragraphs: - para = para.strip() - if not para: - continue - - # If paragraph is small enough, keep as one chunk - if len(para) <= max_chunk_size: - chunks.append(para) - else: - # Split long paragraphs by sentences - sentences = para.replace(". 
", ".\n").split("\n") - current_chunk = "" - - for sent in sentences: - if len(current_chunk) + len(sent) < max_chunk_size: - current_chunk += " " + sent if current_chunk else sent - else: - if current_chunk: - chunks.append(current_chunk.strip()) - current_chunk = sent - - if current_chunk: - chunks.append(current_chunk.strip()) - - return chunks - - def index_vault(self) -> int: - """Index entire vault directory.""" - total_chunks = 0 - - for md_file in self.vault_path.rglob("*.md"): - # Skip handoff file (handled separately) - if "last-session-handoff" in md_file.name: - continue - total_chunks += self.index_file(md_file) - - logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks) - return total_chunks - - def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]: - """Search for relevant memory chunks.""" - query_embedding = embed_text(query) - - with self._get_conn() as conn: - conn.row_factory = sqlite3.Row - - # Get all vault chunks - rows = conn.execute( - "SELECT source, content, embedding FROM memories WHERE memory_type = 'vault_chunk'" - ).fetchall() - - # Calculate similarities - scored = [] - for row in rows: - embedding = json.loads(row["embedding"]) - score = cosine_similarity(query_embedding, embedding) - scored.append((row["source"], row["content"], score)) - - # Sort by score descending - scored.sort(key=lambda x: x[2], reverse=True) - - # Return top_k - return [(content, score) for _, content, score in scored[:top_k]] - - def get_relevant_context(self, query: str, max_chars: int = 2000) -> str: - """Get formatted context string for a query.""" - results = self.search(query, top_k=3) - - if not results: - return "" - - parts = [] - total_chars = 0 - - for content, score in results: - if score < 0.3: # Similarity threshold - continue - - chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..." 
- if total_chars + len(chunk) > max_chars: - break - - parts.append(chunk) - total_chars += len(chunk) - - return "\n\n".join(parts) if parts else "" - - def stats(self) -> dict: - """Get indexing statistics.""" - with self._get_conn() as conn: - cursor = conn.execute( - "SELECT COUNT(*), COUNT(DISTINCT source) FROM memories WHERE memory_type = 'vault_chunk'" - ) - total_chunks, total_files = cursor.fetchone() - - return { - "total_chunks": total_chunks, - "total_files": total_files, - "embedding_dim": EMBEDDING_DIM if _get_embedding_model() else 128, - } - - -class MemorySearcher: - """High-level interface for memory search.""" - - def __init__(self) -> None: - self.semantic = SemanticMemory() - - def search(self, query: str, tiers: list[str] = None) -> dict: - """Search across memory tiers. - - Args: - query: Search query - tiers: List of tiers to search ["hot", "vault", "semantic"] - - Returns: - Dict with results from each tier - """ - tiers = tiers or ["semantic"] # Default to semantic only - results = {} - - if "semantic" in tiers: - semantic_results = self.semantic.search(query, top_k=5) - results["semantic"] = [ - {"content": content, "score": score} for content, score in semantic_results - ] - - return results - - def get_context_for_query(self, query: str) -> str: - """Get comprehensive context for a user query.""" - # Get semantic context - semantic_context = self.semantic.get_relevant_context(query) - - if semantic_context: - return f"## Relevant Past Context\n\n{semantic_context}" - - return "" - - -# Module-level singletons -semantic_memory = SemanticMemory() -memory_searcher = MemorySearcher() - - -# ─────────────────────────────────────────────────────────────────────────────── -# Tool Functions -# ─────────────────────────────────────────────────────────────────────────────── - - -def memory_search(query: str, limit: int = 10) -> str: - """Search past conversations, notes, and stored facts for relevant context. 
- - Searches across both the vault (indexed markdown files) and the - runtime memory store (facts and conversation fragments stored via - memory_write). - - Args: - query: What to search for (e.g. "Bitcoin strategy", "server setup"). - limit: Number of results to return (default 10). - - Returns: - Formatted string of relevant memory results. - """ - # Guard: model sometimes passes None for limit - if limit is None: - limit = 10 - - parts: list[str] = [] - - # 1. Search semantic vault (indexed markdown files) - vault_results = semantic_memory.search(query, limit) - for content, score in vault_results: - if score < 0.2: - continue - parts.append(f"[vault score {score:.2f}] {content[:300]}") - - # 2. Search runtime vector store (stored facts/conversations) - try: - runtime_results = search_memories(query, limit=limit, min_relevance=0.2) - for entry in runtime_results: - label = entry.context_type or "memory" - parts.append(f"[{label}] {entry.content[:300]}") - except Exception as exc: - logger.debug("Vector store search unavailable: %s", exc) - - if not parts: - return "No relevant memories found." - return "\n\n".join(parts) - - -def memory_read(query: str = "", top_k: int = 5) -> str: - """Read from persistent memory — search facts, notes, and past conversations. - - This is the primary tool for recalling stored information. If no query - is given, returns the most recent personal facts. With a query, it - searches semantically across all stored memories. - - Args: - query: Optional search term. Leave empty to list recent facts. - top_k: Maximum results to return (default 5). - - Returns: - Formatted string of memory contents. 
- """ - if top_k is None: - top_k = 5 - - parts: list[str] = [] - - # Always include personal facts first - try: - facts = search_memories(query or "", limit=top_k, min_relevance=0.0) - fact_entries = [e for e in facts if (e.context_type or "") == "fact"] - if fact_entries: - parts.append("## Personal Facts") - for entry in fact_entries[:top_k]: - parts.append(f"- {entry.content[:300]}") - except Exception as exc: - logger.debug("Vector store unavailable for memory_read: %s", exc) - - # If a query was provided, also do semantic search - if query: - search_result = memory_search(query, top_k) - if search_result and search_result != "No relevant memories found.": - parts.append("\n## Search Results") - parts.append(search_result) - - if not parts: - return "No memories stored yet. Use memory_write to store information." - return "\n".join(parts) - - -def memory_store(topic: str, report: str, type: str = "research") -> str: - """Store a piece of information in persistent memory, particularly for research outputs. - - Use this tool to store structured research findings or other important documents. - Stored memories are searchable via memory_search across all channels. - - Args: - topic: A concise title or topic for the research output. - report: The detailed content of the research output or document. - type: Type of memory — "research" for research outputs (default), - "fact" for permanent facts, "conversation" for conversation context, - "document" for other document fragments. - - Returns: - Confirmation that the memory was stored. - """ - if not report or not report.strip(): - return "Nothing to store — report is empty." 
- - # Combine topic and report for embedding and storage content - full_content = f"Topic: {topic.strip()}\n\nReport: {report.strip()}" - - valid_types = ("fact", "conversation", "document", "research") - if type not in valid_types: - type = "research" - - try: - # Dedup check for facts and research — skip if similar exists - if type in ("fact", "research"): - existing = search_memories(full_content, limit=3, context_type=type, min_relevance=0.75) - if existing: - return ( - f"Similar {type} already stored (id={existing[0].id[:8]}). Skipping duplicate." - ) - - entry = store_memory( - content=full_content, - source="agent", - context_type=type, - metadata={"topic": topic}, - ) - return f"Stored in memory (type={type}, id={entry.id[:8]}). This is now searchable across all channels." - except Exception as exc: - logger.error("Failed to write memory: %s", exc) - return f"Failed to store memory: {exc}" - - -def memory_forget(query: str) -> str: - """Remove a stored memory that is outdated, incorrect, or no longer relevant. - - Searches for memories matching the query and deletes the closest match. - Use this when the user says to forget something or when stored information - has changed. - - Args: - query: Description of the memory to forget (e.g. "my phone number", - "the old server address"). - - Returns: - Confirmation of what was forgotten, or a message if nothing matched. - """ - if not query or not query.strip(): - return "Nothing to forget — query is empty." - - try: - results = search_memories(query.strip(), limit=3, min_relevance=0.3) - if not results: - return "No matching memories found to forget." - - # Delete the closest match - best = results[0] - deleted = delete_memory(best.id) - if deleted: - return f'Forgotten: "{best.content[:80]}" (type={best.context_type})' - return "Memory not found (may have already been deleted)." 
- except Exception as exc: - logger.error("Failed to forget memory: %s", exc) - return f"Failed to forget: {exc}" - - -# ─────────────────────────────────────────────────────────────────────────────── -# Artifact Tools — "hands" for producing artifacts during conversation -# ─────────────────────────────────────────────────────────────────────────────── - -NOTES_DIR = Path.home() / ".timmy" / "notes" -DECISION_LOG = Path.home() / ".timmy" / "decisions.md" - - -def jot_note(title: str, body: str) -> str: - """Write a markdown note to Timmy's workspace (~/.timmy/notes/). - - Use this tool to capture ideas, drafts, summaries, or any artifact that - should persist beyond the conversation. Each note is saved as a - timestamped markdown file. - - Args: - title: Short descriptive title (used as filename slug). - body: Markdown content of the note. - - Returns: - Confirmation with the file path of the saved note. - """ - if not title or not title.strip(): - return "Cannot jot — title is empty." - if not body or not body.strip(): - return "Cannot jot — body is empty." - - NOTES_DIR.mkdir(parents=True, exist_ok=True) - - slug = re.sub(r"[^a-z0-9]+", "-", title.strip().lower()).strip("-")[:60] - timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - filename = f"{timestamp}_{slug}.md" - filepath = NOTES_DIR / filename - - content = f"# {title.strip()}\n\n> Created: {datetime.now(UTC).isoformat()}\n\n{body.strip()}\n" - filepath.write_text(content) - logger.info("jot_note: wrote %s", filepath) - return f"Note saved: {filepath}" - - -def log_decision(decision: str, rationale: str = "") -> str: - """Append an architectural or design decision to the running decision log. - - Use this tool when a significant decision is made during conversation — - technology choices, design trade-offs, scope changes, etc. - - Args: - decision: One-line summary of the decision. - rationale: Why this decision was made (optional but encouraged). 
- - Returns: - Confirmation that the decision was logged. - """ - if not decision or not decision.strip(): - return "Cannot log — decision is empty." - - DECISION_LOG.parent.mkdir(parents=True, exist_ok=True) - - # Create file with header if it doesn't exist - if not DECISION_LOG.exists(): - DECISION_LOG.write_text( - "# Decision Log\n\nRunning log of architectural and design decisions.\n\n" - ) - - stamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") - entry = f"## {stamp} — {decision.strip()}\n\n" - if rationale and rationale.strip(): - entry += f"{rationale.strip()}\n\n" - entry += "---\n\n" - - with open(DECISION_LOG, "a") as f: - f.write(entry) - - logger.info("log_decision: %s", decision.strip()[:80]) - return f"Decision logged: {decision.strip()}" - - -# ─────────────────────────────────────────────────────────────────────────────── -# Memory System (Central Coordinator) -# ─────────────────────────────────────────────────────────────────────────────── +# ── MemorySystem (central coordinator) ─────────────────────────────────────── class MemorySystem: @@ -1518,7 +153,7 @@ class MemorySystem: return "\n\n---\n\n".join(context_parts) -# ── Lazy singleton ──────────────────────────────────────────────────────────── +# ── Lazy singleton ─────────────────────────────────────────────────────────── _memory_system: MemorySystem | None = None diff --git a/src/timmy/thinking/__init__.py b/src/timmy/thinking/__init__.py index 1f2ad804..66d99906 100644 --- a/src/timmy/thinking/__init__.py +++ b/src/timmy/thinking/__init__.py @@ -22,21 +22,20 @@ import sqlite3 from datetime import datetime from pathlib import Path -from timmy.thinking._db import Thought, _get_conn -from timmy.thinking.engine import ThinkingEngine -from timmy.thinking.seeds import ( - SEED_TYPES, - _SENSITIVE_PATTERNS, - _META_OBSERVATION_PHRASES, - _THINK_TAG_RE, - _THINKING_PROMPT, -) - # Re-export HOT_MEMORY_PATH and SOUL_PATH so existing patch targets continue to work. 
# Tests that patch "timmy.thinking.HOT_MEMORY_PATH" or "timmy.thinking.SOUL_PATH" # should instead patch "timmy.thinking._snapshot.HOT_MEMORY_PATH" etc., but these # re-exports are kept for any code that reads them from the top-level namespace. from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH # noqa: F401 +from timmy.thinking._db import Thought, _get_conn +from timmy.thinking.engine import ThinkingEngine +from timmy.thinking.seeds import ( + _META_OBSERVATION_PHRASES, + _SENSITIVE_PATTERNS, + _THINK_TAG_RE, + _THINKING_PROMPT, + SEED_TYPES, +) logger = logging.getLogger(__name__) diff --git a/src/timmy/thinking/_distillation.py b/src/timmy/thinking/_distillation.py index 4adbf07f..07c719fc 100644 --- a/src/timmy/thinking/_distillation.py +++ b/src/timmy/thinking/_distillation.py @@ -4,7 +4,6 @@ import logging from pathlib import Path from config import settings - from timmy.thinking.seeds import _META_OBSERVATION_PHRASES, _SENSITIVE_PATTERNS logger = logging.getLogger(__name__) diff --git a/src/timmy/thinking/_seeds_mixin.py b/src/timmy/thinking/_seeds_mixin.py index 14aa6c55..76ea74d8 100644 --- a/src/timmy/thinking/_seeds_mixin.py +++ b/src/timmy/thinking/_seeds_mixin.py @@ -5,11 +5,11 @@ import random from datetime import UTC, datetime from timmy.thinking.seeds import ( - SEED_TYPES, _CREATIVE_SEEDS, _EXISTENTIAL_SEEDS, _OBSERVATION_SEEDS, _SOVEREIGNTY_SEEDS, + SEED_TYPES, ) logger = logging.getLogger(__name__) diff --git a/src/timmy/thinking/_snapshot.py b/src/timmy/thinking/_snapshot.py index 77d32c43..471312c3 100644 --- a/src/timmy/thinking/_snapshot.py +++ b/src/timmy/thinking/_snapshot.py @@ -1,7 +1,7 @@ """System snapshot and memory context mixin for the thinking engine.""" import logging -from datetime import UTC, datetime +from datetime import datetime from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH diff --git a/src/timmy/thinking/engine.py b/src/timmy/thinking/engine.py index bbc32566..f616da4e 100644 --- 
a/src/timmy/thinking/engine.py +++ b/src/timmy/thinking/engine.py @@ -7,8 +7,7 @@ from difflib import SequenceMatcher from pathlib import Path from config import settings - -from timmy.thinking._db import Thought, _DEFAULT_DB, _get_conn, _row_to_thought +from timmy.thinking._db import _DEFAULT_DB, Thought, _get_conn, _row_to_thought from timmy.thinking._distillation import _DistillationMixin from timmy.thinking._issue_filing import _IssueFilingMixin from timmy.thinking._seeds_mixin import _SeedsMixin diff --git a/tests/timmy/test_backlog_triage.py b/tests/timmy/test_backlog_triage.py index 4bc1a79b..7ca56909 100644 --- a/tests/timmy/test_backlog_triage.py +++ b/tests/timmy/test_backlog_triage.py @@ -5,7 +5,6 @@ from __future__ import annotations from datetime import UTC, datetime, timedelta from unittest.mock import AsyncMock, MagicMock, patch -import httpx import pytest from timmy.backlog_triage import ( @@ -28,7 +27,6 @@ from timmy.backlog_triage import ( score_issue, ) - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- diff --git a/tests/timmy/test_orchestration_loop.py b/tests/timmy/test_orchestration_loop.py index eaaf5a3b..e6b0087c 100644 --- a/tests/timmy/test_orchestration_loop.py +++ b/tests/timmy/test_orchestration_loop.py @@ -2,14 +2,12 @@ from __future__ import annotations -import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator - # --------------------------------------------------------------------------- # VassalCycleRecord # --------------------------------------------------------------------------- diff --git a/tests/timmy/test_quest_system.py b/tests/timmy/test_quest_system.py index 3dc71686..6050009e 100644 --- a/tests/timmy/test_quest_system.py +++ b/tests/timmy/test_quest_system.py @@ -34,7 +34,6 @@ from timmy.quest_system 
import ( update_quest_progress, ) - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- diff --git a/tests/timmy/test_research_tools.py b/tests/timmy/test_research_tools.py index 057b60bd..4075b0ce 100644 --- a/tests/timmy/test_research_tools.py +++ b/tests/timmy/test_research_tools.py @@ -15,7 +15,6 @@ if "serpapi" not in sys.modules: from timmy.research_tools import get_llm_client, google_web_search # noqa: E402 - # --------------------------------------------------------------------------- # google_web_search # ---------------------------------------------------------------------------