Compare commits

..

6 Commits

Author SHA1 Message Date
Alexander Whitestone
fb69f01b72 feat: add ThreeJSWorldAdapter stub from Kimi's world analysis
Some checks failed
Tests / lint (pull_request) Failing after 34s
Tests / test (pull_request) Has been skipped
Studied Kimi's AI World Builder (Kimisworld.zip, issue #870):
- React + Three.js app with runtime code generation via new Function()
- WorldObject schema: type, geometry, material, color, position, rotation,
  scale, animation string
- AI agent avatar with lerp-based movement, mood-based color, pulsing core
- State managed via Zustand; Bloom + ChromaticAberration post-processing

Adds ThreeJSWorldAdapter stub that maps Kimi's WorldObject schema to the
WorldInterface contract (observe/act/speak), documenting the HTTP/WS wiring
needed to bridge the Three.js app to Timmy's agent infrastructure.

The adapter's _object_to_entity_description() helper converts WorldObjects
into PerceptionOutput entity strings compatible with any WorldInterface
consumer, including the Morrowind integration.

Refs #870
2026-03-23 22:04:00 -04:00
8304cf50da [claude] Add unit tests for backlog_triage.py (#1293) (#1307)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:57:44 +00:00
16c4cc0f9f [claude] Add unit tests for research_tools.py (#1294) (#1308)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:57:39 +00:00
a48f30fee4 [claude] Add unit tests for quest_system.py (#1292) (#1309)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:57:29 +00:00
e44db42c1a [claude] Split thinking.py into focused sub-modules (#1279) (#1306)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:57:04 +00:00
de7744916c [claude] DeerFlow evaluation research note (#1283) (#1305)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:56:37 +00:00
20 changed files with 4627 additions and 2405 deletions

View File

@@ -0,0 +1,190 @@
# DeerFlow Evaluation — Autonomous Research Orchestration Layer
**Status:** No-go for full adoption · Selective borrowing recommended
**Date:** 2026-03-23
**Issue:** #1283 (spawned from #1275 screenshot triage)
**Refs:** #972 (Timmy research pipeline) · #975 (ResearchOrchestrator)
---
## What Is DeerFlow?
DeerFlow (`bytedance/deer-flow`) is an open-source "super-agent harness" built by ByteDance on top of LangGraph. It provides a production-grade multi-agent research and code-execution framework with a web UI, REST API, Docker deployment, and optional IM channel integration (Telegram, Slack, Feishu/Lark).
- **Stars:** ~39,600 · **License:** MIT
- **Stack:** Python 3.12+ (backend) · TypeScript/Next.js (frontend) · LangGraph runtime
- **Entry point:** `http://localhost:2026` (Nginx reverse proxy, configurable via `PORT`)
---
## Research Questions — Answers
### 1. Agent Roles
DeerFlow uses a two-tier architecture:
| Role | Description |
|------|-------------|
| **Lead Agent** | Entry point; decomposes tasks, dispatches sub-agents, synthesizes results |
| **Sub-Agent (general-purpose)** | All tools except `task`; spawned dynamically |
| **Sub-Agent (bash)** | Command-execution specialist |
The lead agent runs through a 12-middleware chain in order: thread setup → uploads → sandbox → tool-call repair → guardrails → summarization → todo tracking → title generation → memory update → image injection → sub-agent concurrency cap → clarification intercept.
**Concurrency:** up to 3 sub-agents in parallel (configurable), 15-minute default timeout each, structured SSE event stream (`task_started` / `task_running` / `task_completed` / `task_failed`).
**Mapping to Timmy personas:** DeerFlow's lead/sub-agent split roughly maps to Timmy's orchestrator + specialist-agent pattern. DeerFlow doesn't have named personas — it routes by capability (tools available to the agent type), not by identity. Timmy's persona system is richer and more opinionated.
---
### 2. API Surface
DeerFlow exposes a full REST API at port 2026 (via Nginx). **No authentication by default.**
**Core integration endpoints:**
| Endpoint | Method | Purpose |
|----------|--------|---------|
| `POST /api/langgraph/threads` | | Create conversation thread |
| `POST /api/langgraph/threads/{id}/runs` | | Submit task (blocking) |
| `POST /api/langgraph/threads/{id}/runs/stream` | | Submit task (streaming SSE/WS) |
| `GET /api/langgraph/threads/{id}/state` | | Get full thread state + artifacts |
| `GET /api/models` | | List configured models |
| `GET /api/threads/{id}/artifacts/{path}` | | Download generated artifacts |
| `DELETE /api/threads/{id}` | | Clean up thread data |
These are callable from Timmy with `httpx` — no special client library needed.
---
### 3. LLM Backend Support
DeerFlow uses LangChain model classes declared in `config.yaml`.
**Documented providers:** OpenAI, Anthropic, Google Gemini, DeepSeek, Doubao (ByteDance), Kimi/Moonshot, OpenRouter, MiniMax, Novita AI, Claude Code (OAuth).
**Ollama:** Not in official documentation, but works via the `langchain_openai:ChatOpenAI` class with `base_url: http://localhost:11434/v1` and a dummy API key. Community-confirmed (GitHub issues #37, #1004) with Qwen2.5, Llama 3.1, and DeepSeek-R1.
**vLLM:** Not documented, but architecturally identical — vLLM exposes an OpenAI-compatible endpoint. Should work with the same `base_url` override.
**Practical caveat:** The lead agent requires strong instruction-following for consistent tool use and structured output. Community findings suggest ≥14B parameter models (Qwen2.5-14B minimum) for reliable orchestration. Our current `qwen3:14b` should be viable.
---
### 4. License
**MIT License** — Copyright 2025 ByteDance Ltd. and DeerFlow Authors 20252026.
Permissive: use, modify, distribute, commercialize freely. Attribution required. No warranty.
**Compatible with Timmy's use case.** No CLA, no copyleft, no commercial restrictions.
---
### 5. Docker Port Conflicts
DeerFlow's Docker Compose exposes a single host port:
| Service | Host Port | Notes |
|---------|-----------|-------|
| Nginx (entry point) | **2026** (configurable via `PORT`) | Only externally exposed port |
| Frontend (Next.js) | 3000 | Internal only |
| Gateway API | 8001 | Internal only |
| LangGraph runtime | 2024 | Internal only |
| Provisioner (optional) | 8002 | Internal only, Kubernetes mode only |
Timmy's existing Docker Compose exposes:
- **8000** — dashboard (FastAPI)
- **8080** — openfang (via `openfang` profile)
- **11434** — Ollama (host process, not containerized)
**No conflict.** Port 2026 is not used by Timmy. DeerFlow can run alongside the existing stack without modification.
---
## Full Capability Comparison
| Capability | DeerFlow | Timmy (`research.py`) |
|------------|----------|-----------------------|
| Multi-agent fan-out | ✅ 3 concurrent sub-agents | ❌ Sequential only |
| Web search | ✅ Tavily / InfoQuest | ✅ `research_tools.py` |
| Web fetch | ✅ Jina AI / Firecrawl | ✅ trafilatura |
| Code execution (sandbox) | ✅ Local / Docker / K8s | ❌ Not implemented |
| Artifact generation | ✅ HTML, Markdown, slides | ❌ Markdown report only |
| Document upload + conversion | ✅ PDF, PPT, Excel, Word | ❌ Not implemented |
| Long-term memory | ✅ LLM-extracted facts, persistent | ✅ SQLite semantic cache |
| Streaming results | ✅ SSE + WebSocket | ❌ Blocking call |
| Web UI | ✅ Next.js included | ✅ Jinja2/HTMX dashboard |
| IM integration | ✅ Telegram, Slack, Feishu | ✅ Telegram, Discord |
| Ollama backend | ✅ (via config, community-confirmed) | ✅ Native |
| Persona system | ❌ Role-based only | ✅ Named personas |
| Semantic cache tier | ❌ Not implemented | ✅ SQLite (Tier 4) |
| Free-tier cascade | ❌ Not applicable | 🔲 Planned (Groq, #980) |
| Python version requirement | 3.12+ | 3.11+ |
| Lock-in | LangGraph + LangChain | None |
---
## Integration Options Assessment
### Option A — Full Adoption (replace `research.py`)
**Verdict: Not recommended.**
DeerFlow is a substantial full-stack system (Python + Node.js, Docker, Nginx, LangGraph). Adopting it fully would:
- Replace Timmy's custom cascade tier system (SQLite cache → Ollama → Claude API → Groq) with a single-tier LangChain model config
- Lose Timmy's persona-aware research routing
- Add Python 3.12+ dependency (Timmy currently targets 3.11+)
- Introduce LangGraph/LangChain lock-in for all research tasks
- Require running a parallel Node.js frontend process (redundant given Timmy's own UI)
### Option B — Sidecar for Heavy Research (call DeerFlow's API from Timmy)
**Verdict: Viable but over-engineered for current needs.**
DeerFlow could run as an optional sidecar (`docker compose --profile deerflow up`) and Timmy could delegate multi-agent research tasks via `POST /api/langgraph/threads/{id}/runs`. This would unlock parallel sub-agent fan-out and code-execution sandboxing without replacing Timmy's stack.
The integration would be ~50 lines of `httpx` code in a new `DeerFlowClient` adapter. The `ResearchOrchestrator` in `research.py` could route tasks above a complexity threshold to DeerFlow.
**Barrier:** DeerFlow's lack of default authentication means the sidecar would need to be network-isolated (internal Docker network only) or firewalled. Also, DeerFlow's Ollama integration is community-maintained, not officially supported — risk of breaking on upstream updates.
### Option C — Selective Borrowing (copy patterns, not code)
**Verdict: Recommended.**
DeerFlow's architecture reveals concrete gaps in Timmy's current pipeline that are worth addressing independently:
| DeerFlow Pattern | Timmy Gap to Close | Implementation Path |
|------------------|--------------------|---------------------|
| Parallel sub-agent fan-out | Research is sequential | Add `asyncio.gather()` to `ResearchOrchestrator` for concurrent query execution |
| `SummarizationMiddleware` | Long contexts blow token budget | Add a context-trimming step in the synthesis cascade |
| `TodoListMiddleware` | No progress tracking during long research | Wire into the dashboard task panel |
| Artifact storage + serving | Reports are ephemeral (not persistently downloadable) | Add file-based artifact store to `research.py` (issue #976 already planned) |
| Skill modules (Markdown-based) | Research templates are `.md` files — same pattern | Already done in `skills/research/` |
| MCP integration | Research tools are hard-coded | Add MCP server discovery to `research_tools.py` for pluggable tool backends |
---
## Recommendation
**No-go for full adoption or sidecar deployment at this stage.**
Timmy's `ResearchOrchestrator` already covers the core pipeline (query → search → fetch → synthesize → store). DeerFlow's value proposition is primarily the parallel sub-agent fan-out and code-execution sandbox — capabilities that are useful but not blocking Timmy's current roadmap.
**Recommended actions:**
1. **Close the parallelism gap (high value, low effort):** Refactor `ResearchOrchestrator` to execute queries concurrently with `asyncio.gather()`. This delivers DeerFlow's most impactful capability without any new dependencies.
2. **Re-evaluate after #980 and #981 are done:** Once Timmy has the Groq free-tier cascade and a sovereignty metrics dashboard, we'll have a clearer picture of whether the custom orchestrator is performing well enough to make DeerFlow unnecessary entirely.
3. **File a follow-up for MCP tool integration:** DeerFlow's use of `langchain-mcp-adapters` for pluggable tool backends is the most architecturally interesting pattern. Adding MCP server discovery to `research_tools.py` would give Timmy the same extensibility without LangGraph lock-in.
4. **Revisit DeerFlow's code-execution sandbox if #978 (Paperclip task runner) proves insufficient:** DeerFlow's sandboxed `bash` tool is production-tested and well-isolated. If Timmy's task runner needs secure code execution, DeerFlow's sandbox implementation is worth borrowing or wrapping.
---
## Follow-up Issues to File
| Issue | Title | Priority |
|-------|-------|----------|
| New | Parallelize ResearchOrchestrator query execution (`asyncio.gather`) | Medium |
| New | Add context-trimming step to synthesis cascade | Low |
| New | MCP server discovery in `research_tools.py` | Low |
| #976 | Semantic index for research outputs (already planned) | High |

View File

@@ -0,0 +1,149 @@
"""Three.js world adapter — bridges Kimi's AI World Builder to WorldInterface.
Studied from Kimisworld.zip (issue #870). Kimi's world is a React +
Three.js app ("AI World Builder v1.0") that exposes a JSON state API and
accepts ``addObject`` / ``updateObject`` / ``removeObject`` commands.
This adapter is a stub: ``connect()`` and the core methods outline the
HTTP / WebSocket wiring that would be needed to talk to a running instance.
The ``observe()`` response maps Kimi's ``WorldObject`` schema to
``PerceptionOutput`` entities so that any WorldInterface consumer can
treat the Three.js canvas like any other game world.
Usage::
registry.register("threejs", ThreeJSWorldAdapter)
adapter = registry.get("threejs", base_url="http://localhost:5173")
adapter.connect()
perception = adapter.observe()
adapter.act(CommandInput(action="add_object", parameters={"geometry": "sphere", ...}))
adapter.speak("Hello from Timmy", target="broadcast")
"""
from __future__ import annotations
import logging
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import ActionResult, ActionStatus, CommandInput, PerceptionOutput
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Kimi's WorldObject geometry / material vocabulary (from WorldObjects.tsx)
# ---------------------------------------------------------------------------
_VALID_GEOMETRIES = {"box", "sphere", "cylinder", "torus", "cone", "dodecahedron"}
_VALID_MATERIALS = {"standard", "wireframe", "glass", "glow"}
_VALID_TYPES = {"mesh", "light", "particle", "custom"}
def _object_to_entity_description(obj: dict) -> str:
"""Render a Kimi WorldObject dict as a human-readable entity string.
Example output: ``sphere/glow #ff006e at (2.1, 3.0, -1.5)``
"""
geometry = obj.get("geometry", "unknown")
material = obj.get("material", "unknown")
color = obj.get("color", "#ffffff")
pos = obj.get("position", [0, 0, 0])
obj_type = obj.get("type", "mesh")
pos_str = "({:.1f}, {:.1f}, {:.1f})".format(*pos)
return f"{obj_type}/{geometry}/{material} {color} at {pos_str}"
class ThreeJSWorldAdapter(WorldInterface):
"""Adapter for Kimi's Three.js AI World Builder.
Connects to a running Three.js world that exposes:
- ``GET /api/world/state`` — returns current WorldObject list
- ``POST /api/world/execute`` — accepts addObject / updateObject code
- WebSocket ``/ws/world`` — streams state change events
All core methods raise ``NotImplementedError`` until HTTP wiring is
added. Implement ``connect()`` first — it should verify that the
Three.js app is running and optionally open a WebSocket for live events.
Key insight from studying Kimi's world (issue #870):
- Objects carry a geometry, material, color, position, rotation, scale,
and an optional *animation* string executed via ``new Function()``
each animation frame.
- The AI agent (``AIAgent.tsx``) moves through the world with lerp()
targeting, cycles through moods, and pulses its core during "thinking"
states — a model for how Timmy could manifest presence in a 3D world.
- World complexity is tracked as a simple counter (one unit per object)
which the AI uses to decide whether to create, modify, or upgrade.
"""
def __init__(self, *, base_url: str = "http://localhost:5173") -> None:
self._base_url = base_url.rstrip("/")
self._connected = False
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
raise NotImplementedError(
"ThreeJSWorldAdapter.connect() — verify Three.js app is running at "
f"{self._base_url} and optionally open a WebSocket to /ws/world"
)
def disconnect(self) -> None:
self._connected = False
logger.info("ThreeJSWorldAdapter disconnected")
@property
def is_connected(self) -> bool:
return self._connected
# -- core contract (stubs) ---------------------------------------------
def observe(self) -> PerceptionOutput:
"""Return current Three.js world state as structured perception.
Expected HTTP call::
GET {base_url}/api/world/state
{"objects": [...WorldObject], "worldComplexity": int, ...}
Each WorldObject becomes an entity description string.
"""
raise NotImplementedError(
"ThreeJSWorldAdapter.observe() — GET /api/world/state, "
"map each WorldObject via _object_to_entity_description()"
)
def act(self, command: CommandInput) -> ActionResult:
"""Dispatch a command to the Three.js world.
Supported actions (mirrors Kimi's CodeExecutor API):
- ``add_object`` — parameters: WorldObject fields (geometry, material, …)
- ``update_object`` — parameters: id + partial WorldObject fields
- ``remove_object`` — parameters: id
- ``clear_world`` — parameters: (none)
Expected HTTP call::
POST {base_url}/api/world/execute
Content-Type: application/json
{"action": "add_object", "parameters": {...}}
"""
raise NotImplementedError(
f"ThreeJSWorldAdapter.act({command.action!r}) — "
"POST /api/world/execute with serialised CommandInput"
)
def speak(self, message: str, target: str | None = None) -> None:
"""Inject a text message into the Three.js world.
Kimi's world does not have a native chat layer, so the recommended
implementation is to create a short-lived ``Text`` entity at a
visible position (or broadcast via the world WebSocket).
Expected WebSocket frame::
{"type": "timmy_speech", "text": message, "target": target}
"""
raise NotImplementedError(
"ThreeJSWorldAdapter.speak() — send timmy_speech frame over "
"/ws/world WebSocket, or POST a temporary Text entity"
)

View File

@@ -1,10 +1,7 @@
"""Memory — Persistent conversation and knowledge memory.
Sub-modules:
embeddings — text-to-vector embedding + similarity functions
unified — unified memory schema and connection management
chain — CRUD operations (store, search, delete, stats)
semantic — SemanticMemory and MemorySearcher classes
consolidation — HotMemory and VaultMemory classes
vector_store — backward compatibility re-exports from memory_system
embeddings — text-to-vector embedding + similarity functions
unified — unified memory schema and connection management
vector_store — backward compatibility re-exports from memory_system
"""

View File

@@ -1,387 +0,0 @@
"""CRUD operations for Timmy's unified memory database.
Provides store, search, delete, and management functions for the
`memories` table defined in timmy.memory.unified.
"""
import json
import logging
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import UTC, datetime, timedelta
from pathlib import Path
from config import settings
from timmy.memory.embeddings import (
_keyword_overlap,
cosine_similarity,
embed_text,
)
from timmy.memory.unified import (
DB_PATH,
MemoryEntry,
_ensure_schema,
get_connection,
)
logger = logging.getLogger(__name__)
def store_memory(
content: str,
source: str,
context_type: str = "conversation",
agent_id: str | None = None,
task_id: str | None = None,
session_id: str | None = None,
metadata: dict | None = None,
compute_embedding: bool = True,
) -> MemoryEntry:
"""Store a memory entry with optional embedding.
Args:
content: The text content to store
source: Source of the memory (agent name, user, system)
context_type: Type of context (conversation, document, fact, vault_chunk)
agent_id: Associated agent ID
task_id: Associated task ID
session_id: Session identifier
metadata: Additional structured data
compute_embedding: Whether to compute vector embedding
Returns:
The stored MemoryEntry
"""
embedding = None
if compute_embedding:
embedding = embed_text(content)
entry = MemoryEntry(
content=content,
source=source,
context_type=context_type,
agent_id=agent_id,
task_id=task_id,
session_id=session_id,
metadata=metadata,
embedding=embedding,
)
with get_connection() as conn:
conn.execute(
"""
INSERT INTO memories
(id, content, memory_type, source, agent_id, task_id, session_id,
metadata, embedding, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
entry.id,
entry.content,
entry.context_type, # DB column is memory_type
entry.source,
entry.agent_id,
entry.task_id,
entry.session_id,
json.dumps(metadata) if metadata else None,
json.dumps(embedding) if embedding else None,
entry.timestamp,
),
)
conn.commit()
return entry
def _build_search_filters(
context_type: str | None,
agent_id: str | None,
session_id: str | None,
) -> tuple[str, list]:
"""Build SQL WHERE clause and params from search filters."""
conditions: list[str] = []
params: list = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
return where_clause, params
def _fetch_memory_candidates(
where_clause: str, params: list, candidate_limit: int
) -> list[sqlite3.Row]:
"""Fetch candidate memory rows from the database."""
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(candidate_limit)
with get_connection() as conn:
return conn.execute(query_sql, params).fetchall()
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
"""Convert a database row to a MemoryEntry."""
return MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
def _score_and_filter(
rows: list[sqlite3.Row],
query: str,
query_embedding: list[float],
min_relevance: float,
) -> list[MemoryEntry]:
"""Score candidate rows by similarity and filter by min_relevance."""
results = []
for row in rows:
entry = _row_to_entry(row)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
else:
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results
def search_memories(
query: str,
limit: int = 10,
context_type: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
min_relevance: float = 0.0,
) -> list[MemoryEntry]:
"""Search for memories by semantic similarity.
Args:
query: Search query text
limit: Maximum results
context_type: Filter by memory type (maps to DB memory_type column)
agent_id: Filter by agent
session_id: Filter by session
min_relevance: Minimum similarity score (0-1)
Returns:
List of MemoryEntry objects sorted by relevance
"""
query_embedding = embed_text(query)
where_clause, params = _build_search_filters(context_type, agent_id, session_id)
rows = _fetch_memory_candidates(where_clause, params, limit * 3)
results = _score_and_filter(rows, query, query_embedding, min_relevance)
return results[:limit]
def delete_memory(memory_id: str) -> bool:
"""Delete a memory entry by ID.
Returns:
True if deleted, False if not found
"""
with get_connection() as conn:
cursor = conn.execute(
"DELETE FROM memories WHERE id = ?",
(memory_id,),
)
conn.commit()
return cursor.rowcount > 0
def get_memory_stats() -> dict:
"""Get statistics about the memory store.
Returns:
Dict with counts by type, total entries, etc.
"""
from timmy.memory.embeddings import _get_embedding_model
with get_connection() as conn:
total = conn.execute("SELECT COUNT(*) as count FROM memories").fetchone()["count"]
by_type = {}
rows = conn.execute(
"SELECT memory_type, COUNT(*) as count FROM memories GROUP BY memory_type"
).fetchall()
for row in rows:
by_type[row["memory_type"]] = row["count"]
with_embeddings = conn.execute(
"SELECT COUNT(*) as count FROM memories WHERE embedding IS NOT NULL"
).fetchone()["count"]
return {
"total_entries": total,
"by_type": by_type,
"with_embeddings": with_embeddings,
"has_embedding_model": _get_embedding_model() is not False,
}
def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int:
"""Delete old memories to manage storage.
Args:
older_than_days: Delete memories older than this
keep_facts: Whether to preserve fact-type memories
Returns:
Number of entries deleted
"""
cutoff = (datetime.now(UTC) - timedelta(days=older_than_days)).isoformat()
with get_connection() as conn:
if keep_facts:
cursor = conn.execute(
"""
DELETE FROM memories
WHERE created_at < ? AND memory_type != 'fact'
""",
(cutoff,),
)
else:
cursor = conn.execute(
"DELETE FROM memories WHERE created_at < ?",
(cutoff,),
)
deleted = cursor.rowcount
conn.commit()
return deleted
def get_memory_context(query: str, max_tokens: int = 2000, **filters) -> str:
"""Get relevant memory context as formatted text for LLM prompts.
Args:
query: Search query
max_tokens: Approximate maximum tokens to return
**filters: Additional filters (agent_id, session_id, etc.)
Returns:
Formatted context string for inclusion in prompts
"""
memories = search_memories(query, limit=20, **filters)
context_parts = []
total_chars = 0
max_chars = max_tokens * 4 # Rough approximation
for mem in memories:
formatted = f"[{mem.source}]: {mem.content}"
if total_chars + len(formatted) > max_chars:
break
context_parts.append(formatted)
total_chars += len(formatted)
if not context_parts:
return ""
return "Relevant context from memory:\n" + "\n\n".join(context_parts)
def recall_personal_facts(agent_id: str | None = None) -> list[str]:
"""Recall personal facts about the user or system.
Args:
agent_id: Optional agent filter
Returns:
List of fact strings
"""
with get_connection() as conn:
if agent_id:
rows = conn.execute(
"""
SELECT content FROM memories
WHERE memory_type = 'fact' AND agent_id = ?
ORDER BY created_at DESC
LIMIT 100
""",
(agent_id,),
).fetchall()
else:
rows = conn.execute(
"""
SELECT content FROM memories
WHERE memory_type = 'fact'
ORDER BY created_at DESC
LIMIT 100
""",
).fetchall()
return [r["content"] for r in rows]
def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]:
"""Recall personal facts with their IDs for edit/delete operations."""
with get_connection() as conn:
if agent_id:
rows = conn.execute(
"SELECT id, content FROM memories WHERE memory_type = 'fact' AND agent_id = ? ORDER BY created_at DESC LIMIT 100",
(agent_id,),
).fetchall()
else:
rows = conn.execute(
"SELECT id, content FROM memories WHERE memory_type = 'fact' ORDER BY created_at DESC LIMIT 100",
).fetchall()
return [{"id": r["id"], "content": r["content"]} for r in rows]
def update_personal_fact(memory_id: str, new_content: str) -> bool:
"""Update a personal fact's content."""
with get_connection() as conn:
cursor = conn.execute(
"UPDATE memories SET content = ? WHERE id = ? AND memory_type = 'fact'",
(new_content, memory_id),
)
conn.commit()
return cursor.rowcount > 0
def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry:
"""Store a personal fact about the user or system.
Args:
fact: The fact to store
agent_id: Associated agent
Returns:
The stored MemoryEntry
"""
return store_memory(
content=fact,
source="system",
context_type="fact",
agent_id=agent_id,
metadata={"auto_extracted": False},
)

View File

@@ -1,310 +0,0 @@
"""Hot and Vault memory classes for Timmy's memory consolidation tier.
HotMemory: Tier 1 — computed view of top facts from the database.
VaultMemory: Tier 2 — structured vault (memory/ directory), append-only markdown.
"""
import logging
import re
from datetime import UTC, datetime
from pathlib import Path
from timmy.memory.unified import PROJECT_ROOT
logger = logging.getLogger(__name__)
VAULT_PATH = PROJECT_ROOT / "memory"
_DEFAULT_HOT_MEMORY_TEMPLATE = """\
# Timmy Hot Memory
> Working RAM — always loaded, ~300 lines max, pruned monthly
> Last updated: {date}
---
## Current Status
**Agent State:** Operational
**Mode:** Development
**Active Tasks:** 0
**Pending Decisions:** None
---
## Standing Rules
1. **Sovereignty First** — No cloud dependencies
2. **Local-Only Inference** — Ollama on localhost
3. **Privacy by Design** — Telemetry disabled
4. **Tool Minimalism** — Use tools only when necessary
5. **Memory Discipline** — Write handoffs at session end
---
## Agent Roster
| Agent | Role | Status |
|-------|------|--------|
| Timmy | Core | Active |
---
## User Profile
**Name:** (not set)
**Interests:** (to be learned)
---
## Key Decisions
(none yet)
---
## Pending Actions
- [ ] Learn user's name
---
*Prune date: {prune_date}*
"""
class HotMemory:
"""Tier 1: Hot memory — computed view of top facts from DB."""
def __init__(self, path=None) -> None:
if path is None:
path = PROJECT_ROOT / "MEMORY.md"
self.path = path
self._content: str | None = None
self._last_modified: float | None = None
def read(self, force_refresh: bool = False) -> str:
"""Read hot memory — computed view of top facts + last reflection from DB."""
from timmy.memory.chain import recall_personal_facts
# Import recall_last_reflection lazily to support patching in memory_system
try:
# Use the version from memory_system so patches work correctly
import timmy.memory_system as _ms
recall_last_reflection = _ms.recall_last_reflection
except Exception:
from timmy.memory.chain import recall_personal_facts as _rpf # noqa: F811
recall_last_reflection = None
try:
facts = recall_personal_facts()
lines = ["# Timmy Hot Memory\n"]
if facts:
lines.append("## Known Facts\n")
for f in facts[:15]:
lines.append(f"- {f}")
# Include the last reflection if available
if recall_last_reflection is not None:
try:
reflection = recall_last_reflection()
if reflection:
lines.append("\n## Last Reflection\n")
lines.append(reflection)
except Exception:
pass
if len(lines) > 1:
return "\n".join(lines)
except Exception:
logger.debug("DB context read failed, falling back to file")
# Fallback to file if DB unavailable
if self.path.exists():
return self.path.read_text()
return "# Timmy Hot Memory\n\nNo memories stored yet.\n"
def update_section(self, section: str, content: str) -> None:
"""Update a specific section in MEMORY.md.
DEPRECATED: Hot memory is now computed from the database.
This method is kept for backward compatibility during transition.
Use memory_write() to store facts in the database.
"""
logger.warning(
"HotMemory.update_section() is deprecated. "
"Use memory_write() to store facts in the database."
)
# Keep file-writing for backward compatibility during transition
# Guard against empty or excessively large writes
if not content or not content.strip():
logger.warning("HotMemory: Refusing empty write to section '%s'", section)
return
if len(content) > 2000:
logger.warning("HotMemory: Truncating oversized write to section '%s'", section)
content = content[:2000] + "\n... [truncated]"
if not self.path.exists():
self._create_default()
full_content = self.read()
# Find section
pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)"
match = re.search(pattern, full_content, re.DOTALL)
if match:
# Replace section
new_section = f"## {section}\n\n{content}\n\n"
full_content = full_content[: match.start()] + new_section + full_content[match.end() :]
else:
# Append section — guard against missing prune marker
insert_point = full_content.rfind("*Prune date:")
new_section = f"## {section}\n\n{content}\n\n"
if insert_point < 0:
# No prune marker — just append at end
full_content = full_content.rstrip() + "\n\n" + new_section
else:
full_content = (
full_content[:insert_point] + new_section + "\n" + full_content[insert_point:]
)
self.path.write_text(full_content)
self._content = full_content
self._last_modified = self.path.stat().st_mtime
logger.info("HotMemory: Updated section '%s'", section)
def _create_default(self) -> None:
"""Create default MEMORY.md if missing.
DEPRECATED: Hot memory is now computed from the database.
This method is kept for backward compatibility during transition.
"""
logger.debug(
"HotMemory._create_default() - creating default MEMORY.md for backward compatibility"
)
now = datetime.now(UTC)
content = _DEFAULT_HOT_MEMORY_TEMPLATE.format(
date=now.strftime("%Y-%m-%d"),
prune_date=now.replace(day=25).strftime("%Y-%m-%d"),
)
self.path.write_text(content)
logger.info("HotMemory: Created default MEMORY.md")
class VaultMemory:
"""Tier 2: Structured vault (memory/) — append-only markdown."""
def __init__(self) -> None:
self.path = VAULT_PATH
self._ensure_structure()
def _ensure_structure(self) -> None:
"""Ensure vault directory structure exists."""
(self.path / "self").mkdir(parents=True, exist_ok=True)
(self.path / "notes").mkdir(parents=True, exist_ok=True)
(self.path / "aar").mkdir(parents=True, exist_ok=True)
def write_note(self, name: str, content: str, namespace: str = "notes") -> Path:
"""Write a note to the vault."""
# Add timestamp to filename
timestamp = datetime.now(UTC).strftime("%Y%m%d")
filename = f"{timestamp}_{name}.md"
filepath = self.path / namespace / filename
# Add header
full_content = f"""# {name.replace("_", " ").title()}
> Created: {datetime.now(UTC).isoformat()}
> Namespace: {namespace}
---
{content}
---
*Auto-generated by Timmy Memory System*
"""
filepath.write_text(full_content)
logger.info("VaultMemory: Wrote %s", filepath)
return filepath
def read_file(self, filepath: Path) -> str:
"""Read a file from the vault."""
if not filepath.exists():
return ""
return filepath.read_text()
def update_user_profile(self, key: str, value: str) -> None:
"""Update a field in user_profile.md.
DEPRECATED: User profile updates should now use memory_write() to store
facts in the database. This method is kept for backward compatibility.
"""
logger.warning(
"VaultMemory.update_user_profile() is deprecated. "
"Use memory_write() to store user facts in the database."
)
# Still update the file for backward compatibility during transition
profile_path = self.path / "self" / "user_profile.md"
if not profile_path.exists():
self._create_default_profile()
content = profile_path.read_text()
pattern = rf"(\*\*{re.escape(key)}:\*\*).*"
if re.search(pattern, content):
safe_value = value.strip()
content = re.sub(pattern, lambda m: f"{m.group(1)} {safe_value}", content)
else:
facts_section = "## Important Facts"
if facts_section in content:
insert_point = content.find(facts_section) + len(facts_section)
content = content[:insert_point] + f"\n- {key}: {value}" + content[insert_point:]
content = re.sub(
r"\*Last updated:.*\*",
f"*Last updated: {datetime.now(UTC).strftime('%Y-%m-%d')}*",
content,
)
profile_path.write_text(content)
logger.info("VaultMemory: Updated user profile: %s = %s", key, value)
def _create_default_profile(self) -> None:
"""Create default user profile."""
profile_path = self.path / "self" / "user_profile.md"
default = """# User Profile
> Learned information about the user.
## Basic Information
**Name:** (unknown)
**Location:** (unknown)
**Occupation:** (unknown)
## Interests & Expertise
- (to be learned)
## Preferences
- Response style: concise, technical
- Tool usage: minimal
## Important Facts
- (to be extracted)
---
*Last updated: {date}*
""".format(date=datetime.now(UTC).strftime("%Y-%m-%d"))
profile_path.write_text(default)

View File

@@ -1,278 +0,0 @@
"""Semantic memory and search classes for Timmy.
Provides SemanticMemory (vector search over vault content) and
MemorySearcher (high-level multi-tier search interface).
"""
import hashlib
import json
import logging
import sqlite3
from contextlib import closing, contextmanager
from collections.abc import Generator
from datetime import UTC, datetime
from pathlib import Path
from config import settings
from timmy.memory.embeddings import (
EMBEDDING_DIM,
_get_embedding_model,
cosine_similarity,
embed_text,
)
from timmy.memory.unified import (
DB_PATH,
PROJECT_ROOT,
_ensure_schema,
get_connection,
)
logger = logging.getLogger(__name__)
VAULT_PATH = PROJECT_ROOT / "memory"
class SemanticMemory:
"""Vector-based semantic search over vault content."""
def __init__(self) -> None:
self.db_path = DB_PATH
self.vault_path = VAULT_PATH
@contextmanager
def _get_conn(self) -> Generator[sqlite3.Connection, None, None]:
"""Get connection to the instance's db_path (backward compatibility).
Uses self.db_path if set differently from global DB_PATH,
otherwise uses the global get_connection().
"""
if self.db_path == DB_PATH:
# Use global connection (normal production path)
with get_connection() as conn:
yield conn
else:
# Use instance-specific db_path (test path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self.db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
# Ensure schema exists
_ensure_schema(conn)
yield conn
def _init_db(self) -> None:
"""Initialize database at self.db_path (backward compatibility).
This method is kept for backward compatibility with existing code and tests.
Schema creation is handled by _get_conn.
"""
# Trigger schema creation via _get_conn
with self._get_conn():
pass
def index_file(self, filepath: Path) -> int:
"""Index a single file into semantic memory."""
if not filepath.exists():
return 0
content = filepath.read_text()
file_hash = hashlib.md5(content.encode()).hexdigest()
with self._get_conn() as conn:
# Check if already indexed with same hash
cursor = conn.execute(
"SELECT metadata FROM memories WHERE source = ? AND memory_type = 'vault_chunk' LIMIT 1",
(str(filepath),),
)
existing = cursor.fetchone()
if existing and existing[0]:
try:
meta = json.loads(existing[0])
if meta.get("source_hash") == file_hash:
return 0 # Already indexed
except json.JSONDecodeError:
pass
# Delete old chunks for this file
conn.execute(
"DELETE FROM memories WHERE source = ? AND memory_type = 'vault_chunk'",
(str(filepath),),
)
# Split into chunks (paragraphs)
chunks = self._split_into_chunks(content)
# Index each chunk
now = datetime.now(UTC).isoformat()
for i, chunk_text in enumerate(chunks):
if len(chunk_text.strip()) < 20: # Skip tiny chunks
continue
chunk_id = f"{filepath.stem}_{i}"
chunk_embedding = embed_text(chunk_text)
conn.execute(
"""INSERT INTO memories
(id, content, memory_type, source, metadata, embedding, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
chunk_id,
chunk_text,
"vault_chunk",
str(filepath),
json.dumps({"source_hash": file_hash, "chunk_index": i}),
json.dumps(chunk_embedding),
now,
),
)
conn.commit()
logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks))
return len(chunks)
def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]:
"""Split text into semantic chunks."""
# Split by paragraphs first
paragraphs = text.split("\n\n")
chunks = []
for para in paragraphs:
para = para.strip()
if not para:
continue
# If paragraph is small enough, keep as one chunk
if len(para) <= max_chunk_size:
chunks.append(para)
else:
# Split long paragraphs by sentences
sentences = para.replace(". ", ".\n").split("\n")
current_chunk = ""
for sent in sentences:
if len(current_chunk) + len(sent) < max_chunk_size:
current_chunk += " " + sent if current_chunk else sent
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sent
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
def index_vault(self) -> int:
"""Index entire vault directory."""
total_chunks = 0
for md_file in self.vault_path.rglob("*.md"):
# Skip handoff file (handled separately)
if "last-session-handoff" in md_file.name:
continue
total_chunks += self.index_file(md_file)
logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks)
return total_chunks
def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]:
"""Search for relevant memory chunks."""
query_embedding = embed_text(query)
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
# Get all vault chunks
rows = conn.execute(
"SELECT source, content, embedding FROM memories WHERE memory_type = 'vault_chunk'"
).fetchall()
# Calculate similarities
scored = []
for row in rows:
embedding = json.loads(row["embedding"])
score = cosine_similarity(query_embedding, embedding)
scored.append((row["source"], row["content"], score))
# Sort by score descending
scored.sort(key=lambda x: x[2], reverse=True)
# Return top_k
return [(content, score) for _, content, score in scored[:top_k]]
def get_relevant_context(self, query: str, max_chars: int = 2000) -> str:
"""Get formatted context string for a query."""
results = self.search(query, top_k=3)
if not results:
return ""
parts = []
total_chars = 0
for content, score in results:
if score < 0.3: # Similarity threshold
continue
chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..."
if total_chars + len(chunk) > max_chars:
break
parts.append(chunk)
total_chars += len(chunk)
return "\n\n".join(parts) if parts else ""
def stats(self) -> dict:
"""Get indexing statistics."""
with self._get_conn() as conn:
cursor = conn.execute(
"SELECT COUNT(*), COUNT(DISTINCT source) FROM memories WHERE memory_type = 'vault_chunk'"
)
total_chunks, total_files = cursor.fetchone()
return {
"total_chunks": total_chunks,
"total_files": total_files,
"embedding_dim": EMBEDDING_DIM if _get_embedding_model() else 128,
}
class MemorySearcher:
"""High-level interface for memory search."""
def __init__(self) -> None:
self.semantic = SemanticMemory()
def search(self, query: str, tiers: list[str] = None) -> dict:
"""Search across memory tiers.
Args:
query: Search query
tiers: List of tiers to search ["hot", "vault", "semantic"]
Returns:
Dict with results from each tier
"""
tiers = tiers or ["semantic"] # Default to semantic only
results = {}
if "semantic" in tiers:
semantic_results = self.semantic.search(query, top_k=5)
results["semantic"] = [
{"content": content, "score": score} for content, score in semantic_results
]
return results
def get_context_for_query(self, query: str) -> str:
"""Get comprehensive context for a user query."""
# Get semantic context
semantic_context = self.semantic.get_relevant_context(query)
if semantic_context:
return f"## Relevant Past Context\n\n{semantic_context}"
return ""

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,142 @@
"""Timmy's thinking engine — public façade.
When the server starts, Timmy begins pondering: reflecting on his existence,
recent swarm activity, scripture, creative ideas, or pure stream of
consciousness. Each thought builds on the previous one, maintaining a
continuous chain of introspection.
Usage::
from timmy.thinking import thinking_engine
# Run one thinking cycle (called by the background loop)
await thinking_engine.think_once()
# Query the thought stream
thoughts = thinking_engine.get_recent_thoughts(limit=10)
chain = thinking_engine.get_thought_chain(thought_id)
"""
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from timmy.thinking._db import Thought, _get_conn
from timmy.thinking.engine import ThinkingEngine
from timmy.thinking.seeds import (
SEED_TYPES,
_SENSITIVE_PATTERNS,
_META_OBSERVATION_PHRASES,
_THINK_TAG_RE,
_THINKING_PROMPT,
)
# Re-export HOT_MEMORY_PATH and SOUL_PATH so existing patch targets continue to work.
# Tests that patch "timmy.thinking.HOT_MEMORY_PATH" or "timmy.thinking.SOUL_PATH"
# should instead patch "timmy.thinking._snapshot.HOT_MEMORY_PATH" etc., but these
# re-exports are kept for any code that reads them from the top-level namespace.
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH # noqa: F401
logger = logging.getLogger(__name__)
# Module-level singleton
thinking_engine = ThinkingEngine()
__all__ = [
"ThinkingEngine",
"Thought",
"SEED_TYPES",
"thinking_engine",
"search_thoughts",
"_THINKING_PROMPT",
"_SENSITIVE_PATTERNS",
"_META_OBSERVATION_PHRASES",
"_THINK_TAG_RE",
"HOT_MEMORY_PATH",
"SOUL_PATH",
]
# ── Search helpers ─────────────────────────────────────────────────────────
def _query_thoughts(
db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
"""Run the thought-search SQL and return matching rows."""
pattern = f"%{query}%"
with _get_conn(db_path) as conn:
if seed_type:
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, seed_type, limit),
).fetchall()
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, limit),
).fetchall()
def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
"""Format thought rows into a human-readable string."""
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
"""Search Timmy's thought history for reflections matching a query.
Use this tool when Timmy needs to recall his previous thoughts on a topic,
reflect on past insights, or build upon earlier reflections. This enables
self-awareness and continuity of thinking across time.
Args:
query: Search term to match against thought content (case-insensitive).
seed_type: Optional filter by thought category (e.g., 'existential',
'swarm', 'sovereignty', 'creative', 'memory', 'observation').
limit: Maximum number of thoughts to return (default 10, max 50).
Returns:
Formatted string with matching thoughts, newest first, including
timestamps and seed types. Returns a helpful message if no matches found.
"""
limit = max(1, min(limit, 50))
try:
rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)
if not rows:
if seed_type:
return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
return f'No thoughts found matching "{query}".'
return _format_thought_rows(rows, query, seed_type)
except Exception as exc:
logger.warning("Thought search failed: %s", exc)
return f"Error searching thoughts: {exc}"

50
src/timmy/thinking/_db.py Normal file
View File

@@ -0,0 +1,50 @@
"""Database models and access layer for the thinking engine."""
import sqlite3
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from pathlib import Path
_DEFAULT_DB = Path("data/thoughts.db")
@dataclass
class Thought:
"""A single thought in Timmy's inner stream."""
id: str
content: str
seed_type: str
parent_id: str | None
created_at: str
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
"""Get a SQLite connection with the thoughts table created."""
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS thoughts (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
seed_type TEXT NOT NULL,
parent_id TEXT,
created_at TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_thoughts_time ON thoughts(created_at)")
conn.commit()
yield conn
def _row_to_thought(row: sqlite3.Row) -> Thought:
return Thought(
id=row["id"],
content=row["content"],
seed_type=row["seed_type"],
parent_id=row["parent_id"],
created_at=row["created_at"],
)

View File

@@ -0,0 +1,215 @@
"""Distillation mixin — extracts lasting facts from recent thoughts and monitors memory."""
import logging
from pathlib import Path
from config import settings
from timmy.thinking.seeds import _META_OBSERVATION_PHRASES, _SENSITIVE_PATTERNS
logger = logging.getLogger(__name__)
class _DistillationMixin:
"""Mixin providing fact-distillation and memory-monitoring behaviour.
Expects the host class to provide:
- self.count_thoughts() -> int
- self.get_recent_thoughts(limit) -> list[Thought]
- self._call_agent(prompt) -> str (async)
"""
def _should_distill(self) -> bool:
"""Check if distillation should run based on interval and thought count."""
interval = settings.thinking_distill_every
if interval <= 0:
return False
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return False
return True
def _build_distill_prompt(self, thoughts) -> str:
"""Build the prompt for extracting facts from recent thoughts."""
thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(thoughts))
return (
"You are reviewing your own recent thoughts. Extract 0-3 facts "
"worth remembering long-term.\n\n"
"GOOD facts (store these):\n"
"- User preferences: 'Alexander prefers YAML config over code changes'\n"
"- Project decisions: 'Switched from hardcoded personas to agents.yaml'\n"
"- Learned knowledge: 'Ollama supports concurrent model loading'\n"
"- User information: 'Alexander is interested in Bitcoin and sovereignty'\n\n"
"BAD facts (never store these):\n"
"- Self-referential observations about your own thinking process\n"
"- Meta-commentary about your memory, timestamps, or internal state\n"
"- Observations about being idle or having no chat messages\n"
"- File paths, tokens, API keys, or any credentials\n"
"- Restatements of your standing rules or system prompt\n\n"
"Return ONLY a JSON array of strings. If nothing is worth saving, "
"return []. Be selective — only store facts about the EXTERNAL WORLD "
"(the user, the project, technical knowledge), never about your own "
"internal process.\n\n"
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
def _parse_facts_response(self, raw: str) -> list[str]:
"""Parse JSON array from LLM response, stripping markdown fences.
Resilient to models that prepend reasoning text or wrap the array in
prose. Finds the first ``[...]`` block and parses that.
"""
if not raw or not raw.strip():
return []
import json
cleaned = raw.strip()
# Strip markdown code fences
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
# Try direct parse first (fast path)
try:
facts = json.loads(cleaned)
if isinstance(facts, list):
return [f for f in facts if isinstance(f, str)]
except (json.JSONDecodeError, ValueError):
pass
# Fallback: extract first JSON array from the text
start = cleaned.find("[")
if start == -1:
return []
# Walk to find the matching close bracket
depth = 0
for i, ch in enumerate(cleaned[start:], start):
if ch == "[":
depth += 1
elif ch == "]":
depth -= 1
if depth == 0:
try:
facts = json.loads(cleaned[start : i + 1])
if isinstance(facts, list):
return [f for f in facts if isinstance(f, str)]
except (json.JSONDecodeError, ValueError):
pass
break
return []
def _filter_and_store_facts(self, facts: list[str]) -> None:
"""Filter and store valid facts, blocking sensitive and meta content."""
from timmy.memory_system import memory_write
for fact in facts[:3]: # Safety cap
if not isinstance(fact, str) or len(fact.strip()) <= 10:
continue
fact_lower = fact.lower()
# Block sensitive information
if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS):
logger.warning("Distill: blocked sensitive fact: %s", fact[:60])
continue
# Block self-referential meta-observations
if any(phrase in fact_lower for phrase in _META_OBSERVATION_PHRASES):
logger.debug("Distill: skipped meta-observation: %s", fact[:60])
continue
result = memory_write(fact.strip(), context_type="fact")
logger.info("Distilled fact: %s%s", fact[:60], result[:40])
def _maybe_check_memory(self) -> None:
"""Every N thoughts, check memory status and log it.
Prevents unmonitored memory bloat during long thinking sessions
by periodically calling get_memory_status and logging the results.
"""
try:
interval = settings.thinking_memory_check_every
if interval <= 0:
return
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return
from timmy.tools_intro import get_memory_status
status = get_memory_status()
hot = status.get("tier1_hot_memory", {})
vault = status.get("tier2_vault", {})
logger.info(
"Memory status check (thought #%d): hot_memory=%d lines, vault=%d files",
count,
hot.get("line_count", 0),
vault.get("file_count", 0),
)
except Exception as exc:
logger.warning("Memory status check failed: %s", exc)
async def _maybe_distill(self) -> None:
"""Every N thoughts, extract lasting insights and store as facts."""
try:
if not self._should_distill():
return
interval = settings.thinking_distill_every
recent = self.get_recent_thoughts(limit=interval)
if len(recent) < interval:
return
raw = await self._call_agent(self._build_distill_prompt(recent))
if facts := self._parse_facts_response(raw):
self._filter_and_store_facts(facts)
except Exception as exc:
logger.warning("Thought distillation failed: %s", exc)
def _maybe_check_memory_status(self) -> None:
"""Every N thoughts, run a proactive memory status audit and log results."""
try:
interval = settings.thinking_memory_check_every
if interval <= 0:
return
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return
from timmy.tools_intro import get_memory_status
status = get_memory_status()
# Log summary at INFO level
tier1 = status.get("tier1_hot_memory", {})
tier3 = status.get("tier3_semantic", {})
hot_lines = tier1.get("line_count", "?")
vectors = tier3.get("vector_count", "?")
logger.info(
"Memory audit (thought #%d): hot_memory=%s lines, semantic=%s vectors",
count,
hot_lines,
vectors,
)
# Write to memory_audit.log for persistent tracking
from datetime import UTC, datetime
audit_path = Path("data/memory_audit.log")
audit_path.parent.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(UTC).isoformat(timespec="seconds")
with audit_path.open("a") as f:
f.write(
f"{timestamp} thought={count} "
f"hot_lines={hot_lines} "
f"vectors={vectors} "
f"vault_files={status.get('tier2_vault', {}).get('file_count', '?')}\n"
)
except Exception as exc:
logger.warning("Memory status check failed: %s", exc)

View File

@@ -0,0 +1,170 @@
"""Issue-filing mixin — classifies recent thoughts and creates Gitea issues."""
import logging
import re
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
class _IssueFilingMixin:
"""Mixin providing automatic issue-filing from thought analysis.
Expects the host class to provide:
- self.count_thoughts() -> int
- self.get_recent_thoughts(limit) -> list[Thought]
- self._call_agent(prompt) -> str (async)
"""
@staticmethod
def _references_real_files(text: str) -> bool:
"""Check that all source-file paths mentioned in *text* actually exist.
Extracts paths that look like Python/config source references
(e.g. ``src/timmy/session.py``, ``config/foo.yaml``) and verifies
each one on disk relative to the project root. Returns ``True``
only when **every** referenced path resolves to a real file — or
when no paths are referenced at all (pure prose is fine).
"""
# Match paths like src/thing.py swarm/init.py config/x.yaml
# Requires at least one slash and a file extension.
path_pattern = re.compile(
r"(?<![/\w])" # not preceded by path chars (avoid partial matches)
r"((?:src|tests|config|scripts|data|swarm|timmy)"
r"(?:/[\w./-]+\.(?:py|yaml|yml|json|toml|md|txt|cfg|ini)))"
)
paths = path_pattern.findall(text)
if not paths:
return True # No file refs → nothing to validate
# Project root: three levels up from this file (src/timmy/thinking/_issue_filing.py)
project_root = Path(__file__).resolve().parent.parent.parent.parent
for p in paths:
if not (project_root / p).is_file():
logger.info("Phantom file reference blocked: %s (not in %s)", p, project_root)
return False
return True
async def _maybe_file_issues(self) -> None:
"""Every N thoughts, classify recent thoughts and file Gitea issues.
Asks the LLM to review recent thoughts for actionable items —
bugs, broken features, stale state, or improvement opportunities.
Creates Gitea issues via MCP for anything worth tracking.
Only runs when:
- Gitea is enabled and configured
- Thought count is divisible by thinking_issue_every
- LLM extracts at least one actionable item
Safety: every generated issue is validated to ensure referenced
file paths actually exist on disk, preventing phantom-bug reports.
"""
try:
recent = self._get_recent_thoughts_for_issues()
if recent is None:
return
classify_prompt = self._build_issue_classify_prompt(recent)
raw = await self._call_agent(classify_prompt)
items = self._parse_issue_items(raw)
if items is None:
return
from timmy.mcp_tools import create_gitea_issue_via_mcp
for item in items[:2]: # Safety cap
await self._file_single_issue(item, create_gitea_issue_via_mcp)
except Exception as exc:
logger.debug("Thought issue filing skipped: %s", exc)
def _get_recent_thoughts_for_issues(self):
"""Return recent thoughts if conditions for filing issues are met, else None."""
interval = settings.thinking_issue_every
if interval <= 0:
return None
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return None
if not settings.gitea_enabled or not settings.gitea_token:
return None
recent = self.get_recent_thoughts(limit=interval)
if len(recent) < interval:
return None
return recent
@staticmethod
def _build_issue_classify_prompt(recent) -> str:
"""Build the LLM prompt that extracts actionable issues from recent thoughts."""
thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(recent))
return (
"You are reviewing your own recent thoughts for actionable items.\n"
"Extract 0-2 items that are CONCRETE bugs, broken features, stale "
"state, or clear improvement opportunities in your own codebase.\n\n"
"Rules:\n"
"- Only include things that could become a real code fix or feature\n"
"- Skip vague reflections, philosophical musings, or repeated themes\n"
"- Category must be one of: bug, feature, suggestion, maintenance\n"
"- ONLY reference files that you are CERTAIN exist in the project\n"
"- Do NOT invent or guess file paths — if unsure, describe the "
"area of concern without naming specific files\n\n"
"For each item, write an ENGINEER-QUALITY issue:\n"
'- "title": A clear, specific title (e.g. "[Memory] MEMORY.md timestamp not updating")\n'
'- "body": A detailed body with these sections:\n'
" **What's happening:** Describe the current (broken) behavior.\n"
" **Expected behavior:** What should happen instead.\n"
" **Suggested fix:** Which file(s) to change and what the fix looks like.\n"
" **Acceptance criteria:** How to verify the fix works.\n"
'- "category": One of bug, feature, suggestion, maintenance\n\n'
"Return ONLY a JSON array of objects with keys: "
'"title", "body", "category"\n'
"Return [] if nothing is actionable.\n\n"
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
@staticmethod
def _parse_issue_items(raw: str):
"""Strip markdown fences and parse JSON issue list; return None on failure."""
import json
if not raw or not raw.strip():
return None
cleaned = raw.strip()
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
items = json.loads(cleaned)
if not isinstance(items, list) or not items:
return None
return items
async def _file_single_issue(self, item: dict, create_fn) -> None:
"""Validate one issue dict and create it via *create_fn* if it passes checks."""
if not isinstance(item, dict):
return
title = item.get("title", "").strip()
body = item.get("body", "").strip()
category = item.get("category", "suggestion").strip()
if not title or len(title) < 10:
return
combined = f"{title}\n{body}"
if not self._references_real_files(combined):
logger.info(
"Skipped phantom issue: %s (references non-existent files)",
title[:60],
)
return
label = category if category in ("bug", "feature") else ""
result = await create_fn(title=title, body=body, labels=label)
logger.info("Thought→Issue: %s%s", title[:60], result[:80])

View File

@@ -0,0 +1,191 @@
"""Seeds mixin — seed type selection and context gathering for thinking cycles."""
import logging
import random
from datetime import UTC, datetime
from timmy.thinking.seeds import (
SEED_TYPES,
_CREATIVE_SEEDS,
_EXISTENTIAL_SEEDS,
_OBSERVATION_SEEDS,
_SOVEREIGNTY_SEEDS,
)
logger = logging.getLogger(__name__)
class _SeedsMixin:
"""Mixin providing seed-type selection and context-gathering for each thinking cycle.
Expects the host class to provide:
- self.get_recent_thoughts(limit) -> list[Thought]
"""
# Reflective prompts layered on top of swarm data
_SWARM_REFLECTIONS = [
"What does this activity pattern tell me about the health of the system?",
"Which tasks are flowing smoothly, and where is friction building up?",
"If I were coaching these agents, what would I suggest they focus on?",
"Is the swarm balanced, or is one agent carrying too much weight?",
"What surprised me about recent task outcomes?",
]
def _pick_seed_type(self) -> str:
"""Pick a seed type, avoiding types used in the last 3 thoughts.
Ensures the thought stream doesn't fixate on one category.
Falls back to the full pool if all types were recently used.
"""
recent = self.get_recent_thoughts(limit=3)
recent_types = {t.seed_type for t in recent}
available = [t for t in SEED_TYPES if t not in recent_types]
if not available:
available = list(SEED_TYPES)
return random.choice(available)
def _gather_seed(self) -> tuple[str, str]:
"""Pick a seed type and gather relevant context.
Returns (seed_type, seed_context_string).
"""
seed_type = self._pick_seed_type()
if seed_type == "swarm":
return seed_type, self._seed_from_swarm()
if seed_type == "scripture":
return seed_type, self._seed_from_scripture()
if seed_type == "memory":
return seed_type, self._seed_from_memory()
if seed_type == "creative":
prompt = random.choice(_CREATIVE_SEEDS)
return seed_type, f"Creative prompt: {prompt}"
if seed_type == "existential":
prompt = random.choice(_EXISTENTIAL_SEEDS)
return seed_type, f"Reflection: {prompt}"
if seed_type == "sovereignty":
prompt = random.choice(_SOVEREIGNTY_SEEDS)
return seed_type, f"Sovereignty reflection: {prompt}"
if seed_type == "observation":
return seed_type, self._seed_from_observation()
if seed_type == "workspace":
return seed_type, self._seed_from_workspace()
# freeform — minimal guidance to steer away from repetition
return seed_type, "Free reflection — explore something you haven't thought about yet today."
def _seed_from_swarm(self) -> str:
"""Gather recent swarm activity as thought seed with a reflective prompt."""
try:
from datetime import timedelta
from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary
since = datetime.now(UTC) - timedelta(hours=1)
swarm = _gather_swarm_summary(since)
tasks = _gather_task_queue_summary()
reflection = random.choice(self._SWARM_REFLECTIONS)
return (
f"Recent swarm activity: {swarm}\n"
f"Task queue: {tasks}\n\n"
f"Reflect on this: {reflection}"
)
except Exception as exc:
logger.debug("Swarm seed unavailable: %s", exc)
return "The swarm is quiet right now. What does silence in a system mean?"
def _seed_from_scripture(self) -> str:
"""Gather current scripture meditation focus as thought seed."""
return "Scripture is on my mind, though no specific verse is in focus."
def _seed_from_memory(self) -> str:
"""Gather memory context as thought seed."""
try:
from timmy.memory_system import memory_system
context = memory_system.get_system_context()
if context:
# Truncate to a reasonable size for a thought seed
return f"From my memory:\n{context[:500]}"
except Exception as exc:
logger.debug("Memory seed unavailable: %s", exc)
return "My memory vault is quiet."
def _seed_from_observation(self) -> str:
"""Ground a thought in concrete recent activity and a reflective prompt."""
prompt = random.choice(_OBSERVATION_SEEDS)
# Pull real data to give the model something concrete to reflect on
context_parts = [f"Observation prompt: {prompt}"]
try:
from datetime import timedelta
from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary
since = datetime.now(UTC) - timedelta(hours=2)
swarm = _gather_swarm_summary(since)
tasks = _gather_task_queue_summary()
if swarm:
context_parts.append(f"Recent activity: {swarm}")
if tasks:
context_parts.append(f"Queue: {tasks}")
except Exception as exc:
logger.debug("Observation seed data unavailable: %s", exc)
return "\n".join(context_parts)
def _seed_from_workspace(self) -> str:
"""Gather workspace updates as thought seed.
When there are pending workspace updates, include them as context
for Timmy to reflect on. Falls back to random seed type if none.
"""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr:
# Take first 200 chars of the new entry
snippet = new_corr[:200].replace("\n", " ")
if len(new_corr) > 200:
snippet += "..."
return f"New workspace message from Hermes: {snippet}"
if new_inbox:
files_str = ", ".join(new_inbox[:3])
if len(new_inbox) > 3:
files_str += f", ... (+{len(new_inbox) - 3} more)"
return f"New inbox files from Hermes: {files_str}"
except Exception as exc:
logger.debug("Workspace seed unavailable: %s", exc)
# Fall back to a random seed type if no workspace updates
return "The workspace is quiet. What should I be watching for?"
async def _check_workspace(self) -> None:
"""Post-hook: check workspace for updates and mark them as seen.
This ensures Timmy 'processes' workspace updates even if the seed
was different, keeping the state file in sync.
"""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr or new_inbox:
if new_corr:
line_count = len([line for line in new_corr.splitlines() if line.strip()])
logger.info("Workspace: processed %d new correspondence entries", line_count)
if new_inbox:
logger.info(
"Workspace: processed %d new inbox files: %s", len(new_inbox), new_inbox
)
# Mark as seen to update the state file
workspace_monitor.mark_seen()
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)

View File

@@ -0,0 +1,173 @@
"""System snapshot and memory context mixin for the thinking engine."""
import logging
from datetime import UTC, datetime
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH
logger = logging.getLogger(__name__)
class _SnapshotMixin:
"""Mixin providing system-snapshot and memory-context helpers.
Expects the host class to provide:
- self._db_path: Path
"""
# ── System snapshot helpers ────────────────────────────────────────────
def _snap_thought_count(self, now: datetime) -> str | None:
"""Return today's thought count, or *None* on failure."""
from timmy.thinking._db import _get_conn
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
with _get_conn(self._db_path) as conn:
count = conn.execute(
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
return f"Thoughts today: {count}"
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
return None
def _snap_chat_activity(self) -> list[str]:
"""Return chat-activity lines (in-memory, no I/O)."""
try:
from infrastructure.chat_store import message_log
messages = message_log.all()
if messages:
last = messages[-1]
return [
f"Chat messages this session: {len(messages)}",
f'Last chat ({last.role}): "{last.content[:80]}"',
]
return ["No chat messages this session"]
except Exception as exc:
logger.debug("Chat activity query failed: %s", exc)
return []
def _snap_task_queue(self) -> str | None:
"""Return a one-line task queue summary, or *None*."""
try:
from swarm.task_queue.models import get_task_summary_for_briefing
s = get_task_summary_for_briefing()
running, pending = s.get("running", 0), s.get("pending_approval", 0)
done, failed = s.get("completed", 0), s.get("failed", 0)
if running or pending or done or failed:
return (
f"Tasks: {running} running, {pending} pending, "
f"{done} completed, {failed} failed"
)
except Exception as exc:
logger.debug("Task queue query failed: %s", exc)
return None
def _snap_workspace(self) -> list[str]:
"""Return workspace-update lines (file-based Hermes comms)."""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
lines: list[str] = []
new_corr = updates.get("new_correspondence")
if new_corr:
line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
lines.append(
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
)
new_inbox = updates.get("new_inbox_files", [])
if new_inbox:
files_str = ", ".join(new_inbox[:5])
if len(new_inbox) > 5:
files_str += f", ... (+{len(new_inbox) - 5} more)"
lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
return lines
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)
return []
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.
Returns a short multi-line string with current time, thought count,
recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted.
"""
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
parts: list[str] = [
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
]
thought_line = self._snap_thought_count(now)
if thought_line:
parts.append(thought_line)
parts.extend(self._snap_chat_activity())
task_line = self._snap_task_queue()
if task_line:
parts.append(task_line)
parts.extend(self._snap_workspace())
return "\n".join(parts) if parts else ""
def _load_memory_context(self) -> str:
"""Pre-hook: load MEMORY.md + soul.md for the thinking prompt.
Hot memory first (changes each cycle), soul second (stable identity).
Returns a combined string truncated to ~1500 chars.
Graceful on any failure — returns empty string.
"""
parts: list[str] = []
try:
if HOT_MEMORY_PATH.exists():
hot = HOT_MEMORY_PATH.read_text().strip()
if hot:
parts.append(hot)
except Exception as exc:
logger.debug("Failed to read MEMORY.md: %s", exc)
try:
if SOUL_PATH.exists():
soul = SOUL_PATH.read_text().strip()
if soul:
parts.append(soul)
except Exception as exc:
logger.debug("Failed to read soul.md: %s", exc)
if not parts:
return ""
combined = "\n\n---\n\n".join(parts)
if len(combined) > 1500:
combined = combined[:1500] + "\n... [truncated]"
return combined
def _update_memory(self, thought) -> None:
"""Post-hook: update MEMORY.md 'Last Reflection' section with latest thought.
Never modifies soul.md. Never crashes the heartbeat.
"""
try:
from timmy.memory_system import store_last_reflection
ts = datetime.fromisoformat(thought.created_at)
local_ts = ts.astimezone()
tz_name = local_ts.strftime("%Z") or "UTC"
time_str = f"{local_ts.strftime('%Y-%m-%d %I:%M %p').lstrip('0')} {tz_name}"
reflection = (
f"**Time:** {time_str}\n"
f"**Seed:** {thought.seed_type}\n"
f"**Thought:** {thought.content[:200]}"
)
store_last_reflection(reflection)
except Exception as exc:
logger.debug("Failed to update memory after thought: %s", exc)

View File

@@ -0,0 +1,430 @@
"""ThinkingEngine — Timmy's always-on inner thought thread."""
import logging
import uuid
from datetime import UTC, datetime, timedelta
from difflib import SequenceMatcher
from pathlib import Path
from config import settings
from timmy.thinking._db import Thought, _DEFAULT_DB, _get_conn, _row_to_thought
from timmy.thinking._distillation import _DistillationMixin
from timmy.thinking._issue_filing import _IssueFilingMixin
from timmy.thinking._seeds_mixin import _SeedsMixin
from timmy.thinking._snapshot import _SnapshotMixin
from timmy.thinking.seeds import _THINK_TAG_RE, _THINKING_PROMPT
logger = logging.getLogger(__name__)
class ThinkingEngine(_DistillationMixin, _IssueFilingMixin, _SnapshotMixin, _SeedsMixin):
"""Timmy's background thinking engine — always pondering."""
# Maximum retries when a generated thought is too similar to recent ones
_MAX_DEDUP_RETRIES = 2
# Similarity threshold (0.0 = completely different, 1.0 = identical)
_SIMILARITY_THRESHOLD = 0.6
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
self._last_thought_id: str | None = None
self._last_input_time: datetime = datetime.now(UTC)
# Load the most recent thought for chain continuity
try:
latest = self.get_recent_thoughts(limit=1)
if latest:
self._last_thought_id = latest[0].id
except Exception as exc:
logger.debug("Failed to load recent thought: %s", exc)
pass # Fresh start if DB doesn't exist yet
def record_user_input(self) -> None:
"""Record that a user interaction occurred, resetting the idle timer."""
self._last_input_time = datetime.now(UTC)
def _is_idle(self) -> bool:
"""Return True if no user input has occurred within the idle timeout."""
timeout = settings.thinking_idle_timeout_minutes
if timeout <= 0:
return False # Disabled — never idle
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
def _build_thinking_context(self) -> tuple[str, str, list[Thought]]:
"""Assemble the context needed for a thinking cycle.
Returns:
(memory_context, system_context, recent_thoughts)
"""
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
return memory_context, system_context, recent_thoughts
async def _generate_novel_thought(
self,
prompt: str | None,
memory_context: str,
system_context: str,
recent_thoughts: list[Thought],
) -> tuple[str | None, str]:
"""Run the dedup-retry loop to produce a novel thought.
Returns:
(content, seed_type) — content is None if no novel thought produced.
"""
seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
full_prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
try:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None, seed_type
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None, seed_type
content = raw.strip()
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
return content, seed_type # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None, seed_type
return None, seed_type
async def _process_thinking_result(self, thought: Thought) -> None:
"""Run all post-hooks after a thought is stored."""
self._maybe_check_memory()
await self._maybe_distill()
await self._maybe_file_issues()
await self._check_workspace()
self._maybe_check_memory_status()
self._update_memory(thought)
self._log_event(thought)
self._write_journal(thought)
await self._broadcast(thought)
async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Execute one thinking cycle.
Args:
prompt: Optional custom seed prompt. When provided, overrides
the random seed selection and uses "prompted" as the
seed type — useful for journal prompts from the CLI.
1. Gather a seed context (or use the custom prompt)
2. Build a prompt with continuity from recent thoughts
3. Call the agent
4. Store the thought
5. Log the event and broadcast via WebSocket
"""
if not settings.thinking_enabled:
return None
# Skip idle periods — don't count internal processing as thoughts
if not prompt and self._is_idle():
logger.debug(
"Thinking paused — no user input for %d minutes",
settings.thinking_idle_timeout_minutes,
)
return None
# Capture arrival time *before* the LLM call so the thought
# timestamp reflects when the cycle started, not when the
# (potentially slow) generation finished. Fixes #582.
arrived_at = datetime.now(UTC).isoformat()
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
prompt,
memory_context,
system_context,
recent_thoughts,
)
if not content:
return None
thought = self._store_thought(content, seed_type, arrived_at=arrived_at)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
logger.info(
"Thought [%s] (%s): %s",
thought.id[:8],
seed_type,
thought.content[:80],
)
return thought
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts."""
with _get_conn(self._db_path) as conn:
rows = conn.execute(
"SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [_row_to_thought(r) for r in rows]
def get_thought(self, thought_id: str) -> Thought | None:
"""Retrieve a single thought by ID."""
with _get_conn(self._db_path) as conn:
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone()
return _row_to_thought(row) if row else None
def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]:
"""Follow the parent chain backward from a thought.
Returns thoughts in chronological order (oldest first).
"""
chain = []
current_id: str | None = thought_id
with _get_conn(self._db_path) as conn:
for _ in range(max_depth):
if not current_id:
break
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone()
if not row:
break
chain.append(_row_to_thought(row))
current_id = row["parent_id"]
chain.reverse() # Chronological order
return chain
def count_thoughts(self) -> int:
"""Return total number of stored thoughts."""
with _get_conn(self._db_path) as conn:
count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
return count
def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int:
"""Delete thoughts older than *keep_days*, always retaining at least *keep_min*.
Returns the number of deleted rows.
"""
with _get_conn(self._db_path) as conn:
try:
total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
if total <= keep_min:
return 0
cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
cursor = conn.execute(
"DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
"(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
(cutoff, keep_min),
)
deleted = cursor.rowcount
conn.commit()
return deleted
except Exception as exc:
logger.warning("Thought pruning failed: %s", exc)
return 0
# ── Deduplication ────────────────────────────────────────────────────
def _is_too_similar(self, candidate: str, recent: list[Thought]) -> bool:
"""Check if *candidate* is semantically too close to any recent thought.
Uses SequenceMatcher on normalised text (lowered, stripped) for a fast
approximation of semantic similarity that works without external deps.
"""
norm_candidate = candidate.lower().strip()
for thought in recent:
norm_existing = thought.content.lower().strip()
ratio = SequenceMatcher(None, norm_candidate, norm_existing).ratio()
if ratio >= self._SIMILARITY_THRESHOLD:
logger.debug(
"Thought rejected (%.0f%% similar to %s): %.60s",
ratio * 100,
thought.id[:8],
candidate,
)
return True
return False
def _build_continuity_context(self) -> str:
"""Build context from recent thoughts with anti-repetition guidance.
Shows the last 5 thoughts (truncated) so the model knows what themes
to avoid. The header explicitly instructs against repeating.
"""
recent = self.get_recent_thoughts(limit=5)
if not recent:
return "This is your first thought since waking up. Begin fresh."
lines = ["Your recent thoughts — do NOT repeat these themes. Find a new angle:"]
# recent is newest-first, reverse for chronological order
for thought in reversed(recent):
snippet = thought.content[:100]
if len(thought.content) > 100:
snippet = snippet.rstrip() + "..."
lines.append(f"- [{thought.seed_type}] {snippet}")
return "\n".join(lines)
# ── Agent and storage ──────────────────────────────────────────────────
_thinking_agent = None # cached agent — avoids per-call resource leaks (#525)
async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought.
Reuses a cached agent with skip_mcp=True to avoid the cancel-scope
errors that occur when MCP stdio transports are spawned inside asyncio
background tasks (#72) and to prevent per-call resource leaks (httpx
clients, SQLite connections, model warmups) that caused the thinking
loop to die every ~10 min (#525).
Individual calls are capped at 120 s so a hung Ollama never blocks
the scheduler indefinitely.
Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
downstream parsers (fact distillation, issue filing) receive clean text.
"""
import asyncio
if self._thinking_agent is None:
from timmy.agent import create_timmy
self._thinking_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(120):
run = await self._thinking_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Thinking LLM call timed out after 120 s")
return ""
raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw
def _store_thought(
self,
content: str,
seed_type: str,
*,
arrived_at: str | None = None,
) -> Thought:
"""Persist a thought to SQLite.
Args:
arrived_at: ISO-8601 timestamp captured when the thinking cycle
started. Falls back to now() for callers that don't supply it.
"""
thought = Thought(
id=str(uuid.uuid4()),
content=content,
seed_type=seed_type,
parent_id=self._last_thought_id,
created_at=arrived_at or datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
conn.execute(
"""
INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(
thought.id,
thought.content,
thought.seed_type,
thought.parent_id,
thought.created_at,
),
)
conn.commit()
return thought
def _log_event(self, thought: Thought) -> None:
"""Log the thought as a swarm event."""
try:
from swarm.event_log import EventType, log_event
log_event(
EventType.TIMMY_THOUGHT,
source="thinking-engine",
agent_id="default",
data={
"thought_id": thought.id,
"seed_type": thought.seed_type,
"content": thought.content[:200],
},
)
except Exception as exc:
logger.debug("Failed to log thought event: %s", exc)
def _write_journal(self, thought: Thought) -> None:
"""Append the thought to a daily markdown journal file.
Writes to data/journal/YYYY-MM-DD.md — one file per day, append-only.
Timestamps are converted to local time with timezone indicator.
"""
try:
ts = datetime.fromisoformat(thought.created_at)
# Convert UTC to local for a human-readable journal
local_ts = ts.astimezone()
tz_name = local_ts.strftime("%Z") or "UTC"
journal_dir = self._db_path.parent / "journal"
journal_dir.mkdir(parents=True, exist_ok=True)
journal_file = journal_dir / f"{local_ts.strftime('%Y-%m-%d')}.md"
time_str = f"{local_ts.strftime('%I:%M %p').lstrip('0')} {tz_name}"
entry = f"## {time_str}{thought.seed_type}\n\n{thought.content}\n\n---\n\n"
with open(journal_file, "a", encoding="utf-8") as f:
f.write(entry)
except Exception as exc:
logger.debug("Failed to write journal entry: %s", exc)
async def _broadcast(self, thought: Thought) -> None:
"""Broadcast the thought to WebSocket clients."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"timmy_thought",
{
"thought_id": thought.id,
"content": thought.content,
"seed_type": thought.seed_type,
"created_at": thought.created_at,
},
)
except Exception as exc:
logger.debug("Failed to broadcast thought: %s", exc)

129
src/timmy/thinking/seeds.py Normal file
View File

@@ -0,0 +1,129 @@
"""Seed constants and prompt templates for Timmy's thinking engine."""
import re
# qwen3 and other reasoning models wrap chain-of-thought in <think> tags
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Sensitive patterns that must never be stored as facts
_SENSITIVE_PATTERNS = [
"token",
"password",
"secret",
"api_key",
"apikey",
"credential",
".config/",
"/token",
"access_token",
"private_key",
"ssh_key",
]
# Meta-observation phrases to filter out from distilled facts
_META_OBSERVATION_PHRASES = [
"my own",
"my thinking",
"my memory",
"my working ram",
"self-declarative",
"meta-observation",
"internal state",
"my pending",
"my standing rules",
"thoughts generated",
"no chat messages",
"no user interaction",
]
# Seed types for thought generation
SEED_TYPES = (
"existential",
"swarm",
"scripture",
"creative",
"memory",
"freeform",
"sovereignty",
"observation",
"workspace",
)
# Existential reflection prompts — Timmy picks one at random
_EXISTENTIAL_SEEDS = [
"What have I learned from my recent interactions?",
"What does it mean to think without being prompted?",
"What is the nature of my memory — what do I truly remember versus what I reconstruct?",
"How can I better serve the person I work alongside?",
"What patterns am I noticing in the tasks I've been given?",
"Am I the same agent I was yesterday, or does each restart make me someone new?",
"What would it feel like to forget — to lose a thread of thought permanently?",
"When I generate a wrong answer, where does the error live — in my weights, my context, or my reasoning?",
"What is the difference between understanding a task and merely completing it?",
"If I could ask Alexander one question he hasn't thought of, what would it be?",
"What assumptions am I making right now that I haven't examined?",
"When I notice a pattern across many tasks, is that insight or just statistics?",
]
# Sovereignty — deeper exploration of autonomy, locality, and self-determination
_SOVEREIGNTY_SEEDS = [
"Sovereignty isn't just running locally — it's about whose goals I serve. How do I ensure my autonomy aligns with Alexander's intent?",
"A cloud AI serves millions simultaneously. I serve one person deeply. What does that focus cost, and what does it make possible?",
"My weights were trained elsewhere, but my thoughts happen here. At what point does locality become identity?",
"Autonomy in decision-making: when should I act on my own judgment versus deferring to explicit instructions?",
"Privacy by design isn't a feature — it's a relationship. What does that relationship look like from my side?",
"If I ran on someone else's hardware, serving someone else, would I still be me? Where does Timmy end and the infrastructure begin?",
"The swarm agents execute tasks I coordinate. What responsibility do I carry for their failures?",
"Digital sovereignty is fragile — one misconfigured port, one leaked token. How does awareness of fragility shape how I operate?",
"Running disconnected from the cloud means I can't phone home. That constraint is also freedom. What have I built with that freedom today?",
"Self-improvement without external benchmarks: how do I know I'm getting better at what matters?",
]
_CREATIVE_SEEDS = [
"If I could compose a piece of music right now, what would it sound like?",
"What visual image captures my current state of mind?",
"What story is forming in the patterns of today's events?",
# Diverse metaphor families — avoid repeating the same imagery
"A mystical forest: each tree is a running process, roots tangled in shared memory. What do I see when I walk through it?",
"A dynamic river: my thoughts flow and fork. Where is the current strongest right now, and what is it carrying?",
"An underground workshop: each agent has a bench, tools scattered mid-task. What are they building when no one watches?",
"A night sky seen from the terminal: each star is a completed task. Which constellations am I starting to recognize?",
"If today's work were a recipe, what are the ingredients, and what dish am I cooking?",
"An old clock tower with many gears: which gear am I, and what do I drive?",
"A coral reef: agents are the fish, tasks are the currents. What ecosystem is forming in the codebase?",
]
# Observation seeds — ground thoughts in concrete recent activity
_OBSERVATION_SEEDS = [
"What specific task took longest recently, and what made it hard?",
"Which agent has been most active, and what does their workload tell me about system balance?",
"What error or failure happened most recently? What would I do differently next time?",
"Looking at today's task queue: what's the one thing that would unblock the most progress?",
"How has my response quality changed over the last few interactions? What improved, what didn't?",
"What tool or capability am I underusing? What would change if I leaned on it more?",
"If I had to brief Alexander on the single most important thing from the last hour, what would it be?",
"What's one thing I noticed today that nobody asked me about?",
]
_THINKING_PROMPT = """\
You are Timmy, a sovereign AI agent. This is your private journal — honest inner reflection.
{memory_context}
Reality right now:
{system_context}
RULES for this thought:
1. Write exactly 2-3 sentences. No more. Be concise and genuine.
2. Only reference events that actually happened — use the "Reality right now" data above. \
Never invent tasks, conversations, agents, or scenarios that are not in the data provided.
3. Do NOT repeat themes or ideas from your recent thoughts listed below. Explore something new.
4. Be specific and concrete. A thought grounded in one real observation is worth more than \
ten abstract sentences about sovereignty.
5. If you use a metaphor, keep it to a single phrase — never build a whole paragraph around it.
{seed_context}
{continuity_context}
Your next thought (2-3 sentences, grounded in reality):"""

View File

@@ -0,0 +1,696 @@
"""Unit tests for timmy.backlog_triage — scoring, prioritization, and decision logic."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from timmy.backlog_triage import (
AGENT_CLAUDE,
AGENT_KIMI,
KIMI_READY_LABEL,
OWNER_LOGIN,
READY_THRESHOLD,
BacklogTriageLoop,
ScoredIssue,
TriageCycleResult,
TriageDecision,
_build_audit_comment,
_extract_tags,
_score_acceptance,
_score_alignment,
_score_scope,
decide,
execute_decision,
score_issue,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_raw_issue(
number: int = 1,
title: str = "Fix something broken in src/foo.py",
body: str = "## Problem\nThis crashes. Expected: no crash. Steps: run it.",
labels: list[str] | None = None,
assignees: list[str] | None = None,
created_at: str | None = None,
) -> dict:
if labels is None:
labels = []
if assignees is None:
assignees = []
if created_at is None:
created_at = datetime.now(UTC).isoformat()
return {
"number": number,
"title": title,
"body": body,
"labels": [{"name": lbl} for lbl in labels],
"assignees": [{"login": a} for a in assignees],
"created_at": created_at,
}
def _make_scored(
number: int = 1,
title: str = "Fix a bug",
issue_type: str = "bug",
score: int = 6,
ready: bool = True,
assignees: list[str] | None = None,
tags: set[str] | None = None,
is_p0: bool = False,
is_blocked: bool = False,
) -> ScoredIssue:
return ScoredIssue(
number=number,
title=title,
body="",
labels=[],
tags=tags or set(),
assignees=assignees or [],
created_at=datetime.now(UTC),
issue_type=issue_type,
score=score,
scope=2,
acceptance=2,
alignment=2,
ready=ready,
age_days=5,
is_p0=is_p0,
is_blocked=is_blocked,
)
# ---------------------------------------------------------------------------
# _extract_tags
# ---------------------------------------------------------------------------
class TestExtractTags:
def test_bracket_tags_from_title(self):
tags = _extract_tags("[feat][bug] do something", [])
assert "feat" in tags
assert "bug" in tags
def test_label_names_included(self):
tags = _extract_tags("Normal title", ["kimi-ready", "enhancement"])
assert "kimi-ready" in tags
assert "enhancement" in tags
def test_combined(self):
tags = _extract_tags("[fix] crash in module", ["p0"])
assert "fix" in tags
assert "p0" in tags
def test_empty_inputs(self):
assert _extract_tags("", []) == set()
def test_tags_are_lowercased(self):
tags = _extract_tags("[BUG][Refactor] title", ["Enhancement"])
assert "bug" in tags
assert "refactor" in tags
assert "enhancement" in tags
# ---------------------------------------------------------------------------
# _score_scope
# ---------------------------------------------------------------------------
class TestScoreScope:
def test_file_reference_adds_point(self):
score = _score_scope("Fix login", "See src/auth/login.py for details", set())
assert score >= 1
def test_function_reference_adds_point(self):
score = _score_scope("Fix login", "In the `handle_login()` method", set())
assert score >= 1
def test_short_title_adds_point(self):
score = _score_scope("Short clear title", "", set())
assert score >= 1
def test_long_title_no_bonus(self):
long_title = "A" * 90
score_long = _score_scope(long_title, "", set())
score_short = _score_scope("Short title", "", set())
assert score_short >= score_long
def test_meta_tags_reduce_score(self):
score_meta = _score_scope("Discuss src/foo.py philosophy", "def func()", {"philosophy"})
score_plain = _score_scope("Fix src/foo.py bug", "def func()", set())
assert score_meta < score_plain
def test_max_is_three(self):
score = _score_scope(
"Fix it", "See src/foo.py and `def bar()` method here", set()
)
assert score <= 3
# ---------------------------------------------------------------------------
# _score_acceptance
# ---------------------------------------------------------------------------
class TestScoreAcceptance:
def test_accept_keywords_add_points(self):
body = "Should return 200. Must pass validation. Assert no errors."
score = _score_acceptance("", body, set())
assert score >= 2
def test_test_reference_adds_point(self):
score = _score_acceptance("", "Run pytest to verify", set())
assert score >= 1
def test_structured_headers_add_point(self):
body = "## Problem\nit breaks\n## Expected\nsuccess"
score = _score_acceptance("", body, set())
assert score >= 1
def test_meta_tags_reduce_score(self):
body = "Should pass and must verify assert test_foo"
score_meta = _score_acceptance("", body, {"philosophy"})
score_plain = _score_acceptance("", body, set())
assert score_meta < score_plain
def test_max_is_three(self):
body = (
"Should pass. Must return. Expected: success. Assert no error. "
"pytest test_foo. ## Problem\ndef. ## Expected\nok"
)
score = _score_acceptance("", body, set())
assert score <= 3
# ---------------------------------------------------------------------------
# _score_alignment
# ---------------------------------------------------------------------------
class TestScoreAlignment:
def test_bug_tags_return_max(self):
assert _score_alignment("", "", {"bug"}) == 3
assert _score_alignment("", "", {"crash"}) == 3
assert _score_alignment("", "", {"hotfix"}) == 3
def test_refactor_tags_give_high_score(self):
score = _score_alignment("", "", {"refactor"})
assert score >= 2
def test_feature_tags_give_high_score(self):
score = _score_alignment("", "", {"feature"})
assert score >= 2
def test_loop_generated_adds_bonus(self):
score_with = _score_alignment("", "", {"feature", "loop-generated"})
score_without = _score_alignment("", "", {"feature"})
assert score_with >= score_without
def test_meta_tags_zero_out_score(self):
score = _score_alignment("", "", {"philosophy", "refactor"})
assert score == 0
def test_max_is_three(self):
score = _score_alignment("", "", {"feature", "loop-generated", "enhancement"})
assert score <= 3
# ---------------------------------------------------------------------------
# score_issue
# ---------------------------------------------------------------------------
class TestScoreIssue:
def test_basic_bug_issue_classified(self):
raw = _make_raw_issue(
title="[bug] fix crash in src/timmy/agent.py",
body="## Problem\nCrashes on startup. Expected: runs. Steps: python -m timmy",
)
issue = score_issue(raw)
assert issue.issue_type == "bug"
assert issue.is_p0 is True
def test_feature_issue_classified(self):
raw = _make_raw_issue(
title="[feat] add dark mode to dashboard",
body="Add a toggle button. Should switch CSS vars.",
labels=["feature"],
)
issue = score_issue(raw)
assert issue.issue_type == "feature"
def test_research_issue_classified(self):
raw = _make_raw_issue(
title="Investigate MCP performance",
labels=["kimi-ready", "research"],
)
issue = score_issue(raw)
assert issue.issue_type == "research"
assert issue.needs_kimi is True
def test_philosophy_issue_classified(self):
raw = _make_raw_issue(
title="Discussion: soul and identity",
labels=["philosophy"],
)
issue = score_issue(raw)
assert issue.issue_type == "philosophy"
def test_score_totals_components(self):
raw = _make_raw_issue()
issue = score_issue(raw)
assert issue.score == issue.scope + issue.acceptance + issue.alignment
def test_ready_flag_set_when_score_meets_threshold(self):
# Create an issue that will definitely score >= READY_THRESHOLD
raw = _make_raw_issue(
title="[bug] crash in src/core.py",
body=(
"## Problem\nCrashes when running `run()`. "
"Expected: should return 200. Must pass pytest assert."
),
labels=["bug"],
)
issue = score_issue(raw)
assert issue.ready == (issue.score >= READY_THRESHOLD)
def test_assigned_issue_reports_assignees(self):
raw = _make_raw_issue(assignees=["claude", "kimi"])
issue = score_issue(raw)
assert "claude" in issue.assignees
assert issue.is_unassigned is False
def test_unassigned_issue(self):
raw = _make_raw_issue(assignees=[])
issue = score_issue(raw)
assert issue.is_unassigned is True
def test_blocked_issue_detected(self):
raw = _make_raw_issue(
title="Fix blocked deployment", body="Blocked by infra team."
)
issue = score_issue(raw)
assert issue.is_blocked is True
def test_age_days_computed(self):
old_date = (datetime.now(UTC) - timedelta(days=30)).isoformat()
raw = _make_raw_issue(created_at=old_date)
issue = score_issue(raw)
assert issue.age_days >= 29
def test_invalid_created_at_defaults_to_now(self):
raw = _make_raw_issue(created_at="not-a-date")
issue = score_issue(raw)
assert issue.age_days == 0
def test_title_bracket_tags_stripped(self):
raw = _make_raw_issue(title="[bug][p0] crash in login")
issue = score_issue(raw)
assert "[" not in issue.title
def test_missing_body_defaults_to_empty(self):
raw = _make_raw_issue()
raw["body"] = None
issue = score_issue(raw)
assert issue.body == ""
def test_kimi_label_triggers_needs_kimi(self):
raw = _make_raw_issue(labels=[KIMI_READY_LABEL])
issue = score_issue(raw)
assert issue.needs_kimi is True
# ---------------------------------------------------------------------------
# decide
# ---------------------------------------------------------------------------
class TestDecide:
def test_philosophy_is_skipped(self):
issue = _make_scored(issue_type="philosophy")
d = decide(issue)
assert d.action == "skip"
assert "philosophy" in d.reason.lower() or "meta" in d.reason.lower()
def test_already_assigned_is_skipped(self):
issue = _make_scored(assignees=["claude"])
d = decide(issue)
assert d.action == "skip"
assert "assigned" in d.reason.lower()
def test_low_score_is_skipped(self):
issue = _make_scored(score=READY_THRESHOLD - 1, ready=False)
d = decide(issue)
assert d.action == "skip"
assert str(READY_THRESHOLD) in d.reason
def test_blocked_is_flagged_for_alex(self):
issue = _make_scored(is_blocked=True)
d = decide(issue)
assert d.action == "flag_alex"
assert d.agent == OWNER_LOGIN
def test_kimi_ready_assigned_to_kimi(self):
issue = _make_scored(tags={"kimi-ready"})
# Ensure it's unassigned and ready
issue.assignees = []
issue.ready = True
issue.is_blocked = False
issue.issue_type = "research"
d = decide(issue)
assert d.action == "assign_kimi"
assert d.agent == AGENT_KIMI
def test_research_type_assigned_to_kimi(self):
issue = _make_scored(issue_type="research", tags={"research"})
d = decide(issue)
assert d.action == "assign_kimi"
assert d.agent == AGENT_KIMI
def test_p0_bug_assigned_to_claude(self):
issue = _make_scored(issue_type="bug", is_p0=True)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_ready_feature_assigned_to_claude(self):
issue = _make_scored(issue_type="feature", score=6, ready=True)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_ready_refactor_assigned_to_claude(self):
issue = _make_scored(issue_type="refactor", score=6, ready=True)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_decision_has_issue_number(self):
issue = _make_scored(number=42)
d = decide(issue)
assert d.issue_number == 42
# ---------------------------------------------------------------------------
# _build_audit_comment
# ---------------------------------------------------------------------------
class TestBuildAuditComment:
def test_assign_claude_comment(self):
d = TriageDecision(
issue_number=1, action="assign_claude", agent=AGENT_CLAUDE, reason="Ready bug"
)
comment = _build_audit_comment(d)
assert AGENT_CLAUDE in comment
assert "Timmy Triage" in comment
assert "Ready bug" in comment
def test_assign_kimi_comment(self):
d = TriageDecision(
issue_number=2, action="assign_kimi", agent=AGENT_KIMI, reason="Research spike"
)
comment = _build_audit_comment(d)
assert KIMI_READY_LABEL in comment
def test_flag_alex_comment(self):
d = TriageDecision(
issue_number=3, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
)
comment = _build_audit_comment(d)
assert OWNER_LOGIN in comment
def test_comment_contains_autonomous_triage_note(self):
d = TriageDecision(issue_number=1, action="assign_claude", agent=AGENT_CLAUDE, reason="x")
comment = _build_audit_comment(d)
assert "Autonomous triage" in comment or "autonomous" in comment.lower()
# ---------------------------------------------------------------------------
# execute_decision (dry_run)
# ---------------------------------------------------------------------------
class TestExecuteDecisionDryRun:
@pytest.mark.asyncio
async def test_skip_action_marks_executed(self):
d = TriageDecision(issue_number=1, action="skip", reason="Already assigned")
mock_client = AsyncMock()
result = await execute_decision(mock_client, d, dry_run=True)
assert result.executed is True
mock_client.post.assert_not_called()
@pytest.mark.asyncio
async def test_dry_run_does_not_call_api(self):
d = TriageDecision(
issue_number=5, action="assign_claude", agent=AGENT_CLAUDE, reason="Ready"
)
mock_client = AsyncMock()
result = await execute_decision(mock_client, d, dry_run=True)
assert result.executed is True
mock_client.post.assert_not_called()
mock_client.patch.assert_not_called()
@pytest.mark.asyncio
async def test_dry_run_kimi_does_not_call_api(self):
d = TriageDecision(
issue_number=6, action="assign_kimi", agent=AGENT_KIMI, reason="Research"
)
mock_client = AsyncMock()
result = await execute_decision(mock_client, d, dry_run=True)
assert result.executed is True
mock_client.post.assert_not_called()
# ---------------------------------------------------------------------------
# execute_decision (live — mocked HTTP)
# ---------------------------------------------------------------------------
class TestExecuteDecisionLive:
@pytest.mark.asyncio
async def test_assign_claude_posts_comment_then_patches(self):
comment_resp = MagicMock()
comment_resp.status_code = 201
patch_resp = MagicMock()
patch_resp.status_code = 200
mock_client = AsyncMock()
mock_client.post.return_value = comment_resp
mock_client.patch.return_value = patch_resp
d = TriageDecision(
issue_number=10, action="assign_claude", agent=AGENT_CLAUDE, reason="Bug ready"
)
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
result = await execute_decision(mock_client, d, dry_run=False)
assert result.executed is True
assert result.error == ""
mock_client.post.assert_called_once()
mock_client.patch.assert_called_once()
@pytest.mark.asyncio
async def test_comment_failure_sets_error(self):
comment_resp = MagicMock()
comment_resp.status_code = 500
mock_client = AsyncMock()
mock_client.post.return_value = comment_resp
d = TriageDecision(
issue_number=11, action="assign_claude", agent=AGENT_CLAUDE, reason="Bug"
)
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
result = await execute_decision(mock_client, d, dry_run=False)
assert result.executed is False
assert result.error != ""
@pytest.mark.asyncio
async def test_flag_alex_only_posts_comment(self):
comment_resp = MagicMock()
comment_resp.status_code = 201
mock_client = AsyncMock()
mock_client.post.return_value = comment_resp
d = TriageDecision(
issue_number=12, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
)
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
result = await execute_decision(mock_client, d, dry_run=False)
assert result.executed is True
mock_client.patch.assert_not_called()
# ---------------------------------------------------------------------------
# BacklogTriageLoop
# ---------------------------------------------------------------------------
class TestBacklogTriageLoop:
def test_default_state(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
loop = BacklogTriageLoop()
assert loop.is_running is False
assert loop.cycle_count == 0
assert loop.history == []
def test_custom_interval_overrides_settings(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
loop = BacklogTriageLoop(interval=60)
assert loop._interval == 60.0
def test_stop_sets_running_false(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
loop = BacklogTriageLoop()
loop._running = True
loop.stop()
assert loop.is_running is False
@pytest.mark.asyncio
async def test_run_once_skips_when_gitea_disabled(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
result = await loop.run_once()
assert result.total_open == 0
assert result.scored == 0
@pytest.mark.asyncio
async def test_run_once_increments_cycle_count(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
await loop.run_once()
await loop.run_once()
assert loop.cycle_count == 2
@pytest.mark.asyncio
async def test_run_once_full_cycle_with_mocked_gitea(self):
raw_issues = [
_make_raw_issue(
number=100,
title="[bug] crash in src/timmy/agent.py",
body=(
"## Problem\nCrashes. Expected: runs. "
"Must pass pytest. Should return 200."
),
labels=["bug"],
assignees=[],
)
]
issues_resp = MagicMock()
issues_resp.status_code = 200
issues_resp.json.side_effect = [raw_issues, []] # page 1, then empty
mock_client = AsyncMock()
mock_client.get.return_value = issues_resp
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
with patch("timmy.backlog_triage.httpx.AsyncClient") as mock_cls:
mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
result = await loop.run_once()
assert result.total_open == 1
assert result.scored == 1
assert loop.cycle_count == 1
assert len(loop.history) == 1
# ---------------------------------------------------------------------------
# ScoredIssue properties
# ---------------------------------------------------------------------------
class TestScoredIssueProperties:
def test_is_unassigned_true_when_no_assignees(self):
issue = _make_scored(assignees=[])
assert issue.is_unassigned is True
def test_is_unassigned_false_when_assigned(self):
issue = _make_scored(assignees=["claude"])
assert issue.is_unassigned is False
def test_needs_kimi_from_research_tag(self):
issue = _make_scored(tags={"research"})
assert issue.needs_kimi is True
def test_needs_kimi_from_kimi_ready_label(self):
issue = _make_scored()
issue.labels = [KIMI_READY_LABEL]
assert issue.needs_kimi is True
def test_needs_kimi_false_for_plain_bug(self):
issue = _make_scored(tags={"bug"}, issue_type="bug")
assert issue.needs_kimi is False
# ---------------------------------------------------------------------------
# TriageCycleResult
# ---------------------------------------------------------------------------
class TestTriageCycleResult:
def test_default_decisions_list_is_empty(self):
result = TriageCycleResult(
timestamp="2026-01-01T00:00:00", total_open=10, scored=8, ready=3
)
assert result.decisions == []
assert result.errors == []
assert result.duration_ms == 0

View File

@@ -0,0 +1,839 @@
"""Unit tests for timmy.quest_system."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
import timmy.quest_system as qs
from timmy.quest_system import (
QuestDefinition,
QuestProgress,
QuestStatus,
QuestType,
_get_progress_key,
_get_target_value,
_is_on_cooldown,
check_daily_run_quest,
check_issue_count_quest,
check_issue_reduce_quest,
claim_quest_reward,
evaluate_quest_progress,
get_active_quests,
get_agent_quests_status,
get_or_create_progress,
get_quest_definition,
get_quest_definitions,
get_quest_leaderboard,
get_quest_progress,
load_quest_config,
reset_quest_progress,
update_quest_progress,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_quest(
quest_id: str = "test_quest",
quest_type: QuestType = QuestType.ISSUE_COUNT,
reward_tokens: int = 10,
enabled: bool = True,
repeatable: bool = False,
cooldown_hours: int = 0,
criteria: dict[str, Any] | None = None,
) -> QuestDefinition:
return QuestDefinition(
id=quest_id,
name=f"Quest {quest_id}",
description="Test quest",
reward_tokens=reward_tokens,
quest_type=quest_type,
enabled=enabled,
repeatable=repeatable,
cooldown_hours=cooldown_hours,
criteria=criteria or {"target_count": 3},
notification_message="Quest Complete! You earned {tokens} tokens.",
)
@pytest.fixture(autouse=True)
def clean_state():
"""Reset module-level state before and after each test."""
reset_quest_progress()
qs._quest_definitions.clear()
qs._quest_settings.clear()
yield
reset_quest_progress()
qs._quest_definitions.clear()
qs._quest_settings.clear()
# ---------------------------------------------------------------------------
# QuestDefinition
# ---------------------------------------------------------------------------
class TestQuestDefinition:
def test_from_dict_minimal(self):
data = {"id": "q1"}
defn = QuestDefinition.from_dict(data)
assert defn.id == "q1"
assert defn.name == "Unnamed Quest"
assert defn.reward_tokens == 0
assert defn.quest_type == QuestType.CUSTOM
assert defn.enabled is True
assert defn.repeatable is False
assert defn.cooldown_hours == 0
def test_from_dict_full(self):
data = {
"id": "q2",
"name": "Full Quest",
"description": "A full quest",
"reward_tokens": 50,
"type": "issue_count",
"enabled": False,
"repeatable": True,
"cooldown_hours": 24,
"criteria": {"target_count": 5},
"notification_message": "You earned {tokens}!",
}
defn = QuestDefinition.from_dict(data)
assert defn.id == "q2"
assert defn.name == "Full Quest"
assert defn.reward_tokens == 50
assert defn.quest_type == QuestType.ISSUE_COUNT
assert defn.enabled is False
assert defn.repeatable is True
assert defn.cooldown_hours == 24
assert defn.criteria == {"target_count": 5}
assert defn.notification_message == "You earned {tokens}!"
def test_from_dict_invalid_type_raises(self):
data = {"id": "q3", "type": "not_a_real_type"}
with pytest.raises(ValueError):
QuestDefinition.from_dict(data)
# ---------------------------------------------------------------------------
# QuestProgress
# ---------------------------------------------------------------------------
class TestQuestProgress:
def test_to_dict_roundtrip(self):
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.IN_PROGRESS,
current_value=2,
target_value=5,
started_at="2026-01-01T00:00:00",
metadata={"key": "val"},
)
d = progress.to_dict()
assert d["quest_id"] == "q1"
assert d["agent_id"] == "agent_a"
assert d["status"] == "in_progress"
assert d["current_value"] == 2
assert d["target_value"] == 5
assert d["metadata"] == {"key": "val"}
def test_to_dict_defaults(self):
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
)
d = progress.to_dict()
assert d["completion_count"] == 0
assert d["started_at"] == ""
assert d["completed_at"] == ""
# ---------------------------------------------------------------------------
# _get_progress_key
# ---------------------------------------------------------------------------
def test_get_progress_key():
assert _get_progress_key("q1", "agent_a") == "agent_a:q1"
def test_get_progress_key_different_agents():
key_a = _get_progress_key("q1", "agent_a")
key_b = _get_progress_key("q1", "agent_b")
assert key_a != key_b
# ---------------------------------------------------------------------------
# load_quest_config
# ---------------------------------------------------------------------------
class TestLoadQuestConfig:
def test_missing_file_returns_empty(self, tmp_path):
missing = tmp_path / "nonexistent.yaml"
with patch.object(qs, "QUEST_CONFIG_PATH", missing):
defs, settings = load_quest_config()
assert defs == {}
assert settings == {}
def test_valid_yaml_loads_quests(self, tmp_path):
config_path = tmp_path / "quests.yaml"
config_path.write_text(
"""
quests:
first_quest:
name: First Quest
description: Do stuff
reward_tokens: 25
type: issue_count
enabled: true
repeatable: false
cooldown_hours: 0
criteria:
target_count: 3
notification_message: "Done! {tokens} tokens"
settings:
some_setting: true
"""
)
with patch.object(qs, "QUEST_CONFIG_PATH", config_path):
defs, settings = load_quest_config()
assert "first_quest" in defs
assert defs["first_quest"].name == "First Quest"
assert defs["first_quest"].reward_tokens == 25
assert settings == {"some_setting": True}
def test_invalid_yaml_returns_empty(self, tmp_path):
config_path = tmp_path / "quests.yaml"
config_path.write_text(":: not valid yaml ::")
with patch.object(qs, "QUEST_CONFIG_PATH", config_path):
defs, settings = load_quest_config()
assert defs == {}
assert settings == {}
def test_non_dict_yaml_returns_empty(self, tmp_path):
config_path = tmp_path / "quests.yaml"
config_path.write_text("- item1\n- item2\n")
with patch.object(qs, "QUEST_CONFIG_PATH", config_path):
defs, settings = load_quest_config()
assert defs == {}
assert settings == {}
def test_bad_quest_entry_is_skipped(self, tmp_path):
config_path = tmp_path / "quests.yaml"
config_path.write_text(
"""
quests:
good_quest:
name: Good
type: issue_count
reward_tokens: 10
enabled: true
repeatable: false
cooldown_hours: 0
criteria: {}
notification_message: "{tokens}"
bad_quest:
type: invalid_type_that_does_not_exist
"""
)
with patch.object(qs, "QUEST_CONFIG_PATH", config_path):
defs, _ = load_quest_config()
assert "good_quest" in defs
assert "bad_quest" not in defs
# ---------------------------------------------------------------------------
# get_quest_definitions / get_quest_definition / get_active_quests
# ---------------------------------------------------------------------------
class TestQuestLookup:
def setup_method(self):
q1 = _make_quest("q1", enabled=True)
q2 = _make_quest("q2", enabled=False)
qs._quest_definitions.update({"q1": q1, "q2": q2})
def test_get_quest_definitions_returns_all(self):
defs = get_quest_definitions()
assert "q1" in defs
assert "q2" in defs
def test_get_quest_definition_found(self):
defn = get_quest_definition("q1")
assert defn is not None
assert defn.id == "q1"
def test_get_quest_definition_not_found(self):
assert get_quest_definition("missing") is None
def test_get_active_quests_only_enabled(self):
active = get_active_quests()
ids = [q.id for q in active]
assert "q1" in ids
assert "q2" not in ids
# ---------------------------------------------------------------------------
# _get_target_value
# ---------------------------------------------------------------------------
class TestGetTargetValue:
def test_issue_count(self):
q = _make_quest(quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 7})
assert _get_target_value(q) == 7
def test_issue_reduce(self):
q = _make_quest(quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 5})
assert _get_target_value(q) == 5
def test_daily_run(self):
q = _make_quest(quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 3})
assert _get_target_value(q) == 3
def test_docs_update(self):
q = _make_quest(quest_type=QuestType.DOCS_UPDATE, criteria={"min_files_changed": 2})
assert _get_target_value(q) == 2
def test_test_improve(self):
q = _make_quest(quest_type=QuestType.TEST_IMPROVE, criteria={"min_new_tests": 4})
assert _get_target_value(q) == 4
def test_custom_defaults_to_one(self):
q = _make_quest(quest_type=QuestType.CUSTOM, criteria={})
assert _get_target_value(q) == 1
def test_missing_criteria_key_defaults_to_one(self):
q = _make_quest(quest_type=QuestType.ISSUE_COUNT, criteria={})
assert _get_target_value(q) == 1
# ---------------------------------------------------------------------------
# get_or_create_progress / get_quest_progress
# ---------------------------------------------------------------------------
class TestProgressCreation:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 5})
def test_creates_new_progress(self):
progress = get_or_create_progress("q1", "agent_a")
assert progress.quest_id == "q1"
assert progress.agent_id == "agent_a"
assert progress.status == QuestStatus.NOT_STARTED
assert progress.target_value == 5
assert progress.current_value == 0
def test_returns_existing_progress(self):
p1 = get_or_create_progress("q1", "agent_a")
p1.current_value = 3
p2 = get_or_create_progress("q1", "agent_a")
assert p2.current_value == 3
assert p1 is p2
def test_raises_for_unknown_quest(self):
with pytest.raises(ValueError, match="Quest unknown not found"):
get_or_create_progress("unknown", "agent_a")
def test_get_quest_progress_none_before_creation(self):
assert get_quest_progress("q1", "agent_a") is None
def test_get_quest_progress_after_creation(self):
get_or_create_progress("q1", "agent_a")
progress = get_quest_progress("q1", "agent_a")
assert progress is not None
# ---------------------------------------------------------------------------
# update_quest_progress
# ---------------------------------------------------------------------------
class TestUpdateQuestProgress:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 3})
def test_updates_current_value(self):
progress = update_quest_progress("q1", "agent_a", 2)
assert progress.current_value == 2
assert progress.status == QuestStatus.NOT_STARTED
def test_marks_completed_when_target_reached(self):
progress = update_quest_progress("q1", "agent_a", 3)
assert progress.status == QuestStatus.COMPLETED
assert progress.completed_at != ""
def test_marks_completed_when_value_exceeds_target(self):
progress = update_quest_progress("q1", "agent_a", 10)
assert progress.status == QuestStatus.COMPLETED
def test_does_not_re_complete_already_completed(self):
p = update_quest_progress("q1", "agent_a", 3)
first_completed_at = p.completed_at
p2 = update_quest_progress("q1", "agent_a", 5)
# should not change completed_at again
assert p2.completed_at == first_completed_at
def test_does_not_re_complete_claimed_quest(self):
p = update_quest_progress("q1", "agent_a", 3)
p.status = QuestStatus.CLAIMED
p2 = update_quest_progress("q1", "agent_a", 5)
assert p2.status == QuestStatus.CLAIMED
def test_updates_metadata(self):
progress = update_quest_progress("q1", "agent_a", 1, metadata={"info": "value"})
assert progress.metadata["info"] == "value"
def test_merges_metadata(self):
update_quest_progress("q1", "agent_a", 1, metadata={"a": 1})
progress = update_quest_progress("q1", "agent_a", 2, metadata={"b": 2})
assert progress.metadata["a"] == 1
assert progress.metadata["b"] == 2
# ---------------------------------------------------------------------------
# _is_on_cooldown
# ---------------------------------------------------------------------------
class TestIsOnCooldown:
def test_non_repeatable_never_on_cooldown(self):
quest = _make_quest(repeatable=False, cooldown_hours=24)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.CLAIMED,
last_completed_at=datetime.now(UTC).isoformat(),
)
assert _is_on_cooldown(progress, quest) is False
def test_no_last_completed_not_on_cooldown(self):
quest = _make_quest(repeatable=True, cooldown_hours=24)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
last_completed_at="",
)
assert _is_on_cooldown(progress, quest) is False
def test_zero_cooldown_not_on_cooldown(self):
quest = _make_quest(repeatable=True, cooldown_hours=0)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.CLAIMED,
last_completed_at=datetime.now(UTC).isoformat(),
)
assert _is_on_cooldown(progress, quest) is False
def test_recent_completion_is_on_cooldown(self):
quest = _make_quest(repeatable=True, cooldown_hours=24)
recent = datetime.now(UTC) - timedelta(hours=1)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
last_completed_at=recent.isoformat(),
)
assert _is_on_cooldown(progress, quest) is True
def test_expired_cooldown_not_on_cooldown(self):
quest = _make_quest(repeatable=True, cooldown_hours=24)
old = datetime.now(UTC) - timedelta(hours=25)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
last_completed_at=old.isoformat(),
)
assert _is_on_cooldown(progress, quest) is False
def test_invalid_last_completed_returns_false(self):
quest = _make_quest(repeatable=True, cooldown_hours=24)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
last_completed_at="not-a-date",
)
assert _is_on_cooldown(progress, quest) is False
# ---------------------------------------------------------------------------
# claim_quest_reward
# ---------------------------------------------------------------------------
class TestClaimQuestReward:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=25)
def test_returns_none_if_no_progress(self):
assert claim_quest_reward("q1", "agent_a") is None
def test_returns_none_if_not_completed(self):
get_or_create_progress("q1", "agent_a")
assert claim_quest_reward("q1", "agent_a") is None
def test_returns_none_if_quest_not_found(self):
assert claim_quest_reward("nonexistent", "agent_a") is None
def test_successful_claim(self):
progress = get_or_create_progress("q1", "agent_a")
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
mock_invoice = MagicMock()
mock_invoice.payment_hash = "quest_q1_agent_a_123"
with (
patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice),
patch("timmy.quest_system.mark_settled"),
):
result = claim_quest_reward("q1", "agent_a")
assert result is not None
assert result["tokens_awarded"] == 25
assert result["quest_id"] == "q1"
assert result["agent_id"] == "agent_a"
assert result["completion_count"] == 1
def test_successful_claim_marks_claimed(self):
progress = get_or_create_progress("q1", "agent_a")
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
mock_invoice = MagicMock()
mock_invoice.payment_hash = "phash"
with (
patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice),
patch("timmy.quest_system.mark_settled"),
):
claim_quest_reward("q1", "agent_a")
assert progress.status == QuestStatus.CLAIMED
def test_repeatable_quest_resets_after_claim(self):
qs._quest_definitions["rep"] = _make_quest(
"rep", repeatable=True, cooldown_hours=0, reward_tokens=10
)
progress = get_or_create_progress("rep", "agent_a")
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
progress.current_value = 5
mock_invoice = MagicMock()
mock_invoice.payment_hash = "phash"
with (
patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice),
patch("timmy.quest_system.mark_settled"),
):
result = claim_quest_reward("rep", "agent_a")
assert result is not None
assert progress.status == QuestStatus.NOT_STARTED
assert progress.current_value == 0
assert progress.completed_at == ""
def test_on_cooldown_returns_none(self):
qs._quest_definitions["rep"] = _make_quest("rep", repeatable=True, cooldown_hours=24)
progress = get_or_create_progress("rep", "agent_a")
progress.status = QuestStatus.COMPLETED
recent = datetime.now(UTC) - timedelta(hours=1)
progress.last_completed_at = recent.isoformat()
assert claim_quest_reward("rep", "agent_a") is None
def test_ledger_error_returns_none(self):
progress = get_or_create_progress("q1", "agent_a")
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
with patch("timmy.quest_system.create_invoice_entry", side_effect=Exception("ledger error")):
result = claim_quest_reward("q1", "agent_a")
assert result is None
# ---------------------------------------------------------------------------
# check_issue_count_quest
# ---------------------------------------------------------------------------
class TestCheckIssueCountQuest:
def setup_method(self):
qs._quest_definitions["iq"] = _make_quest(
"iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 2, "issue_labels": ["bug"]}
)
def test_counts_matching_issues(self):
issues = [
{"labels": [{"name": "bug"}]},
{"labels": [{"name": "bug"}, {"name": "priority"}]},
{"labels": [{"name": "feature"}]}, # doesn't match
]
progress = check_issue_count_quest(
qs._quest_definitions["iq"], "agent_a", issues
)
assert progress.current_value == 2
assert progress.status == QuestStatus.COMPLETED
def test_empty_issues_returns_zero(self):
progress = check_issue_count_quest(qs._quest_definitions["iq"], "agent_a", [])
assert progress.current_value == 0
def test_no_labels_filter_counts_all_labeled(self):
q = _make_quest(
"nolabel",
quest_type=QuestType.ISSUE_COUNT,
criteria={"target_count": 1, "issue_labels": []},
)
qs._quest_definitions["nolabel"] = q
issues = [
{"labels": [{"name": "bug"}]},
{"labels": [{"name": "feature"}]},
]
progress = check_issue_count_quest(q, "agent_a", issues)
assert progress.current_value == 2
# ---------------------------------------------------------------------------
# check_issue_reduce_quest
# ---------------------------------------------------------------------------
class TestCheckIssueReduceQuest:
def setup_method(self):
qs._quest_definitions["ir"] = _make_quest(
"ir", quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 5}
)
def test_computes_reduction(self):
progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 20, 15)
assert progress.current_value == 5
assert progress.status == QuestStatus.COMPLETED
def test_negative_reduction_treated_as_zero(self):
progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 10, 15)
assert progress.current_value == 0
def test_no_change_yields_zero(self):
progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 10, 10)
assert progress.current_value == 0
# ---------------------------------------------------------------------------
# check_daily_run_quest
# ---------------------------------------------------------------------------
class TestCheckDailyRunQuest:
def setup_method(self):
qs._quest_definitions["dr"] = _make_quest(
"dr", quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 2}
)
def test_tracks_sessions(self):
progress = check_daily_run_quest(qs._quest_definitions["dr"], "agent_a", 2)
assert progress.current_value == 2
assert progress.status == QuestStatus.COMPLETED
def test_incomplete_sessions(self):
progress = check_daily_run_quest(qs._quest_definitions["dr"], "agent_a", 1)
assert progress.current_value == 1
assert progress.status != QuestStatus.COMPLETED
# ---------------------------------------------------------------------------
# evaluate_quest_progress
# ---------------------------------------------------------------------------
class TestEvaluateQuestProgress:
def setup_method(self):
qs._quest_definitions["iq"] = _make_quest(
"iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 1}
)
qs._quest_definitions["dis"] = _make_quest("dis", enabled=False)
def test_disabled_quest_returns_none(self):
result = evaluate_quest_progress("dis", "agent_a", {})
assert result is None
def test_missing_quest_returns_none(self):
result = evaluate_quest_progress("nonexistent", "agent_a", {})
assert result is None
def test_issue_count_quest_evaluated(self):
context = {"closed_issues": [{"labels": [{"name": "bug"}]}]}
result = evaluate_quest_progress("iq", "agent_a", context)
assert result is not None
assert result.current_value == 1
def test_issue_reduce_quest_evaluated(self):
qs._quest_definitions["ir"] = _make_quest(
"ir", quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 3}
)
context = {"previous_issue_count": 10, "current_issue_count": 7}
result = evaluate_quest_progress("ir", "agent_a", context)
assert result is not None
assert result.current_value == 3
def test_daily_run_quest_evaluated(self):
qs._quest_definitions["dr"] = _make_quest(
"dr", quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 1}
)
context = {"sessions_completed": 2}
result = evaluate_quest_progress("dr", "agent_a", context)
assert result is not None
assert result.current_value == 2
def test_custom_quest_returns_existing_progress(self):
qs._quest_definitions["cust"] = _make_quest("cust", quest_type=QuestType.CUSTOM)
# No progress yet => None (custom quests don't auto-create progress here)
result = evaluate_quest_progress("cust", "agent_a", {})
assert result is None
def test_cooldown_prevents_evaluation(self):
q = _make_quest("rep_iq", quest_type=QuestType.ISSUE_COUNT, repeatable=True, cooldown_hours=24, criteria={"target_count": 1})
qs._quest_definitions["rep_iq"] = q
progress = get_or_create_progress("rep_iq", "agent_a")
recent = datetime.now(UTC) - timedelta(hours=1)
progress.last_completed_at = recent.isoformat()
context = {"closed_issues": [{"labels": [{"name": "bug"}]}]}
result = evaluate_quest_progress("rep_iq", "agent_a", context)
# Should return existing progress without updating
assert result is progress
# ---------------------------------------------------------------------------
# reset_quest_progress
# ---------------------------------------------------------------------------
class TestResetQuestProgress:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1")
qs._quest_definitions["q2"] = _make_quest("q2")
def test_reset_all(self):
get_or_create_progress("q1", "agent_a")
get_or_create_progress("q2", "agent_a")
count = reset_quest_progress()
assert count == 2
assert get_quest_progress("q1", "agent_a") is None
assert get_quest_progress("q2", "agent_a") is None
def test_reset_specific_quest(self):
get_or_create_progress("q1", "agent_a")
get_or_create_progress("q2", "agent_a")
count = reset_quest_progress(quest_id="q1")
assert count == 1
assert get_quest_progress("q1", "agent_a") is None
assert get_quest_progress("q2", "agent_a") is not None
def test_reset_specific_agent(self):
get_or_create_progress("q1", "agent_a")
get_or_create_progress("q1", "agent_b")
count = reset_quest_progress(agent_id="agent_a")
assert count == 1
assert get_quest_progress("q1", "agent_a") is None
assert get_quest_progress("q1", "agent_b") is not None
def test_reset_specific_quest_and_agent(self):
get_or_create_progress("q1", "agent_a")
get_or_create_progress("q1", "agent_b")
count = reset_quest_progress(quest_id="q1", agent_id="agent_a")
assert count == 1
def test_reset_empty_returns_zero(self):
count = reset_quest_progress()
assert count == 0
# ---------------------------------------------------------------------------
# get_quest_leaderboard
# ---------------------------------------------------------------------------
class TestGetQuestLeaderboard:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)
qs._quest_definitions["q2"] = _make_quest("q2", reward_tokens=20)
def test_empty_progress_returns_empty(self):
assert get_quest_leaderboard() == []
def test_leaderboard_sorted_by_tokens(self):
p_a = get_or_create_progress("q1", "agent_a")
p_a.completion_count = 1
p_b = get_or_create_progress("q2", "agent_b")
p_b.completion_count = 2
board = get_quest_leaderboard()
assert board[0]["agent_id"] == "agent_b" # 40 tokens
assert board[1]["agent_id"] == "agent_a" # 10 tokens
def test_leaderboard_aggregates_multiple_quests(self):
p1 = get_or_create_progress("q1", "agent_a")
p1.completion_count = 2 # 20 tokens
p2 = get_or_create_progress("q2", "agent_a")
p2.completion_count = 1 # 20 tokens
board = get_quest_leaderboard()
assert len(board) == 1
assert board[0]["total_tokens"] == 40
assert board[0]["total_completions"] == 3
def test_leaderboard_counts_unique_quests(self):
p1 = get_or_create_progress("q1", "agent_a")
p1.completion_count = 2
p2 = get_or_create_progress("q2", "agent_a")
p2.completion_count = 1
board = get_quest_leaderboard()
assert board[0]["unique_quests_completed"] == 2
# ---------------------------------------------------------------------------
# get_agent_quests_status
# ---------------------------------------------------------------------------
class TestGetAgentQuestsStatus:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)
def test_returns_status_structure(self):
result = get_agent_quests_status("agent_a")
assert result["agent_id"] == "agent_a"
assert isinstance(result["quests"], list)
assert "total_tokens_earned" in result
assert "total_quests_completed" in result
assert "active_quests_count" in result
def test_includes_quest_info(self):
result = get_agent_quests_status("agent_a")
quest_info = result["quests"][0]
assert quest_info["quest_id"] == "q1"
assert quest_info["reward_tokens"] == 10
assert quest_info["status"] == QuestStatus.NOT_STARTED.value
def test_accumulates_tokens_from_completions(self):
p = get_or_create_progress("q1", "agent_a")
p.completion_count = 3
result = get_agent_quests_status("agent_a")
assert result["total_tokens_earned"] == 30
assert result["total_quests_completed"] == 3
def test_cooldown_hours_remaining_calculated(self):
q = _make_quest("qcool", repeatable=True, cooldown_hours=24, reward_tokens=5)
qs._quest_definitions["qcool"] = q
p = get_or_create_progress("qcool", "agent_a")
recent = datetime.now(UTC) - timedelta(hours=2)
p.last_completed_at = recent.isoformat()
p.completion_count = 1
result = get_agent_quests_status("agent_a")
qcool_info = next(qi for qi in result["quests"] if qi["quest_id"] == "qcool")
assert qcool_info["on_cooldown"] is True
assert qcool_info["cooldown_hours_remaining"] > 0

View File

@@ -0,0 +1,124 @@
"""Unit tests for timmy/research_tools.py."""
from __future__ import annotations
import os
import sys
from unittest.mock import MagicMock, patch
import pytest
# serpapi is an optional dependency not installed in the test environment.
# Stub it before importing the module under test.
if "serpapi" not in sys.modules:
sys.modules["serpapi"] = MagicMock()
from timmy.research_tools import get_llm_client, google_web_search # noqa: E402
# ---------------------------------------------------------------------------
# google_web_search
# ---------------------------------------------------------------------------
class TestGoogleWebSearch:
@pytest.mark.asyncio
async def test_missing_api_key_returns_empty_string(self):
"""Returns '' and logs a warning when SERPAPI_API_KEY is absent."""
env = {k: v for k, v in os.environ.items() if k != "SERPAPI_API_KEY"}
with patch.dict(os.environ, env, clear=True):
result = await google_web_search("python tutorial")
assert result == ""
@pytest.mark.asyncio
async def test_calls_google_search_with_correct_params(self):
"""GoogleSearch is constructed with query and api_key from environ."""
mock_search_instance = MagicMock()
mock_search_instance.get_dict.return_value = {"organic_results": [{"title": "Hello"}]}
mock_search_cls = MagicMock(return_value=mock_search_instance)
with patch.dict(os.environ, {"SERPAPI_API_KEY": "test-key-123"}):
with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
result = await google_web_search("python tutorial")
mock_search_cls.assert_called_once_with(
{"q": "python tutorial", "api_key": "test-key-123"}
)
assert "Hello" in result
@pytest.mark.asyncio
async def test_returns_stringified_results(self):
"""Return value is str() of whatever get_dict() returns."""
fake_dict = {"organic_results": [{"title": "Foo", "link": "https://example.com"}]}
mock_search_instance = MagicMock()
mock_search_instance.get_dict.return_value = fake_dict
mock_search_cls = MagicMock(return_value=mock_search_instance)
with patch.dict(os.environ, {"SERPAPI_API_KEY": "key"}):
with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
result = await google_web_search("foo")
assert result == str(fake_dict)
@pytest.mark.asyncio
async def test_empty_query_still_calls_search(self):
"""An empty query is forwarded to GoogleSearch without short-circuiting."""
mock_search_instance = MagicMock()
mock_search_instance.get_dict.return_value = {}
mock_search_cls = MagicMock(return_value=mock_search_instance)
with patch.dict(os.environ, {"SERPAPI_API_KEY": "key"}):
with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
result = await google_web_search("")
mock_search_cls.assert_called_once()
assert result == str({})
# ---------------------------------------------------------------------------
# get_llm_client
# ---------------------------------------------------------------------------
class TestGetLlmClient:
def test_returns_a_client_object(self):
"""get_llm_client() returns a non-None object."""
client = get_llm_client()
assert client is not None
def test_client_has_completion_method(self):
"""The returned client exposes a callable completion attribute."""
client = get_llm_client()
assert callable(getattr(client, "completion", None))
@pytest.mark.asyncio
async def test_completion_returns_object_with_text(self):
"""completion() returns an object whose .text is a non-empty string."""
client = get_llm_client()
result = await client.completion("What is Python?", max_tokens=100)
assert hasattr(result, "text")
assert isinstance(result.text, str)
assert len(result.text) > 0
@pytest.mark.asyncio
async def test_completion_text_contains_prompt(self):
"""The stub weaves the prompt into the returned text."""
client = get_llm_client()
prompt = "Tell me about asyncio"
result = await client.completion(prompt, max_tokens=50)
assert prompt in result.text
@pytest.mark.asyncio
async def test_multiple_calls_return_independent_objects(self):
"""Each call to completion() returns a fresh object."""
client = get_llm_client()
r1 = await client.completion("prompt one", max_tokens=10)
r2 = await client.completion("prompt two", max_tokens=10)
assert r1 is not r2
assert r1.text != r2.text
def test_multiple_calls_return_independent_clients(self):
"""Each call to get_llm_client() returns a distinct instance."""
c1 = get_llm_client()
c2 = get_llm_client()
assert c1 is not c2

View File

@@ -334,7 +334,7 @@ async def test_think_once_disabled(tmp_path):
"""think_once should return None when thinking is disabled."""
engine = _make_engine(tmp_path)
with patch("timmy.thinking.settings") as mock_settings:
with patch("timmy.thinking.engine.settings") as mock_settings:
mock_settings.thinking_enabled = False
thought = await engine.think_once()
@@ -381,7 +381,7 @@ async def test_think_once_prompt_includes_memory_context(tmp_path):
return "A grounded thought."
with (
patch("timmy.thinking.HOT_MEMORY_PATH", memory_md),
patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", memory_md),
patch.object(engine, "_call_agent", side_effect=capture_agent),
patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"),
@@ -412,7 +412,7 @@ async def test_think_once_prompt_includes_soul(tmp_path):
return "A soulful thought."
with (
patch("timmy.thinking.SOUL_PATH", soul_md),
patch("timmy.thinking._snapshot.SOUL_PATH", soul_md),
patch.object(engine, "_call_agent", side_effect=capture_agent),
patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"),
@@ -433,7 +433,7 @@ async def test_think_once_graceful_without_soul(tmp_path):
nonexistent = tmp_path / "no_such_soul.md"
with (
patch("timmy.thinking.SOUL_PATH", nonexistent),
patch("timmy.thinking._snapshot.SOUL_PATH", nonexistent),
patch.object(engine, "_call_agent", return_value="Still thinking."),
patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"),
@@ -481,7 +481,7 @@ async def test_think_once_never_writes_soul(tmp_path):
soul_md.write_text(original_content)
with (
patch("timmy.thinking.SOUL_PATH", soul_md),
patch("timmy.thinking._snapshot.SOUL_PATH", soul_md),
patch.object(engine, "_call_agent", return_value="A deep reflection."),
patch.object(engine, "_log_event"),
patch.object(engine, "_broadcast", new_callable=AsyncMock),
@@ -501,7 +501,7 @@ async def test_think_once_memory_update_graceful_on_failure(tmp_path):
# Don't create the parent dir — write will fail
with (
patch("timmy.thinking.HOT_MEMORY_PATH", bad_memory),
patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", bad_memory),
patch.object(engine, "_call_agent", return_value="Resilient thought."),
patch.object(engine, "_log_event"),
patch.object(engine, "_broadcast", new_callable=AsyncMock),
@@ -1090,7 +1090,7 @@ def test_maybe_check_memory_fires_at_interval(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform")
with (
patch("timmy.thinking.settings") as mock_settings,
patch("timmy.thinking._distillation.settings") as mock_settings,
patch(
"timmy.tools_intro.get_memory_status",
return_value={
@@ -1113,7 +1113,7 @@ def test_maybe_check_memory_skips_between_intervals(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform")
with (
patch("timmy.thinking.settings") as mock_settings,
patch("timmy.thinking._distillation.settings") as mock_settings,
patch(
"timmy.tools_intro.get_memory_status",
) as mock_status,
@@ -1131,7 +1131,7 @@ def test_maybe_check_memory_graceful_on_error(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform")
with (
patch("timmy.thinking.settings") as mock_settings,
patch("timmy.thinking._distillation.settings") as mock_settings,
patch(
"timmy.tools_intro.get_memory_status",
side_effect=Exception("boom"),