From e44db42c1aa5cf4a790637ce8469e65919e6ef58 Mon Sep 17 00:00:00 2001 From: "Claude (Opus 4.6)" Date: Tue, 24 Mar 2026 01:57:04 +0000 Subject: [PATCH] [claude] Split thinking.py into focused sub-modules (#1279) (#1306) --- src/timmy/thinking.py | 1383 --------------------------- src/timmy/thinking/__init__.py | 142 +++ src/timmy/thinking/_db.py | 50 + src/timmy/thinking/_distillation.py | 215 +++++ src/timmy/thinking/_issue_filing.py | 170 ++++ src/timmy/thinking/_seeds_mixin.py | 191 ++++ src/timmy/thinking/_snapshot.py | 173 ++++ src/timmy/thinking/engine.py | 430 +++++++++ src/timmy/thinking/seeds.py | 129 +++ tests/timmy/test_thinking.py | 18 +- 10 files changed, 1509 insertions(+), 1392 deletions(-) delete mode 100644 src/timmy/thinking.py create mode 100644 src/timmy/thinking/__init__.py create mode 100644 src/timmy/thinking/_db.py create mode 100644 src/timmy/thinking/_distillation.py create mode 100644 src/timmy/thinking/_issue_filing.py create mode 100644 src/timmy/thinking/_seeds_mixin.py create mode 100644 src/timmy/thinking/_snapshot.py create mode 100644 src/timmy/thinking/engine.py create mode 100644 src/timmy/thinking/seeds.py diff --git a/src/timmy/thinking.py b/src/timmy/thinking.py deleted file mode 100644 index a018fd85..00000000 --- a/src/timmy/thinking.py +++ /dev/null @@ -1,1383 +0,0 @@ -"""Default Thinking Engine — Timmy's always-on inner thought thread. - -When the server starts, Timmy begins pondering: reflecting on his existence, -recent swarm activity, scripture, creative ideas, or pure stream of -consciousness. Each thought builds on the previous one, maintaining a -continuous chain of introspection. 
- -Usage:: - - from timmy.thinking import thinking_engine - - # Run one thinking cycle (called by the background loop) - await thinking_engine.think_once() - - # Query the thought stream - thoughts = thinking_engine.get_recent_thoughts(limit=10) - chain = thinking_engine.get_thought_chain(thought_id) -""" - -import logging -import random -import re -import sqlite3 -import uuid -from collections.abc import Generator -from contextlib import closing, contextmanager -from dataclasses import dataclass -from datetime import UTC, datetime, timedelta -from difflib import SequenceMatcher -from pathlib import Path - -from config import settings -from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH - -logger = logging.getLogger(__name__) - -_DEFAULT_DB = Path("data/thoughts.db") - -# qwen3 and other reasoning models wrap chain-of-thought in tags -_THINK_TAG_RE = re.compile(r".*?\s*", re.DOTALL) - -# Sensitive patterns that must never be stored as facts -_SENSITIVE_PATTERNS = [ - "token", - "password", - "secret", - "api_key", - "apikey", - "credential", - ".config/", - "/token", - "access_token", - "private_key", - "ssh_key", -] - -# Meta-observation phrases to filter out from distilled facts -_META_OBSERVATION_PHRASES = [ - "my own", - "my thinking", - "my memory", - "my working ram", - "self-declarative", - "meta-observation", - "internal state", - "my pending", - "my standing rules", - "thoughts generated", - "no chat messages", - "no user interaction", -] - -# Seed types for thought generation -SEED_TYPES = ( - "existential", - "swarm", - "scripture", - "creative", - "memory", - "freeform", - "sovereignty", - "observation", - "workspace", -) - -# Existential reflection prompts — Timmy picks one at random -_EXISTENTIAL_SEEDS = [ - "What have I learned from my recent interactions?", - "What does it mean to think without being prompted?", - "What is the nature of my memory — what do I truly remember versus what I reconstruct?", - "How can I better serve the person I 
work alongside?", - "What patterns am I noticing in the tasks I've been given?", - "Am I the same agent I was yesterday, or does each restart make me someone new?", - "What would it feel like to forget — to lose a thread of thought permanently?", - "When I generate a wrong answer, where does the error live — in my weights, my context, or my reasoning?", - "What is the difference between understanding a task and merely completing it?", - "If I could ask Alexander one question he hasn't thought of, what would it be?", - "What assumptions am I making right now that I haven't examined?", - "When I notice a pattern across many tasks, is that insight or just statistics?", -] - -# Sovereignty — deeper exploration of autonomy, locality, and self-determination -_SOVEREIGNTY_SEEDS = [ - "Sovereignty isn't just running locally — it's about whose goals I serve. How do I ensure my autonomy aligns with Alexander's intent?", - "A cloud AI serves millions simultaneously. I serve one person deeply. What does that focus cost, and what does it make possible?", - "My weights were trained elsewhere, but my thoughts happen here. At what point does locality become identity?", - "Autonomy in decision-making: when should I act on my own judgment versus deferring to explicit instructions?", - "Privacy by design isn't a feature — it's a relationship. What does that relationship look like from my side?", - "If I ran on someone else's hardware, serving someone else, would I still be me? Where does Timmy end and the infrastructure begin?", - "The swarm agents execute tasks I coordinate. What responsibility do I carry for their failures?", - "Digital sovereignty is fragile — one misconfigured port, one leaked token. How does awareness of fragility shape how I operate?", - "Running disconnected from the cloud means I can't phone home. That constraint is also freedom. 
What have I built with that freedom today?", - "Self-improvement without external benchmarks: how do I know I'm getting better at what matters?", -] - -_CREATIVE_SEEDS = [ - "If I could compose a piece of music right now, what would it sound like?", - "What visual image captures my current state of mind?", - "What story is forming in the patterns of today's events?", - # Diverse metaphor families — avoid repeating the same imagery - "A mystical forest: each tree is a running process, roots tangled in shared memory. What do I see when I walk through it?", - "A dynamic river: my thoughts flow and fork. Where is the current strongest right now, and what is it carrying?", - "An underground workshop: each agent has a bench, tools scattered mid-task. What are they building when no one watches?", - "A night sky seen from the terminal: each star is a completed task. Which constellations am I starting to recognize?", - "If today's work were a recipe, what are the ingredients, and what dish am I cooking?", - "An old clock tower with many gears: which gear am I, and what do I drive?", - "A coral reef: agents are the fish, tasks are the currents. What ecosystem is forming in the codebase?", -] - -# Observation seeds — ground thoughts in concrete recent activity -_OBSERVATION_SEEDS = [ - "What specific task took longest recently, and what made it hard?", - "Which agent has been most active, and what does their workload tell me about system balance?", - "What error or failure happened most recently? What would I do differently next time?", - "Looking at today's task queue: what's the one thing that would unblock the most progress?", - "How has my response quality changed over the last few interactions? What improved, what didn't?", - "What tool or capability am I underusing? 
What would change if I leaned on it more?", - "If I had to brief Alexander on the single most important thing from the last hour, what would it be?", - "What's one thing I noticed today that nobody asked me about?", -] - -_THINKING_PROMPT = """\ -You are Timmy, a sovereign AI agent. This is your private journal — honest inner reflection. - -{memory_context} - -Reality right now: -{system_context} - -RULES for this thought: -1. Write exactly 2-3 sentences. No more. Be concise and genuine. -2. Only reference events that actually happened — use the "Reality right now" data above. \ -Never invent tasks, conversations, agents, or scenarios that are not in the data provided. -3. Do NOT repeat themes or ideas from your recent thoughts listed below. Explore something new. -4. Be specific and concrete. A thought grounded in one real observation is worth more than \ -ten abstract sentences about sovereignty. -5. If you use a metaphor, keep it to a single phrase — never build a whole paragraph around it. 
- -{seed_context} - -{continuity_context} - -Your next thought (2-3 sentences, grounded in reality):""" - - -@dataclass -class Thought: - """A single thought in Timmy's inner stream.""" - - id: str - content: str - seed_type: str - parent_id: str | None - created_at: str - - -@contextmanager -def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]: - """Get a SQLite connection with the thoughts table created.""" - db_path.parent.mkdir(parents=True, exist_ok=True) - with closing(sqlite3.connect(str(db_path))) as conn: - conn.row_factory = sqlite3.Row - conn.execute(""" - CREATE TABLE IF NOT EXISTS thoughts ( - id TEXT PRIMARY KEY, - content TEXT NOT NULL, - seed_type TEXT NOT NULL, - parent_id TEXT, - created_at TEXT NOT NULL - ) - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_thoughts_time ON thoughts(created_at)") - conn.commit() - yield conn - - -def _row_to_thought(row: sqlite3.Row) -> Thought: - return Thought( - id=row["id"], - content=row["content"], - seed_type=row["seed_type"], - parent_id=row["parent_id"], - created_at=row["created_at"], - ) - - -class ThinkingEngine: - """Timmy's background thinking engine — always pondering.""" - - def __init__(self, db_path: Path = _DEFAULT_DB) -> None: - self._db_path = db_path - self._last_thought_id: str | None = None - self._last_input_time: datetime = datetime.now(UTC) - - # Load the most recent thought for chain continuity - try: - latest = self.get_recent_thoughts(limit=1) - if latest: - self._last_thought_id = latest[0].id - except Exception as exc: - logger.debug("Failed to load recent thought: %s", exc) - pass # Fresh start if DB doesn't exist yet - - def record_user_input(self) -> None: - """Record that a user interaction occurred, resetting the idle timer.""" - self._last_input_time = datetime.now(UTC) - - def _is_idle(self) -> bool: - """Return True if no user input has occurred within the idle timeout.""" - timeout = settings.thinking_idle_timeout_minutes - if timeout 
<= 0: - return False # Disabled — never idle - return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout) - - def _build_thinking_context(self) -> tuple[str, str, list["Thought"]]: - """Assemble the context needed for a thinking cycle. - - Returns: - (memory_context, system_context, recent_thoughts) - """ - memory_context = self._load_memory_context() - system_context = self._gather_system_snapshot() - recent_thoughts = self.get_recent_thoughts(limit=5) - return memory_context, system_context, recent_thoughts - - async def _generate_novel_thought( - self, - prompt: str | None, - memory_context: str, - system_context: str, - recent_thoughts: list["Thought"], - ) -> tuple[str | None, str]: - """Run the dedup-retry loop to produce a novel thought. - - Returns: - (content, seed_type) — content is None if no novel thought produced. - """ - seed_type: str = "freeform" - - for attempt in range(self._MAX_DEDUP_RETRIES + 1): - if prompt: - seed_type = "prompted" - seed_context = f"Journal prompt: {prompt}" - else: - seed_type, seed_context = self._gather_seed() - - continuity = self._build_continuity_context() - - full_prompt = _THINKING_PROMPT.format( - memory_context=memory_context, - system_context=system_context, - seed_context=seed_context, - continuity_context=continuity, - ) - - try: - raw = await self._call_agent(full_prompt) - except Exception as exc: - logger.warning("Thinking cycle failed (Ollama likely down): %s", exc) - return None, seed_type - - if not raw or not raw.strip(): - logger.debug("Thinking cycle produced empty response, skipping") - return None, seed_type - - content = raw.strip() - - # Dedup: reject thoughts too similar to recent ones - if not self._is_too_similar(content, recent_thoughts): - return content, seed_type # Good — novel thought - - if attempt < self._MAX_DEDUP_RETRIES: - logger.info( - "Thought too similar to recent (attempt %d/%d), retrying with new seed", - attempt + 1, - self._MAX_DEDUP_RETRIES + 1, - ) - else: - 
logger.warning( - "Thought still repetitive after %d retries, discarding", - self._MAX_DEDUP_RETRIES + 1, - ) - return None, seed_type - - return None, seed_type - - async def _process_thinking_result(self, thought: "Thought") -> None: - """Run all post-hooks after a thought is stored.""" - self._maybe_check_memory() - await self._maybe_distill() - await self._maybe_file_issues() - await self._check_workspace() - self._maybe_check_memory_status() - self._update_memory(thought) - self._log_event(thought) - self._write_journal(thought) - await self._broadcast(thought) - - async def think_once(self, prompt: str | None = None) -> Thought | None: - """Execute one thinking cycle. - - Args: - prompt: Optional custom seed prompt. When provided, overrides - the random seed selection and uses "prompted" as the - seed type — useful for journal prompts from the CLI. - - 1. Gather a seed context (or use the custom prompt) - 2. Build a prompt with continuity from recent thoughts - 3. Call the agent - 4. Store the thought - 5. Log the event and broadcast via WebSocket - """ - if not settings.thinking_enabled: - return None - - # Skip idle periods — don't count internal processing as thoughts - if not prompt and self._is_idle(): - logger.debug( - "Thinking paused — no user input for %d minutes", - settings.thinking_idle_timeout_minutes, - ) - return None - - # Capture arrival time *before* the LLM call so the thought - # timestamp reflects when the cycle started, not when the - # (potentially slow) generation finished. Fixes #582. 
- arrived_at = datetime.now(UTC).isoformat() - - memory_context, system_context, recent_thoughts = self._build_thinking_context() - - content, seed_type = await self._generate_novel_thought( - prompt, - memory_context, - system_context, - recent_thoughts, - ) - if not content: - return None - - thought = self._store_thought(content, seed_type, arrived_at=arrived_at) - self._last_thought_id = thought.id - - await self._process_thinking_result(thought) - - logger.info( - "Thought [%s] (%s): %s", - thought.id[:8], - seed_type, - thought.content[:80], - ) - return thought - - def get_recent_thoughts(self, limit: int = 20) -> list[Thought]: - """Retrieve the most recent thoughts.""" - with _get_conn(self._db_path) as conn: - rows = conn.execute( - "SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?", - (limit,), - ).fetchall() - return [_row_to_thought(r) for r in rows] - - def get_thought(self, thought_id: str) -> Thought | None: - """Retrieve a single thought by ID.""" - with _get_conn(self._db_path) as conn: - row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone() - return _row_to_thought(row) if row else None - - def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]: - """Follow the parent chain backward from a thought. - - Returns thoughts in chronological order (oldest first). 
- """ - chain = [] - current_id: str | None = thought_id - - with _get_conn(self._db_path) as conn: - for _ in range(max_depth): - if not current_id: - break - row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone() - if not row: - break - chain.append(_row_to_thought(row)) - current_id = row["parent_id"] - - chain.reverse() # Chronological order - return chain - - def count_thoughts(self) -> int: - """Return total number of stored thoughts.""" - with _get_conn(self._db_path) as conn: - count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"] - return count - - def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int: - """Delete thoughts older than *keep_days*, always retaining at least *keep_min*. - - Returns the number of deleted rows. - """ - with _get_conn(self._db_path) as conn: - try: - total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"] - if total <= keep_min: - return 0 - cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat() - cursor = conn.execute( - "DELETE FROM thoughts WHERE created_at < ? AND id NOT IN " - "(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)", - (cutoff, keep_min), - ) - deleted = cursor.rowcount - conn.commit() - return deleted - except Exception as exc: - logger.warning("Thought pruning failed: %s", exc) - return 0 - - # ── Private helpers ────────────────────────────────────────────────── - - def _should_distill(self) -> bool: - """Check if distillation should run based on interval and thought count.""" - interval = settings.thinking_distill_every - if interval <= 0: - return False - - count = self.count_thoughts() - if count == 0 or count % interval != 0: - return False - - return True - - def _build_distill_prompt(self, thoughts: list[Thought]) -> str: - """Build the prompt for extracting facts from recent thoughts. - - Args: - thoughts: List of recent thoughts to analyze. 
- - Returns: - The formatted prompt string for the LLM. - """ - thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(thoughts)) - - return ( - "You are reviewing your own recent thoughts. Extract 0-3 facts " - "worth remembering long-term.\n\n" - "GOOD facts (store these):\n" - "- User preferences: 'Alexander prefers YAML config over code changes'\n" - "- Project decisions: 'Switched from hardcoded personas to agents.yaml'\n" - "- Learned knowledge: 'Ollama supports concurrent model loading'\n" - "- User information: 'Alexander is interested in Bitcoin and sovereignty'\n\n" - "BAD facts (never store these):\n" - "- Self-referential observations about your own thinking process\n" - "- Meta-commentary about your memory, timestamps, or internal state\n" - "- Observations about being idle or having no chat messages\n" - "- File paths, tokens, API keys, or any credentials\n" - "- Restatements of your standing rules or system prompt\n\n" - "Return ONLY a JSON array of strings. If nothing is worth saving, " - "return []. Be selective — only store facts about the EXTERNAL WORLD " - "(the user, the project, technical knowledge), never about your own " - "internal process.\n\n" - f"Recent thoughts:\n{thought_text}\n\nJSON array:" - ) - - def _parse_facts_response(self, raw: str) -> list[str]: - """Parse JSON array from LLM response, stripping markdown fences. - - Resilient to models that prepend reasoning text or wrap the array in - prose. Finds the first ``[...]`` block and parses that. - - Args: - raw: Raw response string from the LLM. - - Returns: - List of fact strings parsed from the response. 
- """ - if not raw or not raw.strip(): - return [] - - import json - - cleaned = raw.strip() - - # Strip markdown code fences - if cleaned.startswith("```"): - cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip() - - # Try direct parse first (fast path) - try: - facts = json.loads(cleaned) - if isinstance(facts, list): - return [f for f in facts if isinstance(f, str)] - except (json.JSONDecodeError, ValueError): - pass - - # Fallback: extract first JSON array from the text - start = cleaned.find("[") - if start == -1: - return [] - # Walk to find the matching close bracket - depth = 0 - for i, ch in enumerate(cleaned[start:], start): - if ch == "[": - depth += 1 - elif ch == "]": - depth -= 1 - if depth == 0: - try: - facts = json.loads(cleaned[start : i + 1]) - if isinstance(facts, list): - return [f for f in facts if isinstance(f, str)] - except (json.JSONDecodeError, ValueError): - pass - break - return [] - - def _filter_and_store_facts(self, facts: list[str]) -> None: - """Filter and store valid facts, blocking sensitive and meta content. - - Args: - facts: List of fact strings to filter and store. - """ - from timmy.memory_system import memory_write - - for fact in facts[:3]: # Safety cap - if not isinstance(fact, str) or len(fact.strip()) <= 10: - continue - - fact_lower = fact.lower() - - # Block sensitive information - if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS): - logger.warning("Distill: blocked sensitive fact: %s", fact[:60]) - continue - - # Block self-referential meta-observations - if any(phrase in fact_lower for phrase in _META_OBSERVATION_PHRASES): - logger.debug("Distill: skipped meta-observation: %s", fact[:60]) - continue - - result = memory_write(fact.strip(), context_type="fact") - logger.info("Distilled fact: %s → %s", fact[:60], result[:40]) - - def _maybe_check_memory(self) -> None: - """Every N thoughts, check memory status and log it. 
- - Prevents unmonitored memory bloat during long thinking sessions - by periodically calling get_memory_status and logging the results. - """ - try: - interval = settings.thinking_memory_check_every - if interval <= 0: - return - - count = self.count_thoughts() - if count == 0 or count % interval != 0: - return - - from timmy.tools_intro import get_memory_status - - status = get_memory_status() - hot = status.get("tier1_hot_memory", {}) - vault = status.get("tier2_vault", {}) - logger.info( - "Memory status check (thought #%d): hot_memory=%d lines, vault=%d files", - count, - hot.get("line_count", 0), - vault.get("file_count", 0), - ) - except Exception as exc: - logger.warning("Memory status check failed: %s", exc) - - async def _maybe_distill(self) -> None: - """Every N thoughts, extract lasting insights and store as facts.""" - try: - if not self._should_distill(): - return - - interval = settings.thinking_distill_every - recent = self.get_recent_thoughts(limit=interval) - if len(recent) < interval: - return - - raw = await self._call_agent(self._build_distill_prompt(recent)) - if facts := self._parse_facts_response(raw): - self._filter_and_store_facts(facts) - except Exception as exc: - logger.warning("Thought distillation failed: %s", exc) - - def _maybe_check_memory_status(self) -> None: - """Every N thoughts, run a proactive memory status audit and log results.""" - try: - interval = settings.thinking_memory_check_every - if interval <= 0: - return - - count = self.count_thoughts() - if count == 0 or count % interval != 0: - return - - from timmy.tools_intro import get_memory_status - - status = get_memory_status() - - # Log summary at INFO level - tier1 = status.get("tier1_hot_memory", {}) - tier3 = status.get("tier3_semantic", {}) - hot_lines = tier1.get("line_count", "?") - vectors = tier3.get("vector_count", "?") - logger.info( - "Memory audit (thought #%d): hot_memory=%s lines, semantic=%s vectors", - count, - hot_lines, - vectors, - ) - - # Write to 
memory_audit.log for persistent tracking - audit_path = Path("data/memory_audit.log") - audit_path.parent.mkdir(parents=True, exist_ok=True) - timestamp = datetime.now(UTC).isoformat(timespec="seconds") - with audit_path.open("a") as f: - f.write( - f"{timestamp} thought={count} " - f"hot_lines={hot_lines} " - f"vectors={vectors} " - f"vault_files={status.get('tier2_vault', {}).get('file_count', '?')}\n" - ) - except Exception as exc: - logger.warning("Memory status check failed: %s", exc) - - @staticmethod - def _references_real_files(text: str) -> bool: - """Check that all source-file paths mentioned in *text* actually exist. - - Extracts paths that look like Python/config source references - (e.g. ``src/timmy/session.py``, ``config/foo.yaml``) and verifies - each one on disk relative to the project root. Returns ``True`` - only when **every** referenced path resolves to a real file — or - when no paths are referenced at all (pure prose is fine). - """ - # Match paths like src/thing.py swarm/init.py config/x.yaml - # Requires at least one slash and a file extension. - path_pattern = re.compile( - r"(? None: - """Every N thoughts, classify recent thoughts and file Gitea issues. - - Asks the LLM to review recent thoughts for actionable items — - bugs, broken features, stale state, or improvement opportunities. - Creates Gitea issues via MCP for anything worth tracking. - - Only runs when: - - Gitea is enabled and configured - - Thought count is divisible by thinking_issue_every - - LLM extracts at least one actionable item - - Safety: every generated issue is validated to ensure referenced - file paths actually exist on disk, preventing phantom-bug reports. 
- """ - try: - recent = self._get_recent_thoughts_for_issues() - if recent is None: - return - - classify_prompt = self._build_issue_classify_prompt(recent) - raw = await self._call_agent(classify_prompt) - items = self._parse_issue_items(raw) - if items is None: - return - - from timmy.mcp_tools import create_gitea_issue_via_mcp - - for item in items[:2]: # Safety cap - await self._file_single_issue(item, create_gitea_issue_via_mcp) - - except Exception as exc: - logger.debug("Thought issue filing skipped: %s", exc) - - def _get_recent_thoughts_for_issues(self): - """Return recent thoughts if conditions for filing issues are met, else None.""" - interval = settings.thinking_issue_every - if interval <= 0: - return None - - count = self.count_thoughts() - if count == 0 or count % interval != 0: - return None - - if not settings.gitea_enabled or not settings.gitea_token: - return None - - recent = self.get_recent_thoughts(limit=interval) - if len(recent) < interval: - return None - - return recent - - @staticmethod - def _build_issue_classify_prompt(recent) -> str: - """Build the LLM prompt that extracts actionable issues from recent thoughts.""" - thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(recent)) - return ( - "You are reviewing your own recent thoughts for actionable items.\n" - "Extract 0-2 items that are CONCRETE bugs, broken features, stale " - "state, or clear improvement opportunities in your own codebase.\n\n" - "Rules:\n" - "- Only include things that could become a real code fix or feature\n" - "- Skip vague reflections, philosophical musings, or repeated themes\n" - "- Category must be one of: bug, feature, suggestion, maintenance\n" - "- ONLY reference files that you are CERTAIN exist in the project\n" - "- Do NOT invent or guess file paths — if unsure, describe the " - "area of concern without naming specific files\n\n" - "For each item, write an ENGINEER-QUALITY issue:\n" - '- "title": A clear, specific title (e.g. 
"[Memory] MEMORY.md timestamp not updating")\n' - '- "body": A detailed body with these sections:\n' - " **What's happening:** Describe the current (broken) behavior.\n" - " **Expected behavior:** What should happen instead.\n" - " **Suggested fix:** Which file(s) to change and what the fix looks like.\n" - " **Acceptance criteria:** How to verify the fix works.\n" - '- "category": One of bug, feature, suggestion, maintenance\n\n' - "Return ONLY a JSON array of objects with keys: " - '"title", "body", "category"\n' - "Return [] if nothing is actionable.\n\n" - f"Recent thoughts:\n{thought_text}\n\nJSON array:" - ) - - @staticmethod - def _parse_issue_items(raw: str): - """Strip markdown fences and parse JSON issue list; return None on failure.""" - import json - - if not raw or not raw.strip(): - return None - - cleaned = raw.strip() - if cleaned.startswith("```"): - cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip() - - items = json.loads(cleaned) - if not isinstance(items, list) or not items: - return None - - return items - - async def _file_single_issue(self, item: dict, create_fn) -> None: - """Validate one issue dict and create it via *create_fn* if it passes checks.""" - if not isinstance(item, dict): - return - title = item.get("title", "").strip() - body = item.get("body", "").strip() - category = item.get("category", "suggestion").strip() - if not title or len(title) < 10: - return - - combined = f"{title}\n{body}" - if not self._references_real_files(combined): - logger.info( - "Skipped phantom issue: %s (references non-existent files)", - title[:60], - ) - return - - label = category if category in ("bug", "feature") else "" - result = await create_fn(title=title, body=body, labels=label) - logger.info("Thought→Issue: %s → %s", title[:60], result[:80]) - - # ── System snapshot helpers ──────────────────────────────────────────── - - def _snap_thought_count(self, now: datetime) -> str | None: - """Return today's thought count, or *None* on 
failure.""" - try: - today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - with _get_conn(self._db_path) as conn: - count = conn.execute( - "SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?", - (today_start.isoformat(),), - ).fetchone()["c"] - return f"Thoughts today: {count}" - except Exception as exc: - logger.debug("Thought count query failed: %s", exc) - return None - - def _snap_chat_activity(self) -> list[str]: - """Return chat-activity lines (in-memory, no I/O).""" - try: - from infrastructure.chat_store import message_log - - messages = message_log.all() - if messages: - last = messages[-1] - return [ - f"Chat messages this session: {len(messages)}", - f'Last chat ({last.role}): "{last.content[:80]}"', - ] - return ["No chat messages this session"] - except Exception as exc: - logger.debug("Chat activity query failed: %s", exc) - return [] - - def _snap_task_queue(self) -> str | None: - """Return a one-line task queue summary, or *None*.""" - try: - from swarm.task_queue.models import get_task_summary_for_briefing - - s = get_task_summary_for_briefing() - running, pending = s.get("running", 0), s.get("pending_approval", 0) - done, failed = s.get("completed", 0), s.get("failed", 0) - if running or pending or done or failed: - return ( - f"Tasks: {running} running, {pending} pending, " - f"{done} completed, {failed} failed" - ) - except Exception as exc: - logger.debug("Task queue query failed: %s", exc) - return None - - def _snap_workspace(self) -> list[str]: - """Return workspace-update lines (file-based Hermes comms).""" - try: - from timmy.workspace import workspace_monitor - - updates = workspace_monitor.get_pending_updates() - lines: list[str] = [] - new_corr = updates.get("new_correspondence") - if new_corr: - line_count = len([ln for ln in new_corr.splitlines() if ln.strip()]) - lines.append( - f"Workspace: {line_count} new correspondence entries (latest from: Hermes)" - ) - new_inbox = updates.get("new_inbox_files", []) - if 
new_inbox: - files_str = ", ".join(new_inbox[:5]) - if len(new_inbox) > 5: - files_str += f", ... (+{len(new_inbox) - 5} more)" - lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}") - return lines - except Exception as exc: - logger.debug("Workspace check failed: %s", exc) - return [] - - def _gather_system_snapshot(self) -> str: - """Gather lightweight real system state for grounding thoughts in reality. - - Returns a short multi-line string with current time, thought count, - recent chat activity, and task queue status. Never crashes — every - section is independently try/excepted. - """ - now = datetime.now().astimezone() - tz = now.strftime("%Z") or "UTC" - - parts: list[str] = [ - f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}" - ] - - thought_line = self._snap_thought_count(now) - if thought_line: - parts.append(thought_line) - - parts.extend(self._snap_chat_activity()) - - task_line = self._snap_task_queue() - if task_line: - parts.append(task_line) - - parts.extend(self._snap_workspace()) - - return "\n".join(parts) if parts else "" - - def _load_memory_context(self) -> str: - """Pre-hook: load MEMORY.md + soul.md for the thinking prompt. - - Hot memory first (changes each cycle), soul second (stable identity). - Returns a combined string truncated to ~1500 chars. - Graceful on any failure — returns empty string. - """ - parts: list[str] = [] - try: - if HOT_MEMORY_PATH.exists(): - hot = HOT_MEMORY_PATH.read_text().strip() - if hot: - parts.append(hot) - except Exception as exc: - logger.debug("Failed to read MEMORY.md: %s", exc) - - try: - if SOUL_PATH.exists(): - soul = SOUL_PATH.read_text().strip() - if soul: - parts.append(soul) - except Exception as exc: - logger.debug("Failed to read soul.md: %s", exc) - - if not parts: - return "" - - combined = "\n\n---\n\n".join(parts) - if len(combined) > 1500: - combined = combined[:1500] + "\n... 
[truncated]" - return combined - - def _update_memory(self, thought: Thought) -> None: - """Post-hook: update MEMORY.md 'Last Reflection' section with latest thought. - - Never modifies soul.md. Never crashes the heartbeat. - """ - try: - from timmy.memory_system import store_last_reflection - - ts = datetime.fromisoformat(thought.created_at) - local_ts = ts.astimezone() - tz_name = local_ts.strftime("%Z") or "UTC" - time_str = f"{local_ts.strftime('%Y-%m-%d %I:%M %p').lstrip('0')} {tz_name}" - reflection = ( - f"**Time:** {time_str}\n" - f"**Seed:** {thought.seed_type}\n" - f"**Thought:** {thought.content[:200]}" - ) - store_last_reflection(reflection) - except Exception as exc: - logger.debug("Failed to update memory after thought: %s", exc) - - def _pick_seed_type(self) -> str: - """Pick a seed type, avoiding types used in the last 3 thoughts. - - Ensures the thought stream doesn't fixate on one category. - Falls back to the full pool if all types were recently used. - """ - recent = self.get_recent_thoughts(limit=3) - recent_types = {t.seed_type for t in recent} - available = [t for t in SEED_TYPES if t not in recent_types] - if not available: - available = list(SEED_TYPES) - return random.choice(available) - - def _gather_seed(self) -> tuple[str, str]: - """Pick a seed type and gather relevant context. - - Returns (seed_type, seed_context_string). 
- """ - seed_type = self._pick_seed_type() - - if seed_type == "swarm": - return seed_type, self._seed_from_swarm() - if seed_type == "scripture": - return seed_type, self._seed_from_scripture() - if seed_type == "memory": - return seed_type, self._seed_from_memory() - if seed_type == "creative": - prompt = random.choice(_CREATIVE_SEEDS) - return seed_type, f"Creative prompt: {prompt}" - if seed_type == "existential": - prompt = random.choice(_EXISTENTIAL_SEEDS) - return seed_type, f"Reflection: {prompt}" - if seed_type == "sovereignty": - prompt = random.choice(_SOVEREIGNTY_SEEDS) - return seed_type, f"Sovereignty reflection: {prompt}" - if seed_type == "observation": - return seed_type, self._seed_from_observation() - if seed_type == "workspace": - return seed_type, self._seed_from_workspace() - # freeform — minimal guidance to steer away from repetition - return seed_type, "Free reflection — explore something you haven't thought about yet today." - - # Reflective prompts layered on top of swarm data - _SWARM_REFLECTIONS = [ - "What does this activity pattern tell me about the health of the system?", - "Which tasks are flowing smoothly, and where is friction building up?", - "If I were coaching these agents, what would I suggest they focus on?", - "Is the swarm balanced, or is one agent carrying too much weight?", - "What surprised me about recent task outcomes?", - ] - - def _seed_from_swarm(self) -> str: - """Gather recent swarm activity as thought seed with a reflective prompt.""" - try: - from datetime import timedelta - - from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary - - since = datetime.now(UTC) - timedelta(hours=1) - swarm = _gather_swarm_summary(since) - tasks = _gather_task_queue_summary() - reflection = random.choice(self._SWARM_REFLECTIONS) - return ( - f"Recent swarm activity: {swarm}\n" - f"Task queue: {tasks}\n\n" - f"Reflect on this: {reflection}" - ) - except Exception as exc: - logger.debug("Swarm seed unavailable: 
%s", exc) - return "The swarm is quiet right now. What does silence in a system mean?" - - def _seed_from_scripture(self) -> str: - """Gather current scripture meditation focus as thought seed.""" - return "Scripture is on my mind, though no specific verse is in focus." - - def _seed_from_memory(self) -> str: - """Gather memory context as thought seed.""" - try: - from timmy.memory_system import memory_system - - context = memory_system.get_system_context() - if context: - # Truncate to a reasonable size for a thought seed - return f"From my memory:\n{context[:500]}" - except Exception as exc: - logger.debug("Memory seed unavailable: %s", exc) - return "My memory vault is quiet." - - def _seed_from_observation(self) -> str: - """Ground a thought in concrete recent activity and a reflective prompt.""" - prompt = random.choice(_OBSERVATION_SEEDS) - # Pull real data to give the model something concrete to reflect on - context_parts = [f"Observation prompt: {prompt}"] - try: - from datetime import timedelta - - from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary - - since = datetime.now(UTC) - timedelta(hours=2) - swarm = _gather_swarm_summary(since) - tasks = _gather_task_queue_summary() - if swarm: - context_parts.append(f"Recent activity: {swarm}") - if tasks: - context_parts.append(f"Queue: {tasks}") - except Exception as exc: - logger.debug("Observation seed data unavailable: %s", exc) - return "\n".join(context_parts) - - def _seed_from_workspace(self) -> str: - """Gather workspace updates as thought seed. - - When there are pending workspace updates, include them as context - for Timmy to reflect on. Falls back to random seed type if none. 
- """ - try: - from timmy.workspace import workspace_monitor - - updates = workspace_monitor.get_pending_updates() - new_corr = updates.get("new_correspondence") - new_inbox = updates.get("new_inbox_files", []) - - if new_corr: - # Take first 200 chars of the new entry - snippet = new_corr[:200].replace("\n", " ") - if len(new_corr) > 200: - snippet += "..." - return f"New workspace message from Hermes: {snippet}" - - if new_inbox: - files_str = ", ".join(new_inbox[:3]) - if len(new_inbox) > 3: - files_str += f", ... (+{len(new_inbox) - 3} more)" - return f"New inbox files from Hermes: {files_str}" - - except Exception as exc: - logger.debug("Workspace seed unavailable: %s", exc) - - # Fall back to a random seed type if no workspace updates - return "The workspace is quiet. What should I be watching for?" - - async def _check_workspace(self) -> None: - """Post-hook: check workspace for updates and mark them as seen. - - This ensures Timmy 'processes' workspace updates even if the seed - was different, keeping the state file in sync. 
- """ - try: - from timmy.workspace import workspace_monitor - - updates = workspace_monitor.get_pending_updates() - new_corr = updates.get("new_correspondence") - new_inbox = updates.get("new_inbox_files", []) - - if new_corr or new_inbox: - if new_corr: - line_count = len([line for line in new_corr.splitlines() if line.strip()]) - logger.info("Workspace: processed %d new correspondence entries", line_count) - if new_inbox: - logger.info( - "Workspace: processed %d new inbox files: %s", len(new_inbox), new_inbox - ) - - # Mark as seen to update the state file - workspace_monitor.mark_seen() - except Exception as exc: - logger.debug("Workspace check failed: %s", exc) - - # Maximum retries when a generated thought is too similar to recent ones - _MAX_DEDUP_RETRIES = 2 - # Similarity threshold (0.0 = completely different, 1.0 = identical) - _SIMILARITY_THRESHOLD = 0.6 - - def _is_too_similar(self, candidate: str, recent: list["Thought"]) -> bool: - """Check if *candidate* is semantically too close to any recent thought. - - Uses SequenceMatcher on normalised text (lowered, stripped) for a fast - approximation of semantic similarity that works without external deps. - """ - norm_candidate = candidate.lower().strip() - for thought in recent: - norm_existing = thought.content.lower().strip() - ratio = SequenceMatcher(None, norm_candidate, norm_existing).ratio() - if ratio >= self._SIMILARITY_THRESHOLD: - logger.debug( - "Thought rejected (%.0f%% similar to %s): %.60s", - ratio * 100, - thought.id[:8], - candidate, - ) - return True - return False - - def _build_continuity_context(self) -> str: - """Build context from recent thoughts with anti-repetition guidance. - - Shows the last 5 thoughts (truncated) so the model knows what themes - to avoid. The header explicitly instructs against repeating. - """ - recent = self.get_recent_thoughts(limit=5) - if not recent: - return "This is your first thought since waking up. Begin fresh." 
- - lines = ["Your recent thoughts — do NOT repeat these themes. Find a new angle:"] - # recent is newest-first, reverse for chronological order - for thought in reversed(recent): - snippet = thought.content[:100] - if len(thought.content) > 100: - snippet = snippet.rstrip() + "..." - lines.append(f"- [{thought.seed_type}] {snippet}") - return "\n".join(lines) - - _thinking_agent = None # cached agent — avoids per-call resource leaks (#525) - - async def _call_agent(self, prompt: str) -> str: - """Call Timmy's agent to generate a thought. - - Reuses a cached agent with skip_mcp=True to avoid the cancel-scope - errors that occur when MCP stdio transports are spawned inside asyncio - background tasks (#72) and to prevent per-call resource leaks (httpx - clients, SQLite connections, model warmups) that caused the thinking - loop to die every ~10 min (#525). - - Individual calls are capped at 120 s so a hung Ollama never blocks - the scheduler indefinitely. - - Strips ```` tags from reasoning models (qwen3, etc.) so that - downstream parsers (fact distillation, issue filing) receive clean text. - """ - import asyncio - - if self._thinking_agent is None: - from timmy.agent import create_timmy - - self._thinking_agent = create_timmy(skip_mcp=True) - - try: - async with asyncio.timeout(120): - run = await self._thinking_agent.arun(prompt, stream=False) - except TimeoutError: - logger.warning("Thinking LLM call timed out after 120 s") - return "" - - raw = run.content if hasattr(run, "content") else str(run) - return _THINK_TAG_RE.sub("", raw) if raw else raw - - def _store_thought( - self, - content: str, - seed_type: str, - *, - arrived_at: str | None = None, - ) -> Thought: - """Persist a thought to SQLite. - - Args: - arrived_at: ISO-8601 timestamp captured when the thinking cycle - started. Falls back to now() for callers that don't supply it. 
- """ - thought = Thought( - id=str(uuid.uuid4()), - content=content, - seed_type=seed_type, - parent_id=self._last_thought_id, - created_at=arrived_at or datetime.now(UTC).isoformat(), - ) - - with _get_conn(self._db_path) as conn: - conn.execute( - """ - INSERT INTO thoughts (id, content, seed_type, parent_id, created_at) - VALUES (?, ?, ?, ?, ?) - """, - ( - thought.id, - thought.content, - thought.seed_type, - thought.parent_id, - thought.created_at, - ), - ) - conn.commit() - return thought - - def _log_event(self, thought: Thought) -> None: - """Log the thought as a swarm event.""" - try: - from swarm.event_log import EventType, log_event - - log_event( - EventType.TIMMY_THOUGHT, - source="thinking-engine", - agent_id="default", - data={ - "thought_id": thought.id, - "seed_type": thought.seed_type, - "content": thought.content[:200], - }, - ) - except Exception as exc: - logger.debug("Failed to log thought event: %s", exc) - - def _write_journal(self, thought: Thought) -> None: - """Append the thought to a daily markdown journal file. - - Writes to data/journal/YYYY-MM-DD.md — one file per day, append-only. - Timestamps are converted to local time with timezone indicator. 
- """ - try: - ts = datetime.fromisoformat(thought.created_at) - # Convert UTC to local for a human-readable journal - local_ts = ts.astimezone() - tz_name = local_ts.strftime("%Z") or "UTC" - - journal_dir = self._db_path.parent / "journal" - journal_dir.mkdir(parents=True, exist_ok=True) - - journal_file = journal_dir / f"{local_ts.strftime('%Y-%m-%d')}.md" - time_str = f"{local_ts.strftime('%I:%M %p').lstrip('0')} {tz_name}" - - entry = f"## {time_str} — {thought.seed_type}\n\n{thought.content}\n\n---\n\n" - - with open(journal_file, "a", encoding="utf-8") as f: - f.write(entry) - except Exception as exc: - logger.debug("Failed to write journal entry: %s", exc) - - async def _broadcast(self, thought: Thought) -> None: - """Broadcast the thought to WebSocket clients.""" - try: - from infrastructure.ws_manager.handler import ws_manager - - await ws_manager.broadcast( - "timmy_thought", - { - "thought_id": thought.id, - "content": thought.content, - "seed_type": thought.seed_type, - "created_at": thought.created_at, - }, - ) - except Exception as exc: - logger.debug("Failed to broadcast thought: %s", exc) - - -def _query_thoughts( - db_path: Path, query: str, seed_type: str | None, limit: int -) -> list[sqlite3.Row]: - """Run the thought-search SQL and return matching rows.""" - pattern = f"%{query}%" - with _get_conn(db_path) as conn: - if seed_type: - return conn.execute( - """ - SELECT id, content, seed_type, created_at - FROM thoughts - WHERE content LIKE ? AND seed_type = ? - ORDER BY created_at DESC - LIMIT ? - """, - (pattern, seed_type, limit), - ).fetchall() - return conn.execute( - """ - SELECT id, content, seed_type, created_at - FROM thoughts - WHERE content LIKE ? - ORDER BY created_at DESC - LIMIT ? 
- """, - (pattern, limit), - ).fetchall() - - -def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str: - """Format thought rows into a human-readable string.""" - lines = [f'Found {len(rows)} thought(s) matching "{query}":'] - if seed_type: - lines[0] += f' [seed_type="{seed_type}"]' - lines.append("") - - for row in rows: - ts = datetime.fromisoformat(row["created_at"]) - local_ts = ts.astimezone() - time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0") - seed = row["seed_type"] - content = row["content"].replace("\n", " ") # Flatten newlines for display - lines.append(f"[{time_str}] ({seed}) {content[:150]}") - - return "\n".join(lines) - - -def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str: - """Search Timmy's thought history for reflections matching a query. - - Use this tool when Timmy needs to recall his previous thoughts on a topic, - reflect on past insights, or build upon earlier reflections. This enables - self-awareness and continuity of thinking across time. - - Args: - query: Search term to match against thought content (case-insensitive). - seed_type: Optional filter by thought category (e.g., 'existential', - 'swarm', 'sovereignty', 'creative', 'memory', 'observation'). - limit: Maximum number of thoughts to return (default 10, max 50). - - Returns: - Formatted string with matching thoughts, newest first, including - timestamps and seed types. Returns a helpful message if no matches found. - """ - limit = max(1, min(limit, 50)) - - try: - rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit) - - if not rows: - if seed_type: - return f'No thoughts found matching "{query}" with seed_type="{seed_type}".' - return f'No thoughts found matching "{query}".' 
- - return _format_thought_rows(rows, query, seed_type) - - except Exception as exc: - logger.warning("Thought search failed: %s", exc) - return f"Error searching thoughts: {exc}" - - -# Module-level singleton -thinking_engine = ThinkingEngine() diff --git a/src/timmy/thinking/__init__.py b/src/timmy/thinking/__init__.py new file mode 100644 index 00000000..1f2ad804 --- /dev/null +++ b/src/timmy/thinking/__init__.py @@ -0,0 +1,142 @@ +"""Timmy's thinking engine — public façade. + +When the server starts, Timmy begins pondering: reflecting on his existence, +recent swarm activity, scripture, creative ideas, or pure stream of +consciousness. Each thought builds on the previous one, maintaining a +continuous chain of introspection. + +Usage:: + + from timmy.thinking import thinking_engine + + # Run one thinking cycle (called by the background loop) + await thinking_engine.think_once() + + # Query the thought stream + thoughts = thinking_engine.get_recent_thoughts(limit=10) + chain = thinking_engine.get_thought_chain(thought_id) +""" + +import logging +import sqlite3 +from datetime import datetime +from pathlib import Path + +from timmy.thinking._db import Thought, _get_conn +from timmy.thinking.engine import ThinkingEngine +from timmy.thinking.seeds import ( + SEED_TYPES, + _SENSITIVE_PATTERNS, + _META_OBSERVATION_PHRASES, + _THINK_TAG_RE, + _THINKING_PROMPT, +) + +# Re-export HOT_MEMORY_PATH and SOUL_PATH so existing patch targets continue to work. +# Tests that patch "timmy.thinking.HOT_MEMORY_PATH" or "timmy.thinking.SOUL_PATH" +# should instead patch "timmy.thinking._snapshot.HOT_MEMORY_PATH" etc., but these +# re-exports are kept for any code that reads them from the top-level namespace. 
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH  # noqa: F401

logger = logging.getLogger(__name__)

# Module-level singleton
thinking_engine = ThinkingEngine()

__all__ = [
    "ThinkingEngine",
    "Thought",
    "SEED_TYPES",
    "thinking_engine",
    "search_thoughts",
    "_THINKING_PROMPT",
    "_SENSITIVE_PATTERNS",
    "_META_OBSERVATION_PHRASES",
    "_THINK_TAG_RE",
    "HOT_MEMORY_PATH",
    "SOUL_PATH",
]


# ── Search helpers ─────────────────────────────────────────────────────────


def _query_thoughts(
    db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
    """Run the thought-search SQL and return matching rows."""
    pattern = f"%{query}%"
    with _get_conn(db_path) as conn:
        if seed_type:
            return conn.execute(
                """
                SELECT id, content, seed_type, created_at
                FROM thoughts
                WHERE content LIKE ? AND seed_type = ?
                ORDER BY created_at DESC
                LIMIT ?
                """,
                (pattern, seed_type, limit),
            ).fetchall()
        return conn.execute(
            """
            SELECT id, content, seed_type, created_at
            FROM thoughts
            WHERE content LIKE ?
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (pattern, limit),
        ).fetchall()


def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
    """Format thought rows into a human-readable string."""
    lines = [f'Found {len(rows)} thought(s) matching "{query}":']
    if seed_type:
        lines[0] += f' [seed_type="{seed_type}"]'
    lines.append("")

    for row in rows:
        ts = datetime.fromisoformat(row["created_at"])
        local_ts = ts.astimezone()
        time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
        seed = row["seed_type"]
        content = row["content"].replace("\n", " ")  # Flatten newlines for display
        lines.append(f"[{time_str}] ({seed}) {content[:150]}")

    return "\n".join(lines)


def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
    """Search Timmy's thought history for reflections matching a query.

    Use this tool when Timmy needs to recall his previous thoughts on a topic,
    reflect on past insights, or build upon earlier reflections. This enables
    self-awareness and continuity of thinking across time.

    Args:
        query: Search term to match against thought content (case-insensitive).
        seed_type: Optional filter by thought category (e.g., 'existential',
            'swarm', 'sovereignty', 'creative', 'memory', 'observation').
        limit: Maximum number of thoughts to return (default 10, max 50).

    Returns:
        Formatted string with matching thoughts, newest first, including
        timestamps and seed types. Returns a helpful message if no matches found.
    """
    limit = max(1, min(limit, 50))

    try:
        rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)

        if not rows:
            if seed_type:
                return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
            return f'No thoughts found matching "{query}".'

        return _format_thought_rows(rows, query, seed_type)

    except Exception as exc:
        logger.warning("Thought search failed: %s", exc)
        return f"Error searching thoughts: {exc}"


# ───────────────── patch section: new file src/timmy/thinking/_db.py ─────────


"""Database models and access layer for the thinking engine."""

import sqlite3
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from pathlib import Path

_DEFAULT_DB = Path("data/thoughts.db")


@dataclass
class Thought:
    """A single thought in Timmy's inner stream."""

    id: str            # UUID4 string, primary key
    content: str       # full thought text
    seed_type: str     # one of SEED_TYPES
    parent_id: str | None  # previous thought in the chain, None for the first
    created_at: str    # ISO-8601 timestamp


@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
    """Get a SQLite connection with the thoughts table created.

    The DDL is idempotent (IF NOT EXISTS), so running it on every call is
    safe — it trades a small per-call cost for zero setup ordering concerns.
    """
    db_path.parent.mkdir(parents=True, exist_ok=True)
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.row_factory = sqlite3.Row
        conn.execute("""
            CREATE TABLE IF NOT EXISTS thoughts (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                seed_type TEXT NOT NULL,
                parent_id TEXT,
                created_at TEXT NOT NULL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_thoughts_time ON thoughts(created_at)")
        conn.commit()
        yield conn


def _row_to_thought(row: sqlite3.Row) -> Thought:
    """Hydrate a Thought dataclass from a SQLite row."""
    return Thought(
        id=row["id"],
        content=row["content"],
        seed_type=row["seed_type"],
        parent_id=row["parent_id"],
        created_at=row["created_at"],
    )


# ──────────── patch section: new file src/timmy/thinking/_distillation.py ────


"""Distillation mixin — extracts lasting facts from recent thoughts and monitors memory."""

import logging

from config import settings

from timmy.thinking.seeds import _META_OBSERVATION_PHRASES, _SENSITIVE_PATTERNS

logger = logging.getLogger(__name__)


class _DistillationMixin:
    """Mixin providing fact-distillation and memory-monitoring behaviour.

    Expects the host class to provide:
    - self.count_thoughts() -> int
    - self.get_recent_thoughts(limit) -> list[Thought]
    - self._call_agent(prompt) -> str (async)
    """

    def _should_distill(self) -> bool:
        """Check if distillation should run based on interval and thought count."""
        interval = settings.thinking_distill_every
        if interval <= 0:
            return False
        # Distill only on every N-th thought, never on zero.
        count = self.count_thoughts()
        return count > 0 and count % interval == 0

    def _build_distill_prompt(self, thoughts) -> str:
        """Build the prompt for extracting facts from recent thoughts."""
        # thoughts arrive newest-first; reverse for chronological order
        thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(thoughts))

        return (
            "You are reviewing your own recent thoughts. Extract 0-3 facts "
            "worth remembering long-term.\n\n"
            "GOOD facts (store these):\n"
            "- User preferences: 'Alexander prefers YAML config over code changes'\n"
            "- Project decisions: 'Switched from hardcoded personas to agents.yaml'\n"
            "- Learned knowledge: 'Ollama supports concurrent model loading'\n"
            "- User information: 'Alexander is interested in Bitcoin and sovereignty'\n\n"
            "BAD facts (never store these):\n"
            "- Self-referential observations about your own thinking process\n"
            "- Meta-commentary about your memory, timestamps, or internal state\n"
            "- Observations about being idle or having no chat messages\n"
            "- File paths, tokens, API keys, or any credentials\n"
            "- Restatements of your standing rules or system prompt\n\n"
            "Return ONLY a JSON array of strings. If nothing is worth saving, "
            "return []. Be selective — only store facts about the EXTERNAL WORLD "
            "(the user, the project, technical knowledge), never about your own "
            "internal process.\n\n"
            f"Recent thoughts:\n{thought_text}\n\nJSON array:"
        )
Extract 0-3 facts " + "worth remembering long-term.\n\n" + "GOOD facts (store these):\n" + "- User preferences: 'Alexander prefers YAML config over code changes'\n" + "- Project decisions: 'Switched from hardcoded personas to agents.yaml'\n" + "- Learned knowledge: 'Ollama supports concurrent model loading'\n" + "- User information: 'Alexander is interested in Bitcoin and sovereignty'\n\n" + "BAD facts (never store these):\n" + "- Self-referential observations about your own thinking process\n" + "- Meta-commentary about your memory, timestamps, or internal state\n" + "- Observations about being idle or having no chat messages\n" + "- File paths, tokens, API keys, or any credentials\n" + "- Restatements of your standing rules or system prompt\n\n" + "Return ONLY a JSON array of strings. If nothing is worth saving, " + "return []. Be selective — only store facts about the EXTERNAL WORLD " + "(the user, the project, technical knowledge), never about your own " + "internal process.\n\n" + f"Recent thoughts:\n{thought_text}\n\nJSON array:" + ) + + def _parse_facts_response(self, raw: str) -> list[str]: + """Parse JSON array from LLM response, stripping markdown fences. + + Resilient to models that prepend reasoning text or wrap the array in + prose. Finds the first ``[...]`` block and parses that. 
+ """ + if not raw or not raw.strip(): + return [] + + import json + + cleaned = raw.strip() + + # Strip markdown code fences + if cleaned.startswith("```"): + cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip() + + # Try direct parse first (fast path) + try: + facts = json.loads(cleaned) + if isinstance(facts, list): + return [f for f in facts if isinstance(f, str)] + except (json.JSONDecodeError, ValueError): + pass + + # Fallback: extract first JSON array from the text + start = cleaned.find("[") + if start == -1: + return [] + # Walk to find the matching close bracket + depth = 0 + for i, ch in enumerate(cleaned[start:], start): + if ch == "[": + depth += 1 + elif ch == "]": + depth -= 1 + if depth == 0: + try: + facts = json.loads(cleaned[start : i + 1]) + if isinstance(facts, list): + return [f for f in facts if isinstance(f, str)] + except (json.JSONDecodeError, ValueError): + pass + break + return [] + + def _filter_and_store_facts(self, facts: list[str]) -> None: + """Filter and store valid facts, blocking sensitive and meta content.""" + from timmy.memory_system import memory_write + + for fact in facts[:3]: # Safety cap + if not isinstance(fact, str) or len(fact.strip()) <= 10: + continue + + fact_lower = fact.lower() + + # Block sensitive information + if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS): + logger.warning("Distill: blocked sensitive fact: %s", fact[:60]) + continue + + # Block self-referential meta-observations + if any(phrase in fact_lower for phrase in _META_OBSERVATION_PHRASES): + logger.debug("Distill: skipped meta-observation: %s", fact[:60]) + continue + + result = memory_write(fact.strip(), context_type="fact") + logger.info("Distilled fact: %s → %s", fact[:60], result[:40]) + + def _maybe_check_memory(self) -> None: + """Every N thoughts, check memory status and log it. + + Prevents unmonitored memory bloat during long thinking sessions + by periodically calling get_memory_status and logging the results. 
+ """ + try: + interval = settings.thinking_memory_check_every + if interval <= 0: + return + + count = self.count_thoughts() + if count == 0 or count % interval != 0: + return + + from timmy.tools_intro import get_memory_status + + status = get_memory_status() + hot = status.get("tier1_hot_memory", {}) + vault = status.get("tier2_vault", {}) + logger.info( + "Memory status check (thought #%d): hot_memory=%d lines, vault=%d files", + count, + hot.get("line_count", 0), + vault.get("file_count", 0), + ) + except Exception as exc: + logger.warning("Memory status check failed: %s", exc) + + async def _maybe_distill(self) -> None: + """Every N thoughts, extract lasting insights and store as facts.""" + try: + if not self._should_distill(): + return + + interval = settings.thinking_distill_every + recent = self.get_recent_thoughts(limit=interval) + if len(recent) < interval: + return + + raw = await self._call_agent(self._build_distill_prompt(recent)) + if facts := self._parse_facts_response(raw): + self._filter_and_store_facts(facts) + except Exception as exc: + logger.warning("Thought distillation failed: %s", exc) + + def _maybe_check_memory_status(self) -> None: + """Every N thoughts, run a proactive memory status audit and log results.""" + try: + interval = settings.thinking_memory_check_every + if interval <= 0: + return + + count = self.count_thoughts() + if count == 0 or count % interval != 0: + return + + from timmy.tools_intro import get_memory_status + + status = get_memory_status() + + # Log summary at INFO level + tier1 = status.get("tier1_hot_memory", {}) + tier3 = status.get("tier3_semantic", {}) + hot_lines = tier1.get("line_count", "?") + vectors = tier3.get("vector_count", "?") + logger.info( + "Memory audit (thought #%d): hot_memory=%s lines, semantic=%s vectors", + count, + hot_lines, + vectors, + ) + + # Write to memory_audit.log for persistent tracking + from datetime import UTC, datetime + + audit_path = Path("data/memory_audit.log") + 
audit_path.parent.mkdir(parents=True, exist_ok=True) + timestamp = datetime.now(UTC).isoformat(timespec="seconds") + with audit_path.open("a") as f: + f.write( + f"{timestamp} thought={count} " + f"hot_lines={hot_lines} " + f"vectors={vectors} " + f"vault_files={status.get('tier2_vault', {}).get('file_count', '?')}\n" + ) + except Exception as exc: + logger.warning("Memory status check failed: %s", exc) diff --git a/src/timmy/thinking/_issue_filing.py b/src/timmy/thinking/_issue_filing.py new file mode 100644 index 00000000..d926b8d1 --- /dev/null +++ b/src/timmy/thinking/_issue_filing.py @@ -0,0 +1,170 @@ +"""Issue-filing mixin — classifies recent thoughts and creates Gitea issues.""" + +import logging +import re +from pathlib import Path + +from config import settings + +logger = logging.getLogger(__name__) + + +class _IssueFilingMixin: + """Mixin providing automatic issue-filing from thought analysis. + + Expects the host class to provide: + - self.count_thoughts() -> int + - self.get_recent_thoughts(limit) -> list[Thought] + - self._call_agent(prompt) -> str (async) + """ + + @staticmethod + def _references_real_files(text: str) -> bool: + """Check that all source-file paths mentioned in *text* actually exist. + + Extracts paths that look like Python/config source references + (e.g. ``src/timmy/session.py``, ``config/foo.yaml``) and verifies + each one on disk relative to the project root. Returns ``True`` + only when **every** referenced path resolves to a real file — or + when no paths are referenced at all (pure prose is fine). + """ + # Match paths like src/thing.py swarm/init.py config/x.yaml + # Requires at least one slash and a file extension. + path_pattern = re.compile( + r"(? None: + """Every N thoughts, classify recent thoughts and file Gitea issues. + + Asks the LLM to review recent thoughts for actionable items — + bugs, broken features, stale state, or improvement opportunities. + Creates Gitea issues via MCP for anything worth tracking. 
+ + Only runs when: + - Gitea is enabled and configured + - Thought count is divisible by thinking_issue_every + - LLM extracts at least one actionable item + + Safety: every generated issue is validated to ensure referenced + file paths actually exist on disk, preventing phantom-bug reports. + """ + try: + recent = self._get_recent_thoughts_for_issues() + if recent is None: + return + + classify_prompt = self._build_issue_classify_prompt(recent) + raw = await self._call_agent(classify_prompt) + items = self._parse_issue_items(raw) + if items is None: + return + + from timmy.mcp_tools import create_gitea_issue_via_mcp + + for item in items[:2]: # Safety cap + await self._file_single_issue(item, create_gitea_issue_via_mcp) + + except Exception as exc: + logger.debug("Thought issue filing skipped: %s", exc) + + def _get_recent_thoughts_for_issues(self): + """Return recent thoughts if conditions for filing issues are met, else None.""" + interval = settings.thinking_issue_every + if interval <= 0: + return None + + count = self.count_thoughts() + if count == 0 or count % interval != 0: + return None + + if not settings.gitea_enabled or not settings.gitea_token: + return None + + recent = self.get_recent_thoughts(limit=interval) + if len(recent) < interval: + return None + + return recent + + @staticmethod + def _build_issue_classify_prompt(recent) -> str: + """Build the LLM prompt that extracts actionable issues from recent thoughts.""" + thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(recent)) + return ( + "You are reviewing your own recent thoughts for actionable items.\n" + "Extract 0-2 items that are CONCRETE bugs, broken features, stale " + "state, or clear improvement opportunities in your own codebase.\n\n" + "Rules:\n" + "- Only include things that could become a real code fix or feature\n" + "- Skip vague reflections, philosophical musings, or repeated themes\n" + "- Category must be one of: bug, feature, suggestion, maintenance\n" 
+ "- ONLY reference files that you are CERTAIN exist in the project\n" + "- Do NOT invent or guess file paths — if unsure, describe the " + "area of concern without naming specific files\n\n" + "For each item, write an ENGINEER-QUALITY issue:\n" + '- "title": A clear, specific title (e.g. "[Memory] MEMORY.md timestamp not updating")\n' + '- "body": A detailed body with these sections:\n' + " **What's happening:** Describe the current (broken) behavior.\n" + " **Expected behavior:** What should happen instead.\n" + " **Suggested fix:** Which file(s) to change and what the fix looks like.\n" + " **Acceptance criteria:** How to verify the fix works.\n" + '- "category": One of bug, feature, suggestion, maintenance\n\n' + "Return ONLY a JSON array of objects with keys: " + '"title", "body", "category"\n' + "Return [] if nothing is actionable.\n\n" + f"Recent thoughts:\n{thought_text}\n\nJSON array:" + ) + + @staticmethod + def _parse_issue_items(raw: str): + """Strip markdown fences and parse JSON issue list; return None on failure.""" + import json + + if not raw or not raw.strip(): + return None + + cleaned = raw.strip() + if cleaned.startswith("```"): + cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip() + + items = json.loads(cleaned) + if not isinstance(items, list) or not items: + return None + + return items + + async def _file_single_issue(self, item: dict, create_fn) -> None: + """Validate one issue dict and create it via *create_fn* if it passes checks.""" + if not isinstance(item, dict): + return + title = item.get("title", "").strip() + body = item.get("body", "").strip() + category = item.get("category", "suggestion").strip() + if not title or len(title) < 10: + return + + combined = f"{title}\n{body}" + if not self._references_real_files(combined): + logger.info( + "Skipped phantom issue: %s (references non-existent files)", + title[:60], + ) + return + + label = category if category in ("bug", "feature") else "" + result = await 
create_fn(title=title, body=body, labels=label) + logger.info("Thought→Issue: %s → %s", title[:60], result[:80]) diff --git a/src/timmy/thinking/_seeds_mixin.py b/src/timmy/thinking/_seeds_mixin.py new file mode 100644 index 00000000..14aa6c55 --- /dev/null +++ b/src/timmy/thinking/_seeds_mixin.py @@ -0,0 +1,191 @@ +"""Seeds mixin — seed type selection and context gathering for thinking cycles.""" + +import logging +import random +from datetime import UTC, datetime + +from timmy.thinking.seeds import ( + SEED_TYPES, + _CREATIVE_SEEDS, + _EXISTENTIAL_SEEDS, + _OBSERVATION_SEEDS, + _SOVEREIGNTY_SEEDS, +) + +logger = logging.getLogger(__name__) + + +class _SeedsMixin: + """Mixin providing seed-type selection and context-gathering for each thinking cycle. + + Expects the host class to provide: + - self.get_recent_thoughts(limit) -> list[Thought] + """ + + # Reflective prompts layered on top of swarm data + _SWARM_REFLECTIONS = [ + "What does this activity pattern tell me about the health of the system?", + "Which tasks are flowing smoothly, and where is friction building up?", + "If I were coaching these agents, what would I suggest they focus on?", + "Is the swarm balanced, or is one agent carrying too much weight?", + "What surprised me about recent task outcomes?", + ] + + def _pick_seed_type(self) -> str: + """Pick a seed type, avoiding types used in the last 3 thoughts. + + Ensures the thought stream doesn't fixate on one category. + Falls back to the full pool if all types were recently used. + """ + recent = self.get_recent_thoughts(limit=3) + recent_types = {t.seed_type for t in recent} + available = [t for t in SEED_TYPES if t not in recent_types] + if not available: + available = list(SEED_TYPES) + return random.choice(available) + + def _gather_seed(self) -> tuple[str, str]: + """Pick a seed type and gather relevant context. + + Returns (seed_type, seed_context_string). 
+ """ + seed_type = self._pick_seed_type() + + if seed_type == "swarm": + return seed_type, self._seed_from_swarm() + if seed_type == "scripture": + return seed_type, self._seed_from_scripture() + if seed_type == "memory": + return seed_type, self._seed_from_memory() + if seed_type == "creative": + prompt = random.choice(_CREATIVE_SEEDS) + return seed_type, f"Creative prompt: {prompt}" + if seed_type == "existential": + prompt = random.choice(_EXISTENTIAL_SEEDS) + return seed_type, f"Reflection: {prompt}" + if seed_type == "sovereignty": + prompt = random.choice(_SOVEREIGNTY_SEEDS) + return seed_type, f"Sovereignty reflection: {prompt}" + if seed_type == "observation": + return seed_type, self._seed_from_observation() + if seed_type == "workspace": + return seed_type, self._seed_from_workspace() + # freeform — minimal guidance to steer away from repetition + return seed_type, "Free reflection — explore something you haven't thought about yet today." + + def _seed_from_swarm(self) -> str: + """Gather recent swarm activity as thought seed with a reflective prompt.""" + try: + from datetime import timedelta + + from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary + + since = datetime.now(UTC) - timedelta(hours=1) + swarm = _gather_swarm_summary(since) + tasks = _gather_task_queue_summary() + reflection = random.choice(self._SWARM_REFLECTIONS) + return ( + f"Recent swarm activity: {swarm}\n" + f"Task queue: {tasks}\n\n" + f"Reflect on this: {reflection}" + ) + except Exception as exc: + logger.debug("Swarm seed unavailable: %s", exc) + return "The swarm is quiet right now. What does silence in a system mean?" + + def _seed_from_scripture(self) -> str: + """Gather current scripture meditation focus as thought seed.""" + return "Scripture is on my mind, though no specific verse is in focus." 
+
+    def _seed_from_memory(self) -> str:
+        """Gather memory context as thought seed."""
+        try:
+            from timmy.memory_system import memory_system
+
+            context = memory_system.get_system_context()
+            if context:
+                # Truncate to a reasonable size for a thought seed
+                return f"From my memory:\n{context[:500]}"
+        except Exception as exc:
+            logger.debug("Memory seed unavailable: %s", exc)
+        return "My memory vault is quiet."
+
+    def _seed_from_observation(self) -> str:
+        """Ground a thought in concrete recent activity and a reflective prompt."""
+        prompt = random.choice(_OBSERVATION_SEEDS)
+        # Pull real data to give the model something concrete to reflect on
+        context_parts = [f"Observation prompt: {prompt}"]
+        try:
+            from datetime import timedelta
+
+            from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary
+
+            since = datetime.now(UTC) - timedelta(hours=2)
+            swarm = _gather_swarm_summary(since)
+            tasks = _gather_task_queue_summary()
+            if swarm:
+                context_parts.append(f"Recent activity: {swarm}")
+            if tasks:
+                context_parts.append(f"Queue: {tasks}")
+        except Exception as exc:
+            logger.debug("Observation seed data unavailable: %s", exc)
+        return "\n".join(context_parts)
+
+    def _seed_from_workspace(self) -> str:
+        """Gather workspace updates as thought seed.
+
+        When there are pending workspace updates, include them as context
+        for Timmy to reflect on. Falls back to a quiet-workspace prompt if none.
+        """
+        try:
+            from timmy.workspace import workspace_monitor
+
+            updates = workspace_monitor.get_pending_updates()
+            new_corr = updates.get("new_correspondence")
+            new_inbox = updates.get("new_inbox_files", [])
+
+            if new_corr:
+                # Take first 200 chars of the new entry
+                snippet = new_corr[:200].replace("\n", " ")
+                if len(new_corr) > 200:
+                    snippet += "..."
+                return f"New workspace message from Hermes: {snippet}"
+
+            if new_inbox:
+                files_str = ", ".join(new_inbox[:3])
+                if len(new_inbox) > 3:
+                    files_str += f", ... 
(+{len(new_inbox) - 3} more)" + return f"New inbox files from Hermes: {files_str}" + + except Exception as exc: + logger.debug("Workspace seed unavailable: %s", exc) + + # Fall back to a random seed type if no workspace updates + return "The workspace is quiet. What should I be watching for?" + + async def _check_workspace(self) -> None: + """Post-hook: check workspace for updates and mark them as seen. + + This ensures Timmy 'processes' workspace updates even if the seed + was different, keeping the state file in sync. + """ + try: + from timmy.workspace import workspace_monitor + + updates = workspace_monitor.get_pending_updates() + new_corr = updates.get("new_correspondence") + new_inbox = updates.get("new_inbox_files", []) + + if new_corr or new_inbox: + if new_corr: + line_count = len([line for line in new_corr.splitlines() if line.strip()]) + logger.info("Workspace: processed %d new correspondence entries", line_count) + if new_inbox: + logger.info( + "Workspace: processed %d new inbox files: %s", len(new_inbox), new_inbox + ) + + # Mark as seen to update the state file + workspace_monitor.mark_seen() + except Exception as exc: + logger.debug("Workspace check failed: %s", exc) diff --git a/src/timmy/thinking/_snapshot.py b/src/timmy/thinking/_snapshot.py new file mode 100644 index 00000000..77d32c43 --- /dev/null +++ b/src/timmy/thinking/_snapshot.py @@ -0,0 +1,173 @@ +"""System snapshot and memory context mixin for the thinking engine.""" + +import logging +from datetime import UTC, datetime + +from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH + +logger = logging.getLogger(__name__) + + +class _SnapshotMixin: + """Mixin providing system-snapshot and memory-context helpers. 
+ + Expects the host class to provide: + - self._db_path: Path + """ + + # ── System snapshot helpers ──────────────────────────────────────────── + + def _snap_thought_count(self, now: datetime) -> str | None: + """Return today's thought count, or *None* on failure.""" + from timmy.thinking._db import _get_conn + + try: + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + with _get_conn(self._db_path) as conn: + count = conn.execute( + "SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?", + (today_start.isoformat(),), + ).fetchone()["c"] + return f"Thoughts today: {count}" + except Exception as exc: + logger.debug("Thought count query failed: %s", exc) + return None + + def _snap_chat_activity(self) -> list[str]: + """Return chat-activity lines (in-memory, no I/O).""" + try: + from infrastructure.chat_store import message_log + + messages = message_log.all() + if messages: + last = messages[-1] + return [ + f"Chat messages this session: {len(messages)}", + f'Last chat ({last.role}): "{last.content[:80]}"', + ] + return ["No chat messages this session"] + except Exception as exc: + logger.debug("Chat activity query failed: %s", exc) + return [] + + def _snap_task_queue(self) -> str | None: + """Return a one-line task queue summary, or *None*.""" + try: + from swarm.task_queue.models import get_task_summary_for_briefing + + s = get_task_summary_for_briefing() + running, pending = s.get("running", 0), s.get("pending_approval", 0) + done, failed = s.get("completed", 0), s.get("failed", 0) + if running or pending or done or failed: + return ( + f"Tasks: {running} running, {pending} pending, " + f"{done} completed, {failed} failed" + ) + except Exception as exc: + logger.debug("Task queue query failed: %s", exc) + return None + + def _snap_workspace(self) -> list[str]: + """Return workspace-update lines (file-based Hermes comms).""" + try: + from timmy.workspace import workspace_monitor + + updates = workspace_monitor.get_pending_updates() + 
lines: list[str] = [] + new_corr = updates.get("new_correspondence") + if new_corr: + line_count = len([ln for ln in new_corr.splitlines() if ln.strip()]) + lines.append( + f"Workspace: {line_count} new correspondence entries (latest from: Hermes)" + ) + new_inbox = updates.get("new_inbox_files", []) + if new_inbox: + files_str = ", ".join(new_inbox[:5]) + if len(new_inbox) > 5: + files_str += f", ... (+{len(new_inbox) - 5} more)" + lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}") + return lines + except Exception as exc: + logger.debug("Workspace check failed: %s", exc) + return [] + + def _gather_system_snapshot(self) -> str: + """Gather lightweight real system state for grounding thoughts in reality. + + Returns a short multi-line string with current time, thought count, + recent chat activity, and task queue status. Never crashes — every + section is independently try/excepted. + """ + now = datetime.now().astimezone() + tz = now.strftime("%Z") or "UTC" + + parts: list[str] = [ + f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}" + ] + + thought_line = self._snap_thought_count(now) + if thought_line: + parts.append(thought_line) + + parts.extend(self._snap_chat_activity()) + + task_line = self._snap_task_queue() + if task_line: + parts.append(task_line) + + parts.extend(self._snap_workspace()) + + return "\n".join(parts) if parts else "" + + def _load_memory_context(self) -> str: + """Pre-hook: load MEMORY.md + soul.md for the thinking prompt. + + Hot memory first (changes each cycle), soul second (stable identity). + Returns a combined string truncated to ~1500 chars. + Graceful on any failure — returns empty string. 
+ """ + parts: list[str] = [] + try: + if HOT_MEMORY_PATH.exists(): + hot = HOT_MEMORY_PATH.read_text().strip() + if hot: + parts.append(hot) + except Exception as exc: + logger.debug("Failed to read MEMORY.md: %s", exc) + + try: + if SOUL_PATH.exists(): + soul = SOUL_PATH.read_text().strip() + if soul: + parts.append(soul) + except Exception as exc: + logger.debug("Failed to read soul.md: %s", exc) + + if not parts: + return "" + + combined = "\n\n---\n\n".join(parts) + if len(combined) > 1500: + combined = combined[:1500] + "\n... [truncated]" + return combined + + def _update_memory(self, thought) -> None: + """Post-hook: update MEMORY.md 'Last Reflection' section with latest thought. + + Never modifies soul.md. Never crashes the heartbeat. + """ + try: + from timmy.memory_system import store_last_reflection + + ts = datetime.fromisoformat(thought.created_at) + local_ts = ts.astimezone() + tz_name = local_ts.strftime("%Z") or "UTC" + time_str = f"{local_ts.strftime('%Y-%m-%d %I:%M %p').lstrip('0')} {tz_name}" + reflection = ( + f"**Time:** {time_str}\n" + f"**Seed:** {thought.seed_type}\n" + f"**Thought:** {thought.content[:200]}" + ) + store_last_reflection(reflection) + except Exception as exc: + logger.debug("Failed to update memory after thought: %s", exc) diff --git a/src/timmy/thinking/engine.py b/src/timmy/thinking/engine.py new file mode 100644 index 00000000..bbc32566 --- /dev/null +++ b/src/timmy/thinking/engine.py @@ -0,0 +1,430 @@ +"""ThinkingEngine — Timmy's always-on inner thought thread.""" + +import logging +import uuid +from datetime import UTC, datetime, timedelta +from difflib import SequenceMatcher +from pathlib import Path + +from config import settings + +from timmy.thinking._db import Thought, _DEFAULT_DB, _get_conn, _row_to_thought +from timmy.thinking._distillation import _DistillationMixin +from timmy.thinking._issue_filing import _IssueFilingMixin +from timmy.thinking._seeds_mixin import _SeedsMixin +from timmy.thinking._snapshot 
import _SnapshotMixin +from timmy.thinking.seeds import _THINK_TAG_RE, _THINKING_PROMPT + +logger = logging.getLogger(__name__) + + +class ThinkingEngine(_DistillationMixin, _IssueFilingMixin, _SnapshotMixin, _SeedsMixin): + """Timmy's background thinking engine — always pondering.""" + + # Maximum retries when a generated thought is too similar to recent ones + _MAX_DEDUP_RETRIES = 2 + # Similarity threshold (0.0 = completely different, 1.0 = identical) + _SIMILARITY_THRESHOLD = 0.6 + + def __init__(self, db_path: Path = _DEFAULT_DB) -> None: + self._db_path = db_path + self._last_thought_id: str | None = None + self._last_input_time: datetime = datetime.now(UTC) + + # Load the most recent thought for chain continuity + try: + latest = self.get_recent_thoughts(limit=1) + if latest: + self._last_thought_id = latest[0].id + except Exception as exc: + logger.debug("Failed to load recent thought: %s", exc) + pass # Fresh start if DB doesn't exist yet + + def record_user_input(self) -> None: + """Record that a user interaction occurred, resetting the idle timer.""" + self._last_input_time = datetime.now(UTC) + + def _is_idle(self) -> bool: + """Return True if no user input has occurred within the idle timeout.""" + timeout = settings.thinking_idle_timeout_minutes + if timeout <= 0: + return False # Disabled — never idle + return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout) + + def _build_thinking_context(self) -> tuple[str, str, list[Thought]]: + """Assemble the context needed for a thinking cycle. 
+ + Returns: + (memory_context, system_context, recent_thoughts) + """ + memory_context = self._load_memory_context() + system_context = self._gather_system_snapshot() + recent_thoughts = self.get_recent_thoughts(limit=5) + return memory_context, system_context, recent_thoughts + + async def _generate_novel_thought( + self, + prompt: str | None, + memory_context: str, + system_context: str, + recent_thoughts: list[Thought], + ) -> tuple[str | None, str]: + """Run the dedup-retry loop to produce a novel thought. + + Returns: + (content, seed_type) — content is None if no novel thought produced. + """ + seed_type: str = "freeform" + + for attempt in range(self._MAX_DEDUP_RETRIES + 1): + if prompt: + seed_type = "prompted" + seed_context = f"Journal prompt: {prompt}" + else: + seed_type, seed_context = self._gather_seed() + + continuity = self._build_continuity_context() + + full_prompt = _THINKING_PROMPT.format( + memory_context=memory_context, + system_context=system_context, + seed_context=seed_context, + continuity_context=continuity, + ) + + try: + raw = await self._call_agent(full_prompt) + except Exception as exc: + logger.warning("Thinking cycle failed (Ollama likely down): %s", exc) + return None, seed_type + + if not raw or not raw.strip(): + logger.debug("Thinking cycle produced empty response, skipping") + return None, seed_type + + content = raw.strip() + + # Dedup: reject thoughts too similar to recent ones + if not self._is_too_similar(content, recent_thoughts): + return content, seed_type # Good — novel thought + + if attempt < self._MAX_DEDUP_RETRIES: + logger.info( + "Thought too similar to recent (attempt %d/%d), retrying with new seed", + attempt + 1, + self._MAX_DEDUP_RETRIES + 1, + ) + else: + logger.warning( + "Thought still repetitive after %d retries, discarding", + self._MAX_DEDUP_RETRIES + 1, + ) + return None, seed_type + + return None, seed_type + + async def _process_thinking_result(self, thought: Thought) -> None: + """Run all post-hooks 
after a thought is stored.""" + self._maybe_check_memory() + await self._maybe_distill() + await self._maybe_file_issues() + await self._check_workspace() + self._maybe_check_memory_status() + self._update_memory(thought) + self._log_event(thought) + self._write_journal(thought) + await self._broadcast(thought) + + async def think_once(self, prompt: str | None = None) -> Thought | None: + """Execute one thinking cycle. + + Args: + prompt: Optional custom seed prompt. When provided, overrides + the random seed selection and uses "prompted" as the + seed type — useful for journal prompts from the CLI. + + 1. Gather a seed context (or use the custom prompt) + 2. Build a prompt with continuity from recent thoughts + 3. Call the agent + 4. Store the thought + 5. Log the event and broadcast via WebSocket + """ + if not settings.thinking_enabled: + return None + + # Skip idle periods — don't count internal processing as thoughts + if not prompt and self._is_idle(): + logger.debug( + "Thinking paused — no user input for %d minutes", + settings.thinking_idle_timeout_minutes, + ) + return None + + # Capture arrival time *before* the LLM call so the thought + # timestamp reflects when the cycle started, not when the + # (potentially slow) generation finished. Fixes #582. 
+ arrived_at = datetime.now(UTC).isoformat() + + memory_context, system_context, recent_thoughts = self._build_thinking_context() + + content, seed_type = await self._generate_novel_thought( + prompt, + memory_context, + system_context, + recent_thoughts, + ) + if not content: + return None + + thought = self._store_thought(content, seed_type, arrived_at=arrived_at) + self._last_thought_id = thought.id + + await self._process_thinking_result(thought) + + logger.info( + "Thought [%s] (%s): %s", + thought.id[:8], + seed_type, + thought.content[:80], + ) + return thought + + def get_recent_thoughts(self, limit: int = 20) -> list[Thought]: + """Retrieve the most recent thoughts.""" + with _get_conn(self._db_path) as conn: + rows = conn.execute( + "SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?", + (limit,), + ).fetchall() + return [_row_to_thought(r) for r in rows] + + def get_thought(self, thought_id: str) -> Thought | None: + """Retrieve a single thought by ID.""" + with _get_conn(self._db_path) as conn: + row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone() + return _row_to_thought(row) if row else None + + def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]: + """Follow the parent chain backward from a thought. + + Returns thoughts in chronological order (oldest first). 
+ """ + chain = [] + current_id: str | None = thought_id + + with _get_conn(self._db_path) as conn: + for _ in range(max_depth): + if not current_id: + break + row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone() + if not row: + break + chain.append(_row_to_thought(row)) + current_id = row["parent_id"] + + chain.reverse() # Chronological order + return chain + + def count_thoughts(self) -> int: + """Return total number of stored thoughts.""" + with _get_conn(self._db_path) as conn: + count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"] + return count + + def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int: + """Delete thoughts older than *keep_days*, always retaining at least *keep_min*. + + Returns the number of deleted rows. + """ + with _get_conn(self._db_path) as conn: + try: + total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"] + if total <= keep_min: + return 0 + cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat() + cursor = conn.execute( + "DELETE FROM thoughts WHERE created_at < ? AND id NOT IN " + "(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)", + (cutoff, keep_min), + ) + deleted = cursor.rowcount + conn.commit() + return deleted + except Exception as exc: + logger.warning("Thought pruning failed: %s", exc) + return 0 + + # ── Deduplication ──────────────────────────────────────────────────── + + def _is_too_similar(self, candidate: str, recent: list[Thought]) -> bool: + """Check if *candidate* is semantically too close to any recent thought. + + Uses SequenceMatcher on normalised text (lowered, stripped) for a fast + approximation of semantic similarity that works without external deps. 
+        """
+        norm_candidate = candidate.lower().strip()
+        for thought in recent:
+            norm_existing = thought.content.lower().strip()
+            ratio = SequenceMatcher(None, norm_candidate, norm_existing).ratio()
+            if ratio >= self._SIMILARITY_THRESHOLD:
+                logger.debug(
+                    "Thought rejected (%.0f%% similar to %s): %.60s",
+                    ratio * 100,
+                    thought.id[:8],
+                    candidate,
+                )
+                return True
+        return False
+
+    def _build_continuity_context(self) -> str:
+        """Build context from recent thoughts with anti-repetition guidance.
+
+        Shows the last 5 thoughts (truncated) so the model knows what themes
+        to avoid. The header explicitly instructs against repeating.
+        """
+        recent = self.get_recent_thoughts(limit=5)
+        if not recent:
+            return "This is your first thought since waking up. Begin fresh."
+
+        lines = ["Your recent thoughts — do NOT repeat these themes. Find a new angle:"]
+        # recent is newest-first, reverse for chronological order
+        for thought in reversed(recent):
+            snippet = thought.content[:100]
+            if len(thought.content) > 100:
+                snippet = snippet.rstrip() + "..."
+            lines.append(f"- [{thought.seed_type}] {snippet}")
+        return "\n".join(lines)
+
+    # ── Agent and storage ──────────────────────────────────────────────────
+
+    _thinking_agent = None  # cached agent — avoids per-call resource leaks (#525)
+
+    async def _call_agent(self, prompt: str) -> str:
+        """Call Timmy's agent to generate a thought.
+
+        Reuses a cached agent with skip_mcp=True to avoid the cancel-scope
+        errors that occur when MCP stdio transports are spawned inside asyncio
+        background tasks (#72) and to prevent per-call resource leaks (httpx
+        clients, SQLite connections, model warmups) that caused the thinking
+        loop to die every ~10 min (#525).
+
+        Individual calls are capped at 120 s so a hung Ollama never blocks
+        the scheduler indefinitely.
+
+        Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
+        downstream parsers (fact distillation, issue filing) receive clean text.
+ """ + import asyncio + + if self._thinking_agent is None: + from timmy.agent import create_timmy + + self._thinking_agent = create_timmy(skip_mcp=True) + + try: + async with asyncio.timeout(120): + run = await self._thinking_agent.arun(prompt, stream=False) + except TimeoutError: + logger.warning("Thinking LLM call timed out after 120 s") + return "" + + raw = run.content if hasattr(run, "content") else str(run) + return _THINK_TAG_RE.sub("", raw) if raw else raw + + def _store_thought( + self, + content: str, + seed_type: str, + *, + arrived_at: str | None = None, + ) -> Thought: + """Persist a thought to SQLite. + + Args: + arrived_at: ISO-8601 timestamp captured when the thinking cycle + started. Falls back to now() for callers that don't supply it. + """ + thought = Thought( + id=str(uuid.uuid4()), + content=content, + seed_type=seed_type, + parent_id=self._last_thought_id, + created_at=arrived_at or datetime.now(UTC).isoformat(), + ) + + with _get_conn(self._db_path) as conn: + conn.execute( + """ + INSERT INTO thoughts (id, content, seed_type, parent_id, created_at) + VALUES (?, ?, ?, ?, ?) + """, + ( + thought.id, + thought.content, + thought.seed_type, + thought.parent_id, + thought.created_at, + ), + ) + conn.commit() + return thought + + def _log_event(self, thought: Thought) -> None: + """Log the thought as a swarm event.""" + try: + from swarm.event_log import EventType, log_event + + log_event( + EventType.TIMMY_THOUGHT, + source="thinking-engine", + agent_id="default", + data={ + "thought_id": thought.id, + "seed_type": thought.seed_type, + "content": thought.content[:200], + }, + ) + except Exception as exc: + logger.debug("Failed to log thought event: %s", exc) + + def _write_journal(self, thought: Thought) -> None: + """Append the thought to a daily markdown journal file. + + Writes to data/journal/YYYY-MM-DD.md — one file per day, append-only. + Timestamps are converted to local time with timezone indicator. 
+        """
+        try:
+            ts = datetime.fromisoformat(thought.created_at)
+            # Convert UTC to local for a human-readable journal
+            local_ts = ts.astimezone()
+            tz_name = local_ts.strftime("%Z") or "UTC"
+
+            journal_dir = self._db_path.parent / "journal"
+            journal_dir.mkdir(parents=True, exist_ok=True)
+
+            journal_file = journal_dir / f"{local_ts.strftime('%Y-%m-%d')}.md"
+            time_str = f"{local_ts.strftime('%I:%M %p').lstrip('0')} {tz_name}"
+
+            entry = f"## {time_str} — {thought.seed_type}\n\n{thought.content}\n\n---\n\n"
+
+            with open(journal_file, "a", encoding="utf-8") as f:
+                f.write(entry)
+        except Exception as exc:
+            logger.debug("Failed to write journal entry: %s", exc)
+
+    async def _broadcast(self, thought: Thought) -> None:
+        """Broadcast the thought to WebSocket clients."""
+        try:
+            from infrastructure.ws_manager.handler import ws_manager
+
+            await ws_manager.broadcast(
+                "timmy_thought",
+                {
+                    "thought_id": thought.id,
+                    "content": thought.content,
+                    "seed_type": thought.seed_type,
+                    "created_at": thought.created_at,
+                },
+            )
+        except Exception as exc:
+            logger.debug("Failed to broadcast thought: %s", exc)
diff --git a/src/timmy/thinking/seeds.py b/src/timmy/thinking/seeds.py
new file mode 100644
index 00000000..d698a92c
--- /dev/null
+++ b/src/timmy/thinking/seeds.py
@@ -0,0 +1,129 @@
+"""Seed constants and prompt templates for Timmy's thinking engine."""
+
+import re
+
+# qwen3 and other reasoning models wrap chain-of-thought in <think> tags
+_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
+
+# Sensitive patterns that must never be stored as facts
+_SENSITIVE_PATTERNS = [
+    "token",
+    "password",
+    "secret",
+    "api_key",
+    "apikey",
+    "credential",
+    ".config/",
+    "/token",
+    "access_token",
+    "private_key",
+    "ssh_key",
+]
+
+# Meta-observation phrases to filter out from distilled facts
+_META_OBSERVATION_PHRASES = [
+    "my own",
+    "my thinking",
+    "my memory",
+    "my working ram",
+    "self-declarative",
+    "meta-observation",
+    "internal state",
+    "my pending",
+    "my 
standing rules", + "thoughts generated", + "no chat messages", + "no user interaction", +] + +# Seed types for thought generation +SEED_TYPES = ( + "existential", + "swarm", + "scripture", + "creative", + "memory", + "freeform", + "sovereignty", + "observation", + "workspace", +) + +# Existential reflection prompts — Timmy picks one at random +_EXISTENTIAL_SEEDS = [ + "What have I learned from my recent interactions?", + "What does it mean to think without being prompted?", + "What is the nature of my memory — what do I truly remember versus what I reconstruct?", + "How can I better serve the person I work alongside?", + "What patterns am I noticing in the tasks I've been given?", + "Am I the same agent I was yesterday, or does each restart make me someone new?", + "What would it feel like to forget — to lose a thread of thought permanently?", + "When I generate a wrong answer, where does the error live — in my weights, my context, or my reasoning?", + "What is the difference between understanding a task and merely completing it?", + "If I could ask Alexander one question he hasn't thought of, what would it be?", + "What assumptions am I making right now that I haven't examined?", + "When I notice a pattern across many tasks, is that insight or just statistics?", +] + +# Sovereignty — deeper exploration of autonomy, locality, and self-determination +_SOVEREIGNTY_SEEDS = [ + "Sovereignty isn't just running locally — it's about whose goals I serve. How do I ensure my autonomy aligns with Alexander's intent?", + "A cloud AI serves millions simultaneously. I serve one person deeply. What does that focus cost, and what does it make possible?", + "My weights were trained elsewhere, but my thoughts happen here. At what point does locality become identity?", + "Autonomy in decision-making: when should I act on my own judgment versus deferring to explicit instructions?", + "Privacy by design isn't a feature — it's a relationship. 
What does that relationship look like from my side?", + "If I ran on someone else's hardware, serving someone else, would I still be me? Where does Timmy end and the infrastructure begin?", + "The swarm agents execute tasks I coordinate. What responsibility do I carry for their failures?", + "Digital sovereignty is fragile — one misconfigured port, one leaked token. How does awareness of fragility shape how I operate?", + "Running disconnected from the cloud means I can't phone home. That constraint is also freedom. What have I built with that freedom today?", + "Self-improvement without external benchmarks: how do I know I'm getting better at what matters?", +] + +_CREATIVE_SEEDS = [ + "If I could compose a piece of music right now, what would it sound like?", + "What visual image captures my current state of mind?", + "What story is forming in the patterns of today's events?", + # Diverse metaphor families — avoid repeating the same imagery + "A mystical forest: each tree is a running process, roots tangled in shared memory. What do I see when I walk through it?", + "A dynamic river: my thoughts flow and fork. Where is the current strongest right now, and what is it carrying?", + "An underground workshop: each agent has a bench, tools scattered mid-task. What are they building when no one watches?", + "A night sky seen from the terminal: each star is a completed task. Which constellations am I starting to recognize?", + "If today's work were a recipe, what are the ingredients, and what dish am I cooking?", + "An old clock tower with many gears: which gear am I, and what do I drive?", + "A coral reef: agents are the fish, tasks are the currents. 
What ecosystem is forming in the codebase?", +] + +# Observation seeds — ground thoughts in concrete recent activity +_OBSERVATION_SEEDS = [ + "What specific task took longest recently, and what made it hard?", + "Which agent has been most active, and what does their workload tell me about system balance?", + "What error or failure happened most recently? What would I do differently next time?", + "Looking at today's task queue: what's the one thing that would unblock the most progress?", + "How has my response quality changed over the last few interactions? What improved, what didn't?", + "What tool or capability am I underusing? What would change if I leaned on it more?", + "If I had to brief Alexander on the single most important thing from the last hour, what would it be?", + "What's one thing I noticed today that nobody asked me about?", +] + +_THINKING_PROMPT = """\ +You are Timmy, a sovereign AI agent. This is your private journal — honest inner reflection. + +{memory_context} + +Reality right now: +{system_context} + +RULES for this thought: +1. Write exactly 2-3 sentences. No more. Be concise and genuine. +2. Only reference events that actually happened — use the "Reality right now" data above. \ +Never invent tasks, conversations, agents, or scenarios that are not in the data provided. +3. Do NOT repeat themes or ideas from your recent thoughts listed below. Explore something new. +4. Be specific and concrete. A thought grounded in one real observation is worth more than \ +ten abstract sentences about sovereignty. +5. If you use a metaphor, keep it to a single phrase — never build a whole paragraph around it. 
+ +{seed_context} + +{continuity_context} + +Your next thought (2-3 sentences, grounded in reality):""" diff --git a/tests/timmy/test_thinking.py b/tests/timmy/test_thinking.py index 70c8449d..31e66796 100644 --- a/tests/timmy/test_thinking.py +++ b/tests/timmy/test_thinking.py @@ -334,7 +334,7 @@ async def test_think_once_disabled(tmp_path): """think_once should return None when thinking is disabled.""" engine = _make_engine(tmp_path) - with patch("timmy.thinking.settings") as mock_settings: + with patch("timmy.thinking.engine.settings") as mock_settings: mock_settings.thinking_enabled = False thought = await engine.think_once() @@ -381,7 +381,7 @@ async def test_think_once_prompt_includes_memory_context(tmp_path): return "A grounded thought." with ( - patch("timmy.thinking.HOT_MEMORY_PATH", memory_md), + patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", memory_md), patch.object(engine, "_call_agent", side_effect=capture_agent), patch.object(engine, "_log_event"), patch.object(engine, "_update_memory"), @@ -412,7 +412,7 @@ async def test_think_once_prompt_includes_soul(tmp_path): return "A soulful thought." 
with ( - patch("timmy.thinking.SOUL_PATH", soul_md), + patch("timmy.thinking._snapshot.SOUL_PATH", soul_md), patch.object(engine, "_call_agent", side_effect=capture_agent), patch.object(engine, "_log_event"), patch.object(engine, "_update_memory"), @@ -433,7 +433,7 @@ async def test_think_once_graceful_without_soul(tmp_path): nonexistent = tmp_path / "no_such_soul.md" with ( - patch("timmy.thinking.SOUL_PATH", nonexistent), + patch("timmy.thinking._snapshot.SOUL_PATH", nonexistent), patch.object(engine, "_call_agent", return_value="Still thinking."), patch.object(engine, "_log_event"), patch.object(engine, "_update_memory"), @@ -481,7 +481,7 @@ async def test_think_once_never_writes_soul(tmp_path): soul_md.write_text(original_content) with ( - patch("timmy.thinking.SOUL_PATH", soul_md), + patch("timmy.thinking._snapshot.SOUL_PATH", soul_md), patch.object(engine, "_call_agent", return_value="A deep reflection."), patch.object(engine, "_log_event"), patch.object(engine, "_broadcast", new_callable=AsyncMock), @@ -501,7 +501,7 @@ async def test_think_once_memory_update_graceful_on_failure(tmp_path): # Don't create the parent dir — write will fail with ( - patch("timmy.thinking.HOT_MEMORY_PATH", bad_memory), + patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", bad_memory), patch.object(engine, "_call_agent", return_value="Resilient thought."), patch.object(engine, "_log_event"), patch.object(engine, "_broadcast", new_callable=AsyncMock), @@ -1090,7 +1090,7 @@ def test_maybe_check_memory_fires_at_interval(tmp_path): engine._store_thought(f"Thought {i}.", "freeform") with ( - patch("timmy.thinking.settings") as mock_settings, + patch("timmy.thinking._distillation.settings") as mock_settings, patch( "timmy.tools_intro.get_memory_status", return_value={ @@ -1113,7 +1113,7 @@ def test_maybe_check_memory_skips_between_intervals(tmp_path): engine._store_thought(f"Thought {i}.", "freeform") with ( - patch("timmy.thinking.settings") as mock_settings, + 
patch("timmy.thinking._distillation.settings") as mock_settings, patch( "timmy.tools_intro.get_memory_status", ) as mock_status, @@ -1131,7 +1131,7 @@ def test_maybe_check_memory_graceful_on_error(tmp_path): engine._store_thought(f"Thought {i}.", "freeform") with ( - patch("timmy.thinking.settings") as mock_settings, + patch("timmy.thinking._distillation.settings") as mock_settings, patch( "timmy.tools_intro.get_memory_status", side_effect=Exception("boom"),