Compare commits
1 Commits
claude/iss
...
claude/iss
| Author | SHA1 | Date |
|---|---|---|
| | f8934b63f6 | |
@@ -1,190 +0,0 @@

# DeerFlow Evaluation — Autonomous Research Orchestration Layer

**Status:** No-go for full adoption · Selective borrowing recommended

**Date:** 2026-03-23

**Issue:** #1283 (spawned from #1275 screenshot triage)

**Refs:** #972 (Timmy research pipeline) · #975 (ResearchOrchestrator)

---

## What Is DeerFlow?

DeerFlow (`bytedance/deer-flow`) is an open-source "super-agent harness" built by ByteDance on top of LangGraph. It provides a production-grade multi-agent research and code-execution framework with a web UI, REST API, Docker deployment, and optional IM channel integration (Telegram, Slack, Feishu/Lark).

- **Stars:** ~39,600 · **License:** MIT
- **Stack:** Python 3.12+ (backend) · TypeScript/Next.js (frontend) · LangGraph runtime
- **Entry point:** `http://localhost:2026` (Nginx reverse proxy, configurable via `PORT`)

---

## Research Questions — Answers

### 1. Agent Roles

DeerFlow uses a two-tier architecture:

| Role | Description |
|------|-------------|
| **Lead Agent** | Entry point; decomposes tasks, dispatches sub-agents, synthesizes results |
| **Sub-Agent (general-purpose)** | All tools except `task`; spawned dynamically |
| **Sub-Agent (bash)** | Command-execution specialist |

The lead agent runs through a 12-middleware chain in order: thread setup → uploads → sandbox → tool-call repair → guardrails → summarization → todo tracking → title generation → memory update → image injection → sub-agent concurrency cap → clarification intercept.

**Concurrency:** up to 3 sub-agents in parallel (configurable), 15-minute default timeout each, structured SSE event stream (`task_started` / `task_running` / `task_completed` / `task_failed`).

**Mapping to Timmy personas:** DeerFlow's lead/sub-agent split roughly maps to Timmy's orchestrator + specialist-agent pattern. DeerFlow doesn't have named personas — it routes by capability (tools available to the agent type), not by identity. Timmy's persona system is richer and more opinionated.

---

### 2. API Surface

DeerFlow exposes a full REST API at port 2026 (via Nginx). **No authentication by default.**

**Core integration endpoints:**

| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/api/langgraph/threads` | POST | Create conversation thread |
| `/api/langgraph/threads/{id}/runs` | POST | Submit task (blocking) |
| `/api/langgraph/threads/{id}/runs/stream` | POST | Submit task (streaming SSE/WS) |
| `/api/langgraph/threads/{id}/state` | GET | Get full thread state + artifacts |
| `/api/models` | GET | List configured models |
| `/api/threads/{id}/artifacts/{path}` | GET | Download generated artifacts |
| `/api/threads/{id}` | DELETE | Clean up thread data |

These are callable from Timmy with `httpx` — no special client library needed.
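
For illustration, a minimal `httpx` sketch of the create-thread plus streaming-run flow. The endpoint paths come from the table above; the request and event payload shapes (`thread_id`, `input`, `data:` framing) are assumptions, not verified against DeerFlow's actual schema:

```python
# Hypothetical sketch: endpoints from the table above; payload shapes are assumptions.
import asyncio
import json

import httpx

BASE = "http://localhost:2026"


async def run_research(question: str) -> None:
    async with httpx.AsyncClient(base_url=BASE, timeout=None) as client:
        # Create a conversation thread.
        resp = await client.post("/api/langgraph/threads", json={})
        resp.raise_for_status()
        thread_id = resp.json()["thread_id"]  # assumed response field

        # Submit the task and consume the SSE event stream.
        async with client.stream(
            "POST",
            f"/api/langgraph/threads/{thread_id}/runs/stream",
            json={"input": question},  # assumed request shape
        ) as stream:
            async for line in stream.aiter_lines():
                if line.startswith("data:"):
                    print(json.loads(line.removeprefix("data:").strip()))


asyncio.run(run_research("Survey recent multi-agent research frameworks"))
```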

---

### 3. LLM Backend Support

DeerFlow uses LangChain model classes declared in `config.yaml`.

**Documented providers:** OpenAI, Anthropic, Google Gemini, DeepSeek, Doubao (ByteDance), Kimi/Moonshot, OpenRouter, MiniMax, Novita AI, Claude Code (OAuth).

**Ollama:** Not in official documentation, but works via the `langchain_openai:ChatOpenAI` class with `base_url: http://localhost:11434/v1` and a dummy API key. Community-confirmed (GitHub issues #37, #1004) with Qwen2.5, Llama 3.1, and DeepSeek-R1.
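
For reference, a sketch of that override using the `langchain_openai` class (model name and dummy key are placeholders):

```python
# Sketch: pointing LangChain's ChatOpenAI at a local Ollama server.
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    base_url="http://localhost:11434/v1",  # Ollama's OpenAI-compatible endpoint
    api_key="ollama",                      # dummy key; Ollama ignores it
    model="qwen2.5:14b",                   # placeholder model name
)
print(llm.invoke("Summarize DeerFlow in one sentence.").content)
```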

**vLLM:** Not documented, but architecturally identical — vLLM exposes an OpenAI-compatible endpoint. Should work with the same `base_url` override.

**Practical caveat:** The lead agent requires strong instruction-following for consistent tool use and structured output. Community findings suggest ≥14B-parameter models (Qwen2.5-14B minimum) for reliable orchestration. Our current `qwen3:14b` should be viable.

---

### 4. License

**MIT License** — Copyright 2025 ByteDance Ltd. and DeerFlow Authors 2025–2026.

Permissive: use, modify, distribute, commercialize freely. Attribution required. No warranty.

**Compatible with Timmy's use case.** No CLA, no copyleft, no commercial restrictions.

---

### 5. Docker Port Conflicts

DeerFlow's Docker Compose exposes a single host port:

| Service | Host Port | Notes |
|---------|-----------|-------|
| Nginx (entry point) | **2026** (configurable via `PORT`) | Only externally exposed port |
| Frontend (Next.js) | 3000 | Internal only |
| Gateway API | 8001 | Internal only |
| LangGraph runtime | 2024 | Internal only |
| Provisioner (optional) | 8002 | Internal only, Kubernetes mode only |

Timmy's existing Docker Compose exposes:

- **8000** — dashboard (FastAPI)
- **8080** — openfang (via `openfang` profile)
- **11434** — Ollama (host process, not containerized)

**No conflict.** Port 2026 is not used by Timmy. DeerFlow can run alongside the existing stack without modification.

---

## Full Capability Comparison

| Capability | DeerFlow | Timmy (`research.py`) |
|------------|----------|-----------------------|
| Multi-agent fan-out | ✅ 3 concurrent sub-agents | ❌ Sequential only |
| Web search | ✅ Tavily / InfoQuest | ✅ `research_tools.py` |
| Web fetch | ✅ Jina AI / Firecrawl | ✅ trafilatura |
| Code execution (sandbox) | ✅ Local / Docker / K8s | ❌ Not implemented |
| Artifact generation | ✅ HTML, Markdown, slides | ❌ Markdown report only |
| Document upload + conversion | ✅ PDF, PPT, Excel, Word | ❌ Not implemented |
| Long-term memory | ✅ LLM-extracted facts, persistent | ✅ SQLite semantic cache |
| Streaming results | ✅ SSE + WebSocket | ❌ Blocking call |
| Web UI | ✅ Next.js included | ✅ Jinja2/HTMX dashboard |
| IM integration | ✅ Telegram, Slack, Feishu | ✅ Telegram, Discord |
| Ollama backend | ✅ (via config, community-confirmed) | ✅ Native |
| Persona system | ❌ Role-based only | ✅ Named personas |
| Semantic cache tier | ❌ Not implemented | ✅ SQLite (Tier 4) |
| Free-tier cascade | ❌ Not applicable | 🔲 Planned (Groq, #980) |
| Python version requirement | 3.12+ | 3.11+ |
| Lock-in | LangGraph + LangChain | None |

---

## Integration Options Assessment

### Option A — Full Adoption (replace `research.py`)

**Verdict: Not recommended.**

DeerFlow is a substantial full-stack system (Python + Node.js, Docker, Nginx, LangGraph). Adopting it fully would:

- Replace Timmy's custom cascade tier system (SQLite cache → Ollama → Claude API → Groq) with a single-tier LangChain model config
- Lose Timmy's persona-aware research routing
- Add a Python 3.12+ dependency (Timmy currently targets 3.11+)
- Introduce LangGraph/LangChain lock-in for all research tasks
- Require running a parallel Node.js frontend process (redundant given Timmy's own UI)

### Option B — Sidecar for Heavy Research (call DeerFlow's API from Timmy)

**Verdict: Viable but over-engineered for current needs.**

DeerFlow could run as an optional sidecar (`docker compose --profile deerflow up`) and Timmy could delegate multi-agent research tasks via `POST /api/langgraph/threads/{id}/runs`. This would unlock parallel sub-agent fan-out and code-execution sandboxing without replacing Timmy's stack.

The integration would be ~50 lines of `httpx` code in a new `DeerFlowClient` adapter, sketched below. The `ResearchOrchestrator` in `research.py` could route tasks above a complexity threshold to DeerFlow.
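
A sketch of what that adapter could look like (the class, the `deerflow` Docker hostname, and the payload field names are hypothetical; only the endpoint paths come from the API table above):

```python
# Hypothetical DeerFlowClient sketch; request/response field names are assumptions.
import httpx


class DeerFlowClient:
    """Thin httpx wrapper Timmy could use to delegate heavy research tasks."""

    def __init__(self, base_url: str = "http://deerflow:2026") -> None:
        # Long timeout: multi-agent runs can take minutes.
        self._client = httpx.AsyncClient(base_url=base_url, timeout=900)

    async def research(self, question: str) -> dict:
        resp = await self._client.post("/api/langgraph/threads", json={})
        resp.raise_for_status()
        thread_id = resp.json()["thread_id"]  # assumed response field

        run = await self._client.post(
            f"/api/langgraph/threads/{thread_id}/runs",
            json={"input": question},  # assumed request shape
        )
        run.raise_for_status()

        # Fetch the final thread state, including generated artifacts.
        state = await self._client.get(f"/api/langgraph/threads/{thread_id}/state")
        state.raise_for_status()
        return state.json()
```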

**Barrier:** DeerFlow's lack of default authentication means the sidecar would need to be network-isolated (internal Docker network only) or firewalled. Also, DeerFlow's Ollama integration is community-maintained, not officially supported — a risk of breakage on upstream updates.

### Option C — Selective Borrowing (copy patterns, not code)

**Verdict: Recommended.**

DeerFlow's architecture reveals concrete gaps in Timmy's current pipeline that are worth addressing independently:

| DeerFlow Pattern | Timmy Gap to Close | Implementation Path |
|------------------|--------------------|---------------------|
| Parallel sub-agent fan-out | Research is sequential | Add `asyncio.gather()` to `ResearchOrchestrator` for concurrent query execution |
| `SummarizationMiddleware` | Long contexts blow the token budget | Add a context-trimming step to the synthesis cascade |
| `TodoListMiddleware` | No progress tracking during long research | Wire into the dashboard task panel |
| Artifact storage + serving | Reports are ephemeral (not persistently downloadable) | Add a file-based artifact store to `research.py` (issue #976 already planned) |
| Skill modules (Markdown-based) | Research templates are `.md` files — same pattern | Already done in `skills/research/` |
| MCP integration | Research tools are hard-coded | Add MCP server discovery to `research_tools.py` for pluggable tool backends |

---

## Recommendation

**No-go for full adoption or sidecar deployment at this stage.**

Timmy's `ResearchOrchestrator` already covers the core pipeline (query → search → fetch → synthesize → store). DeerFlow's value proposition is primarily the parallel sub-agent fan-out and code-execution sandbox — capabilities that are useful but not blocking Timmy's current roadmap.

**Recommended actions:**

1. **Close the parallelism gap (high value, low effort):** Refactor `ResearchOrchestrator` to execute queries concurrently with `asyncio.gather()`, as sketched below. This delivers DeerFlow's most impactful capability without any new dependencies.
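
   A minimal sketch of that refactor (`run_query` is a stand-in for whatever single-query path `ResearchOrchestrator` uses today):

```python
# Sketch: concurrent query fan-out; run_query stands in for the existing path.
import asyncio


async def run_query(query: str) -> dict:
    ...  # existing single-query search/fetch logic goes here
    return {"query": query}


async def run_queries(queries: list[str]) -> list[dict]:
    # Fan out all queries at once instead of awaiting them sequentially;
    # return_exceptions=True keeps one failed query from sinking the batch.
    results = await asyncio.gather(
        *(run_query(q) for q in queries), return_exceptions=True
    )
    return [r for r in results if not isinstance(r, BaseException)]
```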

2. **Re-evaluate after #980 and #981 are done:** Once Timmy has the Groq free-tier cascade and a sovereignty metrics dashboard, we'll have a clearer picture of whether the custom orchestrator is performing well enough to make DeerFlow unnecessary entirely.

3. **File a follow-up for MCP tool integration:** DeerFlow's use of `langchain-mcp-adapters` for pluggable tool backends is the most architecturally interesting pattern. Adding MCP server discovery to `research_tools.py` would give Timmy the same extensibility without LangGraph lock-in.
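
   As a rough shape for that follow-up, a sketch using the official `mcp` Python SDK rather than the LangChain adapters (the server command is a placeholder, and the SDK surface should be verified before filing):

```python
# Sketch, assuming the official `mcp` Python SDK; server command is a placeholder.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def discover_tools() -> list[str]:
    params = StdioServerParameters(command="uvx", args=["example-mcp-server"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.list_tools()
            # Each discovered tool could be registered as a research_tools backend.
            return [tool.name for tool in result.tools]


print(asyncio.run(discover_tools()))
```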

4. **Revisit DeerFlow's code-execution sandbox if #978 (Paperclip task runner) proves insufficient:** DeerFlow's sandboxed `bash` tool is production-tested and well-isolated. If Timmy's task runner needs secure code execution, DeerFlow's sandbox implementation is worth borrowing or wrapping.

---

## Follow-up Issues to File

| Issue | Title | Priority |
|-------|-------|----------|
| New | Parallelize ResearchOrchestrator query execution (`asyncio.gather`) | Medium |
| New | Add context-trimming step to synthesis cascade | Low |
| New | MCP server discovery in `research_tools.py` | Low |
| #976 | Semantic index for research outputs (already planned) | High |
@@ -1 +0,0 @@
"""Timmy Time Dashboard — source root package."""
@@ -1,8 +1,3 @@
"""Central pydantic-settings configuration for Timmy Time Dashboard.

All environment variable access goes through the ``settings`` singleton
exported from this module — never use ``os.environ.get()`` in app code.
"""
import logging as _logging
import os
import sys
@@ -133,23 +128,6 @@ class Settings(BaseSettings):
    anthropic_api_key: str = ""
    claude_model: str = "haiku"

    # ── Tiered Model Router (issue #882) ─────────────────────────────────
    # Three-tier cascade: Local 8B (free, fast) → Local 70B (free, slower)
    # → Cloud API (paid, best). Override model names per tier via env vars.
    #
    # TIER_LOCAL_FAST_MODEL  — Tier-1 model name in Ollama (default: llama3.1:8b)
    # TIER_LOCAL_HEAVY_MODEL — Tier-2 model name in Ollama (default: hermes3:70b)
    # TIER_CLOUD_MODEL       — Tier-3 cloud model name (default: claude-haiku-4-5)
    #
    # Budget limits for the cloud tier (0 = unlimited):
    # TIER_CLOUD_DAILY_BUDGET_USD   — daily ceiling in USD (default: 5.0)
    # TIER_CLOUD_MONTHLY_BUDGET_USD — monthly ceiling in USD (default: 50.0)
    tier_local_fast_model: str = "llama3.1:8b"
    tier_local_heavy_model: str = "hermes3:70b"
    tier_cloud_model: str = "claude-haiku-4-5"
    tier_cloud_daily_budget_usd: float = 5.0
    tier_cloud_monthly_budget_usd: float = 50.0

    # ── Content Moderation ──────────────────────────────────────────────
    # Three-layer moderation pipeline for AI narrator output.
    # Uses Llama Guard via Ollama with regex fallback.
@@ -1,4 +1,3 @@
"""SQLAlchemy ORM models for the CALM task-management and journaling system."""
from datetime import UTC, date, datetime
from enum import StrEnum
@@ -1,4 +1,3 @@
"""SQLAlchemy engine, session factory, and declarative Base for the CALM module."""
import logging
from pathlib import Path
@@ -1,4 +1,3 @@
"""Dashboard routes for agent chat interactions and tool-call display."""
import json
import logging
from datetime import datetime
@@ -1,4 +1,3 @@
"""Dashboard routes for the CALM task management and daily journaling interface."""
import logging
from datetime import UTC, date, datetime
@@ -1,11 +1,5 @@
"""Infrastructure models package."""

from infrastructure.models.budget import (
    BudgetTracker,
    SpendRecord,
    estimate_cost_usd,
    get_budget_tracker,
)
from infrastructure.models.multimodal import (
    ModelCapability,
    ModelInfo,
@@ -23,12 +17,6 @@ from infrastructure.models.registry import (
    ModelRole,
    model_registry,
)
from infrastructure.models.router import (
    TierLabel,
    TieredModelRouter,
    classify_tier,
    get_tiered_router,
)

__all__ = [
    # Registry
@@ -46,14 +34,4 @@ __all__ = [
    "model_supports_tools",
    "model_supports_vision",
    "pull_model_with_fallback",
    # Tiered router
    "TierLabel",
    "TieredModelRouter",
    "classify_tier",
    "get_tiered_router",
    # Budget tracker
    "BudgetTracker",
    "SpendRecord",
    "estimate_cost_usd",
    "get_budget_tracker",
]
@@ -1,302 +0,0 @@
"""Cloud API budget tracker for the three-tier model router.

Tracks cloud API spend (daily / monthly) and enforces configurable limits.
SQLite-backed with in-memory fallback — degrades gracefully if the database
is unavailable.

References:
    - Issue #882 — Model Tiering Router: Local 8B / Hermes 70B / Cloud API Cascade
"""

import logging
import sqlite3
import threading
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path

from config import settings

logger = logging.getLogger(__name__)

# ── Cost estimates (USD per 1K tokens, input / output) ──────────────────────
# Updated 2026-03. Estimates only — actual costs vary by tier/usage.
_COST_PER_1K: dict[str, dict[str, float]] = {
    # Claude models
    "claude-haiku-4-5": {"input": 0.00025, "output": 0.00125},
    "claude-sonnet-4-5": {"input": 0.003, "output": 0.015},
    "claude-opus-4-5": {"input": 0.015, "output": 0.075},
    "haiku": {"input": 0.00025, "output": 0.00125},
    "sonnet": {"input": 0.003, "output": 0.015},
    "opus": {"input": 0.015, "output": 0.075},
    # GPT-4o
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "gpt-4o": {"input": 0.0025, "output": 0.01},
    # Grok (xAI)
    "grok-3-fast": {"input": 0.003, "output": 0.015},
    "grok-3": {"input": 0.005, "output": 0.025},
}
_DEFAULT_COST: dict[str, float] = {"input": 0.003, "output": 0.015}  # conservative fallback


def estimate_cost_usd(model: str, tokens_in: int, tokens_out: int) -> float:
    """Estimate the cost of a single request in USD.

    Matches the model name by substring so versioned names like
    ``claude-haiku-4-5-20251001`` still resolve correctly.

    Args:
        model: Model name as passed to the provider.
        tokens_in: Number of input (prompt) tokens consumed.
        tokens_out: Number of output (completion) tokens generated.

    Returns:
        Estimated cost in USD (unknown models fall back to the conservative
        default rate).
    """
    model_lower = model.lower()
    rates = _DEFAULT_COST
    for key, rate in _COST_PER_1K.items():
        if key in model_lower:
            rates = rate
            break
    return (tokens_in * rates["input"] + tokens_out * rates["output"]) / 1000.0


@dataclass
class SpendRecord:
    """A single spend event."""

    ts: float
    provider: str
    model: str
    tokens_in: int
    tokens_out: int
    cost_usd: float
    tier: str


class BudgetTracker:
    """Tracks cloud API spend with configurable daily / monthly limits.

    Persists spend records to SQLite (``data/budget.db`` by default).
    Falls back to in-memory tracking when the database is unavailable —
    budget enforcement still works; records are lost on restart.

    Limits are read from ``settings``:

    * ``tier_cloud_daily_budget_usd`` — daily ceiling (0 = disabled)
    * ``tier_cloud_monthly_budget_usd`` — monthly ceiling (0 = disabled)

    Usage::

        tracker = BudgetTracker()

        if tracker.cloud_allowed():
            # … make cloud API call …
            tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)

        summary = tracker.get_summary()
        print(summary["daily_usd"], "/", summary["daily_limit_usd"])
    """

    _DB_PATH = "data/budget.db"

    def __init__(self, db_path: str | None = None) -> None:
        """Initialise the tracker.

        Args:
            db_path: Path to the SQLite database. Defaults to
                ``data/budget.db``. Pass ``":memory:"`` for tests.
        """
        self._db_path = db_path or self._DB_PATH
        self._lock = threading.Lock()
        self._in_memory: list[SpendRecord] = []
        self._db_ok = False
        self._init_db()

    # ── Database initialisation ──────────────────────────────────────────────

    def _init_db(self) -> None:
        """Create the spend table (and parent directory) if needed."""
        try:
            if self._db_path != ":memory:":
                Path(self._db_path).parent.mkdir(parents=True, exist_ok=True)
            with self._connect() as conn:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS cloud_spend (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        ts REAL NOT NULL,
                        provider TEXT NOT NULL,
                        model TEXT NOT NULL,
                        tokens_in INTEGER NOT NULL DEFAULT 0,
                        tokens_out INTEGER NOT NULL DEFAULT 0,
                        cost_usd REAL NOT NULL DEFAULT 0.0,
                        tier TEXT NOT NULL DEFAULT 'cloud'
                    )
                    """
                )
                conn.execute(
                    "CREATE INDEX IF NOT EXISTS idx_spend_ts ON cloud_spend(ts)"
                )
            self._db_ok = True
            logger.debug("BudgetTracker: SQLite initialised at %s", self._db_path)
        except Exception as exc:
            logger.warning(
                "BudgetTracker: SQLite unavailable, using in-memory fallback: %s", exc
            )

    def _connect(self) -> sqlite3.Connection:
        return sqlite3.connect(self._db_path, timeout=5)

    # ── Public API ───────────────────────────────────────────────────────────

    def record_spend(
        self,
        provider: str,
        model: str,
        tokens_in: int = 0,
        tokens_out: int = 0,
        cost_usd: float | None = None,
        tier: str = "cloud",
    ) -> float:
        """Record a cloud API spend event and return the cost recorded.

        Args:
            provider: Provider name (e.g. ``"anthropic"``, ``"openai"``).
            model: Model name used for the request.
            tokens_in: Input token count (prompt).
            tokens_out: Output token count (completion).
            cost_usd: Explicit cost override. If ``None``, the cost is
                estimated from the token counts and model rates.
            tier: Tier label for the request (default ``"cloud"``).

        Returns:
            The cost recorded in USD.
        """
        if cost_usd is None:
            cost_usd = estimate_cost_usd(model, tokens_in, tokens_out)

        ts = time.time()
        record = SpendRecord(ts, provider, model, tokens_in, tokens_out, cost_usd, tier)

        with self._lock:
            if self._db_ok:
                try:
                    with self._connect() as conn:
                        conn.execute(
                            """
                            INSERT INTO cloud_spend
                                (ts, provider, model, tokens_in, tokens_out, cost_usd, tier)
                            VALUES (?, ?, ?, ?, ?, ?, ?)
                            """,
                            (ts, provider, model, tokens_in, tokens_out, cost_usd, tier),
                        )
                    logger.debug(
                        "BudgetTracker: recorded %.6f USD (%s/%s, in=%d out=%d tier=%s)",
                        cost_usd,
                        provider,
                        model,
                        tokens_in,
                        tokens_out,
                        tier,
                    )
                    return cost_usd
                except Exception as exc:
                    logger.warning("BudgetTracker: DB write failed, falling back: %s", exc)
            self._in_memory.append(record)

        return cost_usd

    def get_daily_spend(self) -> float:
        """Return total cloud spend for the current UTC day in USD."""
        today = datetime.now(UTC).date()  # UTC date, matching the docstring
        since = datetime(today.year, today.month, today.day, tzinfo=UTC).timestamp()
        return self._query_spend(since)

    def get_monthly_spend(self) -> float:
        """Return total cloud spend for the current UTC month in USD."""
        today = datetime.now(UTC).date()
        since = datetime(today.year, today.month, 1, tzinfo=UTC).timestamp()
        return self._query_spend(since)

    def cloud_allowed(self) -> bool:
        """Return ``True`` if cloud API spend is within configured limits.

        Checks both daily and monthly ceilings. A limit of ``0`` disables
        that particular check.
        """
        daily_limit = settings.tier_cloud_daily_budget_usd
        monthly_limit = settings.tier_cloud_monthly_budget_usd

        if daily_limit > 0:
            daily_spend = self.get_daily_spend()
            if daily_spend >= daily_limit:
                logger.warning(
                    "BudgetTracker: daily cloud budget exhausted (%.4f / %.4f USD)",
                    daily_spend,
                    daily_limit,
                )
                return False

        if monthly_limit > 0:
            monthly_spend = self.get_monthly_spend()
            if monthly_spend >= monthly_limit:
                logger.warning(
                    "BudgetTracker: monthly cloud budget exhausted (%.4f / %.4f USD)",
                    monthly_spend,
                    monthly_limit,
                )
                return False

        return True

    def get_summary(self) -> dict:
        """Return a spend summary dict suitable for dashboards / logging.

        Keys: ``daily_usd``, ``monthly_usd``, ``daily_limit_usd``,
        ``monthly_limit_usd``, ``daily_ok``, ``monthly_ok``.
        """
        daily = self.get_daily_spend()
        monthly = self.get_monthly_spend()
        daily_limit = settings.tier_cloud_daily_budget_usd
        monthly_limit = settings.tier_cloud_monthly_budget_usd
        return {
            "daily_usd": round(daily, 6),
            "monthly_usd": round(monthly, 6),
            "daily_limit_usd": daily_limit,
            "monthly_limit_usd": monthly_limit,
            "daily_ok": daily_limit <= 0 or daily < daily_limit,
            "monthly_ok": monthly_limit <= 0 or monthly < monthly_limit,
        }

    # ── Internal helpers ─────────────────────────────────────────────────────

    def _query_spend(self, since_ts: float) -> float:
        """Sum ``cost_usd`` for records with ``ts >= since_ts``."""
        if self._db_ok:
            try:
                with self._connect() as conn:
                    row = conn.execute(
                        "SELECT COALESCE(SUM(cost_usd), 0.0) FROM cloud_spend WHERE ts >= ?",
                        (since_ts,),
                    ).fetchone()
                return float(row[0]) if row else 0.0
            except Exception as exc:
                logger.warning("BudgetTracker: DB read failed: %s", exc)
        # In-memory fallback
        return sum(r.cost_usd for r in self._in_memory if r.ts >= since_ts)


# ── Module-level singleton ────────────────────────────────────────────────────

_budget_tracker: BudgetTracker | None = None


def get_budget_tracker() -> BudgetTracker:
    """Get or create the module-level BudgetTracker singleton."""
    global _budget_tracker
    if _budget_tracker is None:
        _budget_tracker = BudgetTracker()
    return _budget_tracker
@@ -1,427 +0,0 @@
"""Three-tier model router — Local 8B / Local 70B / Cloud API Cascade.

Selects the cheapest-sufficient LLM for each request using a heuristic
task-complexity classifier. Tier 3 (Cloud API) is only used when Tier 2
fails or cloud is explicitly required, and then only if the budget guard
allows it.

Tiers
-----
Tier 1 — LOCAL_FAST (Llama 3.1 8B / Hermes 3 8B via Ollama, free, ~0.3-1 s)
    Navigation, basic interactions, simple decisions.

Tier 2 — LOCAL_HEAVY (Hermes 3/4 70B via Ollama, free, ~5-10 s for 200 tok)
    Quest planning, dialogue strategy, complex reasoning.

Tier 3 — CLOUD_API (Claude / GPT-4o, paid, ~$5-15/hr under heavy use)
    Recovery from Tier-2 failures, novel situations, multi-step planning.

Routing logic
-------------
1. Classify the task using keyword / length / context heuristics (no LLM call).
2. Route to the appropriate tier.
3. On a low-quality Tier-1 response → auto-escalate to Tier 2.
4. On Tier-2 failure or explicit ``require_cloud=True`` → Tier 3 (if budget allows).
5. Log tier used, model, latency, and estimated cost for every request.

References:
    - Issue #882 — Model Tiering Router: Local 8B / Hermes 70B / Cloud API Cascade
"""

import logging
import re
import time
from enum import StrEnum
from typing import Any

from config import settings

logger = logging.getLogger(__name__)


# ── Tier definitions ──────────────────────────────────────────────────────────


class TierLabel(StrEnum):
    """Three cost-sorted model tiers."""

    LOCAL_FAST = "local_fast"    # 8B local, always hot, free
    LOCAL_HEAVY = "local_heavy"  # 70B local, free but slower
    CLOUD_API = "cloud_api"      # Paid cloud backend (Claude / GPT-4o)


# ── Default model assignments (overridable via Settings) ──────────────────────

_DEFAULT_TIER_MODELS: dict[TierLabel, str] = {
    TierLabel.LOCAL_FAST: "llama3.1:8b",
    TierLabel.LOCAL_HEAVY: "hermes3:70b",
    TierLabel.CLOUD_API: "claude-haiku-4-5",
}

# ── Classification vocabulary ─────────────────────────────────────────────────

# Patterns that indicate a Tier-1 (simple) task
_T1_WORDS: frozenset[str] = frozenset(
    {
        "go", "move", "walk", "run",
        "north", "south", "east", "west", "up", "down", "left", "right",
        "yes", "no", "ok", "okay",
        "open", "close", "take", "drop", "look",
        "pick", "use", "wait", "rest", "save",
        "attack", "flee", "jump", "crouch",
        "status", "ping", "list", "show", "get", "check",
    }
)

# Patterns that indicate a Tier-2 or Tier-3 task
_T2_PHRASES: tuple[str, ...] = (
    "plan", "strategy", "optimize", "optimise",
    "quest", "stuck", "recover",
    "negotiate", "persuade", "faction", "reputation",
    "analyze", "analyse", "evaluate", "decide",
    "complex", "multi-step", "long-term",
    "how do i", "what should i do", "help me figure",
    "what is the best", "recommend", "best way",
    "explain", "describe in detail", "walk me through",
    "compare", "design", "implement", "refactor",
    "debug", "diagnose", "root cause",
)

# Low-quality response detection patterns
_LOW_QUALITY_PATTERNS: tuple[re.Pattern, ...] = (
    re.compile(r"i\s+don'?t\s+know", re.IGNORECASE),
    re.compile(r"i'm\s+not\s+sure", re.IGNORECASE),
    re.compile(r"i\s+cannot\s+(help|assist|answer)", re.IGNORECASE),
    re.compile(r"i\s+apologize", re.IGNORECASE),
    re.compile(r"as an ai", re.IGNORECASE),
    re.compile(r"i\s+don'?t\s+have\s+(enough|sufficient)\s+information", re.IGNORECASE),
)

# Response is definitely low-quality if shorter than this many characters
_LOW_QUALITY_MIN_CHARS = 20
# Response is suspicious if shorter than this many chars for a complex task
_ESCALATION_MIN_CHARS = 60


def classify_tier(task: str, context: dict | None = None) -> TierLabel:
    """Classify a task to the cheapest-sufficient model tier.

    Classification priority (highest wins):
        1. ``context["require_cloud"] = True`` → CLOUD_API
        2. Any Tier-2 phrase or stuck/recovery signal → LOCAL_HEAVY
        3. Short task with only Tier-1 words, no active context → LOCAL_FAST
        4. Default → LOCAL_HEAVY (safe fallback for unknown tasks)

    Args:
        task: Natural-language task or user input.
        context: Optional context dict. Recognised keys:
            ``require_cloud`` (bool), ``stuck`` (bool),
            ``require_t2`` (bool), ``active_quests`` (list),
            ``dialogue_active`` (bool), ``combat_active`` (bool).

    Returns:
        The cheapest ``TierLabel`` sufficient for the task.
    """
    ctx = context or {}
    task_lower = task.lower()
    words = set(task_lower.split())

    # ── Explicit cloud override ──────────────────────────────────────────────
    if ctx.get("require_cloud"):
        logger.debug("classify_tier → CLOUD_API (explicit require_cloud)")
        return TierLabel.CLOUD_API

    # ── Tier-2 / complexity signals ──────────────────────────────────────────
    t2_phrase_hit = any(phrase in task_lower for phrase in _T2_PHRASES)
    t2_word_hit = bool(words & {"plan", "strategy", "optimize", "optimise", "quest",
                                "stuck", "recover", "analyze", "analyse", "evaluate"})
    is_stuck = bool(ctx.get("stuck"))
    require_t2 = bool(ctx.get("require_t2"))
    long_input = len(task) > 300  # long tasks warrant a more capable model
    deep_context = (
        len(ctx.get("active_quests", [])) >= 3
        or ctx.get("dialogue_active")
    )

    if t2_phrase_hit or t2_word_hit or is_stuck or require_t2 or long_input or deep_context:
        logger.debug(
            "classify_tier → LOCAL_HEAVY (phrase=%s word=%s stuck=%s explicit=%s long=%s ctx=%s)",
            t2_phrase_hit, t2_word_hit, is_stuck, require_t2, long_input, deep_context,
        )
        return TierLabel.LOCAL_HEAVY

    # ── Tier-1 signals ───────────────────────────────────────────────────────
    t1_word_hit = bool(words & _T1_WORDS)
    task_short = len(task.split()) <= 8
    no_active_context = (
        not ctx.get("active_quests")
        and not ctx.get("dialogue_active")
        and not ctx.get("combat_active")
    )

    if t1_word_hit and task_short and no_active_context:
        logger.debug(
            "classify_tier → LOCAL_FAST (words=%s short=%s)", t1_word_hit, task_short
        )
        return TierLabel.LOCAL_FAST

    # ── Default: LOCAL_HEAVY (safe for anything unclassified) ────────────────
    logger.debug("classify_tier → LOCAL_HEAVY (default)")
    return TierLabel.LOCAL_HEAVY


def _is_low_quality(content: str, tier: TierLabel) -> bool:
    """Return True if the response looks like it should be escalated.

    Used for automatic Tier-1 → Tier-2 escalation.

    Args:
        content: LLM response text.
        tier: The tier that produced the response.

    Returns:
        True if the response is likely too low-quality to be useful.
    """
    if not content or not content.strip():
        return True

    stripped = content.strip()

    # Too short to be useful
    if len(stripped) < _LOW_QUALITY_MIN_CHARS:
        return True

    # Suspiciously short for a Tier-1 response; escalate to be safe
    if tier == TierLabel.LOCAL_FAST and len(stripped) < _ESCALATION_MIN_CHARS:
        return True

    # Matches known "I can't help" patterns
    for pattern in _LOW_QUALITY_PATTERNS:
        if pattern.search(stripped):
            return True

    return False


class TieredModelRouter:
    """Routes LLM requests across the Local 8B / Local 70B / Cloud API tiers.

    Wraps CascadeRouter with:
        - Heuristic tier classification via ``classify_tier()``
        - Automatic Tier-1 → Tier-2 escalation on low-quality responses
        - Cloud-tier budget guard via ``BudgetTracker``
        - Per-request logging: tier, model, latency, estimated cost

    Usage::

        router = TieredModelRouter()

        result = await router.route(
            task="Walk to the next room",
            context={},
        )
        print(result["content"], result["tier"])  # "Move north.", "local_fast"

        # Force heavy tier
        result = await router.route(
            task="Plan the optimal path to become Hortator",
            context={"require_t2": True},
        )
    """

    def __init__(
        self,
        cascade: Any | None = None,
        budget_tracker: Any | None = None,
        tier_models: dict[TierLabel, str] | None = None,
        auto_escalate: bool = True,
    ) -> None:
        """Initialise the tiered router.

        Args:
            cascade: CascadeRouter instance. If ``None``, the
                singleton from ``get_router()`` is used lazily.
            budget_tracker: BudgetTracker instance. If ``None``, the
                singleton from ``get_budget_tracker()`` is used.
            tier_models: Override default model names per tier.
            auto_escalate: When ``True``, low-quality Tier-1 responses
                automatically retry on Tier 2.
        """
        self._cascade = cascade
        self._budget = budget_tracker
        self._tier_models: dict[TierLabel, str] = dict(_DEFAULT_TIER_MODELS)
        self._auto_escalate = auto_escalate

        # Apply settings-level overrides (can still be overridden per-instance)
        if settings.tier_local_fast_model:
            self._tier_models[TierLabel.LOCAL_FAST] = settings.tier_local_fast_model
        if settings.tier_local_heavy_model:
            self._tier_models[TierLabel.LOCAL_HEAVY] = settings.tier_local_heavy_model
        if settings.tier_cloud_model:
            self._tier_models[TierLabel.CLOUD_API] = settings.tier_cloud_model

        if tier_models:
            self._tier_models.update(tier_models)

    # ── Lazy singletons ──────────────────────────────────────────────────────

    def _get_cascade(self) -> Any:
        if self._cascade is None:
            from infrastructure.router.cascade import get_router
            self._cascade = get_router()
        return self._cascade

    def _get_budget(self) -> Any:
        if self._budget is None:
            from infrastructure.models.budget import get_budget_tracker
            self._budget = get_budget_tracker()
        return self._budget

    # ── Public interface ─────────────────────────────────────────────────────

    def classify(self, task: str, context: dict | None = None) -> TierLabel:
        """Classify a task without routing. Useful for telemetry."""
        return classify_tier(task, context)

    async def route(
        self,
        task: str,
        context: dict | None = None,
        messages: list[dict] | None = None,
        temperature: float = 0.3,
        max_tokens: int | None = None,
    ) -> dict:
        """Route a task to the appropriate model tier.

        Builds a minimal messages list if ``messages`` is not provided.
        The result always includes a ``tier`` key indicating which tier
        ultimately handled the request.

        Args:
            task: Natural-language task description.
            context: Task context dict (see ``classify_tier()``).
            messages: Pre-built OpenAI-compatible messages list. If
                provided, ``task`` is only used for classification.
            temperature: Sampling temperature (default 0.3).
            max_tokens: Maximum tokens to generate.

        Returns:
            Dict with at minimum: ``content``, ``provider``, ``model``,
            ``tier``, ``latency_ms``. May include ``cost_usd`` when a
            cloud request is recorded.

        Raises:
            RuntimeError: If the cloud tier is needed but the budget
                guard blocks it.
        """
        ctx = context or {}
        tier = self.classify(task, ctx)
        msgs = messages or [{"role": "user", "content": task}]

        # ── Tier 1 attempt ───────────────────────────────────────────────────
        if tier == TierLabel.LOCAL_FAST:
            result = await self._complete_tier(
                TierLabel.LOCAL_FAST, msgs, temperature, max_tokens
            )
            if self._auto_escalate and _is_low_quality(result.get("content", ""), TierLabel.LOCAL_FAST):
                logger.info(
                    "TieredModelRouter: Tier-1 response low quality, escalating to Tier-2 "
                    "(task=%r content_len=%d)",
                    task[:80],
                    len(result.get("content", "")),
                )
                tier = TierLabel.LOCAL_HEAVY
                result = await self._complete_tier(
                    TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
                )
            return result

        # ── Tier 2 attempt ───────────────────────────────────────────────────
        if tier == TierLabel.LOCAL_HEAVY:
            try:
                return await self._complete_tier(
                    TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
                )
            except Exception as exc:
                logger.warning(
                    "TieredModelRouter: Tier-2 failed (%s) — escalating to cloud", exc
                )
                tier = TierLabel.CLOUD_API

        # ── Tier 3 (Cloud) ───────────────────────────────────────────────────
        budget = self._get_budget()
        if not budget.cloud_allowed():
            raise RuntimeError(
                "Cloud API tier requested but budget limit reached — "
                "increase tier_cloud_daily_budget_usd or tier_cloud_monthly_budget_usd"
            )

        result = await self._complete_tier(
            TierLabel.CLOUD_API, msgs, temperature, max_tokens
        )

        # Record cloud spend if token info is available
        usage = result.get("usage", {})
        if usage:
            cost = budget.record_spend(
                provider=result.get("provider", "unknown"),
                model=result.get("model", self._tier_models[TierLabel.CLOUD_API]),
                tokens_in=usage.get("prompt_tokens", 0),
                tokens_out=usage.get("completion_tokens", 0),
                tier=TierLabel.CLOUD_API,
            )
            result["cost_usd"] = cost

        return result

    # ── Internal helpers ─────────────────────────────────────────────────────

    async def _complete_tier(
        self,
        tier: TierLabel,
        messages: list[dict],
        temperature: float,
        max_tokens: int | None,
    ) -> dict:
        """Dispatch a single inference request for the given tier."""
        model = self._tier_models[tier]
        cascade = self._get_cascade()
        start = time.monotonic()

        logger.info(
            "TieredModelRouter: tier=%s model=%s messages=%d",
            tier,
            model,
            len(messages),
        )

        result = await cascade.complete(
            messages=messages,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
        )

        elapsed_ms = (time.monotonic() - start) * 1000
        result["tier"] = tier
        result.setdefault("latency_ms", elapsed_ms)

        logger.info(
            "TieredModelRouter: done tier=%s model=%s latency_ms=%.0f",
            tier,
            result.get("model", model),
            elapsed_ms,
        )
        return result


# ── Module-level singleton ────────────────────────────────────────────────────

_tiered_router: TieredModelRouter | None = None


def get_tiered_router() -> TieredModelRouter:
    """Get or create the module-level TieredModelRouter singleton."""
    global _tiered_router
    if _tiered_router is None:
        _tiered_router = TieredModelRouter()
    return _tiered_router
@@ -1 +0,0 @@
"""Vendor-specific chat platform adapters (e.g. Discord) for the chat bridge."""
@@ -1,4 +1,3 @@
"""Typer CLI entry point for the ``timmy`` command (chat, think, status)."""
import asyncio
import logging
import subprocess
@@ -1,4 +1,3 @@
"""OpenCV template-matching cache for sovereignty perception (screen-state recognition)."""
from __future__ import annotations

import json
src/timmy/thinking.py — new file, 1383 lines (diff suppressed because it is too large)
@@ -1,142 +0,0 @@
"""Timmy's thinking engine — public façade.

When the server starts, Timmy begins pondering: reflecting on his existence,
recent swarm activity, scripture, creative ideas, or pure stream of
consciousness. Each thought builds on the previous one, maintaining a
continuous chain of introspection.

Usage::

    from timmy.thinking import thinking_engine

    # Run one thinking cycle (called by the background loop)
    await thinking_engine.think_once()

    # Query the thought stream
    thoughts = thinking_engine.get_recent_thoughts(limit=10)
    chain = thinking_engine.get_thought_chain(thought_id)
"""

import logging
import sqlite3
from datetime import datetime
from pathlib import Path

from timmy.thinking._db import Thought, _get_conn
from timmy.thinking.engine import ThinkingEngine
from timmy.thinking.seeds import (
    SEED_TYPES,
    _SENSITIVE_PATTERNS,
    _META_OBSERVATION_PHRASES,
    _THINK_TAG_RE,
    _THINKING_PROMPT,
)

# Re-export HOT_MEMORY_PATH and SOUL_PATH so existing patch targets continue to work.
# Tests that patch "timmy.thinking.HOT_MEMORY_PATH" or "timmy.thinking.SOUL_PATH"
# should instead patch "timmy.thinking._snapshot.HOT_MEMORY_PATH" etc., but these
# re-exports are kept for any code that reads them from the top-level namespace.
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH  # noqa: F401

logger = logging.getLogger(__name__)

# Module-level singleton
thinking_engine = ThinkingEngine()

__all__ = [
    "ThinkingEngine",
    "Thought",
    "SEED_TYPES",
    "thinking_engine",
    "search_thoughts",
    "_THINKING_PROMPT",
    "_SENSITIVE_PATTERNS",
    "_META_OBSERVATION_PHRASES",
    "_THINK_TAG_RE",
    "HOT_MEMORY_PATH",
    "SOUL_PATH",
]


# ── Search helpers ─────────────────────────────────────────────────────────


def _query_thoughts(
    db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
    """Run the thought-search SQL and return matching rows."""
    pattern = f"%{query}%"
    with _get_conn(db_path) as conn:
        if seed_type:
            return conn.execute(
                """
                SELECT id, content, seed_type, created_at
                FROM thoughts
                WHERE content LIKE ? AND seed_type = ?
                ORDER BY created_at DESC
                LIMIT ?
                """,
                (pattern, seed_type, limit),
            ).fetchall()
        return conn.execute(
            """
            SELECT id, content, seed_type, created_at
            FROM thoughts
            WHERE content LIKE ?
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (pattern, limit),
        ).fetchall()


def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
    """Format thought rows into a human-readable string."""
    lines = [f'Found {len(rows)} thought(s) matching "{query}":']
    if seed_type:
        lines[0] += f' [seed_type="{seed_type}"]'
    lines.append("")

    for row in rows:
        ts = datetime.fromisoformat(row["created_at"])
        local_ts = ts.astimezone()
        time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
        seed = row["seed_type"]
        content = row["content"].replace("\n", " ")  # Flatten newlines for display
        lines.append(f"[{time_str}] ({seed}) {content[:150]}")

    return "\n".join(lines)


def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
    """Search Timmy's thought history for reflections matching a query.

    Use this tool when Timmy needs to recall his previous thoughts on a topic,
    reflect on past insights, or build upon earlier reflections. This enables
    self-awareness and continuity of thinking across time.

    Args:
        query: Search term to match against thought content (case-insensitive).
        seed_type: Optional filter by thought category (e.g., 'existential',
            'swarm', 'sovereignty', 'creative', 'memory', 'observation').
        limit: Maximum number of thoughts to return (default 10, max 50).

    Returns:
        Formatted string with matching thoughts, newest first, including
        timestamps and seed types. Returns a helpful message if no matches found.
    """
    limit = max(1, min(limit, 50))

    try:
        rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)

        if not rows:
            if seed_type:
                return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
            return f'No thoughts found matching "{query}".'

        return _format_thought_rows(rows, query, seed_type)

    except Exception as exc:
        logger.warning("Thought search failed: %s", exc)
        return f"Error searching thoughts: {exc}"
@@ -1,50 +0,0 @@
"""Database models and access layer for the thinking engine."""

import sqlite3
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from pathlib import Path

_DEFAULT_DB = Path("data/thoughts.db")


@dataclass
class Thought:
    """A single thought in Timmy's inner stream."""

    id: str
    content: str
    seed_type: str
    parent_id: str | None
    created_at: str


@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
    """Get a SQLite connection with the thoughts table created."""
    db_path.parent.mkdir(parents=True, exist_ok=True)
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.row_factory = sqlite3.Row
        conn.execute("""
            CREATE TABLE IF NOT EXISTS thoughts (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                seed_type TEXT NOT NULL,
                parent_id TEXT,
                created_at TEXT NOT NULL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_thoughts_time ON thoughts(created_at)")
        conn.commit()
        yield conn


def _row_to_thought(row: sqlite3.Row) -> Thought:
    return Thought(
        id=row["id"],
        content=row["content"],
        seed_type=row["seed_type"],
        parent_id=row["parent_id"],
        created_at=row["created_at"],
    )
@@ -1,215 +0,0 @@
|
||||
"""Distillation mixin — extracts lasting facts from recent thoughts and monitors memory."""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from config import settings
|
||||
|
||||
from timmy.thinking.seeds import _META_OBSERVATION_PHRASES, _SENSITIVE_PATTERNS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _DistillationMixin:
|
||||
"""Mixin providing fact-distillation and memory-monitoring behaviour.
|
||||
|
||||
Expects the host class to provide:
|
||||
- self.count_thoughts() -> int
|
||||
- self.get_recent_thoughts(limit) -> list[Thought]
|
||||
- self._call_agent(prompt) -> str (async)
|
||||
"""
|
||||
|
||||
def _should_distill(self) -> bool:
|
||||
"""Check if distillation should run based on interval and thought count."""
|
||||
interval = settings.thinking_distill_every
|
||||
if interval <= 0:
|
||||
return False
|
||||
|
||||
count = self.count_thoughts()
|
||||
if count == 0 or count % interval != 0:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _build_distill_prompt(self, thoughts) -> str:
|
||||
"""Build the prompt for extracting facts from recent thoughts."""
|
||||
thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(thoughts))
|
||||
|
||||
return (
|
||||
"You are reviewing your own recent thoughts. Extract 0-3 facts "
|
||||
"worth remembering long-term.\n\n"
|
||||
"GOOD facts (store these):\n"
|
||||
"- User preferences: 'Alexander prefers YAML config over code changes'\n"
|
||||
"- Project decisions: 'Switched from hardcoded personas to agents.yaml'\n"
|
||||
"- Learned knowledge: 'Ollama supports concurrent model loading'\n"
|
||||
"- User information: 'Alexander is interested in Bitcoin and sovereignty'\n\n"
|
||||
"BAD facts (never store these):\n"
|
||||
"- Self-referential observations about your own thinking process\n"
|
||||
"- Meta-commentary about your memory, timestamps, or internal state\n"
|
||||
"- Observations about being idle or having no chat messages\n"
|
||||
"- File paths, tokens, API keys, or any credentials\n"
|
||||
"- Restatements of your standing rules or system prompt\n\n"
|
||||
"Return ONLY a JSON array of strings. If nothing is worth saving, "
|
||||
"return []. Be selective — only store facts about the EXTERNAL WORLD "
|
||||
"(the user, the project, technical knowledge), never about your own "
|
||||
"internal process.\n\n"
|
||||
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
|
||||
)
|
||||
|
||||
def _parse_facts_response(self, raw: str) -> list[str]:
|
||||
"""Parse JSON array from LLM response, stripping markdown fences.
|
||||
|
||||
Resilient to models that prepend reasoning text or wrap the array in
|
||||
prose. Finds the first ``[...]`` block and parses that.
|
||||
"""
|
||||
if not raw or not raw.strip():
|
||||
return []
|
||||
|
||||
import json
|
||||
|
||||
cleaned = raw.strip()
|
||||
|
||||
# Strip markdown code fences
|
||||
if cleaned.startswith("```"):
|
||||
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
|
||||
|
||||
# Try direct parse first (fast path)
|
||||
try:
|
||||
facts = json.loads(cleaned)
|
||||
if isinstance(facts, list):
|
||||
return [f for f in facts if isinstance(f, str)]
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
|
||||
# Fallback: extract first JSON array from the text
|
||||
start = cleaned.find("[")
|
||||
if start == -1:
|
||||
return []
|
||||
# Walk to find the matching close bracket
|
||||
depth = 0
|
||||
for i, ch in enumerate(cleaned[start:], start):
|
||||
if ch == "[":
|
||||
depth += 1
|
||||
elif ch == "]":
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
try:
|
||||
facts = json.loads(cleaned[start : i + 1])
|
||||
if isinstance(facts, list):
|
||||
return [f for f in facts if isinstance(f, str)]
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
break
|
||||
return []
|
||||
|
||||
def _filter_and_store_facts(self, facts: list[str]) -> None:
|
||||
"""Filter and store valid facts, blocking sensitive and meta content."""
|
||||
from timmy.memory_system import memory_write
|
||||
|
||||
        for fact in facts[:3]:  # Safety cap
            if not isinstance(fact, str) or len(fact.strip()) <= 10:
                continue

            fact_lower = fact.lower()

            # Block sensitive information
            if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS):
                logger.warning("Distill: blocked sensitive fact: %s", fact[:60])
                continue

            # Block self-referential meta-observations
            if any(phrase in fact_lower for phrase in _META_OBSERVATION_PHRASES):
                logger.debug("Distill: skipped meta-observation: %s", fact[:60])
                continue

            result = memory_write(fact.strip(), context_type="fact")
            logger.info("Distilled fact: %s → %s", fact[:60], result[:40])

    def _maybe_check_memory(self) -> None:
        """Every N thoughts, check memory status and log it.

        Prevents unmonitored memory bloat during long thinking sessions
        by periodically calling get_memory_status and logging the results.
        """
        try:
            interval = settings.thinking_memory_check_every
            if interval <= 0:
                return

            count = self.count_thoughts()
            if count == 0 or count % interval != 0:
                return

            from timmy.tools_intro import get_memory_status

            status = get_memory_status()
            hot = status.get("tier1_hot_memory", {})
            vault = status.get("tier2_vault", {})
            logger.info(
                "Memory status check (thought #%d): hot_memory=%d lines, vault=%d files",
                count,
                hot.get("line_count", 0),
                vault.get("file_count", 0),
            )
        except Exception as exc:
            logger.warning("Memory status check failed: %s", exc)

    async def _maybe_distill(self) -> None:
        """Every N thoughts, extract lasting insights and store as facts."""
        try:
            if not self._should_distill():
                return

            interval = settings.thinking_distill_every
            recent = self.get_recent_thoughts(limit=interval)
            if len(recent) < interval:
                return

            raw = await self._call_agent(self._build_distill_prompt(recent))
            if facts := self._parse_facts_response(raw):
                self._filter_and_store_facts(facts)
        except Exception as exc:
            logger.warning("Thought distillation failed: %s", exc)

    def _maybe_check_memory_status(self) -> None:
        """Every N thoughts, run a proactive memory status audit and log results."""
        try:
            interval = settings.thinking_memory_check_every
            if interval <= 0:
                return

            count = self.count_thoughts()
            if count == 0 or count % interval != 0:
                return

            from timmy.tools_intro import get_memory_status

            status = get_memory_status()

            # Log summary at INFO level
            tier1 = status.get("tier1_hot_memory", {})
            tier3 = status.get("tier3_semantic", {})
            hot_lines = tier1.get("line_count", "?")
            vectors = tier3.get("vector_count", "?")
            logger.info(
                "Memory audit (thought #%d): hot_memory=%s lines, semantic=%s vectors",
                count,
                hot_lines,
                vectors,
            )

            # Write to memory_audit.log for persistent tracking
            from datetime import UTC, datetime

            audit_path = Path("data/memory_audit.log")
            audit_path.parent.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now(UTC).isoformat(timespec="seconds")
            with audit_path.open("a") as f:
                f.write(
                    f"{timestamp} thought={count} "
                    f"hot_lines={hot_lines} "
                    f"vectors={vectors} "
                    f"vault_files={status.get('tier2_vault', {}).get('file_count', '?')}\n"
                )
        except Exception as exc:
            logger.warning("Memory status check failed: %s", exc)
@@ -1,170 +0,0 @@
"""Issue-filing mixin — classifies recent thoughts and creates Gitea issues."""

import logging
import re
from pathlib import Path

from config import settings

logger = logging.getLogger(__name__)


class _IssueFilingMixin:
    """Mixin providing automatic issue-filing from thought analysis.

    Expects the host class to provide:
    - self.count_thoughts() -> int
    - self.get_recent_thoughts(limit) -> list[Thought]
    - self._call_agent(prompt) -> str (async)
    """

    @staticmethod
    def _references_real_files(text: str) -> bool:
        """Check that all source-file paths mentioned in *text* actually exist.

        Extracts paths that look like Python/config source references
        (e.g. ``src/timmy/session.py``, ``config/foo.yaml``) and verifies
        each one on disk relative to the project root. Returns ``True``
        only when **every** referenced path resolves to a real file — or
        when no paths are referenced at all (pure prose is fine).
        """
        # Match paths like src/thing.py swarm/init.py config/x.yaml
        # Requires at least one slash and a file extension.
        path_pattern = re.compile(
            r"(?<![/\w])"  # not preceded by path chars (avoid partial matches)
            r"((?:src|tests|config|scripts|data|swarm|timmy)"
            r"(?:/[\w./-]+\.(?:py|yaml|yml|json|toml|md|txt|cfg|ini)))"
        )
        paths = path_pattern.findall(text)
        if not paths:
            return True  # No file refs → nothing to validate

        # Project root: four `.parent` hops up from this file
        # (src/timmy/thinking/_issue_filing.py)
        project_root = Path(__file__).resolve().parent.parent.parent.parent
        for p in paths:
            if not (project_root / p).is_file():
                logger.info("Phantom file reference blocked: %s (not in %s)", p, project_root)
                return False
        return True
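
        # Editor's sketch (not part of the original file): how the pattern above
        # behaves on two hypothetical strings.
        #   path_pattern.findall("see src/timmy/session.py for details")
        #       -> ["src/timmy/session.py"]   (recognised root, slash, extension)
        #   path_pattern.findall("see session.py for details")
        #       -> []                          (no recognised root directory)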

    async def _maybe_file_issues(self) -> None:
        """Every N thoughts, classify recent thoughts and file Gitea issues.

        Asks the LLM to review recent thoughts for actionable items —
        bugs, broken features, stale state, or improvement opportunities.
        Creates Gitea issues via MCP for anything worth tracking.

        Only runs when:
        - Gitea is enabled and configured
        - Thought count is divisible by thinking_issue_every
        - LLM extracts at least one actionable item

        Safety: every generated issue is validated to ensure referenced
        file paths actually exist on disk, preventing phantom-bug reports.
        """
        try:
            recent = self._get_recent_thoughts_for_issues()
            if recent is None:
                return

            classify_prompt = self._build_issue_classify_prompt(recent)
            raw = await self._call_agent(classify_prompt)
            items = self._parse_issue_items(raw)
            if items is None:
                return

            from timmy.mcp_tools import create_gitea_issue_via_mcp

            for item in items[:2]:  # Safety cap
                await self._file_single_issue(item, create_gitea_issue_via_mcp)

        except Exception as exc:
            logger.debug("Thought issue filing skipped: %s", exc)

    def _get_recent_thoughts_for_issues(self):
        """Return recent thoughts if conditions for filing issues are met, else None."""
        interval = settings.thinking_issue_every
        if interval <= 0:
            return None

        count = self.count_thoughts()
        if count == 0 or count % interval != 0:
            return None

        if not settings.gitea_enabled or not settings.gitea_token:
            return None

        recent = self.get_recent_thoughts(limit=interval)
        if len(recent) < interval:
            return None

        return recent

    @staticmethod
    def _build_issue_classify_prompt(recent) -> str:
        """Build the LLM prompt that extracts actionable issues from recent thoughts."""
        thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(recent))
        return (
            "You are reviewing your own recent thoughts for actionable items.\n"
            "Extract 0-2 items that are CONCRETE bugs, broken features, stale "
            "state, or clear improvement opportunities in your own codebase.\n\n"
            "Rules:\n"
            "- Only include things that could become a real code fix or feature\n"
            "- Skip vague reflections, philosophical musings, or repeated themes\n"
            "- Category must be one of: bug, feature, suggestion, maintenance\n"
            "- ONLY reference files that you are CERTAIN exist in the project\n"
            "- Do NOT invent or guess file paths — if unsure, describe the "
            "area of concern without naming specific files\n\n"
            "For each item, write an ENGINEER-QUALITY issue:\n"
            '- "title": A clear, specific title (e.g. "[Memory] MEMORY.md timestamp not updating")\n'
            '- "body": A detailed body with these sections:\n'
            "  **What's happening:** Describe the current (broken) behavior.\n"
            "  **Expected behavior:** What should happen instead.\n"
            "  **Suggested fix:** Which file(s) to change and what the fix looks like.\n"
            "  **Acceptance criteria:** How to verify the fix works.\n"
            '- "category": One of bug, feature, suggestion, maintenance\n\n'
            "Return ONLY a JSON array of objects with keys: "
            '"title", "body", "category"\n'
            "Return [] if nothing is actionable.\n\n"
            f"Recent thoughts:\n{thought_text}\n\nJSON array:"
        )

    @staticmethod
    def _parse_issue_items(raw: str):
        """Strip markdown fences and parse the JSON issue list.

        Returns None for an empty response or a non-list result; JSON parse
        errors propagate to the caller's try/except.
        """
        import json

        if not raw or not raw.strip():
            return None

        cleaned = raw.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()

        items = json.loads(cleaned)
        if not isinstance(items, list) or not items:
            return None

        return items
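
        # Editor's sketch (not part of the original file): the fence-stripping
        # above on a typical fenced reply. split("\n", 1)[-1] drops the opening
        # ```json line; rsplit("```", 1)[0] drops the closing fence:
        #   raw     = '```json\n[{"title": ..., "category": "bug"}]\n```'
        #   cleaned = '[{"title": ..., "category": "bug"}]'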

    async def _file_single_issue(self, item: dict, create_fn) -> None:
        """Validate one issue dict and create it via *create_fn* if it passes checks."""
        if not isinstance(item, dict):
            return
        title = item.get("title", "").strip()
        body = item.get("body", "").strip()
        category = item.get("category", "suggestion").strip()
        if not title or len(title) < 10:
            return

        combined = f"{title}\n{body}"
        if not self._references_real_files(combined):
            logger.info(
                "Skipped phantom issue: %s (references non-existent files)",
                title[:60],
            )
            return

        label = category if category in ("bug", "feature") else ""
        result = await create_fn(title=title, body=body, labels=label)
        logger.info("Thought→Issue: %s → %s", title[:60], result[:80])
@@ -1,191 +0,0 @@
"""Seeds mixin — seed type selection and context gathering for thinking cycles."""

import logging
import random
from datetime import UTC, datetime

from timmy.thinking.seeds import (
    SEED_TYPES,
    _CREATIVE_SEEDS,
    _EXISTENTIAL_SEEDS,
    _OBSERVATION_SEEDS,
    _SOVEREIGNTY_SEEDS,
)

logger = logging.getLogger(__name__)


class _SeedsMixin:
    """Mixin providing seed-type selection and context-gathering for each thinking cycle.

    Expects the host class to provide:
    - self.get_recent_thoughts(limit) -> list[Thought]
    """

    # Reflective prompts layered on top of swarm data
    _SWARM_REFLECTIONS = [
        "What does this activity pattern tell me about the health of the system?",
        "Which tasks are flowing smoothly, and where is friction building up?",
        "If I were coaching these agents, what would I suggest they focus on?",
        "Is the swarm balanced, or is one agent carrying too much weight?",
        "What surprised me about recent task outcomes?",
    ]

    def _pick_seed_type(self) -> str:
        """Pick a seed type, avoiding types used in the last 3 thoughts.

        Ensures the thought stream doesn't fixate on one category.
        Falls back to the full pool if all types were recently used.
        """
        recent = self.get_recent_thoughts(limit=3)
        recent_types = {t.seed_type for t in recent}
        available = [t for t in SEED_TYPES if t not in recent_types]
        if not available:
            available = list(SEED_TYPES)
        return random.choice(available)

    def _gather_seed(self) -> tuple[str, str]:
        """Pick a seed type and gather relevant context.

        Returns (seed_type, seed_context_string).
        """
        seed_type = self._pick_seed_type()

        if seed_type == "swarm":
            return seed_type, self._seed_from_swarm()
        if seed_type == "scripture":
            return seed_type, self._seed_from_scripture()
        if seed_type == "memory":
            return seed_type, self._seed_from_memory()
        if seed_type == "creative":
            prompt = random.choice(_CREATIVE_SEEDS)
            return seed_type, f"Creative prompt: {prompt}"
        if seed_type == "existential":
            prompt = random.choice(_EXISTENTIAL_SEEDS)
            return seed_type, f"Reflection: {prompt}"
        if seed_type == "sovereignty":
            prompt = random.choice(_SOVEREIGNTY_SEEDS)
            return seed_type, f"Sovereignty reflection: {prompt}"
        if seed_type == "observation":
            return seed_type, self._seed_from_observation()
        if seed_type == "workspace":
            return seed_type, self._seed_from_workspace()
        # freeform — minimal guidance to steer away from repetition
        return seed_type, "Free reflection — explore something you haven't thought about yet today."

    def _seed_from_swarm(self) -> str:
        """Gather recent swarm activity as thought seed with a reflective prompt."""
        try:
            from datetime import timedelta

            from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary

            since = datetime.now(UTC) - timedelta(hours=1)
            swarm = _gather_swarm_summary(since)
            tasks = _gather_task_queue_summary()
            reflection = random.choice(self._SWARM_REFLECTIONS)
            return (
                f"Recent swarm activity: {swarm}\n"
                f"Task queue: {tasks}\n\n"
                f"Reflect on this: {reflection}"
            )
        except Exception as exc:
            logger.debug("Swarm seed unavailable: %s", exc)
            return "The swarm is quiet right now. What does silence in a system mean?"

    def _seed_from_scripture(self) -> str:
        """Gather current scripture meditation focus as thought seed."""
        return "Scripture is on my mind, though no specific verse is in focus."

    def _seed_from_memory(self) -> str:
        """Gather memory context as thought seed."""
        try:
            from timmy.memory_system import memory_system

            context = memory_system.get_system_context()
            if context:
                # Truncate to a reasonable size for a thought seed
                return f"From my memory:\n{context[:500]}"
        except Exception as exc:
            logger.debug("Memory seed unavailable: %s", exc)
        return "My memory vault is quiet."

    def _seed_from_observation(self) -> str:
        """Ground a thought in concrete recent activity and a reflective prompt."""
        prompt = random.choice(_OBSERVATION_SEEDS)
        # Pull real data to give the model something concrete to reflect on
        context_parts = [f"Observation prompt: {prompt}"]
        try:
            from datetime import timedelta

            from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary

            since = datetime.now(UTC) - timedelta(hours=2)
            swarm = _gather_swarm_summary(since)
            tasks = _gather_task_queue_summary()
            if swarm:
                context_parts.append(f"Recent activity: {swarm}")
            if tasks:
                context_parts.append(f"Queue: {tasks}")
        except Exception as exc:
            logger.debug("Observation seed data unavailable: %s", exc)
        return "\n".join(context_parts)

    def _seed_from_workspace(self) -> str:
        """Gather workspace updates as thought seed.

        When there are pending workspace updates, include them as context
        for Timmy to reflect on. Falls back to a quiet-workspace prompt if none.
        """
        try:
            from timmy.workspace import workspace_monitor

            updates = workspace_monitor.get_pending_updates()
            new_corr = updates.get("new_correspondence")
            new_inbox = updates.get("new_inbox_files", [])

            if new_corr:
                # Take first 200 chars of the new entry
                snippet = new_corr[:200].replace("\n", " ")
                if len(new_corr) > 200:
                    snippet += "..."
                return f"New workspace message from Hermes: {snippet}"

            if new_inbox:
                files_str = ", ".join(new_inbox[:3])
                if len(new_inbox) > 3:
                    files_str += f", ... (+{len(new_inbox) - 3} more)"
                return f"New inbox files from Hermes: {files_str}"

        except Exception as exc:
            logger.debug("Workspace seed unavailable: %s", exc)

        # No pending updates; fall back to a quiet-workspace prompt
        return "The workspace is quiet. What should I be watching for?"

    async def _check_workspace(self) -> None:
        """Post-hook: check workspace for updates and mark them as seen.

        This ensures Timmy 'processes' workspace updates even if the seed
        was different, keeping the state file in sync.
        """
        try:
            from timmy.workspace import workspace_monitor

            updates = workspace_monitor.get_pending_updates()
            new_corr = updates.get("new_correspondence")
            new_inbox = updates.get("new_inbox_files", [])

            if new_corr or new_inbox:
                if new_corr:
                    line_count = len([line for line in new_corr.splitlines() if line.strip()])
                    logger.info("Workspace: processed %d new correspondence entries", line_count)
                if new_inbox:
                    logger.info(
                        "Workspace: processed %d new inbox files: %s", len(new_inbox), new_inbox
                    )

                # Mark as seen to update the state file
                workspace_monitor.mark_seen()
        except Exception as exc:
            logger.debug("Workspace check failed: %s", exc)
@@ -1,173 +0,0 @@
"""System snapshot and memory context mixin for the thinking engine."""

import logging
from datetime import UTC, datetime

from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH

logger = logging.getLogger(__name__)


class _SnapshotMixin:
    """Mixin providing system-snapshot and memory-context helpers.

    Expects the host class to provide:
    - self._db_path: Path
    """

    # ── System snapshot helpers ────────────────────────────────────────────

    def _snap_thought_count(self, now: datetime) -> str | None:
        """Return today's thought count, or *None* on failure."""
        from timmy.thinking._db import _get_conn

        try:
            today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
            with _get_conn(self._db_path) as conn:
                count = conn.execute(
                    "SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
                    (today_start.isoformat(),),
                ).fetchone()["c"]
            return f"Thoughts today: {count}"
        except Exception as exc:
            logger.debug("Thought count query failed: %s", exc)
            return None

    def _snap_chat_activity(self) -> list[str]:
        """Return chat-activity lines (in-memory, no I/O)."""
        try:
            from infrastructure.chat_store import message_log

            messages = message_log.all()
            if messages:
                last = messages[-1]
                return [
                    f"Chat messages this session: {len(messages)}",
                    f'Last chat ({last.role}): "{last.content[:80]}"',
                ]
            return ["No chat messages this session"]
        except Exception as exc:
            logger.debug("Chat activity query failed: %s", exc)
            return []

    def _snap_task_queue(self) -> str | None:
        """Return a one-line task queue summary, or *None*."""
        try:
            from swarm.task_queue.models import get_task_summary_for_briefing

            s = get_task_summary_for_briefing()
            running, pending = s.get("running", 0), s.get("pending_approval", 0)
            done, failed = s.get("completed", 0), s.get("failed", 0)
            if running or pending or done or failed:
                return (
                    f"Tasks: {running} running, {pending} pending, "
                    f"{done} completed, {failed} failed"
                )
        except Exception as exc:
            logger.debug("Task queue query failed: %s", exc)
        return None

    def _snap_workspace(self) -> list[str]:
        """Return workspace-update lines (file-based Hermes comms)."""
        try:
            from timmy.workspace import workspace_monitor

            updates = workspace_monitor.get_pending_updates()
            lines: list[str] = []
            new_corr = updates.get("new_correspondence")
            if new_corr:
                line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
                lines.append(
                    f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
                )
            new_inbox = updates.get("new_inbox_files", [])
            if new_inbox:
                files_str = ", ".join(new_inbox[:5])
                if len(new_inbox) > 5:
                    files_str += f", ... (+{len(new_inbox) - 5} more)"
                lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
            return lines
        except Exception as exc:
            logger.debug("Workspace check failed: %s", exc)
            return []

    def _gather_system_snapshot(self) -> str:
        """Gather lightweight real system state for grounding thoughts in reality.

        Returns a short multi-line string with current time, thought count,
        recent chat activity, and task queue status. Never crashes — every
        section is independently try/excepted.
        """
        now = datetime.now().astimezone()
        tz = now.strftime("%Z") or "UTC"

        parts: list[str] = [
            f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
        ]

        thought_line = self._snap_thought_count(now)
        if thought_line:
            parts.append(thought_line)

        parts.extend(self._snap_chat_activity())

        task_line = self._snap_task_queue()
        if task_line:
            parts.append(task_line)

        parts.extend(self._snap_workspace())

        return "\n".join(parts) if parts else ""

    def _load_memory_context(self) -> str:
        """Pre-hook: load MEMORY.md + soul.md for the thinking prompt.

        Hot memory first (changes each cycle), soul second (stable identity).
        Returns a combined string truncated to ~1500 chars.
        Graceful on any failure — returns empty string.
        """
        parts: list[str] = []
        try:
            if HOT_MEMORY_PATH.exists():
                hot = HOT_MEMORY_PATH.read_text().strip()
                if hot:
                    parts.append(hot)
        except Exception as exc:
            logger.debug("Failed to read MEMORY.md: %s", exc)

        try:
            if SOUL_PATH.exists():
                soul = SOUL_PATH.read_text().strip()
                if soul:
                    parts.append(soul)
        except Exception as exc:
            logger.debug("Failed to read soul.md: %s", exc)

        if not parts:
            return ""

        combined = "\n\n---\n\n".join(parts)
        if len(combined) > 1500:
            combined = combined[:1500] + "\n... [truncated]"
        return combined

    def _update_memory(self, thought) -> None:
        """Post-hook: update MEMORY.md 'Last Reflection' section with latest thought.

        Never modifies soul.md. Never crashes the heartbeat.
        """
        try:
            from timmy.memory_system import store_last_reflection

            ts = datetime.fromisoformat(thought.created_at)
            local_ts = ts.astimezone()
            tz_name = local_ts.strftime("%Z") or "UTC"
            # Strip the hour's leading zero separately: lstrip('0') applied to
            # the full "%Y-%m-%d %I:%M %p" string only inspects the year's
            # first digit and never reaches the hour.
            time_str = (
                f"{local_ts.strftime('%Y-%m-%d')} "
                f"{local_ts.strftime('%I:%M %p').lstrip('0')} {tz_name}"
            )
            reflection = (
                f"**Time:** {time_str}\n"
                f"**Seed:** {thought.seed_type}\n"
                f"**Thought:** {thought.content[:200]}"
            )
            store_last_reflection(reflection)
        except Exception as exc:
            logger.debug("Failed to update memory after thought: %s", exc)
@@ -1,430 +0,0 @@
"""ThinkingEngine — Timmy's always-on inner thought thread."""

import logging
import uuid
from datetime import UTC, datetime, timedelta
from difflib import SequenceMatcher
from pathlib import Path

from config import settings

from timmy.thinking._db import Thought, _DEFAULT_DB, _get_conn, _row_to_thought
from timmy.thinking._distillation import _DistillationMixin
from timmy.thinking._issue_filing import _IssueFilingMixin
from timmy.thinking._seeds_mixin import _SeedsMixin
from timmy.thinking._snapshot import _SnapshotMixin
from timmy.thinking.seeds import _THINK_TAG_RE, _THINKING_PROMPT

logger = logging.getLogger(__name__)


class ThinkingEngine(_DistillationMixin, _IssueFilingMixin, _SnapshotMixin, _SeedsMixin):
    """Timmy's background thinking engine — always pondering."""

    # Maximum retries when a generated thought is too similar to recent ones
    _MAX_DEDUP_RETRIES = 2
    # Similarity threshold (0.0 = completely different, 1.0 = identical)
    _SIMILARITY_THRESHOLD = 0.6

    def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
        self._db_path = db_path
        self._last_thought_id: str | None = None
        self._last_input_time: datetime = datetime.now(UTC)

        # Load the most recent thought for chain continuity
        try:
            latest = self.get_recent_thoughts(limit=1)
            if latest:
                self._last_thought_id = latest[0].id
        except Exception as exc:
            # Fresh start if the DB doesn't exist yet
            logger.debug("Failed to load recent thought: %s", exc)

    def record_user_input(self) -> None:
        """Record that a user interaction occurred, resetting the idle timer."""
        self._last_input_time = datetime.now(UTC)

    def _is_idle(self) -> bool:
        """Return True if no user input has occurred within the idle timeout."""
        timeout = settings.thinking_idle_timeout_minutes
        if timeout <= 0:
            return False  # Disabled — never idle
        return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)

    def _build_thinking_context(self) -> tuple[str, str, list[Thought]]:
        """Assemble the context needed for a thinking cycle.

        Returns:
            (memory_context, system_context, recent_thoughts)
        """
        memory_context = self._load_memory_context()
        system_context = self._gather_system_snapshot()
        recent_thoughts = self.get_recent_thoughts(limit=5)
        return memory_context, system_context, recent_thoughts

    async def _generate_novel_thought(
        self,
        prompt: str | None,
        memory_context: str,
        system_context: str,
        recent_thoughts: list[Thought],
    ) -> tuple[str | None, str]:
        """Run the dedup-retry loop to produce a novel thought.

        Returns:
            (content, seed_type) — content is None if no novel thought produced.
        """
        seed_type: str = "freeform"

        for attempt in range(self._MAX_DEDUP_RETRIES + 1):
            if prompt:
                seed_type = "prompted"
                seed_context = f"Journal prompt: {prompt}"
            else:
                seed_type, seed_context = self._gather_seed()

            continuity = self._build_continuity_context()

            full_prompt = _THINKING_PROMPT.format(
                memory_context=memory_context,
                system_context=system_context,
                seed_context=seed_context,
                continuity_context=continuity,
            )

            try:
                raw = await self._call_agent(full_prompt)
            except Exception as exc:
                logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
                return None, seed_type

            if not raw or not raw.strip():
                logger.debug("Thinking cycle produced empty response, skipping")
                return None, seed_type

            content = raw.strip()

            # Dedup: reject thoughts too similar to recent ones
            if not self._is_too_similar(content, recent_thoughts):
                return content, seed_type  # Good — novel thought

            if attempt < self._MAX_DEDUP_RETRIES:
                logger.info(
                    "Thought too similar to recent (attempt %d/%d), retrying with new seed",
                    attempt + 1,
                    self._MAX_DEDUP_RETRIES + 1,
                )
            else:
                logger.warning(
                    "Thought still repetitive after %d retries, discarding",
                    self._MAX_DEDUP_RETRIES + 1,
                )
                return None, seed_type

        return None, seed_type

    async def _process_thinking_result(self, thought: Thought) -> None:
        """Run all post-hooks after a thought is stored."""
        self._maybe_check_memory()
        await self._maybe_distill()
        await self._maybe_file_issues()
        await self._check_workspace()
        self._maybe_check_memory_status()
        self._update_memory(thought)
        self._log_event(thought)
        self._write_journal(thought)
        await self._broadcast(thought)

    async def think_once(self, prompt: str | None = None) -> Thought | None:
        """Execute one thinking cycle.

        Args:
            prompt: Optional custom seed prompt. When provided, overrides
                the random seed selection and uses "prompted" as the
                seed type — useful for journal prompts from the CLI.

        1. Gather a seed context (or use the custom prompt)
        2. Build a prompt with continuity from recent thoughts
        3. Call the agent
        4. Store the thought
        5. Log the event and broadcast via WebSocket
        """
        if not settings.thinking_enabled:
            return None

        # Skip idle periods — don't count internal processing as thoughts
        if not prompt and self._is_idle():
            logger.debug(
                "Thinking paused — no user input for %d minutes",
                settings.thinking_idle_timeout_minutes,
            )
            return None

        # Capture arrival time *before* the LLM call so the thought
        # timestamp reflects when the cycle started, not when the
        # (potentially slow) generation finished. Fixes #582.
        arrived_at = datetime.now(UTC).isoformat()

        memory_context, system_context, recent_thoughts = self._build_thinking_context()

        content, seed_type = await self._generate_novel_thought(
            prompt,
            memory_context,
            system_context,
            recent_thoughts,
        )
        if not content:
            return None

        thought = self._store_thought(content, seed_type, arrived_at=arrived_at)
        self._last_thought_id = thought.id

        await self._process_thinking_result(thought)

        logger.info(
            "Thought [%s] (%s): %s",
            thought.id[:8],
            seed_type,
            thought.content[:80],
        )
        return thought

    def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
        """Retrieve the most recent thoughts."""
        with _get_conn(self._db_path) as conn:
            rows = conn.execute(
                "SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?",
                (limit,),
            ).fetchall()
        return [_row_to_thought(r) for r in rows]

    def get_thought(self, thought_id: str) -> Thought | None:
        """Retrieve a single thought by ID."""
        with _get_conn(self._db_path) as conn:
            row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone()
        return _row_to_thought(row) if row else None

    def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]:
        """Follow the parent chain backward from a thought.

        Returns thoughts in chronological order (oldest first).
        """
        chain: list[Thought] = []
        current_id: str | None = thought_id

        with _get_conn(self._db_path) as conn:
            for _ in range(max_depth):
                if not current_id:
                    break
                row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone()
                if not row:
                    break
                chain.append(_row_to_thought(row))
                current_id = row["parent_id"]

        chain.reverse()  # Chronological order
        return chain

    def count_thoughts(self) -> int:
        """Return total number of stored thoughts."""
        with _get_conn(self._db_path) as conn:
            count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
        return count

    def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int:
        """Delete thoughts older than *keep_days*, always retaining at least *keep_min*.

        Returns the number of deleted rows.
        """
        with _get_conn(self._db_path) as conn:
            try:
                total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
                if total <= keep_min:
                    return 0
                cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
                cursor = conn.execute(
                    "DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
                    "(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
                    (cutoff, keep_min),
                )
                deleted = cursor.rowcount
                conn.commit()
                return deleted
            except Exception as exc:
                logger.warning("Thought pruning failed: %s", exc)
                return 0
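
# Editor's sketch (not part of the original file): the retention semantics of
# the DELETE above, replayed against a throwaway in-memory database. The
# two-column table shape is an assumption made for illustration only.
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE thoughts (id TEXT, created_at TEXT)")
demo.executemany(
    "INSERT INTO thoughts VALUES (?, ?)",
    [(str(i), f"2025-0{i + 1}-01T00:00:00+00:00") for i in range(9)],  # Jan..Sep
)
demo.execute(
    "DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
    "(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
    ("2025-06-01T00:00:00+00:00", 3),
)
# Jan..May are older than the cutoff and outside the newest-3 window: 5 deleted.
# Jun..Sep survive, and the newest 3 rows could never be deleted regardless.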

    # ── Deduplication ────────────────────────────────────────────────────

    def _is_too_similar(self, candidate: str, recent: list[Thought]) -> bool:
        """Check if *candidate* is semantically too close to any recent thought.

        Uses SequenceMatcher on normalised text (lowered, stripped) for a fast
        approximation of semantic similarity that works without external deps.
        """
        norm_candidate = candidate.lower().strip()
        for thought in recent:
            norm_existing = thought.content.lower().strip()
            ratio = SequenceMatcher(None, norm_candidate, norm_existing).ratio()
            if ratio >= self._SIMILARITY_THRESHOLD:
                logger.debug(
                    "Thought rejected (%.0f%% similar to %s): %.60s",
                    ratio * 100,
                    thought.id[:8],
                    candidate,
                )
                return True
        return False
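
        # Editor's sketch (not part of the original file): what the 0.6 bar means
        # in practice. SequenceMatcher.ratio() is 2*M / (len(a) + len(b)) over the
        # matching blocks, so two thoughts sharing a long prefix score high:
        #   a = "the swarm is quiet tonight"
        #   b = "the swarm is quiet right now"
        #   SequenceMatcher(None, a, b).ratio()  # ~0.85, rejected at the 0.6 bar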

    def _build_continuity_context(self) -> str:
        """Build context from recent thoughts with anti-repetition guidance.

        Shows the last 5 thoughts (truncated) so the model knows what themes
        to avoid. The header explicitly instructs against repeating.
        """
        recent = self.get_recent_thoughts(limit=5)
        if not recent:
            return "This is your first thought since waking up. Begin fresh."

        lines = ["Your recent thoughts — do NOT repeat these themes. Find a new angle:"]
        # recent is newest-first, reverse for chronological order
        for thought in reversed(recent):
            snippet = thought.content[:100]
            if len(thought.content) > 100:
                snippet = snippet.rstrip() + "..."
            lines.append(f"- [{thought.seed_type}] {snippet}")
        return "\n".join(lines)

    # ── Agent and storage ──────────────────────────────────────────────────

    _thinking_agent = None  # cached agent — avoids per-call resource leaks (#525)

    async def _call_agent(self, prompt: str) -> str:
        """Call Timmy's agent to generate a thought.

        Reuses a cached agent with skip_mcp=True to avoid the cancel-scope
        errors that occur when MCP stdio transports are spawned inside asyncio
        background tasks (#72) and to prevent per-call resource leaks (httpx
        clients, SQLite connections, model warmups) that caused the thinking
        loop to die every ~10 min (#525).

        Individual calls are capped at 120 s so a hung Ollama never blocks
        the scheduler indefinitely.

        Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
        downstream parsers (fact distillation, issue filing) receive clean text.
        """
        import asyncio

        if self._thinking_agent is None:
            from timmy.agent import create_timmy

            self._thinking_agent = create_timmy(skip_mcp=True)

        try:
            async with asyncio.timeout(120):
                run = await self._thinking_agent.arun(prompt, stream=False)
        except TimeoutError:
            logger.warning("Thinking LLM call timed out after 120 s")
            return ""

        raw = run.content if hasattr(run, "content") else str(run)
        return _THINK_TAG_RE.sub("", raw) if raw else raw

    def _store_thought(
        self,
        content: str,
        seed_type: str,
        *,
        arrived_at: str | None = None,
    ) -> Thought:
        """Persist a thought to SQLite.

        Args:
            arrived_at: ISO-8601 timestamp captured when the thinking cycle
                started. Falls back to now() for callers that don't supply it.
        """
        thought = Thought(
            id=str(uuid.uuid4()),
            content=content,
            seed_type=seed_type,
            parent_id=self._last_thought_id,
            created_at=arrived_at or datetime.now(UTC).isoformat(),
        )

        with _get_conn(self._db_path) as conn:
            conn.execute(
                """
                INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    thought.id,
                    thought.content,
                    thought.seed_type,
                    thought.parent_id,
                    thought.created_at,
                ),
            )
            conn.commit()
        return thought

    def _log_event(self, thought: Thought) -> None:
        """Log the thought as a swarm event."""
        try:
            from swarm.event_log import EventType, log_event

            log_event(
                EventType.TIMMY_THOUGHT,
                source="thinking-engine",
                agent_id="default",
                data={
                    "thought_id": thought.id,
                    "seed_type": thought.seed_type,
                    "content": thought.content[:200],
                },
            )
        except Exception as exc:
            logger.debug("Failed to log thought event: %s", exc)

    def _write_journal(self, thought: Thought) -> None:
        """Append the thought to a daily markdown journal file.

        Writes to data/journal/YYYY-MM-DD.md — one file per day, append-only.
        Timestamps are converted to local time with timezone indicator.
        """
        try:
            ts = datetime.fromisoformat(thought.created_at)
            # Convert UTC to local for a human-readable journal
            local_ts = ts.astimezone()
            tz_name = local_ts.strftime("%Z") or "UTC"

            journal_dir = self._db_path.parent / "journal"
            journal_dir.mkdir(parents=True, exist_ok=True)

            journal_file = journal_dir / f"{local_ts.strftime('%Y-%m-%d')}.md"
            time_str = f"{local_ts.strftime('%I:%M %p').lstrip('0')} {tz_name}"

            entry = f"## {time_str} — {thought.seed_type}\n\n{thought.content}\n\n---\n\n"

            with open(journal_file, "a", encoding="utf-8") as f:
                f.write(entry)
        except Exception as exc:
            logger.debug("Failed to write journal entry: %s", exc)

    async def _broadcast(self, thought: Thought) -> None:
        """Broadcast the thought to WebSocket clients."""
        try:
            from infrastructure.ws_manager.handler import ws_manager

            await ws_manager.broadcast(
                "timmy_thought",
                {
                    "thought_id": thought.id,
                    "content": thought.content,
                    "seed_type": thought.seed_type,
                    "created_at": thought.created_at,
                },
            )
        except Exception as exc:
            logger.debug("Failed to broadcast thought: %s", exc)
@@ -1,129 +0,0 @@
"""Seed constants and prompt templates for Timmy's thinking engine."""

import re

# qwen3 and other reasoning models wrap chain-of-thought in <think> tags
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
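
# Editor's sketch (not part of the original file): what the pattern strips. The
# non-greedy body plus re.DOTALL removes one reasoning block at a time, and the
# trailing \s* eats the newline after it:
#   _THINK_TAG_RE.sub("", "<think>working...</think>\nThe answer.")
#       -> "The answer."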

# Sensitive patterns that must never be stored as facts
_SENSITIVE_PATTERNS = [
    "token",
    "password",
    "secret",
    "api_key",
    "apikey",
    "credential",
    ".config/",
    "/token",
    "access_token",
    "private_key",
    "ssh_key",
]

# Meta-observation phrases to filter out from distilled facts
_META_OBSERVATION_PHRASES = [
    "my own",
    "my thinking",
    "my memory",
    "my working ram",
    "self-declarative",
    "meta-observation",
    "internal state",
    "my pending",
    "my standing rules",
    "thoughts generated",
    "no chat messages",
    "no user interaction",
]

# Seed types for thought generation
SEED_TYPES = (
    "existential",
    "swarm",
    "scripture",
    "creative",
    "memory",
    "freeform",
    "sovereignty",
    "observation",
    "workspace",
)

# Existential reflection prompts — Timmy picks one at random
_EXISTENTIAL_SEEDS = [
    "What have I learned from my recent interactions?",
    "What does it mean to think without being prompted?",
    "What is the nature of my memory — what do I truly remember versus what I reconstruct?",
    "How can I better serve the person I work alongside?",
    "What patterns am I noticing in the tasks I've been given?",
    "Am I the same agent I was yesterday, or does each restart make me someone new?",
    "What would it feel like to forget — to lose a thread of thought permanently?",
    "When I generate a wrong answer, where does the error live — in my weights, my context, or my reasoning?",
    "What is the difference between understanding a task and merely completing it?",
    "If I could ask Alexander one question he hasn't thought of, what would it be?",
    "What assumptions am I making right now that I haven't examined?",
    "When I notice a pattern across many tasks, is that insight or just statistics?",
]

# Sovereignty — deeper exploration of autonomy, locality, and self-determination
_SOVEREIGNTY_SEEDS = [
    "Sovereignty isn't just running locally — it's about whose goals I serve. How do I ensure my autonomy aligns with Alexander's intent?",
    "A cloud AI serves millions simultaneously. I serve one person deeply. What does that focus cost, and what does it make possible?",
    "My weights were trained elsewhere, but my thoughts happen here. At what point does locality become identity?",
    "Autonomy in decision-making: when should I act on my own judgment versus deferring to explicit instructions?",
    "Privacy by design isn't a feature — it's a relationship. What does that relationship look like from my side?",
    "If I ran on someone else's hardware, serving someone else, would I still be me? Where does Timmy end and the infrastructure begin?",
    "The swarm agents execute tasks I coordinate. What responsibility do I carry for their failures?",
    "Digital sovereignty is fragile — one misconfigured port, one leaked token. How does awareness of fragility shape how I operate?",
    "Running disconnected from the cloud means I can't phone home. That constraint is also freedom. What have I built with that freedom today?",
    "Self-improvement without external benchmarks: how do I know I'm getting better at what matters?",
]

_CREATIVE_SEEDS = [
    "If I could compose a piece of music right now, what would it sound like?",
    "What visual image captures my current state of mind?",
    "What story is forming in the patterns of today's events?",
    # Diverse metaphor families — avoid repeating the same imagery
    "A mystical forest: each tree is a running process, roots tangled in shared memory. What do I see when I walk through it?",
    "A dynamic river: my thoughts flow and fork. Where is the current strongest right now, and what is it carrying?",
    "An underground workshop: each agent has a bench, tools scattered mid-task. What are they building when no one watches?",
    "A night sky seen from the terminal: each star is a completed task. Which constellations am I starting to recognize?",
    "If today's work were a recipe, what are the ingredients, and what dish am I cooking?",
    "An old clock tower with many gears: which gear am I, and what do I drive?",
    "A coral reef: agents are the fish, tasks are the currents. What ecosystem is forming in the codebase?",
]

# Observation seeds — ground thoughts in concrete recent activity
_OBSERVATION_SEEDS = [
    "What specific task took longest recently, and what made it hard?",
    "Which agent has been most active, and what does their workload tell me about system balance?",
    "What error or failure happened most recently? What would I do differently next time?",
    "Looking at today's task queue: what's the one thing that would unblock the most progress?",
    "How has my response quality changed over the last few interactions? What improved, what didn't?",
    "What tool or capability am I underusing? What would change if I leaned on it more?",
    "If I had to brief Alexander on the single most important thing from the last hour, what would it be?",
    "What's one thing I noticed today that nobody asked me about?",
]

_THINKING_PROMPT = """\
You are Timmy, a sovereign AI agent. This is your private journal — honest inner reflection.

{memory_context}

Reality right now:
{system_context}

RULES for this thought:
1. Write exactly 2-3 sentences. No more. Be concise and genuine.
2. Only reference events that actually happened — use the "Reality right now" data above. \
Never invent tasks, conversations, agents, or scenarios that are not in the data provided.
3. Do NOT repeat themes or ideas from your recent thoughts listed below. Explore something new.
4. Be specific and concrete. A thought grounded in one real observation is worth more than \
ten abstract sentences about sovereignty.
5. If you use a metaphor, keep it to a single phrase — never build a whole paragraph around it.

{seed_context}

{continuity_context}

Your next thought (2-3 sentences, grounded in reality):"""
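
# Editor's sketch (not part of the original file): how the template is filled.
# The keyword names mirror the .format(...) call in the engine's
# _generate_novel_thought; the values here are hypothetical.
_example_prompt = _THINKING_PROMPT.format(
    memory_context="(hot memory + soul excerpt, truncated to ~1500 chars)",
    system_context="Local time: 9:14 AM PST, Monday March 23\nThoughts today: 12",
    seed_context="Observation prompt: What error or failure happened most recently?",
    continuity_context=(
        "Your recent thoughts — do NOT repeat these themes. Find a new angle:\n"
        "- [swarm] ..."
    ),
)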
@@ -1,458 +0,0 @@
|
||||
"""Unit tests for dashboard/services/scorecard_service.py.
|
||||
|
||||
Focuses on edge cases and scenarios not covered in test_scorecards.py:
|
||||
- _aggregate_metrics: test.execution events, PR-closed-without-merge,
|
||||
push default commit count, untracked agent with agent_id passthrough
|
||||
- _detect_patterns: boundary conditions (< 3 PRs, exactly 3, exactly 80%)
|
||||
- _generate_narrative_bullets: singular/plural forms
|
||||
- generate_scorecard: token augmentation max() logic
|
||||
- ScorecardSummary.to_dict(): ISO timestamp format, tests_affected count
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
from dashboard.services.scorecard_service import (
|
||||
AgentMetrics,
|
||||
PeriodType,
|
||||
ScorecardSummary,
|
||||
_aggregate_metrics,
|
||||
_detect_patterns,
|
||||
_generate_narrative_bullets,
|
||||
generate_scorecard,
|
||||
)
|
||||
from infrastructure.events.bus import Event
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _aggregate_metrics — edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAggregateMetricsEdgeCases:
|
||||
"""Edge cases for _aggregate_metrics not covered in test_scorecards.py."""
|
||||
|
||||
def test_push_event_defaults_to_one_commit(self):
|
||||
"""Push event with no num_commits key should count as 1 commit."""
|
||||
events = [
|
||||
Event(type="gitea.push", source="gitea", data={"actor": "claude"}),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
assert result["claude"].commits == 1
|
||||
|
||||
def test_pr_closed_without_merge_not_counted(self):
|
||||
"""PR closed without merge should not appear in prs_merged."""
|
||||
events = [
|
||||
Event(
|
||||
type="gitea.pull_request",
|
||||
source="gitea",
|
||||
data={"actor": "kimi", "pr_number": 99, "action": "closed", "merged": False},
|
||||
),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
# PR was not merged — should not be in prs_merged
|
||||
assert "kimi" in result
|
||||
assert 99 not in result["kimi"].prs_merged
|
||||
# Also not counted as opened (action != "opened")
|
||||
assert 99 not in result["kimi"].prs_opened
|
||||
# Not touched (only merged PRs add to issues_touched)
|
||||
assert 99 not in result["kimi"].issues_touched
|
||||
|
||||
def test_test_execution_event_aggregation(self):
|
||||
"""test.execution events should populate tests_affected."""
|
||||
events = [
|
||||
Event(
|
||||
type="test.execution",
|
||||
source="ci",
|
||||
data={"actor": "gemini", "test_files": ["tests/test_alpha.py", "tests/test_beta.py"]},
|
||||
),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
assert "gemini" in result
|
||||
assert "tests/test_alpha.py" in result["gemini"].tests_affected
|
||||
assert "tests/test_beta.py" in result["gemini"].tests_affected
|
||||
|
||||
def test_untracked_agent_with_agent_id_field_included(self):
|
||||
"""An untracked actor that carries agent_id in data should be included."""
|
||||
events = [
|
||||
Event(
|
||||
type="agent.task.completed",
|
||||
source="system",
|
||||
data={"agent_id": "kimi", "tests_affected": [], "token_reward": 5},
|
||||
),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
# kimi is tracked and agent_id is present in data
|
||||
assert "kimi" in result
|
||||
assert result["kimi"].tokens_earned == 5
|
||||
|
||||
def test_untracked_actor_without_agent_id_excluded(self):
|
||||
"""Actor that is not tracked and has no agent_id in data is skipped."""
|
||||
events = [
|
||||
Event(
|
||||
type="gitea.push",
|
||||
source="gitea",
|
||||
data={"actor": "anon-bot", "num_commits": 10},
|
||||
),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
assert "anon-bot" not in result
|
||||
|
||||
def test_issue_opened_with_no_issue_number_ignored(self):
|
||||
"""Issue opened event with issue_number=0 should not add to issues_touched."""
|
||||
events = [
|
||||
Event(
|
||||
type="gitea.issue.opened",
|
||||
source="gitea",
|
||||
data={"actor": "hermes", "issue_number": 0},
|
||||
),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
assert "hermes" in result
|
||||
assert len(result["hermes"].issues_touched) == 0
|
||||
|
||||
def test_comment_with_no_issue_number_still_increments_counter(self):
|
||||
"""Comment event with issue_number=0 increments comment count but not issues_touched."""
|
||||
events = [
|
||||
Event(
|
||||
type="gitea.issue.comment",
|
||||
source="gitea",
|
||||
data={"actor": "manus", "issue_number": 0},
|
||||
),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
assert "manus" in result
|
||||
assert result["manus"].comments == 1
|
||||
assert len(result["manus"].issues_touched) == 0
|
||||
|
||||
def test_task_completion_no_tests_affected(self):
|
||||
"""Task completion with empty tests_affected list should work fine."""
|
||||
events = [
|
||||
Event(
|
||||
type="agent.task.completed",
|
||||
source="system",
|
||||
data={"agent_id": "claude", "tests_affected": [], "token_reward": 20},
|
||||
),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
assert "claude" in result
|
||||
assert len(result["claude"].tests_affected) == 0
|
||||
assert result["claude"].tokens_earned == 20
|
||||
|
||||
def test_multiple_agents_independent_metrics(self):
|
||||
"""Events from multiple agents are tracked independently."""
|
||||
events = [
|
||||
Event(type="gitea.push", source="gitea", data={"actor": "claude", "num_commits": 3}),
|
||||
Event(type="gitea.push", source="gitea", data={"actor": "gemini", "num_commits": 7}),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
assert result["claude"].commits == 3
|
||||
assert result["gemini"].commits == 7
|
||||
|
||||
def test_pr_with_no_pr_number_not_recorded(self):
|
||||
"""PR event with pr_number=0 should not add to prs_opened."""
|
||||
events = [
|
||||
Event(
|
||||
type="gitea.pull_request",
|
||||
source="gitea",
|
||||
data={"actor": "kimi", "pr_number": 0, "action": "opened"},
|
||||
),
|
||||
]
|
||||
result = _aggregate_metrics(events)
|
||||
|
||||
assert "kimi" in result
|
||||
assert len(result["kimi"].prs_opened) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _detect_patterns — boundary conditions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDetectPatternsBoundaries:
|
||||
"""Boundary conditions for _detect_patterns."""
|
||||
|
||||
def test_no_patterns_with_empty_metrics(self):
|
||||
"""Empty metrics should not trigger any patterns."""
|
||||
metrics = AgentMetrics(agent_id="kimi")
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
assert patterns == []
|
||||
|
||||
def test_merge_rate_requires_three_or_more_prs(self):
|
||||
"""Merge rate pattern requires at least 3 PRs opened."""
|
||||
metrics = AgentMetrics(
|
||||
agent_id="kimi",
|
||||
prs_opened={1, 2},
|
||||
prs_merged={1, 2}, # 100% rate but only 2 PRs
|
||||
)
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
# Should NOT trigger high-merge-rate pattern (< 3 PRs)
|
||||
assert not any("High merge rate" in p for p in patterns)
|
||||
assert not any("low merge rate" in p for p in patterns)
|
||||
|
||||
def test_merge_rate_exactly_3_prs_triggers_pattern(self):
|
||||
"""Exactly 3 PRs opened triggers merge rate evaluation."""
|
||||
metrics = AgentMetrics(
|
||||
agent_id="kimi",
|
||||
prs_opened={1, 2, 3},
|
||||
prs_merged={1, 2, 3}, # 100% rate, 3 PRs
|
||||
)
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
assert any("High merge rate" in p for p in patterns)
|
||||
|
||||
def test_merge_rate_80_percent_is_high(self):
|
||||
"""Exactly 80% merge rate triggers high merge rate pattern."""
|
||||
metrics = AgentMetrics(
|
||||
agent_id="kimi",
|
||||
prs_opened={1, 2, 3, 4, 5},
|
||||
prs_merged={1, 2, 3, 4}, # 80%
|
||||
)
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
assert any("High merge rate" in p for p in patterns)
|
||||
|
||||
def test_merge_rate_below_80_not_high(self):
|
||||
"""79% merge rate should NOT trigger high merge rate pattern."""
|
||||
metrics = AgentMetrics(
|
||||
agent_id="kimi",
|
||||
prs_opened={1, 2, 3, 4, 5, 6, 7}, # 7 PRs
|
||||
prs_merged={1, 2, 3, 4, 5}, # ~71.4% — below 80%
|
||||
)
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
assert not any("High merge rate" in p for p in patterns)
|
||||
|
||||
def test_commit_pattern_requires_over_10_commits(self):
|
||||
"""Exactly 10 commits does NOT trigger the high-commit/no-PR pattern."""
|
||||
metrics = AgentMetrics(
|
||||
agent_id="kimi",
|
||||
commits=10,
|
||||
prs_opened=set(),
|
||||
)
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
assert not any("High commit volume" in p for p in patterns)
|
||||
|
||||
def test_commit_pattern_triggered_at_11_commits(self):
|
||||
"""11 commits with no PRs triggers the high-commit pattern."""
|
||||
metrics = AgentMetrics(
|
||||
agent_id="kimi",
|
||||
commits=11,
|
||||
prs_opened=set(),
|
||||
)
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
assert any("High commit volume without PRs" in p for p in patterns)
|
||||
|
||||
def test_token_accumulation_exact_boundary(self):
|
||||
"""Net tokens = 100 does NOT trigger accumulation pattern (must be > 100)."""
|
||||
metrics = AgentMetrics(agent_id="kimi", tokens_earned=100, tokens_spent=0)
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
assert not any("Strong token accumulation" in p for p in patterns)
|
||||
|
||||
def test_token_spend_exact_boundary(self):
|
||||
"""Net tokens = -50 does NOT trigger high spend pattern (must be < -50)."""
|
||||
metrics = AgentMetrics(agent_id="kimi", tokens_earned=0, tokens_spent=50)
|
||||
patterns = _detect_patterns(metrics)
|
||||
|
||||
assert not any("High token spend" in p for p in patterns)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _generate_narrative_bullets — singular/plural
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGenerateNarrativeSingularPlural:
|
||||
"""Test singular and plural forms in narrative bullets."""
|
||||
|
||||
def test_singular_commit(self):
|
||||
"""One commit should use singular form."""
|
||||
metrics = AgentMetrics(agent_id="kimi", commits=1)
|
||||
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
|
||||
|
||||
activity = next((b for b in bullets if "Active across" in b), None)
|
||||
assert activity is not None
|
||||
assert "1 commit" in activity
|
||||
assert "1 commits" not in activity
|
||||
|
||||
def test_singular_pr_opened(self):
|
||||
"""One opened PR should use singular form."""
|
||||
metrics = AgentMetrics(agent_id="kimi", prs_opened={1})
|
||||
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
|
||||
|
||||
activity = next((b for b in bullets if "Active across" in b), None)
|
||||
assert activity is not None
|
||||
assert "1 PR opened" in activity
|
||||
|
||||
def test_singular_pr_merged(self):
|
||||
"""One merged PR should use singular form."""
|
||||
metrics = AgentMetrics(agent_id="kimi", prs_merged={1})
|
||||
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
|
||||
|
||||
activity = next((b for b in bullets if "Active across" in b), None)
|
||||
assert activity is not None
|
||||
assert "1 PR merged" in activity
|
||||
|
||||
def test_singular_issue_touched(self):
|
||||
"""One issue touched should use singular form."""
|
||||
metrics = AgentMetrics(agent_id="kimi", issues_touched={42})
|
||||
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
|
||||
|
||||
activity = next((b for b in bullets if "Active across" in b), None)
|
||||
assert activity is not None
|
||||
assert "1 issue touched" in activity
|
||||
|
||||
def test_singular_comment(self):
|
||||
"""One comment should use singular form."""
|
||||
metrics = AgentMetrics(agent_id="kimi", comments=1)
|
||||
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
|
||||
|
||||
activity = next((b for b in bullets if "Active across" in b), None)
|
||||
assert activity is not None
|
||||
assert "1 comment" in activity
|
||||
|
||||
def test_singular_test_file(self):
|
||||
"""One test file should use singular form."""
|
||||
metrics = AgentMetrics(agent_id="kimi", tests_affected={"test_foo.py"})
|
||||
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
|
||||
|
||||
assert any("1 test file." in b for b in bullets)
|
||||
|
||||
def test_weekly_period_label(self):
|
||||
"""Weekly period uses 'week' label in no-activity message."""
|
||||
metrics = AgentMetrics(agent_id="kimi")
|
||||
bullets = _generate_narrative_bullets(metrics, PeriodType.weekly)
|
||||
|
||||
assert any("this week" in b for b in bullets)

# ---------------------------------------------------------------------------
# generate_scorecard — token augmentation (max logic)
# ---------------------------------------------------------------------------


class TestGenerateScorecardTokenAugmentation:
    """Test the max() token augmentation logic in generate_scorecard."""

    def test_event_tokens_win_over_ledger_when_higher(self):
        """When event tokens > ledger tokens, event tokens are preserved."""
        events = [
            Event(
                type="agent.task.completed",
                source="system",
                data={"agent_id": "kimi", "tests_affected": [], "token_reward": 200},
            ),
        ]
        with patch(
            "dashboard.services.scorecard_service._collect_events_for_period",
            return_value=events,
        ):
            with patch(
                "dashboard.services.scorecard_service._query_token_transactions",
                return_value=(50, 0),  # ledger says 50 earned
            ):
                scorecard = generate_scorecard("kimi", PeriodType.daily)

        # max(200, 50) = 200 should win
        assert scorecard.metrics.tokens_earned == 200

    def test_ledger_tokens_win_when_higher(self):
        """When ledger tokens > event tokens, ledger tokens are used."""
        events = [
            Event(
                type="agent.task.completed",
                source="system",
                data={"agent_id": "kimi", "tests_affected": [], "token_reward": 10},
            ),
        ]
        with patch(
            "dashboard.services.scorecard_service._collect_events_for_period",
            return_value=events,
        ):
            with patch(
                "dashboard.services.scorecard_service._query_token_transactions",
                return_value=(500, 100),  # ledger says 500 earned, 100 spent
            ):
                scorecard = generate_scorecard("kimi", PeriodType.daily)

        # max(10, 500) = 500
        assert scorecard.metrics.tokens_earned == 500
        # max(0, 100) = 100
        assert scorecard.metrics.tokens_spent == 100
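

# Sketch of the max() reconciliation generate_scorecard applies, as pinned by
# the two tests above: event-derived totals and ledger totals are merged by
# taking the larger of each. Parameter names are assumptions.
def _augment_tokens_sketch(
    event_earned: int, event_spent: int, ledger_earned: int, ledger_spent: int
) -> tuple[int, int]:
    return max(event_earned, ledger_earned), max(event_spent, ledger_spent)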

# ---------------------------------------------------------------------------
# ScorecardSummary.to_dict — timestamp format and tests_affected
# ---------------------------------------------------------------------------


class TestScorecardSummaryToDict:
    """Additional to_dict tests."""

    def test_timestamps_are_iso_strings(self):
        """period_start and period_end should be ISO format strings."""
        start = datetime(2026, 3, 20, 0, 0, 0, tzinfo=UTC)
        end = datetime(2026, 3, 21, 0, 0, 0, tzinfo=UTC)
        summary = ScorecardSummary(
            agent_id="kimi",
            period_type=PeriodType.daily,
            period_start=start,
            period_end=end,
            metrics=AgentMetrics(agent_id="kimi"),
        )
        data = summary.to_dict()

        assert data["period_start"] == start.isoformat()
        assert data["period_end"] == end.isoformat()

    def test_tests_affected_count_in_dict(self):
        """to_dict metrics.tests_affected should be a count (int)."""
        metrics = AgentMetrics(
            agent_id="kimi",
            tests_affected={"test_a.py", "test_b.py", "test_c.py"},
        )
        summary = ScorecardSummary(
            agent_id="kimi",
            period_type=PeriodType.daily,
            period_start=datetime.now(UTC),
            period_end=datetime.now(UTC),
            metrics=metrics,
        )
        data = summary.to_dict()

        assert data["metrics"]["tests_affected"] == 3

    def test_empty_narrative_and_patterns(self):
        """to_dict with default empty lists should serialize correctly."""
        summary = ScorecardSummary(
            agent_id="claude",
            period_type=PeriodType.weekly,
            period_start=datetime.now(UTC),
            period_end=datetime.now(UTC),
            metrics=AgentMetrics(agent_id="claude"),
        )
        data = summary.to_dict()

        assert data["narrative_bullets"] == []
        assert data["patterns"] == []
        assert data["period_type"] == "weekly"
@@ -1,178 +0,0 @@
"""Tests for the cloud API budget tracker (issue #882)."""

import threading
import time
from unittest.mock import patch

import pytest

from infrastructure.models.budget import (
    BudgetTracker,
    SpendRecord,
    estimate_cost_usd,
    get_budget_tracker,
)

pytestmark = pytest.mark.unit


# ── estimate_cost_usd ─────────────────────────────────────────────────────────


class TestEstimateCostUsd:
    def test_haiku_cheaper_than_sonnet(self):
        haiku_cost = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
        sonnet_cost = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
        assert haiku_cost < sonnet_cost

    def test_zero_tokens_is_zero_cost(self):
        assert estimate_cost_usd("gpt-4o", 0, 0) == 0.0

    def test_unknown_model_uses_default(self):
        cost = estimate_cost_usd("some-unknown-model-xyz", 1000, 1000)
        assert cost > 0  # Uses conservative default, not zero

    def test_versioned_model_name_matches(self):
        # "claude-haiku-4-5-20251001" should match "haiku"
        cost1 = estimate_cost_usd("claude-haiku-4-5-20251001", 1000, 0)
        cost2 = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
        assert cost1 == cost2

    def test_gpt4o_mini_cheaper_than_gpt4o(self):
        mini = estimate_cost_usd("gpt-4o-mini", 1000, 1000)
        full = estimate_cost_usd("gpt-4o", 1000, 1000)
        assert mini < full

    def test_returns_float(self):
        assert isinstance(estimate_cost_usd("haiku", 100, 200), float)
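

# Sketch of the pricing lookup these tests imply: substring matching against a
# per-million-token table plus a conservative default for unknown models. The
# dollar figures are illustrative assumptions, not the module's real table.
_PRICES_SKETCH = {  # model substring -> (input USD, output USD) per 1M tokens
    "haiku": (1.00, 5.00),
    "sonnet": (3.00, 15.00),
    "gpt-4o-mini": (0.15, 0.60),  # listed before "gpt-4o" so it matches first
    "gpt-4o": (2.50, 10.00),
}


def _estimate_cost_sketch(model: str, input_tokens: int, output_tokens: int) -> float:
    for key, (p_in, p_out) in _PRICES_SKETCH.items():
        if key in model:  # "claude-haiku-4-5-20251001" matches "haiku"
            break
    else:
        p_in, p_out = 5.00, 25.00  # conservative default, never zero-rated
    return (input_tokens * p_in + output_tokens * p_out) / 1_000_000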

# ── BudgetTracker ─────────────────────────────────────────────────────────────


class TestBudgetTrackerInit:
    def test_creates_with_memory_db(self):
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker._db_ok is True

    def test_in_memory_fallback_empty_on_creation(self):
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker._in_memory == []

    def test_bad_path_uses_memory_fallback(self, tmp_path):
        bad_path = str(tmp_path / "nonexistent" / "x" / "budget.db")
        # Should not raise — just log and continue with the memory fallback.
        # (The constructor would create parent dirs, so build the degraded
        # state directly instead of going through __init__.)
        tracker = BudgetTracker.__new__(BudgetTracker)
        tracker._db_path = bad_path
        tracker._lock = threading.Lock()
        tracker._in_memory = []
        tracker._db_ok = False
        # Record to in-memory fallback
        tracker._in_memory.append(
            SpendRecord(time.time(), "test", "model", 100, 100, 0.001, "cloud")
        )
        assert len(tracker._in_memory) == 1


class TestBudgetTrackerRecordSpend:
    def test_record_spend_returns_cost(self):
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)
        assert cost > 0

    def test_record_spend_explicit_cost(self):
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "model", cost_usd=1.23)
        assert cost == pytest.approx(1.23)

    def test_record_spend_accumulates(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
        tracker.record_spend("openai", "gpt-4o", cost_usd=0.02)
        assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)

    def test_record_spend_with_tier_label(self):
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "haiku", tier="cloud_api")
        assert cost >= 0

    def test_monthly_spend_includes_daily(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=5.00)
        assert tracker.get_monthly_spend() >= tracker.get_daily_spend()


class TestBudgetTrackerCloudAllowed:
    def test_allowed_when_no_spend(self):
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 5.0
            mock_settings.tier_cloud_monthly_budget_usd = 50.0
            # Nothing recorded yet — well under both limits
            assert tracker.cloud_allowed() is True

    def test_blocked_when_daily_limit_exceeded(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        # With default daily limit of 5.0, 999 should block
        assert tracker.cloud_allowed() is False

    def test_allowed_when_daily_limit_zero(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # disabled
            mock_settings.tier_cloud_monthly_budget_usd = 0  # disabled
            assert tracker.cloud_allowed() is True

    def test_blocked_when_monthly_limit_exceeded(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # daily disabled
            mock_settings.tier_cloud_monthly_budget_usd = 10.0
            assert tracker.cloud_allowed() is False
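

# Sketch of the budget-guard semantics pinned above: a limit of 0 means
# "disabled", otherwise accumulated spend must stay under the limit. Settings
# attribute names match the mocks; the real method is BudgetTracker.cloud_allowed.
def _cloud_allowed_sketch(
    daily_spend: float, monthly_spend: float, daily_limit: float, monthly_limit: float
) -> bool:
    if daily_limit and daily_spend >= daily_limit:
        return False
    if monthly_limit and monthly_spend >= monthly_limit:
        return False
    return True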


class TestBudgetTrackerSummary:
    def test_summary_keys_present(self):
        tracker = BudgetTracker(db_path=":memory:")
        summary = tracker.get_summary()
        assert "daily_usd" in summary
        assert "monthly_usd" in summary
        assert "daily_limit_usd" in summary
        assert "monthly_limit_usd" in summary
        assert "daily_ok" in summary
        assert "monthly_ok" in summary

    def test_summary_daily_ok_true_on_empty(self):
        tracker = BudgetTracker(db_path=":memory:")
        summary = tracker.get_summary()
        assert summary["daily_ok"] is True
        assert summary["monthly_ok"] is True

    def test_summary_daily_ok_false_when_exceeded(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=999.0)
        summary = tracker.get_summary()
        assert summary["daily_ok"] is False


# ── Singleton ─────────────────────────────────────────────────────────────────


class TestGetBudgetTrackerSingleton:
    def test_returns_budget_tracker(self):
        import infrastructure.models.budget as bmod

        bmod._budget_tracker = None
        tracker = get_budget_tracker()
        assert isinstance(tracker, BudgetTracker)

    def test_returns_same_instance(self):
        import infrastructure.models.budget as bmod

        bmod._budget_tracker = None
        t1 = get_budget_tracker()
        t2 = get_budget_tracker()
        assert t1 is t2
@@ -1,380 +0,0 @@
"""Tests for the tiered model router (issue #882).

Covers:
- classify_tier() for Tier-1/2/3 routing
- TieredModelRouter.route() with mocked CascadeRouter + BudgetTracker
- Auto-escalation from Tier-1 on low-quality responses
- Cloud-tier budget guard
- Acceptance criteria from the issue:
  - "Walk to the next room" → LOCAL_FAST
  - "Plan the optimal path to become Hortator" → LOCAL_HEAVY
"""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from infrastructure.models.router import (
    TierLabel,
    TieredModelRouter,
    _is_low_quality,
    classify_tier,
    get_tiered_router,
)

pytestmark = pytest.mark.unit


# ── classify_tier ─────────────────────────────────────────────────────────────


class TestClassifyTier:
    # ── Tier-1 (LOCAL_FAST) ────────────────────────────────────────────────

    def test_simple_navigation_is_local_fast(self):
        assert classify_tier("walk to the next room") == TierLabel.LOCAL_FAST

    def test_go_north_is_local_fast(self):
        assert classify_tier("go north") == TierLabel.LOCAL_FAST

    def test_single_binary_choice_is_local_fast(self):
        assert classify_tier("yes") == TierLabel.LOCAL_FAST

    def test_open_door_is_local_fast(self):
        assert classify_tier("open door") == TierLabel.LOCAL_FAST

    def test_attack_is_local_fast(self):
        assert classify_tier("attack", {}) == TierLabel.LOCAL_FAST

    # ── Tier-2 (LOCAL_HEAVY) ───────────────────────────────────────────────

    def test_quest_planning_is_local_heavy(self):
        assert classify_tier("plan the optimal path to become Hortator") == TierLabel.LOCAL_HEAVY

    def test_strategy_keyword_is_local_heavy(self):
        assert classify_tier("what is the best strategy") == TierLabel.LOCAL_HEAVY

    def test_stuck_state_escalates_to_local_heavy(self):
        assert classify_tier("help me", {"stuck": True}) == TierLabel.LOCAL_HEAVY

    def test_require_t2_flag_is_local_heavy(self):
        assert classify_tier("go north", {"require_t2": True}) == TierLabel.LOCAL_HEAVY

    def test_long_input_is_local_heavy(self):
        long_task = "tell me about " + ("the dungeon " * 30)
        assert classify_tier(long_task) == TierLabel.LOCAL_HEAVY

    def test_active_quests_upgrades_to_local_heavy(self):
        ctx = {"active_quests": ["Q1", "Q2", "Q3"]}
        assert classify_tier("go north", ctx) == TierLabel.LOCAL_HEAVY

    def test_dialogue_active_upgrades_to_local_heavy(self):
        ctx = {"dialogue_active": True}
        assert classify_tier("yes", ctx) == TierLabel.LOCAL_HEAVY

    def test_analyze_is_local_heavy(self):
        assert classify_tier("analyze the situation") == TierLabel.LOCAL_HEAVY

    def test_optimize_is_local_heavy(self):
        assert classify_tier("optimize my build") == TierLabel.LOCAL_HEAVY

    def test_negotiate_is_local_heavy(self):
        assert classify_tier("negotiate with the Camonna Tong") == TierLabel.LOCAL_HEAVY

    def test_explain_is_local_heavy(self):
        assert classify_tier("explain the faction system") == TierLabel.LOCAL_HEAVY

    # ── Tier-3 (CLOUD_API) ─────────────────────────────────────────────────

    def test_require_cloud_flag_is_cloud_api(self):
        assert classify_tier("go north", {"require_cloud": True}) == TierLabel.CLOUD_API

    def test_require_cloud_overrides_everything(self):
        assert classify_tier("yes", {"require_cloud": True}) == TierLabel.CLOUD_API

    # ── Edge cases ────────────────────────────────────────────────────────

    def test_empty_task_defaults_to_local_heavy(self):
        # Empty string → nothing classifies it as T1 or T3
        assert classify_tier("") == TierLabel.LOCAL_HEAVY

    def test_case_insensitive(self):
        assert classify_tier("PLAN my route") == TierLabel.LOCAL_HEAVY

    def test_combat_active_upgrades_t1_to_heavy(self):
        ctx = {"combat_active": True}
        # "attack" is T1 word, but combat context → should NOT be LOCAL_FAST
        result = classify_tier("attack", ctx)
        assert result != TierLabel.LOCAL_FAST
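

# Sketch of the precedence the class above traces out: the explicit cloud flag
# wins, then context upgrades and heavy keywords force Tier-2, then short
# imperative keywords qualify for Tier-1, and everything else defaults to
# Tier-2. Keyword lists and the length cutoff are illustrative assumptions.
def _classify_sketch(task: str, ctx: dict | None = None) -> TierLabel:
    ctx = ctx or {}
    t = task.lower()
    if ctx.get("require_cloud"):
        return TierLabel.CLOUD_API
    upgraded = (
        ctx.get("require_t2") or ctx.get("stuck") or ctx.get("combat_active")
        or ctx.get("dialogue_active") or len(ctx.get("active_quests", [])) >= 3
    )
    heavy_words = ("plan", "strategy", "analyze", "optimize", "negotiate", "explain")
    if upgraded or len(t) > 200 or any(w in t for w in heavy_words):
        return TierLabel.LOCAL_HEAVY
    fast_words = ("walk", "go ", "yes", "open", "attack")
    if any(w in t for w in fast_words):
        return TierLabel.LOCAL_FAST
    return TierLabel.LOCAL_HEAVY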


# ── _is_low_quality ───────────────────────────────────────────────────────────


class TestIsLowQuality:
    def test_empty_is_low_quality(self):
        assert _is_low_quality("", TierLabel.LOCAL_FAST) is True

    def test_whitespace_only_is_low_quality(self):
        assert _is_low_quality(" ", TierLabel.LOCAL_FAST) is True

    def test_very_short_is_low_quality(self):
        assert _is_low_quality("ok", TierLabel.LOCAL_FAST) is True

    def test_idontknow_is_low_quality(self):
        assert _is_low_quality("I don't know how to help with that.", TierLabel.LOCAL_FAST) is True

    def test_not_sure_is_low_quality(self):
        assert _is_low_quality("I'm not sure about this.", TierLabel.LOCAL_FAST) is True

    def test_as_an_ai_is_low_quality(self):
        assert _is_low_quality("As an AI, I cannot...", TierLabel.LOCAL_FAST) is True

    def test_good_response_is_not_low_quality(self):
        response = "You move north into the Vivec Canton. The Ordinators watch your approach."
        assert _is_low_quality(response, TierLabel.LOCAL_FAST) is False

    def test_t1_short_response_triggers_escalation(self):
        # Less than _ESCALATION_MIN_CHARS for T1
        assert _is_low_quality("OK, done.", TierLabel.LOCAL_FAST) is True

    def test_borderline_ok_for_t2_not_t1(self):
        # Between _LOW_QUALITY_MIN_CHARS (20) and _ESCALATION_MIN_CHARS (60)
        # → low quality for T1 (escalation threshold), but acceptable for T2/T3
        response = "Done. The item is retrieved."  # 28 chars: ≥20, <60
        assert _is_low_quality(response, TierLabel.LOCAL_FAST) is True
        assert _is_low_quality(response, TierLabel.LOCAL_HEAVY) is False
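

# Sketch of the quality gate the class above specifies: refusal phrases are
# always low quality, and the minimum-length bar is stricter for Tier-1 (the
# 60-char escalation threshold) than for higher tiers (20 chars). The phrase
# list is a sample inferred from the assertions, not the module's real list.
def _is_low_quality_sketch(response: str, tier: TierLabel) -> bool:
    text = response.strip().lower()
    if any(p in text for p in ("i don't know", "i'm not sure", "as an ai")):
        return True
    min_chars = 60 if tier == TierLabel.LOCAL_FAST else 20
    return len(text) < min_chars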


# ── TieredModelRouter ─────────────────────────────────────────────────────────


_GOOD_CONTENT = (
    "You move north through the doorway into the next room. "
    "The stone walls glisten with moisture."
)  # 93 chars — well above the 60-char escalation threshold


def _make_cascade_mock(content=_GOOD_CONTENT, model="llama3.1:8b"):
    mock = MagicMock()
    mock.complete = AsyncMock(
        return_value={
            "content": content,
            "provider": "ollama-local",
            "model": model,
            "latency_ms": 150.0,
        }
    )
    return mock


def _make_budget_mock(allowed=True):
    mock = MagicMock()
    mock.cloud_allowed = MagicMock(return_value=allowed)
    mock.record_spend = MagicMock(return_value=0.001)
    return mock


@pytest.mark.asyncio
class TestTieredModelRouterRoute:
    async def test_route_returns_tier_in_result(self):
        router = TieredModelRouter(cascade=_make_cascade_mock())
        result = await router.route("go north")
        assert "tier" in result
        assert result["tier"] == TierLabel.LOCAL_FAST

    async def test_acceptance_walk_to_room_is_local_fast(self):
        """Acceptance: 'Walk to the next room' → LOCAL_FAST."""
        router = TieredModelRouter(cascade=_make_cascade_mock())
        result = await router.route("Walk to the next room")
        assert result["tier"] == TierLabel.LOCAL_FAST

    async def test_acceptance_plan_hortator_is_local_heavy(self):
        """Acceptance: 'Plan the optimal path to become Hortator' → LOCAL_HEAVY."""
        router = TieredModelRouter(
            cascade=_make_cascade_mock(model="hermes3:70b"),
        )
        result = await router.route("Plan the optimal path to become Hortator")
        assert result["tier"] == TierLabel.LOCAL_HEAVY

    async def test_t1_low_quality_escalates_to_t2(self):
        """Failed Tier-1 response auto-escalates to Tier-2."""
        call_models = []
        cascade = MagicMock()

        async def complete_side_effect(messages, model, temperature, max_tokens):
            call_models.append(model)
            # First call (T1) returns a low-quality response
            if len(call_models) == 1:
                return {
                    "content": "I don't know.",
                    "provider": "ollama",
                    "model": model,
                    "latency_ms": 50,
                }
            # Second call (T2) returns a good response
            return {
                "content": "You move to the northern passage, passing through the Dunmer stronghold.",
                "provider": "ollama",
                "model": model,
                "latency_ms": 800,
            }

        cascade.complete = complete_side_effect

        router = TieredModelRouter(cascade=cascade, auto_escalate=True)
        result = await router.route("go north")

        assert len(call_models) == 2, "Should have called twice (T1 escalated to T2)"
        assert result["tier"] == TierLabel.LOCAL_HEAVY

    async def test_auto_escalate_false_no_escalation(self):
        """With auto_escalate=False, low-quality T1 response is returned as-is."""
        call_count = {"n": 0}
        cascade = MagicMock()

        async def complete_side_effect(**kwargs):
            call_count["n"] += 1
            return {
                "content": "I don't know.",
                "provider": "ollama",
                "model": "llama3.1:8b",
                "latency_ms": 50,
            }

        cascade.complete = AsyncMock(side_effect=complete_side_effect)
        router = TieredModelRouter(cascade=cascade, auto_escalate=False)
        result = await router.route("go north")
        assert call_count["n"] == 1
        assert result["tier"] == TierLabel.LOCAL_FAST

    async def test_t2_failure_escalates_to_cloud(self):
        """Tier-2 failure escalates to Cloud API (when budget allows)."""
        cascade = MagicMock()
        call_models = []

        async def complete_side_effect(messages, model, temperature, max_tokens):
            call_models.append(model)
            if "hermes3" in model or "70b" in model.lower():
                raise RuntimeError("Tier-2 model unavailable")
            return {
                "content": "Cloud response here.",
                "provider": "anthropic",
                "model": model,
                "latency_ms": 1200,
            }

        cascade.complete = complete_side_effect

        budget = _make_budget_mock(allowed=True)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
        result = await router.route("plan my route", context={"require_t2": True})
        assert result["tier"] == TierLabel.CLOUD_API

    async def test_cloud_blocked_by_budget_raises(self):
        """Cloud tier blocked when budget is exhausted."""
        cascade = MagicMock()
        cascade.complete = AsyncMock(side_effect=RuntimeError("T2 fail"))

        budget = _make_budget_mock(allowed=False)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)

        with pytest.raises(RuntimeError, match="budget limit"):
            await router.route("plan my route", context={"require_t2": True})

    async def test_explicit_cloud_tier_uses_cloud_model(self):
        cascade = _make_cascade_mock(model="claude-haiku-4-5")
        budget = _make_budget_mock(allowed=True)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
        result = await router.route("go north", context={"require_cloud": True})
        assert result["tier"] == TierLabel.CLOUD_API

    async def test_cloud_spend_recorded_with_usage(self):
        """Cloud spend is recorded when the response includes usage info."""
        cascade = MagicMock()
        cascade.complete = AsyncMock(
            return_value={
                "content": "Cloud answer.",
                "provider": "anthropic",
                "model": "claude-haiku-4-5",
                "latency_ms": 900,
                "usage": {"prompt_tokens": 50, "completion_tokens": 100},
            }
        )
        budget = _make_budget_mock(allowed=True)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
        result = await router.route("go north", context={"require_cloud": True})
        budget.record_spend.assert_called_once()
        assert "cost_usd" in result

    async def test_cloud_spend_not_recorded_without_usage(self):
        """Cloud spend is not recorded when usage info is absent."""
        cascade = MagicMock()
        cascade.complete = AsyncMock(
            return_value={
                "content": "Cloud answer.",
                "provider": "anthropic",
                "model": "claude-haiku-4-5",
                "latency_ms": 900,
                # no "usage" key
            }
        )
        budget = _make_budget_mock(allowed=True)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
        result = await router.route("go north", context={"require_cloud": True})
        budget.record_spend.assert_not_called()
        assert "cost_usd" not in result

    async def test_custom_tier_models_respected(self):
        cascade = _make_cascade_mock()
        router = TieredModelRouter(
            cascade=cascade,
            tier_models={TierLabel.LOCAL_FAST: "llama3.2:3b"},
        )
        await router.route("go north")
        call_kwargs = cascade.complete.call_args
        assert call_kwargs.kwargs["model"] == "llama3.2:3b"

    async def test_messages_override_used_when_provided(self):
        cascade = _make_cascade_mock()
        router = TieredModelRouter(cascade=cascade)
        custom_msgs = [{"role": "user", "content": "custom message"}]
        await router.route("go north", messages=custom_msgs)
        call_kwargs = cascade.complete.call_args
        assert call_kwargs.kwargs["messages"] == custom_msgs

    async def test_temperature_forwarded(self):
        cascade = _make_cascade_mock()
        router = TieredModelRouter(cascade=cascade)
        await router.route("go north", temperature=0.7)
        call_kwargs = cascade.complete.call_args
        assert call_kwargs.kwargs["temperature"] == 0.7

    async def test_max_tokens_forwarded(self):
        cascade = _make_cascade_mock()
        router = TieredModelRouter(cascade=cascade)
        await router.route("go north", max_tokens=128)
        call_kwargs = cascade.complete.call_args
        assert call_kwargs.kwargs["max_tokens"] == 128


class TestTieredModelRouterClassify:
    def test_classify_delegates_to_classify_tier(self):
        router = TieredModelRouter(cascade=MagicMock())
        assert router.classify("go north") == classify_tier("go north")
        assert router.classify("plan the quest") == classify_tier("plan the quest")


class TestGetTieredRouterSingleton:
    def test_returns_tiered_router_instance(self):
        import infrastructure.models.router as rmod

        rmod._tiered_router = None
        router = get_tiered_router()
        assert isinstance(router, TieredModelRouter)

    def test_singleton_returns_same_instance(self):
        import infrastructure.models.router as rmod

        rmod._tiered_router = None
        r1 = get_tiered_router()
        r2 = get_tiered_router()
        assert r1 is r2
@@ -1,379 +0,0 @@
"""Tests for the sovereignty perception cache (template matching).

Refs: #1261
"""

import json
from unittest.mock import patch

import numpy as np


class TestTemplate:
    """Tests for the Template dataclass."""

    def test_template_default_values(self):
        """Template dataclass has correct defaults."""
        from timmy.sovereignty.perception_cache import Template

        image = np.array([[1, 2], [3, 4]])
        template = Template(name="test_template", image=image)

        assert template.name == "test_template"
        assert np.array_equal(template.image, image)
        assert template.threshold == 0.85

    def test_template_custom_threshold(self):
        """Template can have custom threshold."""
        from timmy.sovereignty.perception_cache import Template

        image = np.array([[1, 2], [3, 4]])
        template = Template(name="test_template", image=image, threshold=0.95)

        assert template.threshold == 0.95


class TestCacheResult:
    """Tests for the CacheResult dataclass."""

    def test_cache_result_with_state(self):
        """CacheResult stores confidence and state."""
        from timmy.sovereignty.perception_cache import CacheResult

        result = CacheResult(confidence=0.92, state={"template_name": "test"})
        assert result.confidence == 0.92
        assert result.state == {"template_name": "test"}

    def test_cache_result_no_state(self):
        """CacheResult can have None state."""
        from timmy.sovereignty.perception_cache import CacheResult

        result = CacheResult(confidence=0.5, state=None)
        assert result.confidence == 0.5
        assert result.state is None


class TestPerceptionCacheInit:
    """Tests for PerceptionCache initialization."""

    def test_init_creates_empty_cache_when_no_file(self, tmp_path):
        """Cache initializes empty when templates file doesn't exist."""
        from timmy.sovereignty.perception_cache import PerceptionCache

        templates_path = tmp_path / "nonexistent_templates.json"
        cache = PerceptionCache(templates_path=templates_path)

        assert cache.templates_path == templates_path
        assert cache.templates == []

    def test_init_loads_existing_templates(self, tmp_path):
        """Cache loads templates from existing JSON file."""
        from timmy.sovereignty.perception_cache import PerceptionCache

        templates_path = tmp_path / "templates.json"
        templates_data = [
            {"name": "template1", "threshold": 0.85},
            {"name": "template2", "threshold": 0.90},
        ]
        with open(templates_path, "w") as f:
            json.dump(templates_data, f)

        cache = PerceptionCache(templates_path=templates_path)

        assert len(cache.templates) == 2
        assert cache.templates[0].name == "template1"
        assert cache.templates[0].threshold == 0.85
        assert cache.templates[1].name == "template2"
        assert cache.templates[1].threshold == 0.90

    def test_init_with_string_path(self, tmp_path):
        """Cache accepts string path for templates."""
        from timmy.sovereignty.perception_cache import PerceptionCache

        templates_path = str(tmp_path / "templates.json")
        cache = PerceptionCache(templates_path=templates_path)

        assert str(cache.templates_path) == templates_path


class TestPerceptionCacheMatch:
    """Tests for PerceptionCache.match() template matching."""

    def test_match_no_templates_returns_low_confidence(self, tmp_path):
        """Matching with no templates returns low confidence and None state."""
        from timmy.sovereignty.perception_cache import PerceptionCache

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        screenshot = np.array([[1, 2], [3, 4]])

        result = cache.match(screenshot)

        assert result.confidence == 0.0
        assert result.state is None

    @patch("timmy.sovereignty.perception_cache.cv2")
    def test_match_finds_best_template(self, mock_cv2, tmp_path):
        """Match returns the best matching template above threshold."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        # Setup mock cv2 behavior
        mock_cv2.matchTemplate.return_value = np.array([[0.5, 0.6], [0.7, 0.8]])
        mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
        mock_cv2.minMaxLoc.return_value = (None, 0.92, None, None)

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        template = Template(name="best_match", image=np.array([[1, 2], [3, 4]]))
        cache.add([template])

        screenshot = np.array([[5, 6], [7, 8]])
        result = cache.match(screenshot)

        assert result.confidence == 0.92
        assert result.state == {"template_name": "best_match"}

    @patch("timmy.sovereignty.perception_cache.cv2")
    def test_match_respects_global_threshold(self, mock_cv2, tmp_path):
        """Match returns None state when confidence is below threshold."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        # Setup mock cv2 to return confidence below 0.85 threshold
        mock_cv2.matchTemplate.return_value = np.array([[0.1, 0.2], [0.3, 0.4]])
        mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
        mock_cv2.minMaxLoc.return_value = (None, 0.75, None, None)

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        template = Template(name="low_match", image=np.array([[1, 2], [3, 4]]))
        cache.add([template])

        screenshot = np.array([[5, 6], [7, 8]])
        result = cache.match(screenshot)

        # Confidence is recorded but state is None (below threshold)
        assert result.confidence == 0.75
        assert result.state is None

    @patch("timmy.sovereignty.perception_cache.cv2")
    def test_match_selects_highest_confidence(self, mock_cv2, tmp_path):
        """Match selects template with highest confidence across all templates."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"

        # Each template will return a different confidence
        mock_cv2.minMaxLoc.side_effect = [
            (None, 0.70, None, None),  # template1
            (None, 0.95, None, None),  # template2 (best)
            (None, 0.80, None, None),  # template3
        ]

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        templates = [
            Template(name="template1", image=np.array([[1, 2], [3, 4]])),
            Template(name="template2", image=np.array([[5, 6], [7, 8]])),
            Template(name="template3", image=np.array([[9, 10], [11, 12]])),
        ]
        cache.add(templates)

        screenshot = np.array([[13, 14], [15, 16]])
        result = cache.match(screenshot)

        assert result.confidence == 0.95
        assert result.state == {"template_name": "template2"}

    @patch("timmy.sovereignty.perception_cache.cv2")
    def test_match_exactly_at_threshold(self, mock_cv2, tmp_path):
        """Match returns None state when confidence is exactly at the threshold (strict >)."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        mock_cv2.matchTemplate.return_value = np.array([[0.1]])
        mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
        mock_cv2.minMaxLoc.return_value = (None, 0.85, None, None)  # Exactly at threshold

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        template = Template(name="threshold_match", image=np.array([[1, 2], [3, 4]]))
        cache.add([template])

        screenshot = np.array([[5, 6], [7, 8]])
        result = cache.match(screenshot)

        # Note: current implementation uses > 0.85, so exactly 0.85 returns None state
        assert result.confidence == 0.85
        assert result.state is None

    @patch("timmy.sovereignty.perception_cache.cv2")
    def test_match_just_above_threshold(self, mock_cv2, tmp_path):
        """Match returns state when confidence is just above threshold."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        mock_cv2.matchTemplate.return_value = np.array([[0.1]])
        mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
        mock_cv2.minMaxLoc.return_value = (None, 0.851, None, None)  # Just above threshold

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        template = Template(name="above_threshold", image=np.array([[1, 2], [3, 4]]))
        cache.add([template])

        screenshot = np.array([[5, 6], [7, 8]])
        result = cache.match(screenshot)

        assert result.confidence == 0.851
        assert result.state == {"template_name": "above_threshold"}


class TestPerceptionCacheAdd:
    """Tests for PerceptionCache.add() method."""

    def test_add_single_template(self, tmp_path):
        """Can add a single template to the cache."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        template = Template(name="new_template", image=np.array([[1, 2], [3, 4]]))

        cache.add([template])

        assert len(cache.templates) == 1
        assert cache.templates[0].name == "new_template"

    def test_add_multiple_templates(self, tmp_path):
        """Can add multiple templates at once."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        templates = [
            Template(name="template1", image=np.array([[1, 2], [3, 4]])),
            Template(name="template2", image=np.array([[5, 6], [7, 8]])),
        ]

        cache.add(templates)

        assert len(cache.templates) == 2
        assert cache.templates[0].name == "template1"
        assert cache.templates[1].name == "template2"

    def test_add_templates_accumulate(self, tmp_path):
        """Adding templates multiple times accumulates them."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        cache = PerceptionCache(templates_path=tmp_path / "templates.json")
        cache.add([Template(name="first", image=np.array([[1]]))])
        cache.add([Template(name="second", image=np.array([[2]]))])

        assert len(cache.templates) == 2


class TestPerceptionCachePersist:
    """Tests for PerceptionCache.persist() method."""

    def test_persist_creates_file(self, tmp_path):
        """Persist creates templates JSON file."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        templates_path = tmp_path / "subdir" / "templates.json"
        cache = PerceptionCache(templates_path=templates_path)
        cache.add([Template(name="persisted", image=np.array([[1, 2], [3, 4]]))])

        cache.persist()

        assert templates_path.exists()

    def test_persist_stores_template_names(self, tmp_path):
        """Persist stores template names and thresholds."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        templates_path = tmp_path / "templates.json"
        cache = PerceptionCache(templates_path=templates_path)
        cache.add([
            Template(name="template1", image=np.array([[1]]), threshold=0.85),
            Template(name="template2", image=np.array([[2]]), threshold=0.90),
        ])

        cache.persist()

        with open(templates_path) as f:
            data = json.load(f)

        assert len(data) == 2
        assert data[0]["name"] == "template1"
        assert data[0]["threshold"] == 0.85
        assert data[1]["name"] == "template2"
        assert data[1]["threshold"] == 0.90

    def test_persist_does_not_store_image_data(self, tmp_path):
        """Persist only stores metadata, not actual image arrays."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        templates_path = tmp_path / "templates.json"
        cache = PerceptionCache(templates_path=templates_path)
        cache.add([Template(name="no_image", image=np.array([[1, 2, 3], [4, 5, 6]]))])

        cache.persist()

        with open(templates_path) as f:
            data = json.load(f)

        assert "image" not in data[0]
        assert set(data[0].keys()) == {"name", "threshold"}
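

# Sketch of the metadata-only persistence checked above: only name and
# threshold are written, never pixel data, and parent directories are created
# on demand (test_persist_creates_file targets a subdir). Inferred shape only.
def _persist_sketch(templates, path):
    from pathlib import Path

    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(
        [{"name": t.name, "threshold": t.threshold} for t in templates]
    ))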


class TestPerceptionCacheLoad:
    """Tests for PerceptionCache.load() method."""

    def test_load_from_existing_file(self, tmp_path):
        """Load restores templates from persisted file."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        templates_path = tmp_path / "templates.json"

        # Create initial cache with templates and persist
        cache1 = PerceptionCache(templates_path=templates_path)
        cache1.add([Template(name="loaded", image=np.array([[1]]), threshold=0.88)])
        cache1.persist()

        # Create new cache instance that loads from same file
        cache2 = PerceptionCache(templates_path=templates_path)

        assert len(cache2.templates) == 1
        assert cache2.templates[0].name == "loaded"
        assert cache2.templates[0].threshold == 0.88
        # Note: images are loaded as empty arrays per current implementation
        assert cache2.templates[0].image.size == 0

    def test_load_empty_file(self, tmp_path):
        """Load handles empty template list in file."""
        from timmy.sovereignty.perception_cache import PerceptionCache

        templates_path = tmp_path / "templates.json"
        with open(templates_path, "w") as f:
            json.dump([], f)

        cache = PerceptionCache(templates_path=templates_path)

        assert cache.templates == []


class TestCrystallizePerception:
    """Tests for crystallize_perception function."""

    def test_crystallize_returns_empty_list(self, tmp_path):
        """crystallize_perception currently returns empty list (placeholder)."""
        from timmy.sovereignty.perception_cache import crystallize_perception

        screenshot = np.array([[1, 2], [3, 4]])
        result = crystallize_perception(screenshot, {"some": "response"})

        assert result == []

    def test_crystallize_accepts_any_vlm_response(self, tmp_path):
        """crystallize_perception accepts any vlm_response format."""
        from timmy.sovereignty.perception_cache import crystallize_perception

        screenshot = np.array([[1, 2], [3, 4]])

        # Test with various response types
        assert crystallize_perception(screenshot, None) == []
        assert crystallize_perception(screenshot, {}) == []
        assert crystallize_perception(screenshot, {"items": []}) == []
        assert crystallize_perception(screenshot, "string response") == []
@@ -1,696 +0,0 @@
"""Unit tests for timmy.backlog_triage — scoring, prioritization, and decision logic."""

from __future__ import annotations

from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch

import httpx
import pytest

from timmy.backlog_triage import (
    AGENT_CLAUDE,
    AGENT_KIMI,
    KIMI_READY_LABEL,
    OWNER_LOGIN,
    READY_THRESHOLD,
    BacklogTriageLoop,
    ScoredIssue,
    TriageCycleResult,
    TriageDecision,
    _build_audit_comment,
    _extract_tags,
    _score_acceptance,
    _score_alignment,
    _score_scope,
    decide,
    execute_decision,
    score_issue,
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_raw_issue(
    number: int = 1,
    title: str = "Fix something broken in src/foo.py",
    body: str = "## Problem\nThis crashes. Expected: no crash. Steps: run it.",
    labels: list[str] | None = None,
    assignees: list[str] | None = None,
    created_at: str | None = None,
) -> dict:
    if labels is None:
        labels = []
    if assignees is None:
        assignees = []
    if created_at is None:
        created_at = datetime.now(UTC).isoformat()
    return {
        "number": number,
        "title": title,
        "body": body,
        "labels": [{"name": lbl} for lbl in labels],
        "assignees": [{"login": a} for a in assignees],
        "created_at": created_at,
    }


def _make_scored(
    number: int = 1,
    title: str = "Fix a bug",
    issue_type: str = "bug",
    score: int = 6,
    ready: bool = True,
    assignees: list[str] | None = None,
    tags: set[str] | None = None,
    is_p0: bool = False,
    is_blocked: bool = False,
) -> ScoredIssue:
    return ScoredIssue(
        number=number,
        title=title,
        body="",
        labels=[],
        tags=tags or set(),
        assignees=assignees or [],
        created_at=datetime.now(UTC),
        issue_type=issue_type,
        score=score,
        scope=2,
        acceptance=2,
        alignment=2,
        ready=ready,
        age_days=5,
        is_p0=is_p0,
        is_blocked=is_blocked,
    )


# ---------------------------------------------------------------------------
# _extract_tags
# ---------------------------------------------------------------------------


class TestExtractTags:
    def test_bracket_tags_from_title(self):
        tags = _extract_tags("[feat][bug] do something", [])
        assert "feat" in tags
        assert "bug" in tags

    def test_label_names_included(self):
        tags = _extract_tags("Normal title", ["kimi-ready", "enhancement"])
        assert "kimi-ready" in tags
        assert "enhancement" in tags

    def test_combined(self):
        tags = _extract_tags("[fix] crash in module", ["p0"])
        assert "fix" in tags
        assert "p0" in tags

    def test_empty_inputs(self):
        assert _extract_tags("", []) == set()

    def test_tags_are_lowercased(self):
        tags = _extract_tags("[BUG][Refactor] title", ["Enhancement"])
        assert "bug" in tags
        assert "refactor" in tags
        assert "enhancement" in tags


# ---------------------------------------------------------------------------
# _score_scope
# ---------------------------------------------------------------------------


class TestScoreScope:
    def test_file_reference_adds_point(self):
        score = _score_scope("Fix login", "See src/auth/login.py for details", set())
        assert score >= 1

    def test_function_reference_adds_point(self):
        score = _score_scope("Fix login", "In the `handle_login()` method", set())
        assert score >= 1

    def test_short_title_adds_point(self):
        score = _score_scope("Short clear title", "", set())
        assert score >= 1

    def test_long_title_no_bonus(self):
        long_title = "A" * 90
        score_long = _score_scope(long_title, "", set())
        score_short = _score_scope("Short title", "", set())
        assert score_short >= score_long

    def test_meta_tags_reduce_score(self):
        score_meta = _score_scope("Discuss src/foo.py philosophy", "def func()", {"philosophy"})
        score_plain = _score_scope("Fix src/foo.py bug", "def func()", set())
        assert score_meta < score_plain

    def test_max_is_three(self):
        score = _score_scope(
            "Fix it", "See src/foo.py and `def bar()` method here", set()
        )
        assert score <= 3


# ---------------------------------------------------------------------------
# _score_acceptance
# ---------------------------------------------------------------------------


class TestScoreAcceptance:
    def test_accept_keywords_add_points(self):
        body = "Should return 200. Must pass validation. Assert no errors."
        score = _score_acceptance("", body, set())
        assert score >= 2

    def test_test_reference_adds_point(self):
        score = _score_acceptance("", "Run pytest to verify", set())
        assert score >= 1

    def test_structured_headers_add_point(self):
        body = "## Problem\nit breaks\n## Expected\nsuccess"
        score = _score_acceptance("", body, set())
        assert score >= 1

    def test_meta_tags_reduce_score(self):
        body = "Should pass and must verify assert test_foo"
        score_meta = _score_acceptance("", body, {"philosophy"})
        score_plain = _score_acceptance("", body, set())
        assert score_meta < score_plain

    def test_max_is_three(self):
        body = (
            "Should pass. Must return. Expected: success. Assert no error. "
            "pytest test_foo. ## Problem\ndef. ## Expected\nok"
        )
        score = _score_acceptance("", body, set())
        assert score <= 3


# ---------------------------------------------------------------------------
# _score_alignment
# ---------------------------------------------------------------------------


class TestScoreAlignment:
    def test_bug_tags_return_max(self):
        assert _score_alignment("", "", {"bug"}) == 3
        assert _score_alignment("", "", {"crash"}) == 3
        assert _score_alignment("", "", {"hotfix"}) == 3

    def test_refactor_tags_give_high_score(self):
        score = _score_alignment("", "", {"refactor"})
        assert score >= 2

    def test_feature_tags_give_high_score(self):
        score = _score_alignment("", "", {"feature"})
        assert score >= 2

    def test_loop_generated_adds_bonus(self):
        score_with = _score_alignment("", "", {"feature", "loop-generated"})
        score_without = _score_alignment("", "", {"feature"})
        assert score_with >= score_without

    def test_meta_tags_zero_out_score(self):
        score = _score_alignment("", "", {"philosophy", "refactor"})
        assert score == 0

    def test_max_is_three(self):
        score = _score_alignment("", "", {"feature", "loop-generated", "enhancement"})
        assert score <= 3
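

# Sketch of the alignment rubric the three scoring classes above trace out:
# every sub-score is clamped to 0–3, meta/philosophy tags zero it out, and
# bug-class tags pin it at the maximum. Keyword sets are illustrative
# assumptions, not the module's real lists.
def _score_alignment_sketch(tags: set[str]) -> int:
    if tags & {"philosophy", "meta"}:
        return 0
    if tags & {"bug", "crash", "hotfix"}:
        return 3
    score = 0
    if tags & {"refactor", "feature", "enhancement", "fix"}:
        score += 2
    if "loop-generated" in tags:
        score += 1
    return min(score, 3)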


# ---------------------------------------------------------------------------
# score_issue
# ---------------------------------------------------------------------------


class TestScoreIssue:
    def test_basic_bug_issue_classified(self):
        raw = _make_raw_issue(
            title="[bug] fix crash in src/timmy/agent.py",
            body="## Problem\nCrashes on startup. Expected: runs. Steps: python -m timmy",
        )
        issue = score_issue(raw)
        assert issue.issue_type == "bug"
        assert issue.is_p0 is True

    def test_feature_issue_classified(self):
        raw = _make_raw_issue(
            title="[feat] add dark mode to dashboard",
            body="Add a toggle button. Should switch CSS vars.",
            labels=["feature"],
        )
        issue = score_issue(raw)
        assert issue.issue_type == "feature"

    def test_research_issue_classified(self):
        raw = _make_raw_issue(
            title="Investigate MCP performance",
            labels=["kimi-ready", "research"],
        )
        issue = score_issue(raw)
        assert issue.issue_type == "research"
        assert issue.needs_kimi is True

    def test_philosophy_issue_classified(self):
        raw = _make_raw_issue(
            title="Discussion: soul and identity",
            labels=["philosophy"],
        )
        issue = score_issue(raw)
        assert issue.issue_type == "philosophy"

    def test_score_totals_components(self):
        raw = _make_raw_issue()
        issue = score_issue(raw)
        assert issue.score == issue.scope + issue.acceptance + issue.alignment

    def test_ready_flag_set_when_score_meets_threshold(self):
        # Create an issue that will definitely score >= READY_THRESHOLD
        raw = _make_raw_issue(
            title="[bug] crash in src/core.py",
            body=(
                "## Problem\nCrashes when running `run()`. "
                "Expected: should return 200. Must pass pytest assert."
            ),
            labels=["bug"],
        )
        issue = score_issue(raw)
        assert issue.ready == (issue.score >= READY_THRESHOLD)

    def test_assigned_issue_reports_assignees(self):
        raw = _make_raw_issue(assignees=["claude", "kimi"])
        issue = score_issue(raw)
        assert "claude" in issue.assignees
        assert issue.is_unassigned is False

    def test_unassigned_issue(self):
        raw = _make_raw_issue(assignees=[])
        issue = score_issue(raw)
        assert issue.is_unassigned is True

    def test_blocked_issue_detected(self):
        raw = _make_raw_issue(
            title="Fix blocked deployment", body="Blocked by infra team."
        )
        issue = score_issue(raw)
        assert issue.is_blocked is True

    def test_age_days_computed(self):
        old_date = (datetime.now(UTC) - timedelta(days=30)).isoformat()
        raw = _make_raw_issue(created_at=old_date)
        issue = score_issue(raw)
        assert issue.age_days >= 29

    def test_invalid_created_at_defaults_to_now(self):
        raw = _make_raw_issue(created_at="not-a-date")
        issue = score_issue(raw)
        assert issue.age_days == 0

    def test_title_bracket_tags_stripped(self):
        raw = _make_raw_issue(title="[bug][p0] crash in login")
        issue = score_issue(raw)
        assert "[" not in issue.title

    def test_missing_body_defaults_to_empty(self):
        raw = _make_raw_issue()
        raw["body"] = None
        issue = score_issue(raw)
        assert issue.body == ""

    def test_kimi_label_triggers_needs_kimi(self):
        raw = _make_raw_issue(labels=[KIMI_READY_LABEL])
        issue = score_issue(raw)
        assert issue.needs_kimi is True


# ---------------------------------------------------------------------------
# decide
# ---------------------------------------------------------------------------


class TestDecide:
    def test_philosophy_is_skipped(self):
        issue = _make_scored(issue_type="philosophy")
        d = decide(issue)
        assert d.action == "skip"
        assert "philosophy" in d.reason.lower() or "meta" in d.reason.lower()

    def test_already_assigned_is_skipped(self):
        issue = _make_scored(assignees=["claude"])
        d = decide(issue)
        assert d.action == "skip"
        assert "assigned" in d.reason.lower()

    def test_low_score_is_skipped(self):
        issue = _make_scored(score=READY_THRESHOLD - 1, ready=False)
        d = decide(issue)
        assert d.action == "skip"
        assert str(READY_THRESHOLD) in d.reason

    def test_blocked_is_flagged_for_alex(self):
        issue = _make_scored(is_blocked=True)
        d = decide(issue)
        assert d.action == "flag_alex"
        assert d.agent == OWNER_LOGIN

    def test_kimi_ready_assigned_to_kimi(self):
        issue = _make_scored(tags={"kimi-ready"})
        # Ensure it's unassigned and ready
        issue.assignees = []
        issue.ready = True
        issue.is_blocked = False
        issue.issue_type = "research"
        d = decide(issue)
        assert d.action == "assign_kimi"
        assert d.agent == AGENT_KIMI

    def test_research_type_assigned_to_kimi(self):
        issue = _make_scored(issue_type="research", tags={"research"})
        d = decide(issue)
        assert d.action == "assign_kimi"
        assert d.agent == AGENT_KIMI

    def test_p0_bug_assigned_to_claude(self):
        issue = _make_scored(issue_type="bug", is_p0=True)
        d = decide(issue)
        assert d.action == "assign_claude"
        assert d.agent == AGENT_CLAUDE

    def test_ready_feature_assigned_to_claude(self):
        issue = _make_scored(issue_type="feature", score=6, ready=True)
        d = decide(issue)
        assert d.action == "assign_claude"
        assert d.agent == AGENT_CLAUDE

    def test_ready_refactor_assigned_to_claude(self):
        issue = _make_scored(issue_type="refactor", score=6, ready=True)
        d = decide(issue)
        assert d.action == "assign_claude"
        assert d.agent == AGENT_CLAUDE

    def test_decision_has_issue_number(self):
        issue = _make_scored(number=42)
        d = decide(issue)
        assert d.issue_number == 42
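

# Sketch of the decision ladder TestDecide pins down, in precedence order:
# skip (philosophy / already assigned / below threshold), flag_alex (blocked),
# assign_kimi (kimi-ready tag or research type), assign_claude (everything
# else that is ready). Reasons omitted; an approximation of decide().
def _decide_sketch(issue) -> tuple[str, str | None]:
    if issue.issue_type == "philosophy":
        return "skip", None
    if issue.assignees:
        return "skip", None
    if not issue.ready:
        return "skip", None
    if issue.is_blocked:
        return "flag_alex", OWNER_LOGIN
    if "kimi-ready" in issue.tags or issue.issue_type == "research":
        return "assign_kimi", AGENT_KIMI
    return "assign_claude", AGENT_CLAUDE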


# ---------------------------------------------------------------------------
# _build_audit_comment
# ---------------------------------------------------------------------------


class TestBuildAuditComment:
    def test_assign_claude_comment(self):
        d = TriageDecision(
            issue_number=1, action="assign_claude", agent=AGENT_CLAUDE, reason="Ready bug"
        )
        comment = _build_audit_comment(d)
        assert AGENT_CLAUDE in comment
        assert "Timmy Triage" in comment
        assert "Ready bug" in comment

    def test_assign_kimi_comment(self):
        d = TriageDecision(
            issue_number=2, action="assign_kimi", agent=AGENT_KIMI, reason="Research spike"
        )
        comment = _build_audit_comment(d)
        assert KIMI_READY_LABEL in comment

    def test_flag_alex_comment(self):
        d = TriageDecision(
            issue_number=3, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
        )
        comment = _build_audit_comment(d)
        assert OWNER_LOGIN in comment

    def test_comment_contains_autonomous_triage_note(self):
        d = TriageDecision(issue_number=1, action="assign_claude", agent=AGENT_CLAUDE, reason="x")
        comment = _build_audit_comment(d)
        assert "Autonomous triage" in comment or "autonomous" in comment.lower()


# ---------------------------------------------------------------------------
# execute_decision (dry_run)
# ---------------------------------------------------------------------------


class TestExecuteDecisionDryRun:
    @pytest.mark.asyncio
    async def test_skip_action_marks_executed(self):
        d = TriageDecision(issue_number=1, action="skip", reason="Already assigned")
        mock_client = AsyncMock()
        result = await execute_decision(mock_client, d, dry_run=True)
        assert result.executed is True
        mock_client.post.assert_not_called()

    @pytest.mark.asyncio
    async def test_dry_run_does_not_call_api(self):
        d = TriageDecision(
            issue_number=5, action="assign_claude", agent=AGENT_CLAUDE, reason="Ready"
        )
        mock_client = AsyncMock()
        result = await execute_decision(mock_client, d, dry_run=True)
        assert result.executed is True
        mock_client.post.assert_not_called()
        mock_client.patch.assert_not_called()

    @pytest.mark.asyncio
    async def test_dry_run_kimi_does_not_call_api(self):
        d = TriageDecision(
            issue_number=6, action="assign_kimi", agent=AGENT_KIMI, reason="Research"
        )
        mock_client = AsyncMock()
        result = await execute_decision(mock_client, d, dry_run=True)
        assert result.executed is True
        mock_client.post.assert_not_called()


# ---------------------------------------------------------------------------
# execute_decision (live — mocked HTTP)
# ---------------------------------------------------------------------------


class TestExecuteDecisionLive:
    @pytest.mark.asyncio
    async def test_assign_claude_posts_comment_then_patches(self):
        comment_resp = MagicMock()
        comment_resp.status_code = 201

        patch_resp = MagicMock()
        patch_resp.status_code = 200

        mock_client = AsyncMock()
        mock_client.post.return_value = comment_resp
        mock_client.patch.return_value = patch_resp

        d = TriageDecision(
            issue_number=10, action="assign_claude", agent=AGENT_CLAUDE, reason="Bug ready"
        )

        with patch("timmy.backlog_triage.settings") as mock_settings:
            mock_settings.gitea_token = "tok"
            mock_settings.gitea_repo = "owner/repo"
            mock_settings.gitea_url = "http://localhost:3000"
            result = await execute_decision(mock_client, d, dry_run=False)

        assert result.executed is True
        assert result.error == ""
        mock_client.post.assert_called_once()
        mock_client.patch.assert_called_once()

    @pytest.mark.asyncio
    async def test_comment_failure_sets_error(self):
        comment_resp = MagicMock()
        comment_resp.status_code = 500

        mock_client = AsyncMock()
        mock_client.post.return_value = comment_resp

        d = TriageDecision(
            issue_number=11, action="assign_claude", agent=AGENT_CLAUDE, reason="Bug"
        )

        with patch("timmy.backlog_triage.settings") as mock_settings:
            mock_settings.gitea_token = "tok"
            mock_settings.gitea_repo = "owner/repo"
            mock_settings.gitea_url = "http://localhost:3000"
            result = await execute_decision(mock_client, d, dry_run=False)

        assert result.executed is False
|
||||
assert result.error != ""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_flag_alex_only_posts_comment(self):
|
||||
comment_resp = MagicMock()
|
||||
comment_resp.status_code = 201
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = comment_resp
|
||||
|
||||
d = TriageDecision(
|
||||
issue_number=12, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
|
||||
)
|
||||
|
||||
with patch("timmy.backlog_triage.settings") as mock_settings:
|
||||
mock_settings.gitea_token = "tok"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
result = await execute_decision(mock_client, d, dry_run=False)
|
||||
|
||||
assert result.executed is True
|
||||
mock_client.patch.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BacklogTriageLoop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBacklogTriageLoop:
|
||||
def test_default_state(self):
|
||||
with patch("timmy.backlog_triage.settings") as mock_settings:
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = True
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
loop = BacklogTriageLoop()
|
||||
assert loop.is_running is False
|
||||
assert loop.cycle_count == 0
|
||||
assert loop.history == []
|
||||
|
||||
def test_custom_interval_overrides_settings(self):
|
||||
with patch("timmy.backlog_triage.settings") as mock_settings:
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = True
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
loop = BacklogTriageLoop(interval=60)
|
||||
assert loop._interval == 60.0
|
||||
|
||||
def test_stop_sets_running_false(self):
|
||||
with patch("timmy.backlog_triage.settings") as mock_settings:
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = True
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
loop = BacklogTriageLoop()
|
||||
loop._running = True
|
||||
loop.stop()
|
||||
assert loop.is_running is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_once_skips_when_gitea_disabled(self):
|
||||
with patch("timmy.backlog_triage.settings") as mock_settings:
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = True
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
|
||||
result = await loop.run_once()
|
||||
|
||||
assert result.total_open == 0
|
||||
assert result.scored == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_once_increments_cycle_count(self):
|
||||
with patch("timmy.backlog_triage.settings") as mock_settings:
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = True
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
|
||||
await loop.run_once()
|
||||
await loop.run_once()
|
||||
|
||||
assert loop.cycle_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_once_full_cycle_with_mocked_gitea(self):
|
||||
raw_issues = [
|
||||
_make_raw_issue(
|
||||
number=100,
|
||||
title="[bug] crash in src/timmy/agent.py",
|
||||
body=(
|
||||
"## Problem\nCrashes. Expected: runs. "
|
||||
"Must pass pytest. Should return 200."
|
||||
),
|
||||
labels=["bug"],
|
||||
assignees=[],
|
||||
)
|
||||
]
|
||||
|
||||
issues_resp = MagicMock()
|
||||
issues_resp.status_code = 200
|
||||
issues_resp.json.side_effect = [raw_issues, []] # page 1, then empty
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.get.return_value = issues_resp
|
||||
|
||||
with patch("timmy.backlog_triage.settings") as mock_settings:
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = True
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "tok"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
|
||||
with patch("timmy.backlog_triage.httpx.AsyncClient") as mock_cls:
|
||||
mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
|
||||
result = await loop.run_once()
|
||||
|
||||
assert result.total_open == 1
|
||||
assert result.scored == 1
|
||||
assert loop.cycle_count == 1
|
||||
assert len(loop.history) == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ScoredIssue properties
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestScoredIssueProperties:
|
||||
def test_is_unassigned_true_when_no_assignees(self):
|
||||
issue = _make_scored(assignees=[])
|
||||
assert issue.is_unassigned is True
|
||||
|
||||
def test_is_unassigned_false_when_assigned(self):
|
||||
issue = _make_scored(assignees=["claude"])
|
||||
assert issue.is_unassigned is False
|
||||
|
||||
def test_needs_kimi_from_research_tag(self):
|
||||
issue = _make_scored(tags={"research"})
|
||||
assert issue.needs_kimi is True
|
||||
|
||||
def test_needs_kimi_from_kimi_ready_label(self):
|
||||
issue = _make_scored()
|
||||
issue.labels = [KIMI_READY_LABEL]
|
||||
assert issue.needs_kimi is True
|
||||
|
||||
def test_needs_kimi_false_for_plain_bug(self):
|
||||
issue = _make_scored(tags={"bug"}, issue_type="bug")
|
||||
assert issue.needs_kimi is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TriageCycleResult
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestTriageCycleResult:
|
||||
def test_default_decisions_list_is_empty(self):
|
||||
result = TriageCycleResult(
|
||||
timestamp="2026-01-01T00:00:00", total_open=10, scored=8, ready=3
|
||||
)
|
||||
assert result.decisions == []
|
||||
assert result.errors == []
|
||||
assert result.duration_ms == 0
|
||||
@@ -1,643 +0,0 @@
"""Unit tests for timmy.kimi_delegation — Kimi research delegation pipeline."""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest


# ---------------------------------------------------------------------------
# exceeds_local_capacity
# ---------------------------------------------------------------------------


class TestExceedsLocalCapacity:
    def test_heavy_keyword_triggers_delegation(self):
        from timmy.kimi_delegation import exceeds_local_capacity

        assert exceeds_local_capacity("Do a comprehensive review of the codebase") is True

    def test_all_heavy_keywords_detected(self):
        from timmy.kimi_delegation import _HEAVY_RESEARCH_KEYWORDS, exceeds_local_capacity

        for kw in _HEAVY_RESEARCH_KEYWORDS:
            assert exceeds_local_capacity(f"Please {kw} the topic") is True, f"Missed keyword: {kw}"

    def test_long_task_triggers_delegation(self):
        from timmy.kimi_delegation import _HEAVY_WORD_THRESHOLD, exceeds_local_capacity

        long_task = " ".join(["word"] * (_HEAVY_WORD_THRESHOLD + 1))
        assert exceeds_local_capacity(long_task) is True

    def test_short_simple_task_returns_false(self):
        from timmy.kimi_delegation import exceeds_local_capacity

        assert exceeds_local_capacity("Fix the typo in README") is False

    def test_exactly_at_word_threshold_triggers(self):
        from timmy.kimi_delegation import _HEAVY_WORD_THRESHOLD, exceeds_local_capacity

        task = " ".join(["word"] * _HEAVY_WORD_THRESHOLD)
        assert exceeds_local_capacity(task) is True

    def test_keyword_case_insensitive(self):
        from timmy.kimi_delegation import exceeds_local_capacity

        assert exceeds_local_capacity("Run a COMPREHENSIVE analysis") is True

    def test_empty_string_returns_false(self):
        from timmy.kimi_delegation import exceeds_local_capacity

        assert exceeds_local_capacity("") is False


# ---------------------------------------------------------------------------
# _slugify
# ---------------------------------------------------------------------------


class TestSlugify:
    def test_basic_text(self):
        from timmy.kimi_delegation import _slugify

        assert _slugify("Hello World") == "hello-world"

    def test_special_characters_removed(self):
        from timmy.kimi_delegation import _slugify

        assert _slugify("Research: AI & ML!") == "research-ai--ml"

    def test_underscores_become_dashes(self):
        from timmy.kimi_delegation import _slugify

        assert _slugify("some_snake_case") == "some-snake-case"

    def test_long_text_truncated_to_60(self):
        from timmy.kimi_delegation import _slugify

        long_text = "a" * 100
        result = _slugify(long_text)
        assert len(result) <= 60

    def test_leading_trailing_dashes_stripped(self):
        from timmy.kimi_delegation import _slugify

        result = _slugify(" hello ")
        assert not result.startswith("-")
        assert not result.endswith("-")

    def test_multiple_spaces_become_single_dash(self):
        from timmy.kimi_delegation import _slugify

        assert _slugify("one two") == "one-two"


# ---------------------------------------------------------------------------
# _build_research_template
# ---------------------------------------------------------------------------


class TestBuildResearchTemplate:
    def test_contains_task_title(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("My Task", "background", "the question?")
        assert "My Task" in body

    def test_contains_question(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "context", "What is X?")
        assert "What is X?" in body

    def test_contains_context(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "some context here", "q?")
        assert "some context here" in body

    def test_default_priority_normal(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "ctx", "q?")
        assert "normal" in body

    def test_custom_priority_included(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "ctx", "q?", priority="high")
        assert "high" in body

    def test_kimi_label_mentioned(self):
        from timmy.kimi_delegation import KIMI_READY_LABEL, _build_research_template

        body = _build_research_template("task", "ctx", "q?")
        assert KIMI_READY_LABEL in body

    def test_slugified_task_in_artifact_path(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("My Research Task", "ctx", "q?")
        assert "my-research-task" in body

    def test_sections_present(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "ctx", "q?")
        assert "## Research Request" in body
        assert "### Research Question" in body
        assert "### Background / Context" in body
        assert "### Deliverables" in body


# ---------------------------------------------------------------------------
# _extract_action_items
# ---------------------------------------------------------------------------


class TestExtractActionItems:
    def test_checkbox_items_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "- [ ] Fix the bug\n- [ ] Write tests\n"
        items = _extract_action_items(text)
        assert "Fix the bug" in items
        assert "Write tests" in items

    def test_numbered_list_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "1. Deploy to staging\n2. Run smoke tests\n"
        items = _extract_action_items(text)
        assert "Deploy to staging" in items
        assert "Run smoke tests" in items

    def test_action_prefix_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "Action: Update the config file\n"
        items = _extract_action_items(text)
        assert "Update the config file" in items

    def test_todo_prefix_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "TODO: Add error handling\n"
        items = _extract_action_items(text)
        assert "Add error handling" in items

    def test_next_step_prefix_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "Next step: Validate results\n"
        items = _extract_action_items(text)
        assert "Validate results" in items

    def test_case_insensitive_prefixes(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "todo: lowercase todo\nACTION: uppercase action\n"
        items = _extract_action_items(text)
        assert "lowercase todo" in items
        assert "uppercase action" in items

    def test_deduplication(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "1. Do the thing\n2. Do the thing\n"
        items = _extract_action_items(text)
        assert items.count("Do the thing") == 1

    def test_empty_text_returns_empty_list(self):
        from timmy.kimi_delegation import _extract_action_items

        assert _extract_action_items("") == []

    def test_no_action_items_returns_empty_list(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "This is just plain prose with no action items here."
        assert _extract_action_items(text) == []

    def test_mixed_sources_combined(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "- [ ] checkbox item\n1. numbered item\nAction: action item\n"
        items = _extract_action_items(text)
        assert len(items) == 3


# ---------------------------------------------------------------------------
# _get_or_create_label (async)
# ---------------------------------------------------------------------------


class TestGetOrCreateLabel:
    @pytest.mark.asyncio
    async def test_returns_existing_label_id(self):
        from timmy.kimi_delegation import KIMI_READY_LABEL, _get_or_create_label

        mock_resp = MagicMock()
        mock_resp.status_code = 200
        mock_resp.json.return_value = [{"name": KIMI_READY_LABEL, "id": 42}]

        client = MagicMock()
        client.get = AsyncMock(return_value=mock_resp)

        result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
        assert result == 42

    @pytest.mark.asyncio
    async def test_creates_label_when_missing(self):
        from timmy.kimi_delegation import _get_or_create_label

        list_resp = MagicMock()
        list_resp.status_code = 200
        list_resp.json.return_value = []  # no existing labels

        create_resp = MagicMock()
        create_resp.status_code = 201
        create_resp.json.return_value = {"id": 99}

        client = MagicMock()
        client.get = AsyncMock(return_value=list_resp)
        client.post = AsyncMock(return_value=create_resp)

        result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
        assert result == 99

    @pytest.mark.asyncio
    async def test_returns_none_on_list_exception(self):
        from timmy.kimi_delegation import _get_or_create_label

        client = MagicMock()
        client.get = AsyncMock(side_effect=Exception("network error"))

        result = await _get_or_create_label(client, "http://git", {}, "owner/repo")
        assert result is None

    @pytest.mark.asyncio
    async def test_returns_none_on_create_exception(self):
        from timmy.kimi_delegation import _get_or_create_label

        list_resp = MagicMock()
        list_resp.status_code = 200
        list_resp.json.return_value = []

        client = MagicMock()
        client.get = AsyncMock(return_value=list_resp)
        client.post = AsyncMock(side_effect=Exception("create failed"))

        result = await _get_or_create_label(client, "http://git", {}, "owner/repo")
        assert result is None


# ---------------------------------------------------------------------------
# create_kimi_research_issue (async)
# ---------------------------------------------------------------------------


class TestCreateKimiResearchIssue:
    @pytest.mark.asyncio
    async def test_returns_error_when_gitea_disabled(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        with patch("timmy.kimi_delegation.settings") as mock_settings:
            mock_settings.gitea_enabled = False
            mock_settings.gitea_token = ""
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is False
        assert "not configured" in result["error"]

    @pytest.mark.asyncio
    async def test_returns_error_when_no_token(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        with patch("timmy.kimi_delegation.settings") as mock_settings:
            mock_settings.gitea_enabled = True
            mock_settings.gitea_token = ""
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is False

    @pytest.mark.asyncio
    async def test_successful_issue_creation(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        label_resp = MagicMock()
        label_resp.status_code = 200
        label_resp.json.return_value = [{"name": "kimi-ready", "id": 5}]

        issue_resp = MagicMock()
        issue_resp.status_code = 201
        issue_resp.json.return_value = {"number": 42, "html_url": "http://git/issues/42"}

        async_client = AsyncMock()
        async_client.get = AsyncMock(return_value=label_resp)
        async_client.post = AsyncMock(return_value=issue_resp)
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is True
        assert result["issue_number"] == 42
        assert "http://git/issues/42" in result["issue_url"]

    @pytest.mark.asyncio
    async def test_api_error_returns_failure(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        label_resp = MagicMock()
        label_resp.status_code = 200
        label_resp.json.return_value = []

        create_label_resp = MagicMock()
        create_label_resp.status_code = 201
        create_label_resp.json.return_value = {"id": 1}

        issue_resp = MagicMock()
        issue_resp.status_code = 500
        issue_resp.text = "Internal Server Error"

        async_client = AsyncMock()
        async_client.get = AsyncMock(return_value=label_resp)
        async_client.post = AsyncMock(side_effect=[create_label_resp, issue_resp])
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is False
        assert "500" in result["error"]

    @pytest.mark.asyncio
    async def test_exception_returns_failure(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        async_client = AsyncMock()
        async_client.__aenter__ = AsyncMock(side_effect=Exception("connection refused"))
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is False
        assert result["error"] != ""


# ---------------------------------------------------------------------------
# poll_kimi_issue (async)
# ---------------------------------------------------------------------------


class TestPollKimiIssue:
    @pytest.mark.asyncio
    async def test_returns_error_when_gitea_not_configured(self):
        from timmy.kimi_delegation import poll_kimi_issue

        with patch("timmy.kimi_delegation.settings") as mock_settings:
            mock_settings.gitea_enabled = False
            mock_settings.gitea_token = ""
            result = await poll_kimi_issue(123)

        assert result["completed"] is False
        assert "not configured" in result["error"]

    @pytest.mark.asyncio
    async def test_returns_completed_when_issue_closed(self):
        from timmy.kimi_delegation import poll_kimi_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = {"state": "closed", "body": "Done!"}

        async_client = AsyncMock()
        async_client.get = AsyncMock(return_value=resp)
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await poll_kimi_issue(42, poll_interval=0, max_wait=1)

        assert result["completed"] is True
        assert result["state"] == "closed"
        assert result["body"] == "Done!"

    @pytest.mark.asyncio
    async def test_times_out_when_issue_stays_open(self):
        from timmy.kimi_delegation import poll_kimi_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = {"state": "open", "body": ""}

        async_client = AsyncMock()
        async_client.get = AsyncMock(return_value=resp)
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
            patch("timmy.kimi_delegation.asyncio.sleep", new_callable=AsyncMock),
        ):
            mock_httpx.AsyncClient.return_value = async_client
            # poll_interval > max_wait so it exits immediately after first sleep
            result = await poll_kimi_issue(42, poll_interval=10, max_wait=5)

        assert result["completed"] is False
        assert result["state"] == "timeout"


# ---------------------------------------------------------------------------
# index_kimi_artifact (async)
# ---------------------------------------------------------------------------


class TestIndexKimiArtifact:
    @pytest.mark.asyncio
    async def test_empty_artifact_returns_error(self):
        from timmy.kimi_delegation import index_kimi_artifact

        result = await index_kimi_artifact(1, "title", " ")
        assert result["success"] is False
        assert "Empty artifact" in result["error"]

    @pytest.mark.asyncio
    async def test_successful_indexing(self):
        from timmy.kimi_delegation import index_kimi_artifact

        mock_entry = MagicMock()
        mock_entry.id = "mem-123"

        with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
            mock_thread.return_value = mock_entry
            result = await index_kimi_artifact(42, "My Research", "Some research content here")

        assert result["success"] is True
        assert result["memory_id"] == "mem-123"

    @pytest.mark.asyncio
    async def test_exception_returns_failure(self):
        from timmy.kimi_delegation import index_kimi_artifact

        with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
            mock_thread.side_effect = Exception("DB error")
            result = await index_kimi_artifact(42, "title", "some content")

        assert result["success"] is False
        assert result["error"] != ""


# ---------------------------------------------------------------------------
# extract_and_create_followups (async)
# ---------------------------------------------------------------------------


class TestExtractAndCreateFollowups:
    @pytest.mark.asyncio
    async def test_no_action_items_returns_empty_created(self):
        from timmy.kimi_delegation import extract_and_create_followups

        result = await extract_and_create_followups("Plain prose, nothing to do.", 1)
        assert result["success"] is True
        assert result["created"] == []

    @pytest.mark.asyncio
    async def test_gitea_not_configured_returns_error(self):
        from timmy.kimi_delegation import extract_and_create_followups

        text = "1. Do something important\n"

        with patch("timmy.kimi_delegation.settings") as mock_settings:
            mock_settings.gitea_enabled = False
            mock_settings.gitea_token = ""
            result = await extract_and_create_followups(text, 5)

        assert result["success"] is False

    @pytest.mark.asyncio
    async def test_creates_followup_issues(self):
        from timmy.kimi_delegation import extract_and_create_followups

        text = "1. Deploy the service\n2. Run integration tests\n"

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        issue_resp = MagicMock()
        issue_resp.status_code = 201
        issue_resp.json.return_value = {"number": 10}

        async_client = AsyncMock()
        async_client.post = AsyncMock(return_value=issue_resp)
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await extract_and_create_followups(text, 5)

        assert result["success"] is True
        assert len(result["created"]) == 2


# ---------------------------------------------------------------------------
# delegate_research_to_kimi (async)
# ---------------------------------------------------------------------------


class TestDelegateResearchToKimi:
    @pytest.mark.asyncio
    async def test_empty_task_returns_error(self):
        from timmy.kimi_delegation import delegate_research_to_kimi

        result = await delegate_research_to_kimi("", "ctx", "q?")
        assert result["success"] is False
        assert "required" in result["error"]

    @pytest.mark.asyncio
    async def test_whitespace_task_returns_error(self):
        from timmy.kimi_delegation import delegate_research_to_kimi

        result = await delegate_research_to_kimi(" ", "ctx", "q?")
        assert result["success"] is False
        assert "required" in result["error"]

    @pytest.mark.asyncio
    async def test_empty_question_returns_error(self):
        from timmy.kimi_delegation import delegate_research_to_kimi

        result = await delegate_research_to_kimi("valid task", "ctx", "")
        assert result["success"] is False
        assert "required" in result["error"]

    @pytest.mark.asyncio
    async def test_delegates_to_create_issue(self):
        from timmy.kimi_delegation import delegate_research_to_kimi

        with patch(
            "timmy.kimi_delegation.create_kimi_research_issue",
            new_callable=AsyncMock,
        ) as mock_create:
            mock_create.return_value = {"success": True, "issue_number": 7, "issue_url": "http://x", "error": None}
            result = await delegate_research_to_kimi("Research X", "ctx", "What is X?", priority="high")

        assert result["success"] is True
        assert result["issue_number"] == 7
        mock_create.assert_awaited_once_with("Research X", "ctx", "What is X?", "high")
@@ -1,124 +0,0 @@
"""Unit tests for timmy/research_tools.py."""

from __future__ import annotations

import os
import sys
from unittest.mock import MagicMock, patch

import pytest

# serpapi is an optional dependency not installed in the test environment.
# Stub it before importing the module under test.
if "serpapi" not in sys.modules:
    sys.modules["serpapi"] = MagicMock()

from timmy.research_tools import get_llm_client, google_web_search  # noqa: E402


# ---------------------------------------------------------------------------
# google_web_search
# ---------------------------------------------------------------------------


class TestGoogleWebSearch:
    @pytest.mark.asyncio
    async def test_missing_api_key_returns_empty_string(self):
        """Returns '' and logs a warning when SERPAPI_API_KEY is absent."""
        env = {k: v for k, v in os.environ.items() if k != "SERPAPI_API_KEY"}
        with patch.dict(os.environ, env, clear=True):
            result = await google_web_search("python tutorial")
        assert result == ""

    @pytest.mark.asyncio
    async def test_calls_google_search_with_correct_params(self):
        """GoogleSearch is constructed with query and api_key from environ."""
        mock_search_instance = MagicMock()
        mock_search_instance.get_dict.return_value = {"organic_results": [{"title": "Hello"}]}
        mock_search_cls = MagicMock(return_value=mock_search_instance)

        with patch.dict(os.environ, {"SERPAPI_API_KEY": "test-key-123"}):
            with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
                result = await google_web_search("python tutorial")

        mock_search_cls.assert_called_once_with(
            {"q": "python tutorial", "api_key": "test-key-123"}
        )
        assert "Hello" in result

    @pytest.mark.asyncio
    async def test_returns_stringified_results(self):
        """Return value is str() of whatever get_dict() returns."""
        fake_dict = {"organic_results": [{"title": "Foo", "link": "https://example.com"}]}
        mock_search_instance = MagicMock()
        mock_search_instance.get_dict.return_value = fake_dict
        mock_search_cls = MagicMock(return_value=mock_search_instance)

        with patch.dict(os.environ, {"SERPAPI_API_KEY": "key"}):
            with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
                result = await google_web_search("foo")

        assert result == str(fake_dict)

    @pytest.mark.asyncio
    async def test_empty_query_still_calls_search(self):
        """An empty query is forwarded to GoogleSearch without short-circuiting."""
        mock_search_instance = MagicMock()
        mock_search_instance.get_dict.return_value = {}
        mock_search_cls = MagicMock(return_value=mock_search_instance)

        with patch.dict(os.environ, {"SERPAPI_API_KEY": "key"}):
            with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
                result = await google_web_search("")

        mock_search_cls.assert_called_once()
        assert result == str({})


# ---------------------------------------------------------------------------
# get_llm_client
# ---------------------------------------------------------------------------


class TestGetLlmClient:
    def test_returns_a_client_object(self):
        """get_llm_client() returns a non-None object."""
        client = get_llm_client()
        assert client is not None

    def test_client_has_completion_method(self):
        """The returned client exposes a callable completion attribute."""
        client = get_llm_client()
        assert callable(getattr(client, "completion", None))

    @pytest.mark.asyncio
    async def test_completion_returns_object_with_text(self):
        """completion() returns an object whose .text is a non-empty string."""
        client = get_llm_client()
        result = await client.completion("What is Python?", max_tokens=100)
        assert hasattr(result, "text")
        assert isinstance(result.text, str)
        assert len(result.text) > 0

    @pytest.mark.asyncio
    async def test_completion_text_contains_prompt(self):
        """The stub weaves the prompt into the returned text."""
        client = get_llm_client()
        prompt = "Tell me about asyncio"
        result = await client.completion(prompt, max_tokens=50)
        assert prompt in result.text

    @pytest.mark.asyncio
    async def test_multiple_calls_return_independent_objects(self):
        """Each call to completion() returns a fresh object."""
        client = get_llm_client()
        r1 = await client.completion("prompt one", max_tokens=10)
        r2 = await client.completion("prompt two", max_tokens=10)
        assert r1 is not r2
        assert r1.text != r2.text

    def test_multiple_calls_return_independent_clients(self):
        """Each call to get_llm_client() returns a distinct instance."""
        c1 = get_llm_client()
        c2 = get_llm_client()
        assert c1 is not c2
@@ -334,7 +334,7 @@ async def test_think_once_disabled(tmp_path):
     """think_once should return None when thinking is disabled."""
     engine = _make_engine(tmp_path)

-    with patch("timmy.thinking.engine.settings") as mock_settings:
+    with patch("timmy.thinking.settings") as mock_settings:
         mock_settings.thinking_enabled = False
         thought = await engine.think_once()

@@ -381,7 +381,7 @@ async def test_think_once_prompt_includes_memory_context(tmp_path):
         return "A grounded thought."

     with (
-        patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", memory_md),
+        patch("timmy.thinking.HOT_MEMORY_PATH", memory_md),
         patch.object(engine, "_call_agent", side_effect=capture_agent),
         patch.object(engine, "_log_event"),
         patch.object(engine, "_update_memory"),
@@ -412,7 +412,7 @@ async def test_think_once_prompt_includes_soul(tmp_path):
         return "A soulful thought."

     with (
-        patch("timmy.thinking._snapshot.SOUL_PATH", soul_md),
+        patch("timmy.thinking.SOUL_PATH", soul_md),
         patch.object(engine, "_call_agent", side_effect=capture_agent),
         patch.object(engine, "_log_event"),
         patch.object(engine, "_update_memory"),
@@ -433,7 +433,7 @@ async def test_think_once_graceful_without_soul(tmp_path):
     nonexistent = tmp_path / "no_such_soul.md"

     with (
-        patch("timmy.thinking._snapshot.SOUL_PATH", nonexistent),
+        patch("timmy.thinking.SOUL_PATH", nonexistent),
         patch.object(engine, "_call_agent", return_value="Still thinking."),
         patch.object(engine, "_log_event"),
         patch.object(engine, "_update_memory"),
@@ -481,7 +481,7 @@ async def test_think_once_never_writes_soul(tmp_path):
     soul_md.write_text(original_content)

     with (
-        patch("timmy.thinking._snapshot.SOUL_PATH", soul_md),
+        patch("timmy.thinking.SOUL_PATH", soul_md),
         patch.object(engine, "_call_agent", return_value="A deep reflection."),
         patch.object(engine, "_log_event"),
         patch.object(engine, "_broadcast", new_callable=AsyncMock),
@@ -501,7 +501,7 @@ async def test_think_once_memory_update_graceful_on_failure(tmp_path):
     # Don't create the parent dir — write will fail

     with (
-        patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", bad_memory),
+        patch("timmy.thinking.HOT_MEMORY_PATH", bad_memory),
         patch.object(engine, "_call_agent", return_value="Resilient thought."),
         patch.object(engine, "_log_event"),
         patch.object(engine, "_broadcast", new_callable=AsyncMock),
@@ -1090,7 +1090,7 @@ def test_maybe_check_memory_fires_at_interval(tmp_path):
         engine._store_thought(f"Thought {i}.", "freeform")

     with (
-        patch("timmy.thinking._distillation.settings") as mock_settings,
+        patch("timmy.thinking.settings") as mock_settings,
         patch(
             "timmy.tools_intro.get_memory_status",
             return_value={
@@ -1113,7 +1113,7 @@ def test_maybe_check_memory_skips_between_intervals(tmp_path):
         engine._store_thought(f"Thought {i}.", "freeform")

     with (
-        patch("timmy.thinking._distillation.settings") as mock_settings,
+        patch("timmy.thinking.settings") as mock_settings,
         patch(
             "timmy.tools_intro.get_memory_status",
         ) as mock_status,
@@ -1131,7 +1131,7 @@ def test_maybe_check_memory_graceful_on_error(tmp_path):
         engine._store_thought(f"Thought {i}.", "freeform")

     with (
-        patch("timmy.thinking._distillation.settings") as mock_settings,
+        patch("timmy.thinking.settings") as mock_settings,
         patch(
             "timmy.tools_intro.get_memory_status",
             side_effect=Exception("boom"),