[loop-cycle-1] refactor: split memory_system.py into submodules (#1277) #1323

Merged
Timmy merged 1 commit from refactor/split-memory-system into main 2026-03-24 02:21:43 +00:00
18 changed files with 1541 additions and 1454 deletions

View File

@@ -45,6 +45,7 @@ from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
from dashboard.routes.monitoring import router as monitoring_router
from dashboard.routes.nexus import router as nexus_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
@@ -57,7 +58,6 @@ from dashboard.routes.tasks import router as tasks_router
from dashboard.routes.telegram import router as telegram_router
from dashboard.routes.thinking import router as thinking_router
from dashboard.routes.three_strike import router as three_strike_router
from dashboard.routes.monitoring import router as monitoring_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router

View File

@@ -18,7 +18,7 @@ import logging
from datetime import UTC, datetime
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.responses import HTMLResponse
from config import APP_START_TIME as _START_TIME
from config import settings

View File

@@ -25,7 +25,7 @@ from __future__ import annotations
import logging
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import ActionResult, ActionStatus, CommandInput, PerceptionOutput
from infrastructure.world.types import ActionResult, CommandInput, PerceptionOutput
logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,301 @@
"""HotMemory and VaultMemory classes — file-based memory tiers.
HotMemory: Tier 1 — computed view of top facts from the DB (+ MEMORY.md fallback).
VaultMemory: Tier 2 — structured vault (memory/) with append-only markdown.
"""
import logging
import re
from datetime import UTC, datetime
from pathlib import Path
from timmy.memory.crud import recall_last_reflection, recall_personal_facts
from timmy.memory.db import HOT_MEMORY_PATH, VAULT_PATH
logger = logging.getLogger(__name__)
# ── Default template ─────────────────────────────────────────────────────────
# Seed content for MEMORY.md. The {date} and {prune_date} placeholders are
# filled in with str.format() by HotMemory._create_default(); any other literal
# braces added to this text would have to be doubled.
_DEFAULT_HOT_MEMORY_TEMPLATE = """\
# Timmy Hot Memory
> Working RAM — always loaded, ~300 lines max, pruned monthly
> Last updated: {date}
---
## Current Status
**Agent State:** Operational
**Mode:** Development
**Active Tasks:** 0
**Pending Decisions:** None
---
## Standing Rules
1. **Sovereignty First** — No cloud dependencies
2. **Local-Only Inference** — Ollama on localhost
3. **Privacy by Design** — Telemetry disabled
4. **Tool Minimalism** — Use tools only when necessary
5. **Memory Discipline** — Write handoffs at session end
---
## Agent Roster
| Agent | Role | Status |
|-------|------|--------|
| Timmy | Core | Active |
---
## User Profile
**Name:** (not set)
**Interests:** (to be learned)
---
## Key Decisions
(none yet)
---
## Pending Actions
- [ ] Learn user's name
---
*Prune date: {prune_date}*
"""
# ── HotMemory ────────────────────────────────────────────────────────────────
class HotMemory:
"""Tier 1: Hot memory — computed view of top facts from DB."""
def __init__(self) -> None:
self.path = HOT_MEMORY_PATH
self._content: str | None = None
self._last_modified: float | None = None
def read(self, force_refresh: bool = False) -> str:
"""Read hot memory — computed view of top facts + last reflection from DB."""
try:
facts = recall_personal_facts()
lines = ["# Timmy Hot Memory\n"]
if facts:
lines.append("## Known Facts\n")
for f in facts[:15]:
lines.append(f"- {f}")
# Include the last reflection if available
reflection = recall_last_reflection()
if reflection:
lines.append("\n## Last Reflection\n")
lines.append(reflection)
if len(lines) > 1:
return "\n".join(lines)
except Exception:
logger.debug("DB context read failed, falling back to file")
# Fallback to file if DB unavailable
if self.path.exists():
return self.path.read_text()
return "# Timmy Hot Memory\n\nNo memories stored yet.\n"
def update_section(self, section: str, content: str) -> None:
"""Update a specific section in MEMORY.md.
DEPRECATED: Hot memory is now computed from the database.
This method is kept for backward compatibility during transition.
Use memory_write() to store facts in the database.
"""
logger.warning(
"HotMemory.update_section() is deprecated. "
"Use memory_write() to store facts in the database."
)
# Keep file-writing for backward compatibility during transition
# Guard against empty or excessively large writes
if not content or not content.strip():
logger.warning("HotMemory: Refusing empty write to section '%s'", section)
return
if len(content) > 2000:
logger.warning("HotMemory: Truncating oversized write to section '%s'", section)
content = content[:2000] + "\n... [truncated]"
if not self.path.exists():
self._create_default()
full_content = self.read()
# Find section
pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)"
match = re.search(pattern, full_content, re.DOTALL)
if match:
# Replace section
new_section = f"## {section}\n\n{content}\n\n"
full_content = full_content[: match.start()] + new_section + full_content[match.end() :]
else:
# Append section — guard against missing prune marker
insert_point = full_content.rfind("*Prune date:")
new_section = f"## {section}\n\n{content}\n\n"
if insert_point < 0:
# No prune marker — just append at end
full_content = full_content.rstrip() + "\n\n" + new_section
else:
full_content = (
full_content[:insert_point] + new_section + "\n" + full_content[insert_point:]
)
self.path.write_text(full_content)
self._content = full_content
self._last_modified = self.path.stat().st_mtime
logger.info("HotMemory: Updated section '%s'", section)
def _create_default(self) -> None:
"""Create default MEMORY.md if missing.
DEPRECATED: Hot memory is now computed from the database.
This method is kept for backward compatibility during transition.
"""
logger.debug(
"HotMemory._create_default() - creating default MEMORY.md for backward compatibility"
)
now = datetime.now(UTC)
content = _DEFAULT_HOT_MEMORY_TEMPLATE.format(
date=now.strftime("%Y-%m-%d"),
prune_date=now.replace(day=25).strftime("%Y-%m-%d"),
)
self.path.write_text(content)
logger.info("HotMemory: Created default MEMORY.md")
# ── VaultMemory ──────────────────────────────────────────────────────────────
class VaultMemory:
"""Tier 2: Structured vault (memory/) — append-only markdown."""
def __init__(self) -> None:
self.path = VAULT_PATH
self._ensure_structure()
def _ensure_structure(self) -> None:
"""Ensure vault directory structure exists."""
(self.path / "self").mkdir(parents=True, exist_ok=True)
(self.path / "notes").mkdir(parents=True, exist_ok=True)
(self.path / "aar").mkdir(parents=True, exist_ok=True)
def write_note(self, name: str, content: str, namespace: str = "notes") -> Path:
"""Write a note to the vault."""
# Add timestamp to filename
timestamp = datetime.now(UTC).strftime("%Y%m%d")
filename = f"{timestamp}_{name}.md"
filepath = self.path / namespace / filename
# Add header
full_content = f"""# {name.replace("_", " ").title()}
> Created: {datetime.now(UTC).isoformat()}
> Namespace: {namespace}
---
{content}
---
*Auto-generated by Timmy Memory System*
"""
filepath.write_text(full_content)
logger.info("VaultMemory: Wrote %s", filepath)
return filepath
def read_file(self, filepath: Path) -> str:
"""Read a file from the vault."""
if not filepath.exists():
return ""
return filepath.read_text()
def update_user_profile(self, key: str, value: str) -> None:
"""Update a field in user_profile.md.
DEPRECATED: User profile updates should now use memory_write() to store
facts in the database. This method is kept for backward compatibility.
"""
logger.warning(
"VaultMemory.update_user_profile() is deprecated. "
"Use memory_write() to store user facts in the database."
)
# Still update the file for backward compatibility during transition
profile_path = self.path / "self" / "user_profile.md"
if not profile_path.exists():
self._create_default_profile()
content = profile_path.read_text()
pattern = rf"(\*\*{re.escape(key)}:\*\*).*"
if re.search(pattern, content):
safe_value = value.strip()
content = re.sub(pattern, lambda m: f"{m.group(1)} {safe_value}", content)
else:
facts_section = "## Important Facts"
if facts_section in content:
insert_point = content.find(facts_section) + len(facts_section)
content = content[:insert_point] + f"\n- {key}: {value}" + content[insert_point:]
content = re.sub(
r"\*Last updated:.*\*",
f"*Last updated: {datetime.now(UTC).strftime('%Y-%m-%d')}*",
content,
)
profile_path.write_text(content)
logger.info("VaultMemory: Updated user profile: %s = %s", key, value)
def _create_default_profile(self) -> None:
"""Create default user profile."""
profile_path = self.path / "self" / "user_profile.md"
default = """# User Profile
> Learned information about the user.
## Basic Information
**Name:** (unknown)
**Location:** (unknown)
**Occupation:** (unknown)
## Interests & Expertise
- (to be learned)
## Preferences
- Response style: concise, technical
- Tool usage: minimal
## Important Facts
- (to be extracted)
---
*Last updated: {date}*
""".format(date=datetime.now(UTC).strftime("%Y-%m-%d"))
profile_path.write_text(default)

395
src/timmy/memory/crud.py Normal file
View File

@@ -0,0 +1,395 @@
"""CRUD operations, personal facts, and reflections for Timmy's memory system."""
import json
import logging
import sqlite3
import uuid
from datetime import UTC, datetime, timedelta
from timmy.memory.db import MemoryEntry, get_connection
from timmy.memory.embeddings import (
_get_embedding_model,
_keyword_overlap,
cosine_similarity,
embed_text,
)
logger = logging.getLogger(__name__)
def store_memory(
    content: str,
    source: str,
    context_type: str = "conversation",
    agent_id: str | None = None,
    task_id: str | None = None,
    session_id: str | None = None,
    metadata: dict | None = None,
    compute_embedding: bool = True,
) -> MemoryEntry:
    """Persist one memory row and return the entry that was written.

    Args:
        content: Text of the memory.
        source: Origin label stored in the ``source`` column.
        context_type: Memory category; stored in the ``memory_type`` column.
        agent_id: Optional agent association.
        task_id: Optional task association.
        session_id: Optional session association.
        metadata: Optional dict, stored as JSON.
        compute_embedding: When true, embed ``content`` and store the vector.

    Returns:
        The MemoryEntry carrying the generated id and timestamp.
    """
    vector = embed_text(content) if compute_embedding else None

    entry = MemoryEntry(
        content=content,
        source=source,
        context_type=context_type,
        agent_id=agent_id,
        task_id=task_id,
        session_id=session_id,
        metadata=metadata,
        embedding=vector,
    )

    # Falsy metadata / embedding (None or empty) are stored as NULL.
    row_values = (
        entry.id,
        entry.content,
        entry.context_type,  # DB column is memory_type
        entry.source,
        entry.agent_id,
        entry.task_id,
        entry.session_id,
        json.dumps(metadata) if metadata else None,
        json.dumps(vector) if vector else None,
        entry.timestamp,
    )

    with get_connection() as conn:
        conn.execute(
            """
            INSERT INTO memories
            (id, content, memory_type, source, agent_id, task_id, session_id,
            metadata, embedding, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            row_values,
        )
        conn.commit()

    return entry
def _build_search_filters(
context_type: str | None,
agent_id: str | None,
session_id: str | None,
) -> tuple[str, list]:
"""Build SQL WHERE clause and params from search filters."""
conditions: list[str] = []
params: list = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
return where_clause, params
def _fetch_memory_candidates(
    where_clause: str, params: list, candidate_limit: int
) -> list[sqlite3.Row]:
    """Fetch the most recent candidate memory rows from the database.

    Args:
        where_clause: SQL ``WHERE ...`` fragment with ``?`` placeholders, or
            "" for no filtering. It is interpolated into the statement, so it
            must come from trusted code, never from user input.
        params: Values bound to the placeholders in ``where_clause``. The
            list is not modified.
        candidate_limit: Maximum number of rows to return.

    Returns:
        Matching rows, newest first (ordered by ``created_at`` descending).
    """
    query_sql = f"""
        SELECT * FROM memories
        {where_clause}
        ORDER BY created_at DESC
        LIMIT ?
    """
    # Build a new list instead of appending to the caller's: reusing the same
    # params for a second query would otherwise bind one value too many.
    bound_params = [*params, candidate_limit]
    with get_connection() as conn:
        return conn.execute(query_sql, bound_params).fetchall()
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
    """Build a MemoryEntry from one ``memories`` table row.

    The JSON-encoded ``metadata`` and ``embedding`` columns are decoded;
    NULL or empty values become None.
    """
    raw_metadata = row["metadata"]
    raw_embedding = row["embedding"]
    decoded_metadata = json.loads(raw_metadata) if raw_metadata else None
    decoded_embedding = json.loads(raw_embedding) if raw_embedding else None

    return MemoryEntry(
        id=row["id"],
        content=row["content"],
        source=row["source"],
        context_type=row["memory_type"],  # DB column -> API field
        agent_id=row["agent_id"],
        task_id=row["task_id"],
        session_id=row["session_id"],
        metadata=decoded_metadata,
        embedding=decoded_embedding,
        timestamp=row["created_at"],
    )
def _score_and_filter(
    rows: list[sqlite3.Row],
    query: str,
    query_embedding: list[float],
    min_relevance: float,
) -> list[MemoryEntry]:
    """Rank candidate rows against the query and drop the weak matches.

    Entries with a stored embedding are scored by cosine similarity; entries
    without one are scored by keyword overlap with the query text. Every
    entry gets its ``relevance_score`` set. Only those scoring at least
    ``min_relevance`` are returned, highest score first.
    """
    kept = []
    for row in rows:
        candidate = _row_to_entry(row)
        candidate.relevance_score = (
            cosine_similarity(query_embedding, candidate.embedding)
            if candidate.embedding
            else _keyword_overlap(query, candidate.content)
        )
        if candidate.relevance_score >= min_relevance:
            kept.append(candidate)

    return sorted(kept, key=lambda item: item.relevance_score or 0, reverse=True)
def search_memories(
    query: str,
    limit: int = 10,
    context_type: str | None = None,
    agent_id: str | None = None,
    session_id: str | None = None,
    min_relevance: float = 0.0,
) -> list[MemoryEntry]:
    """Find the stored memories most relevant to a query.

    Args:
        query: Search query text
        limit: Maximum number of results
        context_type: Filter by memory type (maps to DB memory_type column)
        agent_id: Filter by agent
        session_id: Filter by session
        min_relevance: Minimum score an entry needs to be returned

    Returns:
        At most ``limit`` MemoryEntry objects, most relevant first
    """
    query_vector = embed_text(query)
    filter_sql, filter_values = _build_search_filters(context_type, agent_id, session_id)

    # Score three times as many recent rows as requested, then keep the best.
    candidates = _fetch_memory_candidates(filter_sql, filter_values, limit * 3)
    ranked = _score_and_filter(candidates, query, query_vector, min_relevance)
    return ranked[:limit]
def delete_memory(memory_id: str) -> bool:
    """Remove the memory row with the given id.

    Returns:
        True when a row was deleted, False when no row had that id
    """
    with get_connection() as conn:
        deleted_rows = conn.execute(
            "DELETE FROM memories WHERE id = ?",
            (memory_id,),
        ).rowcount
        conn.commit()
    return deleted_rows > 0
def get_memory_stats() -> dict:
    """Summarize the contents of the memory store.

    Returns:
        Dict with keys ``total_entries``, ``by_type`` (memory_type -> count),
        ``with_embeddings`` (rows whose embedding is not NULL) and
        ``has_embedding_model``.
    """
    with get_connection() as conn:
        total = conn.execute("SELECT COUNT(*) as count FROM memories").fetchone()["count"]

        type_rows = conn.execute(
            "SELECT memory_type, COUNT(*) as count FROM memories GROUP BY memory_type"
        ).fetchall()
        by_type = {type_row["memory_type"]: type_row["count"] for type_row in type_rows}

        with_embeddings = conn.execute(
            "SELECT COUNT(*) as count FROM memories WHERE embedding IS NOT NULL"
        ).fetchone()["count"]

    # Identity check against False is deliberate here: any other return value
    # of _get_embedding_model(), including None, reports True.
    model_available = _get_embedding_model() is not False
    return {
        "total_entries": total,
        "by_type": by_type,
        "with_embeddings": with_embeddings,
        "has_embedding_model": model_available,
    }
def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int:
    """Delete memories older than a cutoff to manage storage.

    Args:
        older_than_days: Age in days beyond which rows are deleted
        keep_facts: When true, rows with memory_type 'fact' are never deleted

    Returns:
        Number of rows deleted
    """
    # created_at holds ISO-8601 text, so the cutoff is compared as a string.
    cutoff = (datetime.now(UTC) - timedelta(days=older_than_days)).isoformat()

    if keep_facts:
        delete_sql = """
            DELETE FROM memories
            WHERE created_at < ? AND memory_type != 'fact'
            """
    else:
        delete_sql = "DELETE FROM memories WHERE created_at < ?"

    with get_connection() as conn:
        deleted = conn.execute(delete_sql, (cutoff,)).rowcount
        conn.commit()
    return deleted
def get_memory_context(query: str, max_tokens: int = 2000, **filters) -> str:
    """Assemble relevant memories into a text block for LLM prompts.

    Args:
        query: Search query
        max_tokens: Approximate token budget for the returned text
        **filters: Passed through to search_memories() (agent_id, session_id, etc.)

    Returns:
        Formatted context string, or "" when nothing fits or nothing matched
    """
    char_budget = max_tokens * 4  # Rough approximation
    used_chars = 0
    snippets = []

    for memory in search_memories(query, limit=20, **filters):
        snippet = f"[{memory.source}]: {memory.content}"
        # Stop at the first snippet that would overflow the budget.
        if used_chars + len(snippet) > char_budget:
            break
        snippets.append(snippet)
        used_chars += len(snippet)

    if snippets:
        return "Relevant context from memory:\n" + "\n\n".join(snippets)
    return ""
# ── Personal facts & reflections ─────────────────────────────────────────────
def recall_personal_facts(agent_id: str | None = None) -> list[str]:
    """Return the stored fact texts, newest first (at most 100).

    Args:
        agent_id: When given (and non-empty), only that agent's facts

    Returns:
        List of fact strings
    """
    if agent_id:
        sql = """
            SELECT content FROM memories
            WHERE memory_type = 'fact' AND agent_id = ?
            ORDER BY created_at DESC
            LIMIT 100
            """
        bound = (agent_id,)
    else:
        sql = """
            SELECT content FROM memories
            WHERE memory_type = 'fact'
            ORDER BY created_at DESC
            LIMIT 100
            """
        bound = ()

    with get_connection() as conn:
        fact_rows = conn.execute(sql, bound).fetchall()
    return [fact_row["content"] for fact_row in fact_rows]
def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]:
    """Return stored facts as ``{"id", "content"}`` dicts for edit/delete operations.

    Newest first, at most 100; optionally restricted to one agent.
    """
    if agent_id:
        sql = "SELECT id, content FROM memories WHERE memory_type = 'fact' AND agent_id = ? ORDER BY created_at DESC LIMIT 100"
        bound = (agent_id,)
    else:
        sql = "SELECT id, content FROM memories WHERE memory_type = 'fact' ORDER BY created_at DESC LIMIT 100"
        bound = ()

    with get_connection() as conn:
        fact_rows = conn.execute(sql, bound).fetchall()
    return [{"id": fact_row["id"], "content": fact_row["content"]} for fact_row in fact_rows]
def update_personal_fact(memory_id: str, new_content: str) -> bool:
    """Replace the text of an existing fact.

    Only rows with memory_type 'fact' are touched.

    Returns:
        True when a row was updated, False when no fact has that id
    """
    # NOTE(review): only `content` is rewritten; a previously stored embedding
    # is left as-is and will describe the old text — confirm whether intended.
    with get_connection() as conn:
        updated_rows = conn.execute(
            "UPDATE memories SET content = ? WHERE id = ? AND memory_type = 'fact'",
            (new_content, memory_id),
        ).rowcount
        conn.commit()
    return updated_rows > 0
def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry:
    """Save a fact about the user or system as a 'fact' memory.

    Args:
        fact: The fact text to store
        agent_id: Agent to associate the fact with, if any

    Returns:
        The stored MemoryEntry
    """
    # Facts stored through this path are flagged as not auto-extracted.
    fact_metadata = {"auto_extracted": False}
    stored = store_memory(
        content=fact,
        source="system",
        context_type="fact",
        agent_id=agent_id,
        metadata=fact_metadata,
    )
    return stored
def store_last_reflection(reflection: str) -> None:
    """Save the latest reflection, discarding every earlier one.

    Keeps a single row with memory_type='reflection' so reflections do not
    accumulate. Empty or whitespace-only input is ignored.
    """
    text = reflection.strip() if reflection else ""
    if not text:
        return

    new_row = (str(uuid.uuid4()), text, datetime.now(UTC).isoformat())
    with get_connection() as conn:
        # Delete previous reflections — only the latest matters
        conn.execute("DELETE FROM memories WHERE memory_type = 'reflection'")
        conn.execute(
            """
            INSERT INTO memories
            (id, content, memory_type, source, created_at)
            VALUES (?, ?, 'reflection', 'system', ?)
            """,
            new_row,
        )
        conn.commit()
    logger.debug("Stored last reflection in DB")
def recall_last_reflection() -> str | None:
    """Return the text of the newest reflection, or None when there is none."""
    newest_sql = (
        "SELECT content FROM memories WHERE memory_type = 'reflection' "
        "ORDER BY created_at DESC LIMIT 1"
    )
    with get_connection() as conn:
        newest = conn.execute(newest_sql).fetchone()
    if newest is None:
        return None
    return newest["content"]

212
src/timmy/memory/db.py Normal file
View File

@@ -0,0 +1,212 @@
"""Database connection, schema, migrations, path constants, and data classes.
This module contains the lowest-level database primitives for Timmy's
memory system — connection management, schema creation / migration,
path constants, and the core data classes (MemoryEntry, MemoryChunk).
"""
import logging
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
# ── Path constants ───────────────────────────────────────────────────────────
# PROJECT_ROOT is four directory levels above this file; every other path
# below is derived from it.
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
HOT_MEMORY_PATH = PROJECT_ROOT / "MEMORY.md"  # hot-memory markdown file
VAULT_PATH = PROJECT_ROOT / "memory"  # vault directory
SOUL_PATH = VAULT_PATH / "self" / "soul.md"
DB_PATH = PROJECT_ROOT / "data" / "memory.db"  # SQLite file opened by get_connection()
# ── Database connection ──────────────────────────────────────────────────────
@contextmanager
def get_connection() -> Generator[sqlite3.Connection, None, None]:
    """Yield a connection to the unified memory database, closing it on exit.

    Creates the DB's parent directory if needed, enables name-based row
    access, sets WAL journaling and the configured busy timeout, and makes
    sure the schema exists before handing the connection to the caller.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH))
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
        _ensure_schema(conn)
        yield conn
    finally:
        # Closes the connection whether the caller's block succeeded or raised.
        conn.close()
def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the unified ``memories`` table and its indexes when absent.

    Commits the DDL, then runs the legacy-schema migration.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS memories (
        id TEXT PRIMARY KEY,
        content TEXT NOT NULL,
        memory_type TEXT NOT NULL DEFAULT 'fact',
        source TEXT NOT NULL DEFAULT 'agent',
        embedding TEXT,
        metadata TEXT,
        source_hash TEXT,
        agent_id TEXT,
        task_id TEXT,
        session_id TEXT,
        confidence REAL NOT NULL DEFAULT 0.8,
        tags TEXT NOT NULL DEFAULT '[]',
        created_at TEXT NOT NULL,
        last_accessed TEXT,
        access_count INTEGER NOT NULL DEFAULT 0
        )
        """
    # Indexes for efficient querying
    index_statements = (
        "CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)",
        "CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)",
        "CREATE INDEX IF NOT EXISTS idx_memories_session ON memories(session_id)",
        "CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)",
        "CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)",
    )

    conn.execute(create_table_sql)
    for statement in index_statements:
        conn.execute(statement)
    conn.commit()

    # Run migration if needed
    _migrate_schema(conn)
def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
"""Get the column names for a table."""
cursor = conn.execute(f"PRAGMA table_info({table_name})")
return {row[1] for row in cursor.fetchall()}
def _migrate_episodes(conn: sqlite3.Connection) -> None:
    """Copy every row of the legacy ``episodes`` table into ``memories``.

    The ``episodes`` table is dropped after a successful copy. Any sqlite
    error is logged as a warning and not re-raised.
    """
    logger.info("Migration: Converting episodes table to memories")
    try:
        episode_columns = _get_table_columns(conn, "episodes")
        # Older schemas may lack context_type; fall back to a SQL literal.
        if "context_type" in episode_columns:
            context_type_col = "context_type"
        else:
            context_type_col = "'conversation'"

        copy_sql = f"""
            INSERT INTO memories (
            id, content, memory_type, source, embedding,
            metadata, agent_id, task_id, session_id,
            created_at, access_count, last_accessed
            )
            SELECT
            id, content,
            COALESCE({context_type_col}, 'conversation'),
            COALESCE(source, 'agent'),
            embedding,
            metadata, agent_id, task_id, session_id,
            COALESCE(timestamp, datetime('now')), 0, NULL
            FROM episodes
            """
        conn.execute(copy_sql)
        conn.execute("DROP TABLE episodes")
        logger.info("Migration: Migrated episodes to memories")
    except sqlite3.Error as exc:
        logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection) -> None:
    """Copy every row of the legacy ``chunks`` table into ``memories``.

    Rows are stored with memory_type 'vault_chunk'. Column names vary between
    legacy schemas, so each SELECT expression is chosen from the columns that
    actually exist. The table is dropped after a successful copy; sqlite
    errors are logged as warnings and not re-raised.
    """
    logger.info("Migration: Converting chunks table to memories")
    try:
        chunk_columns = _get_table_columns(conn, "chunks")

        id_col = "id" if "id" in chunk_columns else "CAST(rowid AS TEXT)"
        content_col = "content" if "content" in chunk_columns else "text"
        if "filepath" in chunk_columns:
            source_col = "filepath"
        elif "source" in chunk_columns:
            source_col = "source"
        else:
            source_col = "'vault'"
        # Without an embedding column the migrated rows get a NULL embedding.
        embedding_col = "embedding" if "embedding" in chunk_columns else "NULL"
        created_col = "created_at" if "created_at" in chunk_columns else "datetime('now')"

        copy_sql = f"""
            INSERT INTO memories (
            id, content, memory_type, source, embedding,
            created_at, access_count
            )
            SELECT
            {id_col}, {content_col}, 'vault_chunk', {source_col},
            {embedding_col}, {created_col}, 0
            FROM chunks
            """
        conn.execute(copy_sql)
        conn.execute("DROP TABLE chunks")
        logger.info("Migration: Migrated chunks to memories")
    except sqlite3.Error as exc:
        logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None:
    """Drop a legacy table, logging a warning instead of raising on failure."""
    try:
        conn.execute(f"DROP TABLE {table}")  # noqa: S608
    except sqlite3.Error as exc:
        logger.warning("Migration: Failed to drop %s: %s", table, exc)
    else:
        logger.info("Migration: Dropped old %s table", table)
def _migrate_schema(conn: sqlite3.Connection) -> None:
"""Migrate from old three-table schema to unified memories table.
Migration paths:
- episodes table -> memories (context_type -> memory_type)
- chunks table -> memories with memory_type='vault_chunk'
- facts table -> dropped (unused, 0 rows expected)
"""
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
if not has_memories and (tables & {"episodes", "chunks", "facts"}):
logger.info("Migration: Creating unified memories table")
if "episodes" in tables and has_memories:
_migrate_episodes(conn)
if "chunks" in tables and has_memories:
_migrate_chunks(conn)
if "facts" in tables:
_drop_legacy_table(conn, "facts")
conn.commit()
# Alias for backward compatibility: code that still uses the old name
# `get_conn` gets the very same context manager as `get_connection`.
get_conn = get_connection
# ── Data classes ─────────────────────────────────────────────────────────────
@dataclass
class MemoryEntry:
"""A memory entry with vector embedding.
Note: The DB column is `memory_type` but this field is named `context_type`
for backward API compatibility.
"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
content: str = "" # The actual text content
source: str = "" # Where it came from (agent, user, system)
context_type: str = "conversation" # API field name; DB column is memory_type
agent_id: str | None = None
task_id: str | None = None
session_id: str | None = None
metadata: dict | None = None
embedding: list[float] | None = None
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
relevance_score: float | None = None # Set during search
@dataclass
class MemoryChunk:
    """A searchable chunk of memory.

    All five fields are required; the dataclass defines no defaults.
    """

    id: str  # chunk identifier
    source: str  # filepath
    content: str  # text of the chunk
    embedding: list[float]  # embedding vector — presumably computed from `content`; confirm
    created_at: str  # creation time as a string — NOTE(review): format not fixed here

View File

@@ -0,0 +1,300 @@
"""SemanticMemory and MemorySearcher — vector-based search over vault content.
SemanticMemory: indexes markdown files into chunks with embeddings, supports search.
MemorySearcher: high-level multi-tier search interface.
"""
import hashlib
import json
import logging
import sqlite3
from collections.abc import Generator
from contextlib import closing, contextmanager
from datetime import UTC, datetime
from pathlib import Path
from config import settings
from timmy.memory.db import DB_PATH, VAULT_PATH, get_connection
from timmy.memory.embeddings import (
EMBEDDING_DIM,
_get_embedding_model,
cosine_similarity,
embed_text,
)
logger = logging.getLogger(__name__)
class SemanticMemory:
"""Vector-based semantic search over vault content."""
def __init__(self) -> None:
self.db_path = DB_PATH
self.vault_path = VAULT_PATH
@contextmanager
def _get_conn(self) -> Generator[sqlite3.Connection, None, None]:
"""Get connection to the instance's db_path (backward compatibility).
Uses self.db_path if set differently from global DB_PATH,
otherwise uses the global get_connection().
"""
if self.db_path == DB_PATH:
# Use global connection (normal production path)
with get_connection() as conn:
yield conn
else:
# Use instance-specific db_path (test path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self.db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
# Ensure schema exists
conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL DEFAULT 'fact',
source TEXT NOT NULL DEFAULT 'agent',
embedding TEXT,
metadata TEXT,
source_hash TEXT,
agent_id TEXT,
task_id TEXT,
session_id TEXT,
confidence REAL NOT NULL DEFAULT 0.8,
tags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_accessed TEXT,
access_count INTEGER NOT NULL DEFAULT 0
)
""")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)"
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)")
conn.commit()
yield conn
def _init_db(self) -> None:
"""Initialize database at self.db_path (backward compatibility).
This method is kept for backward compatibility with existing code and tests.
Schema creation is handled by _get_conn.
"""
# Trigger schema creation via _get_conn
with self._get_conn():
pass
def index_file(self, filepath: Path) -> int:
"""Index a single file into semantic memory."""
if not filepath.exists():
return 0
content = filepath.read_text()
file_hash = hashlib.md5(content.encode()).hexdigest()
with self._get_conn() as conn:
# Check if already indexed with same hash
cursor = conn.execute(
"SELECT metadata FROM memories WHERE source = ? AND memory_type = 'vault_chunk' LIMIT 1",
(str(filepath),),
)
existing = cursor.fetchone()
if existing and existing[0]:
try:
meta = json.loads(existing[0])
if meta.get("source_hash") == file_hash:
return 0 # Already indexed
except json.JSONDecodeError:
pass
# Delete old chunks for this file
conn.execute(
"DELETE FROM memories WHERE source = ? AND memory_type = 'vault_chunk'",
(str(filepath),),
)
# Split into chunks (paragraphs)
chunks = self._split_into_chunks(content)
# Index each chunk
now = datetime.now(UTC).isoformat()
for i, chunk_text in enumerate(chunks):
if len(chunk_text.strip()) < 20: # Skip tiny chunks
continue
chunk_id = f"{filepath.stem}_{i}"
chunk_embedding = embed_text(chunk_text)
conn.execute(
"""INSERT INTO memories
(id, content, memory_type, source, metadata, embedding, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
chunk_id,
chunk_text,
"vault_chunk",
str(filepath),
json.dumps({"source_hash": file_hash, "chunk_index": i}),
json.dumps(chunk_embedding),
now,
),
)
conn.commit()
logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks))
return len(chunks)
def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]:
"""Split text into semantic chunks."""
# Split by paragraphs first
paragraphs = text.split("\n\n")
chunks = []
for para in paragraphs:
para = para.strip()
if not para:
continue
# If paragraph is small enough, keep as one chunk
if len(para) <= max_chunk_size:
chunks.append(para)
else:
# Split long paragraphs by sentences
sentences = para.replace(". ", ".\n").split("\n")
current_chunk = ""
for sent in sentences:
if len(current_chunk) + len(sent) < max_chunk_size:
current_chunk += " " + sent if current_chunk else sent
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sent
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
def index_vault(self) -> int:
"""Index entire vault directory."""
total_chunks = 0
for md_file in self.vault_path.rglob("*.md"):
# Skip handoff file (handled separately)
if "last-session-handoff" in md_file.name:
continue
total_chunks += self.index_file(md_file)
logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks)
return total_chunks
def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]:
"""Search for relevant memory chunks."""
query_embedding = embed_text(query)
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
# Get all vault chunks
rows = conn.execute(
"SELECT source, content, embedding FROM memories WHERE memory_type = 'vault_chunk'"
).fetchall()
# Calculate similarities
scored = []
for row in rows:
embedding = json.loads(row["embedding"])
score = cosine_similarity(query_embedding, embedding)
scored.append((row["source"], row["content"], score))
# Sort by score descending
scored.sort(key=lambda x: x[2], reverse=True)
# Return top_k
return [(content, score) for _, content, score in scored[:top_k]]
def get_relevant_context(self, query: str, max_chars: int = 2000) -> str:
"""Get formatted context string for a query."""
results = self.search(query, top_k=3)
if not results:
return ""
parts = []
total_chars = 0
for content, score in results:
if score < 0.3: # Similarity threshold
continue
chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..."
if total_chars + len(chunk) > max_chars:
break
parts.append(chunk)
total_chars += len(chunk)
return "\n\n".join(parts) if parts else ""
def stats(self) -> dict:
    """Report how much vault content is currently indexed.

    Returns:
        Dict with ``total_chunks`` (number of ``vault_chunk`` rows),
        ``total_files`` (number of distinct ``source`` values among them) and
        ``embedding_dim`` (``EMBEDDING_DIM`` when ``_get_embedding_model()``
        returns a truthy value, otherwise 128).
    """
    with self._get_conn() as conn:
        chunk_count, file_count = conn.execute(
            "SELECT COUNT(*), COUNT(DISTINCT source) FROM memories WHERE memory_type = 'vault_chunk'"
        ).fetchone()

    if _get_embedding_model():
        dimension = EMBEDDING_DIM
    else:
        # NOTE(review): 128 is presumably the size of the fallback embedding — confirm.
        dimension = 128

    summary = {}
    summary["total_chunks"] = chunk_count
    summary["total_files"] = file_count
    summary["embedding_dim"] = dimension
    return summary
class MemorySearcher:
"""High-level interface for memory search."""
def __init__(self) -> None:
self.semantic = SemanticMemory()
def search(self, query: str, tiers: list[str] = None) -> dict:
"""Search across memory tiers.
Args:
query: Search query
tiers: List of tiers to search ["hot", "vault", "semantic"]
Returns:
Dict with results from each tier
"""
tiers = tiers or ["semantic"] # Default to semantic only
results = {}
if "semantic" in tiers:
semantic_results = self.semantic.search(query, top_k=5)
results["semantic"] = [
{"content": content, "score": score} for content, score in semantic_results
]
return results
def get_context_for_query(self, query: str) -> str:
"""Get comprehensive context for a user query."""
# Get semantic context
semantic_context = self.semantic.get_relevant_context(query)
if semantic_context:
return f"## Relevant Past Context\n\n{semantic_context}"
return ""
# Module-level singletons, created once at import time and shared by importers.
# NOTE: MemorySearcher.__init__ constructs its own SemanticMemory, so
# memory_searcher.semantic is a separate object from semantic_memory.
semantic_memory = SemanticMemory()
memory_searcher = MemorySearcher()

253
src/timmy/memory/tools.py Normal file
View File

@@ -0,0 +1,253 @@
"""Tool functions for Timmy's memory system.
memory_search, memory_read, memory_store, memory_forget — runtime tool wrappers.
jot_note, log_decision — artifact production tools.
"""
import logging
import re
from datetime import UTC, datetime
from pathlib import Path
from timmy.memory.crud import delete_memory, search_memories, store_memory
from timmy.memory.semantic import semantic_memory
logger = logging.getLogger(__name__)
def memory_search(query: str, limit: int = 10) -> str:
    """Search past conversations, notes, and stored facts for relevant context.

    Two sources are consulted: the semantic vault (indexed markdown files)
    and the runtime memory store (facts and conversation fragments saved by
    the memory tools). Hits from both are returned together.

    Args:
        query: What to search for (e.g. "Bitcoin strategy", "server setup").
        limit: Number of results to request from each source (default 10).

    Returns:
        Formatted string of relevant memory results.
    """
    # The model occasionally sends limit=None; fall back to the default.
    max_results = 10 if limit is None else limit

    # 1. Vault chunks: keep anything not scoring below 0.2, clipped to 300 chars.
    found: list[str] = [
        f"[vault score {score:.2f}] {content[:300]}"
        for content, score in semantic_memory.search(query, max_results)
        if not score < 0.2
    ]

    # 2. Runtime store: best effort, a failure here must not hide vault hits.
    try:
        for entry in search_memories(query, limit=max_results, min_relevance=0.2):
            found.append(f"[{entry.context_type or 'memory'}] {entry.content[:300]}")
    except Exception as exc:
        logger.debug("Vector store search unavailable: %s", exc)

    return "\n\n".join(found) if found else "No relevant memories found."
def memory_read(query: str = "", top_k: int = 5) -> str:
    """Read from persistent memory — search facts, notes, and past conversations.

    This is the primary tool for recalling stored information. Personal
    facts are always looked up first; when a query is given, the results of
    ``memory_search`` for that query are appended.

    Args:
        query: Optional search term. Leave empty to list stored facts only.
        top_k: Maximum results to return per section (default 5).

    Returns:
        Formatted string of memory contents.
    """
    # Guard: model sometimes passes None for top_k
    if top_k is None:
        top_k = 5

    parts: list[str] = []

    # Always include personal facts first
    try:
        # Ask the store for facts only, so the top_k limit is spent on facts
        # instead of being used up by other memory types before filtering.
        facts = search_memories(
            query or "", limit=top_k, context_type="fact", min_relevance=0.0
        )
        # Keep the local filter as a safeguard on the returned entries.
        fact_entries = [e for e in facts if (e.context_type or "") == "fact"]
        if fact_entries:
            parts.append("## Personal Facts")
            for entry in fact_entries[:top_k]:
                parts.append(f"- {entry.content[:300]}")
    except Exception as exc:
        # Best effort: an unavailable store must not break the tool call.
        logger.debug("Vector store unavailable for memory_read: %s", exc)

    # If a query was provided, also do semantic search
    if query:
        search_result = memory_search(query, top_k)
        if search_result and search_result != "No relevant memories found.":
            parts.append("\n## Search Results")
            parts.append(search_result)

    if not parts:
        return "No memories stored yet. Use memory_write to store information."
    return "\n".join(parts)
def memory_store(topic: str, report: str, type: str = "research") -> str:
    """Store a piece of information in persistent memory, particularly research outputs.

    Use this tool to keep structured research findings or other important
    documents. Stored memories are searchable via memory_search across all
    channels.

    Args:
        topic: A concise title or topic for the research output.
        report: The detailed content of the research output or document.
        type: Type of memory — "research" for research outputs (default),
            "fact" for permanent facts, "conversation" for conversation context,
            "document" for other document fragments. Unknown values are
            treated as "research".

    Returns:
        Confirmation that the memory was stored, a notice that a similar
        entry already exists, or a failure message.
    """
    if not (report and report.strip()):
        return "Nothing to store — report is empty."

    # Topic and report are merged so both are embedded and stored together.
    body = f"Topic: {topic.strip()}\n\nReport: {report.strip()}"

    kind = type if type in ("fact", "conversation", "document", "research") else "research"

    try:
        # Facts and research are de-duplicated against close existing matches.
        if kind in ("fact", "research"):
            duplicates = search_memories(body, limit=3, context_type=kind, min_relevance=0.75)
            if duplicates:
                return (
                    f"Similar {kind} already stored (id={duplicates[0].id[:8]}). Skipping duplicate."
                )

        stored = store_memory(
            content=body,
            source="agent",
            context_type=kind,
            metadata={"topic": topic},
        )
        return f"Stored in memory (type={kind}, id={stored.id[:8]}). This is now searchable across all channels."
    except Exception as exc:
        logger.error("Failed to write memory: %s", exc)
        return f"Failed to store memory: {exc}"
def memory_forget(query: str) -> str:
    """Remove a stored memory that is outdated, incorrect, or no longer relevant.

    Looks up memories matching the query and deletes the first (closest)
    match. Use this when the user asks to forget something or when stored
    information has changed.

    Args:
        query: Description of the memory to forget (e.g. "my phone number",
            "the old server address").

    Returns:
        Confirmation of what was forgotten, or a message if nothing matched.
    """
    if not (query and query.strip()):
        return "Nothing to forget — query is empty."

    try:
        matches = search_memories(query.strip(), limit=3, min_relevance=0.3)
        if not matches:
            return "No matching memories found to forget."

        # Only the top-ranked match is removed.
        target = matches[0]
        if delete_memory(target.id):
            return f'Forgotten: "{target.content[:80]}" (type={target.context_type})'
        return "Memory not found (may have already been deleted)."
    except Exception as exc:
        logger.error("Failed to forget memory: %s", exc)
        return f"Failed to forget: {exc}"
# ── Artifact tools ───────────────────────────────────────────────────────────
# Both paths are resolved against the current user's home directory at import time.
# Directory where jot_note writes one markdown file per note.
NOTES_DIR = Path.home() / ".timmy" / "notes"
# Single markdown file that log_decision appends entries to.
DECISION_LOG = Path.home() / ".timmy" / "decisions.md"
def jot_note(title: str, body: str) -> str:
"""Write a markdown note to Timmy's workspace (~/.timmy/notes/).
Use this tool to capture ideas, drafts, summaries, or any artifact that
should persist beyond the conversation. Each note is saved as a
timestamped markdown file.
Args:
title: Short descriptive title (used as filename slug).
body: Markdown content of the note.
Returns:
Confirmation with the file path of the saved note.
"""
if not title or not title.strip():
return "Cannot jot — title is empty."
if not body or not body.strip():
return "Cannot jot — body is empty."
NOTES_DIR.mkdir(parents=True, exist_ok=True)
slug = re.sub(r"[^a-z0-9]+", "-", title.strip().lower()).strip("-")[:60]
timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
filename = f"{timestamp}_{slug}.md"
filepath = NOTES_DIR / filename
content = f"# {title.strip()}\n\n> Created: {datetime.now(UTC).isoformat()}\n\n{body.strip()}\n"
filepath.write_text(content)
logger.info("jot_note: wrote %s", filepath)
return f"Note saved: {filepath}"
def log_decision(decision: str, rationale: str = "") -> str:
"""Append an architectural or design decision to the running decision log.
Use this tool when a significant decision is made during conversation —
technology choices, design trade-offs, scope changes, etc.
Args:
decision: One-line summary of the decision.
rationale: Why this decision was made (optional but encouraged).
Returns:
Confirmation that the decision was logged.
"""
if not decision or not decision.strip():
return "Cannot log — decision is empty."
DECISION_LOG.parent.mkdir(parents=True, exist_ok=True)
# Create file with header if it doesn't exist
if not DECISION_LOG.exists():
DECISION_LOG.write_text(
"# Decision Log\n\nRunning log of architectural and design decisions.\n\n"
)
stamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
entry = f"## {stamp}{decision.strip()}\n\n"
if rationale and rationale.strip():
entry += f"{rationale.strip()}\n\n"
entry += "---\n\n"
with open(DECISION_LOG, "a") as f:
f.write(entry)
logger.info("log_decision: %s", decision.strip()[:80])
return f"Decision logged: {decision.strip()}"

File diff suppressed because it is too large Load Diff

View File

@@ -22,21 +22,20 @@ import sqlite3
from datetime import datetime
from pathlib import Path
from timmy.thinking._db import Thought, _get_conn
from timmy.thinking.engine import ThinkingEngine
from timmy.thinking.seeds import (
SEED_TYPES,
_SENSITIVE_PATTERNS,
_META_OBSERVATION_PHRASES,
_THINK_TAG_RE,
_THINKING_PROMPT,
)
# Re-export HOT_MEMORY_PATH and SOUL_PATH so existing patch targets continue to work.
# Tests that patch "timmy.thinking.HOT_MEMORY_PATH" or "timmy.thinking.SOUL_PATH"
# should instead patch "timmy.thinking._snapshot.HOT_MEMORY_PATH" etc., but these
# re-exports are kept for any code that reads them from the top-level namespace.
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH # noqa: F401
from timmy.thinking._db import Thought, _get_conn
from timmy.thinking.engine import ThinkingEngine
from timmy.thinking.seeds import (
_META_OBSERVATION_PHRASES,
_SENSITIVE_PATTERNS,
_THINK_TAG_RE,
_THINKING_PROMPT,
SEED_TYPES,
)
logger = logging.getLogger(__name__)

View File

@@ -4,7 +4,6 @@ import logging
from pathlib import Path
from config import settings
from timmy.thinking.seeds import _META_OBSERVATION_PHRASES, _SENSITIVE_PATTERNS
logger = logging.getLogger(__name__)

View File

@@ -5,11 +5,11 @@ import random
from datetime import UTC, datetime
from timmy.thinking.seeds import (
SEED_TYPES,
_CREATIVE_SEEDS,
_EXISTENTIAL_SEEDS,
_OBSERVATION_SEEDS,
_SOVEREIGNTY_SEEDS,
SEED_TYPES,
)
logger = logging.getLogger(__name__)

View File

@@ -1,7 +1,7 @@
"""System snapshot and memory context mixin for the thinking engine."""
import logging
from datetime import UTC, datetime
from datetime import datetime
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH

View File

@@ -7,8 +7,7 @@ from difflib import SequenceMatcher
from pathlib import Path
from config import settings
from timmy.thinking._db import Thought, _DEFAULT_DB, _get_conn, _row_to_thought
from timmy.thinking._db import _DEFAULT_DB, Thought, _get_conn, _row_to_thought
from timmy.thinking._distillation import _DistillationMixin
from timmy.thinking._issue_filing import _IssueFilingMixin
from timmy.thinking._seeds_mixin import _SeedsMixin

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from timmy.backlog_triage import (
@@ -28,7 +27,6 @@ from timmy.backlog_triage import (
score_issue,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

View File

@@ -2,14 +2,12 @@
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
# ---------------------------------------------------------------------------
# VassalCycleRecord
# ---------------------------------------------------------------------------

View File

@@ -34,7 +34,6 @@ from timmy.quest_system import (
update_quest_progress,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

View File

@@ -15,7 +15,6 @@ if "serpapi" not in sys.modules:
from timmy.research_tools import get_llm_client, google_web_search # noqa: E402
# ---------------------------------------------------------------------------
# google_web_search
# ---------------------------------------------------------------------------