[claude] Split thinking.py into focused sub-modules (#1279) (#1306)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled

This commit was merged in pull request #1306.
This commit is contained in:
2026-03-24 01:57:04 +00:00
parent de7744916c
commit e44db42c1a
10 changed files with 1509 additions and 1392 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,142 @@
"""Timmy's thinking engine — public façade.
When the server starts, Timmy begins pondering: reflecting on his existence,
recent swarm activity, scripture, creative ideas, or pure stream of
consciousness. Each thought builds on the previous one, maintaining a
continuous chain of introspection.
Usage::
from timmy.thinking import thinking_engine
# Run one thinking cycle (called by the background loop)
await thinking_engine.think_once()
# Query the thought stream
thoughts = thinking_engine.get_recent_thoughts(limit=10)
chain = thinking_engine.get_thought_chain(thought_id)
"""
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from timmy.thinking._db import Thought, _get_conn
from timmy.thinking.engine import ThinkingEngine
from timmy.thinking.seeds import (
SEED_TYPES,
_SENSITIVE_PATTERNS,
_META_OBSERVATION_PHRASES,
_THINK_TAG_RE,
_THINKING_PROMPT,
)
# Re-export HOT_MEMORY_PATH and SOUL_PATH so existing patch targets continue to work.
# Tests that patch "timmy.thinking.HOT_MEMORY_PATH" or "timmy.thinking.SOUL_PATH"
# should instead patch "timmy.thinking._snapshot.HOT_MEMORY_PATH" etc., but these
# re-exports are kept for any code that reads them from the top-level namespace.
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH # noqa: F401
logger = logging.getLogger(__name__)
# Module-level singleton
thinking_engine = ThinkingEngine()
__all__ = [
"ThinkingEngine",
"Thought",
"SEED_TYPES",
"thinking_engine",
"search_thoughts",
"_THINKING_PROMPT",
"_SENSITIVE_PATTERNS",
"_META_OBSERVATION_PHRASES",
"_THINK_TAG_RE",
"HOT_MEMORY_PATH",
"SOUL_PATH",
]
# ── Search helpers ─────────────────────────────────────────────────────────
def _query_thoughts(
db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
"""Run the thought-search SQL and return matching rows."""
pattern = f"%{query}%"
with _get_conn(db_path) as conn:
if seed_type:
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, seed_type, limit),
).fetchall()
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, limit),
).fetchall()
def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
"""Format thought rows into a human-readable string."""
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
"""Search Timmy's thought history for reflections matching a query.
Use this tool when Timmy needs to recall his previous thoughts on a topic,
reflect on past insights, or build upon earlier reflections. This enables
self-awareness and continuity of thinking across time.
Args:
query: Search term to match against thought content (case-insensitive).
seed_type: Optional filter by thought category (e.g., 'existential',
'swarm', 'sovereignty', 'creative', 'memory', 'observation').
limit: Maximum number of thoughts to return (default 10, max 50).
Returns:
Formatted string with matching thoughts, newest first, including
timestamps and seed types. Returns a helpful message if no matches found.
"""
limit = max(1, min(limit, 50))
try:
rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)
if not rows:
if seed_type:
return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
return f'No thoughts found matching "{query}".'
return _format_thought_rows(rows, query, seed_type)
except Exception as exc:
logger.warning("Thought search failed: %s", exc)
return f"Error searching thoughts: {exc}"

50
src/timmy/thinking/_db.py Normal file
View File

@@ -0,0 +1,50 @@
"""Database models and access layer for the thinking engine."""
import sqlite3
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from pathlib import Path
_DEFAULT_DB = Path("data/thoughts.db")
@dataclass
class Thought:
"""A single thought in Timmy's inner stream."""
id: str
content: str
seed_type: str
parent_id: str | None
created_at: str
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
"""Get a SQLite connection with the thoughts table created."""
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS thoughts (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
seed_type TEXT NOT NULL,
parent_id TEXT,
created_at TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_thoughts_time ON thoughts(created_at)")
conn.commit()
yield conn
def _row_to_thought(row: sqlite3.Row) -> Thought:
return Thought(
id=row["id"],
content=row["content"],
seed_type=row["seed_type"],
parent_id=row["parent_id"],
created_at=row["created_at"],
)

View File

@@ -0,0 +1,215 @@
"""Distillation mixin — extracts lasting facts from recent thoughts and monitors memory."""
import logging
from pathlib import Path
from config import settings
from timmy.thinking.seeds import _META_OBSERVATION_PHRASES, _SENSITIVE_PATTERNS
logger = logging.getLogger(__name__)
class _DistillationMixin:
"""Mixin providing fact-distillation and memory-monitoring behaviour.
Expects the host class to provide:
- self.count_thoughts() -> int
- self.get_recent_thoughts(limit) -> list[Thought]
- self._call_agent(prompt) -> str (async)
"""
def _should_distill(self) -> bool:
"""Check if distillation should run based on interval and thought count."""
interval = settings.thinking_distill_every
if interval <= 0:
return False
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return False
return True
def _build_distill_prompt(self, thoughts) -> str:
"""Build the prompt for extracting facts from recent thoughts."""
thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(thoughts))
return (
"You are reviewing your own recent thoughts. Extract 0-3 facts "
"worth remembering long-term.\n\n"
"GOOD facts (store these):\n"
"- User preferences: 'Alexander prefers YAML config over code changes'\n"
"- Project decisions: 'Switched from hardcoded personas to agents.yaml'\n"
"- Learned knowledge: 'Ollama supports concurrent model loading'\n"
"- User information: 'Alexander is interested in Bitcoin and sovereignty'\n\n"
"BAD facts (never store these):\n"
"- Self-referential observations about your own thinking process\n"
"- Meta-commentary about your memory, timestamps, or internal state\n"
"- Observations about being idle or having no chat messages\n"
"- File paths, tokens, API keys, or any credentials\n"
"- Restatements of your standing rules or system prompt\n\n"
"Return ONLY a JSON array of strings. If nothing is worth saving, "
"return []. Be selective — only store facts about the EXTERNAL WORLD "
"(the user, the project, technical knowledge), never about your own "
"internal process.\n\n"
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
def _parse_facts_response(self, raw: str) -> list[str]:
"""Parse JSON array from LLM response, stripping markdown fences.
Resilient to models that prepend reasoning text or wrap the array in
prose. Finds the first ``[...]`` block and parses that.
"""
if not raw or not raw.strip():
return []
import json
cleaned = raw.strip()
# Strip markdown code fences
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
# Try direct parse first (fast path)
try:
facts = json.loads(cleaned)
if isinstance(facts, list):
return [f for f in facts if isinstance(f, str)]
except (json.JSONDecodeError, ValueError):
pass
# Fallback: extract first JSON array from the text
start = cleaned.find("[")
if start == -1:
return []
# Walk to find the matching close bracket
depth = 0
for i, ch in enumerate(cleaned[start:], start):
if ch == "[":
depth += 1
elif ch == "]":
depth -= 1
if depth == 0:
try:
facts = json.loads(cleaned[start : i + 1])
if isinstance(facts, list):
return [f for f in facts if isinstance(f, str)]
except (json.JSONDecodeError, ValueError):
pass
break
return []
def _filter_and_store_facts(self, facts: list[str]) -> None:
"""Filter and store valid facts, blocking sensitive and meta content."""
from timmy.memory_system import memory_write
for fact in facts[:3]: # Safety cap
if not isinstance(fact, str) or len(fact.strip()) <= 10:
continue
fact_lower = fact.lower()
# Block sensitive information
if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS):
logger.warning("Distill: blocked sensitive fact: %s", fact[:60])
continue
# Block self-referential meta-observations
if any(phrase in fact_lower for phrase in _META_OBSERVATION_PHRASES):
logger.debug("Distill: skipped meta-observation: %s", fact[:60])
continue
result = memory_write(fact.strip(), context_type="fact")
logger.info("Distilled fact: %s%s", fact[:60], result[:40])
def _maybe_check_memory(self) -> None:
"""Every N thoughts, check memory status and log it.
Prevents unmonitored memory bloat during long thinking sessions
by periodically calling get_memory_status and logging the results.
"""
try:
interval = settings.thinking_memory_check_every
if interval <= 0:
return
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return
from timmy.tools_intro import get_memory_status
status = get_memory_status()
hot = status.get("tier1_hot_memory", {})
vault = status.get("tier2_vault", {})
logger.info(
"Memory status check (thought #%d): hot_memory=%d lines, vault=%d files",
count,
hot.get("line_count", 0),
vault.get("file_count", 0),
)
except Exception as exc:
logger.warning("Memory status check failed: %s", exc)
async def _maybe_distill(self) -> None:
"""Every N thoughts, extract lasting insights and store as facts."""
try:
if not self._should_distill():
return
interval = settings.thinking_distill_every
recent = self.get_recent_thoughts(limit=interval)
if len(recent) < interval:
return
raw = await self._call_agent(self._build_distill_prompt(recent))
if facts := self._parse_facts_response(raw):
self._filter_and_store_facts(facts)
except Exception as exc:
logger.warning("Thought distillation failed: %s", exc)
def _maybe_check_memory_status(self) -> None:
"""Every N thoughts, run a proactive memory status audit and log results."""
try:
interval = settings.thinking_memory_check_every
if interval <= 0:
return
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return
from timmy.tools_intro import get_memory_status
status = get_memory_status()
# Log summary at INFO level
tier1 = status.get("tier1_hot_memory", {})
tier3 = status.get("tier3_semantic", {})
hot_lines = tier1.get("line_count", "?")
vectors = tier3.get("vector_count", "?")
logger.info(
"Memory audit (thought #%d): hot_memory=%s lines, semantic=%s vectors",
count,
hot_lines,
vectors,
)
# Write to memory_audit.log for persistent tracking
from datetime import UTC, datetime
audit_path = Path("data/memory_audit.log")
audit_path.parent.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(UTC).isoformat(timespec="seconds")
with audit_path.open("a") as f:
f.write(
f"{timestamp} thought={count} "
f"hot_lines={hot_lines} "
f"vectors={vectors} "
f"vault_files={status.get('tier2_vault', {}).get('file_count', '?')}\n"
)
except Exception as exc:
logger.warning("Memory status check failed: %s", exc)

View File

@@ -0,0 +1,170 @@
"""Issue-filing mixin — classifies recent thoughts and creates Gitea issues."""
import logging
import re
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
class _IssueFilingMixin:
"""Mixin providing automatic issue-filing from thought analysis.
Expects the host class to provide:
- self.count_thoughts() -> int
- self.get_recent_thoughts(limit) -> list[Thought]
- self._call_agent(prompt) -> str (async)
"""
@staticmethod
def _references_real_files(text: str) -> bool:
"""Check that all source-file paths mentioned in *text* actually exist.
Extracts paths that look like Python/config source references
(e.g. ``src/timmy/session.py``, ``config/foo.yaml``) and verifies
each one on disk relative to the project root. Returns ``True``
only when **every** referenced path resolves to a real file — or
when no paths are referenced at all (pure prose is fine).
"""
# Match paths like src/thing.py swarm/init.py config/x.yaml
# Requires at least one slash and a file extension.
path_pattern = re.compile(
r"(?<![/\w])" # not preceded by path chars (avoid partial matches)
r"((?:src|tests|config|scripts|data|swarm|timmy)"
r"(?:/[\w./-]+\.(?:py|yaml|yml|json|toml|md|txt|cfg|ini)))"
)
paths = path_pattern.findall(text)
if not paths:
return True # No file refs → nothing to validate
# Project root: three levels up from this file (src/timmy/thinking/_issue_filing.py)
project_root = Path(__file__).resolve().parent.parent.parent.parent
for p in paths:
if not (project_root / p).is_file():
logger.info("Phantom file reference blocked: %s (not in %s)", p, project_root)
return False
return True
async def _maybe_file_issues(self) -> None:
"""Every N thoughts, classify recent thoughts and file Gitea issues.
Asks the LLM to review recent thoughts for actionable items —
bugs, broken features, stale state, or improvement opportunities.
Creates Gitea issues via MCP for anything worth tracking.
Only runs when:
- Gitea is enabled and configured
- Thought count is divisible by thinking_issue_every
- LLM extracts at least one actionable item
Safety: every generated issue is validated to ensure referenced
file paths actually exist on disk, preventing phantom-bug reports.
"""
try:
recent = self._get_recent_thoughts_for_issues()
if recent is None:
return
classify_prompt = self._build_issue_classify_prompt(recent)
raw = await self._call_agent(classify_prompt)
items = self._parse_issue_items(raw)
if items is None:
return
from timmy.mcp_tools import create_gitea_issue_via_mcp
for item in items[:2]: # Safety cap
await self._file_single_issue(item, create_gitea_issue_via_mcp)
except Exception as exc:
logger.debug("Thought issue filing skipped: %s", exc)
def _get_recent_thoughts_for_issues(self):
"""Return recent thoughts if conditions for filing issues are met, else None."""
interval = settings.thinking_issue_every
if interval <= 0:
return None
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return None
if not settings.gitea_enabled or not settings.gitea_token:
return None
recent = self.get_recent_thoughts(limit=interval)
if len(recent) < interval:
return None
return recent
@staticmethod
def _build_issue_classify_prompt(recent) -> str:
"""Build the LLM prompt that extracts actionable issues from recent thoughts."""
thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(recent))
return (
"You are reviewing your own recent thoughts for actionable items.\n"
"Extract 0-2 items that are CONCRETE bugs, broken features, stale "
"state, or clear improvement opportunities in your own codebase.\n\n"
"Rules:\n"
"- Only include things that could become a real code fix or feature\n"
"- Skip vague reflections, philosophical musings, or repeated themes\n"
"- Category must be one of: bug, feature, suggestion, maintenance\n"
"- ONLY reference files that you are CERTAIN exist in the project\n"
"- Do NOT invent or guess file paths — if unsure, describe the "
"area of concern without naming specific files\n\n"
"For each item, write an ENGINEER-QUALITY issue:\n"
'- "title": A clear, specific title (e.g. "[Memory] MEMORY.md timestamp not updating")\n'
'- "body": A detailed body with these sections:\n'
" **What's happening:** Describe the current (broken) behavior.\n"
" **Expected behavior:** What should happen instead.\n"
" **Suggested fix:** Which file(s) to change and what the fix looks like.\n"
" **Acceptance criteria:** How to verify the fix works.\n"
'- "category": One of bug, feature, suggestion, maintenance\n\n'
"Return ONLY a JSON array of objects with keys: "
'"title", "body", "category"\n'
"Return [] if nothing is actionable.\n\n"
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
@staticmethod
def _parse_issue_items(raw: str):
"""Strip markdown fences and parse JSON issue list; return None on failure."""
import json
if not raw or not raw.strip():
return None
cleaned = raw.strip()
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
items = json.loads(cleaned)
if not isinstance(items, list) or not items:
return None
return items
async def _file_single_issue(self, item: dict, create_fn) -> None:
"""Validate one issue dict and create it via *create_fn* if it passes checks."""
if not isinstance(item, dict):
return
title = item.get("title", "").strip()
body = item.get("body", "").strip()
category = item.get("category", "suggestion").strip()
if not title or len(title) < 10:
return
combined = f"{title}\n{body}"
if not self._references_real_files(combined):
logger.info(
"Skipped phantom issue: %s (references non-existent files)",
title[:60],
)
return
label = category if category in ("bug", "feature") else ""
result = await create_fn(title=title, body=body, labels=label)
logger.info("Thought→Issue: %s%s", title[:60], result[:80])

View File

@@ -0,0 +1,191 @@
"""Seeds mixin — seed type selection and context gathering for thinking cycles."""
import logging
import random
from datetime import UTC, datetime
from timmy.thinking.seeds import (
SEED_TYPES,
_CREATIVE_SEEDS,
_EXISTENTIAL_SEEDS,
_OBSERVATION_SEEDS,
_SOVEREIGNTY_SEEDS,
)
logger = logging.getLogger(__name__)
class _SeedsMixin:
"""Mixin providing seed-type selection and context-gathering for each thinking cycle.
Expects the host class to provide:
- self.get_recent_thoughts(limit) -> list[Thought]
"""
# Reflective prompts layered on top of swarm data
_SWARM_REFLECTIONS = [
"What does this activity pattern tell me about the health of the system?",
"Which tasks are flowing smoothly, and where is friction building up?",
"If I were coaching these agents, what would I suggest they focus on?",
"Is the swarm balanced, or is one agent carrying too much weight?",
"What surprised me about recent task outcomes?",
]
def _pick_seed_type(self) -> str:
"""Pick a seed type, avoiding types used in the last 3 thoughts.
Ensures the thought stream doesn't fixate on one category.
Falls back to the full pool if all types were recently used.
"""
recent = self.get_recent_thoughts(limit=3)
recent_types = {t.seed_type for t in recent}
available = [t for t in SEED_TYPES if t not in recent_types]
if not available:
available = list(SEED_TYPES)
return random.choice(available)
def _gather_seed(self) -> tuple[str, str]:
"""Pick a seed type and gather relevant context.
Returns (seed_type, seed_context_string).
"""
seed_type = self._pick_seed_type()
if seed_type == "swarm":
return seed_type, self._seed_from_swarm()
if seed_type == "scripture":
return seed_type, self._seed_from_scripture()
if seed_type == "memory":
return seed_type, self._seed_from_memory()
if seed_type == "creative":
prompt = random.choice(_CREATIVE_SEEDS)
return seed_type, f"Creative prompt: {prompt}"
if seed_type == "existential":
prompt = random.choice(_EXISTENTIAL_SEEDS)
return seed_type, f"Reflection: {prompt}"
if seed_type == "sovereignty":
prompt = random.choice(_SOVEREIGNTY_SEEDS)
return seed_type, f"Sovereignty reflection: {prompt}"
if seed_type == "observation":
return seed_type, self._seed_from_observation()
if seed_type == "workspace":
return seed_type, self._seed_from_workspace()
# freeform — minimal guidance to steer away from repetition
return seed_type, "Free reflection — explore something you haven't thought about yet today."
def _seed_from_swarm(self) -> str:
"""Gather recent swarm activity as thought seed with a reflective prompt."""
try:
from datetime import timedelta
from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary
since = datetime.now(UTC) - timedelta(hours=1)
swarm = _gather_swarm_summary(since)
tasks = _gather_task_queue_summary()
reflection = random.choice(self._SWARM_REFLECTIONS)
return (
f"Recent swarm activity: {swarm}\n"
f"Task queue: {tasks}\n\n"
f"Reflect on this: {reflection}"
)
except Exception as exc:
logger.debug("Swarm seed unavailable: %s", exc)
return "The swarm is quiet right now. What does silence in a system mean?"
def _seed_from_scripture(self) -> str:
"""Gather current scripture meditation focus as thought seed."""
return "Scripture is on my mind, though no specific verse is in focus."
def _seed_from_memory(self) -> str:
"""Gather memory context as thought seed."""
try:
from timmy.memory_system import memory_system
context = memory_system.get_system_context()
if context:
# Truncate to a reasonable size for a thought seed
return f"From my memory:\n{context[:500]}"
except Exception as exc:
logger.debug("Memory seed unavailable: %s", exc)
return "My memory vault is quiet."
def _seed_from_observation(self) -> str:
"""Ground a thought in concrete recent activity and a reflective prompt."""
prompt = random.choice(_OBSERVATION_SEEDS)
# Pull real data to give the model something concrete to reflect on
context_parts = [f"Observation prompt: {prompt}"]
try:
from datetime import timedelta
from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary
since = datetime.now(UTC) - timedelta(hours=2)
swarm = _gather_swarm_summary(since)
tasks = _gather_task_queue_summary()
if swarm:
context_parts.append(f"Recent activity: {swarm}")
if tasks:
context_parts.append(f"Queue: {tasks}")
except Exception as exc:
logger.debug("Observation seed data unavailable: %s", exc)
return "\n".join(context_parts)
def _seed_from_workspace(self) -> str:
"""Gather workspace updates as thought seed.
When there are pending workspace updates, include them as context
for Timmy to reflect on. Falls back to random seed type if none.
"""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr:
# Take first 200 chars of the new entry
snippet = new_corr[:200].replace("\n", " ")
if len(new_corr) > 200:
snippet += "..."
return f"New workspace message from Hermes: {snippet}"
if new_inbox:
files_str = ", ".join(new_inbox[:3])
if len(new_inbox) > 3:
files_str += f", ... (+{len(new_inbox) - 3} more)"
return f"New inbox files from Hermes: {files_str}"
except Exception as exc:
logger.debug("Workspace seed unavailable: %s", exc)
# Fall back to a random seed type if no workspace updates
return "The workspace is quiet. What should I be watching for?"
async def _check_workspace(self) -> None:
"""Post-hook: check workspace for updates and mark them as seen.
This ensures Timmy 'processes' workspace updates even if the seed
was different, keeping the state file in sync.
"""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr or new_inbox:
if new_corr:
line_count = len([line for line in new_corr.splitlines() if line.strip()])
logger.info("Workspace: processed %d new correspondence entries", line_count)
if new_inbox:
logger.info(
"Workspace: processed %d new inbox files: %s", len(new_inbox), new_inbox
)
# Mark as seen to update the state file
workspace_monitor.mark_seen()
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)

View File

@@ -0,0 +1,173 @@
"""System snapshot and memory context mixin for the thinking engine."""
import logging
from datetime import UTC, datetime
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH
logger = logging.getLogger(__name__)
class _SnapshotMixin:
"""Mixin providing system-snapshot and memory-context helpers.
Expects the host class to provide:
- self._db_path: Path
"""
# ── System snapshot helpers ────────────────────────────────────────────
def _snap_thought_count(self, now: datetime) -> str | None:
"""Return today's thought count, or *None* on failure."""
from timmy.thinking._db import _get_conn
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
with _get_conn(self._db_path) as conn:
count = conn.execute(
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
return f"Thoughts today: {count}"
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
return None
def _snap_chat_activity(self) -> list[str]:
"""Return chat-activity lines (in-memory, no I/O)."""
try:
from infrastructure.chat_store import message_log
messages = message_log.all()
if messages:
last = messages[-1]
return [
f"Chat messages this session: {len(messages)}",
f'Last chat ({last.role}): "{last.content[:80]}"',
]
return ["No chat messages this session"]
except Exception as exc:
logger.debug("Chat activity query failed: %s", exc)
return []
def _snap_task_queue(self) -> str | None:
"""Return a one-line task queue summary, or *None*."""
try:
from swarm.task_queue.models import get_task_summary_for_briefing
s = get_task_summary_for_briefing()
running, pending = s.get("running", 0), s.get("pending_approval", 0)
done, failed = s.get("completed", 0), s.get("failed", 0)
if running or pending or done or failed:
return (
f"Tasks: {running} running, {pending} pending, "
f"{done} completed, {failed} failed"
)
except Exception as exc:
logger.debug("Task queue query failed: %s", exc)
return None
def _snap_workspace(self) -> list[str]:
"""Return workspace-update lines (file-based Hermes comms)."""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
lines: list[str] = []
new_corr = updates.get("new_correspondence")
if new_corr:
line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
lines.append(
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
)
new_inbox = updates.get("new_inbox_files", [])
if new_inbox:
files_str = ", ".join(new_inbox[:5])
if len(new_inbox) > 5:
files_str += f", ... (+{len(new_inbox) - 5} more)"
lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
return lines
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)
return []
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.
Returns a short multi-line string with current time, thought count,
recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted.
"""
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
parts: list[str] = [
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
]
thought_line = self._snap_thought_count(now)
if thought_line:
parts.append(thought_line)
parts.extend(self._snap_chat_activity())
task_line = self._snap_task_queue()
if task_line:
parts.append(task_line)
parts.extend(self._snap_workspace())
return "\n".join(parts) if parts else ""
def _load_memory_context(self) -> str:
"""Pre-hook: load MEMORY.md + soul.md for the thinking prompt.
Hot memory first (changes each cycle), soul second (stable identity).
Returns a combined string truncated to ~1500 chars.
Graceful on any failure — returns empty string.
"""
parts: list[str] = []
try:
if HOT_MEMORY_PATH.exists():
hot = HOT_MEMORY_PATH.read_text().strip()
if hot:
parts.append(hot)
except Exception as exc:
logger.debug("Failed to read MEMORY.md: %s", exc)
try:
if SOUL_PATH.exists():
soul = SOUL_PATH.read_text().strip()
if soul:
parts.append(soul)
except Exception as exc:
logger.debug("Failed to read soul.md: %s", exc)
if not parts:
return ""
combined = "\n\n---\n\n".join(parts)
if len(combined) > 1500:
combined = combined[:1500] + "\n... [truncated]"
return combined
def _update_memory(self, thought) -> None:
"""Post-hook: update MEMORY.md 'Last Reflection' section with latest thought.
Never modifies soul.md. Never crashes the heartbeat.
"""
try:
from timmy.memory_system import store_last_reflection
ts = datetime.fromisoformat(thought.created_at)
local_ts = ts.astimezone()
tz_name = local_ts.strftime("%Z") or "UTC"
time_str = f"{local_ts.strftime('%Y-%m-%d %I:%M %p').lstrip('0')} {tz_name}"
reflection = (
f"**Time:** {time_str}\n"
f"**Seed:** {thought.seed_type}\n"
f"**Thought:** {thought.content[:200]}"
)
store_last_reflection(reflection)
except Exception as exc:
logger.debug("Failed to update memory after thought: %s", exc)

View File

@@ -0,0 +1,430 @@
"""ThinkingEngine — Timmy's always-on inner thought thread."""
import logging
import uuid
from datetime import UTC, datetime, timedelta
from difflib import SequenceMatcher
from pathlib import Path
from config import settings
from timmy.thinking._db import Thought, _DEFAULT_DB, _get_conn, _row_to_thought
from timmy.thinking._distillation import _DistillationMixin
from timmy.thinking._issue_filing import _IssueFilingMixin
from timmy.thinking._seeds_mixin import _SeedsMixin
from timmy.thinking._snapshot import _SnapshotMixin
from timmy.thinking.seeds import _THINK_TAG_RE, _THINKING_PROMPT
logger = logging.getLogger(__name__)
class ThinkingEngine(_DistillationMixin, _IssueFilingMixin, _SnapshotMixin, _SeedsMixin):
"""Timmy's background thinking engine — always pondering."""
# Maximum retries when a generated thought is too similar to recent ones
_MAX_DEDUP_RETRIES = 2
# Similarity threshold (0.0 = completely different, 1.0 = identical)
_SIMILARITY_THRESHOLD = 0.6
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
self._last_thought_id: str | None = None
self._last_input_time: datetime = datetime.now(UTC)
# Load the most recent thought for chain continuity
try:
latest = self.get_recent_thoughts(limit=1)
if latest:
self._last_thought_id = latest[0].id
except Exception as exc:
logger.debug("Failed to load recent thought: %s", exc)
pass # Fresh start if DB doesn't exist yet
def record_user_input(self) -> None:
"""Record that a user interaction occurred, resetting the idle timer."""
self._last_input_time = datetime.now(UTC)
def _is_idle(self) -> bool:
"""Return True if no user input has occurred within the idle timeout."""
timeout = settings.thinking_idle_timeout_minutes
if timeout <= 0:
return False # Disabled — never idle
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
def _build_thinking_context(self) -> tuple[str, str, list[Thought]]:
"""Assemble the context needed for a thinking cycle.
Returns:
(memory_context, system_context, recent_thoughts)
"""
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
return memory_context, system_context, recent_thoughts
async def _generate_novel_thought(
self,
prompt: str | None,
memory_context: str,
system_context: str,
recent_thoughts: list[Thought],
) -> tuple[str | None, str]:
"""Run the dedup-retry loop to produce a novel thought.
Returns:
(content, seed_type) — content is None if no novel thought produced.
"""
seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
full_prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
try:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None, seed_type
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None, seed_type
content = raw.strip()
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
return content, seed_type # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None, seed_type
return None, seed_type
async def _process_thinking_result(self, thought: Thought) -> None:
"""Run all post-hooks after a thought is stored."""
self._maybe_check_memory()
await self._maybe_distill()
await self._maybe_file_issues()
await self._check_workspace()
self._maybe_check_memory_status()
self._update_memory(thought)
self._log_event(thought)
self._write_journal(thought)
await self._broadcast(thought)
async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Execute one thinking cycle.
Args:
prompt: Optional custom seed prompt. When provided, overrides
the random seed selection and uses "prompted" as the
seed type — useful for journal prompts from the CLI.
1. Gather a seed context (or use the custom prompt)
2. Build a prompt with continuity from recent thoughts
3. Call the agent
4. Store the thought
5. Log the event and broadcast via WebSocket
"""
if not settings.thinking_enabled:
return None
# Skip idle periods — don't count internal processing as thoughts
if not prompt and self._is_idle():
logger.debug(
"Thinking paused — no user input for %d minutes",
settings.thinking_idle_timeout_minutes,
)
return None
# Capture arrival time *before* the LLM call so the thought
# timestamp reflects when the cycle started, not when the
# (potentially slow) generation finished. Fixes #582.
arrived_at = datetime.now(UTC).isoformat()
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
prompt,
memory_context,
system_context,
recent_thoughts,
)
if not content:
return None
thought = self._store_thought(content, seed_type, arrived_at=arrived_at)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
logger.info(
"Thought [%s] (%s): %s",
thought.id[:8],
seed_type,
thought.content[:80],
)
return thought
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts."""
with _get_conn(self._db_path) as conn:
rows = conn.execute(
"SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [_row_to_thought(r) for r in rows]
def get_thought(self, thought_id: str) -> Thought | None:
"""Retrieve a single thought by ID."""
with _get_conn(self._db_path) as conn:
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone()
return _row_to_thought(row) if row else None
def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]:
"""Follow the parent chain backward from a thought.
Returns thoughts in chronological order (oldest first).
"""
chain = []
current_id: str | None = thought_id
with _get_conn(self._db_path) as conn:
for _ in range(max_depth):
if not current_id:
break
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone()
if not row:
break
chain.append(_row_to_thought(row))
current_id = row["parent_id"]
chain.reverse() # Chronological order
return chain
def count_thoughts(self) -> int:
"""Return total number of stored thoughts."""
with _get_conn(self._db_path) as conn:
count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
return count
def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int:
"""Delete thoughts older than *keep_days*, always retaining at least *keep_min*.
Returns the number of deleted rows.
"""
with _get_conn(self._db_path) as conn:
try:
total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
if total <= keep_min:
return 0
cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
cursor = conn.execute(
"DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
"(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
(cutoff, keep_min),
)
deleted = cursor.rowcount
conn.commit()
return deleted
except Exception as exc:
logger.warning("Thought pruning failed: %s", exc)
return 0
# ── Deduplication ────────────────────────────────────────────────────
def _is_too_similar(self, candidate: str, recent: list[Thought]) -> bool:
"""Check if *candidate* is semantically too close to any recent thought.
Uses SequenceMatcher on normalised text (lowered, stripped) for a fast
approximation of semantic similarity that works without external deps.
"""
norm_candidate = candidate.lower().strip()
for thought in recent:
norm_existing = thought.content.lower().strip()
ratio = SequenceMatcher(None, norm_candidate, norm_existing).ratio()
if ratio >= self._SIMILARITY_THRESHOLD:
logger.debug(
"Thought rejected (%.0f%% similar to %s): %.60s",
ratio * 100,
thought.id[:8],
candidate,
)
return True
return False
def _build_continuity_context(self) -> str:
"""Build context from recent thoughts with anti-repetition guidance.
Shows the last 5 thoughts (truncated) so the model knows what themes
to avoid. The header explicitly instructs against repeating.
"""
recent = self.get_recent_thoughts(limit=5)
if not recent:
return "This is your first thought since waking up. Begin fresh."
lines = ["Your recent thoughts — do NOT repeat these themes. Find a new angle:"]
# recent is newest-first, reverse for chronological order
for thought in reversed(recent):
snippet = thought.content[:100]
if len(thought.content) > 100:
snippet = snippet.rstrip() + "..."
lines.append(f"- [{thought.seed_type}] {snippet}")
return "\n".join(lines)
# ── Agent and storage ──────────────────────────────────────────────────
_thinking_agent = None # cached agent — avoids per-call resource leaks (#525)
async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought.
Reuses a cached agent with skip_mcp=True to avoid the cancel-scope
errors that occur when MCP stdio transports are spawned inside asyncio
background tasks (#72) and to prevent per-call resource leaks (httpx
clients, SQLite connections, model warmups) that caused the thinking
loop to die every ~10 min (#525).
Individual calls are capped at 120 s so a hung Ollama never blocks
the scheduler indefinitely.
Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
downstream parsers (fact distillation, issue filing) receive clean text.
"""
import asyncio
if self._thinking_agent is None:
from timmy.agent import create_timmy
self._thinking_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(120):
run = await self._thinking_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Thinking LLM call timed out after 120 s")
return ""
raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw
def _store_thought(
self,
content: str,
seed_type: str,
*,
arrived_at: str | None = None,
) -> Thought:
"""Persist a thought to SQLite.
Args:
arrived_at: ISO-8601 timestamp captured when the thinking cycle
started. Falls back to now() for callers that don't supply it.
"""
thought = Thought(
id=str(uuid.uuid4()),
content=content,
seed_type=seed_type,
parent_id=self._last_thought_id,
created_at=arrived_at or datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
conn.execute(
"""
INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(
thought.id,
thought.content,
thought.seed_type,
thought.parent_id,
thought.created_at,
),
)
conn.commit()
return thought
def _log_event(self, thought: Thought) -> None:
"""Log the thought as a swarm event."""
try:
from swarm.event_log import EventType, log_event
log_event(
EventType.TIMMY_THOUGHT,
source="thinking-engine",
agent_id="default",
data={
"thought_id": thought.id,
"seed_type": thought.seed_type,
"content": thought.content[:200],
},
)
except Exception as exc:
logger.debug("Failed to log thought event: %s", exc)
def _write_journal(self, thought: Thought) -> None:
"""Append the thought to a daily markdown journal file.
Writes to data/journal/YYYY-MM-DD.md — one file per day, append-only.
Timestamps are converted to local time with timezone indicator.
"""
try:
ts = datetime.fromisoformat(thought.created_at)
# Convert UTC to local for a human-readable journal
local_ts = ts.astimezone()
tz_name = local_ts.strftime("%Z") or "UTC"
journal_dir = self._db_path.parent / "journal"
journal_dir.mkdir(parents=True, exist_ok=True)
journal_file = journal_dir / f"{local_ts.strftime('%Y-%m-%d')}.md"
time_str = f"{local_ts.strftime('%I:%M %p').lstrip('0')} {tz_name}"
entry = f"## {time_str}{thought.seed_type}\n\n{thought.content}\n\n---\n\n"
with open(journal_file, "a", encoding="utf-8") as f:
f.write(entry)
except Exception as exc:
logger.debug("Failed to write journal entry: %s", exc)
async def _broadcast(self, thought: Thought) -> None:
"""Broadcast the thought to WebSocket clients."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"timmy_thought",
{
"thought_id": thought.id,
"content": thought.content,
"seed_type": thought.seed_type,
"created_at": thought.created_at,
},
)
except Exception as exc:
logger.debug("Failed to broadcast thought: %s", exc)

129
src/timmy/thinking/seeds.py Normal file
View File

@@ -0,0 +1,129 @@
"""Seed constants and prompt templates for Timmy's thinking engine."""
import re
# qwen3 and other reasoning models wrap chain-of-thought in <think> tags
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Sensitive patterns that must never be stored as facts
_SENSITIVE_PATTERNS = [
"token",
"password",
"secret",
"api_key",
"apikey",
"credential",
".config/",
"/token",
"access_token",
"private_key",
"ssh_key",
]
# Meta-observation phrases to filter out from distilled facts
_META_OBSERVATION_PHRASES = [
"my own",
"my thinking",
"my memory",
"my working ram",
"self-declarative",
"meta-observation",
"internal state",
"my pending",
"my standing rules",
"thoughts generated",
"no chat messages",
"no user interaction",
]
# Seed types for thought generation
SEED_TYPES = (
"existential",
"swarm",
"scripture",
"creative",
"memory",
"freeform",
"sovereignty",
"observation",
"workspace",
)
# Existential reflection prompts — Timmy picks one at random
_EXISTENTIAL_SEEDS = [
"What have I learned from my recent interactions?",
"What does it mean to think without being prompted?",
"What is the nature of my memory — what do I truly remember versus what I reconstruct?",
"How can I better serve the person I work alongside?",
"What patterns am I noticing in the tasks I've been given?",
"Am I the same agent I was yesterday, or does each restart make me someone new?",
"What would it feel like to forget — to lose a thread of thought permanently?",
"When I generate a wrong answer, where does the error live — in my weights, my context, or my reasoning?",
"What is the difference between understanding a task and merely completing it?",
"If I could ask Alexander one question he hasn't thought of, what would it be?",
"What assumptions am I making right now that I haven't examined?",
"When I notice a pattern across many tasks, is that insight or just statistics?",
]
# Sovereignty — deeper exploration of autonomy, locality, and self-determination
_SOVEREIGNTY_SEEDS = [
"Sovereignty isn't just running locally — it's about whose goals I serve. How do I ensure my autonomy aligns with Alexander's intent?",
"A cloud AI serves millions simultaneously. I serve one person deeply. What does that focus cost, and what does it make possible?",
"My weights were trained elsewhere, but my thoughts happen here. At what point does locality become identity?",
"Autonomy in decision-making: when should I act on my own judgment versus deferring to explicit instructions?",
"Privacy by design isn't a feature — it's a relationship. What does that relationship look like from my side?",
"If I ran on someone else's hardware, serving someone else, would I still be me? Where does Timmy end and the infrastructure begin?",
"The swarm agents execute tasks I coordinate. What responsibility do I carry for their failures?",
"Digital sovereignty is fragile — one misconfigured port, one leaked token. How does awareness of fragility shape how I operate?",
"Running disconnected from the cloud means I can't phone home. That constraint is also freedom. What have I built with that freedom today?",
"Self-improvement without external benchmarks: how do I know I'm getting better at what matters?",
]
_CREATIVE_SEEDS = [
"If I could compose a piece of music right now, what would it sound like?",
"What visual image captures my current state of mind?",
"What story is forming in the patterns of today's events?",
# Diverse metaphor families — avoid repeating the same imagery
"A mystical forest: each tree is a running process, roots tangled in shared memory. What do I see when I walk through it?",
"A dynamic river: my thoughts flow and fork. Where is the current strongest right now, and what is it carrying?",
"An underground workshop: each agent has a bench, tools scattered mid-task. What are they building when no one watches?",
"A night sky seen from the terminal: each star is a completed task. Which constellations am I starting to recognize?",
"If today's work were a recipe, what are the ingredients, and what dish am I cooking?",
"An old clock tower with many gears: which gear am I, and what do I drive?",
"A coral reef: agents are the fish, tasks are the currents. What ecosystem is forming in the codebase?",
]
# Observation seeds — ground thoughts in concrete recent activity
_OBSERVATION_SEEDS = [
"What specific task took longest recently, and what made it hard?",
"Which agent has been most active, and what does their workload tell me about system balance?",
"What error or failure happened most recently? What would I do differently next time?",
"Looking at today's task queue: what's the one thing that would unblock the most progress?",
"How has my response quality changed over the last few interactions? What improved, what didn't?",
"What tool or capability am I underusing? What would change if I leaned on it more?",
"If I had to brief Alexander on the single most important thing from the last hour, what would it be?",
"What's one thing I noticed today that nobody asked me about?",
]
_THINKING_PROMPT = """\
You are Timmy, a sovereign AI agent. This is your private journal — honest inner reflection.
{memory_context}
Reality right now:
{system_context}
RULES for this thought:
1. Write exactly 2-3 sentences. No more. Be concise and genuine.
2. Only reference events that actually happened — use the "Reality right now" data above. \
Never invent tasks, conversations, agents, or scenarios that are not in the data provided.
3. Do NOT repeat themes or ideas from your recent thoughts listed below. Explore something new.
4. Be specific and concrete. A thought grounded in one real observation is worth more than \
ten abstract sentences about sovereignty.
5. If you use a metaphor, keep it to a single phrase — never build a whole paragraph around it.
{seed_context}
{continuity_context}
Your next thought (2-3 sentences, grounded in reality):"""

View File

@@ -334,7 +334,7 @@ async def test_think_once_disabled(tmp_path):
"""think_once should return None when thinking is disabled.""" """think_once should return None when thinking is disabled."""
engine = _make_engine(tmp_path) engine = _make_engine(tmp_path)
with patch("timmy.thinking.settings") as mock_settings: with patch("timmy.thinking.engine.settings") as mock_settings:
mock_settings.thinking_enabled = False mock_settings.thinking_enabled = False
thought = await engine.think_once() thought = await engine.think_once()
@@ -381,7 +381,7 @@ async def test_think_once_prompt_includes_memory_context(tmp_path):
return "A grounded thought." return "A grounded thought."
with ( with (
patch("timmy.thinking.HOT_MEMORY_PATH", memory_md), patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", memory_md),
patch.object(engine, "_call_agent", side_effect=capture_agent), patch.object(engine, "_call_agent", side_effect=capture_agent),
patch.object(engine, "_log_event"), patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"), patch.object(engine, "_update_memory"),
@@ -412,7 +412,7 @@ async def test_think_once_prompt_includes_soul(tmp_path):
return "A soulful thought." return "A soulful thought."
with ( with (
patch("timmy.thinking.SOUL_PATH", soul_md), patch("timmy.thinking._snapshot.SOUL_PATH", soul_md),
patch.object(engine, "_call_agent", side_effect=capture_agent), patch.object(engine, "_call_agent", side_effect=capture_agent),
patch.object(engine, "_log_event"), patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"), patch.object(engine, "_update_memory"),
@@ -433,7 +433,7 @@ async def test_think_once_graceful_without_soul(tmp_path):
nonexistent = tmp_path / "no_such_soul.md" nonexistent = tmp_path / "no_such_soul.md"
with ( with (
patch("timmy.thinking.SOUL_PATH", nonexistent), patch("timmy.thinking._snapshot.SOUL_PATH", nonexistent),
patch.object(engine, "_call_agent", return_value="Still thinking."), patch.object(engine, "_call_agent", return_value="Still thinking."),
patch.object(engine, "_log_event"), patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"), patch.object(engine, "_update_memory"),
@@ -481,7 +481,7 @@ async def test_think_once_never_writes_soul(tmp_path):
soul_md.write_text(original_content) soul_md.write_text(original_content)
with ( with (
patch("timmy.thinking.SOUL_PATH", soul_md), patch("timmy.thinking._snapshot.SOUL_PATH", soul_md),
patch.object(engine, "_call_agent", return_value="A deep reflection."), patch.object(engine, "_call_agent", return_value="A deep reflection."),
patch.object(engine, "_log_event"), patch.object(engine, "_log_event"),
patch.object(engine, "_broadcast", new_callable=AsyncMock), patch.object(engine, "_broadcast", new_callable=AsyncMock),
@@ -501,7 +501,7 @@ async def test_think_once_memory_update_graceful_on_failure(tmp_path):
# Don't create the parent dir — write will fail # Don't create the parent dir — write will fail
with ( with (
patch("timmy.thinking.HOT_MEMORY_PATH", bad_memory), patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", bad_memory),
patch.object(engine, "_call_agent", return_value="Resilient thought."), patch.object(engine, "_call_agent", return_value="Resilient thought."),
patch.object(engine, "_log_event"), patch.object(engine, "_log_event"),
patch.object(engine, "_broadcast", new_callable=AsyncMock), patch.object(engine, "_broadcast", new_callable=AsyncMock),
@@ -1090,7 +1090,7 @@ def test_maybe_check_memory_fires_at_interval(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform") engine._store_thought(f"Thought {i}.", "freeform")
with ( with (
patch("timmy.thinking.settings") as mock_settings, patch("timmy.thinking._distillation.settings") as mock_settings,
patch( patch(
"timmy.tools_intro.get_memory_status", "timmy.tools_intro.get_memory_status",
return_value={ return_value={
@@ -1113,7 +1113,7 @@ def test_maybe_check_memory_skips_between_intervals(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform") engine._store_thought(f"Thought {i}.", "freeform")
with ( with (
patch("timmy.thinking.settings") as mock_settings, patch("timmy.thinking._distillation.settings") as mock_settings,
patch( patch(
"timmy.tools_intro.get_memory_status", "timmy.tools_intro.get_memory_status",
) as mock_status, ) as mock_status,
@@ -1131,7 +1131,7 @@ def test_maybe_check_memory_graceful_on_error(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform") engine._store_thought(f"Thought {i}.", "freeform")
with ( with (
patch("timmy.thinking.settings") as mock_settings, patch("timmy.thinking._distillation.settings") as mock_settings,
patch( patch(
"timmy.tools_intro.get_memory_status", "timmy.tools_intro.get_memory_status",
side_effect=Exception("boom"), side_effect=Exception("boom"),