feat: consolidate memory into unified memory.db with 3-type model

Consolidates 3 separate memory databases (semantic_memory.db, swarm.db
memory_entries, brain.db) into a single data/memory.db with facts,
chunks, and episodes tables.

Key changes:
- Add unified schema (timmy/memory/unified.py) with 3 core tables
- Redirect vector_store.py and semantic_memory.py to memory.db
- Add thought distillation: every Nth thought extracts lasting facts
- Enrich agent context with known facts in system prompt
- Add memory_forget tool for removing outdated memories
- Unify embeddings: vector_store delegates to semantic_memory.embed_text
- Bridge spark events to unified event log
- Add pruning for thoughts and events with configurable retention
- Add data migration script (timmy/memory_migrate.py)
- Deprecate brain.memory in favor of unified system

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Trip T
2026-03-12 11:23:18 -04:00
parent bc38fee817
commit b2f12ca97c
15 changed files with 695 additions and 129 deletions

View File

@@ -1,4 +1,9 @@
"""Unified memory interface.
"""Unified memory interface (DEPRECATED).
New code should use ``timmy.memory.unified`` and the tools in
``timmy.semantic_memory`` (memory_write, memory_read, memory_search,
memory_forget). This module is retained for backward compatibility
and the loop-QA self-test probes.
One API, two backends:
- **Local SQLite** (default) — works immediately, no setup

View File

@@ -136,6 +136,14 @@ class Settings(BaseSettings):
# Maximum size in MB for the memory/notes/ vault directory.
# When exceeded, a warning is logged. Set to 0 to disable.
memory_vault_max_mb: int = 100
# Auto-prune thoughts older than this many days. 0 = disabled.
thoughts_prune_days: int = 90
# Minimum thoughts to keep regardless of age.
thoughts_prune_keep_min: int = 200
# Auto-prune system events older than this many days. 0 = disabled.
events_prune_days: int = 90
# Minimum events to keep regardless of age.
events_prune_keep_min: int = 200
# ── Agentic Loop ──────────────────────────────────────────────────
# Maximum steps the agentic loop will execute before stopping.
@@ -206,6 +214,7 @@ class Settings(BaseSettings):
# When enabled, the agent starts an internal thought loop on server start.
thinking_enabled: bool = True
thinking_interval_seconds: int = 300 # 5 minutes between thoughts
thinking_distill_every: int = 10 # distill facts from thoughts every Nth thought
# ── Loop QA (Self-Testing) ─────────────────────────────────────────
# Self-test orchestrator that probes capabilities alongside the thinking loop.

View File

@@ -325,6 +325,42 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Memory auto-prune skipped: %s", exc)
# Auto-prune old thoughts on startup
if settings.thoughts_prune_days > 0:
try:
from timmy.thinking import thinking_engine
pruned = thinking_engine.prune_old_thoughts(
keep_days=settings.thoughts_prune_days,
keep_min=settings.thoughts_prune_keep_min,
)
if pruned:
logger.info(
"Thought auto-prune: removed %d entries older than %d days",
pruned,
settings.thoughts_prune_days,
)
except Exception as exc:
logger.debug("Thought auto-prune skipped: %s", exc)
# Auto-prune old system events on startup
if settings.events_prune_days > 0:
try:
from swarm.event_log import prune_old_events
pruned = prune_old_events(
keep_days=settings.events_prune_days,
keep_min=settings.events_prune_keep_min,
)
if pruned:
logger.info(
"Event auto-prune: removed %d entries older than %d days",
pruned,
settings.events_prune_days,
)
except Exception as exc:
logger.debug("Event auto-prune skipped: %s", exc)
# Warn if memory vault exceeds size limit
if settings.memory_vault_max_mb > 0:
try:

View File

@@ -157,6 +157,22 @@ def record_event(
)
conn.commit()
conn.close()
# Bridge to unified event log so all events are queryable from one place
try:
from swarm.event_log import EventType as _ET
from swarm.event_log import log_event as _log
_log(
_ET.SYSTEM_INFO,
source=f"spark:{event_type}",
data={"description": description, "importance": importance, "spark_event_id": event_id},
task_id=task_id or "",
agent_id=agent_id or "",
)
except Exception:
pass # Graceful — don't break spark if event_log is unavailable
return event_id

View File

@@ -12,7 +12,7 @@ import logging
import sqlite3
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from enum import Enum
from pathlib import Path
@@ -93,6 +93,9 @@ def _ensure_db() -> sqlite3.Connection:
timestamp TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_events_time ON events(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_events_agent ON events(agent_id)")
conn.commit()
return conn
@@ -190,6 +193,32 @@ def log_event(
return entry
def prune_old_events(keep_days: int = 90, keep_min: int = 200) -> int:
    """Prune events older than *keep_days*, always retaining at least *keep_min*.

    Returns the number of rows removed (0 when nothing qualifies or on error).
    """
    conn = _ensure_db()
    try:
        row_count = conn.execute("SELECT COUNT(*) as c FROM events").fetchone()["c"]
        # Never prune below the retention floor, regardless of age.
        if row_count <= keep_min:
            return 0
        threshold = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
        # The subquery protects the newest keep_min rows from deletion.
        result = conn.execute(
            "DELETE FROM events WHERE timestamp < ? AND id NOT IN "
            "(SELECT id FROM events ORDER BY timestamp DESC LIMIT ?)",
            (threshold, keep_min),
        )
        conn.commit()
        return result.rowcount
    except Exception as exc:
        logger.warning("Event pruning failed: %s", exc)
        return 0
    finally:
        conn.close()
def get_task_events(task_id: str, limit: int = 50) -> list[EventLogEntry]:
"""Retrieve events for a specific task."""
db = _ensure_db()

View File

@@ -65,7 +65,11 @@ def _get_vault():
def _get_brain_memory():
"""Lazy-import the brain unified memory."""
"""Lazy-import the brain unified memory.
Redirected to use unified memory.db (via vector_store) instead of
brain.db. The brain module is deprecated for new memory operations.
"""
from brain.memory import get_memory
return get_memory()

View File

@@ -0,0 +1,85 @@
"""Unified memory database — single SQLite DB for all memory types.
Consolidates three previously separate stores into one:
- **facts**: Long-term knowledge (user preferences, learned patterns)
- **chunks**: Indexed vault documents (markdown files from memory/)
- **episodes**: Runtime memories (conversations, agent observations)
All three tables live in ``data/memory.db``. Existing APIs in
``vector_store.py`` and ``semantic_memory.py`` are updated to point here.
"""
import logging
import sqlite3
from pathlib import Path
logger = logging.getLogger(__name__)
DB_PATH = Path(__file__).parent.parent.parent.parent / "data" / "memory.db"
def get_connection() -> sqlite3.Connection:
    """Return a connection to the unified memory DB, creating it on first use."""
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    # WAL permits concurrent readers; busy_timeout waits instead of failing
    # immediately when another writer holds the lock.
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA busy_timeout=5000"):
        connection.execute(pragma)
    _ensure_schema(connection)
    return connection
def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the three core tables and their indexes if they don't exist."""
    statements = (
        # facts: long-term knowledge (preferences, learned patterns)
        """
        CREATE TABLE IF NOT EXISTS facts (
            id TEXT PRIMARY KEY,
            category TEXT NOT NULL DEFAULT 'general',
            content TEXT NOT NULL,
            confidence REAL NOT NULL DEFAULT 0.8,
            source TEXT NOT NULL DEFAULT 'agent',
            tags TEXT NOT NULL DEFAULT '[]',
            created_at TEXT NOT NULL,
            last_accessed TEXT,
            access_count INTEGER NOT NULL DEFAULT 0
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category)",
        "CREATE INDEX IF NOT EXISTS idx_facts_confidence ON facts(confidence)",
        # chunks: indexed vault document fragments
        """
        CREATE TABLE IF NOT EXISTS chunks (
            id TEXT PRIMARY KEY,
            source TEXT NOT NULL,
            content TEXT NOT NULL,
            embedding TEXT NOT NULL,
            created_at TEXT NOT NULL,
            source_hash TEXT NOT NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)",
        # episodes: runtime memory entries (conversations, observations)
        """
        CREATE TABLE IF NOT EXISTS episodes (
            id TEXT PRIMARY KEY,
            content TEXT NOT NULL,
            source TEXT NOT NULL,
            context_type TEXT NOT NULL DEFAULT 'conversation',
            embedding TEXT,
            metadata TEXT,
            agent_id TEXT,
            task_id TEXT,
            session_id TEXT,
            timestamp TEXT NOT NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_episodes_type ON episodes(context_type)",
        "CREATE INDEX IF NOT EXISTS idx_episodes_time ON episodes(timestamp)",
        "CREATE INDEX IF NOT EXISTS idx_episodes_session ON episodes(session_id)",
        "CREATE INDEX IF NOT EXISTS idx_episodes_agent ON episodes(agent_id)",
    )
    for statement in statements:
        conn.execute(statement)
    conn.commit()

View File

@@ -9,83 +9,28 @@ import sqlite3
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
DB_PATH = Path(__file__).parent.parent.parent.parent / "data" / "swarm.db"
# Simple embedding function using sentence-transformers if available,
# otherwise fall back to keyword-based "pseudo-embeddings"
_model = None
_has_embeddings = None
def _get_model():
"""Lazy-load the embedding model."""
global _model, _has_embeddings
if _has_embeddings is False:
return None
if _model is not None:
return _model
from config import settings
# In test mode or low-memory environments, skip embedding model load
if settings.timmy_skip_embeddings:
_has_embeddings = False
return None
def _check_embedding_model() -> bool | None:
"""Check if the canonical embedding model is available."""
try:
from sentence_transformers import SentenceTransformer
from timmy.semantic_memory import _get_embedding_model
_model = SentenceTransformer("all-MiniLM-L6-v2")
_has_embeddings = True
return _model
except (ImportError, RuntimeError, Exception):
# Gracefully fall back if anything goes wrong (e.g. OOM, Bus error)
_has_embeddings = False
model = _get_embedding_model()
return model is not None and model is not False
except Exception:
return None
def _get_embedding_dimension() -> int:
"""Get the dimension of embeddings."""
model = _get_model()
if model:
return model.get_sentence_embedding_dimension()
return 384 # Default for all-MiniLM-L6-v2
def _compute_embedding(text: str) -> list[float]:
"""Compute embedding vector for text.
Uses sentence-transformers if available, otherwise returns
a simple hash-based vector for basic similarity.
Delegates to the canonical embedding provider in semantic_memory
to avoid loading the model multiple times.
"""
model = _get_model()
if model:
try:
return model.encode(text).tolist()
except Exception:
pass
from timmy.semantic_memory import embed_text
# Fallback: simple character n-gram hash embedding
# Not as good but allows the system to work without heavy deps
dim = 384
vec = [0.0] * dim
text = text.lower()
# Generate character trigram features
for i in range(len(text) - 2):
trigram = text[i : i + 3]
hash_val = hash(trigram) % dim
vec[hash_val] += 1.0
# Normalize
norm = sum(x * x for x in vec) ** 0.5
if norm > 0:
vec = [x / norm for x in vec]
return vec
return embed_text(text)
@dataclass
@@ -106,45 +51,10 @@ class MemoryEntry:
def _get_conn() -> sqlite3.Connection:
"""Get database connection with vector extension."""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
"""Get database connection to unified memory.db."""
from timmy.memory.unified import get_connection
# Try to load sqlite-vss extension
try:
conn.enable_load_extension(True)
conn.load_extension("vector0")
conn.load_extension("vss0")
_has_vss = True
except Exception:
_has_vss = False
# Create tables
conn.execute("""
CREATE TABLE IF NOT EXISTS memory_entries (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
source TEXT NOT NULL,
context_type TEXT NOT NULL DEFAULT 'conversation',
agent_id TEXT,
task_id TEXT,
session_id TEXT,
metadata TEXT,
embedding TEXT, -- JSON array of floats
timestamp TEXT NOT NULL
)
""")
# Create indexes
conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_agent ON memory_entries(agent_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_task ON memory_entries(task_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_session ON memory_entries(session_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_time ON memory_entries(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_type ON memory_entries(context_type)")
conn.commit()
return conn
return get_connection()
def store_memory(
@@ -190,7 +100,7 @@ def store_memory(
conn = _get_conn()
conn.execute(
"""
INSERT INTO memory_entries
INSERT INTO episodes
(id, content, source, context_type, agent_id, task_id, session_id,
metadata, embedding, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
@@ -258,7 +168,7 @@ def search_memories(
# Fetch candidates (we'll do in-memory similarity for now)
# For production with sqlite-vss, this would use vector similarity index
query_sql = f"""
SELECT * FROM memory_entries
SELECT * FROM episodes
{where_clause}
ORDER BY timestamp DESC
LIMIT ?
@@ -366,7 +276,7 @@ def recall_personal_facts(agent_id: str | None = None) -> list[str]:
if agent_id:
rows = conn.execute(
"""
SELECT content FROM memory_entries
SELECT content FROM episodes
WHERE context_type = 'fact' AND agent_id = ?
ORDER BY timestamp DESC
LIMIT 100
@@ -376,7 +286,7 @@ def recall_personal_facts(agent_id: str | None = None) -> list[str]:
else:
rows = conn.execute(
"""
SELECT content FROM memory_entries
SELECT content FROM episodes
WHERE context_type = 'fact'
ORDER BY timestamp DESC
LIMIT 100
@@ -392,12 +302,12 @@ def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]:
conn = _get_conn()
if agent_id:
rows = conn.execute(
"SELECT id, content FROM memory_entries WHERE context_type = 'fact' AND agent_id = ? ORDER BY timestamp DESC LIMIT 100",
"SELECT id, content FROM episodes WHERE context_type = 'fact' AND agent_id = ? ORDER BY timestamp DESC LIMIT 100",
(agent_id,),
).fetchall()
else:
rows = conn.execute(
"SELECT id, content FROM memory_entries WHERE context_type = 'fact' ORDER BY timestamp DESC LIMIT 100",
"SELECT id, content FROM episodes WHERE context_type = 'fact' ORDER BY timestamp DESC LIMIT 100",
).fetchall()
conn.close()
return [{"id": r["id"], "content": r["content"]} for r in rows]
@@ -407,7 +317,7 @@ def update_personal_fact(memory_id: str, new_content: str) -> bool:
"""Update a personal fact's content."""
conn = _get_conn()
cursor = conn.execute(
"UPDATE memory_entries SET content = ? WHERE id = ? AND context_type = 'fact'",
"UPDATE episodes SET content = ? WHERE id = ? AND context_type = 'fact'",
(new_content, memory_id),
)
conn.commit()
@@ -443,7 +353,7 @@ def delete_memory(memory_id: str) -> bool:
"""
conn = _get_conn()
cursor = conn.execute(
"DELETE FROM memory_entries WHERE id = ?",
"DELETE FROM episodes WHERE id = ?",
(memory_id,),
)
conn.commit()
@@ -460,17 +370,17 @@ def get_memory_stats() -> dict:
"""
conn = _get_conn()
total = conn.execute("SELECT COUNT(*) as count FROM memory_entries").fetchone()["count"]
total = conn.execute("SELECT COUNT(*) as count FROM episodes").fetchone()["count"]
by_type = {}
rows = conn.execute(
"SELECT context_type, COUNT(*) as count FROM memory_entries GROUP BY context_type"
"SELECT context_type, COUNT(*) as count FROM episodes GROUP BY context_type"
).fetchall()
for row in rows:
by_type[row["context_type"]] = row["count"]
with_embeddings = conn.execute(
"SELECT COUNT(*) as count FROM memory_entries WHERE embedding IS NOT NULL"
"SELECT COUNT(*) as count FROM episodes WHERE embedding IS NOT NULL"
).fetchone()["count"]
conn.close()
@@ -479,7 +389,7 @@ def get_memory_stats() -> dict:
"total_entries": total,
"by_type": by_type,
"with_embeddings": with_embeddings,
"has_embedding_model": _has_embeddings,
"has_embedding_model": _check_embedding_model(),
}
@@ -502,14 +412,14 @@ def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int:
if keep_facts:
cursor = conn.execute(
"""
DELETE FROM memory_entries
DELETE FROM episodes
WHERE timestamp < ? AND context_type != 'fact'
""",
(cutoff,),
)
else:
cursor = conn.execute(
"DELETE FROM memory_entries WHERE timestamp < ?",
"DELETE FROM episodes WHERE timestamp < ?",
(cutoff,),
)

296
src/timmy/memory_migrate.py Normal file
View File

@@ -0,0 +1,296 @@
"""One-shot migration: consolidate old memory databases into data/memory.db.
Migrates:
- data/semantic_memory.db → memory.db (chunks table)
- data/swarm.db → memory.db (memory_entries → episodes table)
- data/brain.db → memory.db (facts table, if any rows exist)
After migration the old DB files are moved to data/archive/.
Usage:
python -m timmy.memory_migrate # dry-run (default)
python -m timmy.memory_migrate --apply # actually migrate
"""
import json
import logging
import shutil
import sqlite3
import sys
from pathlib import Path
logger = logging.getLogger(__name__)
PROJECT_ROOT = Path(__file__).parent.parent.parent
DATA_DIR = PROJECT_ROOT / "data"
ARCHIVE_DIR = DATA_DIR / "archive"
MEMORY_DB = DATA_DIR / "memory.db"
def _open(path: Path) -> sqlite3.Connection:
    """Open the SQLite database at *path* with dict-style row access."""
    connection = sqlite3.connect(str(path))
    connection.row_factory = sqlite3.Row
    return connection
def migrate_semantic_chunks(dry_run: bool = True) -> int:
    """Copy chunks from semantic_memory.db → memory.db.

    Returns the number of rows migrated (or that would be, in dry-run mode).
    """
    source_db = DATA_DIR / "semantic_memory.db"
    if not source_db.exists():
        logger.info("semantic_memory.db not found — skipping")
        return 0
    source = _open(source_db)
    # Bail out quietly if the source never created a chunks table.
    table_exists = source.execute(
        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='chunks'"
    ).fetchone()[0]
    if not table_exists:
        source.close()
        return 0
    rows = source.execute("SELECT * FROM chunks").fetchall()
    source.close()
    if not rows:
        logger.info("semantic_memory.db: no chunks to migrate")
        return 0
    if dry_run:
        logger.info("[DRY RUN] Would migrate %d chunks from semantic_memory.db", len(rows))
        return len(rows)

    from timmy.memory.unified import get_connection

    target = get_connection()
    copied = 0
    columns = ("id", "source", "content", "embedding", "created_at", "source_hash")
    for row in rows:
        try:
            target.execute(
                "INSERT OR IGNORE INTO chunks (id, source, content, embedding, created_at, source_hash) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                tuple(row[col] for col in columns),
            )
            copied += 1
        except Exception as exc:
            logger.warning("Chunk migration error: %s", exc)
    target.commit()
    target.close()
    logger.info("Migrated %d chunks from semantic_memory.db", copied)
    return copied
def migrate_memory_entries(dry_run: bool = True) -> int:
    """Copy memory_entries from swarm.db → memory.db episodes table.

    Returns the number of rows migrated (or that would be, in dry-run mode).
    """
    source_db = DATA_DIR / "swarm.db"
    if not source_db.exists():
        logger.info("swarm.db not found — skipping")
        return 0
    source = _open(source_db)
    table_exists = source.execute(
        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='memory_entries'"
    ).fetchone()[0]
    if not table_exists:
        source.close()
        return 0
    rows = source.execute("SELECT * FROM memory_entries").fetchall()
    source.close()
    if not rows:
        logger.info("swarm.db: no memory_entries to migrate")
        return 0
    if dry_run:
        logger.info("[DRY RUN] Would migrate %d memory_entries from swarm.db → episodes", len(rows))
        return len(rows)

    from timmy.memory.unified import get_connection

    target = get_connection()
    copied = 0
    # Source and destination schemas share these column names 1:1.
    columns = (
        "id", "content", "source", "context_type", "embedding",
        "metadata", "agent_id", "task_id", "session_id", "timestamp",
    )
    for row in rows:
        try:
            target.execute(
                "INSERT OR IGNORE INTO episodes "
                "(id, content, source, context_type, embedding, metadata, agent_id, task_id, session_id, timestamp) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                tuple(row[col] for col in columns),
            )
            copied += 1
        except Exception as exc:
            logger.warning("Episode migration error: %s", exc)
    target.commit()
    target.close()
    logger.info("Migrated %d memory_entries → episodes", copied)
    return copied
def migrate_brain_facts(dry_run: bool = True) -> int:
    """Copy facts from brain.db → memory.db facts table.

    brain.db stores its data either in a ``facts`` table or (in older
    installs) a ``memories`` table; both layouts are handled. Returns the
    number of rows migrated (or that would be migrated in dry-run mode).

    Bug fix: the previous version called ``.get()`` on ``sqlite3.Row``
    objects, which have no such method — every row raised AttributeError
    in the apply path. Rows are now converted to plain dicts so optional
    columns can be read safely.
    """
    src = DATA_DIR / "brain.db"
    if not src.exists():
        logger.info("brain.db not found — skipping")
        return 0
    src_conn = _open(src)
    # Prefer the modern 'facts' table; fall back to the legacy 'memories' one.
    table = None
    for candidate in ("facts", "memories"):
        exists = src_conn.execute(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?",
            (candidate,),
        ).fetchone()[0]
        if exists:
            table = candidate
            break
    if table is None:
        src_conn.close()
        return 0
    # sqlite3.Row has no .get(); convert to dicts for optional-column access.
    rows = [dict(r) for r in src_conn.execute(f"SELECT * FROM {table}").fetchall()]
    src_conn.close()
    if not rows:
        logger.info("brain.db: no facts to migrate")
        return 0
    if dry_run:
        logger.info("[DRY RUN] Would migrate %d facts from brain.db", len(rows))
        return len(rows)

    from datetime import UTC, datetime

    from timmy.memory.unified import get_connection

    dst = get_connection()
    migrated = 0
    for row in rows:
        if table == "memories":
            # Legacy schema: free-form memories; map to generic 'brain' facts.
            record = (
                row["id"],
                "brain",
                row.get("content", row.get("text", "")),
                0.7,
                "brain",
                "[]",
                row.get("created_at") or datetime.now(UTC).isoformat(),
            )
        else:
            tags = row.get("tags")
            record = (
                row["id"],
                row.get("category", "brain"),
                row["content"],
                row.get("confidence", 0.7),
                "brain",
                json.dumps(tags) if isinstance(tags, list) else "[]",
                row.get("created_at", ""),
            )
        try:
            dst.execute(
                "INSERT OR IGNORE INTO facts "
                "(id, category, content, confidence, source, tags, created_at) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                record,
            )
            migrated += 1
        except Exception as exc:
            logger.warning("Fact migration error: %s", exc)
    dst.commit()
    dst.close()
    logger.info("Migrated %d facts from brain.db", migrated)
    return migrated
def archive_old_dbs(dry_run: bool = True) -> list[str]:
    """Move old database files to data/archive/.

    Returns the names of the files archived (or that would be, in dry-run mode).
    """
    moved: list[str] = []
    for name in ("semantic_memory.db", "brain.db"):
        candidate = DATA_DIR / name
        if not candidate.exists():
            continue
        if dry_run:
            logger.info("[DRY RUN] Would archive %s → data/archive/%s", name, name)
        else:
            ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
            shutil.move(str(candidate), str(ARCHIVE_DIR / name))
            logger.info("Archived %s → data/archive/%s", name, name)
        moved.append(name)
    return moved
def run_migration(dry_run: bool = True) -> dict:
    """Execute the full migration pipeline and return a summary dict."""
    summary = {
        "chunks": migrate_semantic_chunks(dry_run),
        "episodes": migrate_memory_entries(dry_run),
        "facts": migrate_brain_facts(dry_run),
        "archived": archive_old_dbs(dry_run),
        "dry_run": dry_run,
    }
    record_total = sum(summary[key] for key in ("chunks", "episodes", "facts"))
    logger.info(
        "[%s] Migration complete: %d total records",
        "DRY RUN" if dry_run else "APPLIED",
        record_total,
    )
    return summary
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    # Default to a dry run; require an explicit --apply flag to mutate data.
    apply_mode = "--apply" in sys.argv
    summary = run_migration(dry_run=not apply_mode)
    divider = "=" * 50
    print(f"\n{divider}")
    print(f"Migration {'APPLIED' if apply_mode else 'DRY RUN'}:")
    print(f" Chunks migrated: {summary['chunks']}")
    print(f" Episodes migrated: {summary['episodes']}")
    print(f" Facts migrated: {summary['facts']}")
    print(f" Archived DBs: {summary['archived']}")
    if not apply_mode:
        print("\nRun with --apply to execute the migration.")
    print(f"{divider}")

View File

@@ -465,8 +465,34 @@ class MemorySystem:
if profile:
context_parts.append("## User Context\n" + profile)
# 4. Known facts from long-term memory
facts_section = self._load_known_facts()
if facts_section:
context_parts.append(facts_section)
return "\n\n---\n\n".join(context_parts)
def _load_known_facts(self, limit: int = 10) -> str:
"""Load top facts from the episodes table (context_type='fact').
Returns a formatted section string, or empty string if no facts exist.
"""
try:
from timmy.memory.vector_store import recall_personal_facts
facts = recall_personal_facts()
if not facts:
return ""
# Cap at limit
facts = facts[:limit]
lines = ["## Known Facts\n"]
for fact in facts:
lines.append(f"- {fact[:200]}")
return "\n".join(lines)
except Exception as exc:
logger.debug("Failed to load known facts: %s", exc)
return ""
# ── Lazy singleton ────────────────────────────────────────────────────────────
_memory_system: MemorySystem | None = None

View File

@@ -24,7 +24,7 @@ logger = logging.getLogger(__name__)
# Paths
PROJECT_ROOT = Path(__file__).parent.parent.parent
VAULT_PATH = PROJECT_ROOT / "memory"
SEMANTIC_DB_PATH = PROJECT_ROOT / "data" / "semantic_memory.db"
SEMANTIC_DB_PATH = PROJECT_ROOT / "data" / "memory.db"
# Embedding model - small, fast, local
# Using 'all-MiniLM-L6-v2' (~80MB) or fallback to simple keyword matching
@@ -119,12 +119,12 @@ class SemanticMemory:
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
content TEXT NOT NULL,
embedding TEXT NOT NULL, -- JSON array
embedding TEXT NOT NULL,
created_at TEXT NOT NULL,
source_hash TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_source ON chunks(source)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)")
conn.commit()
conn.close()
@@ -434,7 +434,15 @@ def memory_write(content: str, context_type: str = "fact") -> str:
context_type = "fact"
try:
from timmy.memory.vector_store import store_memory
from timmy.memory.vector_store import search_memories, store_memory
# Dedup check for facts — skip if a very similar fact already exists
if context_type == "fact":
existing = search_memories(
content.strip(), limit=3, context_type="fact", min_relevance=0.9
)
if existing:
return f"Similar fact already stored (id={existing[0].id[:8]}). Skipping duplicate."
entry = store_memory(
content=content.strip(),
@@ -445,3 +453,38 @@ def memory_write(content: str, context_type: str = "fact") -> str:
except Exception as exc:
logger.error("Failed to write memory: %s", exc)
return f"Failed to store memory: {exc}"
def memory_forget(query: str) -> str:
    """Remove a stored memory that is outdated, incorrect, or no longer relevant.

    Searches for memories matching the query and deletes the closest match.
    Use this when the user says to forget something or when stored information
    has changed.

    Args:
        query: Description of the memory to forget (e.g. "my phone number",
            "the old server address").

    Returns:
        Confirmation of what was forgotten, or a message if nothing matched.
    """
    if not query or not query.strip():
        return "Nothing to forget — query is empty."
    try:
        from timmy.memory.vector_store import delete_memory, search_memories

        matches = search_memories(query.strip(), limit=3, min_relevance=0.3)
        if not matches:
            return "No matching memories found to forget."
        # Only the single closest match is removed, never the whole result set.
        top = matches[0]
        if delete_memory(top.id):
            return f'Forgotten: "{top.content[:80]}" (type={top.context_type})'
        return "Memory not found (may have already been deleted)."
    except Exception as exc:
        logger.error("Failed to forget memory: %s", exc)
        return f"Failed to forget: {exc}"

View File

@@ -22,7 +22,7 @@ import random
import sqlite3
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from pathlib import Path
from config import settings
@@ -225,6 +225,9 @@ class ThinkingEngine:
thought = self._store_thought(content.strip(), seed_type)
self._last_thought_id = thought.id
# Post-hook: distill facts from recent thoughts periodically
self._maybe_distill()
# Post-hook: update MEMORY.md with latest reflection
self._update_memory(thought)
@@ -291,8 +294,90 @@ class ThinkingEngine:
conn.close()
return count
def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int:
"""Delete thoughts older than *keep_days*, always retaining at least *keep_min*.
Returns the number of deleted rows.
"""
conn = _get_conn(self._db_path)
try:
total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
if total <= keep_min:
return 0
cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
cursor = conn.execute(
"DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
"(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
(cutoff, keep_min),
)
deleted = cursor.rowcount
conn.commit()
return deleted
except Exception as exc:
logger.warning("Thought pruning failed: %s", exc)
return 0
finally:
conn.close()
# ── Private helpers ──────────────────────────────────────────────────
def _maybe_distill(self) -> None:
"""Every N thoughts, extract lasting insights and store as facts.
Reads the last N thoughts, asks the LLM to extract any durable facts
or insights, and stores them via memory_write. Only runs when the
thought count is divisible by the configured interval.
"""
try:
interval = settings.thinking_distill_every
if interval <= 0:
return
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return
recent = self.get_recent_thoughts(limit=interval)
if len(recent) < interval:
return
# Build a summary of recent thoughts for the LLM
thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(recent))
distill_prompt = (
"You are reviewing your own recent thoughts. Extract 0-3 lasting "
"insights, preferences, or facts worth remembering long-term. "
"Return ONLY a JSON array of strings. If nothing is worth saving, "
"return []. Do not include ephemeral observations or repeated themes.\n\n"
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
raw = self._call_agent(distill_prompt)
if not raw or not raw.strip():
return
# Parse JSON array from response
import json
# Strip markdown code fences if present
cleaned = raw.strip()
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
facts = json.loads(cleaned)
if not isinstance(facts, list) or not facts:
return
from timmy.semantic_memory import memory_write
for fact in facts[:3]: # Safety cap
if isinstance(fact, str) and len(fact.strip()) > 10:
result = memory_write(fact.strip(), context_type="fact")
logger.info("Distilled fact: %s%s", fact[:60], result[:40])
except Exception as exc:
logger.debug("Thought distillation skipped: %s", exc)
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.

View File

@@ -490,13 +490,14 @@ def create_full_toolkit(base_dir: str | Path | None = None):
except Exception:
logger.debug("Grok tool not available")
# Memory search and write — persistent recall across all channels
# Memory search, write, and forget — persistent recall across all channels
try:
from timmy.semantic_memory import memory_read, memory_search, memory_write
from timmy.semantic_memory import memory_forget, memory_read, memory_search, memory_write
toolkit.register(memory_search, name="memory_search")
toolkit.register(memory_write, name="memory_write")
toolkit.register(memory_read, name="memory_read")
toolkit.register(memory_forget, name="memory_forget")
except Exception:
logger.debug("Memory tools not available")

View File

@@ -156,7 +156,7 @@ def get_memory_status() -> dict[str, Any]:
try:
import sqlite3
sem_db = repo_root / "data" / "semantic_memory.db"
sem_db = repo_root / "data" / "memory.db"
if sem_db.exists():
conn = sqlite3.connect(str(sem_db))
row = conn.execute(

View File

@@ -69,15 +69,19 @@ def clean_database(tmp_path):
tmp_work_orders_db = tmp_path / "work_orders.db"
_swarm_db_modules = [
"timmy.memory.vector_store",
"infrastructure.models.registry",
]
_memory_db_modules = [
"timmy.memory.unified",
]
_spark_db_modules = [
"spark.memory",
"spark.eidos",
]
_self_coding_db_modules = []
tmp_memory_db = tmp_path / "memory.db"
originals = {}
for mod_name in _swarm_db_modules:
try:
@@ -88,6 +92,23 @@ def clean_database(tmp_path):
except Exception:
pass
for mod_name in _memory_db_modules:
try:
mod = __import__(mod_name, fromlist=["DB_PATH"])
originals[(mod_name, "DB_PATH")] = mod.DB_PATH
mod.DB_PATH = tmp_memory_db
except Exception:
pass
# Redirect semantic memory DB path (uses SEMANTIC_DB_PATH, not DB_PATH)
try:
import timmy.semantic_memory as _sem_mod
originals[("timmy.semantic_memory", "SEMANTIC_DB_PATH")] = _sem_mod.SEMANTIC_DB_PATH
_sem_mod.SEMANTIC_DB_PATH = tmp_memory_db
except Exception:
pass
for mod_name in _spark_db_modules:
try:
mod = __import__(mod_name, fromlist=["DB_PATH"])