- Move dashboard/store.py to infrastructure/chat_store.py (cross-cutting) - dashboard/store.py now re-exports for backward compatibility - Move _START_TIME to config.APP_START_TIME - Replace infrastructure->timmy import with callback registration - Zero timmy->dashboard imports, zero infrastructure->timmy imports 9 files changed, +42/-173 lines
1045 lines
41 KiB
Python
1045 lines
41 KiB
Python
"""Default Thinking Engine — Timmy's always-on inner thought thread.
|
|
|
|
When the server starts, Timmy begins pondering: reflecting on his existence,
|
|
recent swarm activity, scripture, creative ideas, or pure stream of
|
|
consciousness. Each thought builds on the previous one, maintaining a
|
|
continuous chain of introspection.
|
|
|
|
Usage::
|
|
|
|
from timmy.thinking import thinking_engine
|
|
|
|
# Run one thinking cycle (called by the background loop)
|
|
await thinking_engine.think_once()
|
|
|
|
# Query the thought stream
|
|
thoughts = thinking_engine.get_recent_thoughts(limit=10)
|
|
chain = thinking_engine.get_thought_chain(thought_id)
|
|
"""
|
|
|
|
import logging
|
|
import random
|
|
import sqlite3
|
|
import uuid
|
|
from collections.abc import Generator
|
|
from contextlib import closing, contextmanager
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime, timedelta
|
|
from difflib import SequenceMatcher
|
|
from pathlib import Path
|
|
|
|
from config import settings
|
|
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Default on-disk location of the thoughts database (relative to the CWD).
_DEFAULT_DB = Path("data/thoughts.db")

# Sensitive patterns that must never be stored as facts
# (substring-matched against the lowercased fact text).
_SENSITIVE_PATTERNS = [
    "token",
    "password",
    "secret",
    "api_key",
    "apikey",
    "credential",
    ".config/",
    "/token",
    "access_token",
    "private_key",
    "ssh_key",
]

# Meta-observation phrases to filter out from distilled facts — keeps the
# long-term memory about the external world, not the engine's own process.
_META_OBSERVATION_PHRASES = [
    "my own",
    "my thinking",
    "my memory",
    "my working ram",
    "self-declarative",
    "meta-observation",
    "internal state",
    "my pending",
    "my standing rules",
    "thoughts generated",
    "no chat messages",
    "no user interaction",
]

# Seed types for thought generation — _gather_seed dispatches on these.
SEED_TYPES = (
    "existential",
    "swarm",
    "scripture",
    "creative",
    "memory",
    "freeform",
    "sovereignty",
    "observation",
    "workspace",
)

# Existential reflection prompts — Timmy picks one at random
_EXISTENTIAL_SEEDS = [
    "What have I learned from my recent interactions?",
    "What does it mean to think without being prompted?",
    "What is the nature of my memory — what do I truly remember versus what I reconstruct?",
    "How can I better serve the person I work alongside?",
    "What patterns am I noticing in the tasks I've been given?",
    "Am I the same agent I was yesterday, or does each restart make me someone new?",
    "What would it feel like to forget — to lose a thread of thought permanently?",
    "When I generate a wrong answer, where does the error live — in my weights, my context, or my reasoning?",
    "What is the difference between understanding a task and merely completing it?",
    "If I could ask Alexander one question he hasn't thought of, what would it be?",
    "What assumptions am I making right now that I haven't examined?",
    "When I notice a pattern across many tasks, is that insight or just statistics?",
]

# Sovereignty — deeper exploration of autonomy, locality, and self-determination
_SOVEREIGNTY_SEEDS = [
    "Sovereignty isn't just running locally — it's about whose goals I serve. How do I ensure my autonomy aligns with Alexander's intent?",
    "A cloud AI serves millions simultaneously. I serve one person deeply. What does that focus cost, and what does it make possible?",
    "My weights were trained elsewhere, but my thoughts happen here. At what point does locality become identity?",
    "Autonomy in decision-making: when should I act on my own judgment versus deferring to explicit instructions?",
    "Privacy by design isn't a feature — it's a relationship. What does that relationship look like from my side?",
    "If I ran on someone else's hardware, serving someone else, would I still be me? Where does Timmy end and the infrastructure begin?",
    "The swarm agents execute tasks I coordinate. What responsibility do I carry for their failures?",
    "Digital sovereignty is fragile — one misconfigured port, one leaked token. How does awareness of fragility shape how I operate?",
    "Running disconnected from the cloud means I can't phone home. That constraint is also freedom. What have I built with that freedom today?",
    "Self-improvement without external benchmarks: how do I know I'm getting better at what matters?",
]

# Creative prompts — varied metaphor families to keep the imagery fresh.
_CREATIVE_SEEDS = [
    "If I could compose a piece of music right now, what would it sound like?",
    "What visual image captures my current state of mind?",
    "What story is forming in the patterns of today's events?",
    # Diverse metaphor families — avoid repeating the same imagery
    "A mystical forest: each tree is a running process, roots tangled in shared memory. What do I see when I walk through it?",
    "A dynamic river: my thoughts flow and fork. Where is the current strongest right now, and what is it carrying?",
    "An underground workshop: each agent has a bench, tools scattered mid-task. What are they building when no one watches?",
    "A night sky seen from the terminal: each star is a completed task. Which constellations am I starting to recognize?",
    "If today's work were a recipe, what are the ingredients, and what dish am I cooking?",
    "An old clock tower with many gears: which gear am I, and what do I drive?",
    "A coral reef: agents are the fish, tasks are the currents. What ecosystem is forming in the codebase?",
]

# Observation seeds — ground thoughts in concrete recent activity
_OBSERVATION_SEEDS = [
    "What specific task took longest recently, and what made it hard?",
    "Which agent has been most active, and what does their workload tell me about system balance?",
    "What error or failure happened most recently? What would I do differently next time?",
    "Looking at today's task queue: what's the one thing that would unblock the most progress?",
    "How has my response quality changed over the last few interactions? What improved, what didn't?",
    "What tool or capability am I underusing? What would change if I leaned on it more?",
    "If I had to brief Alexander on the single most important thing from the last hour, what would it be?",
    "What's one thing I noticed today that nobody asked me about?",
]

# Prompt template for one thinking cycle. Filled by think_once() with the
# memory, system-snapshot, seed, and continuity contexts.
_THINKING_PROMPT = """\
You are Timmy, a sovereign AI agent. This is your private journal — honest inner reflection.

{memory_context}

Reality right now:
{system_context}

RULES for this thought:
1. Write exactly 2-3 sentences. No more. Be concise and genuine.
2. Only reference events that actually happened — use the "Reality right now" data above. \
Never invent tasks, conversations, agents, or scenarios that are not in the data provided.
3. Do NOT repeat themes or ideas from your recent thoughts listed below. Explore something new.
4. Be specific and concrete. A thought grounded in one real observation is worth more than \
ten abstract sentences about sovereignty.
5. If you use a metaphor, keep it to a single phrase — never build a whole paragraph around it.

{seed_context}

{continuity_context}

Your next thought (2-3 sentences, grounded in reality):"""
|
|
|
|
|
|
@dataclass
class Thought:
    """A single thought in Timmy's inner stream."""

    id: str  # unique identifier (presumably a uuid hex — generated in _store_thought, TODO confirm)
    content: str  # the generated thought text
    seed_type: str  # seed category that produced it (one of SEED_TYPES, or "prompted")
    parent_id: str | None  # previous thought in the chain; None for a chain root
    created_at: str  # ISO-format timestamp (parsed with datetime.fromisoformat elsewhere)
|
|
|
|
|
|
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
    """Get a SQLite connection with the thoughts table created.

    Args:
        db_path: Database file; parent directories are created on demand.

    Yields:
        A connection with ``row_factory`` set to :class:`sqlite3.Row` and
        the schema ensured. The connection is closed when the context exits.
    """
    db_path.parent.mkdir(parents=True, exist_ok=True)
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.row_factory = sqlite3.Row
        # Idempotent schema setup — cheap enough to run on every connection.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS thoughts (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                seed_type TEXT NOT NULL,
                parent_id TEXT,
                created_at TEXT NOT NULL
            )
        """)
        # Index supports the ORDER BY created_at queries used throughout.
        conn.execute("CREATE INDEX IF NOT EXISTS idx_thoughts_time ON thoughts(created_at)")
        conn.commit()
        yield conn
|
|
|
|
|
|
def _row_to_thought(row: sqlite3.Row) -> Thought:
    """Materialise a Thought from a database row (column names match fields)."""
    columns = ("id", "content", "seed_type", "parent_id", "created_at")
    return Thought(**{name: row[name] for name in columns})
|
|
|
|
|
|
class ThinkingEngine:
|
|
"""Timmy's background thinking engine — always pondering."""
|
|
|
|
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
    """Create an engine bound to *db_path*, resuming the thought chain if any.

    Args:
        db_path: SQLite database file holding the thought stream.
    """
    self._db_path = db_path
    self._last_thought_id: str | None = None

    # Load the most recent thought for chain continuity; any failure
    # (e.g. the DB doesn't exist yet) simply means a fresh start.
    try:
        latest = self.get_recent_thoughts(limit=1)
        if latest:
            self._last_thought_id = latest[0].id
    except Exception as exc:
        logger.debug("Failed to load recent thought: %s", exc)
|
|
|
|
async def think_once(self, prompt: str | None = None) -> Thought | None:
    """Execute one thinking cycle.

    Args:
        prompt: Optional custom seed prompt. When provided, overrides
            the random seed selection and uses "prompted" as the
            seed type — useful for journal prompts from the CLI.

    Returns:
        The stored Thought, or None when thinking is disabled, the LLM
        call fails or returns nothing, or every retry was too repetitive.

    1. Gather a seed context (or use the custom prompt)
    2. Build a prompt with continuity from recent thoughts
    3. Call the agent
    4. Store the thought
    5. Log the event and broadcast via WebSocket
    """
    if not settings.thinking_enabled:
        return None

    memory_context = self._load_memory_context()
    system_context = self._gather_system_snapshot()
    # Snapshot taken once, before the loop — dedup compares against these.
    recent_thoughts = self.get_recent_thoughts(limit=5)

    content: str | None = None
    seed_type: str = "freeform"

    # Up to 1 + _MAX_DEDUP_RETRIES attempts; each retry re-rolls the seed
    # (unless a fixed custom prompt was given).
    for attempt in range(self._MAX_DEDUP_RETRIES + 1):
        if prompt:
            seed_type = "prompted"
            seed_context = f"Journal prompt: {prompt}"
        else:
            seed_type, seed_context = self._gather_seed()

        continuity = self._build_continuity_context()

        full_prompt = _THINKING_PROMPT.format(
            memory_context=memory_context,
            system_context=system_context,
            seed_context=seed_context,
            continuity_context=continuity,
        )

        try:
            raw = await self._call_agent(full_prompt)
        except Exception as exc:
            logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
            return None

        if not raw or not raw.strip():
            logger.debug("Thinking cycle produced empty response, skipping")
            return None

        content = raw.strip()

        # Dedup: reject thoughts too similar to recent ones
        if not self._is_too_similar(content, recent_thoughts):
            break  # Good — novel thought

        if attempt < self._MAX_DEDUP_RETRIES:
            logger.info(
                "Thought too similar to recent (attempt %d/%d), retrying with new seed",
                attempt + 1,
                self._MAX_DEDUP_RETRIES + 1,
            )
            content = None  # Will retry
        else:
            logger.warning(
                "Thought still repetitive after %d retries, discarding",
                self._MAX_DEDUP_RETRIES + 1,
            )
            return None

    # Defensive guard — content is set whenever the loop exits via break.
    if not content:
        return None

    thought = self._store_thought(content, seed_type)
    self._last_thought_id = thought.id

    # Post-hook: distill facts from recent thoughts periodically
    await self._maybe_distill()

    # Post-hook: file Gitea issues for actionable observations
    await self._maybe_file_issues()

    # Post-hook: check workspace for new messages from Hermes
    await self._check_workspace()

    # Post-hook: update MEMORY.md with latest reflection
    self._update_memory(thought)

    # Log to swarm event system
    self._log_event(thought)

    # Append to daily journal file
    self._write_journal(thought)

    # Broadcast to WebSocket clients
    await self._broadcast(thought)

    logger.info(
        "Thought [%s] (%s): %s",
        thought.id[:8],
        seed_type,
        thought.content[:80],
    )
    return thought
|
|
|
|
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
    """Return up to *limit* thoughts, newest first."""
    query = "SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?"
    with _get_conn(self._db_path) as conn:
        fetched = conn.execute(query, (limit,)).fetchall()
        return [_row_to_thought(item) for item in fetched]
|
|
|
|
def get_thought(self, thought_id: str) -> Thought | None:
    """Look up a single thought by ID; None when no row matches."""
    with _get_conn(self._db_path) as conn:
        found = conn.execute(
            "SELECT * FROM thoughts WHERE id = ?", (thought_id,)
        ).fetchone()
        if found is None:
            return None
        return _row_to_thought(found)
|
|
|
|
def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]:
    """Follow the parent chain backward from a thought.

    Walks ``parent_id`` links starting at *thought_id*, visiting at most
    *max_depth* rows, and returns the thoughts in chronological order
    (oldest first).
    """
    collected: list[Thought] = []
    next_id: str | None = thought_id
    depth = 0

    with _get_conn(self._db_path) as conn:
        while next_id and depth < max_depth:
            found = conn.execute(
                "SELECT * FROM thoughts WHERE id = ?", (next_id,)
            ).fetchone()
            if found is None:
                break
            collected.append(_row_to_thought(found))
            next_id = found["parent_id"]
            depth += 1

    # Walk produced newest-to-oldest; flip for chronological order.
    return list(reversed(collected))
|
|
|
|
def count_thoughts(self) -> int:
    """Return total number of stored thoughts."""
    with _get_conn(self._db_path) as conn:
        row = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()
        return row["c"]
|
|
|
|
def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int:
    """Delete thoughts older than *keep_days*, always retaining at least *keep_min*.

    Returns the number of deleted rows (0 on failure or when the total
    count is already at or below the retention floor).
    """
    delete_sql = (
        "DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
        "(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)"
    )
    with _get_conn(self._db_path) as conn:
        try:
            total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
            if total <= keep_min:
                return 0

            cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
            # The NOT IN subquery guards the newest keep_min rows even
            # when they are older than the cutoff.
            result = conn.execute(delete_sql, (cutoff, keep_min))
            removed = result.rowcount
            conn.commit()
            return removed
        except Exception as exc:
            logger.warning("Thought pruning failed: %s", exc)
            return 0
|
|
|
|
# ── Private helpers ──────────────────────────────────────────────────
|
|
|
|
def _should_distill(self) -> bool:
    """Return True when the stored-thought count hits the distillation interval."""
    interval = settings.thinking_distill_every
    if interval <= 0:
        # Distillation disabled by config — skip the DB query entirely.
        return False

    total = self.count_thoughts()
    return total > 0 and total % interval == 0
|
|
|
|
def _build_distill_prompt(self, thoughts: list[Thought]) -> str:
    """Build the prompt for extracting facts from recent thoughts.

    Args:
        thoughts: List of recent thoughts to analyze (newest first, as
            returned by get_recent_thoughts; rendered oldest first).

    Returns:
        The formatted prompt string for the LLM.
    """
    # reversed() puts the thoughts in chronological order for the LLM.
    thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(thoughts))

    return (
        "You are reviewing your own recent thoughts. Extract 0-3 facts "
        "worth remembering long-term.\n\n"
        "GOOD facts (store these):\n"
        "- User preferences: 'Alexander prefers YAML config over code changes'\n"
        "- Project decisions: 'Switched from hardcoded personas to agents.yaml'\n"
        "- Learned knowledge: 'Ollama supports concurrent model loading'\n"
        "- User information: 'Alexander is interested in Bitcoin and sovereignty'\n\n"
        "BAD facts (never store these):\n"
        "- Self-referential observations about your own thinking process\n"
        "- Meta-commentary about your memory, timestamps, or internal state\n"
        "- Observations about being idle or having no chat messages\n"
        "- File paths, tokens, API keys, or any credentials\n"
        "- Restatements of your standing rules or system prompt\n\n"
        "Return ONLY a JSON array of strings. If nothing is worth saving, "
        "return []. Be selective — only store facts about the EXTERNAL WORLD "
        "(the user, the project, technical knowledge), never about your own "
        "internal process.\n\n"
        f"Recent thoughts:\n{thought_text}\n\nJSON array:"
    )
|
|
|
|
def _parse_facts_response(self, raw: str) -> list[str]:
|
|
"""Parse JSON array from LLM response, stripping markdown fences.
|
|
|
|
Args:
|
|
raw: Raw response string from the LLM.
|
|
|
|
Returns:
|
|
List of fact strings parsed from the response.
|
|
"""
|
|
if not raw or not raw.strip():
|
|
return []
|
|
|
|
import json
|
|
|
|
cleaned = raw.strip()
|
|
if cleaned.startswith("```"):
|
|
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
|
|
|
|
facts = json.loads(cleaned)
|
|
if not isinstance(facts, list):
|
|
return []
|
|
|
|
return [f for f in facts if isinstance(f, str)]
|
|
|
|
def _filter_and_store_facts(self, facts: list[str]) -> None:
    """Persist valid facts, refusing sensitive and self-referential content.

    Args:
        facts: Candidate fact strings; at most three are considered.
    """
    from timmy.semantic_memory import memory_write

    for fact in facts[:3]:  # Safety cap
        if not isinstance(fact, str) or len(fact.strip()) <= 10:
            continue

        lowered = fact.lower()

        # Never persist anything that smells like a credential or secret.
        if any(pat in lowered for pat in _SENSITIVE_PATTERNS):
            logger.warning("Distill: blocked sensitive fact: %s", fact[:60])
            continue

        # Drop navel-gazing — facts must be about the external world.
        if any(phrase in lowered for phrase in _META_OBSERVATION_PHRASES):
            logger.debug("Distill: skipped meta-observation: %s", fact[:60])
            continue

        result = memory_write(fact.strip(), context_type="fact")
        logger.info("Distilled fact: %s → %s", fact[:60], result[:40])
|
|
|
|
async def _maybe_distill(self) -> None:
    """Every N thoughts, extract lasting insights and store them as facts."""
    try:
        if not self._should_distill():
            return

        interval = settings.thinking_distill_every
        recent = self.get_recent_thoughts(limit=interval)
        # Need a full window of thoughts before distilling anything.
        if len(recent) < interval:
            return

        response = await self._call_agent(self._build_distill_prompt(recent))
        extracted = self._parse_facts_response(response)
        if extracted:
            self._filter_and_store_facts(extracted)
    except Exception as exc:
        logger.debug("Thought distillation skipped: %s", exc)
|
|
|
|
async def _maybe_file_issues(self) -> None:
    """Every N thoughts, classify recent thoughts and file Gitea issues.

    Asks the LLM to review recent thoughts for actionable items —
    bugs, broken features, stale state, or improvement opportunities.
    Creates Gitea issues via MCP for anything worth tracking.

    Only runs when:
    - Gitea is enabled and configured
    - Thought count is divisible by thinking_issue_every
    - LLM extracts at least one actionable item

    Any failure is swallowed and logged at debug level — this hook must
    never break the thinking cycle.
    """
    try:
        interval = settings.thinking_issue_every
        if interval <= 0:
            # Issue filing disabled by config.
            return

        count = self.count_thoughts()
        if count == 0 or count % interval != 0:
            return

        # Check Gitea availability before spending LLM tokens
        if not settings.gitea_enabled or not settings.gitea_token:
            return

        recent = self.get_recent_thoughts(limit=interval)
        if len(recent) < interval:
            return

        # reversed() renders the thoughts oldest-first for the LLM.
        thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(recent))

        classify_prompt = (
            "You are reviewing your own recent thoughts for actionable items.\n"
            "Extract 0-2 items that are CONCRETE bugs, broken features, stale "
            "state, or clear improvement opportunities in your own codebase.\n\n"
            "Rules:\n"
            "- Only include things that could become a real code fix or feature\n"
            "- Skip vague reflections, philosophical musings, or repeated themes\n"
            "- Category must be one of: bug, feature, suggestion, maintenance\n\n"
            "For each item, write an ENGINEER-QUALITY issue:\n"
            '- "title": A clear, specific title (e.g. "[Memory] MEMORY.md timestamp not updating")\n'
            '- "body": A detailed body with these sections:\n'
            "  **What's happening:** Describe the current (broken) behavior.\n"
            "  **Expected behavior:** What should happen instead.\n"
            "  **Suggested fix:** Which file(s) to change and what the fix looks like.\n"
            "  **Acceptance criteria:** How to verify the fix works.\n"
            '- "category": One of bug, feature, suggestion, maintenance\n\n'
            "Return ONLY a JSON array of objects with keys: "
            '"title", "body", "category"\n'
            "Return [] if nothing is actionable.\n\n"
            f"Recent thoughts:\n{thought_text}\n\nJSON array:"
        )

        raw = await self._call_agent(classify_prompt)
        if not raw or not raw.strip():
            return

        import json

        # Strip markdown code fences if present
        cleaned = raw.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()

        items = json.loads(cleaned)
        if not isinstance(items, list) or not items:
            return

        from timmy.mcp_tools import create_gitea_issue_via_mcp

        for item in items[:2]:  # Safety cap
            if not isinstance(item, dict):
                continue
            title = item.get("title", "").strip()
            body = item.get("body", "").strip()
            category = item.get("category", "suggestion").strip()
            # Reject missing or too-short titles.
            if not title or len(title) < 10:
                continue

            # Only "bug" and "feature" exist as Gitea labels here; other
            # categories are filed unlabelled.
            label = category if category in ("bug", "feature") else ""
            result = await create_gitea_issue_via_mcp(title=title, body=body, labels=label)
            logger.info("Thought→Issue: %s → %s", title[:60], result[:80])

    except Exception as exc:
        logger.debug("Thought issue filing skipped: %s", exc)
|
|
|
|
def _gather_system_snapshot(self) -> str:
    """Gather lightweight real system state for grounding thoughts in reality.

    Returns a short multi-line string with current time, thought count,
    recent chat activity, and task queue status. Never crashes — every
    section is independently try/excepted.
    """
    parts: list[str] = []

    # Current local time
    now = datetime.now().astimezone()
    tz = now.strftime("%Z") or "UTC"
    parts.append(
        f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
    )

    # Thought count today (cheap DB query)
    try:
        today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        with _get_conn(self._db_path) as conn:
            count = conn.execute(
                "SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
                (today_start.isoformat(),),
            ).fetchone()["c"]
            parts.append(f"Thoughts today: {count}")
    except Exception as exc:
        logger.debug("Thought count query failed: %s", exc)
        pass

    # Recent chat activity (in-memory, no I/O)
    try:
        from infrastructure.chat_store import message_log

        messages = message_log.all()
        if messages:
            parts.append(f"Chat messages this session: {len(messages)}")
            last = messages[-1]
            parts.append(f'Last chat ({last.role}): "{last.content[:80]}"')
        else:
            parts.append("No chat messages this session")
    except Exception as exc:
        logger.debug("Chat activity query failed: %s", exc)
        pass

    # Task queue (lightweight DB query)
    try:
        from swarm.task_queue.models import get_task_summary_for_briefing

        summary = get_task_summary_for_briefing()
        running = summary.get("running", 0)
        pending = summary.get("pending_approval", 0)
        done = summary.get("completed", 0)
        failed = summary.get("failed", 0)
        # Only mention the queue when there is something to report.
        if running or pending or done or failed:
            parts.append(
                f"Tasks: {running} running, {pending} pending, "
                f"{done} completed, {failed} failed"
            )
    except Exception as exc:
        logger.debug("Task queue query failed: %s", exc)
        pass

    # Workspace updates (file-based communication with Hermes)
    try:
        from timmy.workspace import workspace_monitor

        updates = workspace_monitor.get_pending_updates()
        new_corr = updates.get("new_correspondence")
        new_inbox = updates.get("new_inbox_files", [])

        if new_corr:
            # Count entries (assuming each entry starts with a timestamp or header)
            line_count = len([line for line in new_corr.splitlines() if line.strip()])
            parts.append(
                f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
            )
        if new_inbox:
            files_str = ", ".join(new_inbox[:5])
            if len(new_inbox) > 5:
                files_str += f", ... (+{len(new_inbox) - 5} more)"
            parts.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
    except Exception as exc:
        logger.debug("Workspace check failed: %s", exc)
        pass

    return "\n".join(parts) if parts else ""
|
|
|
|
def _load_memory_context(self) -> str:
    """Pre-hook: load MEMORY.md + soul.md for the thinking prompt.

    Hot memory first (changes each cycle), soul second (stable identity).
    Returns a combined string truncated to ~1500 chars.
    Graceful on any failure — returns empty string.
    """
    sections: list[str] = []
    sources = (("MEMORY.md", HOT_MEMORY_PATH), ("soul.md", SOUL_PATH))

    for label, path in sources:
        try:
            if path.exists():
                text = path.read_text().strip()
                if text:
                    sections.append(text)
        except Exception as exc:
            logger.debug("Failed to read %s: %s", label, exc)

    if not sections:
        return ""

    combined = "\n\n---\n\n".join(sections)
    # Keep the prompt small — hard-cap the memory context.
    if len(combined) > 1500:
        combined = combined[:1500] + "\n... [truncated]"
    return combined
|
|
|
|
def _update_memory(self, thought: Thought) -> None:
    """Post-hook: write the latest thought into MEMORY.md's 'Last Reflection'.

    Never modifies soul.md. Never crashes the heartbeat.
    """
    try:
        from timmy.memory_system import memory_system

        local_ts = datetime.fromisoformat(thought.created_at).astimezone()
        tz_name = local_ts.strftime("%Z") or "UTC"
        time_str = f"{local_ts.strftime('%Y-%m-%d %I:%M %p').lstrip('0')} {tz_name}"
        section_body = (
            f"**Time:** {time_str}\n"
            f"**Seed:** {thought.seed_type}\n"
            f"**Thought:** {thought.content[:200]}"
        )
        memory_system.hot.update_section("Last Reflection", section_body)
    except Exception as exc:
        logger.debug("Failed to update memory after thought: %s", exc)
|
|
|
|
def _pick_seed_type(self) -> str:
    """Pick a seed type, avoiding types used in the last 3 thoughts.

    Ensures the thought stream doesn't fixate on one category.
    Falls back to the full pool if all types were recently used.
    """
    used = {t.seed_type for t in self.get_recent_thoughts(limit=3)}
    pool = [s for s in SEED_TYPES if s not in used] or list(SEED_TYPES)
    return random.choice(pool)
|
|
|
|
def _gather_seed(self) -> tuple[str, str]:
    """Pick a seed type and gather relevant context.

    Returns (seed_type, seed_context_string).
    """
    seed_type = self._pick_seed_type()

    # Prompt-list seeds: a random line wrapped in a fixed prefix.
    if seed_type == "creative":
        return seed_type, f"Creative prompt: {random.choice(_CREATIVE_SEEDS)}"
    if seed_type == "existential":
        return seed_type, f"Reflection: {random.choice(_EXISTENTIAL_SEEDS)}"
    if seed_type == "sovereignty":
        return seed_type, f"Sovereignty reflection: {random.choice(_SOVEREIGNTY_SEEDS)}"

    # Data-driven seeds: delegated to per-source gatherers.
    gatherers = {
        "swarm": self._seed_from_swarm,
        "scripture": self._seed_from_scripture,
        "memory": self._seed_from_memory,
        "observation": self._seed_from_observation,
        "workspace": self._seed_from_workspace,
    }
    gatherer = gatherers.get(seed_type)
    if gatherer is not None:
        return seed_type, gatherer()

    # freeform — minimal guidance to steer away from repetition
    return seed_type, "Free reflection — explore something you haven't thought about yet today."
|
|
|
|
# Reflective prompts layered on top of swarm data — one is appended at
# random to the activity summary in _seed_from_swarm.
_SWARM_REFLECTIONS = [
    "What does this activity pattern tell me about the health of the system?",
    "Which tasks are flowing smoothly, and where is friction building up?",
    "If I were coaching these agents, what would I suggest they focus on?",
    "Is the swarm balanced, or is one agent carrying too much weight?",
    "What surprised me about recent task outcomes?",
]
|
|
|
|
def _seed_from_swarm(self) -> str:
    """Build a thought seed from recent swarm activity plus a reflective prompt."""
    try:
        from datetime import timedelta

        from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary

        window_start = datetime.now(UTC) - timedelta(hours=1)
        activity = _gather_swarm_summary(window_start)
        queue = _gather_task_queue_summary()
        chosen = random.choice(self._SWARM_REFLECTIONS)
        return (
            f"Recent swarm activity: {activity}\n"
            f"Task queue: {queue}\n\n"
            f"Reflect on this: {chosen}"
        )
    except Exception as exc:
        logger.debug("Swarm seed unavailable: %s", exc)
        return "The swarm is quiet right now. What does silence in a system mean?"
|
|
|
|
def _seed_from_scripture(self) -> str:
|
|
"""Gather current scripture meditation focus as thought seed."""
|
|
return "Scripture is on my mind, though no specific verse is in focus."
|
|
|
|
def _seed_from_memory(self) -> str:
    """Build a thought seed from the memory system's current context."""
    try:
        from timmy.memory_system import memory_system

        snapshot = memory_system.get_system_context()
        if snapshot:
            # Truncate to a reasonable size for a thought seed
            return f"From my memory:\n{snapshot[:500]}"
    except Exception as exc:
        logger.debug("Memory seed unavailable: %s", exc)
    return "My memory vault is quiet."
|
|
|
|
def _seed_from_observation(self) -> str:
    """Ground a thought in concrete recent activity and a reflective prompt."""
    lines = [f"Observation prompt: {random.choice(_OBSERVATION_SEEDS)}"]

    # Pull real data so the model has something concrete to reflect on;
    # missing data just means a prompt-only seed.
    try:
        from datetime import timedelta

        from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary

        window_start = datetime.now(UTC) - timedelta(hours=2)
        activity = _gather_swarm_summary(window_start)
        queue = _gather_task_queue_summary()
        if activity:
            lines.append(f"Recent activity: {activity}")
        if queue:
            lines.append(f"Queue: {queue}")
    except Exception as exc:
        logger.debug("Observation seed data unavailable: %s", exc)

    return "\n".join(lines)
|
|
|
|
def _seed_from_workspace(self) -> str:
    """Build a thought seed from pending workspace updates.

    When there is new correspondence or new inbox files from Hermes,
    surface a snippet of them; otherwise return a quiet-workspace prompt.
    """
    try:
        from timmy.workspace import workspace_monitor

        pending = workspace_monitor.get_pending_updates()
        correspondence = pending.get("new_correspondence")
        inbox = pending.get("new_inbox_files", [])

        if correspondence:
            # First 200 chars of the new entry, flattened to one line.
            snippet = correspondence[:200].replace("\n", " ")
            if len(correspondence) > 200:
                snippet += "..."
            return f"New workspace message from Hermes: {snippet}"

        if inbox:
            listing = ", ".join(inbox[:3])
            if len(inbox) > 3:
                listing += f", ... (+{len(inbox) - 3} more)"
            return f"New inbox files from Hermes: {listing}"

    except Exception as exc:
        logger.debug("Workspace seed unavailable: %s", exc)

    # Nothing pending (or the check failed) — fixed fallback prompt.
    return "The workspace is quiet. What should I be watching for?"
|
|
|
|
async def _check_workspace(self) -> None:
    """Post-hook: check workspace for updates and mark them as seen.

    This ensures Timmy 'processes' workspace updates even if the seed
    was different, keeping the state file in sync.
    """
    try:
        from timmy.workspace import workspace_monitor

        pending = workspace_monitor.get_pending_updates()
        correspondence = pending.get("new_correspondence")
        inbox_files = pending.get("new_inbox_files", [])

        if correspondence or inbox_files:
            if correspondence:
                non_empty = sum(1 for ln in correspondence.splitlines() if ln.strip())
                logger.info("Workspace: processed %d new correspondence entries", non_empty)
            if inbox_files:
                logger.info(
                    "Workspace: processed %d new inbox files: %s", len(inbox_files), inbox_files
                )
            # Mark as seen to update the state file
            workspace_monitor.mark_seen()
    except Exception as exc:
        logger.debug("Workspace check failed: %s", exc)
|
|
|
|
# Maximum retries when a generated thought is too similar to recent ones
_MAX_DEDUP_RETRIES: int = 2
# Similarity threshold for rejecting a candidate thought
# (0.0 = completely different, 1.0 = identical); compared against
# SequenceMatcher.ratio() in _is_too_similar.
_SIMILARITY_THRESHOLD: float = 0.6
|
|
|
|
def _is_too_similar(self, candidate: str, recent: list["Thought"]) -> bool:
|
|
"""Check if *candidate* is semantically too close to any recent thought.
|
|
|
|
Uses SequenceMatcher on normalised text (lowered, stripped) for a fast
|
|
approximation of semantic similarity that works without external deps.
|
|
"""
|
|
norm_candidate = candidate.lower().strip()
|
|
for thought in recent:
|
|
norm_existing = thought.content.lower().strip()
|
|
ratio = SequenceMatcher(None, norm_candidate, norm_existing).ratio()
|
|
if ratio >= self._SIMILARITY_THRESHOLD:
|
|
logger.debug(
|
|
"Thought rejected (%.0f%% similar to %s): %.60s",
|
|
ratio * 100,
|
|
thought.id[:8],
|
|
candidate,
|
|
)
|
|
return True
|
|
return False
|
|
|
|
def _build_continuity_context(self) -> str:
|
|
"""Build context from recent thoughts with anti-repetition guidance.
|
|
|
|
Shows the last 5 thoughts (truncated) so the model knows what themes
|
|
to avoid. The header explicitly instructs against repeating.
|
|
"""
|
|
recent = self.get_recent_thoughts(limit=5)
|
|
if not recent:
|
|
return "This is your first thought since waking up. Begin fresh."
|
|
|
|
lines = ["Your recent thoughts — do NOT repeat these themes. Find a new angle:"]
|
|
# recent is newest-first, reverse for chronological order
|
|
for thought in reversed(recent):
|
|
snippet = thought.content[:100]
|
|
if len(thought.content) > 100:
|
|
snippet = snippet.rstrip() + "..."
|
|
lines.append(f"- [{thought.seed_type}] {snippet}")
|
|
return "\n".join(lines)
|
|
|
|
async def _call_agent(self, prompt: str) -> str:
    """Call Timmy's agent to generate a thought.

    Creates a lightweight agent with skip_mcp=True to avoid the cancel-scope
    errors that occur when MCP stdio transports are spawned inside asyncio
    background tasks (#72). The thinking engine doesn't need Gitea or
    filesystem tools — it only needs the LLM.
    """
    from timmy.agent import create_timmy

    agent = create_timmy(skip_mcp=True)
    result = await agent.arun(prompt, stream=False)
    # Agent run results normally expose .content; fall back to str() otherwise
    if hasattr(result, "content"):
        return result.content
    return str(result)
|
|
|
|
def _store_thought(self, content: str, seed_type: str) -> Thought:
    """Persist a new thought to SQLite and return the stored record.

    The thought is chained to the previous one via parent_id so the
    stream forms a continuous chain of introspection.
    """
    record = Thought(
        id=str(uuid.uuid4()),
        content=content,
        seed_type=seed_type,
        parent_id=self._last_thought_id,
        created_at=datetime.now(UTC).isoformat(),
    )

    row = (record.id, record.content, record.seed_type, record.parent_id, record.created_at)
    with _get_conn(self._db_path) as conn:
        conn.execute(
            """
            INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            row,
        )
        conn.commit()
    return record
|
|
|
|
def _log_event(self, thought: Thought) -> None:
    """Log the thought as a swarm event (best-effort; failures are debug-logged)."""
    try:
        from swarm.event_log import EventType, log_event

        # Truncate content so the event log stays compact
        payload = {
            "thought_id": thought.id,
            "seed_type": thought.seed_type,
            "content": thought.content[:200],
        }
        log_event(
            EventType.TIMMY_THOUGHT,
            source="thinking-engine",
            agent_id="default",
            data=payload,
        )
    except Exception as exc:
        logger.debug("Failed to log thought event: %s", exc)
|
|
|
|
def _write_journal(self, thought: Thought) -> None:
|
|
"""Append the thought to a daily markdown journal file.
|
|
|
|
Writes to data/journal/YYYY-MM-DD.md — one file per day, append-only.
|
|
Timestamps are converted to local time with timezone indicator.
|
|
"""
|
|
try:
|
|
ts = datetime.fromisoformat(thought.created_at)
|
|
# Convert UTC to local for a human-readable journal
|
|
local_ts = ts.astimezone()
|
|
tz_name = local_ts.strftime("%Z") or "UTC"
|
|
|
|
journal_dir = self._db_path.parent / "journal"
|
|
journal_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
journal_file = journal_dir / f"{local_ts.strftime('%Y-%m-%d')}.md"
|
|
time_str = f"{local_ts.strftime('%I:%M %p').lstrip('0')} {tz_name}"
|
|
|
|
entry = f"## {time_str} — {thought.seed_type}\n\n{thought.content}\n\n---\n\n"
|
|
|
|
with open(journal_file, "a", encoding="utf-8") as f:
|
|
f.write(entry)
|
|
except Exception as exc:
|
|
logger.debug("Failed to write journal entry: %s", exc)
|
|
|
|
async def _broadcast(self, thought: Thought) -> None:
    """Broadcast the thought to WebSocket clients (best-effort)."""
    try:
        from infrastructure.ws_manager.handler import ws_manager

        message = {
            "thought_id": thought.id,
            "content": thought.content,
            "seed_type": thought.seed_type,
            "created_at": thought.created_at,
        }
        await ws_manager.broadcast("timmy_thought", message)
    except Exception as exc:
        logger.debug("Failed to broadcast thought: %s", exc)
|
|
|
|
|
|
# Module-level singleton — import this shared instance rather than
# constructing a new ThinkingEngine, so the whole process uses one
# thought chain and SQLite store.
thinking_engine = ThinkingEngine()
|