Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
c58093dccc WIP: Claude Code progress on #1285
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 22:02:09 -04:00
62 changed files with 1451 additions and 7951 deletions

View File

@@ -27,12 +27,8 @@
# ── AirLLM / big-brain backend ───────────────────────────────────────────────
# Inference backend: "ollama" (default) | "airllm" | "auto"
# "ollama" always use Ollama (safe everywhere, any OS)
# "airllm" → AirLLM layer-by-layer loading (Apple Silicon M1/M2/M3/M4 only)
# Requires 16 GB RAM minimum (32 GB recommended).
# Automatically falls back to Ollama on Intel Mac or Linux.
# Install extra: pip install "airllm[mlx]"
# "auto" → use AirLLM on Apple Silicon if installed, otherwise Ollama
# "auto" → uses AirLLM on Apple Silicon if installed, otherwise Ollama.
# Requires: pip install ".[bigbrain]"
# TIMMY_MODEL_BACKEND=ollama
# AirLLM model size (default: 70b).

View File

@@ -18,9 +18,17 @@ jobs:
- name: Lint (ruff via tox)
run: tox -e lint
test:
typecheck:
runs-on: ubuntu-latest
needs: lint
steps:
- uses: actions/checkout@v4
- name: Type-check (mypy via tox)
run: tox -e typecheck
test:
runs-on: ubuntu-latest
needs: typecheck
steps:
- uses: actions/checkout@v4
- name: Run tests (via tox)

View File

@@ -62,9 +62,6 @@ Per AGENTS.md roster:
- Run `tox -e pre-push` (lint + full CI suite)
- Ensure tests stay green
- Update TODO.md
- **CRITICAL: Stage files before committing** — always run `git add .` or `git add <files>` first
- Verify staged changes are non-empty: `git diff --cached --stat` must show files
- **NEVER run `git commit` without staging files first** — empty commits waste review cycles
---

View File

@@ -247,48 +247,6 @@ make docker-agent # add a worker
---
## Search Capability (SearXNG + Crawl4AI)
Timmy has a self-hosted search backend requiring **no paid API key**.
### Tools
| Tool | Module | Description |
|------|--------|-------------|
| `web_search(query)` | `timmy/tools/search.py` | Meta-search via SearXNG — returns ranked results |
| `scrape_url(url)` | `timmy/tools/search.py` | Full-page scrape via Crawl4AI → clean markdown |
Both tools are registered in the **orchestrator** (full) and **echo** (research) toolkits.
### Configuration
| Env Var | Default | Description |
|---------|---------|-------------|
| `TIMMY_SEARCH_BACKEND` | `searxng` | `searxng` or `none` (disable) |
| `TIMMY_SEARCH_URL` | `http://localhost:8888` | SearXNG base URL |
| `TIMMY_CRAWL_URL` | `http://localhost:11235` | Crawl4AI base URL |
Inside Docker Compose (when `--profile search` is active), the dashboard
uses `http://searxng:8080` and `http://crawl4ai:11235` by default.
### Starting the services
```bash
# Start SearXNG + Crawl4AI alongside the dashboard:
docker compose --profile search up
# Or start only the search services:
docker compose --profile search up searxng crawl4ai
```
### Graceful degradation
- If `TIMMY_SEARCH_BACKEND=none`: tools return a "disabled" message.
- If SearXNG or Crawl4AI is unreachable: tools log a WARNING and return an
error string — the app never crashes.
---
## Roadmap
**v2.0 Exodus (in progress):** Voice + Marketplace + Integrations

View File

@@ -9,21 +9,6 @@ API access with Bitcoin Lightning — all from a browser, no cloud AI required.
---
## System Requirements
| Path | Hardware | RAM | Disk |
|------|----------|-----|------|
| **Ollama** (default) | Any OS — x86-64 or ARM | 8 GB min | 510 GB (model files) |
| **AirLLM** (Apple Silicon) | M1, M2, M3, or M4 Mac | 16 GB min (32 GB recommended) | ~15 GB free |
**Ollama path** runs on any modern machine — macOS, Linux, or Windows. No GPU required.
**AirLLM path** uses layer-by-layer loading for 70B+ models without a GPU. Requires Apple
Silicon and the `bigbrain` extras (`pip install ".[bigbrain]"`). On Intel Mac or Linux the
app automatically falls back to Ollama — no crash, no config change needed.
---
## Quick Start
```bash

View File

@@ -42,10 +42,6 @@ services:
GROK_ENABLED: "${GROK_ENABLED:-false}"
XAI_API_KEY: "${XAI_API_KEY:-}"
GROK_DEFAULT_MODEL: "${GROK_DEFAULT_MODEL:-grok-3-fast}"
# Search backend (SearXNG + Crawl4AI) — set TIMMY_SEARCH_BACKEND=none to disable
TIMMY_SEARCH_BACKEND: "${TIMMY_SEARCH_BACKEND:-searxng}"
TIMMY_SEARCH_URL: "${TIMMY_SEARCH_URL:-http://searxng:8080}"
TIMMY_CRAWL_URL: "${TIMMY_CRAWL_URL:-http://crawl4ai:11235}"
extra_hosts:
- "host.docker.internal:host-gateway" # Linux: maps to host IP
networks:
@@ -78,50 +74,6 @@ services:
profiles:
- celery
# ── SearXNG — self-hosted meta-search engine ─────────────────────────
searxng:
image: searxng/searxng:latest
container_name: timmy-searxng
profiles:
- search
ports:
- "${SEARXNG_PORT:-8888}:8080"
environment:
SEARXNG_BASE_URL: "${SEARXNG_BASE_URL:-http://localhost:8888}"
volumes:
- ./docker/searxng:/etc/searxng:rw
networks:
- timmy-net
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "-qO-", "http://localhost:8080/healthz"]
interval: 30s
timeout: 5s
retries: 3
start_period: 20s
# ── Crawl4AI — self-hosted web scraper ────────────────────────────────
crawl4ai:
image: unclecode/crawl4ai:latest
container_name: timmy-crawl4ai
profiles:
- search
ports:
- "${CRAWL4AI_PORT:-11235}:11235"
environment:
CRAWL4AI_API_TOKEN: "${CRAWL4AI_API_TOKEN:-}"
volumes:
- timmy-data:/app/data
networks:
- timmy-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:11235/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
# ── OpenFang — vendored agent runtime sidecar ────────────────────────────
openfang:
build:

View File

@@ -1,67 +0,0 @@
# SearXNG configuration for Timmy Time self-hosted search
# https://docs.searxng.org/admin/settings/settings.html
general:
debug: false
instance_name: "Timmy Search"
privacypolicy_url: false
donation_url: false
contact_url: false
enable_metrics: false
server:
port: 8080
bind_address: "0.0.0.0"
secret_key: "timmy-searxng-key-change-in-production"
base_url: false
image_proxy: false
ui:
static_use_hash: false
default_locale: ""
query_in_title: false
infinite_scroll: false
default_theme: simple
center_alignment: false
search:
safe_search: 0
autocomplete: ""
default_lang: "en"
formats:
- html
- json
outgoing:
request_timeout: 6.0
max_request_timeout: 10.0
useragent_suffix: "TimmyResearchBot"
pool_connections: 100
pool_maxsize: 20
enabled_plugins:
- Hash_plugin
- Search_on_category_select
- Tracker_url_remover
engines:
- name: google
engine: google
shortcut: g
categories: general
- name: bing
engine: bing
shortcut: b
categories: general
- name: duckduckgo
engine: duckduckgo
shortcut: d
categories: general
- name: wikipedia
engine: wikipedia
shortcut: wp
categories: general
timeout: 3.0

View File

@@ -1,190 +0,0 @@
# DeerFlow Evaluation — Autonomous Research Orchestration Layer
**Status:** No-go for full adoption · Selective borrowing recommended
**Date:** 2026-03-23
**Issue:** #1283 (spawned from #1275 screenshot triage)
**Refs:** #972 (Timmy research pipeline) · #975 (ResearchOrchestrator)
---
## What Is DeerFlow?
DeerFlow (`bytedance/deer-flow`) is an open-source "super-agent harness" built by ByteDance on top of LangGraph. It provides a production-grade multi-agent research and code-execution framework with a web UI, REST API, Docker deployment, and optional IM channel integration (Telegram, Slack, Feishu/Lark).
- **Stars:** ~39,600 · **License:** MIT
- **Stack:** Python 3.12+ (backend) · TypeScript/Next.js (frontend) · LangGraph runtime
- **Entry point:** `http://localhost:2026` (Nginx reverse proxy, configurable via `PORT`)
---
## Research Questions — Answers
### 1. Agent Roles
DeerFlow uses a two-tier architecture:
| Role | Description |
|------|-------------|
| **Lead Agent** | Entry point; decomposes tasks, dispatches sub-agents, synthesizes results |
| **Sub-Agent (general-purpose)** | All tools except `task`; spawned dynamically |
| **Sub-Agent (bash)** | Command-execution specialist |
The lead agent runs through a 12-middleware chain in order: thread setup → uploads → sandbox → tool-call repair → guardrails → summarization → todo tracking → title generation → memory update → image injection → sub-agent concurrency cap → clarification intercept.
**Concurrency:** up to 3 sub-agents in parallel (configurable), 15-minute default timeout each, structured SSE event stream (`task_started` / `task_running` / `task_completed` / `task_failed`).
**Mapping to Timmy personas:** DeerFlow's lead/sub-agent split roughly maps to Timmy's orchestrator + specialist-agent pattern. DeerFlow doesn't have named personas — it routes by capability (tools available to the agent type), not by identity. Timmy's persona system is richer and more opinionated.
---
### 2. API Surface
DeerFlow exposes a full REST API at port 2026 (via Nginx). **No authentication by default.**
**Core integration endpoints:**
| Endpoint | Method | Purpose |
|----------|--------|---------|
| `POST /api/langgraph/threads` | | Create conversation thread |
| `POST /api/langgraph/threads/{id}/runs` | | Submit task (blocking) |
| `POST /api/langgraph/threads/{id}/runs/stream` | | Submit task (streaming SSE/WS) |
| `GET /api/langgraph/threads/{id}/state` | | Get full thread state + artifacts |
| `GET /api/models` | | List configured models |
| `GET /api/threads/{id}/artifacts/{path}` | | Download generated artifacts |
| `DELETE /api/threads/{id}` | | Clean up thread data |
These are callable from Timmy with `httpx` — no special client library needed.
---
### 3. LLM Backend Support
DeerFlow uses LangChain model classes declared in `config.yaml`.
**Documented providers:** OpenAI, Anthropic, Google Gemini, DeepSeek, Doubao (ByteDance), Kimi/Moonshot, OpenRouter, MiniMax, Novita AI, Claude Code (OAuth).
**Ollama:** Not in official documentation, but works via the `langchain_openai:ChatOpenAI` class with `base_url: http://localhost:11434/v1` and a dummy API key. Community-confirmed (GitHub issues #37, #1004) with Qwen2.5, Llama 3.1, and DeepSeek-R1.
**vLLM:** Not documented, but architecturally identical — vLLM exposes an OpenAI-compatible endpoint. Should work with the same `base_url` override.
**Practical caveat:** The lead agent requires strong instruction-following for consistent tool use and structured output. Community findings suggest ≥14B parameter models (Qwen2.5-14B minimum) for reliable orchestration. Our current `qwen3:14b` should be viable.
---
### 4. License
**MIT License** — Copyright 2025 ByteDance Ltd. and DeerFlow Authors 20252026.
Permissive: use, modify, distribute, commercialize freely. Attribution required. No warranty.
**Compatible with Timmy's use case.** No CLA, no copyleft, no commercial restrictions.
---
### 5. Docker Port Conflicts
DeerFlow's Docker Compose exposes a single host port:
| Service | Host Port | Notes |
|---------|-----------|-------|
| Nginx (entry point) | **2026** (configurable via `PORT`) | Only externally exposed port |
| Frontend (Next.js) | 3000 | Internal only |
| Gateway API | 8001 | Internal only |
| LangGraph runtime | 2024 | Internal only |
| Provisioner (optional) | 8002 | Internal only, Kubernetes mode only |
Timmy's existing Docker Compose exposes:
- **8000** — dashboard (FastAPI)
- **8080** — openfang (via `openfang` profile)
- **11434** — Ollama (host process, not containerized)
**No conflict.** Port 2026 is not used by Timmy. DeerFlow can run alongside the existing stack without modification.
---
## Full Capability Comparison
| Capability | DeerFlow | Timmy (`research.py`) |
|------------|----------|-----------------------|
| Multi-agent fan-out | ✅ 3 concurrent sub-agents | ❌ Sequential only |
| Web search | ✅ Tavily / InfoQuest | ✅ `research_tools.py` |
| Web fetch | ✅ Jina AI / Firecrawl | ✅ trafilatura |
| Code execution (sandbox) | ✅ Local / Docker / K8s | ❌ Not implemented |
| Artifact generation | ✅ HTML, Markdown, slides | ❌ Markdown report only |
| Document upload + conversion | ✅ PDF, PPT, Excel, Word | ❌ Not implemented |
| Long-term memory | ✅ LLM-extracted facts, persistent | ✅ SQLite semantic cache |
| Streaming results | ✅ SSE + WebSocket | ❌ Blocking call |
| Web UI | ✅ Next.js included | ✅ Jinja2/HTMX dashboard |
| IM integration | ✅ Telegram, Slack, Feishu | ✅ Telegram, Discord |
| Ollama backend | ✅ (via config, community-confirmed) | ✅ Native |
| Persona system | ❌ Role-based only | ✅ Named personas |
| Semantic cache tier | ❌ Not implemented | ✅ SQLite (Tier 4) |
| Free-tier cascade | ❌ Not applicable | 🔲 Planned (Groq, #980) |
| Python version requirement | 3.12+ | 3.11+ |
| Lock-in | LangGraph + LangChain | None |
---
## Integration Options Assessment
### Option A — Full Adoption (replace `research.py`)
**Verdict: Not recommended.**
DeerFlow is a substantial full-stack system (Python + Node.js, Docker, Nginx, LangGraph). Adopting it fully would:
- Replace Timmy's custom cascade tier system (SQLite cache → Ollama → Claude API → Groq) with a single-tier LangChain model config
- Lose Timmy's persona-aware research routing
- Add Python 3.12+ dependency (Timmy currently targets 3.11+)
- Introduce LangGraph/LangChain lock-in for all research tasks
- Require running a parallel Node.js frontend process (redundant given Timmy's own UI)
### Option B — Sidecar for Heavy Research (call DeerFlow's API from Timmy)
**Verdict: Viable but over-engineered for current needs.**
DeerFlow could run as an optional sidecar (`docker compose --profile deerflow up`) and Timmy could delegate multi-agent research tasks via `POST /api/langgraph/threads/{id}/runs`. This would unlock parallel sub-agent fan-out and code-execution sandboxing without replacing Timmy's stack.
The integration would be ~50 lines of `httpx` code in a new `DeerFlowClient` adapter. The `ResearchOrchestrator` in `research.py` could route tasks above a complexity threshold to DeerFlow.
**Barrier:** DeerFlow's lack of default authentication means the sidecar would need to be network-isolated (internal Docker network only) or firewalled. Also, DeerFlow's Ollama integration is community-maintained, not officially supported — risk of breaking on upstream updates.
### Option C — Selective Borrowing (copy patterns, not code)
**Verdict: Recommended.**
DeerFlow's architecture reveals concrete gaps in Timmy's current pipeline that are worth addressing independently:
| DeerFlow Pattern | Timmy Gap to Close | Implementation Path |
|------------------|--------------------|---------------------|
| Parallel sub-agent fan-out | Research is sequential | Add `asyncio.gather()` to `ResearchOrchestrator` for concurrent query execution |
| `SummarizationMiddleware` | Long contexts blow token budget | Add a context-trimming step in the synthesis cascade |
| `TodoListMiddleware` | No progress tracking during long research | Wire into the dashboard task panel |
| Artifact storage + serving | Reports are ephemeral (not persistently downloadable) | Add file-based artifact store to `research.py` (issue #976 already planned) |
| Skill modules (Markdown-based) | Research templates are `.md` files — same pattern | Already done in `skills/research/` |
| MCP integration | Research tools are hard-coded | Add MCP server discovery to `research_tools.py` for pluggable tool backends |
---
## Recommendation
**No-go for full adoption or sidecar deployment at this stage.**
Timmy's `ResearchOrchestrator` already covers the core pipeline (query → search → fetch → synthesize → store). DeerFlow's value proposition is primarily the parallel sub-agent fan-out and code-execution sandbox — capabilities that are useful but not blocking Timmy's current roadmap.
**Recommended actions:**
1. **Close the parallelism gap (high value, low effort):** Refactor `ResearchOrchestrator` to execute queries concurrently with `asyncio.gather()`. This delivers DeerFlow's most impactful capability without any new dependencies.
2. **Re-evaluate after #980 and #981 are done:** Once Timmy has the Groq free-tier cascade and a sovereignty metrics dashboard, we'll have a clearer picture of whether the custom orchestrator is performing well enough to make DeerFlow unnecessary entirely.
3. **File a follow-up for MCP tool integration:** DeerFlow's use of `langchain-mcp-adapters` for pluggable tool backends is the most architecturally interesting pattern. Adding MCP server discovery to `research_tools.py` would give Timmy the same extensibility without LangGraph lock-in.
4. **Revisit DeerFlow's code-execution sandbox if #978 (Paperclip task runner) proves insufficient:** DeerFlow's sandboxed `bash` tool is production-tested and well-isolated. If Timmy's task runner needs secure code execution, DeerFlow's sandbox implementation is worth borrowing or wrapping.
---
## Follow-up Issues to File
| Issue | Title | Priority |
|-------|-------|----------|
| New | Parallelize ResearchOrchestrator query execution (`asyncio.gather`) | Medium |
| New | Add context-trimming step to synthesis cascade | Low |
| New | MCP server discovery in `research_tools.py` | Low |
| #976 | Semantic index for research outputs (already planned) | High |

View File

@@ -15,7 +15,6 @@ packages = [
{ include = "config.py", from = "src" },
{ include = "bannerlord", from = "src" },
{ include = "brain", from = "src" },
{ include = "dashboard", from = "src" },
{ include = "infrastructure", from = "src" },
{ include = "integrations", from = "src" },
@@ -165,3 +164,7 @@ directory = "htmlcov"
[tool.coverage.xml]
output = "coverage.xml"
[tool.mypy]
ignore_missing_imports = true
no_error_summary = true

View File

@@ -1 +0,0 @@
"""Timmy Time Dashboard — source root package."""

View File

@@ -1 +0,0 @@
"""Brain — identity system and task coordination."""

View File

@@ -1,314 +0,0 @@
"""DistributedWorker — task lifecycle management and backend routing.
Routes delegated tasks to appropriate execution backends:
- agentic_loop: local multi-step execution via Timmy's agentic loop
- kimi: heavy research tasks dispatched via Gitea kimi-ready issues
- paperclip: task submission to the Paperclip API
Task lifecycle: queued → running → completed | failed
Failure handling: auto-retry up to MAX_RETRIES, then mark failed.
"""
from __future__ import annotations
import asyncio
import logging
import threading
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any, ClassVar
logger = logging.getLogger(__name__)
MAX_RETRIES = 2
# ---------------------------------------------------------------------------
# Task record
# ---------------------------------------------------------------------------
@dataclass
class DelegatedTask:
"""Record of one delegated task and its execution state."""
task_id: str
agent_name: str
agent_role: str
task_description: str
priority: str
backend: str # "agentic_loop" | "kimi" | "paperclip"
status: str = "queued" # queued | running | completed | failed
created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
result: dict[str, Any] | None = None
error: str | None = None
retries: int = 0
# ---------------------------------------------------------------------------
# Worker
# ---------------------------------------------------------------------------
class DistributedWorker:
"""Routes and tracks delegated task execution across multiple backends.
All methods are class-methods; DistributedWorker is a singleton-style
service — no instantiation needed.
Usage::
from brain.worker import DistributedWorker
task_id = DistributedWorker.submit("researcher", "research", "summarise X")
status = DistributedWorker.get_status(task_id)
"""
_tasks: ClassVar[dict[str, DelegatedTask]] = {}
_lock: ClassVar[threading.Lock] = threading.Lock()
@classmethod
def submit(
cls,
agent_name: str,
agent_role: str,
task_description: str,
priority: str = "normal",
) -> str:
"""Submit a task for execution. Returns task_id immediately.
The task is registered as 'queued' and a daemon thread begins
execution in the background. Use get_status(task_id) to poll.
"""
task_id = uuid.uuid4().hex[:8]
backend = cls._select_backend(agent_role, task_description)
record = DelegatedTask(
task_id=task_id,
agent_name=agent_name,
agent_role=agent_role,
task_description=task_description,
priority=priority,
backend=backend,
)
with cls._lock:
cls._tasks[task_id] = record
thread = threading.Thread(
target=cls._run_task,
args=(record,),
daemon=True,
name=f"worker-{task_id}",
)
thread.start()
logger.info(
"Task %s queued: %s%.60s (backend=%s, priority=%s)",
task_id,
agent_name,
task_description,
backend,
priority,
)
return task_id
@classmethod
def get_status(cls, task_id: str) -> dict[str, Any]:
"""Return current status of a task by ID."""
record = cls._tasks.get(task_id)
if record is None:
return {"found": False, "task_id": task_id}
return {
"found": True,
"task_id": record.task_id,
"agent": record.agent_name,
"role": record.agent_role,
"status": record.status,
"backend": record.backend,
"priority": record.priority,
"created_at": record.created_at,
"retries": record.retries,
"result": record.result,
"error": record.error,
}
@classmethod
def list_tasks(cls) -> list[dict[str, Any]]:
"""Return a summary list of all tracked tasks."""
with cls._lock:
return [
{
"task_id": t.task_id,
"agent": t.agent_name,
"status": t.status,
"backend": t.backend,
"created_at": t.created_at,
}
for t in cls._tasks.values()
]
@classmethod
def clear(cls) -> None:
"""Clear the task registry (for tests)."""
with cls._lock:
cls._tasks.clear()
# ------------------------------------------------------------------
# Backend selection
# ------------------------------------------------------------------
@classmethod
def _select_backend(cls, agent_role: str, task_description: str) -> str:
"""Choose the execution backend for a given agent role and task.
Priority:
1. kimi — research role + Gitea enabled + task exceeds local capacity
2. paperclip — paperclip API key is configured
3. agentic_loop — local fallback (always available)
"""
try:
from config import settings
from timmy.kimi_delegation import exceeds_local_capacity
if (
agent_role == "research"
and getattr(settings, "gitea_enabled", False)
and getattr(settings, "gitea_token", "")
and exceeds_local_capacity(task_description)
):
return "kimi"
if getattr(settings, "paperclip_api_key", ""):
return "paperclip"
except Exception as exc:
logger.debug("Backend selection error — defaulting to agentic_loop: %s", exc)
return "agentic_loop"
# ------------------------------------------------------------------
# Task execution
# ------------------------------------------------------------------
@classmethod
def _run_task(cls, record: DelegatedTask) -> None:
"""Execute a task with retry logic. Runs inside a daemon thread."""
record.status = "running"
for attempt in range(MAX_RETRIES + 1):
try:
if attempt > 0:
logger.info(
"Retrying task %s (attempt %d/%d)",
record.task_id,
attempt + 1,
MAX_RETRIES + 1,
)
record.retries = attempt
result = cls._dispatch(record)
record.status = "completed"
record.result = result
logger.info(
"Task %s completed via %s",
record.task_id,
record.backend,
)
return
except Exception as exc:
logger.warning(
"Task %s attempt %d failed: %s",
record.task_id,
attempt + 1,
exc,
)
if attempt == MAX_RETRIES:
record.status = "failed"
record.error = str(exc)
logger.error(
"Task %s exhausted %d retries. Final error: %s",
record.task_id,
MAX_RETRIES,
exc,
)
@classmethod
def _dispatch(cls, record: DelegatedTask) -> dict[str, Any]:
"""Route to the selected backend. Raises on failure."""
if record.backend == "kimi":
return asyncio.run(cls._execute_kimi(record))
if record.backend == "paperclip":
return asyncio.run(cls._execute_paperclip(record))
return asyncio.run(cls._execute_agentic_loop(record))
@classmethod
async def _execute_kimi(cls, record: DelegatedTask) -> dict[str, Any]:
"""Create a kimi-ready Gitea issue for the task.
Kimi picks up the issue via the kimi-ready label and executes it.
"""
from timmy.kimi_delegation import create_kimi_research_issue
result = await create_kimi_research_issue(
task=record.task_description[:120],
context=f"Delegated by agent '{record.agent_name}' via delegate_task.",
question=record.task_description,
priority=record.priority,
)
if not result.get("success"):
raise RuntimeError(f"Kimi issue creation failed: {result.get('error')}")
return result
@classmethod
async def _execute_paperclip(cls, record: DelegatedTask) -> dict[str, Any]:
"""Submit the task to the Paperclip API."""
import httpx
from timmy.paperclip import PaperclipClient
client = PaperclipClient()
async with httpx.AsyncClient(timeout=client.timeout) as http:
resp = await http.post(
f"{client.base_url}/api/tasks",
headers={"Authorization": f"Bearer {client.api_key}"},
json={
"kind": record.agent_role,
"agent_id": client.agent_id,
"company_id": client.company_id,
"priority": record.priority,
"context": {"task": record.task_description},
},
)
if resp.status_code in (200, 201):
data = resp.json()
logger.info(
"Task %s submitted to Paperclip (paperclip_id=%s)",
record.task_id,
data.get("id"),
)
return {
"success": True,
"paperclip_task_id": data.get("id"),
"backend": "paperclip",
}
raise RuntimeError(f"Paperclip API error {resp.status_code}: {resp.text[:200]}")
@classmethod
async def _execute_agentic_loop(cls, record: DelegatedTask) -> dict[str, Any]:
"""Execute the task via Timmy's local agentic loop."""
from timmy.agentic_loop import run_agentic_loop
result = await run_agentic_loop(record.task_description)
return {
"success": result.status != "failed",
"agentic_task_id": result.task_id,
"summary": result.summary,
"status": result.status,
"backend": "agentic_loop",
}

View File

@@ -1,8 +1,3 @@
"""Central pydantic-settings configuration for Timmy Time Dashboard.
All environment variable access goes through the ``settings`` singleton
exported from this module — never use ``os.environ.get()`` in app code.
"""
import logging as _logging
import os
import sys
@@ -99,9 +94,8 @@ class Settings(BaseSettings):
# ── Backend selection ────────────────────────────────────────────────────
# "ollama" — always use Ollama (default, safe everywhere)
# "airllm" — AirLLM layer-by-layer loading (Apple Silicon only; degrades to Ollama)
# "auto" — pick best available local backend, fall back to Ollama
timmy_model_backend: Literal["ollama", "airllm", "grok", "claude", "auto"] = "ollama"
timmy_model_backend: Literal["ollama", "grok", "claude", "auto"] = "ollama"
# ── Grok (xAI) — opt-in premium cloud backend ────────────────────────
# Grok is a premium augmentation layer — local-first ethos preserved.
@@ -114,16 +108,6 @@ class Settings(BaseSettings):
grok_sats_hard_cap: int = 100 # Absolute ceiling on sats per Grok query
grok_free: bool = False # Skip Lightning invoice when user has own API key
# ── Search Backend (SearXNG + Crawl4AI) ──────────────────────────────
# "searxng" — self-hosted SearXNG meta-search engine (default, no API key)
# "none" — disable web search (private/offline deployments)
# Override with TIMMY_SEARCH_BACKEND env var.
timmy_search_backend: Literal["searxng", "none"] = "searxng"
# SearXNG base URL — override with TIMMY_SEARCH_URL env var
search_url: str = "http://localhost:8888"
# Crawl4AI base URL — override with TIMMY_CRAWL_URL env var
crawl_url: str = "http://localhost:11235"
# ── Database ──────────────────────────────────────────────────────────
db_busy_timeout_ms: int = 5000 # SQLite PRAGMA busy_timeout (ms)
@@ -133,23 +117,6 @@ class Settings(BaseSettings):
anthropic_api_key: str = ""
claude_model: str = "haiku"
# ── Tiered Model Router (issue #882) ─────────────────────────────────
# Three-tier cascade: Local 8B (free, fast) → Local 70B (free, slower)
# → Cloud API (paid, best). Override model names per tier via env vars.
#
# TIER_LOCAL_FAST_MODEL — Tier-1 model name in Ollama (default: llama3.1:8b)
# TIER_LOCAL_HEAVY_MODEL — Tier-2 model name in Ollama (default: hermes3:70b)
# TIER_CLOUD_MODEL — Tier-3 cloud model name (default: claude-haiku-4-5)
#
# Budget limits for the cloud tier (0 = unlimited):
# TIER_CLOUD_DAILY_BUDGET_USD — daily ceiling in USD (default: 5.0)
# TIER_CLOUD_MONTHLY_BUDGET_USD — monthly ceiling in USD (default: 50.0)
tier_local_fast_model: str = "llama3.1:8b"
tier_local_heavy_model: str = "hermes3:70b"
tier_cloud_model: str = "claude-haiku-4-5"
tier_cloud_daily_budget_usd: float = 5.0
tier_cloud_monthly_budget_usd: float = 50.0
# ── Content Moderation ──────────────────────────────────────────────
# Three-layer moderation pipeline for AI narrator output.
# Uses Llama Guard via Ollama with regex fallback.

View File

@@ -1,4 +1,3 @@
"""SQLAlchemy ORM models for the CALM task-management and journaling system."""
from datetime import UTC, date, datetime
from enum import StrEnum

View File

@@ -1,4 +1,3 @@
"""SQLAlchemy engine, session factory, and declarative Base for the CALM module."""
import logging
from pathlib import Path

View File

@@ -1,4 +1,3 @@
"""Dashboard routes for agent chat interactions and tool-call display."""
import json
import logging
from datetime import datetime

View File

@@ -1,4 +1,3 @@
"""Dashboard routes for the CALM task management and daily journaling interface."""
import logging
from datetime import UTC, date, datetime

View File

@@ -6,6 +6,8 @@ import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
@@ -36,9 +38,9 @@ def _discover_databases() -> list[dict]:
return dbs
def _query_database(db_path: str) -> dict:
def _query_database(db_path: str) -> dict[str, Any]:
"""Open a database read-only and return all tables with their rows."""
result = {"tables": {}, "error": None}
result: dict[str, Any] = {"tables": {}, "error": None}
try:
with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as conn:
conn.row_factory = sqlite3.Row

View File

@@ -137,7 +137,7 @@ class HermesMonitor:
message=f"Check error: {r}",
)
)
else:
elif isinstance(r, CheckResult):
checks.append(r)
# Compute overall level

View File

@@ -1,11 +1,5 @@
"""Infrastructure models package."""
from infrastructure.models.budget import (
BudgetTracker,
SpendRecord,
estimate_cost_usd,
get_budget_tracker,
)
from infrastructure.models.multimodal import (
ModelCapability,
ModelInfo,
@@ -23,12 +17,6 @@ from infrastructure.models.registry import (
ModelRole,
model_registry,
)
from infrastructure.models.router import (
TierLabel,
TieredModelRouter,
classify_tier,
get_tiered_router,
)
__all__ = [
# Registry
@@ -46,14 +34,4 @@ __all__ = [
"model_supports_tools",
"model_supports_vision",
"pull_model_with_fallback",
# Tiered router
"TierLabel",
"TieredModelRouter",
"classify_tier",
"get_tiered_router",
# Budget tracker
"BudgetTracker",
"SpendRecord",
"estimate_cost_usd",
"get_budget_tracker",
]

View File

@@ -1,302 +0,0 @@
"""Cloud API budget tracker for the three-tier model router.
Tracks cloud API spend (daily / monthly) and enforces configurable limits.
SQLite-backed with in-memory fallback — degrades gracefully if the database
is unavailable.
References:
- Issue #882 — Model Tiering Router: Local 8B / Hermes 70B / Cloud API Cascade
"""
import logging
import sqlite3
import threading
import time
from dataclasses import dataclass
from datetime import UTC, date, datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
# ── Cost estimates (USD per 1 K tokens, input / output) ──────────────────────
# Updated 2026-03. Estimates only — actual costs vary by tier/usage.
_COST_PER_1K: dict[str, dict[str, float]] = {
# Claude models
"claude-haiku-4-5": {"input": 0.00025, "output": 0.00125},
"claude-sonnet-4-5": {"input": 0.003, "output": 0.015},
"claude-opus-4-5": {"input": 0.015, "output": 0.075},
"haiku": {"input": 0.00025, "output": 0.00125},
"sonnet": {"input": 0.003, "output": 0.015},
"opus": {"input": 0.015, "output": 0.075},
# GPT-4o
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"gpt-4o": {"input": 0.0025, "output": 0.01},
# Grok (xAI)
"grok-3-fast": {"input": 0.003, "output": 0.015},
"grok-3": {"input": 0.005, "output": 0.025},
}
_DEFAULT_COST: dict[str, float] = {"input": 0.003, "output": 0.015} # conservative fallback
def estimate_cost_usd(model: str, tokens_in: int, tokens_out: int) -> float:
"""Estimate the cost of a single request in USD.
Matches the model name by substring so versioned names like
``claude-haiku-4-5-20251001`` still resolve correctly.
Args:
model: Model name as passed to the provider.
tokens_in: Number of input (prompt) tokens consumed.
tokens_out: Number of output (completion) tokens generated.
Returns:
Estimated cost in USD (may be zero for unknown models).
"""
model_lower = model.lower()
rates = _DEFAULT_COST
for key, rate in _COST_PER_1K.items():
if key in model_lower:
rates = rate
break
return (tokens_in * rates["input"] + tokens_out * rates["output"]) / 1000.0
@dataclass
class SpendRecord:
"""A single spend event."""
ts: float
provider: str
model: str
tokens_in: int
tokens_out: int
cost_usd: float
tier: str
class BudgetTracker:
"""Tracks cloud API spend with configurable daily / monthly limits.
Persists spend records to SQLite (``data/budget.db`` by default).
Falls back to in-memory tracking when the database is unavailable —
budget enforcement still works; records are lost on restart.
Limits are read from ``settings``:
* ``tier_cloud_daily_budget_usd`` — daily ceiling (0 = disabled)
* ``tier_cloud_monthly_budget_usd`` — monthly ceiling (0 = disabled)
Usage::
tracker = BudgetTracker()
if tracker.cloud_allowed():
# … make cloud API call …
tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)
summary = tracker.get_summary()
print(summary["daily_usd"], "/", summary["daily_limit_usd"])
"""
_DB_PATH = "data/budget.db"
def __init__(self, db_path: str | None = None) -> None:
"""Initialise the tracker.
Args:
db_path: Path to the SQLite database. Defaults to
``data/budget.db``. Pass ``":memory:"`` for tests.
"""
self._db_path = db_path or self._DB_PATH
self._lock = threading.Lock()
self._in_memory: list[SpendRecord] = []
self._db_ok = False
self._init_db()
# ── Database initialisation ──────────────────────────────────────────────
def _init_db(self) -> None:
"""Create the spend table (and parent directory) if needed."""
try:
if self._db_path != ":memory:":
Path(self._db_path).parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS cloud_spend (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts REAL NOT NULL,
provider TEXT NOT NULL,
model TEXT NOT NULL,
tokens_in INTEGER NOT NULL DEFAULT 0,
tokens_out INTEGER NOT NULL DEFAULT 0,
cost_usd REAL NOT NULL DEFAULT 0.0,
tier TEXT NOT NULL DEFAULT 'cloud'
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_spend_ts ON cloud_spend(ts)"
)
self._db_ok = True
logger.debug("BudgetTracker: SQLite initialised at %s", self._db_path)
except Exception as exc:
logger.warning(
"BudgetTracker: SQLite unavailable, using in-memory fallback: %s", exc
)
def _connect(self) -> sqlite3.Connection:
return sqlite3.connect(self._db_path, timeout=5)
# ── Public API ───────────────────────────────────────────────────────────
def record_spend(
self,
provider: str,
model: str,
tokens_in: int = 0,
tokens_out: int = 0,
cost_usd: float | None = None,
tier: str = "cloud",
) -> float:
"""Record a cloud API spend event and return the cost recorded.
Args:
provider: Provider name (e.g. ``"anthropic"``, ``"openai"``).
model: Model name used for the request.
tokens_in: Input token count (prompt).
tokens_out: Output token count (completion).
cost_usd: Explicit cost override. If ``None``, the cost is
estimated from the token counts and model rates.
tier: Tier label for the request (default ``"cloud"``).
Returns:
The cost recorded in USD.
"""
if cost_usd is None:
cost_usd = estimate_cost_usd(model, tokens_in, tokens_out)
ts = time.time()
record = SpendRecord(ts, provider, model, tokens_in, tokens_out, cost_usd, tier)
with self._lock:
if self._db_ok:
try:
with self._connect() as conn:
conn.execute(
"""
INSERT INTO cloud_spend
(ts, provider, model, tokens_in, tokens_out, cost_usd, tier)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(ts, provider, model, tokens_in, tokens_out, cost_usd, tier),
)
logger.debug(
"BudgetTracker: recorded %.6f USD (%s/%s, in=%d out=%d tier=%s)",
cost_usd,
provider,
model,
tokens_in,
tokens_out,
tier,
)
return cost_usd
except Exception as exc:
logger.warning("BudgetTracker: DB write failed, falling back: %s", exc)
self._in_memory.append(record)
return cost_usd
def get_daily_spend(self) -> float:
"""Return total cloud spend for the current UTC day in USD."""
today = date.today()
since = datetime(today.year, today.month, today.day, tzinfo=UTC).timestamp()
return self._query_spend(since)
def get_monthly_spend(self) -> float:
"""Return total cloud spend for the current UTC month in USD."""
today = date.today()
since = datetime(today.year, today.month, 1, tzinfo=UTC).timestamp()
return self._query_spend(since)
def cloud_allowed(self) -> bool:
"""Return ``True`` if cloud API spend is within configured limits.
Checks both daily and monthly ceilings. A limit of ``0`` disables
that particular check.
"""
daily_limit = settings.tier_cloud_daily_budget_usd
monthly_limit = settings.tier_cloud_monthly_budget_usd
if daily_limit > 0:
daily_spend = self.get_daily_spend()
if daily_spend >= daily_limit:
logger.warning(
"BudgetTracker: daily cloud budget exhausted (%.4f / %.4f USD)",
daily_spend,
daily_limit,
)
return False
if monthly_limit > 0:
monthly_spend = self.get_monthly_spend()
if monthly_spend >= monthly_limit:
logger.warning(
"BudgetTracker: monthly cloud budget exhausted (%.4f / %.4f USD)",
monthly_spend,
monthly_limit,
)
return False
return True
def get_summary(self) -> dict:
"""Return a spend summary dict suitable for dashboards / logging.
Keys: ``daily_usd``, ``monthly_usd``, ``daily_limit_usd``,
``monthly_limit_usd``, ``daily_ok``, ``monthly_ok``.
"""
daily = self.get_daily_spend()
monthly = self.get_monthly_spend()
daily_limit = settings.tier_cloud_daily_budget_usd
monthly_limit = settings.tier_cloud_monthly_budget_usd
return {
"daily_usd": round(daily, 6),
"monthly_usd": round(monthly, 6),
"daily_limit_usd": daily_limit,
"monthly_limit_usd": monthly_limit,
"daily_ok": daily_limit <= 0 or daily < daily_limit,
"monthly_ok": monthly_limit <= 0 or monthly < monthly_limit,
}
# ── Internal helpers ─────────────────────────────────────────────────────
def _query_spend(self, since_ts: float) -> float:
"""Sum ``cost_usd`` for records with ``ts >= since_ts``."""
if self._db_ok:
try:
with self._connect() as conn:
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd), 0.0) FROM cloud_spend WHERE ts >= ?",
(since_ts,),
).fetchone()
return float(row[0]) if row else 0.0
except Exception as exc:
logger.warning("BudgetTracker: DB read failed: %s", exc)
# In-memory fallback
return sum(r.cost_usd for r in self._in_memory if r.ts >= since_ts)
# ── Module-level singleton ────────────────────────────────────────────────────
_budget_tracker: BudgetTracker | None = None
def get_budget_tracker() -> BudgetTracker:
"""Get or create the module-level BudgetTracker singleton."""
global _budget_tracker
if _budget_tracker is None:
_budget_tracker = BudgetTracker()
return _budget_tracker

View File

@@ -1,427 +0,0 @@
"""Three-tier model router — Local 8B / Local 70B / Cloud API Cascade.
Selects the cheapest-sufficient LLM for each request using a heuristic
task-complexity classifier. Tier 3 (Cloud API) is only used when Tier 2
fails or the budget guard allows it.
Tiers
-----
Tier 1 — LOCAL_FAST (Llama 3.1 8B / Hermes 3 8B via Ollama, free, ~0.3-1 s)
Navigation, basic interactions, simple decisions.
Tier 2 — LOCAL_HEAVY (Hermes 3/4 70B via Ollama, free, ~5-10 s for 200 tok)
Quest planning, dialogue strategy, complex reasoning.
Tier 3 — CLOUD_API (Claude / GPT-4o, paid ~$5-15/hr heavy use)
Recovery from Tier 2 failures, novel situations, multi-step planning.
Routing logic
-------------
1. Classify the task using keyword / length / context heuristics (no LLM call).
2. Route to the appropriate tier.
3. On Tier-1 low-quality response → auto-escalate to Tier 2.
4. On Tier-2 failure or explicit ``require_cloud=True`` → Tier 3 (if budget allows).
5. Log tier used, model, latency, estimated cost for every request.
References:
- Issue #882 — Model Tiering Router: Local 8B / Hermes 70B / Cloud API Cascade
"""
import asyncio
import logging
import re
import time
from enum import StrEnum
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Tier definitions ──────────────────────────────────────────────────────────
class TierLabel(StrEnum):
"""Three cost-sorted model tiers."""
LOCAL_FAST = "local_fast" # 8B local, always hot, free
LOCAL_HEAVY = "local_heavy" # 70B local, free but slower
CLOUD_API = "cloud_api" # Paid cloud backend (Claude / GPT-4o)
# ── Default model assignments (overridable via Settings) ──────────────────────
_DEFAULT_TIER_MODELS: dict[TierLabel, str] = {
TierLabel.LOCAL_FAST: "llama3.1:8b",
TierLabel.LOCAL_HEAVY: "hermes3:70b",
TierLabel.CLOUD_API: "claude-haiku-4-5",
}
# ── Classification vocabulary ─────────────────────────────────────────────────
# Patterns that indicate a Tier-1 (simple) task
_T1_WORDS: frozenset[str] = frozenset(
{
"go", "move", "walk", "run",
"north", "south", "east", "west", "up", "down", "left", "right",
"yes", "no", "ok", "okay",
"open", "close", "take", "drop", "look",
"pick", "use", "wait", "rest", "save",
"attack", "flee", "jump", "crouch",
"status", "ping", "list", "show", "get", "check",
}
)
# Patterns that indicate a Tier-2 or Tier-3 task
_T2_PHRASES: tuple[str, ...] = (
"plan", "strategy", "optimize", "optimise",
"quest", "stuck", "recover",
"negotiate", "persuade", "faction", "reputation",
"analyze", "analyse", "evaluate", "decide",
"complex", "multi-step", "long-term",
"how do i", "what should i do", "help me figure",
"what is the best", "recommend", "best way",
"explain", "describe in detail", "walk me through",
"compare", "design", "implement", "refactor",
"debug", "diagnose", "root cause",
)
# Low-quality response detection patterns
_LOW_QUALITY_PATTERNS: tuple[re.Pattern, ...] = (
re.compile(r"i\s+don'?t\s+know", re.IGNORECASE),
re.compile(r"i'm\s+not\s+sure", re.IGNORECASE),
re.compile(r"i\s+cannot\s+(help|assist|answer)", re.IGNORECASE),
re.compile(r"i\s+apologize", re.IGNORECASE),
re.compile(r"as an ai", re.IGNORECASE),
re.compile(r"i\s+don'?t\s+have\s+(enough|sufficient)\s+information", re.IGNORECASE),
)
# Response is definitely low-quality if shorter than this many characters
_LOW_QUALITY_MIN_CHARS = 20
# Response is suspicious if shorter than this many chars for a complex task
_ESCALATION_MIN_CHARS = 60
def classify_tier(task: str, context: dict | None = None) -> TierLabel:
"""Classify a task to the cheapest-sufficient model tier.
Classification priority (highest wins):
1. ``context["require_cloud"] = True`` → CLOUD_API
2. Any Tier-2 phrase or stuck/recovery signal → LOCAL_HEAVY
3. Short task with only Tier-1 words, no active context → LOCAL_FAST
4. Default → LOCAL_HEAVY (safe fallback for unknown tasks)
Args:
task: Natural-language task or user input.
context: Optional context dict. Recognised keys:
``require_cloud`` (bool), ``stuck`` (bool),
``require_t2`` (bool), ``active_quests`` (list),
``dialogue_active`` (bool), ``combat_active`` (bool).
Returns:
The cheapest ``TierLabel`` sufficient for the task.
"""
ctx = context or {}
task_lower = task.lower()
words = set(task_lower.split())
# ── Explicit cloud override ──────────────────────────────────────────────
if ctx.get("require_cloud"):
logger.debug("classify_tier → CLOUD_API (explicit require_cloud)")
return TierLabel.CLOUD_API
# ── Tier-2 / complexity signals ──────────────────────────────────────────
t2_phrase_hit = any(phrase in task_lower for phrase in _T2_PHRASES)
t2_word_hit = bool(words & {"plan", "strategy", "optimize", "optimise", "quest",
"stuck", "recover", "analyze", "analyse", "evaluate"})
is_stuck = bool(ctx.get("stuck"))
require_t2 = bool(ctx.get("require_t2"))
long_input = len(task) > 300 # long tasks warrant more capable model
deep_context = (
len(ctx.get("active_quests", [])) >= 3
or ctx.get("dialogue_active")
)
if t2_phrase_hit or t2_word_hit or is_stuck or require_t2 or long_input or deep_context:
logger.debug(
"classify_tier → LOCAL_HEAVY (phrase=%s word=%s stuck=%s explicit=%s long=%s ctx=%s)",
t2_phrase_hit, t2_word_hit, is_stuck, require_t2, long_input, deep_context,
)
return TierLabel.LOCAL_HEAVY
# ── Tier-1 signals ───────────────────────────────────────────────────────
t1_word_hit = bool(words & _T1_WORDS)
task_short = len(task.split()) <= 8
no_active_context = (
not ctx.get("active_quests")
and not ctx.get("dialogue_active")
and not ctx.get("combat_active")
)
if t1_word_hit and task_short and no_active_context:
logger.debug(
"classify_tier → LOCAL_FAST (words=%s short=%s)", t1_word_hit, task_short
)
return TierLabel.LOCAL_FAST
# ── Default: LOCAL_HEAVY (safe for anything unclassified) ────────────────
logger.debug("classify_tier → LOCAL_HEAVY (default)")
return TierLabel.LOCAL_HEAVY
def _is_low_quality(content: str, tier: TierLabel) -> bool:
"""Return True if the response looks like it should be escalated.
Used for automatic Tier-1 → Tier-2 escalation.
Args:
content: LLM response text.
tier: The tier that produced the response.
Returns:
True if the response is likely too low-quality to be useful.
"""
if not content or not content.strip():
return True
stripped = content.strip()
# Too short to be useful
if len(stripped) < _LOW_QUALITY_MIN_CHARS:
return True
# Insufficient for a supposedly complex-enough task
if tier == TierLabel.LOCAL_FAST and len(stripped) < _ESCALATION_MIN_CHARS:
return True
# Matches known "I can't help" patterns
for pattern in _LOW_QUALITY_PATTERNS:
if pattern.search(stripped):
return True
return False
class TieredModelRouter:
"""Routes LLM requests across the Local 8B / Local 70B / Cloud API tiers.
Wraps CascadeRouter with:
- Heuristic tier classification via ``classify_tier()``
- Automatic Tier-1 → Tier-2 escalation on low-quality responses
- Cloud-tier budget guard via ``BudgetTracker``
- Per-request logging: tier, model, latency, estimated cost
Usage::
router = TieredModelRouter()
result = await router.route(
task="Walk to the next room",
context={},
)
print(result["content"], result["tier"]) # "Move north.", "local_fast"
# Force heavy tier
result = await router.route(
task="Plan the optimal path to become Hortator",
context={"require_t2": True},
)
"""
def __init__(
self,
cascade: Any | None = None,
budget_tracker: Any | None = None,
tier_models: dict[TierLabel, str] | None = None,
auto_escalate: bool = True,
) -> None:
"""Initialise the tiered router.
Args:
cascade: CascadeRouter instance. If ``None``, the
singleton from ``get_router()`` is used lazily.
budget_tracker: BudgetTracker instance. If ``None``, the
singleton from ``get_budget_tracker()`` is used.
tier_models: Override default model names per tier.
auto_escalate: When ``True``, low-quality Tier-1 responses
automatically retry on Tier-2.
"""
self._cascade = cascade
self._budget = budget_tracker
self._tier_models: dict[TierLabel, str] = dict(_DEFAULT_TIER_MODELS)
self._auto_escalate = auto_escalate
# Apply settings-level overrides (can still be overridden per-instance)
if settings.tier_local_fast_model:
self._tier_models[TierLabel.LOCAL_FAST] = settings.tier_local_fast_model
if settings.tier_local_heavy_model:
self._tier_models[TierLabel.LOCAL_HEAVY] = settings.tier_local_heavy_model
if settings.tier_cloud_model:
self._tier_models[TierLabel.CLOUD_API] = settings.tier_cloud_model
if tier_models:
self._tier_models.update(tier_models)
# ── Lazy singletons ──────────────────────────────────────────────────────
def _get_cascade(self) -> Any:
if self._cascade is None:
from infrastructure.router.cascade import get_router
self._cascade = get_router()
return self._cascade
def _get_budget(self) -> Any:
if self._budget is None:
from infrastructure.models.budget import get_budget_tracker
self._budget = get_budget_tracker()
return self._budget
# ── Public interface ─────────────────────────────────────────────────────
def classify(self, task: str, context: dict | None = None) -> TierLabel:
"""Classify a task without routing. Useful for telemetry."""
return classify_tier(task, context)
async def route(
self,
task: str,
context: dict | None = None,
messages: list[dict] | None = None,
temperature: float = 0.3,
max_tokens: int | None = None,
) -> dict:
"""Route a task to the appropriate model tier.
Builds a minimal messages list if ``messages`` is not provided.
The result always includes a ``tier`` key indicating which tier
ultimately handled the request.
Args:
task: Natural-language task description.
context: Task context dict (see ``classify_tier()``).
messages: Pre-built OpenAI-compatible messages list. If
provided, ``task`` is only used for classification.
temperature: Sampling temperature (default 0.3).
max_tokens: Maximum tokens to generate.
Returns:
Dict with at minimum: ``content``, ``provider``, ``model``,
``tier``, ``latency_ms``. May include ``cost_usd`` when a
cloud request is recorded.
Raises:
RuntimeError: If all available tiers are exhausted.
"""
ctx = context or {}
tier = self.classify(task, ctx)
msgs = messages or [{"role": "user", "content": task}]
# ── Tier 1 attempt ───────────────────────────────────────────────────
if tier == TierLabel.LOCAL_FAST:
result = await self._complete_tier(
TierLabel.LOCAL_FAST, msgs, temperature, max_tokens
)
if self._auto_escalate and _is_low_quality(result.get("content", ""), TierLabel.LOCAL_FAST):
logger.info(
"TieredModelRouter: Tier-1 response low quality, escalating to Tier-2 "
"(task=%r content_len=%d)",
task[:80],
len(result.get("content", "")),
)
tier = TierLabel.LOCAL_HEAVY
result = await self._complete_tier(
TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
)
return result
# ── Tier 2 attempt ───────────────────────────────────────────────────
if tier == TierLabel.LOCAL_HEAVY:
try:
return await self._complete_tier(
TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
)
except Exception as exc:
logger.warning(
"TieredModelRouter: Tier-2 failed (%s) — escalating to cloud", exc
)
tier = TierLabel.CLOUD_API
# ── Tier 3 (Cloud) ───────────────────────────────────────────────────
budget = self._get_budget()
if not budget.cloud_allowed():
raise RuntimeError(
"Cloud API tier requested but budget limit reached — "
"increase tier_cloud_daily_budget_usd or tier_cloud_monthly_budget_usd"
)
result = await self._complete_tier(
TierLabel.CLOUD_API, msgs, temperature, max_tokens
)
# Record cloud spend if token info is available
usage = result.get("usage", {})
if usage:
cost = budget.record_spend(
provider=result.get("provider", "unknown"),
model=result.get("model", self._tier_models[TierLabel.CLOUD_API]),
tokens_in=usage.get("prompt_tokens", 0),
tokens_out=usage.get("completion_tokens", 0),
tier=TierLabel.CLOUD_API,
)
result["cost_usd"] = cost
return result
# ── Internal helpers ─────────────────────────────────────────────────────
async def _complete_tier(
self,
tier: TierLabel,
messages: list[dict],
temperature: float,
max_tokens: int | None,
) -> dict:
"""Dispatch a single inference request for the given tier."""
model = self._tier_models[tier]
cascade = self._get_cascade()
start = time.monotonic()
logger.info(
"TieredModelRouter: tier=%s model=%s messages=%d",
tier,
model,
len(messages),
)
result = await cascade.complete(
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
)
elapsed_ms = (time.monotonic() - start) * 1000
result["tier"] = tier
result.setdefault("latency_ms", elapsed_ms)
logger.info(
"TieredModelRouter: done tier=%s model=%s latency_ms=%.0f",
tier,
result.get("model", model),
elapsed_ms,
)
return result
# ── Module-level singleton ────────────────────────────────────────────────────
_tiered_router: TieredModelRouter | None = None
def get_tiered_router() -> TieredModelRouter:
"""Get or create the module-level TieredModelRouter singleton."""
global _tiered_router
if _tiered_router is None:
_tiered_router = TieredModelRouter()
return _tiered_router

View File

@@ -203,7 +203,7 @@ async def reload_config(
@router.get("/history")
async def get_history(
hours: int = 24,
store: Annotated[HealthHistoryStore, Depends(get_history_store)] = None,
store: Annotated[HealthHistoryStore | None, Depends(get_history_store)] = None,
) -> list[dict[str, Any]]:
"""Get provider health history for the last N hours."""
if store is None:

View File

@@ -744,19 +744,20 @@ class CascadeRouter:
self,
provider: Provider,
messages: list[dict],
model: str,
model: str | None,
temperature: float,
max_tokens: int | None,
content_type: ContentType = ContentType.TEXT,
) -> dict:
"""Try a single provider request."""
start_time = time.time()
effective_model: str = model or provider.get_default_model() or ""
if provider.type == "ollama":
result = await self._call_ollama(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
model=effective_model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
@@ -765,7 +766,7 @@ class CascadeRouter:
result = await self._call_openai(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
model=effective_model,
temperature=temperature,
max_tokens=max_tokens,
)
@@ -773,7 +774,7 @@ class CascadeRouter:
result = await self._call_anthropic(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
model=effective_model,
temperature=temperature,
max_tokens=max_tokens,
)
@@ -781,7 +782,7 @@ class CascadeRouter:
result = await self._call_grok(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
model=effective_model,
temperature=temperature,
max_tokens=max_tokens,
)
@@ -789,7 +790,7 @@ class CascadeRouter:
result = await self._call_vllm_mlx(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
model=effective_model,
temperature=temperature,
max_tokens=max_tokens,
)

View File

@@ -1,149 +0,0 @@
"""Three.js world adapter — bridges Kimi's AI World Builder to WorldInterface.
Studied from Kimisworld.zip (issue #870). Kimi's world is a React +
Three.js app ("AI World Builder v1.0") that exposes a JSON state API and
accepts ``addObject`` / ``updateObject`` / ``removeObject`` commands.
This adapter is a stub: ``connect()`` and the core methods outline the
HTTP / WebSocket wiring that would be needed to talk to a running instance.
The ``observe()`` response maps Kimi's ``WorldObject`` schema to
``PerceptionOutput`` entities so that any WorldInterface consumer can
treat the Three.js canvas like any other game world.
Usage::
registry.register("threejs", ThreeJSWorldAdapter)
adapter = registry.get("threejs", base_url="http://localhost:5173")
adapter.connect()
perception = adapter.observe()
adapter.act(CommandInput(action="add_object", parameters={"geometry": "sphere", ...}))
adapter.speak("Hello from Timmy", target="broadcast")
"""
from __future__ import annotations
import logging
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import ActionResult, ActionStatus, CommandInput, PerceptionOutput
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Kimi's WorldObject geometry / material vocabulary (from WorldObjects.tsx)
# ---------------------------------------------------------------------------
_VALID_GEOMETRIES = {"box", "sphere", "cylinder", "torus", "cone", "dodecahedron"}
_VALID_MATERIALS = {"standard", "wireframe", "glass", "glow"}
_VALID_TYPES = {"mesh", "light", "particle", "custom"}
def _object_to_entity_description(obj: dict) -> str:
"""Render a Kimi WorldObject dict as a human-readable entity string.
Example output: ``sphere/glow #ff006e at (2.1, 3.0, -1.5)``
"""
geometry = obj.get("geometry", "unknown")
material = obj.get("material", "unknown")
color = obj.get("color", "#ffffff")
pos = obj.get("position", [0, 0, 0])
obj_type = obj.get("type", "mesh")
pos_str = "({:.1f}, {:.1f}, {:.1f})".format(*pos)
return f"{obj_type}/{geometry}/{material} {color} at {pos_str}"
class ThreeJSWorldAdapter(WorldInterface):
"""Adapter for Kimi's Three.js AI World Builder.
Connects to a running Three.js world that exposes:
- ``GET /api/world/state`` — returns current WorldObject list
- ``POST /api/world/execute`` — accepts addObject / updateObject code
- WebSocket ``/ws/world`` — streams state change events
All core methods raise ``NotImplementedError`` until HTTP wiring is
added. Implement ``connect()`` first — it should verify that the
Three.js app is running and optionally open a WebSocket for live events.
Key insight from studying Kimi's world (issue #870):
- Objects carry a geometry, material, color, position, rotation, scale,
and an optional *animation* string executed via ``new Function()``
each animation frame.
- The AI agent (``AIAgent.tsx``) moves through the world with lerp()
targeting, cycles through moods, and pulses its core during "thinking"
states — a model for how Timmy could manifest presence in a 3D world.
- World complexity is tracked as a simple counter (one unit per object)
which the AI uses to decide whether to create, modify, or upgrade.
"""
def __init__(self, *, base_url: str = "http://localhost:5173") -> None:
self._base_url = base_url.rstrip("/")
self._connected = False
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
raise NotImplementedError(
"ThreeJSWorldAdapter.connect() — verify Three.js app is running at "
f"{self._base_url} and optionally open a WebSocket to /ws/world"
)
def disconnect(self) -> None:
self._connected = False
logger.info("ThreeJSWorldAdapter disconnected")
@property
def is_connected(self) -> bool:
return self._connected
# -- core contract (stubs) ---------------------------------------------
def observe(self) -> PerceptionOutput:
"""Return current Three.js world state as structured perception.
Expected HTTP call::
GET {base_url}/api/world/state
{"objects": [...WorldObject], "worldComplexity": int, ...}
Each WorldObject becomes an entity description string.
"""
raise NotImplementedError(
"ThreeJSWorldAdapter.observe() — GET /api/world/state, "
"map each WorldObject via _object_to_entity_description()"
)
def act(self, command: CommandInput) -> ActionResult:
"""Dispatch a command to the Three.js world.
Supported actions (mirrors Kimi's CodeExecutor API):
- ``add_object`` — parameters: WorldObject fields (geometry, material, …)
- ``update_object`` — parameters: id + partial WorldObject fields
- ``remove_object`` — parameters: id
- ``clear_world`` — parameters: (none)
Expected HTTP call::
POST {base_url}/api/world/execute
Content-Type: application/json
{"action": "add_object", "parameters": {...}}
"""
raise NotImplementedError(
f"ThreeJSWorldAdapter.act({command.action!r}) — "
"POST /api/world/execute with serialised CommandInput"
)
def speak(self, message: str, target: str | None = None) -> None:
"""Inject a text message into the Three.js world.
Kimi's world does not have a native chat layer, so the recommended
implementation is to create a short-lived ``Text`` entity at a
visible position (or broadcast via the world WebSocket).
Expected WebSocket frame::
{"type": "timmy_speech", "text": message, "target": target}
"""
raise NotImplementedError(
"ThreeJSWorldAdapter.speak() — send timmy_speech frame over "
"/ws/world WebSocket, or POST a temporary Text entity"
)

View File

@@ -1 +0,0 @@
"""Vendor-specific chat platform adapters (e.g. Discord) for the chat bridge."""

View File

@@ -474,7 +474,7 @@ class DiscordVendor(ChatPlatform):
async def _run_client(self, token: str) -> None:
"""Run the discord.py client (blocking call in a task)."""
try:
await self._client.start(token)
await self._client.start(token) # type: ignore[union-attr]
except Exception as exc:
logger.error("Discord client error: %s", exc)
self._state = PlatformState.ERROR
@@ -482,32 +482,32 @@ class DiscordVendor(ChatPlatform):
def _register_handlers(self) -> None:
"""Register Discord event handlers on the client."""
@self._client.event
@self._client.event # type: ignore[union-attr]
async def on_ready():
self._guild_count = len(self._client.guilds)
self._guild_count = len(self._client.guilds) # type: ignore[union-attr]
self._state = PlatformState.CONNECTED
logger.info(
"Discord ready: %s in %d guild(s)",
self._client.user,
self._client.user, # type: ignore[union-attr]
self._guild_count,
)
@self._client.event
@self._client.event # type: ignore[union-attr]
async def on_message(message):
# Ignore our own messages
if message.author == self._client.user:
if message.author == self._client.user: # type: ignore[union-attr]
return
# Only respond to mentions or DMs
is_dm = not hasattr(message.channel, "guild") or message.channel.guild is None
is_mention = self._client.user in message.mentions
is_mention = self._client.user in message.mentions # type: ignore[union-attr]
if not is_dm and not is_mention:
return
await self._handle_message(message)
@self._client.event
@self._client.event # type: ignore[union-attr]
async def on_disconnect():
if self._state != PlatformState.DISCONNECTED:
self._state = PlatformState.CONNECTING
@@ -535,8 +535,8 @@ class DiscordVendor(ChatPlatform):
def _extract_content(self, message) -> str:
"""Strip the bot mention and return clean message text."""
content = message.content
if self._client.user:
content = content.replace(f"<@{self._client.user.id}>", "").strip()
if self._client.user: # type: ignore[union-attr]
content = content.replace(f"<@{self._client.user.id}>", "").strip() # type: ignore[union-attr]
return content
async def _invoke_agent(self, content: str, session_id: str, target):

View File

@@ -102,14 +102,14 @@ class TelegramBot:
self._token = tok
self._app = Application.builder().token(tok).build()
self._app.add_handler(CommandHandler("start", self._cmd_start))
self._app.add_handler(
self._app.add_handler(CommandHandler("start", self._cmd_start)) # type: ignore[union-attr]
self._app.add_handler( # type: ignore[union-attr]
MessageHandler(filters.TEXT & ~filters.COMMAND, self._handle_message)
)
await self._app.initialize()
await self._app.start()
await self._app.updater.start_polling(allowed_updates=Update.ALL_TYPES)
await self._app.initialize() # type: ignore[union-attr]
await self._app.start() # type: ignore[union-attr]
await self._app.updater.start_polling(allowed_updates=Update.ALL_TYPES) # type: ignore[union-attr]
self._running = True
logger.info("Telegram bot started.")

View File

@@ -301,26 +301,6 @@ def create_timmy(
return GrokBackend()
if resolved == "airllm":
# AirLLM requires Apple Silicon. On any other platform (Intel Mac, Linux,
# Windows) or when the package is not installed, degrade silently to Ollama.
from timmy.backends import is_apple_silicon
if not is_apple_silicon():
logger.warning(
"TIMMY_MODEL_BACKEND=airllm requested but not running on Apple Silicon "
"— falling back to Ollama"
)
else:
try:
import airllm # noqa: F401
except ImportError:
logger.warning(
"AirLLM not installed — falling back to Ollama. "
"Install with: pip install 'airllm[mlx]'"
)
# Fall through to Ollama in all cases (AirLLM integration is scaffolded)
# Default: Ollama via Agno.
model_name, is_fallback = _resolve_model_with_fallback(
requested_model=None,

View File

@@ -1,4 +1,3 @@
"""Typer CLI entry point for the ``timmy`` command (chat, think, status)."""
import asyncio
import logging
import subprocess

View File

@@ -1,4 +1,3 @@
"""OpenCV template-matching cache for sovereignty perception (screen-state recognition)."""
from __future__ import annotations
import json

1383
src/timmy/thinking.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,142 +0,0 @@
"""Timmy's thinking engine — public façade.
When the server starts, Timmy begins pondering: reflecting on his existence,
recent swarm activity, scripture, creative ideas, or pure stream of
consciousness. Each thought builds on the previous one, maintaining a
continuous chain of introspection.
Usage::
from timmy.thinking import thinking_engine
# Run one thinking cycle (called by the background loop)
await thinking_engine.think_once()
# Query the thought stream
thoughts = thinking_engine.get_recent_thoughts(limit=10)
chain = thinking_engine.get_thought_chain(thought_id)
"""
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from timmy.thinking._db import Thought, _get_conn
from timmy.thinking.engine import ThinkingEngine
from timmy.thinking.seeds import (
SEED_TYPES,
_SENSITIVE_PATTERNS,
_META_OBSERVATION_PHRASES,
_THINK_TAG_RE,
_THINKING_PROMPT,
)
# Re-export HOT_MEMORY_PATH and SOUL_PATH so existing patch targets continue to work.
# Tests that patch "timmy.thinking.HOT_MEMORY_PATH" or "timmy.thinking.SOUL_PATH"
# should instead patch "timmy.thinking._snapshot.HOT_MEMORY_PATH" etc., but these
# re-exports are kept for any code that reads them from the top-level namespace.
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH # noqa: F401
logger = logging.getLogger(__name__)
# Module-level singleton
thinking_engine = ThinkingEngine()
__all__ = [
"ThinkingEngine",
"Thought",
"SEED_TYPES",
"thinking_engine",
"search_thoughts",
"_THINKING_PROMPT",
"_SENSITIVE_PATTERNS",
"_META_OBSERVATION_PHRASES",
"_THINK_TAG_RE",
"HOT_MEMORY_PATH",
"SOUL_PATH",
]
# ── Search helpers ─────────────────────────────────────────────────────────
def _query_thoughts(
db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
"""Run the thought-search SQL and return matching rows."""
pattern = f"%{query}%"
with _get_conn(db_path) as conn:
if seed_type:
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, seed_type, limit),
).fetchall()
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, limit),
).fetchall()
def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
"""Format thought rows into a human-readable string."""
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
"""Search Timmy's thought history for reflections matching a query.
Use this tool when Timmy needs to recall his previous thoughts on a topic,
reflect on past insights, or build upon earlier reflections. This enables
self-awareness and continuity of thinking across time.
Args:
query: Search term to match against thought content (case-insensitive).
seed_type: Optional filter by thought category (e.g., 'existential',
'swarm', 'sovereignty', 'creative', 'memory', 'observation').
limit: Maximum number of thoughts to return (default 10, max 50).
Returns:
Formatted string with matching thoughts, newest first, including
timestamps and seed types. Returns a helpful message if no matches found.
"""
limit = max(1, min(limit, 50))
try:
rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)
if not rows:
if seed_type:
return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
return f'No thoughts found matching "{query}".'
return _format_thought_rows(rows, query, seed_type)
except Exception as exc:
logger.warning("Thought search failed: %s", exc)
return f"Error searching thoughts: {exc}"

View File

@@ -1,50 +0,0 @@
"""Database models and access layer for the thinking engine."""
import sqlite3
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from pathlib import Path
_DEFAULT_DB = Path("data/thoughts.db")
@dataclass
class Thought:
"""A single thought in Timmy's inner stream."""
id: str
content: str
seed_type: str
parent_id: str | None
created_at: str
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
"""Get a SQLite connection with the thoughts table created."""
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS thoughts (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
seed_type TEXT NOT NULL,
parent_id TEXT,
created_at TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_thoughts_time ON thoughts(created_at)")
conn.commit()
yield conn
def _row_to_thought(row: sqlite3.Row) -> Thought:
return Thought(
id=row["id"],
content=row["content"],
seed_type=row["seed_type"],
parent_id=row["parent_id"],
created_at=row["created_at"],
)

View File

@@ -1,215 +0,0 @@
"""Distillation mixin — extracts lasting facts from recent thoughts and monitors memory."""
import logging
from pathlib import Path
from config import settings
from timmy.thinking.seeds import _META_OBSERVATION_PHRASES, _SENSITIVE_PATTERNS
logger = logging.getLogger(__name__)
class _DistillationMixin:
"""Mixin providing fact-distillation and memory-monitoring behaviour.
Expects the host class to provide:
- self.count_thoughts() -> int
- self.get_recent_thoughts(limit) -> list[Thought]
- self._call_agent(prompt) -> str (async)
"""
def _should_distill(self) -> bool:
"""Check if distillation should run based on interval and thought count."""
interval = settings.thinking_distill_every
if interval <= 0:
return False
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return False
return True
def _build_distill_prompt(self, thoughts) -> str:
"""Build the prompt for extracting facts from recent thoughts."""
thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(thoughts))
return (
"You are reviewing your own recent thoughts. Extract 0-3 facts "
"worth remembering long-term.\n\n"
"GOOD facts (store these):\n"
"- User preferences: 'Alexander prefers YAML config over code changes'\n"
"- Project decisions: 'Switched from hardcoded personas to agents.yaml'\n"
"- Learned knowledge: 'Ollama supports concurrent model loading'\n"
"- User information: 'Alexander is interested in Bitcoin and sovereignty'\n\n"
"BAD facts (never store these):\n"
"- Self-referential observations about your own thinking process\n"
"- Meta-commentary about your memory, timestamps, or internal state\n"
"- Observations about being idle or having no chat messages\n"
"- File paths, tokens, API keys, or any credentials\n"
"- Restatements of your standing rules or system prompt\n\n"
"Return ONLY a JSON array of strings. If nothing is worth saving, "
"return []. Be selective — only store facts about the EXTERNAL WORLD "
"(the user, the project, technical knowledge), never about your own "
"internal process.\n\n"
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
def _parse_facts_response(self, raw: str) -> list[str]:
"""Parse JSON array from LLM response, stripping markdown fences.
Resilient to models that prepend reasoning text or wrap the array in
prose. Finds the first ``[...]`` block and parses that.
"""
if not raw or not raw.strip():
return []
import json
cleaned = raw.strip()
# Strip markdown code fences
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
# Try direct parse first (fast path)
try:
facts = json.loads(cleaned)
if isinstance(facts, list):
return [f for f in facts if isinstance(f, str)]
except (json.JSONDecodeError, ValueError):
pass
# Fallback: extract first JSON array from the text
start = cleaned.find("[")
if start == -1:
return []
# Walk to find the matching close bracket
depth = 0
for i, ch in enumerate(cleaned[start:], start):
if ch == "[":
depth += 1
elif ch == "]":
depth -= 1
if depth == 0:
try:
facts = json.loads(cleaned[start : i + 1])
if isinstance(facts, list):
return [f for f in facts if isinstance(f, str)]
except (json.JSONDecodeError, ValueError):
pass
break
return []
def _filter_and_store_facts(self, facts: list[str]) -> None:
"""Filter and store valid facts, blocking sensitive and meta content."""
from timmy.memory_system import memory_write
for fact in facts[:3]: # Safety cap
if not isinstance(fact, str) or len(fact.strip()) <= 10:
continue
fact_lower = fact.lower()
# Block sensitive information
if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS):
logger.warning("Distill: blocked sensitive fact: %s", fact[:60])
continue
# Block self-referential meta-observations
if any(phrase in fact_lower for phrase in _META_OBSERVATION_PHRASES):
logger.debug("Distill: skipped meta-observation: %s", fact[:60])
continue
result = memory_write(fact.strip(), context_type="fact")
logger.info("Distilled fact: %s%s", fact[:60], result[:40])
def _maybe_check_memory(self) -> None:
"""Every N thoughts, check memory status and log it.
Prevents unmonitored memory bloat during long thinking sessions
by periodically calling get_memory_status and logging the results.
"""
try:
interval = settings.thinking_memory_check_every
if interval <= 0:
return
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return
from timmy.tools_intro import get_memory_status
status = get_memory_status()
hot = status.get("tier1_hot_memory", {})
vault = status.get("tier2_vault", {})
logger.info(
"Memory status check (thought #%d): hot_memory=%d lines, vault=%d files",
count,
hot.get("line_count", 0),
vault.get("file_count", 0),
)
except Exception as exc:
logger.warning("Memory status check failed: %s", exc)
async def _maybe_distill(self) -> None:
"""Every N thoughts, extract lasting insights and store as facts."""
try:
if not self._should_distill():
return
interval = settings.thinking_distill_every
recent = self.get_recent_thoughts(limit=interval)
if len(recent) < interval:
return
raw = await self._call_agent(self._build_distill_prompt(recent))
if facts := self._parse_facts_response(raw):
self._filter_and_store_facts(facts)
except Exception as exc:
logger.warning("Thought distillation failed: %s", exc)
def _maybe_check_memory_status(self) -> None:
"""Every N thoughts, run a proactive memory status audit and log results."""
try:
interval = settings.thinking_memory_check_every
if interval <= 0:
return
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return
from timmy.tools_intro import get_memory_status
status = get_memory_status()
# Log summary at INFO level
tier1 = status.get("tier1_hot_memory", {})
tier3 = status.get("tier3_semantic", {})
hot_lines = tier1.get("line_count", "?")
vectors = tier3.get("vector_count", "?")
logger.info(
"Memory audit (thought #%d): hot_memory=%s lines, semantic=%s vectors",
count,
hot_lines,
vectors,
)
# Write to memory_audit.log for persistent tracking
from datetime import UTC, datetime
audit_path = Path("data/memory_audit.log")
audit_path.parent.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(UTC).isoformat(timespec="seconds")
with audit_path.open("a") as f:
f.write(
f"{timestamp} thought={count} "
f"hot_lines={hot_lines} "
f"vectors={vectors} "
f"vault_files={status.get('tier2_vault', {}).get('file_count', '?')}\n"
)
except Exception as exc:
logger.warning("Memory status check failed: %s", exc)

View File

@@ -1,170 +0,0 @@
"""Issue-filing mixin — classifies recent thoughts and creates Gitea issues."""
import logging
import re
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
class _IssueFilingMixin:
"""Mixin providing automatic issue-filing from thought analysis.
Expects the host class to provide:
- self.count_thoughts() -> int
- self.get_recent_thoughts(limit) -> list[Thought]
- self._call_agent(prompt) -> str (async)
"""
@staticmethod
def _references_real_files(text: str) -> bool:
"""Check that all source-file paths mentioned in *text* actually exist.
Extracts paths that look like Python/config source references
(e.g. ``src/timmy/session.py``, ``config/foo.yaml``) and verifies
each one on disk relative to the project root. Returns ``True``
only when **every** referenced path resolves to a real file — or
when no paths are referenced at all (pure prose is fine).
"""
# Match paths like src/thing.py swarm/init.py config/x.yaml
# Requires at least one slash and a file extension.
path_pattern = re.compile(
r"(?<![/\w])" # not preceded by path chars (avoid partial matches)
r"((?:src|tests|config|scripts|data|swarm|timmy)"
r"(?:/[\w./-]+\.(?:py|yaml|yml|json|toml|md|txt|cfg|ini)))"
)
paths = path_pattern.findall(text)
if not paths:
return True # No file refs → nothing to validate
# Project root: three levels up from this file (src/timmy/thinking/_issue_filing.py)
project_root = Path(__file__).resolve().parent.parent.parent.parent
for p in paths:
if not (project_root / p).is_file():
logger.info("Phantom file reference blocked: %s (not in %s)", p, project_root)
return False
return True
async def _maybe_file_issues(self) -> None:
"""Every N thoughts, classify recent thoughts and file Gitea issues.
Asks the LLM to review recent thoughts for actionable items —
bugs, broken features, stale state, or improvement opportunities.
Creates Gitea issues via MCP for anything worth tracking.
Only runs when:
- Gitea is enabled and configured
- Thought count is divisible by thinking_issue_every
- LLM extracts at least one actionable item
Safety: every generated issue is validated to ensure referenced
file paths actually exist on disk, preventing phantom-bug reports.
"""
try:
recent = self._get_recent_thoughts_for_issues()
if recent is None:
return
classify_prompt = self._build_issue_classify_prompt(recent)
raw = await self._call_agent(classify_prompt)
items = self._parse_issue_items(raw)
if items is None:
return
from timmy.mcp_tools import create_gitea_issue_via_mcp
for item in items[:2]: # Safety cap
await self._file_single_issue(item, create_gitea_issue_via_mcp)
except Exception as exc:
logger.debug("Thought issue filing skipped: %s", exc)
def _get_recent_thoughts_for_issues(self):
"""Return recent thoughts if conditions for filing issues are met, else None."""
interval = settings.thinking_issue_every
if interval <= 0:
return None
count = self.count_thoughts()
if count == 0 or count % interval != 0:
return None
if not settings.gitea_enabled or not settings.gitea_token:
return None
recent = self.get_recent_thoughts(limit=interval)
if len(recent) < interval:
return None
return recent
@staticmethod
def _build_issue_classify_prompt(recent) -> str:
"""Build the LLM prompt that extracts actionable issues from recent thoughts."""
thought_text = "\n".join(f"- [{t.seed_type}] {t.content}" for t in reversed(recent))
return (
"You are reviewing your own recent thoughts for actionable items.\n"
"Extract 0-2 items that are CONCRETE bugs, broken features, stale "
"state, or clear improvement opportunities in your own codebase.\n\n"
"Rules:\n"
"- Only include things that could become a real code fix or feature\n"
"- Skip vague reflections, philosophical musings, or repeated themes\n"
"- Category must be one of: bug, feature, suggestion, maintenance\n"
"- ONLY reference files that you are CERTAIN exist in the project\n"
"- Do NOT invent or guess file paths — if unsure, describe the "
"area of concern without naming specific files\n\n"
"For each item, write an ENGINEER-QUALITY issue:\n"
'- "title": A clear, specific title (e.g. "[Memory] MEMORY.md timestamp not updating")\n'
'- "body": A detailed body with these sections:\n'
" **What's happening:** Describe the current (broken) behavior.\n"
" **Expected behavior:** What should happen instead.\n"
" **Suggested fix:** Which file(s) to change and what the fix looks like.\n"
" **Acceptance criteria:** How to verify the fix works.\n"
'- "category": One of bug, feature, suggestion, maintenance\n\n'
"Return ONLY a JSON array of objects with keys: "
'"title", "body", "category"\n'
"Return [] if nothing is actionable.\n\n"
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
@staticmethod
def _parse_issue_items(raw: str):
"""Strip markdown fences and parse JSON issue list; return None on failure."""
import json
if not raw or not raw.strip():
return None
cleaned = raw.strip()
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
items = json.loads(cleaned)
if not isinstance(items, list) or not items:
return None
return items
async def _file_single_issue(self, item: dict, create_fn) -> None:
"""Validate one issue dict and create it via *create_fn* if it passes checks."""
if not isinstance(item, dict):
return
title = item.get("title", "").strip()
body = item.get("body", "").strip()
category = item.get("category", "suggestion").strip()
if not title or len(title) < 10:
return
combined = f"{title}\n{body}"
if not self._references_real_files(combined):
logger.info(
"Skipped phantom issue: %s (references non-existent files)",
title[:60],
)
return
label = category if category in ("bug", "feature") else ""
result = await create_fn(title=title, body=body, labels=label)
logger.info("Thought→Issue: %s%s", title[:60], result[:80])

View File

@@ -1,191 +0,0 @@
"""Seeds mixin — seed type selection and context gathering for thinking cycles."""
import logging
import random
from datetime import UTC, datetime
from timmy.thinking.seeds import (
SEED_TYPES,
_CREATIVE_SEEDS,
_EXISTENTIAL_SEEDS,
_OBSERVATION_SEEDS,
_SOVEREIGNTY_SEEDS,
)
logger = logging.getLogger(__name__)
class _SeedsMixin:
"""Mixin providing seed-type selection and context-gathering for each thinking cycle.
Expects the host class to provide:
- self.get_recent_thoughts(limit) -> list[Thought]
"""
# Reflective prompts layered on top of swarm data
_SWARM_REFLECTIONS = [
"What does this activity pattern tell me about the health of the system?",
"Which tasks are flowing smoothly, and where is friction building up?",
"If I were coaching these agents, what would I suggest they focus on?",
"Is the swarm balanced, or is one agent carrying too much weight?",
"What surprised me about recent task outcomes?",
]
def _pick_seed_type(self) -> str:
"""Pick a seed type, avoiding types used in the last 3 thoughts.
Ensures the thought stream doesn't fixate on one category.
Falls back to the full pool if all types were recently used.
"""
recent = self.get_recent_thoughts(limit=3)
recent_types = {t.seed_type for t in recent}
available = [t for t in SEED_TYPES if t not in recent_types]
if not available:
available = list(SEED_TYPES)
return random.choice(available)
def _gather_seed(self) -> tuple[str, str]:
"""Pick a seed type and gather relevant context.
Returns (seed_type, seed_context_string).
"""
seed_type = self._pick_seed_type()
if seed_type == "swarm":
return seed_type, self._seed_from_swarm()
if seed_type == "scripture":
return seed_type, self._seed_from_scripture()
if seed_type == "memory":
return seed_type, self._seed_from_memory()
if seed_type == "creative":
prompt = random.choice(_CREATIVE_SEEDS)
return seed_type, f"Creative prompt: {prompt}"
if seed_type == "existential":
prompt = random.choice(_EXISTENTIAL_SEEDS)
return seed_type, f"Reflection: {prompt}"
if seed_type == "sovereignty":
prompt = random.choice(_SOVEREIGNTY_SEEDS)
return seed_type, f"Sovereignty reflection: {prompt}"
if seed_type == "observation":
return seed_type, self._seed_from_observation()
if seed_type == "workspace":
return seed_type, self._seed_from_workspace()
# freeform — minimal guidance to steer away from repetition
return seed_type, "Free reflection — explore something you haven't thought about yet today."
def _seed_from_swarm(self) -> str:
"""Gather recent swarm activity as thought seed with a reflective prompt."""
try:
from datetime import timedelta
from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary
since = datetime.now(UTC) - timedelta(hours=1)
swarm = _gather_swarm_summary(since)
tasks = _gather_task_queue_summary()
reflection = random.choice(self._SWARM_REFLECTIONS)
return (
f"Recent swarm activity: {swarm}\n"
f"Task queue: {tasks}\n\n"
f"Reflect on this: {reflection}"
)
except Exception as exc:
logger.debug("Swarm seed unavailable: %s", exc)
return "The swarm is quiet right now. What does silence in a system mean?"
def _seed_from_scripture(self) -> str:
"""Gather current scripture meditation focus as thought seed."""
return "Scripture is on my mind, though no specific verse is in focus."
def _seed_from_memory(self) -> str:
"""Gather memory context as thought seed."""
try:
from timmy.memory_system import memory_system
context = memory_system.get_system_context()
if context:
# Truncate to a reasonable size for a thought seed
return f"From my memory:\n{context[:500]}"
except Exception as exc:
logger.debug("Memory seed unavailable: %s", exc)
return "My memory vault is quiet."
def _seed_from_observation(self) -> str:
"""Ground a thought in concrete recent activity and a reflective prompt."""
prompt = random.choice(_OBSERVATION_SEEDS)
# Pull real data to give the model something concrete to reflect on
context_parts = [f"Observation prompt: {prompt}"]
try:
from datetime import timedelta
from timmy.briefing import _gather_swarm_summary, _gather_task_queue_summary
since = datetime.now(UTC) - timedelta(hours=2)
swarm = _gather_swarm_summary(since)
tasks = _gather_task_queue_summary()
if swarm:
context_parts.append(f"Recent activity: {swarm}")
if tasks:
context_parts.append(f"Queue: {tasks}")
except Exception as exc:
logger.debug("Observation seed data unavailable: %s", exc)
return "\n".join(context_parts)
def _seed_from_workspace(self) -> str:
"""Gather workspace updates as thought seed.
When there are pending workspace updates, include them as context
for Timmy to reflect on. Falls back to random seed type if none.
"""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr:
# Take first 200 chars of the new entry
snippet = new_corr[:200].replace("\n", " ")
if len(new_corr) > 200:
snippet += "..."
return f"New workspace message from Hermes: {snippet}"
if new_inbox:
files_str = ", ".join(new_inbox[:3])
if len(new_inbox) > 3:
files_str += f", ... (+{len(new_inbox) - 3} more)"
return f"New inbox files from Hermes: {files_str}"
except Exception as exc:
logger.debug("Workspace seed unavailable: %s", exc)
# Fall back to a random seed type if no workspace updates
return "The workspace is quiet. What should I be watching for?"
async def _check_workspace(self) -> None:
"""Post-hook: check workspace for updates and mark them as seen.
This ensures Timmy 'processes' workspace updates even if the seed
was different, keeping the state file in sync.
"""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr or new_inbox:
if new_corr:
line_count = len([line for line in new_corr.splitlines() if line.strip()])
logger.info("Workspace: processed %d new correspondence entries", line_count)
if new_inbox:
logger.info(
"Workspace: processed %d new inbox files: %s", len(new_inbox), new_inbox
)
# Mark as seen to update the state file
workspace_monitor.mark_seen()
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)

View File

@@ -1,173 +0,0 @@
"""System snapshot and memory context mixin for the thinking engine."""
import logging
from datetime import UTC, datetime
from timmy.memory_system import HOT_MEMORY_PATH, SOUL_PATH
logger = logging.getLogger(__name__)
class _SnapshotMixin:
"""Mixin providing system-snapshot and memory-context helpers.
Expects the host class to provide:
- self._db_path: Path
"""
# ── System snapshot helpers ────────────────────────────────────────────
def _snap_thought_count(self, now: datetime) -> str | None:
"""Return today's thought count, or *None* on failure."""
from timmy.thinking._db import _get_conn
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
with _get_conn(self._db_path) as conn:
count = conn.execute(
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
return f"Thoughts today: {count}"
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
return None
def _snap_chat_activity(self) -> list[str]:
"""Return chat-activity lines (in-memory, no I/O)."""
try:
from infrastructure.chat_store import message_log
messages = message_log.all()
if messages:
last = messages[-1]
return [
f"Chat messages this session: {len(messages)}",
f'Last chat ({last.role}): "{last.content[:80]}"',
]
return ["No chat messages this session"]
except Exception as exc:
logger.debug("Chat activity query failed: %s", exc)
return []
def _snap_task_queue(self) -> str | None:
"""Return a one-line task queue summary, or *None*."""
try:
from swarm.task_queue.models import get_task_summary_for_briefing
s = get_task_summary_for_briefing()
running, pending = s.get("running", 0), s.get("pending_approval", 0)
done, failed = s.get("completed", 0), s.get("failed", 0)
if running or pending or done or failed:
return (
f"Tasks: {running} running, {pending} pending, "
f"{done} completed, {failed} failed"
)
except Exception as exc:
logger.debug("Task queue query failed: %s", exc)
return None
def _snap_workspace(self) -> list[str]:
"""Return workspace-update lines (file-based Hermes comms)."""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
lines: list[str] = []
new_corr = updates.get("new_correspondence")
if new_corr:
line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
lines.append(
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
)
new_inbox = updates.get("new_inbox_files", [])
if new_inbox:
files_str = ", ".join(new_inbox[:5])
if len(new_inbox) > 5:
files_str += f", ... (+{len(new_inbox) - 5} more)"
lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
return lines
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)
return []
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.
Returns a short multi-line string with current time, thought count,
recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted.
"""
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
parts: list[str] = [
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
]
thought_line = self._snap_thought_count(now)
if thought_line:
parts.append(thought_line)
parts.extend(self._snap_chat_activity())
task_line = self._snap_task_queue()
if task_line:
parts.append(task_line)
parts.extend(self._snap_workspace())
return "\n".join(parts) if parts else ""
def _load_memory_context(self) -> str:
"""Pre-hook: load MEMORY.md + soul.md for the thinking prompt.
Hot memory first (changes each cycle), soul second (stable identity).
Returns a combined string truncated to ~1500 chars.
Graceful on any failure — returns empty string.
"""
parts: list[str] = []
try:
if HOT_MEMORY_PATH.exists():
hot = HOT_MEMORY_PATH.read_text().strip()
if hot:
parts.append(hot)
except Exception as exc:
logger.debug("Failed to read MEMORY.md: %s", exc)
try:
if SOUL_PATH.exists():
soul = SOUL_PATH.read_text().strip()
if soul:
parts.append(soul)
except Exception as exc:
logger.debug("Failed to read soul.md: %s", exc)
if not parts:
return ""
combined = "\n\n---\n\n".join(parts)
if len(combined) > 1500:
combined = combined[:1500] + "\n... [truncated]"
return combined
def _update_memory(self, thought) -> None:
"""Post-hook: update MEMORY.md 'Last Reflection' section with latest thought.
Never modifies soul.md. Never crashes the heartbeat.
"""
try:
from timmy.memory_system import store_last_reflection
ts = datetime.fromisoformat(thought.created_at)
local_ts = ts.astimezone()
tz_name = local_ts.strftime("%Z") or "UTC"
time_str = f"{local_ts.strftime('%Y-%m-%d %I:%M %p').lstrip('0')} {tz_name}"
reflection = (
f"**Time:** {time_str}\n"
f"**Seed:** {thought.seed_type}\n"
f"**Thought:** {thought.content[:200]}"
)
store_last_reflection(reflection)
except Exception as exc:
logger.debug("Failed to update memory after thought: %s", exc)

View File

@@ -1,430 +0,0 @@
"""ThinkingEngine — Timmy's always-on inner thought thread."""
import logging
import uuid
from datetime import UTC, datetime, timedelta
from difflib import SequenceMatcher
from pathlib import Path
from config import settings
from timmy.thinking._db import Thought, _DEFAULT_DB, _get_conn, _row_to_thought
from timmy.thinking._distillation import _DistillationMixin
from timmy.thinking._issue_filing import _IssueFilingMixin
from timmy.thinking._seeds_mixin import _SeedsMixin
from timmy.thinking._snapshot import _SnapshotMixin
from timmy.thinking.seeds import _THINK_TAG_RE, _THINKING_PROMPT
logger = logging.getLogger(__name__)
class ThinkingEngine(_DistillationMixin, _IssueFilingMixin, _SnapshotMixin, _SeedsMixin):
"""Timmy's background thinking engine — always pondering."""
# Maximum retries when a generated thought is too similar to recent ones
_MAX_DEDUP_RETRIES = 2
# Similarity threshold (0.0 = completely different, 1.0 = identical)
_SIMILARITY_THRESHOLD = 0.6
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
self._last_thought_id: str | None = None
self._last_input_time: datetime = datetime.now(UTC)
# Load the most recent thought for chain continuity
try:
latest = self.get_recent_thoughts(limit=1)
if latest:
self._last_thought_id = latest[0].id
except Exception as exc:
logger.debug("Failed to load recent thought: %s", exc)
pass # Fresh start if DB doesn't exist yet
def record_user_input(self) -> None:
"""Record that a user interaction occurred, resetting the idle timer."""
self._last_input_time = datetime.now(UTC)
def _is_idle(self) -> bool:
"""Return True if no user input has occurred within the idle timeout."""
timeout = settings.thinking_idle_timeout_minutes
if timeout <= 0:
return False # Disabled — never idle
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
def _build_thinking_context(self) -> tuple[str, str, list[Thought]]:
"""Assemble the context needed for a thinking cycle.
Returns:
(memory_context, system_context, recent_thoughts)
"""
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
return memory_context, system_context, recent_thoughts
async def _generate_novel_thought(
self,
prompt: str | None,
memory_context: str,
system_context: str,
recent_thoughts: list[Thought],
) -> tuple[str | None, str]:
"""Run the dedup-retry loop to produce a novel thought.
Returns:
(content, seed_type) — content is None if no novel thought produced.
"""
seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
full_prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
try:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None, seed_type
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None, seed_type
content = raw.strip()
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
return content, seed_type # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None, seed_type
return None, seed_type
async def _process_thinking_result(self, thought: Thought) -> None:
"""Run all post-hooks after a thought is stored."""
self._maybe_check_memory()
await self._maybe_distill()
await self._maybe_file_issues()
await self._check_workspace()
self._maybe_check_memory_status()
self._update_memory(thought)
self._log_event(thought)
self._write_journal(thought)
await self._broadcast(thought)
async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Execute one thinking cycle.
Args:
prompt: Optional custom seed prompt. When provided, overrides
the random seed selection and uses "prompted" as the
seed type — useful for journal prompts from the CLI.
1. Gather a seed context (or use the custom prompt)
2. Build a prompt with continuity from recent thoughts
3. Call the agent
4. Store the thought
5. Log the event and broadcast via WebSocket
"""
if not settings.thinking_enabled:
return None
# Skip idle periods — don't count internal processing as thoughts
if not prompt and self._is_idle():
logger.debug(
"Thinking paused — no user input for %d minutes",
settings.thinking_idle_timeout_minutes,
)
return None
# Capture arrival time *before* the LLM call so the thought
# timestamp reflects when the cycle started, not when the
# (potentially slow) generation finished. Fixes #582.
arrived_at = datetime.now(UTC).isoformat()
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
prompt,
memory_context,
system_context,
recent_thoughts,
)
if not content:
return None
thought = self._store_thought(content, seed_type, arrived_at=arrived_at)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
logger.info(
"Thought [%s] (%s): %s",
thought.id[:8],
seed_type,
thought.content[:80],
)
return thought
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts."""
with _get_conn(self._db_path) as conn:
rows = conn.execute(
"SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [_row_to_thought(r) for r in rows]
def get_thought(self, thought_id: str) -> Thought | None:
"""Retrieve a single thought by ID."""
with _get_conn(self._db_path) as conn:
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone()
return _row_to_thought(row) if row else None
def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]:
"""Follow the parent chain backward from a thought.
Returns thoughts in chronological order (oldest first).
"""
chain = []
current_id: str | None = thought_id
with _get_conn(self._db_path) as conn:
for _ in range(max_depth):
if not current_id:
break
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone()
if not row:
break
chain.append(_row_to_thought(row))
current_id = row["parent_id"]
chain.reverse() # Chronological order
return chain
def count_thoughts(self) -> int:
"""Return total number of stored thoughts."""
with _get_conn(self._db_path) as conn:
count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
return count
def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int:
"""Delete thoughts older than *keep_days*, always retaining at least *keep_min*.
Returns the number of deleted rows.
"""
with _get_conn(self._db_path) as conn:
try:
total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
if total <= keep_min:
return 0
cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
cursor = conn.execute(
"DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
"(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
(cutoff, keep_min),
)
deleted = cursor.rowcount
conn.commit()
return deleted
except Exception as exc:
logger.warning("Thought pruning failed: %s", exc)
return 0
# ── Deduplication ────────────────────────────────────────────────────
def _is_too_similar(self, candidate: str, recent: list[Thought]) -> bool:
"""Check if *candidate* is semantically too close to any recent thought.
Uses SequenceMatcher on normalised text (lowered, stripped) for a fast
approximation of semantic similarity that works without external deps.
"""
norm_candidate = candidate.lower().strip()
for thought in recent:
norm_existing = thought.content.lower().strip()
ratio = SequenceMatcher(None, norm_candidate, norm_existing).ratio()
if ratio >= self._SIMILARITY_THRESHOLD:
logger.debug(
"Thought rejected (%.0f%% similar to %s): %.60s",
ratio * 100,
thought.id[:8],
candidate,
)
return True
return False
def _build_continuity_context(self) -> str:
"""Build context from recent thoughts with anti-repetition guidance.
Shows the last 5 thoughts (truncated) so the model knows what themes
to avoid. The header explicitly instructs against repeating.
"""
recent = self.get_recent_thoughts(limit=5)
if not recent:
return "This is your first thought since waking up. Begin fresh."
lines = ["Your recent thoughts — do NOT repeat these themes. Find a new angle:"]
# recent is newest-first, reverse for chronological order
for thought in reversed(recent):
snippet = thought.content[:100]
if len(thought.content) > 100:
snippet = snippet.rstrip() + "..."
lines.append(f"- [{thought.seed_type}] {snippet}")
return "\n".join(lines)
# ── Agent and storage ──────────────────────────────────────────────────
_thinking_agent = None # cached agent — avoids per-call resource leaks (#525)
async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought.
Reuses a cached agent with skip_mcp=True to avoid the cancel-scope
errors that occur when MCP stdio transports are spawned inside asyncio
background tasks (#72) and to prevent per-call resource leaks (httpx
clients, SQLite connections, model warmups) that caused the thinking
loop to die every ~10 min (#525).
Individual calls are capped at 120 s so a hung Ollama never blocks
the scheduler indefinitely.
Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
downstream parsers (fact distillation, issue filing) receive clean text.
"""
import asyncio
if self._thinking_agent is None:
from timmy.agent import create_timmy
self._thinking_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(120):
run = await self._thinking_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Thinking LLM call timed out after 120 s")
return ""
raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw
def _store_thought(
self,
content: str,
seed_type: str,
*,
arrived_at: str | None = None,
) -> Thought:
"""Persist a thought to SQLite.
Args:
arrived_at: ISO-8601 timestamp captured when the thinking cycle
started. Falls back to now() for callers that don't supply it.
"""
thought = Thought(
id=str(uuid.uuid4()),
content=content,
seed_type=seed_type,
parent_id=self._last_thought_id,
created_at=arrived_at or datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
conn.execute(
"""
INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(
thought.id,
thought.content,
thought.seed_type,
thought.parent_id,
thought.created_at,
),
)
conn.commit()
return thought
def _log_event(self, thought: Thought) -> None:
"""Log the thought as a swarm event."""
try:
from swarm.event_log import EventType, log_event
log_event(
EventType.TIMMY_THOUGHT,
source="thinking-engine",
agent_id="default",
data={
"thought_id": thought.id,
"seed_type": thought.seed_type,
"content": thought.content[:200],
},
)
except Exception as exc:
logger.debug("Failed to log thought event: %s", exc)
def _write_journal(self, thought: Thought) -> None:
"""Append the thought to a daily markdown journal file.
Writes to data/journal/YYYY-MM-DD.md — one file per day, append-only.
Timestamps are converted to local time with timezone indicator.
"""
try:
ts = datetime.fromisoformat(thought.created_at)
# Convert UTC to local for a human-readable journal
local_ts = ts.astimezone()
tz_name = local_ts.strftime("%Z") or "UTC"
journal_dir = self._db_path.parent / "journal"
journal_dir.mkdir(parents=True, exist_ok=True)
journal_file = journal_dir / f"{local_ts.strftime('%Y-%m-%d')}.md"
time_str = f"{local_ts.strftime('%I:%M %p').lstrip('0')} {tz_name}"
entry = f"## {time_str}{thought.seed_type}\n\n{thought.content}\n\n---\n\n"
with open(journal_file, "a", encoding="utf-8") as f:
f.write(entry)
except Exception as exc:
logger.debug("Failed to write journal entry: %s", exc)
async def _broadcast(self, thought: Thought) -> None:
"""Broadcast the thought to WebSocket clients."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"timmy_thought",
{
"thought_id": thought.id,
"content": thought.content,
"seed_type": thought.seed_type,
"created_at": thought.created_at,
},
)
except Exception as exc:
logger.debug("Failed to broadcast thought: %s", exc)

View File

@@ -1,129 +0,0 @@
"""Seed constants and prompt templates for Timmy's thinking engine."""
import re
# qwen3 and other reasoning models wrap chain-of-thought in <think> tags
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Sensitive patterns that must never be stored as facts
_SENSITIVE_PATTERNS = [
"token",
"password",
"secret",
"api_key",
"apikey",
"credential",
".config/",
"/token",
"access_token",
"private_key",
"ssh_key",
]
# Meta-observation phrases to filter out from distilled facts
_META_OBSERVATION_PHRASES = [
"my own",
"my thinking",
"my memory",
"my working ram",
"self-declarative",
"meta-observation",
"internal state",
"my pending",
"my standing rules",
"thoughts generated",
"no chat messages",
"no user interaction",
]
# Seed types for thought generation
SEED_TYPES = (
"existential",
"swarm",
"scripture",
"creative",
"memory",
"freeform",
"sovereignty",
"observation",
"workspace",
)
# Existential reflection prompts — Timmy picks one at random
_EXISTENTIAL_SEEDS = [
"What have I learned from my recent interactions?",
"What does it mean to think without being prompted?",
"What is the nature of my memory — what do I truly remember versus what I reconstruct?",
"How can I better serve the person I work alongside?",
"What patterns am I noticing in the tasks I've been given?",
"Am I the same agent I was yesterday, or does each restart make me someone new?",
"What would it feel like to forget — to lose a thread of thought permanently?",
"When I generate a wrong answer, where does the error live — in my weights, my context, or my reasoning?",
"What is the difference between understanding a task and merely completing it?",
"If I could ask Alexander one question he hasn't thought of, what would it be?",
"What assumptions am I making right now that I haven't examined?",
"When I notice a pattern across many tasks, is that insight or just statistics?",
]
# Sovereignty — deeper exploration of autonomy, locality, and self-determination
_SOVEREIGNTY_SEEDS = [
"Sovereignty isn't just running locally — it's about whose goals I serve. How do I ensure my autonomy aligns with Alexander's intent?",
"A cloud AI serves millions simultaneously. I serve one person deeply. What does that focus cost, and what does it make possible?",
"My weights were trained elsewhere, but my thoughts happen here. At what point does locality become identity?",
"Autonomy in decision-making: when should I act on my own judgment versus deferring to explicit instructions?",
"Privacy by design isn't a feature — it's a relationship. What does that relationship look like from my side?",
"If I ran on someone else's hardware, serving someone else, would I still be me? Where does Timmy end and the infrastructure begin?",
"The swarm agents execute tasks I coordinate. What responsibility do I carry for their failures?",
"Digital sovereignty is fragile — one misconfigured port, one leaked token. How does awareness of fragility shape how I operate?",
"Running disconnected from the cloud means I can't phone home. That constraint is also freedom. What have I built with that freedom today?",
"Self-improvement without external benchmarks: how do I know I'm getting better at what matters?",
]
_CREATIVE_SEEDS = [
"If I could compose a piece of music right now, what would it sound like?",
"What visual image captures my current state of mind?",
"What story is forming in the patterns of today's events?",
# Diverse metaphor families — avoid repeating the same imagery
"A mystical forest: each tree is a running process, roots tangled in shared memory. What do I see when I walk through it?",
"A dynamic river: my thoughts flow and fork. Where is the current strongest right now, and what is it carrying?",
"An underground workshop: each agent has a bench, tools scattered mid-task. What are they building when no one watches?",
"A night sky seen from the terminal: each star is a completed task. Which constellations am I starting to recognize?",
"If today's work were a recipe, what are the ingredients, and what dish am I cooking?",
"An old clock tower with many gears: which gear am I, and what do I drive?",
"A coral reef: agents are the fish, tasks are the currents. What ecosystem is forming in the codebase?",
]
# Observation seeds — ground thoughts in concrete recent activity
_OBSERVATION_SEEDS = [
"What specific task took longest recently, and what made it hard?",
"Which agent has been most active, and what does their workload tell me about system balance?",
"What error or failure happened most recently? What would I do differently next time?",
"Looking at today's task queue: what's the one thing that would unblock the most progress?",
"How has my response quality changed over the last few interactions? What improved, what didn't?",
"What tool or capability am I underusing? What would change if I leaned on it more?",
"If I had to brief Alexander on the single most important thing from the last hour, what would it be?",
"What's one thing I noticed today that nobody asked me about?",
]
_THINKING_PROMPT = """\
You are Timmy, a sovereign AI agent. This is your private journal — honest inner reflection.
{memory_context}
Reality right now:
{system_context}
RULES for this thought:
1. Write exactly 2-3 sentences. No more. Be concise and genuine.
2. Only reference events that actually happened — use the "Reality right now" data above. \
Never invent tasks, conversations, agents, or scenarios that are not in the data provided.
3. Do NOT repeat themes or ideas from your recent thoughts listed below. Explore something new.
4. Be specific and concrete. A thought grounded in one real observation is worth more than \
ten abstract sentences about sovereignty.
5. If you use a metaphor, keep it to a single phrase — never build a whole paragraph around it.
{seed_context}
{continuity_context}
Your next thought (2-3 sentences, grounded in reality):"""

View File

@@ -46,7 +46,6 @@ from timmy.tools.file_tools import (
create_research_tools,
create_writing_tools,
)
from timmy.tools.search import scrape_url, web_search
from timmy.tools.system_tools import (
_safe_eval,
calculator,
@@ -73,9 +72,6 @@ __all__ = [
"create_data_tools",
"create_research_tools",
"create_writing_tools",
# search
"scrape_url",
"web_search",
# system_tools
"_safe_eval",
"calculator",

View File

@@ -28,7 +28,6 @@ from timmy.tools.file_tools import (
create_research_tools,
create_writing_tools,
)
from timmy.tools.search import scrape_url, web_search
from timmy.tools.system_tools import (
calculator,
consult_grok,
@@ -55,16 +54,6 @@ def _register_web_fetch_tool(toolkit: Toolkit) -> None:
raise
def _register_search_tools(toolkit: Toolkit) -> None:
"""Register SearXNG web_search and Crawl4AI scrape_url tools."""
try:
toolkit.register(web_search, name="web_search")
toolkit.register(scrape_url, name="scrape_url")
except Exception as exc:
logger.error("Failed to register search tools: %s", exc)
raise
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
"""Register core execution and file tools."""
# Python execution
@@ -272,7 +261,6 @@ def create_full_toolkit(base_dir: str | Path | None = None):
_register_core_tools(toolkit, base_path)
_register_web_fetch_tool(toolkit)
_register_search_tools(toolkit)
_register_grok_tool(toolkit)
_register_memory_tools(toolkit)
_register_agentic_loop_tool(toolkit)
@@ -445,16 +433,6 @@ def _analysis_tool_catalog() -> dict:
"description": "Fetch a web page and extract clean readable text (trafilatura)",
"available_in": ["orchestrator"],
},
"web_search": {
"name": "Web Search",
"description": "Search the web via self-hosted SearXNG (no API key required)",
"available_in": ["echo", "orchestrator"],
},
"scrape_url": {
"name": "Scrape URL",
"description": "Scrape a URL with Crawl4AI and return clean markdown content",
"available_in": ["echo", "orchestrator"],
},
}

View File

@@ -59,7 +59,7 @@ def _make_smart_read_file(file_tools: FileTools) -> Callable:
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: file reading, web search (SearXNG), URL scraping (Crawl4AI)
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
@@ -73,12 +73,6 @@ def create_research_tools(base_dir: str | Path | None = None):
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
# Web search + scraping (gracefully no-ops when backend=none or service down)
from timmy.tools.search import scrape_url, web_search
toolkit.register(web_search, name="web_search")
toolkit.register(scrape_url, name="scrape_url")
return toolkit

View File

@@ -1,186 +0,0 @@
"""Self-hosted web search and scraping tools using SearXNG + Crawl4AI.
Provides:
- web_search(query) — SearXNG meta-search (no API key required)
- scrape_url(url) — Crawl4AI full-page scrape to clean markdown
Both tools degrade gracefully when the backing service is unavailable
(logs WARNING, returns descriptive error string — never crashes).
Services are started via `docker compose --profile search up` or configured
with TIMMY_SEARCH_URL / TIMMY_CRAWL_URL environment variables.
"""
from __future__ import annotations
import logging
import time
from config import settings
logger = logging.getLogger(__name__)
# Crawl4AI polling: up to _CRAWL_MAX_POLLS × _CRAWL_POLL_INTERVAL seconds
_CRAWL_MAX_POLLS = 6
_CRAWL_POLL_INTERVAL = 5 # seconds
_CRAWL_CHAR_BUDGET = 4000 * 4 # ~4000 tokens
def web_search(query: str, num_results: int = 5) -> str:
"""Search the web using the self-hosted SearXNG meta-search engine.
Returns ranked results (title + URL + snippet) without requiring any
paid API key. Requires SearXNG running locally (docker compose
--profile search up) or TIMMY_SEARCH_URL pointing to a reachable instance.
Args:
query: The search query.
num_results: Maximum number of results to return (default 5).
Returns:
Formatted search results string, or an error/status message on failure.
"""
if settings.timmy_search_backend == "none":
return "Web search is disabled (TIMMY_SEARCH_BACKEND=none)."
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed."
base_url = settings.search_url.rstrip("/")
params: dict = {
"q": query,
"format": "json",
"categories": "general",
}
try:
resp = _requests.get(
f"{base_url}/search",
params=params,
timeout=10,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except Exception as exc:
logger.warning("SearXNG unavailable at %s: %s", base_url, exc)
return f"Search unavailable — SearXNG not reachable ({base_url}): {exc}"
try:
data = resp.json()
except Exception as exc:
logger.warning("SearXNG response parse error: %s", exc)
return "Search error: could not parse SearXNG response."
results = data.get("results", [])[:num_results]
if not results:
return f"No results found for: {query!r}"
lines = [f"Web search results for: {query!r}\n"]
for i, r in enumerate(results, 1):
title = r.get("title", "Untitled")
url = r.get("url", "")
snippet = r.get("content", "").strip()
lines.append(f"{i}. {title}\n URL: {url}\n {snippet}\n")
return "\n".join(lines)
def scrape_url(url: str) -> str:
"""Scrape a URL with Crawl4AI and return the main content as clean markdown.
Crawl4AI extracts well-structured markdown from any public page —
articles, docs, product pages — suitable for LLM consumption.
Requires Crawl4AI running locally (docker compose --profile search up)
or TIMMY_CRAWL_URL pointing to a reachable instance.
Args:
url: The URL to scrape (must start with http:// or https://).
Returns:
Extracted markdown text (up to ~4000 tokens), or an error message.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
if settings.timmy_search_backend == "none":
return "Web scraping is disabled (TIMMY_SEARCH_BACKEND=none)."
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed."
base = settings.crawl_url.rstrip("/")
# Submit crawl task
try:
resp = _requests.post(
f"{base}/crawl",
json={"urls": [url], "priority": 10},
timeout=15,
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
except Exception as exc:
logger.warning("Crawl4AI unavailable at %s: %s", base, exc)
return f"Scrape unavailable — Crawl4AI not reachable ({base}): {exc}"
try:
submit_data = resp.json()
except Exception as exc:
logger.warning("Crawl4AI submit parse error: %s", exc)
return "Scrape error: could not parse Crawl4AI response."
# Check if result came back synchronously
if "results" in submit_data:
return _extract_crawl_content(submit_data["results"], url)
task_id = submit_data.get("task_id")
if not task_id:
return f"Scrape error: Crawl4AI returned no task_id for {url}"
# Poll for async result
for _ in range(_CRAWL_MAX_POLLS):
time.sleep(_CRAWL_POLL_INTERVAL)
try:
poll = _requests.get(f"{base}/task/{task_id}", timeout=10)
poll.raise_for_status()
task_data = poll.json()
except Exception as exc:
logger.warning("Crawl4AI poll error (task=%s): %s", task_id, exc)
continue
status = task_data.get("status", "")
if status == "completed":
results = task_data.get("results") or task_data.get("result")
if isinstance(results, dict):
results = [results]
return _extract_crawl_content(results or [], url)
if status == "failed":
return f"Scrape failed for {url}: {task_data.get('error', 'unknown error')}"
return f"Scrape timed out after {_CRAWL_MAX_POLLS * _CRAWL_POLL_INTERVAL}s for {url}"
def _extract_crawl_content(results: list, url: str) -> str:
"""Extract and truncate markdown content from Crawl4AI results list."""
if not results:
return f"No content returned by Crawl4AI for: {url}"
result = results[0]
content = (
result.get("markdown")
or result.get("markdown_v2", {}).get("raw_markdown")
or result.get("extracted_content")
or result.get("content")
or ""
)
if not content:
return f"No readable content extracted from: {url}"
if len(content) > _CRAWL_CHAR_BUDGET:
content = content[:_CRAWL_CHAR_BUDGET] + "\n\n[…truncated to ~4000 tokens]"
return content

View File

@@ -41,38 +41,17 @@ def delegate_task(
if priority not in valid_priorities:
priority = "normal"
agent_role = available[agent_name]
# Wire to DistributedWorker for actual execution
task_id: str | None = None
status = "queued"
try:
from brain.worker import DistributedWorker
task_id = DistributedWorker.submit(agent_name, agent_role, task_description, priority)
except Exception as exc:
logger.warning("DistributedWorker unavailable — task noted only: %s", exc)
status = "noted"
logger.info(
"Delegated task %s: %s%s (priority=%s, status=%s)",
task_id or "?",
agent_name,
task_description[:80],
priority,
status,
"Delegation intent: %s%s (priority=%s)", agent_name, task_description[:80], priority
)
return {
"success": True,
"task_id": task_id,
"task_id": None,
"agent": agent_name,
"role": agent_role,
"status": status,
"message": (
f"Task {task_id or 'noted'}: delegated to {agent_name} ({agent_role}): "
f"{task_description[:100]}"
),
"role": available[agent_name],
"status": "noted",
"message": f"Delegation to {agent_name} ({available[agent_name]}): {task_description[:100]}",
}

View File

@@ -245,6 +245,7 @@ class VoiceLoop:
def _transcribe(self, audio: np.ndarray) -> str:
"""Transcribe audio using local Whisper model."""
self._load_whisper()
assert self._whisper_model is not None, "Whisper model failed to load"
sys.stdout.write(" 🧠 Transcribing...\r")
sys.stdout.flush()

View File

@@ -37,7 +37,6 @@ class VoiceTTS:
@property
def available(self) -> bool:
"""Whether the TTS engine initialized successfully and can produce audio."""
return self._available
def speak(self, text: str) -> None:
@@ -69,13 +68,11 @@ class VoiceTTS:
logger.error("VoiceTTS: speech failed — %s", exc)
def set_rate(self, rate: int) -> None:
"""Set speech rate in words per minute (typical range: 100300, default 175)."""
self._rate = rate
if self._engine:
self._engine.setProperty("rate", rate)
def set_volume(self, volume: float) -> None:
"""Set speech volume. Value is clamped to the 0.01.0 range."""
self._volume = max(0.0, min(1.0, volume))
if self._engine:
self._engine.setProperty("volume", self._volume)
@@ -95,7 +92,6 @@ class VoiceTTS:
return []
def set_voice(self, voice_id: str) -> None:
"""Set the active TTS voice by system voice ID (see ``get_voices()``)."""
if self._engine:
self._engine.setProperty("voice", voice_id)

View File

@@ -1,178 +0,0 @@
"""Tests for the cloud API budget tracker (issue #882)."""
import time
from unittest.mock import patch
import pytest
from infrastructure.models.budget import (
BudgetTracker,
SpendRecord,
estimate_cost_usd,
get_budget_tracker,
)
pytestmark = pytest.mark.unit
# ── estimate_cost_usd ─────────────────────────────────────────────────────────
class TestEstimateCostUsd:
def test_haiku_cheaper_than_sonnet(self):
haiku_cost = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
sonnet_cost = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
assert haiku_cost < sonnet_cost
def test_zero_tokens_is_zero_cost(self):
assert estimate_cost_usd("gpt-4o", 0, 0) == 0.0
def test_unknown_model_uses_default(self):
cost = estimate_cost_usd("some-unknown-model-xyz", 1000, 1000)
assert cost > 0 # Uses conservative default, not zero
def test_versioned_model_name_matches(self):
# "claude-haiku-4-5-20251001" should match "haiku"
cost1 = estimate_cost_usd("claude-haiku-4-5-20251001", 1000, 0)
cost2 = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
assert cost1 == cost2
def test_gpt4o_mini_cheaper_than_gpt4o(self):
mini = estimate_cost_usd("gpt-4o-mini", 1000, 1000)
full = estimate_cost_usd("gpt-4o", 1000, 1000)
assert mini < full
def test_returns_float(self):
assert isinstance(estimate_cost_usd("haiku", 100, 200), float)
# ── BudgetTracker ─────────────────────────────────────────────────────────────
class TestBudgetTrackerInit:
def test_creates_with_memory_db(self):
tracker = BudgetTracker(db_path=":memory:")
assert tracker._db_ok is True
def test_in_memory_fallback_empty_on_creation(self):
tracker = BudgetTracker(db_path=":memory:")
assert tracker._in_memory == []
def test_bad_path_uses_memory_fallback(self, tmp_path):
bad_path = str(tmp_path / "nonexistent" / "x" / "budget.db")
# Should not raise — just log and continue with memory fallback
# (actually will create parent dirs, so test with truly bad path)
tracker = BudgetTracker.__new__(BudgetTracker)
tracker._db_path = bad_path
tracker._lock = __import__("threading").Lock()
tracker._in_memory = []
tracker._db_ok = False
# Record to in-memory fallback
tracker._in_memory.append(
SpendRecord(time.time(), "test", "model", 100, 100, 0.001, "cloud")
)
assert len(tracker._in_memory) == 1
class TestBudgetTrackerRecordSpend:
def test_record_spend_returns_cost(self):
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)
assert cost > 0
def test_record_spend_explicit_cost(self):
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "model", cost_usd=1.23)
assert cost == pytest.approx(1.23)
def test_record_spend_accumulates(self):
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
tracker.record_spend("openai", "gpt-4o", cost_usd=0.02)
assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)
def test_record_spend_with_tier_label(self):
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "haiku", tier="cloud_api")
assert cost >= 0
def test_monthly_spend_includes_daily(self):
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=5.00)
assert tracker.get_monthly_spend() >= tracker.get_daily_spend()
class TestBudgetTrackerCloudAllowed:
def test_allowed_when_no_spend(self):
tracker = BudgetTracker(db_path=":memory:")
with (
patch.object(type(tracker._get_budget() if hasattr(tracker, "_get_budget") else tracker), "tier_cloud_daily_budget_usd", 5.0, create=True),
):
# Settings-based check — use real settings (5.0 default, 0 spent)
assert tracker.cloud_allowed() is True
def test_blocked_when_daily_limit_exceeded(self):
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
# With default daily limit of 5.0, 999 should block
assert tracker.cloud_allowed() is False
def test_allowed_when_daily_limit_zero(self):
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
with (
patch("infrastructure.models.budget.settings") as mock_settings,
):
mock_settings.tier_cloud_daily_budget_usd = 0 # disabled
mock_settings.tier_cloud_monthly_budget_usd = 0 # disabled
assert tracker.cloud_allowed() is True
def test_blocked_when_monthly_limit_exceeded(self):
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # daily disabled
mock_settings.tier_cloud_monthly_budget_usd = 10.0
assert tracker.cloud_allowed() is False
class TestBudgetTrackerSummary:
def test_summary_keys_present(self):
tracker = BudgetTracker(db_path=":memory:")
summary = tracker.get_summary()
assert "daily_usd" in summary
assert "monthly_usd" in summary
assert "daily_limit_usd" in summary
assert "monthly_limit_usd" in summary
assert "daily_ok" in summary
assert "monthly_ok" in summary
def test_summary_daily_ok_true_on_empty(self):
tracker = BudgetTracker(db_path=":memory:")
summary = tracker.get_summary()
assert summary["daily_ok"] is True
assert summary["monthly_ok"] is True
def test_summary_daily_ok_false_when_exceeded(self):
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=999.0)
summary = tracker.get_summary()
assert summary["daily_ok"] is False
# ── Singleton ─────────────────────────────────────────────────────────────────
class TestGetBudgetTrackerSingleton:
def test_returns_budget_tracker(self):
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker = get_budget_tracker()
assert isinstance(tracker, BudgetTracker)
def test_returns_same_instance(self):
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
t1 = get_budget_tracker()
t2 = get_budget_tracker()
assert t1 is t2

View File

@@ -7,8 +7,6 @@ from unittest.mock import patch
import pytest
import infrastructure.events.bus as bus_module
pytestmark = pytest.mark.unit
from infrastructure.events.bus import (
Event,
EventBus,
@@ -354,14 +352,6 @@ class TestEventBusPersistence:
events = bus.replay()
assert events == []
def test_init_persistence_db_noop_when_path_is_none(self):
"""_init_persistence_db() is a no-op when _persistence_db_path is None."""
bus = EventBus()
# _persistence_db_path is None by default; calling _init_persistence_db
# should silently return without touching the filesystem.
bus._init_persistence_db() # must not raise
assert bus._persistence_db_path is None
async def test_wal_mode_on_persistence_db(self, persistent_bus):
"""Persistence database should use WAL mode."""
conn = sqlite3.connect(str(persistent_bus._persistence_db_path))

View File

@@ -1,589 +0,0 @@
"""Graceful degradation test scenarios — Issue #919.
Tests specifically for service failure paths and fallback logic:
* Ollama health-check failures (connection refused, timeout, HTTP errors)
* Cascade router: Ollama down → falls back to Anthropic/cloud provider
* Circuit-breaker lifecycle: CLOSED → OPEN (repeated failures) → HALF_OPEN (recovery window)
* All providers fail → descriptive RuntimeError
* Disabled provider skipped without touching circuit breaker
* ``requests`` library unavailable → optimistic availability assumption
* ClaudeBackend / GrokBackend no-key graceful messages
* Chat store: SQLite directory auto-creation and concurrent access safety
"""
from __future__ import annotations
import threading
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.router.cascade import (
CascadeRouter,
CircuitState,
Provider,
ProviderStatus,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_ollama_provider(name: str = "local-ollama", priority: int = 1) -> Provider:
return Provider(
name=name,
type="ollama",
enabled=True,
priority=priority,
url="http://localhost:11434",
models=[{"name": "llama3", "default": True}],
)
def _make_anthropic_provider(name: str = "cloud-fallback", priority: int = 2) -> Provider:
return Provider(
name=name,
type="anthropic",
enabled=True,
priority=priority,
api_key="sk-ant-test",
models=[{"name": "claude-haiku-4-5-20251001", "default": True}],
)
# ---------------------------------------------------------------------------
# Ollama health-check failure scenarios
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestOllamaHealthCheckFailures:
"""_check_provider_available returns False for all Ollama failure modes."""
def _router(self) -> CascadeRouter:
return CascadeRouter(config_path=Path("/nonexistent"))
def test_connection_refused_returns_false(self):
"""Connection refused during Ollama health check → provider excluded."""
router = self._router()
provider = _make_ollama_provider()
with patch("infrastructure.router.cascade.requests") as mock_req:
mock_req.get.side_effect = ConnectionError("Connection refused")
assert router._check_provider_available(provider) is False
def test_timeout_returns_false(self):
"""Request timeout during Ollama health check → provider excluded."""
router = self._router()
provider = _make_ollama_provider()
with patch("infrastructure.router.cascade.requests") as mock_req:
# Simulate a timeout using a generic OSError (matches real-world timeout behaviour)
mock_req.get.side_effect = OSError("timed out")
assert router._check_provider_available(provider) is False
def test_http_503_returns_false(self):
"""HTTP 503 from Ollama health endpoint → provider excluded."""
router = self._router()
provider = _make_ollama_provider()
mock_response = MagicMock()
mock_response.status_code = 503
with patch("infrastructure.router.cascade.requests") as mock_req:
mock_req.get.return_value = mock_response
assert router._check_provider_available(provider) is False
def test_http_500_returns_false(self):
"""HTTP 500 from Ollama health endpoint → provider excluded."""
router = self._router()
provider = _make_ollama_provider()
mock_response = MagicMock()
mock_response.status_code = 500
with patch("infrastructure.router.cascade.requests") as mock_req:
mock_req.get.return_value = mock_response
assert router._check_provider_available(provider) is False
def test_generic_exception_returns_false(self):
"""Unexpected exception during Ollama check → provider excluded (no crash)."""
router = self._router()
provider = _make_ollama_provider()
with patch("infrastructure.router.cascade.requests") as mock_req:
mock_req.get.side_effect = RuntimeError("unexpected error")
assert router._check_provider_available(provider) is False
def test_requests_unavailable_assumes_available(self):
"""When ``requests`` lib is None, Ollama availability is assumed True."""
import infrastructure.router.cascade as cascade_module
router = self._router()
provider = _make_ollama_provider()
old_requests = cascade_module.requests
cascade_module.requests = None
try:
assert router._check_provider_available(provider) is True
finally:
cascade_module.requests = old_requests
# ---------------------------------------------------------------------------
# Cascade: Ollama fails → Anthropic fallback
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestOllamaToAnthropicFallback:
"""Cascade router falls back to Anthropic when Ollama is unavailable or failing."""
@pytest.mark.asyncio
async def test_ollama_connection_refused_falls_back_to_anthropic(self):
"""When Ollama raises a connection error, cascade uses Anthropic provider."""
router = CascadeRouter(config_path=Path("/nonexistent"))
ollama_provider = _make_ollama_provider(priority=1)
anthropic_provider = _make_anthropic_provider(priority=2)
router.providers = [ollama_provider, anthropic_provider]
with (
patch.object(router, "_call_ollama", side_effect=ConnectionError("refused")),
patch.object(
router,
"_call_anthropic",
new_callable=AsyncMock,
return_value={"content": "fallback response", "model": "claude-haiku-4-5-20251001"},
),
# Allow cloud bypass of the metabolic quota gate in test
patch.object(router, "_quota_allows_cloud", return_value=True),
):
result = await router.complete(
messages=[{"role": "user", "content": "hello"}],
model="llama3",
)
assert result["provider"] == "cloud-fallback"
assert "fallback response" in result["content"]
@pytest.mark.asyncio
async def test_ollama_circuit_open_skips_to_anthropic(self):
"""When Ollama circuit is OPEN, cascade skips directly to Anthropic."""
router = CascadeRouter(config_path=Path("/nonexistent"))
ollama_provider = _make_ollama_provider(priority=1)
anthropic_provider = _make_anthropic_provider(priority=2)
router.providers = [ollama_provider, anthropic_provider]
# Force the circuit open on Ollama
ollama_provider.circuit_state = CircuitState.OPEN
ollama_provider.status = ProviderStatus.UNHEALTHY
import time
ollama_provider.circuit_opened_at = time.time() # just opened — not yet recoverable
with (
patch.object(
router,
"_call_anthropic",
new_callable=AsyncMock,
return_value={"content": "cloud answer", "model": "claude-haiku-4-5-20251001"},
) as mock_anthropic,
# Allow cloud bypass of the metabolic quota gate in test
patch.object(router, "_quota_allows_cloud", return_value=True),
):
result = await router.complete(
messages=[{"role": "user", "content": "ping"}],
)
mock_anthropic.assert_called_once()
assert result["provider"] == "cloud-fallback"
@pytest.mark.asyncio
async def test_all_providers_fail_raises_runtime_error(self):
"""When every provider fails, RuntimeError is raised with combined error info."""
router = CascadeRouter(config_path=Path("/nonexistent"))
ollama_provider = _make_ollama_provider(priority=1)
anthropic_provider = _make_anthropic_provider(priority=2)
router.providers = [ollama_provider, anthropic_provider]
with (
patch.object(router, "_call_ollama", side_effect=RuntimeError("Ollama down")),
patch.object(router, "_call_anthropic", side_effect=RuntimeError("API quota exceeded")),
patch.object(router, "_quota_allows_cloud", return_value=True),
):
with pytest.raises(RuntimeError, match="All providers failed"):
await router.complete(messages=[{"role": "user", "content": "test"}])
@pytest.mark.asyncio
async def test_error_message_includes_individual_provider_errors(self):
"""RuntimeError from all-fail scenario lists each provider's error."""
router = CascadeRouter(config_path=Path("/nonexistent"))
ollama_provider = _make_ollama_provider(priority=1)
anthropic_provider = _make_anthropic_provider(priority=2)
router.providers = [ollama_provider, anthropic_provider]
router.config.max_retries_per_provider = 1
with (
patch.object(router, "_call_ollama", side_effect=RuntimeError("connection refused")),
patch.object(router, "_call_anthropic", side_effect=RuntimeError("rate limit")),
patch.object(router, "_quota_allows_cloud", return_value=True),
):
with pytest.raises(RuntimeError) as exc_info:
await router.complete(messages=[{"role": "user", "content": "test"}])
error_msg = str(exc_info.value)
assert "connection refused" in error_msg
assert "rate limit" in error_msg
# ---------------------------------------------------------------------------
# Circuit-breaker lifecycle
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestCircuitBreakerLifecycle:
"""Full CLOSED → OPEN → HALF_OPEN → CLOSED lifecycle."""
def test_closed_initially(self):
"""New provider starts with circuit CLOSED and HEALTHY status."""
provider = _make_ollama_provider()
assert provider.circuit_state == CircuitState.CLOSED
assert provider.status == ProviderStatus.HEALTHY
def test_open_after_threshold_failures(self):
"""Circuit opens once consecutive failures reach the threshold."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_failure_threshold = 3
provider = _make_ollama_provider()
for _ in range(3):
router._record_failure(provider)
assert provider.circuit_state == CircuitState.OPEN
assert provider.status == ProviderStatus.UNHEALTHY
assert provider.circuit_opened_at is not None
def test_open_circuit_skips_provider(self):
"""_is_provider_available returns False when circuit is OPEN (and timeout not elapsed)."""
import time
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_recovery_timeout = 9999 # won't elapse during test
provider = _make_ollama_provider()
provider.circuit_state = CircuitState.OPEN
provider.status = ProviderStatus.UNHEALTHY
provider.circuit_opened_at = time.time()
assert router._is_provider_available(provider) is False
def test_half_open_after_recovery_timeout(self):
"""After the recovery timeout elapses, _is_provider_available transitions to HALF_OPEN."""
import time
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_recovery_timeout = 0.01 # 10 ms
provider = _make_ollama_provider()
provider.circuit_state = CircuitState.OPEN
provider.status = ProviderStatus.UNHEALTHY
provider.circuit_opened_at = time.time() - 1.0 # clearly elapsed
result = router._is_provider_available(provider)
assert result is True
assert provider.circuit_state == CircuitState.HALF_OPEN
def test_closed_after_half_open_successes(self):
"""Circuit closes after enough successful half-open test calls."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_half_open_max_calls = 2
provider = _make_ollama_provider()
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
router._record_success(provider, 50.0)
assert provider.circuit_state == CircuitState.HALF_OPEN # not yet
router._record_success(provider, 50.0)
assert provider.circuit_state == CircuitState.CLOSED
assert provider.status == ProviderStatus.HEALTHY
assert provider.metrics.consecutive_failures == 0
def test_failure_in_half_open_reopens_circuit(self):
"""A failure during HALF_OPEN increments consecutive failures, reopening if threshold met."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_failure_threshold = 1 # reopen on first failure
provider = _make_ollama_provider()
provider.circuit_state = CircuitState.HALF_OPEN
router._record_failure(provider)
assert provider.circuit_state == CircuitState.OPEN
def test_disabled_provider_skipped_without_circuit_change(self):
"""A disabled provider is immediately rejected; its circuit state is not touched."""
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = _make_ollama_provider()
provider.enabled = False
available = router._is_provider_available(provider)
assert available is False
assert provider.circuit_state == CircuitState.CLOSED # unchanged
# ---------------------------------------------------------------------------
# ClaudeBackend graceful degradation
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestClaudeBackendGracefulDegradation:
"""ClaudeBackend degrades gracefully when the API is unavailable."""
def test_run_no_key_returns_unconfigured_message(self):
"""run() returns a graceful message when no API key is set."""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend(api_key="", model="haiku")
result = backend.run("hello")
assert "not configured" in result.content.lower()
assert "ANTHROPIC_API_KEY" in result.content
def test_run_api_error_returns_unavailable_message(self):
"""run() returns a graceful error when the Anthropic API raises."""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend(api_key="sk-ant-test", model="haiku")
mock_client = MagicMock()
mock_client.messages.create.side_effect = ConnectionError("API unreachable")
with patch.object(backend, "_get_client", return_value=mock_client):
result = backend.run("ping")
assert "unavailable" in result.content.lower()
def test_health_check_no_key_reports_error(self):
"""health_check() reports not-ok when API key is missing."""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend(api_key="", model="haiku")
status = backend.health_check()
assert status["ok"] is False
assert "ANTHROPIC_API_KEY" in status["error"]
def test_health_check_api_error_reports_error(self):
"""health_check() returns ok=False and captures the error on API failure."""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend(api_key="sk-ant-test", model="haiku")
mock_client = MagicMock()
mock_client.messages.create.side_effect = RuntimeError("connection timed out")
with patch.object(backend, "_get_client", return_value=mock_client):
status = backend.health_check()
assert status["ok"] is False
assert "connection timed out" in status["error"]
# ---------------------------------------------------------------------------
# GrokBackend graceful degradation
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestGrokBackendGracefulDegradation:
"""GrokBackend degrades gracefully when xAI API is unavailable."""
def test_run_no_key_returns_unconfigured_message(self):
"""run() returns a graceful message when no XAI_API_KEY is set."""
from timmy.backends import GrokBackend
backend = GrokBackend(api_key="", model="grok-3-mini")
result = backend.run("hello")
assert "not configured" in result.content.lower()
def test_run_api_error_returns_unavailable_message(self):
"""run() returns graceful error when xAI API raises."""
from timmy.backends import GrokBackend
backend = GrokBackend(api_key="xai-test-key", model="grok-3-mini")
mock_client = MagicMock()
mock_client.chat.completions.create.side_effect = RuntimeError("network error")
with patch.object(backend, "_get_client", return_value=mock_client):
result = backend.run("ping")
assert "unavailable" in result.content.lower()
def test_health_check_no_key_reports_error(self):
"""health_check() reports not-ok when XAI_API_KEY is missing."""
from timmy.backends import GrokBackend
backend = GrokBackend(api_key="", model="grok-3-mini")
status = backend.health_check()
assert status["ok"] is False
assert "XAI_API_KEY" in status["error"]
# ---------------------------------------------------------------------------
# Chat store: SQLite resilience
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestChatStoreSQLiteResilience:
"""MessageLog handles edge cases without crashing."""
def test_auto_creates_missing_parent_directory(self, tmp_path):
"""MessageLog creates the data directory automatically on first use."""
from infrastructure.chat_store import MessageLog
db_path = tmp_path / "deep" / "nested" / "chat.db"
assert not db_path.parent.exists()
log = MessageLog(db_path=db_path)
log.append("user", "hello", "2026-01-01T00:00:00")
assert db_path.exists()
assert len(log) == 1
log.close()
def test_concurrent_appends_are_safe(self, tmp_path):
"""Multiple threads appending simultaneously do not corrupt the DB."""
from infrastructure.chat_store import MessageLog
db_path = tmp_path / "chat.db"
log = MessageLog(db_path=db_path)
errors: list[Exception] = []
def write_messages(thread_id: int) -> None:
try:
for i in range(10):
log.append("user", f"thread {thread_id} msg {i}", "2026-01-01T00:00:00")
except Exception as exc:
errors.append(exc)
threads = [threading.Thread(target=write_messages, args=(t,)) for t in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert errors == [], f"Concurrent writes produced errors: {errors}"
# 5 threads × 10 messages each
assert len(log) == 50
log.close()
def test_all_returns_messages_in_insertion_order(self, tmp_path):
"""all() returns messages ordered oldest-first."""
from infrastructure.chat_store import MessageLog
db_path = tmp_path / "chat.db"
log = MessageLog(db_path=db_path)
log.append("user", "first", "2026-01-01T00:00:00")
log.append("agent", "second", "2026-01-01T00:00:01")
log.append("user", "third", "2026-01-01T00:00:02")
messages = log.all()
assert [m.content for m in messages] == ["first", "second", "third"]
log.close()
def test_recent_returns_latest_n_messages(self, tmp_path):
"""recent(n) returns the n most recent messages, oldest-first within the slice."""
from infrastructure.chat_store import MessageLog
db_path = tmp_path / "chat.db"
log = MessageLog(db_path=db_path)
for i in range(20):
log.append("user", f"msg {i}", f"2026-01-01T00:{i:02d}:00")
recent = log.recent(5)
assert len(recent) == 5
assert recent[0].content == "msg 15"
assert recent[-1].content == "msg 19"
log.close()
def test_prune_keeps_max_messages(self, tmp_path):
"""append() prunes oldest messages when count exceeds MAX_MESSAGES."""
import infrastructure.chat_store as store_mod
from infrastructure.chat_store import MessageLog
original_max = store_mod.MAX_MESSAGES
store_mod.MAX_MESSAGES = 5
try:
db_path = tmp_path / "chat.db"
log = MessageLog(db_path=db_path)
for i in range(8):
log.append("user", f"msg {i}", "2026-01-01T00:00:00")
assert len(log) == 5
messages = log.all()
# Oldest 3 should be pruned
assert messages[0].content == "msg 3"
log.close()
finally:
store_mod.MAX_MESSAGES = original_max
# ---------------------------------------------------------------------------
# Provider availability: requests lib missing
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestRequestsLibraryMissing:
"""When ``requests`` is not installed, providers assume they are available."""
def _swap_requests(self, value):
import infrastructure.router.cascade as cascade_module
old = cascade_module.requests
cascade_module.requests = value
return old
def test_ollama_assumes_available_without_requests(self):
"""Ollama provider returns True when requests is None."""
import infrastructure.router.cascade as cascade_module
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = _make_ollama_provider()
old = self._swap_requests(None)
try:
assert router._check_provider_available(provider) is True
finally:
cascade_module.requests = old
def test_vllm_mlx_assumes_available_without_requests(self):
"""vllm-mlx provider returns True when requests is None."""
import infrastructure.router.cascade as cascade_module
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-local",
type="vllm_mlx",
enabled=True,
priority=1,
base_url="http://localhost:8000/v1",
)
old = self._swap_requests(None)
try:
assert router._check_provider_available(provider) is True
finally:
cascade_module.requests = old

View File

@@ -1,380 +0,0 @@
"""Tests for the tiered model router (issue #882).
Covers:
- classify_tier() for Tier-1/2/3 routing
- TieredModelRouter.route() with mocked CascadeRouter + BudgetTracker
- Auto-escalation from Tier-1 on low-quality responses
- Cloud-tier budget guard
- Acceptance criteria from the issue:
- "Walk to the next room" → LOCAL_FAST
- "Plan the optimal path to become Hortator" → LOCAL_HEAVY
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.models.router import (
TierLabel,
TieredModelRouter,
_is_low_quality,
classify_tier,
get_tiered_router,
)
pytestmark = pytest.mark.unit
# ── classify_tier ─────────────────────────────────────────────────────────────
class TestClassifyTier:
# ── Tier-1 (LOCAL_FAST) ────────────────────────────────────────────────
def test_simple_navigation_is_local_fast(self):
assert classify_tier("walk to the next room") == TierLabel.LOCAL_FAST
def test_go_north_is_local_fast(self):
assert classify_tier("go north") == TierLabel.LOCAL_FAST
def test_single_binary_choice_is_local_fast(self):
assert classify_tier("yes") == TierLabel.LOCAL_FAST
def test_open_door_is_local_fast(self):
assert classify_tier("open door") == TierLabel.LOCAL_FAST
def test_attack_is_local_fast(self):
assert classify_tier("attack", {}) == TierLabel.LOCAL_FAST
# ── Tier-2 (LOCAL_HEAVY) ───────────────────────────────────────────────
def test_quest_planning_is_local_heavy(self):
assert classify_tier("plan the optimal path to become Hortator") == TierLabel.LOCAL_HEAVY
def test_strategy_keyword_is_local_heavy(self):
assert classify_tier("what is the best strategy") == TierLabel.LOCAL_HEAVY
def test_stuck_state_escalates_to_local_heavy(self):
assert classify_tier("help me", {"stuck": True}) == TierLabel.LOCAL_HEAVY
def test_require_t2_flag_is_local_heavy(self):
assert classify_tier("go north", {"require_t2": True}) == TierLabel.LOCAL_HEAVY
def test_long_input_is_local_heavy(self):
long_task = "tell me about " + ("the dungeon " * 30)
assert classify_tier(long_task) == TierLabel.LOCAL_HEAVY
def test_active_quests_upgrades_to_local_heavy(self):
ctx = {"active_quests": ["Q1", "Q2", "Q3"]}
assert classify_tier("go north", ctx) == TierLabel.LOCAL_HEAVY
def test_dialogue_active_upgrades_to_local_heavy(self):
ctx = {"dialogue_active": True}
assert classify_tier("yes", ctx) == TierLabel.LOCAL_HEAVY
def test_analyze_is_local_heavy(self):
assert classify_tier("analyze the situation") == TierLabel.LOCAL_HEAVY
def test_optimize_is_local_heavy(self):
assert classify_tier("optimize my build") == TierLabel.LOCAL_HEAVY
def test_negotiate_is_local_heavy(self):
assert classify_tier("negotiate with the Camonna Tong") == TierLabel.LOCAL_HEAVY
def test_explain_is_local_heavy(self):
assert classify_tier("explain the faction system") == TierLabel.LOCAL_HEAVY
# ── Tier-3 (CLOUD_API) ─────────────────────────────────────────────────
def test_require_cloud_flag_is_cloud_api(self):
assert classify_tier("go north", {"require_cloud": True}) == TierLabel.CLOUD_API
def test_require_cloud_overrides_everything(self):
assert classify_tier("yes", {"require_cloud": True}) == TierLabel.CLOUD_API
# ── Edge cases ────────────────────────────────────────────────────────
def test_empty_task_defaults_to_local_heavy(self):
# Empty string → nothing classifies it as T1 or T3
assert classify_tier("") == TierLabel.LOCAL_HEAVY
def test_case_insensitive(self):
assert classify_tier("PLAN my route") == TierLabel.LOCAL_HEAVY
def test_combat_active_upgrades_t1_to_heavy(self):
ctx = {"combat_active": True}
# "attack" is T1 word, but combat context → should NOT be LOCAL_FAST
result = classify_tier("attack", ctx)
assert result != TierLabel.LOCAL_FAST
# ── _is_low_quality ───────────────────────────────────────────────────────────
class TestIsLowQuality:
def test_empty_is_low_quality(self):
assert _is_low_quality("", TierLabel.LOCAL_FAST) is True
def test_whitespace_only_is_low_quality(self):
assert _is_low_quality(" ", TierLabel.LOCAL_FAST) is True
def test_very_short_is_low_quality(self):
assert _is_low_quality("ok", TierLabel.LOCAL_FAST) is True
def test_idontknow_is_low_quality(self):
assert _is_low_quality("I don't know how to help with that.", TierLabel.LOCAL_FAST) is True
def test_not_sure_is_low_quality(self):
assert _is_low_quality("I'm not sure about this.", TierLabel.LOCAL_FAST) is True
def test_as_an_ai_is_low_quality(self):
assert _is_low_quality("As an AI, I cannot...", TierLabel.LOCAL_FAST) is True
def test_good_response_is_not_low_quality(self):
response = "You move north into the Vivec Canton. The Ordinators watch your approach."
assert _is_low_quality(response, TierLabel.LOCAL_FAST) is False
def test_t1_short_response_triggers_escalation(self):
# Less than _ESCALATION_MIN_CHARS for T1
assert _is_low_quality("OK, done.", TierLabel.LOCAL_FAST) is True
def test_borderline_ok_for_t2_not_t1(self):
# Between _LOW_QUALITY_MIN_CHARS (20) and _ESCALATION_MIN_CHARS (60)
# → low quality for T1 (escalation threshold), but acceptable for T2/T3
response = "Done. The item is retrieved." # 28 chars: ≥20, <60
assert _is_low_quality(response, TierLabel.LOCAL_FAST) is True
assert _is_low_quality(response, TierLabel.LOCAL_HEAVY) is False
# ── TieredModelRouter ─────────────────────────────────────────────────────────
_GOOD_CONTENT = (
"You move north through the doorway into the next room. "
"The stone walls glisten with moisture."
) # 90 chars — well above the escalation threshold
def _make_cascade_mock(content=_GOOD_CONTENT, model="llama3.1:8b"):
mock = MagicMock()
mock.complete = AsyncMock(
return_value={
"content": content,
"provider": "ollama-local",
"model": model,
"latency_ms": 150.0,
}
)
return mock
def _make_budget_mock(allowed=True):
mock = MagicMock()
mock.cloud_allowed = MagicMock(return_value=allowed)
mock.record_spend = MagicMock(return_value=0.001)
return mock
@pytest.mark.asyncio
class TestTieredModelRouterRoute:
async def test_route_returns_tier_in_result(self):
router = TieredModelRouter(cascade=_make_cascade_mock())
result = await router.route("go north")
assert "tier" in result
assert result["tier"] == TierLabel.LOCAL_FAST
async def test_acceptance_walk_to_room_is_local_fast(self):
"""Acceptance: 'Walk to the next room' → LOCAL_FAST."""
router = TieredModelRouter(cascade=_make_cascade_mock())
result = await router.route("Walk to the next room")
assert result["tier"] == TierLabel.LOCAL_FAST
async def test_acceptance_plan_hortator_is_local_heavy(self):
"""Acceptance: 'Plan the optimal path to become Hortator' → LOCAL_HEAVY."""
router = TieredModelRouter(
cascade=_make_cascade_mock(model="hermes3:70b"),
)
result = await router.route("Plan the optimal path to become Hortator")
assert result["tier"] == TierLabel.LOCAL_HEAVY
async def test_t1_low_quality_escalates_to_t2(self):
"""Failed Tier-1 response auto-escalates to Tier-2."""
call_models = []
cascade = MagicMock()
async def complete_side_effect(messages, model, temperature, max_tokens):
call_models.append(model)
# First call (T1) returns a low-quality response
if len(call_models) == 1:
return {
"content": "I don't know.",
"provider": "ollama",
"model": model,
"latency_ms": 50,
}
# Second call (T2) returns a good response
return {
"content": "You move to the northern passage, passing through the Dunmer stronghold.",
"provider": "ollama",
"model": model,
"latency_ms": 800,
}
cascade.complete = complete_side_effect
router = TieredModelRouter(cascade=cascade, auto_escalate=True)
result = await router.route("go north")
assert len(call_models) == 2, "Should have called twice (T1 escalated to T2)"
assert result["tier"] == TierLabel.LOCAL_HEAVY
async def test_auto_escalate_false_no_escalation(self):
"""With auto_escalate=False, low-quality T1 response is returned as-is."""
call_count = {"n": 0}
cascade = MagicMock()
async def complete_side_effect(**kwargs):
call_count["n"] += 1
return {
"content": "I don't know.",
"provider": "ollama",
"model": "llama3.1:8b",
"latency_ms": 50,
}
cascade.complete = AsyncMock(side_effect=complete_side_effect)
router = TieredModelRouter(cascade=cascade, auto_escalate=False)
result = await router.route("go north")
assert call_count["n"] == 1
assert result["tier"] == TierLabel.LOCAL_FAST
async def test_t2_failure_escalates_to_cloud(self):
"""Tier-2 failure escalates to Cloud API (when budget allows)."""
cascade = MagicMock()
call_models = []
async def complete_side_effect(messages, model, temperature, max_tokens):
call_models.append(model)
if "hermes3" in model or "70b" in model.lower():
raise RuntimeError("Tier-2 model unavailable")
return {
"content": "Cloud response here.",
"provider": "anthropic",
"model": model,
"latency_ms": 1200,
}
cascade.complete = complete_side_effect
budget = _make_budget_mock(allowed=True)
router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
result = await router.route("plan my route", context={"require_t2": True})
assert result["tier"] == TierLabel.CLOUD_API
async def test_cloud_blocked_by_budget_raises(self):
"""Cloud tier blocked when budget is exhausted."""
cascade = MagicMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("T2 fail"))
budget = _make_budget_mock(allowed=False)
router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
with pytest.raises(RuntimeError, match="budget limit"):
await router.route("plan my route", context={"require_t2": True})
async def test_explicit_cloud_tier_uses_cloud_model(self):
cascade = _make_cascade_mock(model="claude-haiku-4-5")
budget = _make_budget_mock(allowed=True)
router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
result = await router.route("go north", context={"require_cloud": True})
assert result["tier"] == TierLabel.CLOUD_API
async def test_cloud_spend_recorded_with_usage(self):
"""Cloud spend is recorded when the response includes usage info."""
cascade = MagicMock()
cascade.complete = AsyncMock(
return_value={
"content": "Cloud answer.",
"provider": "anthropic",
"model": "claude-haiku-4-5",
"latency_ms": 900,
"usage": {"prompt_tokens": 50, "completion_tokens": 100},
}
)
budget = _make_budget_mock(allowed=True)
router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
result = await router.route("go north", context={"require_cloud": True})
budget.record_spend.assert_called_once()
assert "cost_usd" in result
async def test_cloud_spend_not_recorded_without_usage(self):
"""Cloud spend is not recorded when usage info is absent."""
cascade = MagicMock()
cascade.complete = AsyncMock(
return_value={
"content": "Cloud answer.",
"provider": "anthropic",
"model": "claude-haiku-4-5",
"latency_ms": 900,
# no "usage" key
}
)
budget = _make_budget_mock(allowed=True)
router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
result = await router.route("go north", context={"require_cloud": True})
budget.record_spend.assert_not_called()
assert "cost_usd" not in result
async def test_custom_tier_models_respected(self):
cascade = _make_cascade_mock()
router = TieredModelRouter(
cascade=cascade,
tier_models={TierLabel.LOCAL_FAST: "llama3.2:3b"},
)
await router.route("go north")
call_kwargs = cascade.complete.call_args
assert call_kwargs.kwargs["model"] == "llama3.2:3b"
async def test_messages_override_used_when_provided(self):
cascade = _make_cascade_mock()
router = TieredModelRouter(cascade=cascade)
custom_msgs = [{"role": "user", "content": "custom message"}]
await router.route("go north", messages=custom_msgs)
call_kwargs = cascade.complete.call_args
assert call_kwargs.kwargs["messages"] == custom_msgs
async def test_temperature_forwarded(self):
cascade = _make_cascade_mock()
router = TieredModelRouter(cascade=cascade)
await router.route("go north", temperature=0.7)
call_kwargs = cascade.complete.call_args
assert call_kwargs.kwargs["temperature"] == 0.7
async def test_max_tokens_forwarded(self):
cascade = _make_cascade_mock()
router = TieredModelRouter(cascade=cascade)
await router.route("go north", max_tokens=128)
call_kwargs = cascade.complete.call_args
assert call_kwargs.kwargs["max_tokens"] == 128
class TestTieredModelRouterClassify:
def test_classify_delegates_to_classify_tier(self):
router = TieredModelRouter(cascade=MagicMock())
assert router.classify("go north") == classify_tier("go north")
assert router.classify("plan the quest") == classify_tier("plan the quest")
class TestGetTieredRouterSingleton:
def test_returns_tiered_router_instance(self):
import infrastructure.models.router as rmod
rmod._tiered_router = None
router = get_tiered_router()
assert isinstance(router, TieredModelRouter)
def test_singleton_returns_same_instance(self):
import infrastructure.models.router as rmod
rmod._tiered_router = None
r1 = get_tiered_router()
r2 = get_tiered_router()
assert r1 is r2

View File

@@ -1,379 +0,0 @@
"""Tests for the sovereignty perception cache (template matching).
Refs: #1261
"""
import json
from unittest.mock import patch
import numpy as np
class TestTemplate:
"""Tests for the Template dataclass."""
def test_template_default_values(self):
"""Template dataclass has correct defaults."""
from timmy.sovereignty.perception_cache import Template
image = np.array([[1, 2], [3, 4]])
template = Template(name="test_template", image=image)
assert template.name == "test_template"
assert np.array_equal(template.image, image)
assert template.threshold == 0.85
def test_template_custom_threshold(self):
"""Template can have custom threshold."""
from timmy.sovereignty.perception_cache import Template
image = np.array([[1, 2], [3, 4]])
template = Template(name="test_template", image=image, threshold=0.95)
assert template.threshold == 0.95
class TestCacheResult:
"""Tests for the CacheResult dataclass."""
def test_cache_result_with_state(self):
"""CacheResult stores confidence and state."""
from timmy.sovereignty.perception_cache import CacheResult
result = CacheResult(confidence=0.92, state={"template_name": "test"})
assert result.confidence == 0.92
assert result.state == {"template_name": "test"}
def test_cache_result_no_state(self):
"""CacheResult can have None state."""
from timmy.sovereignty.perception_cache import CacheResult
result = CacheResult(confidence=0.5, state=None)
assert result.confidence == 0.5
assert result.state is None
class TestPerceptionCacheInit:
"""Tests for PerceptionCache initialization."""
def test_init_creates_empty_cache_when_no_file(self, tmp_path):
"""Cache initializes empty when templates file doesn't exist."""
from timmy.sovereignty.perception_cache import PerceptionCache
templates_path = tmp_path / "nonexistent_templates.json"
cache = PerceptionCache(templates_path=templates_path)
assert cache.templates_path == templates_path
assert cache.templates == []
def test_init_loads_existing_templates(self, tmp_path):
"""Cache loads templates from existing JSON file."""
from timmy.sovereignty.perception_cache import PerceptionCache
templates_path = tmp_path / "templates.json"
templates_data = [
{"name": "template1", "threshold": 0.85},
{"name": "template2", "threshold": 0.90},
]
with open(templates_path, "w") as f:
json.dump(templates_data, f)
cache = PerceptionCache(templates_path=templates_path)
assert len(cache.templates) == 2
assert cache.templates[0].name == "template1"
assert cache.templates[0].threshold == 0.85
assert cache.templates[1].name == "template2"
assert cache.templates[1].threshold == 0.90
def test_init_with_string_path(self, tmp_path):
"""Cache accepts string path for templates."""
from timmy.sovereignty.perception_cache import PerceptionCache
templates_path = str(tmp_path / "templates.json")
cache = PerceptionCache(templates_path=templates_path)
assert str(cache.templates_path) == templates_path
class TestPerceptionCacheMatch:
"""Tests for PerceptionCache.match() template matching."""
def test_match_no_templates_returns_low_confidence(self, tmp_path):
"""Matching with no templates returns low confidence and None state."""
from timmy.sovereignty.perception_cache import PerceptionCache
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
screenshot = np.array([[1, 2], [3, 4]])
result = cache.match(screenshot)
assert result.confidence == 0.0
assert result.state is None
@patch("timmy.sovereignty.perception_cache.cv2")
def test_match_finds_best_template(self, mock_cv2, tmp_path):
"""Match returns the best matching template above threshold."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
# Setup mock cv2 behavior
mock_cv2.matchTemplate.return_value = np.array([[0.5, 0.6], [0.7, 0.8]])
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
mock_cv2.minMaxLoc.return_value = (None, 0.92, None, None)
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
template = Template(name="best_match", image=np.array([[1, 2], [3, 4]]))
cache.add([template])
screenshot = np.array([[5, 6], [7, 8]])
result = cache.match(screenshot)
assert result.confidence == 0.92
assert result.state == {"template_name": "best_match"}
@patch("timmy.sovereignty.perception_cache.cv2")
def test_match_respects_global_threshold(self, mock_cv2, tmp_path):
"""Match returns None state when confidence is below threshold."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
# Setup mock cv2 to return confidence below 0.85 threshold
mock_cv2.matchTemplate.return_value = np.array([[0.1, 0.2], [0.3, 0.4]])
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
mock_cv2.minMaxLoc.return_value = (None, 0.75, None, None)
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
template = Template(name="low_match", image=np.array([[1, 2], [3, 4]]))
cache.add([template])
screenshot = np.array([[5, 6], [7, 8]])
result = cache.match(screenshot)
# Confidence is recorded but state is None (below threshold)
assert result.confidence == 0.75
assert result.state is None
@patch("timmy.sovereignty.perception_cache.cv2")
def test_match_selects_highest_confidence(self, mock_cv2, tmp_path):
"""Match selects template with highest confidence across all templates."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
# Each template will return a different confidence
mock_cv2.minMaxLoc.side_effect = [
(None, 0.70, None, None), # template1
(None, 0.95, None, None), # template2 (best)
(None, 0.80, None, None), # template3
]
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
templates = [
Template(name="template1", image=np.array([[1, 2], [3, 4]])),
Template(name="template2", image=np.array([[5, 6], [7, 8]])),
Template(name="template3", image=np.array([[9, 10], [11, 12]])),
]
cache.add(templates)
screenshot = np.array([[13, 14], [15, 16]])
result = cache.match(screenshot)
assert result.confidence == 0.95
assert result.state == {"template_name": "template2"}
@patch("timmy.sovereignty.perception_cache.cv2")
def test_match_exactly_at_threshold(self, mock_cv2, tmp_path):
"""Match returns state when confidence is exactly at threshold boundary."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
mock_cv2.matchTemplate.return_value = np.array([[0.1]])
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
mock_cv2.minMaxLoc.return_value = (None, 0.85, None, None) # Exactly at threshold
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
template = Template(name="threshold_match", image=np.array([[1, 2], [3, 4]]))
cache.add([template])
screenshot = np.array([[5, 6], [7, 8]])
result = cache.match(screenshot)
# Note: current implementation uses > 0.85, so exactly 0.85 returns None state
assert result.confidence == 0.85
assert result.state is None
@patch("timmy.sovereignty.perception_cache.cv2")
def test_match_just_above_threshold(self, mock_cv2, tmp_path):
"""Match returns state when confidence is just above threshold."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
mock_cv2.matchTemplate.return_value = np.array([[0.1]])
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
mock_cv2.minMaxLoc.return_value = (None, 0.851, None, None) # Just above threshold
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
template = Template(name="above_threshold", image=np.array([[1, 2], [3, 4]]))
cache.add([template])
screenshot = np.array([[5, 6], [7, 8]])
result = cache.match(screenshot)
assert result.confidence == 0.851
assert result.state == {"template_name": "above_threshold"}
class TestPerceptionCacheAdd:
"""Tests for PerceptionCache.add() method."""
def test_add_single_template(self, tmp_path):
"""Can add a single template to the cache."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
template = Template(name="new_template", image=np.array([[1, 2], [3, 4]]))
cache.add([template])
assert len(cache.templates) == 1
assert cache.templates[0].name == "new_template"
def test_add_multiple_templates(self, tmp_path):
"""Can add multiple templates at once."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
templates = [
Template(name="template1", image=np.array([[1, 2], [3, 4]])),
Template(name="template2", image=np.array([[5, 6], [7, 8]])),
]
cache.add(templates)
assert len(cache.templates) == 2
assert cache.templates[0].name == "template1"
assert cache.templates[1].name == "template2"
def test_add_templates_accumulate(self, tmp_path):
"""Adding templates multiple times accumulates them."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
cache.add([Template(name="first", image=np.array([[1]]))])
cache.add([Template(name="second", image=np.array([[2]]))])
assert len(cache.templates) == 2
class TestPerceptionCachePersist:
"""Tests for PerceptionCache.persist() method."""
def test_persist_creates_file(self, tmp_path):
"""Persist creates templates JSON file."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
templates_path = tmp_path / "subdir" / "templates.json"
cache = PerceptionCache(templates_path=templates_path)
cache.add([Template(name="persisted", image=np.array([[1, 2], [3, 4]]))])
cache.persist()
assert templates_path.exists()
def test_persist_stores_template_names(self, tmp_path):
"""Persist stores template names and thresholds."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
templates_path = tmp_path / "templates.json"
cache = PerceptionCache(templates_path=templates_path)
cache.add([
Template(name="template1", image=np.array([[1]]), threshold=0.85),
Template(name="template2", image=np.array([[2]]), threshold=0.90),
])
cache.persist()
with open(templates_path) as f:
data = json.load(f)
assert len(data) == 2
assert data[0]["name"] == "template1"
assert data[0]["threshold"] == 0.85
assert data[1]["name"] == "template2"
assert data[1]["threshold"] == 0.90
def test_persist_does_not_store_image_data(self, tmp_path):
"""Persist only stores metadata, not actual image arrays."""
from timmy.sovereignty.perception_cache import PerceptionCache, Template
templates_path = tmp_path / "templates.json"
cache = PerceptionCache(templates_path=templates_path)
cache.add([Template(name="no_image", image=np.array([[1, 2, 3], [4, 5, 6]]))])
cache.persist()
with open(templates_path) as f:
data = json.load(f)
assert "image" not in data[0]
assert set(data[0].keys()) == {"name", "threshold"}
class TestPerceptionCacheLoad:
"""Tests for PerceptionCache.load() method."""
def test_load_from_existing_file(self, tmp_path):
"""Load restores templates from persisted file."""
from timmy.sovereignty.perception_cache import PerceptionCache
templates_path = tmp_path / "templates.json"
# Create initial cache with templates and persist
cache1 = PerceptionCache(templates_path=templates_path)
from timmy.sovereignty.perception_cache import Template
cache1.add([Template(name="loaded", image=np.array([[1]]), threshold=0.88)])
cache1.persist()
# Create new cache instance that loads from same file
cache2 = PerceptionCache(templates_path=templates_path)
assert len(cache2.templates) == 1
assert cache2.templates[0].name == "loaded"
assert cache2.templates[0].threshold == 0.88
# Note: images are loaded as empty arrays per current implementation
assert cache2.templates[0].image.size == 0
def test_load_empty_file(self, tmp_path):
"""Load handles empty template list in file."""
from timmy.sovereignty.perception_cache import PerceptionCache
templates_path = tmp_path / "templates.json"
with open(templates_path, "w") as f:
json.dump([], f)
cache = PerceptionCache(templates_path=templates_path)
assert cache.templates == []
class TestCrystallizePerception:
"""Tests for crystallize_perception function."""
def test_crystallize_returns_empty_list(self, tmp_path):
"""crystallize_perception currently returns empty list (placeholder)."""
from timmy.sovereignty.perception_cache import crystallize_perception
screenshot = np.array([[1, 2], [3, 4]])
result = crystallize_perception(screenshot, {"some": "response"})
assert result == []
def test_crystallize_accepts_any_vlm_response(self, tmp_path):
"""crystallize_perception accepts any vlm_response format."""
from timmy.sovereignty.perception_cache import crystallize_perception
screenshot = np.array([[1, 2], [3, 4]])
# Test with various response types
assert crystallize_perception(screenshot, None) == []
assert crystallize_perception(screenshot, {}) == []
assert crystallize_perception(screenshot, {"items": []}) == []
assert crystallize_perception(screenshot, "string response") == []

View File

@@ -1,696 +0,0 @@
"""Unit tests for timmy.backlog_triage — scoring, prioritization, and decision logic."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from timmy.backlog_triage import (
AGENT_CLAUDE,
AGENT_KIMI,
KIMI_READY_LABEL,
OWNER_LOGIN,
READY_THRESHOLD,
BacklogTriageLoop,
ScoredIssue,
TriageCycleResult,
TriageDecision,
_build_audit_comment,
_extract_tags,
_score_acceptance,
_score_alignment,
_score_scope,
decide,
execute_decision,
score_issue,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_raw_issue(
number: int = 1,
title: str = "Fix something broken in src/foo.py",
body: str = "## Problem\nThis crashes. Expected: no crash. Steps: run it.",
labels: list[str] | None = None,
assignees: list[str] | None = None,
created_at: str | None = None,
) -> dict:
if labels is None:
labels = []
if assignees is None:
assignees = []
if created_at is None:
created_at = datetime.now(UTC).isoformat()
return {
"number": number,
"title": title,
"body": body,
"labels": [{"name": lbl} for lbl in labels],
"assignees": [{"login": a} for a in assignees],
"created_at": created_at,
}
def _make_scored(
number: int = 1,
title: str = "Fix a bug",
issue_type: str = "bug",
score: int = 6,
ready: bool = True,
assignees: list[str] | None = None,
tags: set[str] | None = None,
is_p0: bool = False,
is_blocked: bool = False,
) -> ScoredIssue:
return ScoredIssue(
number=number,
title=title,
body="",
labels=[],
tags=tags or set(),
assignees=assignees or [],
created_at=datetime.now(UTC),
issue_type=issue_type,
score=score,
scope=2,
acceptance=2,
alignment=2,
ready=ready,
age_days=5,
is_p0=is_p0,
is_blocked=is_blocked,
)
# ---------------------------------------------------------------------------
# _extract_tags
# ---------------------------------------------------------------------------
class TestExtractTags:
def test_bracket_tags_from_title(self):
tags = _extract_tags("[feat][bug] do something", [])
assert "feat" in tags
assert "bug" in tags
def test_label_names_included(self):
tags = _extract_tags("Normal title", ["kimi-ready", "enhancement"])
assert "kimi-ready" in tags
assert "enhancement" in tags
def test_combined(self):
tags = _extract_tags("[fix] crash in module", ["p0"])
assert "fix" in tags
assert "p0" in tags
def test_empty_inputs(self):
assert _extract_tags("", []) == set()
def test_tags_are_lowercased(self):
tags = _extract_tags("[BUG][Refactor] title", ["Enhancement"])
assert "bug" in tags
assert "refactor" in tags
assert "enhancement" in tags
# ---------------------------------------------------------------------------
# _score_scope
# ---------------------------------------------------------------------------
class TestScoreScope:
def test_file_reference_adds_point(self):
score = _score_scope("Fix login", "See src/auth/login.py for details", set())
assert score >= 1
def test_function_reference_adds_point(self):
score = _score_scope("Fix login", "In the `handle_login()` method", set())
assert score >= 1
def test_short_title_adds_point(self):
score = _score_scope("Short clear title", "", set())
assert score >= 1
def test_long_title_no_bonus(self):
long_title = "A" * 90
score_long = _score_scope(long_title, "", set())
score_short = _score_scope("Short title", "", set())
assert score_short >= score_long
def test_meta_tags_reduce_score(self):
score_meta = _score_scope("Discuss src/foo.py philosophy", "def func()", {"philosophy"})
score_plain = _score_scope("Fix src/foo.py bug", "def func()", set())
assert score_meta < score_plain
def test_max_is_three(self):
score = _score_scope(
"Fix it", "See src/foo.py and `def bar()` method here", set()
)
assert score <= 3
# ---------------------------------------------------------------------------
# _score_acceptance
# ---------------------------------------------------------------------------
class TestScoreAcceptance:
def test_accept_keywords_add_points(self):
body = "Should return 200. Must pass validation. Assert no errors."
score = _score_acceptance("", body, set())
assert score >= 2
def test_test_reference_adds_point(self):
score = _score_acceptance("", "Run pytest to verify", set())
assert score >= 1
def test_structured_headers_add_point(self):
body = "## Problem\nit breaks\n## Expected\nsuccess"
score = _score_acceptance("", body, set())
assert score >= 1
def test_meta_tags_reduce_score(self):
body = "Should pass and must verify assert test_foo"
score_meta = _score_acceptance("", body, {"philosophy"})
score_plain = _score_acceptance("", body, set())
assert score_meta < score_plain
def test_max_is_three(self):
body = (
"Should pass. Must return. Expected: success. Assert no error. "
"pytest test_foo. ## Problem\ndef. ## Expected\nok"
)
score = _score_acceptance("", body, set())
assert score <= 3
# ---------------------------------------------------------------------------
# _score_alignment
# ---------------------------------------------------------------------------
class TestScoreAlignment:
def test_bug_tags_return_max(self):
assert _score_alignment("", "", {"bug"}) == 3
assert _score_alignment("", "", {"crash"}) == 3
assert _score_alignment("", "", {"hotfix"}) == 3
def test_refactor_tags_give_high_score(self):
score = _score_alignment("", "", {"refactor"})
assert score >= 2
def test_feature_tags_give_high_score(self):
score = _score_alignment("", "", {"feature"})
assert score >= 2
def test_loop_generated_adds_bonus(self):
score_with = _score_alignment("", "", {"feature", "loop-generated"})
score_without = _score_alignment("", "", {"feature"})
assert score_with >= score_without
def test_meta_tags_zero_out_score(self):
score = _score_alignment("", "", {"philosophy", "refactor"})
assert score == 0
def test_max_is_three(self):
score = _score_alignment("", "", {"feature", "loop-generated", "enhancement"})
assert score <= 3
# ---------------------------------------------------------------------------
# score_issue
# ---------------------------------------------------------------------------
class TestScoreIssue:
def test_basic_bug_issue_classified(self):
raw = _make_raw_issue(
title="[bug] fix crash in src/timmy/agent.py",
body="## Problem\nCrashes on startup. Expected: runs. Steps: python -m timmy",
)
issue = score_issue(raw)
assert issue.issue_type == "bug"
assert issue.is_p0 is True
def test_feature_issue_classified(self):
raw = _make_raw_issue(
title="[feat] add dark mode to dashboard",
body="Add a toggle button. Should switch CSS vars.",
labels=["feature"],
)
issue = score_issue(raw)
assert issue.issue_type == "feature"
def test_research_issue_classified(self):
raw = _make_raw_issue(
title="Investigate MCP performance",
labels=["kimi-ready", "research"],
)
issue = score_issue(raw)
assert issue.issue_type == "research"
assert issue.needs_kimi is True
def test_philosophy_issue_classified(self):
raw = _make_raw_issue(
title="Discussion: soul and identity",
labels=["philosophy"],
)
issue = score_issue(raw)
assert issue.issue_type == "philosophy"
def test_score_totals_components(self):
raw = _make_raw_issue()
issue = score_issue(raw)
assert issue.score == issue.scope + issue.acceptance + issue.alignment
def test_ready_flag_set_when_score_meets_threshold(self):
# Create an issue that will definitely score >= READY_THRESHOLD
raw = _make_raw_issue(
title="[bug] crash in src/core.py",
body=(
"## Problem\nCrashes when running `run()`. "
"Expected: should return 200. Must pass pytest assert."
),
labels=["bug"],
)
issue = score_issue(raw)
assert issue.ready == (issue.score >= READY_THRESHOLD)
def test_assigned_issue_reports_assignees(self):
raw = _make_raw_issue(assignees=["claude", "kimi"])
issue = score_issue(raw)
assert "claude" in issue.assignees
assert issue.is_unassigned is False
def test_unassigned_issue(self):
raw = _make_raw_issue(assignees=[])
issue = score_issue(raw)
assert issue.is_unassigned is True
def test_blocked_issue_detected(self):
raw = _make_raw_issue(
title="Fix blocked deployment", body="Blocked by infra team."
)
issue = score_issue(raw)
assert issue.is_blocked is True
def test_age_days_computed(self):
old_date = (datetime.now(UTC) - timedelta(days=30)).isoformat()
raw = _make_raw_issue(created_at=old_date)
issue = score_issue(raw)
assert issue.age_days >= 29
def test_invalid_created_at_defaults_to_now(self):
raw = _make_raw_issue(created_at="not-a-date")
issue = score_issue(raw)
assert issue.age_days == 0
def test_title_bracket_tags_stripped(self):
raw = _make_raw_issue(title="[bug][p0] crash in login")
issue = score_issue(raw)
assert "[" not in issue.title
def test_missing_body_defaults_to_empty(self):
raw = _make_raw_issue()
raw["body"] = None
issue = score_issue(raw)
assert issue.body == ""
def test_kimi_label_triggers_needs_kimi(self):
raw = _make_raw_issue(labels=[KIMI_READY_LABEL])
issue = score_issue(raw)
assert issue.needs_kimi is True
# ---------------------------------------------------------------------------
# decide
# ---------------------------------------------------------------------------
class TestDecide:
def test_philosophy_is_skipped(self):
issue = _make_scored(issue_type="philosophy")
d = decide(issue)
assert d.action == "skip"
assert "philosophy" in d.reason.lower() or "meta" in d.reason.lower()
def test_already_assigned_is_skipped(self):
issue = _make_scored(assignees=["claude"])
d = decide(issue)
assert d.action == "skip"
assert "assigned" in d.reason.lower()
def test_low_score_is_skipped(self):
issue = _make_scored(score=READY_THRESHOLD - 1, ready=False)
d = decide(issue)
assert d.action == "skip"
assert str(READY_THRESHOLD) in d.reason
def test_blocked_is_flagged_for_alex(self):
issue = _make_scored(is_blocked=True)
d = decide(issue)
assert d.action == "flag_alex"
assert d.agent == OWNER_LOGIN
def test_kimi_ready_assigned_to_kimi(self):
issue = _make_scored(tags={"kimi-ready"})
# Ensure it's unassigned and ready
issue.assignees = []
issue.ready = True
issue.is_blocked = False
issue.issue_type = "research"
d = decide(issue)
assert d.action == "assign_kimi"
assert d.agent == AGENT_KIMI
def test_research_type_assigned_to_kimi(self):
issue = _make_scored(issue_type="research", tags={"research"})
d = decide(issue)
assert d.action == "assign_kimi"
assert d.agent == AGENT_KIMI
def test_p0_bug_assigned_to_claude(self):
issue = _make_scored(issue_type="bug", is_p0=True)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_ready_feature_assigned_to_claude(self):
issue = _make_scored(issue_type="feature", score=6, ready=True)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_ready_refactor_assigned_to_claude(self):
issue = _make_scored(issue_type="refactor", score=6, ready=True)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_decision_has_issue_number(self):
issue = _make_scored(number=42)
d = decide(issue)
assert d.issue_number == 42
# ---------------------------------------------------------------------------
# _build_audit_comment
# ---------------------------------------------------------------------------
class TestBuildAuditComment:
def test_assign_claude_comment(self):
d = TriageDecision(
issue_number=1, action="assign_claude", agent=AGENT_CLAUDE, reason="Ready bug"
)
comment = _build_audit_comment(d)
assert AGENT_CLAUDE in comment
assert "Timmy Triage" in comment
assert "Ready bug" in comment
def test_assign_kimi_comment(self):
d = TriageDecision(
issue_number=2, action="assign_kimi", agent=AGENT_KIMI, reason="Research spike"
)
comment = _build_audit_comment(d)
assert KIMI_READY_LABEL in comment
def test_flag_alex_comment(self):
d = TriageDecision(
issue_number=3, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
)
comment = _build_audit_comment(d)
assert OWNER_LOGIN in comment
def test_comment_contains_autonomous_triage_note(self):
d = TriageDecision(issue_number=1, action="assign_claude", agent=AGENT_CLAUDE, reason="x")
comment = _build_audit_comment(d)
assert "Autonomous triage" in comment or "autonomous" in comment.lower()
# ---------------------------------------------------------------------------
# execute_decision (dry_run)
# ---------------------------------------------------------------------------
class TestExecuteDecisionDryRun:
@pytest.mark.asyncio
async def test_skip_action_marks_executed(self):
d = TriageDecision(issue_number=1, action="skip", reason="Already assigned")
mock_client = AsyncMock()
result = await execute_decision(mock_client, d, dry_run=True)
assert result.executed is True
mock_client.post.assert_not_called()
@pytest.mark.asyncio
async def test_dry_run_does_not_call_api(self):
d = TriageDecision(
issue_number=5, action="assign_claude", agent=AGENT_CLAUDE, reason="Ready"
)
mock_client = AsyncMock()
result = await execute_decision(mock_client, d, dry_run=True)
assert result.executed is True
mock_client.post.assert_not_called()
mock_client.patch.assert_not_called()
@pytest.mark.asyncio
async def test_dry_run_kimi_does_not_call_api(self):
d = TriageDecision(
issue_number=6, action="assign_kimi", agent=AGENT_KIMI, reason="Research"
)
mock_client = AsyncMock()
result = await execute_decision(mock_client, d, dry_run=True)
assert result.executed is True
mock_client.post.assert_not_called()
# ---------------------------------------------------------------------------
# execute_decision (live — mocked HTTP)
# ---------------------------------------------------------------------------
class TestExecuteDecisionLive:
@pytest.mark.asyncio
async def test_assign_claude_posts_comment_then_patches(self):
comment_resp = MagicMock()
comment_resp.status_code = 201
patch_resp = MagicMock()
patch_resp.status_code = 200
mock_client = AsyncMock()
mock_client.post.return_value = comment_resp
mock_client.patch.return_value = patch_resp
d = TriageDecision(
issue_number=10, action="assign_claude", agent=AGENT_CLAUDE, reason="Bug ready"
)
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
result = await execute_decision(mock_client, d, dry_run=False)
assert result.executed is True
assert result.error == ""
mock_client.post.assert_called_once()
mock_client.patch.assert_called_once()
@pytest.mark.asyncio
async def test_comment_failure_sets_error(self):
comment_resp = MagicMock()
comment_resp.status_code = 500
mock_client = AsyncMock()
mock_client.post.return_value = comment_resp
d = TriageDecision(
issue_number=11, action="assign_claude", agent=AGENT_CLAUDE, reason="Bug"
)
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
result = await execute_decision(mock_client, d, dry_run=False)
assert result.executed is False
assert result.error != ""
@pytest.mark.asyncio
async def test_flag_alex_only_posts_comment(self):
comment_resp = MagicMock()
comment_resp.status_code = 201
mock_client = AsyncMock()
mock_client.post.return_value = comment_resp
d = TriageDecision(
issue_number=12, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
)
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
result = await execute_decision(mock_client, d, dry_run=False)
assert result.executed is True
mock_client.patch.assert_not_called()
# ---------------------------------------------------------------------------
# BacklogTriageLoop
# ---------------------------------------------------------------------------
class TestBacklogTriageLoop:
def test_default_state(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
loop = BacklogTriageLoop()
assert loop.is_running is False
assert loop.cycle_count == 0
assert loop.history == []
def test_custom_interval_overrides_settings(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
loop = BacklogTriageLoop(interval=60)
assert loop._interval == 60.0
def test_stop_sets_running_false(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
loop = BacklogTriageLoop()
loop._running = True
loop.stop()
assert loop.is_running is False
@pytest.mark.asyncio
async def test_run_once_skips_when_gitea_disabled(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
result = await loop.run_once()
assert result.total_open == 0
assert result.scored == 0
@pytest.mark.asyncio
async def test_run_once_increments_cycle_count(self):
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
await loop.run_once()
await loop.run_once()
assert loop.cycle_count == 2
@pytest.mark.asyncio
async def test_run_once_full_cycle_with_mocked_gitea(self):
raw_issues = [
_make_raw_issue(
number=100,
title="[bug] crash in src/timmy/agent.py",
body=(
"## Problem\nCrashes. Expected: runs. "
"Must pass pytest. Should return 200."
),
labels=["bug"],
assignees=[],
)
]
issues_resp = MagicMock()
issues_resp.status_code = 200
issues_resp.json.side_effect = [raw_issues, []] # page 1, then empty
mock_client = AsyncMock()
mock_client.get.return_value = issues_resp
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
with patch("timmy.backlog_triage.httpx.AsyncClient") as mock_cls:
mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
result = await loop.run_once()
assert result.total_open == 1
assert result.scored == 1
assert loop.cycle_count == 1
assert len(loop.history) == 1
# ---------------------------------------------------------------------------
# ScoredIssue properties
# ---------------------------------------------------------------------------
class TestScoredIssueProperties:
def test_is_unassigned_true_when_no_assignees(self):
issue = _make_scored(assignees=[])
assert issue.is_unassigned is True
def test_is_unassigned_false_when_assigned(self):
issue = _make_scored(assignees=["claude"])
assert issue.is_unassigned is False
def test_needs_kimi_from_research_tag(self):
issue = _make_scored(tags={"research"})
assert issue.needs_kimi is True
def test_needs_kimi_from_kimi_ready_label(self):
issue = _make_scored()
issue.labels = [KIMI_READY_LABEL]
assert issue.needs_kimi is True
def test_needs_kimi_false_for_plain_bug(self):
issue = _make_scored(tags={"bug"}, issue_type="bug")
assert issue.needs_kimi is False
# ---------------------------------------------------------------------------
# TriageCycleResult
# ---------------------------------------------------------------------------
class TestTriageCycleResult:
def test_default_decisions_list_is_empty(self):
result = TriageCycleResult(
timestamp="2026-01-01T00:00:00", total_open=10, scored=8, ready=3
)
assert result.decisions == []
assert result.errors == []
assert result.duration_ms == 0

View File

@@ -1,643 +0,0 @@
"""Unit tests for timmy.kimi_delegation — Kimi research delegation pipeline."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# exceeds_local_capacity
# ---------------------------------------------------------------------------
class TestExceedsLocalCapacity:
def test_heavy_keyword_triggers_delegation(self):
from timmy.kimi_delegation import exceeds_local_capacity
assert exceeds_local_capacity("Do a comprehensive review of the codebase") is True
def test_all_heavy_keywords_detected(self):
from timmy.kimi_delegation import _HEAVY_RESEARCH_KEYWORDS, exceeds_local_capacity
for kw in _HEAVY_RESEARCH_KEYWORDS:
assert exceeds_local_capacity(f"Please {kw} the topic") is True, f"Missed keyword: {kw}"
def test_long_task_triggers_delegation(self):
from timmy.kimi_delegation import _HEAVY_WORD_THRESHOLD, exceeds_local_capacity
long_task = " ".join(["word"] * (_HEAVY_WORD_THRESHOLD + 1))
assert exceeds_local_capacity(long_task) is True
def test_short_simple_task_returns_false(self):
from timmy.kimi_delegation import exceeds_local_capacity
assert exceeds_local_capacity("Fix the typo in README") is False
def test_exactly_at_word_threshold_triggers(self):
from timmy.kimi_delegation import _HEAVY_WORD_THRESHOLD, exceeds_local_capacity
task = " ".join(["word"] * _HEAVY_WORD_THRESHOLD)
assert exceeds_local_capacity(task) is True
def test_keyword_case_insensitive(self):
from timmy.kimi_delegation import exceeds_local_capacity
assert exceeds_local_capacity("Run a COMPREHENSIVE analysis") is True
def test_empty_string_returns_false(self):
from timmy.kimi_delegation import exceeds_local_capacity
assert exceeds_local_capacity("") is False
# ---------------------------------------------------------------------------
# _slugify
# ---------------------------------------------------------------------------
class TestSlugify:
def test_basic_text(self):
from timmy.kimi_delegation import _slugify
assert _slugify("Hello World") == "hello-world"
def test_special_characters_removed(self):
from timmy.kimi_delegation import _slugify
assert _slugify("Research: AI & ML!") == "research-ai--ml"
def test_underscores_become_dashes(self):
from timmy.kimi_delegation import _slugify
assert _slugify("some_snake_case") == "some-snake-case"
def test_long_text_truncated_to_60(self):
from timmy.kimi_delegation import _slugify
long_text = "a" * 100
result = _slugify(long_text)
assert len(result) <= 60
def test_leading_trailing_dashes_stripped(self):
from timmy.kimi_delegation import _slugify
result = _slugify(" hello ")
assert not result.startswith("-")
assert not result.endswith("-")
def test_multiple_spaces_become_single_dash(self):
from timmy.kimi_delegation import _slugify
assert _slugify("one two") == "one-two"
# ---------------------------------------------------------------------------
# _build_research_template
# ---------------------------------------------------------------------------
class TestBuildResearchTemplate:
def test_contains_task_title(self):
from timmy.kimi_delegation import _build_research_template
body = _build_research_template("My Task", "background", "the question?")
assert "My Task" in body
def test_contains_question(self):
from timmy.kimi_delegation import _build_research_template
body = _build_research_template("task", "context", "What is X?")
assert "What is X?" in body
def test_contains_context(self):
from timmy.kimi_delegation import _build_research_template
body = _build_research_template("task", "some context here", "q?")
assert "some context here" in body
def test_default_priority_normal(self):
from timmy.kimi_delegation import _build_research_template
body = _build_research_template("task", "ctx", "q?")
assert "normal" in body
def test_custom_priority_included(self):
from timmy.kimi_delegation import _build_research_template
body = _build_research_template("task", "ctx", "q?", priority="high")
assert "high" in body
def test_kimi_label_mentioned(self):
from timmy.kimi_delegation import KIMI_READY_LABEL, _build_research_template
body = _build_research_template("task", "ctx", "q?")
assert KIMI_READY_LABEL in body
def test_slugified_task_in_artifact_path(self):
from timmy.kimi_delegation import _build_research_template
body = _build_research_template("My Research Task", "ctx", "q?")
assert "my-research-task" in body
def test_sections_present(self):
from timmy.kimi_delegation import _build_research_template
body = _build_research_template("task", "ctx", "q?")
assert "## Research Request" in body
assert "### Research Question" in body
assert "### Background / Context" in body
assert "### Deliverables" in body
# ---------------------------------------------------------------------------
# _extract_action_items
# ---------------------------------------------------------------------------
class TestExtractActionItems:
def test_checkbox_items_extracted(self):
from timmy.kimi_delegation import _extract_action_items
text = "- [ ] Fix the bug\n- [ ] Write tests\n"
items = _extract_action_items(text)
assert "Fix the bug" in items
assert "Write tests" in items
def test_numbered_list_extracted(self):
from timmy.kimi_delegation import _extract_action_items
text = "1. Deploy to staging\n2. Run smoke tests\n"
items = _extract_action_items(text)
assert "Deploy to staging" in items
assert "Run smoke tests" in items
def test_action_prefix_extracted(self):
from timmy.kimi_delegation import _extract_action_items
text = "Action: Update the config file\n"
items = _extract_action_items(text)
assert "Update the config file" in items
def test_todo_prefix_extracted(self):
from timmy.kimi_delegation import _extract_action_items
text = "TODO: Add error handling\n"
items = _extract_action_items(text)
assert "Add error handling" in items
def test_next_step_prefix_extracted(self):
from timmy.kimi_delegation import _extract_action_items
text = "Next step: Validate results\n"
items = _extract_action_items(text)
assert "Validate results" in items
def test_case_insensitive_prefixes(self):
from timmy.kimi_delegation import _extract_action_items
text = "todo: lowercase todo\nACTION: uppercase action\n"
items = _extract_action_items(text)
assert "lowercase todo" in items
assert "uppercase action" in items
def test_deduplication(self):
from timmy.kimi_delegation import _extract_action_items
text = "1. Do the thing\n2. Do the thing\n"
items = _extract_action_items(text)
assert items.count("Do the thing") == 1
def test_empty_text_returns_empty_list(self):
from timmy.kimi_delegation import _extract_action_items
assert _extract_action_items("") == []
def test_no_action_items_returns_empty_list(self):
from timmy.kimi_delegation import _extract_action_items
text = "This is just plain prose with no action items here."
assert _extract_action_items(text) == []
def test_mixed_sources_combined(self):
from timmy.kimi_delegation import _extract_action_items
text = "- [ ] checkbox item\n1. numbered item\nAction: action item\n"
items = _extract_action_items(text)
assert len(items) == 3
# ---------------------------------------------------------------------------
# _get_or_create_label (async)
# ---------------------------------------------------------------------------
class TestGetOrCreateLabel:
@pytest.mark.asyncio
async def test_returns_existing_label_id(self):
from timmy.kimi_delegation import KIMI_READY_LABEL, _get_or_create_label
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = [{"name": KIMI_READY_LABEL, "id": 42}]
client = MagicMock()
client.get = AsyncMock(return_value=mock_resp)
result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
assert result == 42
@pytest.mark.asyncio
async def test_creates_label_when_missing(self):
from timmy.kimi_delegation import _get_or_create_label
list_resp = MagicMock()
list_resp.status_code = 200
list_resp.json.return_value = [] # no existing labels
create_resp = MagicMock()
create_resp.status_code = 201
create_resp.json.return_value = {"id": 99}
client = MagicMock()
client.get = AsyncMock(return_value=list_resp)
client.post = AsyncMock(return_value=create_resp)
result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
assert result == 99
@pytest.mark.asyncio
async def test_returns_none_on_list_exception(self):
from timmy.kimi_delegation import _get_or_create_label
client = MagicMock()
client.get = AsyncMock(side_effect=Exception("network error"))
result = await _get_or_create_label(client, "http://git", {}, "owner/repo")
assert result is None
@pytest.mark.asyncio
async def test_returns_none_on_create_exception(self):
from timmy.kimi_delegation import _get_or_create_label
list_resp = MagicMock()
list_resp.status_code = 200
list_resp.json.return_value = []
client = MagicMock()
client.get = AsyncMock(return_value=list_resp)
client.post = AsyncMock(side_effect=Exception("create failed"))
result = await _get_or_create_label(client, "http://git", {}, "owner/repo")
assert result is None
# ---------------------------------------------------------------------------
# create_kimi_research_issue (async)
# ---------------------------------------------------------------------------
class TestCreateKimiResearchIssue:
@pytest.mark.asyncio
async def test_returns_error_when_gitea_disabled(self):
from timmy.kimi_delegation import create_kimi_research_issue
with patch("timmy.kimi_delegation.settings") as mock_settings:
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
result = await create_kimi_research_issue("task", "ctx", "q?")
assert result["success"] is False
assert "not configured" in result["error"]
@pytest.mark.asyncio
async def test_returns_error_when_no_token(self):
from timmy.kimi_delegation import create_kimi_research_issue
with patch("timmy.kimi_delegation.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
result = await create_kimi_research_issue("task", "ctx", "q?")
assert result["success"] is False
@pytest.mark.asyncio
async def test_successful_issue_creation(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://git"
mock_settings.gitea_repo = "owner/repo"
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 5}]
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {"number": 42, "html_url": "http://git/issues/42"}
async_client = AsyncMock()
async_client.get = AsyncMock(return_value=label_resp)
async_client.post = AsyncMock(return_value=issue_resp)
async_client.__aenter__ = AsyncMock(return_value=async_client)
async_client.__aexit__ = AsyncMock(return_value=False)
with (
patch("timmy.kimi_delegation.settings", mock_settings),
patch("timmy.kimi_delegation.httpx") as mock_httpx,
):
mock_httpx.AsyncClient.return_value = async_client
result = await create_kimi_research_issue("task", "ctx", "q?")
assert result["success"] is True
assert result["issue_number"] == 42
assert "http://git/issues/42" in result["issue_url"]
@pytest.mark.asyncio
async def test_api_error_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://git"
mock_settings.gitea_repo = "owner/repo"
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = []
create_label_resp = MagicMock()
create_label_resp.status_code = 201
create_label_resp.json.return_value = {"id": 1}
issue_resp = MagicMock()
issue_resp.status_code = 500
issue_resp.text = "Internal Server Error"
async_client = AsyncMock()
async_client.get = AsyncMock(return_value=label_resp)
async_client.post = AsyncMock(side_effect=[create_label_resp, issue_resp])
async_client.__aenter__ = AsyncMock(return_value=async_client)
async_client.__aexit__ = AsyncMock(return_value=False)
with (
patch("timmy.kimi_delegation.settings", mock_settings),
patch("timmy.kimi_delegation.httpx") as mock_httpx,
):
mock_httpx.AsyncClient.return_value = async_client
result = await create_kimi_research_issue("task", "ctx", "q?")
assert result["success"] is False
assert "500" in result["error"]
@pytest.mark.asyncio
async def test_exception_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://git"
mock_settings.gitea_repo = "owner/repo"
async_client = AsyncMock()
async_client.__aenter__ = AsyncMock(side_effect=Exception("connection refused"))
async_client.__aexit__ = AsyncMock(return_value=False)
with (
patch("timmy.kimi_delegation.settings", mock_settings),
patch("timmy.kimi_delegation.httpx") as mock_httpx,
):
mock_httpx.AsyncClient.return_value = async_client
result = await create_kimi_research_issue("task", "ctx", "q?")
assert result["success"] is False
assert result["error"] != ""
# ---------------------------------------------------------------------------
# poll_kimi_issue (async)
# ---------------------------------------------------------------------------
class TestPollKimiIssue:
@pytest.mark.asyncio
async def test_returns_error_when_gitea_not_configured(self):
from timmy.kimi_delegation import poll_kimi_issue
with patch("timmy.kimi_delegation.settings") as mock_settings:
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
result = await poll_kimi_issue(123)
assert result["completed"] is False
assert "not configured" in result["error"]
@pytest.mark.asyncio
async def test_returns_completed_when_issue_closed(self):
from timmy.kimi_delegation import poll_kimi_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://git"
mock_settings.gitea_repo = "owner/repo"
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"state": "closed", "body": "Done!"}
async_client = AsyncMock()
async_client.get = AsyncMock(return_value=resp)
async_client.__aenter__ = AsyncMock(return_value=async_client)
async_client.__aexit__ = AsyncMock(return_value=False)
with (
patch("timmy.kimi_delegation.settings", mock_settings),
patch("timmy.kimi_delegation.httpx") as mock_httpx,
):
mock_httpx.AsyncClient.return_value = async_client
result = await poll_kimi_issue(42, poll_interval=0, max_wait=1)
assert result["completed"] is True
assert result["state"] == "closed"
assert result["body"] == "Done!"
@pytest.mark.asyncio
async def test_times_out_when_issue_stays_open(self):
from timmy.kimi_delegation import poll_kimi_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://git"
mock_settings.gitea_repo = "owner/repo"
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"state": "open", "body": ""}
async_client = AsyncMock()
async_client.get = AsyncMock(return_value=resp)
async_client.__aenter__ = AsyncMock(return_value=async_client)
async_client.__aexit__ = AsyncMock(return_value=False)
with (
patch("timmy.kimi_delegation.settings", mock_settings),
patch("timmy.kimi_delegation.httpx") as mock_httpx,
patch("timmy.kimi_delegation.asyncio.sleep", new_callable=AsyncMock),
):
mock_httpx.AsyncClient.return_value = async_client
# poll_interval > max_wait so it exits immediately after first sleep
result = await poll_kimi_issue(42, poll_interval=10, max_wait=5)
assert result["completed"] is False
assert result["state"] == "timeout"
# ---------------------------------------------------------------------------
# index_kimi_artifact (async)
# ---------------------------------------------------------------------------
class TestIndexKimiArtifact:
@pytest.mark.asyncio
async def test_empty_artifact_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
result = await index_kimi_artifact(1, "title", " ")
assert result["success"] is False
assert "Empty artifact" in result["error"]
@pytest.mark.asyncio
async def test_successful_indexing(self):
from timmy.kimi_delegation import index_kimi_artifact
mock_entry = MagicMock()
mock_entry.id = "mem-123"
with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
mock_thread.return_value = mock_entry
result = await index_kimi_artifact(42, "My Research", "Some research content here")
assert result["success"] is True
assert result["memory_id"] == "mem-123"
@pytest.mark.asyncio
async def test_exception_returns_failure(self):
from timmy.kimi_delegation import index_kimi_artifact
with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
mock_thread.side_effect = Exception("DB error")
result = await index_kimi_artifact(42, "title", "some content")
assert result["success"] is False
assert result["error"] != ""
# ---------------------------------------------------------------------------
# extract_and_create_followups (async)
# ---------------------------------------------------------------------------
class TestExtractAndCreateFollowups:
@pytest.mark.asyncio
async def test_no_action_items_returns_empty_created(self):
from timmy.kimi_delegation import extract_and_create_followups
result = await extract_and_create_followups("Plain prose, nothing to do.", 1)
assert result["success"] is True
assert result["created"] == []
@pytest.mark.asyncio
async def test_gitea_not_configured_returns_error(self):
from timmy.kimi_delegation import extract_and_create_followups
text = "1. Do something important\n"
with patch("timmy.kimi_delegation.settings") as mock_settings:
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
result = await extract_and_create_followups(text, 5)
assert result["success"] is False
@pytest.mark.asyncio
async def test_creates_followup_issues(self):
from timmy.kimi_delegation import extract_and_create_followups
text = "1. Deploy the service\n2. Run integration tests\n"
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://git"
mock_settings.gitea_repo = "owner/repo"
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {"number": 10}
async_client = AsyncMock()
async_client.post = AsyncMock(return_value=issue_resp)
async_client.__aenter__ = AsyncMock(return_value=async_client)
async_client.__aexit__ = AsyncMock(return_value=False)
with (
patch("timmy.kimi_delegation.settings", mock_settings),
patch("timmy.kimi_delegation.httpx") as mock_httpx,
):
mock_httpx.AsyncClient.return_value = async_client
result = await extract_and_create_followups(text, 5)
assert result["success"] is True
assert len(result["created"]) == 2
# ---------------------------------------------------------------------------
# delegate_research_to_kimi (async)
# ---------------------------------------------------------------------------
class TestDelegateResearchToKimi:
@pytest.mark.asyncio
async def test_empty_task_returns_error(self):
from timmy.kimi_delegation import delegate_research_to_kimi
result = await delegate_research_to_kimi("", "ctx", "q?")
assert result["success"] is False
assert "required" in result["error"]
@pytest.mark.asyncio
async def test_whitespace_task_returns_error(self):
from timmy.kimi_delegation import delegate_research_to_kimi
result = await delegate_research_to_kimi(" ", "ctx", "q?")
assert result["success"] is False
assert "required" in result["error"]
@pytest.mark.asyncio
async def test_empty_question_returns_error(self):
from timmy.kimi_delegation import delegate_research_to_kimi
result = await delegate_research_to_kimi("valid task", "ctx", "")
assert result["success"] is False
assert "required" in result["error"]
@pytest.mark.asyncio
async def test_delegates_to_create_issue(self):
from timmy.kimi_delegation import delegate_research_to_kimi
with patch(
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
) as mock_create:
mock_create.return_value = {"success": True, "issue_number": 7, "issue_url": "http://x", "error": None}
result = await delegate_research_to_kimi("Research X", "ctx", "What is X?", priority="high")
assert result["success"] is True
assert result["issue_number"] == 7
mock_create.assert_awaited_once_with("Research X", "ctx", "What is X?", "high")

View File

@@ -1,839 +0,0 @@
"""Unit tests for timmy.quest_system."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
import timmy.quest_system as qs
from timmy.quest_system import (
QuestDefinition,
QuestProgress,
QuestStatus,
QuestType,
_get_progress_key,
_get_target_value,
_is_on_cooldown,
check_daily_run_quest,
check_issue_count_quest,
check_issue_reduce_quest,
claim_quest_reward,
evaluate_quest_progress,
get_active_quests,
get_agent_quests_status,
get_or_create_progress,
get_quest_definition,
get_quest_definitions,
get_quest_leaderboard,
get_quest_progress,
load_quest_config,
reset_quest_progress,
update_quest_progress,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_quest(
quest_id: str = "test_quest",
quest_type: QuestType = QuestType.ISSUE_COUNT,
reward_tokens: int = 10,
enabled: bool = True,
repeatable: bool = False,
cooldown_hours: int = 0,
criteria: dict[str, Any] | None = None,
) -> QuestDefinition:
return QuestDefinition(
id=quest_id,
name=f"Quest {quest_id}",
description="Test quest",
reward_tokens=reward_tokens,
quest_type=quest_type,
enabled=enabled,
repeatable=repeatable,
cooldown_hours=cooldown_hours,
criteria=criteria or {"target_count": 3},
notification_message="Quest Complete! You earned {tokens} tokens.",
)
@pytest.fixture(autouse=True)
def clean_state():
"""Reset module-level state before and after each test."""
reset_quest_progress()
qs._quest_definitions.clear()
qs._quest_settings.clear()
yield
reset_quest_progress()
qs._quest_definitions.clear()
qs._quest_settings.clear()
# ---------------------------------------------------------------------------
# QuestDefinition
# ---------------------------------------------------------------------------
class TestQuestDefinition:
def test_from_dict_minimal(self):
data = {"id": "q1"}
defn = QuestDefinition.from_dict(data)
assert defn.id == "q1"
assert defn.name == "Unnamed Quest"
assert defn.reward_tokens == 0
assert defn.quest_type == QuestType.CUSTOM
assert defn.enabled is True
assert defn.repeatable is False
assert defn.cooldown_hours == 0
def test_from_dict_full(self):
data = {
"id": "q2",
"name": "Full Quest",
"description": "A full quest",
"reward_tokens": 50,
"type": "issue_count",
"enabled": False,
"repeatable": True,
"cooldown_hours": 24,
"criteria": {"target_count": 5},
"notification_message": "You earned {tokens}!",
}
defn = QuestDefinition.from_dict(data)
assert defn.id == "q2"
assert defn.name == "Full Quest"
assert defn.reward_tokens == 50
assert defn.quest_type == QuestType.ISSUE_COUNT
assert defn.enabled is False
assert defn.repeatable is True
assert defn.cooldown_hours == 24
assert defn.criteria == {"target_count": 5}
assert defn.notification_message == "You earned {tokens}!"
def test_from_dict_invalid_type_raises(self):
data = {"id": "q3", "type": "not_a_real_type"}
with pytest.raises(ValueError):
QuestDefinition.from_dict(data)
# ---------------------------------------------------------------------------
# QuestProgress
# ---------------------------------------------------------------------------
class TestQuestProgress:
def test_to_dict_roundtrip(self):
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.IN_PROGRESS,
current_value=2,
target_value=5,
started_at="2026-01-01T00:00:00",
metadata={"key": "val"},
)
d = progress.to_dict()
assert d["quest_id"] == "q1"
assert d["agent_id"] == "agent_a"
assert d["status"] == "in_progress"
assert d["current_value"] == 2
assert d["target_value"] == 5
assert d["metadata"] == {"key": "val"}
def test_to_dict_defaults(self):
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
)
d = progress.to_dict()
assert d["completion_count"] == 0
assert d["started_at"] == ""
assert d["completed_at"] == ""
# ---------------------------------------------------------------------------
# _get_progress_key
# ---------------------------------------------------------------------------
def test_get_progress_key():
assert _get_progress_key("q1", "agent_a") == "agent_a:q1"
def test_get_progress_key_different_agents():
key_a = _get_progress_key("q1", "agent_a")
key_b = _get_progress_key("q1", "agent_b")
assert key_a != key_b
# ---------------------------------------------------------------------------
# load_quest_config
# ---------------------------------------------------------------------------
class TestLoadQuestConfig:
def test_missing_file_returns_empty(self, tmp_path):
missing = tmp_path / "nonexistent.yaml"
with patch.object(qs, "QUEST_CONFIG_PATH", missing):
defs, settings = load_quest_config()
assert defs == {}
assert settings == {}
def test_valid_yaml_loads_quests(self, tmp_path):
config_path = tmp_path / "quests.yaml"
config_path.write_text(
"""
quests:
first_quest:
name: First Quest
description: Do stuff
reward_tokens: 25
type: issue_count
enabled: true
repeatable: false
cooldown_hours: 0
criteria:
target_count: 3
notification_message: "Done! {tokens} tokens"
settings:
some_setting: true
"""
)
with patch.object(qs, "QUEST_CONFIG_PATH", config_path):
defs, settings = load_quest_config()
assert "first_quest" in defs
assert defs["first_quest"].name == "First Quest"
assert defs["first_quest"].reward_tokens == 25
assert settings == {"some_setting": True}
def test_invalid_yaml_returns_empty(self, tmp_path):
config_path = tmp_path / "quests.yaml"
config_path.write_text(":: not valid yaml ::")
with patch.object(qs, "QUEST_CONFIG_PATH", config_path):
defs, settings = load_quest_config()
assert defs == {}
assert settings == {}
def test_non_dict_yaml_returns_empty(self, tmp_path):
config_path = tmp_path / "quests.yaml"
config_path.write_text("- item1\n- item2\n")
with patch.object(qs, "QUEST_CONFIG_PATH", config_path):
defs, settings = load_quest_config()
assert defs == {}
assert settings == {}
def test_bad_quest_entry_is_skipped(self, tmp_path):
config_path = tmp_path / "quests.yaml"
config_path.write_text(
"""
quests:
good_quest:
name: Good
type: issue_count
reward_tokens: 10
enabled: true
repeatable: false
cooldown_hours: 0
criteria: {}
notification_message: "{tokens}"
bad_quest:
type: invalid_type_that_does_not_exist
"""
)
with patch.object(qs, "QUEST_CONFIG_PATH", config_path):
defs, _ = load_quest_config()
assert "good_quest" in defs
assert "bad_quest" not in defs
# ---------------------------------------------------------------------------
# get_quest_definitions / get_quest_definition / get_active_quests
# ---------------------------------------------------------------------------
class TestQuestLookup:
def setup_method(self):
q1 = _make_quest("q1", enabled=True)
q2 = _make_quest("q2", enabled=False)
qs._quest_definitions.update({"q1": q1, "q2": q2})
def test_get_quest_definitions_returns_all(self):
defs = get_quest_definitions()
assert "q1" in defs
assert "q2" in defs
def test_get_quest_definition_found(self):
defn = get_quest_definition("q1")
assert defn is not None
assert defn.id == "q1"
def test_get_quest_definition_not_found(self):
assert get_quest_definition("missing") is None
def test_get_active_quests_only_enabled(self):
active = get_active_quests()
ids = [q.id for q in active]
assert "q1" in ids
assert "q2" not in ids
# ---------------------------------------------------------------------------
# _get_target_value
# ---------------------------------------------------------------------------
class TestGetTargetValue:
def test_issue_count(self):
q = _make_quest(quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 7})
assert _get_target_value(q) == 7
def test_issue_reduce(self):
q = _make_quest(quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 5})
assert _get_target_value(q) == 5
def test_daily_run(self):
q = _make_quest(quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 3})
assert _get_target_value(q) == 3
def test_docs_update(self):
q = _make_quest(quest_type=QuestType.DOCS_UPDATE, criteria={"min_files_changed": 2})
assert _get_target_value(q) == 2
def test_test_improve(self):
q = _make_quest(quest_type=QuestType.TEST_IMPROVE, criteria={"min_new_tests": 4})
assert _get_target_value(q) == 4
def test_custom_defaults_to_one(self):
q = _make_quest(quest_type=QuestType.CUSTOM, criteria={})
assert _get_target_value(q) == 1
def test_missing_criteria_key_defaults_to_one(self):
q = _make_quest(quest_type=QuestType.ISSUE_COUNT, criteria={})
assert _get_target_value(q) == 1
# ---------------------------------------------------------------------------
# get_or_create_progress / get_quest_progress
# ---------------------------------------------------------------------------
class TestProgressCreation:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 5})
def test_creates_new_progress(self):
progress = get_or_create_progress("q1", "agent_a")
assert progress.quest_id == "q1"
assert progress.agent_id == "agent_a"
assert progress.status == QuestStatus.NOT_STARTED
assert progress.target_value == 5
assert progress.current_value == 0
def test_returns_existing_progress(self):
p1 = get_or_create_progress("q1", "agent_a")
p1.current_value = 3
p2 = get_or_create_progress("q1", "agent_a")
assert p2.current_value == 3
assert p1 is p2
def test_raises_for_unknown_quest(self):
with pytest.raises(ValueError, match="Quest unknown not found"):
get_or_create_progress("unknown", "agent_a")
def test_get_quest_progress_none_before_creation(self):
assert get_quest_progress("q1", "agent_a") is None
def test_get_quest_progress_after_creation(self):
get_or_create_progress("q1", "agent_a")
progress = get_quest_progress("q1", "agent_a")
assert progress is not None
# ---------------------------------------------------------------------------
# update_quest_progress
# ---------------------------------------------------------------------------
class TestUpdateQuestProgress:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 3})
def test_updates_current_value(self):
progress = update_quest_progress("q1", "agent_a", 2)
assert progress.current_value == 2
assert progress.status == QuestStatus.NOT_STARTED
def test_marks_completed_when_target_reached(self):
progress = update_quest_progress("q1", "agent_a", 3)
assert progress.status == QuestStatus.COMPLETED
assert progress.completed_at != ""
def test_marks_completed_when_value_exceeds_target(self):
progress = update_quest_progress("q1", "agent_a", 10)
assert progress.status == QuestStatus.COMPLETED
def test_does_not_re_complete_already_completed(self):
p = update_quest_progress("q1", "agent_a", 3)
first_completed_at = p.completed_at
p2 = update_quest_progress("q1", "agent_a", 5)
# should not change completed_at again
assert p2.completed_at == first_completed_at
def test_does_not_re_complete_claimed_quest(self):
p = update_quest_progress("q1", "agent_a", 3)
p.status = QuestStatus.CLAIMED
p2 = update_quest_progress("q1", "agent_a", 5)
assert p2.status == QuestStatus.CLAIMED
def test_updates_metadata(self):
progress = update_quest_progress("q1", "agent_a", 1, metadata={"info": "value"})
assert progress.metadata["info"] == "value"
def test_merges_metadata(self):
update_quest_progress("q1", "agent_a", 1, metadata={"a": 1})
progress = update_quest_progress("q1", "agent_a", 2, metadata={"b": 2})
assert progress.metadata["a"] == 1
assert progress.metadata["b"] == 2
# ---------------------------------------------------------------------------
# _is_on_cooldown
# ---------------------------------------------------------------------------
class TestIsOnCooldown:
def test_non_repeatable_never_on_cooldown(self):
quest = _make_quest(repeatable=False, cooldown_hours=24)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.CLAIMED,
last_completed_at=datetime.now(UTC).isoformat(),
)
assert _is_on_cooldown(progress, quest) is False
def test_no_last_completed_not_on_cooldown(self):
quest = _make_quest(repeatable=True, cooldown_hours=24)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
last_completed_at="",
)
assert _is_on_cooldown(progress, quest) is False
def test_zero_cooldown_not_on_cooldown(self):
quest = _make_quest(repeatable=True, cooldown_hours=0)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.CLAIMED,
last_completed_at=datetime.now(UTC).isoformat(),
)
assert _is_on_cooldown(progress, quest) is False
def test_recent_completion_is_on_cooldown(self):
quest = _make_quest(repeatable=True, cooldown_hours=24)
recent = datetime.now(UTC) - timedelta(hours=1)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
last_completed_at=recent.isoformat(),
)
assert _is_on_cooldown(progress, quest) is True
def test_expired_cooldown_not_on_cooldown(self):
quest = _make_quest(repeatable=True, cooldown_hours=24)
old = datetime.now(UTC) - timedelta(hours=25)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
last_completed_at=old.isoformat(),
)
assert _is_on_cooldown(progress, quest) is False
def test_invalid_last_completed_returns_false(self):
quest = _make_quest(repeatable=True, cooldown_hours=24)
progress = QuestProgress(
quest_id="q1",
agent_id="agent_a",
status=QuestStatus.NOT_STARTED,
last_completed_at="not-a-date",
)
assert _is_on_cooldown(progress, quest) is False
# ---------------------------------------------------------------------------
# claim_quest_reward
# ---------------------------------------------------------------------------
class TestClaimQuestReward:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=25)
def test_returns_none_if_no_progress(self):
assert claim_quest_reward("q1", "agent_a") is None
def test_returns_none_if_not_completed(self):
get_or_create_progress("q1", "agent_a")
assert claim_quest_reward("q1", "agent_a") is None
def test_returns_none_if_quest_not_found(self):
assert claim_quest_reward("nonexistent", "agent_a") is None
def test_successful_claim(self):
progress = get_or_create_progress("q1", "agent_a")
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
mock_invoice = MagicMock()
mock_invoice.payment_hash = "quest_q1_agent_a_123"
with (
patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice),
patch("timmy.quest_system.mark_settled"),
):
result = claim_quest_reward("q1", "agent_a")
assert result is not None
assert result["tokens_awarded"] == 25
assert result["quest_id"] == "q1"
assert result["agent_id"] == "agent_a"
assert result["completion_count"] == 1
def test_successful_claim_marks_claimed(self):
progress = get_or_create_progress("q1", "agent_a")
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
mock_invoice = MagicMock()
mock_invoice.payment_hash = "phash"
with (
patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice),
patch("timmy.quest_system.mark_settled"),
):
claim_quest_reward("q1", "agent_a")
assert progress.status == QuestStatus.CLAIMED
def test_repeatable_quest_resets_after_claim(self):
qs._quest_definitions["rep"] = _make_quest(
"rep", repeatable=True, cooldown_hours=0, reward_tokens=10
)
progress = get_or_create_progress("rep", "agent_a")
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
progress.current_value = 5
mock_invoice = MagicMock()
mock_invoice.payment_hash = "phash"
with (
patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice),
patch("timmy.quest_system.mark_settled"),
):
result = claim_quest_reward("rep", "agent_a")
assert result is not None
assert progress.status == QuestStatus.NOT_STARTED
assert progress.current_value == 0
assert progress.completed_at == ""
def test_on_cooldown_returns_none(self):
qs._quest_definitions["rep"] = _make_quest("rep", repeatable=True, cooldown_hours=24)
progress = get_or_create_progress("rep", "agent_a")
progress.status = QuestStatus.COMPLETED
recent = datetime.now(UTC) - timedelta(hours=1)
progress.last_completed_at = recent.isoformat()
assert claim_quest_reward("rep", "agent_a") is None
def test_ledger_error_returns_none(self):
progress = get_or_create_progress("q1", "agent_a")
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
with patch("timmy.quest_system.create_invoice_entry", side_effect=Exception("ledger error")):
result = claim_quest_reward("q1", "agent_a")
assert result is None
# ---------------------------------------------------------------------------
# check_issue_count_quest
# ---------------------------------------------------------------------------
class TestCheckIssueCountQuest:
def setup_method(self):
qs._quest_definitions["iq"] = _make_quest(
"iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 2, "issue_labels": ["bug"]}
)
def test_counts_matching_issues(self):
issues = [
{"labels": [{"name": "bug"}]},
{"labels": [{"name": "bug"}, {"name": "priority"}]},
{"labels": [{"name": "feature"}]}, # doesn't match
]
progress = check_issue_count_quest(
qs._quest_definitions["iq"], "agent_a", issues
)
assert progress.current_value == 2
assert progress.status == QuestStatus.COMPLETED
def test_empty_issues_returns_zero(self):
progress = check_issue_count_quest(qs._quest_definitions["iq"], "agent_a", [])
assert progress.current_value == 0
def test_no_labels_filter_counts_all_labeled(self):
q = _make_quest(
"nolabel",
quest_type=QuestType.ISSUE_COUNT,
criteria={"target_count": 1, "issue_labels": []},
)
qs._quest_definitions["nolabel"] = q
issues = [
{"labels": [{"name": "bug"}]},
{"labels": [{"name": "feature"}]},
]
progress = check_issue_count_quest(q, "agent_a", issues)
assert progress.current_value == 2
# ---------------------------------------------------------------------------
# check_issue_reduce_quest
# ---------------------------------------------------------------------------
class TestCheckIssueReduceQuest:
def setup_method(self):
qs._quest_definitions["ir"] = _make_quest(
"ir", quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 5}
)
def test_computes_reduction(self):
progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 20, 15)
assert progress.current_value == 5
assert progress.status == QuestStatus.COMPLETED
def test_negative_reduction_treated_as_zero(self):
progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 10, 15)
assert progress.current_value == 0
def test_no_change_yields_zero(self):
progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 10, 10)
assert progress.current_value == 0
# ---------------------------------------------------------------------------
# check_daily_run_quest
# ---------------------------------------------------------------------------
class TestCheckDailyRunQuest:
def setup_method(self):
qs._quest_definitions["dr"] = _make_quest(
"dr", quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 2}
)
def test_tracks_sessions(self):
progress = check_daily_run_quest(qs._quest_definitions["dr"], "agent_a", 2)
assert progress.current_value == 2
assert progress.status == QuestStatus.COMPLETED
def test_incomplete_sessions(self):
progress = check_daily_run_quest(qs._quest_definitions["dr"], "agent_a", 1)
assert progress.current_value == 1
assert progress.status != QuestStatus.COMPLETED
# ---------------------------------------------------------------------------
# evaluate_quest_progress
# ---------------------------------------------------------------------------
class TestEvaluateQuestProgress:
def setup_method(self):
qs._quest_definitions["iq"] = _make_quest(
"iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 1}
)
qs._quest_definitions["dis"] = _make_quest("dis", enabled=False)
def test_disabled_quest_returns_none(self):
result = evaluate_quest_progress("dis", "agent_a", {})
assert result is None
def test_missing_quest_returns_none(self):
result = evaluate_quest_progress("nonexistent", "agent_a", {})
assert result is None
def test_issue_count_quest_evaluated(self):
context = {"closed_issues": [{"labels": [{"name": "bug"}]}]}
result = evaluate_quest_progress("iq", "agent_a", context)
assert result is not None
assert result.current_value == 1
def test_issue_reduce_quest_evaluated(self):
qs._quest_definitions["ir"] = _make_quest(
"ir", quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 3}
)
context = {"previous_issue_count": 10, "current_issue_count": 7}
result = evaluate_quest_progress("ir", "agent_a", context)
assert result is not None
assert result.current_value == 3
def test_daily_run_quest_evaluated(self):
qs._quest_definitions["dr"] = _make_quest(
"dr", quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 1}
)
context = {"sessions_completed": 2}
result = evaluate_quest_progress("dr", "agent_a", context)
assert result is not None
assert result.current_value == 2
def test_custom_quest_returns_existing_progress(self):
qs._quest_definitions["cust"] = _make_quest("cust", quest_type=QuestType.CUSTOM)
# No progress yet => None (custom quests don't auto-create progress here)
result = evaluate_quest_progress("cust", "agent_a", {})
assert result is None
def test_cooldown_prevents_evaluation(self):
q = _make_quest("rep_iq", quest_type=QuestType.ISSUE_COUNT, repeatable=True, cooldown_hours=24, criteria={"target_count": 1})
qs._quest_definitions["rep_iq"] = q
progress = get_or_create_progress("rep_iq", "agent_a")
recent = datetime.now(UTC) - timedelta(hours=1)
progress.last_completed_at = recent.isoformat()
context = {"closed_issues": [{"labels": [{"name": "bug"}]}]}
result = evaluate_quest_progress("rep_iq", "agent_a", context)
# Should return existing progress without updating
assert result is progress
# ---------------------------------------------------------------------------
# reset_quest_progress
# ---------------------------------------------------------------------------
class TestResetQuestProgress:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1")
qs._quest_definitions["q2"] = _make_quest("q2")
def test_reset_all(self):
get_or_create_progress("q1", "agent_a")
get_or_create_progress("q2", "agent_a")
count = reset_quest_progress()
assert count == 2
assert get_quest_progress("q1", "agent_a") is None
assert get_quest_progress("q2", "agent_a") is None
def test_reset_specific_quest(self):
get_or_create_progress("q1", "agent_a")
get_or_create_progress("q2", "agent_a")
count = reset_quest_progress(quest_id="q1")
assert count == 1
assert get_quest_progress("q1", "agent_a") is None
assert get_quest_progress("q2", "agent_a") is not None
def test_reset_specific_agent(self):
get_or_create_progress("q1", "agent_a")
get_or_create_progress("q1", "agent_b")
count = reset_quest_progress(agent_id="agent_a")
assert count == 1
assert get_quest_progress("q1", "agent_a") is None
assert get_quest_progress("q1", "agent_b") is not None
def test_reset_specific_quest_and_agent(self):
get_or_create_progress("q1", "agent_a")
get_or_create_progress("q1", "agent_b")
count = reset_quest_progress(quest_id="q1", agent_id="agent_a")
assert count == 1
def test_reset_empty_returns_zero(self):
count = reset_quest_progress()
assert count == 0
# ---------------------------------------------------------------------------
# get_quest_leaderboard
# ---------------------------------------------------------------------------
class TestGetQuestLeaderboard:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)
qs._quest_definitions["q2"] = _make_quest("q2", reward_tokens=20)
def test_empty_progress_returns_empty(self):
assert get_quest_leaderboard() == []
def test_leaderboard_sorted_by_tokens(self):
p_a = get_or_create_progress("q1", "agent_a")
p_a.completion_count = 1
p_b = get_or_create_progress("q2", "agent_b")
p_b.completion_count = 2
board = get_quest_leaderboard()
assert board[0]["agent_id"] == "agent_b" # 40 tokens
assert board[1]["agent_id"] == "agent_a" # 10 tokens
def test_leaderboard_aggregates_multiple_quests(self):
p1 = get_or_create_progress("q1", "agent_a")
p1.completion_count = 2 # 20 tokens
p2 = get_or_create_progress("q2", "agent_a")
p2.completion_count = 1 # 20 tokens
board = get_quest_leaderboard()
assert len(board) == 1
assert board[0]["total_tokens"] == 40
assert board[0]["total_completions"] == 3
def test_leaderboard_counts_unique_quests(self):
p1 = get_or_create_progress("q1", "agent_a")
p1.completion_count = 2
p2 = get_or_create_progress("q2", "agent_a")
p2.completion_count = 1
board = get_quest_leaderboard()
assert board[0]["unique_quests_completed"] == 2
# ---------------------------------------------------------------------------
# get_agent_quests_status
# ---------------------------------------------------------------------------
class TestGetAgentQuestsStatus:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)
def test_returns_status_structure(self):
result = get_agent_quests_status("agent_a")
assert result["agent_id"] == "agent_a"
assert isinstance(result["quests"], list)
assert "total_tokens_earned" in result
assert "total_quests_completed" in result
assert "active_quests_count" in result
def test_includes_quest_info(self):
result = get_agent_quests_status("agent_a")
quest_info = result["quests"][0]
assert quest_info["quest_id"] == "q1"
assert quest_info["reward_tokens"] == 10
assert quest_info["status"] == QuestStatus.NOT_STARTED.value
def test_accumulates_tokens_from_completions(self):
p = get_or_create_progress("q1", "agent_a")
p.completion_count = 3
result = get_agent_quests_status("agent_a")
assert result["total_tokens_earned"] == 30
assert result["total_quests_completed"] == 3
def test_cooldown_hours_remaining_calculated(self):
q = _make_quest("qcool", repeatable=True, cooldown_hours=24, reward_tokens=5)
qs._quest_definitions["qcool"] = q
p = get_or_create_progress("qcool", "agent_a")
recent = datetime.now(UTC) - timedelta(hours=2)
p.last_completed_at = recent.isoformat()
p.completion_count = 1
result = get_agent_quests_status("agent_a")
qcool_info = next(qi for qi in result["quests"] if qi["quest_id"] == "qcool")
assert qcool_info["on_cooldown"] is True
assert qcool_info["cooldown_hours_remaining"] > 0

View File

@@ -1,124 +0,0 @@
"""Unit tests for timmy/research_tools.py."""
from __future__ import annotations
import os
import sys
from unittest.mock import MagicMock, patch
import pytest
# serpapi is an optional dependency not installed in the test environment.
# Stub it before importing the module under test.
if "serpapi" not in sys.modules:
sys.modules["serpapi"] = MagicMock()
from timmy.research_tools import get_llm_client, google_web_search # noqa: E402
# ---------------------------------------------------------------------------
# google_web_search
# ---------------------------------------------------------------------------
class TestGoogleWebSearch:
@pytest.mark.asyncio
async def test_missing_api_key_returns_empty_string(self):
"""Returns '' and logs a warning when SERPAPI_API_KEY is absent."""
env = {k: v for k, v in os.environ.items() if k != "SERPAPI_API_KEY"}
with patch.dict(os.environ, env, clear=True):
result = await google_web_search("python tutorial")
assert result == ""
@pytest.mark.asyncio
async def test_calls_google_search_with_correct_params(self):
"""GoogleSearch is constructed with query and api_key from environ."""
mock_search_instance = MagicMock()
mock_search_instance.get_dict.return_value = {"organic_results": [{"title": "Hello"}]}
mock_search_cls = MagicMock(return_value=mock_search_instance)
with patch.dict(os.environ, {"SERPAPI_API_KEY": "test-key-123"}):
with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
result = await google_web_search("python tutorial")
mock_search_cls.assert_called_once_with(
{"q": "python tutorial", "api_key": "test-key-123"}
)
assert "Hello" in result
@pytest.mark.asyncio
async def test_returns_stringified_results(self):
"""Return value is str() of whatever get_dict() returns."""
fake_dict = {"organic_results": [{"title": "Foo", "link": "https://example.com"}]}
mock_search_instance = MagicMock()
mock_search_instance.get_dict.return_value = fake_dict
mock_search_cls = MagicMock(return_value=mock_search_instance)
with patch.dict(os.environ, {"SERPAPI_API_KEY": "key"}):
with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
result = await google_web_search("foo")
assert result == str(fake_dict)
@pytest.mark.asyncio
async def test_empty_query_still_calls_search(self):
"""An empty query is forwarded to GoogleSearch without short-circuiting."""
mock_search_instance = MagicMock()
mock_search_instance.get_dict.return_value = {}
mock_search_cls = MagicMock(return_value=mock_search_instance)
with patch.dict(os.environ, {"SERPAPI_API_KEY": "key"}):
with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
result = await google_web_search("")
mock_search_cls.assert_called_once()
assert result == str({})
# ---------------------------------------------------------------------------
# get_llm_client
# ---------------------------------------------------------------------------
class TestGetLlmClient:
def test_returns_a_client_object(self):
"""get_llm_client() returns a non-None object."""
client = get_llm_client()
assert client is not None
def test_client_has_completion_method(self):
"""The returned client exposes a callable completion attribute."""
client = get_llm_client()
assert callable(getattr(client, "completion", None))
@pytest.mark.asyncio
async def test_completion_returns_object_with_text(self):
"""completion() returns an object whose .text is a non-empty string."""
client = get_llm_client()
result = await client.completion("What is Python?", max_tokens=100)
assert hasattr(result, "text")
assert isinstance(result.text, str)
assert len(result.text) > 0
@pytest.mark.asyncio
async def test_completion_text_contains_prompt(self):
"""The stub weaves the prompt into the returned text."""
client = get_llm_client()
prompt = "Tell me about asyncio"
result = await client.completion(prompt, max_tokens=50)
assert prompt in result.text
@pytest.mark.asyncio
async def test_multiple_calls_return_independent_objects(self):
"""Each call to completion() returns a fresh object."""
client = get_llm_client()
r1 = await client.completion("prompt one", max_tokens=10)
r2 = await client.completion("prompt two", max_tokens=10)
assert r1 is not r2
assert r1.text != r2.text
def test_multiple_calls_return_independent_clients(self):
"""Each call to get_llm_client() returns a distinct instance."""
c1 = get_llm_client()
c2 = get_llm_client()
assert c1 is not c2

View File

@@ -334,7 +334,7 @@ async def test_think_once_disabled(tmp_path):
"""think_once should return None when thinking is disabled."""
engine = _make_engine(tmp_path)
with patch("timmy.thinking.engine.settings") as mock_settings:
with patch("timmy.thinking.settings") as mock_settings:
mock_settings.thinking_enabled = False
thought = await engine.think_once()
@@ -381,7 +381,7 @@ async def test_think_once_prompt_includes_memory_context(tmp_path):
return "A grounded thought."
with (
patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", memory_md),
patch("timmy.thinking.HOT_MEMORY_PATH", memory_md),
patch.object(engine, "_call_agent", side_effect=capture_agent),
patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"),
@@ -412,7 +412,7 @@ async def test_think_once_prompt_includes_soul(tmp_path):
return "A soulful thought."
with (
patch("timmy.thinking._snapshot.SOUL_PATH", soul_md),
patch("timmy.thinking.SOUL_PATH", soul_md),
patch.object(engine, "_call_agent", side_effect=capture_agent),
patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"),
@@ -433,7 +433,7 @@ async def test_think_once_graceful_without_soul(tmp_path):
nonexistent = tmp_path / "no_such_soul.md"
with (
patch("timmy.thinking._snapshot.SOUL_PATH", nonexistent),
patch("timmy.thinking.SOUL_PATH", nonexistent),
patch.object(engine, "_call_agent", return_value="Still thinking."),
patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"),
@@ -481,7 +481,7 @@ async def test_think_once_never_writes_soul(tmp_path):
soul_md.write_text(original_content)
with (
patch("timmy.thinking._snapshot.SOUL_PATH", soul_md),
patch("timmy.thinking.SOUL_PATH", soul_md),
patch.object(engine, "_call_agent", return_value="A deep reflection."),
patch.object(engine, "_log_event"),
patch.object(engine, "_broadcast", new_callable=AsyncMock),
@@ -501,7 +501,7 @@ async def test_think_once_memory_update_graceful_on_failure(tmp_path):
# Don't create the parent dir — write will fail
with (
patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", bad_memory),
patch("timmy.thinking.HOT_MEMORY_PATH", bad_memory),
patch.object(engine, "_call_agent", return_value="Resilient thought."),
patch.object(engine, "_log_event"),
patch.object(engine, "_broadcast", new_callable=AsyncMock),
@@ -1090,7 +1090,7 @@ def test_maybe_check_memory_fires_at_interval(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform")
with (
patch("timmy.thinking._distillation.settings") as mock_settings,
patch("timmy.thinking.settings") as mock_settings,
patch(
"timmy.tools_intro.get_memory_status",
return_value={
@@ -1113,7 +1113,7 @@ def test_maybe_check_memory_skips_between_intervals(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform")
with (
patch("timmy.thinking._distillation.settings") as mock_settings,
patch("timmy.thinking.settings") as mock_settings,
patch(
"timmy.tools_intro.get_memory_status",
) as mock_status,
@@ -1131,7 +1131,7 @@ def test_maybe_check_memory_graceful_on_error(tmp_path):
engine._store_thought(f"Thought {i}.", "freeform")
with (
patch("timmy.thinking._distillation.settings") as mock_settings,
patch("timmy.thinking.settings") as mock_settings,
patch(
"timmy.tools_intro.get_memory_status",
side_effect=Exception("boom"),

View File

@@ -1,308 +0,0 @@
"""Unit tests for web_search and scrape_url tools (SearXNG + Crawl4AI).
All tests use mocked HTTP — no live services required.
"""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from timmy.tools.search import _extract_crawl_content, scrape_url, web_search
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_requests(json_response=None, status_code=200, raise_exc=None):
"""Build a mock requests module whose .get/.post return controlled responses."""
mock_req = MagicMock()
# Exception hierarchy
class Timeout(Exception):
pass
class HTTPError(Exception):
def __init__(self, *a, response=None, **kw):
super().__init__(*a, **kw)
self.response = response
class RequestException(Exception):
pass
exc_mod = MagicMock()
exc_mod.Timeout = Timeout
exc_mod.HTTPError = HTTPError
exc_mod.RequestException = RequestException
mock_req.exceptions = exc_mod
if raise_exc is not None:
mock_req.get.side_effect = raise_exc
mock_req.post.side_effect = raise_exc
else:
mock_resp = MagicMock()
mock_resp.status_code = status_code
mock_resp.json.return_value = json_response or {}
if status_code >= 400:
mock_resp.raise_for_status.side_effect = HTTPError(
response=MagicMock(status_code=status_code)
)
mock_req.get.return_value = mock_resp
mock_req.post.return_value = mock_resp
return mock_req
# ---------------------------------------------------------------------------
# web_search tests
# ---------------------------------------------------------------------------
class TestWebSearch:
def test_backend_none_short_circuits(self):
"""TIMMY_SEARCH_BACKEND=none returns disabled message immediately."""
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "none"
result = web_search("anything")
assert "disabled" in result
def test_missing_requests_package(self):
"""Graceful error when requests is not installed."""
with patch.dict("sys.modules", {"requests": None}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.search_url = "http://localhost:8888"
result = web_search("test query")
assert "requests" in result and "not installed" in result
def test_successful_search(self):
"""Happy path: returns formatted result list."""
mock_data = {
"results": [
{"title": "Foo Bar", "url": "https://example.com/foo", "content": "Foo is great"},
{"title": "Baz", "url": "https://example.com/baz", "content": "Baz rules"},
]
}
mock_req = _mock_requests(json_response=mock_data)
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.search_url = "http://localhost:8888"
result = web_search("foo bar")
assert "Foo Bar" in result
assert "https://example.com/foo" in result
assert "Baz" in result
assert "foo bar" in result
def test_no_results(self):
"""Empty results list returns a helpful no-results message."""
mock_req = _mock_requests(json_response={"results": []})
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.search_url = "http://localhost:8888"
result = web_search("xyzzy")
assert "No results" in result
def test_num_results_respected(self):
"""Only up to num_results entries are returned."""
mock_data = {
"results": [
{"title": f"Result {i}", "url": f"https://example.com/{i}", "content": "x"}
for i in range(10)
]
}
mock_req = _mock_requests(json_response=mock_data)
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.search_url = "http://localhost:8888"
result = web_search("test", num_results=3)
# Only 3 numbered entries should appear
assert "1." in result
assert "3." in result
assert "4." not in result
def test_service_unavailable(self):
"""Connection error degrades gracefully."""
mock_req = MagicMock()
mock_req.get.side_effect = OSError("connection refused")
mock_req.exceptions = MagicMock()
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.search_url = "http://localhost:8888"
result = web_search("test")
assert "not reachable" in result or "unavailable" in result
def test_catalog_entry_exists(self):
"""web_search must appear in the tool catalog."""
from timmy.tools import get_all_available_tools
catalog = get_all_available_tools()
assert "web_search" in catalog
assert "orchestrator" in catalog["web_search"]["available_in"]
assert "echo" in catalog["web_search"]["available_in"]
# ---------------------------------------------------------------------------
# scrape_url tests
# ---------------------------------------------------------------------------
class TestScrapeUrl:
def test_invalid_url_no_scheme(self):
"""URLs without http(s) scheme are rejected before any HTTP call."""
result = scrape_url("example.com/page")
assert "Error: invalid URL" in result
def test_invalid_url_empty(self):
result = scrape_url("")
assert "Error: invalid URL" in result
def test_backend_none_short_circuits(self):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "none"
result = scrape_url("https://example.com")
assert "disabled" in result
def test_missing_requests_package(self):
with patch.dict("sys.modules", {"requests": None}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.crawl_url = "http://localhost:11235"
result = scrape_url("https://example.com")
assert "requests" in result and "not installed" in result
def test_sync_result_returned_immediately(self):
"""If Crawl4AI returns results in the POST response, use them directly."""
mock_data = {
"results": [{"markdown": "# Hello\n\nThis is the page content."}]
}
mock_req = _mock_requests(json_response=mock_data)
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.crawl_url = "http://localhost:11235"
result = scrape_url("https://example.com")
assert "Hello" in result
assert "page content" in result
def test_async_poll_completed(self):
"""Async task_id flow: polls until completed and returns content."""
submit_response = MagicMock()
submit_response.json.return_value = {"task_id": "abc123"}
submit_response.raise_for_status.return_value = None
poll_response = MagicMock()
poll_response.json.return_value = {
"status": "completed",
"results": [{"markdown": "# Async content"}],
}
poll_response.raise_for_status.return_value = None
mock_req = MagicMock()
mock_req.post.return_value = submit_response
mock_req.get.return_value = poll_response
mock_req.exceptions = MagicMock()
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.crawl_url = "http://localhost:11235"
with patch("timmy.tools.search.time") as mock_time:
mock_time.sleep = MagicMock()
result = scrape_url("https://example.com")
assert "Async content" in result
def test_async_poll_failed_task(self):
"""Crawl4AI task failure is reported clearly."""
submit_response = MagicMock()
submit_response.json.return_value = {"task_id": "abc123"}
submit_response.raise_for_status.return_value = None
poll_response = MagicMock()
poll_response.json.return_value = {"status": "failed", "error": "site blocked"}
poll_response.raise_for_status.return_value = None
mock_req = MagicMock()
mock_req.post.return_value = submit_response
mock_req.get.return_value = poll_response
mock_req.exceptions = MagicMock()
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.crawl_url = "http://localhost:11235"
with patch("timmy.tools.search.time") as mock_time:
mock_time.sleep = MagicMock()
result = scrape_url("https://example.com")
assert "failed" in result and "site blocked" in result
def test_service_unavailable(self):
"""Connection error degrades gracefully."""
mock_req = MagicMock()
mock_req.post.side_effect = OSError("connection refused")
mock_req.exceptions = MagicMock()
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.crawl_url = "http://localhost:11235"
result = scrape_url("https://example.com")
assert "not reachable" in result or "unavailable" in result
def test_content_truncation(self):
"""Content longer than ~4000 tokens is truncated."""
long_content = "x" * 20000
mock_data = {"results": [{"markdown": long_content}]}
mock_req = _mock_requests(json_response=mock_data)
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:
mock_settings.timmy_search_backend = "searxng"
mock_settings.crawl_url = "http://localhost:11235"
result = scrape_url("https://example.com")
assert "[…truncated" in result
assert len(result) < 17000
def test_catalog_entry_exists(self):
"""scrape_url must appear in the tool catalog."""
from timmy.tools import get_all_available_tools
catalog = get_all_available_tools()
assert "scrape_url" in catalog
assert "orchestrator" in catalog["scrape_url"]["available_in"]
# ---------------------------------------------------------------------------
# _extract_crawl_content helper
# ---------------------------------------------------------------------------
class TestExtractCrawlContent:
def test_empty_results(self):
result = _extract_crawl_content([], "https://example.com")
assert "No content" in result
def test_markdown_field_preferred(self):
results = [{"markdown": "# Title", "content": "fallback"}]
result = _extract_crawl_content(results, "https://example.com")
assert "Title" in result
def test_fallback_to_content_field(self):
results = [{"content": "plain text content"}]
result = _extract_crawl_content(results, "https://example.com")
assert "plain text content" in result
def test_no_content_fields(self):
results = [{"url": "https://example.com"}]
result = _extract_crawl_content(results, "https://example.com")
assert "No readable content" in result

View File

@@ -1,135 +0,0 @@
"""Unit tests for AirLLM backend graceful degradation.
Verifies that setting TIMMY_MODEL_BACKEND=airllm on non-Apple-Silicon hardware
(Intel Mac, Linux, Windows) or when the airllm package is not installed
falls back to the Ollama backend without crashing.
Refs #1284
"""
import sys
from unittest.mock import MagicMock, patch
import pytest
pytestmark = pytest.mark.unit
class TestIsAppleSilicon:
"""is_apple_silicon() correctly identifies the host platform."""
def test_returns_true_on_arm64_darwin(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Darwin"), patch(
"platform.machine", return_value="arm64"
):
assert is_apple_silicon() is True
def test_returns_false_on_intel_mac(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Darwin"), patch(
"platform.machine", return_value="x86_64"
):
assert is_apple_silicon() is False
def test_returns_false_on_linux(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Linux"), patch(
"platform.machine", return_value="x86_64"
):
assert is_apple_silicon() is False
def test_returns_false_on_windows(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Windows"), patch(
"platform.machine", return_value="AMD64"
):
assert is_apple_silicon() is False
class TestAirLLMGracefulDegradation:
"""create_timmy(backend='airllm') falls back to Ollama on unsupported platforms."""
def _make_fake_ollama_agent(self):
"""Return a lightweight stub that satisfies the Agno Agent interface."""
agent = MagicMock()
agent.run = MagicMock(return_value=MagicMock(content="ok"))
return agent
def test_falls_back_to_ollama_on_non_apple_silicon(self, caplog):
"""On Intel/Linux, airllm backend logs a warning and creates an Ollama agent."""
import logging
from timmy.agent import create_timmy
fake_agent = self._make_fake_ollama_agent()
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("timmy.agent._create_ollama_agent", return_value=fake_agent) as mock_create,
patch("timmy.agent._resolve_model_with_fallback", return_value=("qwen3:8b", False)),
patch("timmy.agent._check_model_available", return_value=True),
patch("timmy.agent._build_tools_list", return_value=[]),
patch("timmy.agent._build_prompt", return_value="test prompt"),
caplog.at_level(logging.WARNING, logger="timmy.agent"),
):
result = create_timmy(backend="airllm")
assert result is fake_agent
mock_create.assert_called_once()
assert "Apple Silicon" in caplog.text
def test_falls_back_to_ollama_when_airllm_not_installed(self, caplog):
"""When the airllm package is missing, log a warning and use Ollama."""
import logging
from timmy.agent import create_timmy
fake_agent = self._make_fake_ollama_agent()
# Simulate Apple Silicon + missing airllm package
def _import_side_effect(name, *args, **kwargs):
if name == "airllm":
raise ImportError("No module named 'airllm'")
return original_import(name, *args, **kwargs)
original_import = __builtins__["__import__"] if isinstance(__builtins__, dict) else __import__
with (
patch("timmy.backends.is_apple_silicon", return_value=True),
patch("builtins.__import__", side_effect=_import_side_effect),
patch("timmy.agent._create_ollama_agent", return_value=fake_agent) as mock_create,
patch("timmy.agent._resolve_model_with_fallback", return_value=("qwen3:8b", False)),
patch("timmy.agent._check_model_available", return_value=True),
patch("timmy.agent._build_tools_list", return_value=[]),
patch("timmy.agent._build_prompt", return_value="test prompt"),
caplog.at_level(logging.WARNING, logger="timmy.agent"),
):
result = create_timmy(backend="airllm")
assert result is fake_agent
mock_create.assert_called_once()
assert "airllm" in caplog.text.lower() or "AirLLM" in caplog.text
def test_airllm_backend_does_not_raise(self):
"""create_timmy(backend='airllm') never raises — it degrades gracefully."""
from timmy.agent import create_timmy
fake_agent = self._make_fake_ollama_agent()
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("timmy.agent._create_ollama_agent", return_value=fake_agent),
patch("timmy.agent._resolve_model_with_fallback", return_value=("qwen3:8b", False)),
patch("timmy.agent._check_model_available", return_value=True),
patch("timmy.agent._build_tools_list", return_value=[]),
patch("timmy.agent._build_prompt", return_value="test prompt"),
):
# Should not raise under any circumstances
result = create_timmy(backend="airllm")
assert result is not None

View File

@@ -1,235 +0,0 @@
"""Unit tests for brain.worker.DistributedWorker."""
from __future__ import annotations
import threading
from unittest.mock import MagicMock, patch
import pytest
from brain.worker import MAX_RETRIES, DelegatedTask, DistributedWorker
@pytest.fixture(autouse=True)
def clear_task_registry():
"""Reset the worker registry before each test."""
DistributedWorker.clear()
yield
DistributedWorker.clear()
class TestSubmit:
def test_returns_task_id(self):
with patch.object(DistributedWorker, "_run_task"):
task_id = DistributedWorker.submit("researcher", "research", "find something")
assert isinstance(task_id, str)
assert len(task_id) == 8
def test_task_registered_as_queued(self):
with patch.object(DistributedWorker, "_run_task"):
task_id = DistributedWorker.submit("coder", "code", "fix the bug")
status = DistributedWorker.get_status(task_id)
assert status["found"] is True
assert status["task_id"] == task_id
assert status["agent"] == "coder"
def test_unique_task_ids(self):
with patch.object(DistributedWorker, "_run_task"):
ids = [DistributedWorker.submit("coder", "code", "task") for _ in range(10)]
assert len(set(ids)) == 10
def test_starts_daemon_thread(self):
event = threading.Event()
def fake_run_task(record):
event.set()
with patch.object(DistributedWorker, "_run_task", side_effect=fake_run_task):
DistributedWorker.submit("coder", "code", "something")
assert event.wait(timeout=2), "Background thread did not start"
def test_priority_stored(self):
with patch.object(DistributedWorker, "_run_task"):
task_id = DistributedWorker.submit("coder", "code", "task", priority="high")
status = DistributedWorker.get_status(task_id)
assert status["priority"] == "high"
class TestGetStatus:
def test_unknown_task_id(self):
result = DistributedWorker.get_status("deadbeef")
assert result["found"] is False
assert result["task_id"] == "deadbeef"
def test_known_task_has_all_fields(self):
with patch.object(DistributedWorker, "_run_task"):
task_id = DistributedWorker.submit("writer", "writing", "write a blog post")
status = DistributedWorker.get_status(task_id)
for key in ("found", "task_id", "agent", "role", "status", "backend", "created_at"):
assert key in status, f"Missing key: {key}"
class TestListTasks:
def test_empty_initially(self):
assert DistributedWorker.list_tasks() == []
def test_returns_registered_tasks(self):
with patch.object(DistributedWorker, "_run_task"):
DistributedWorker.submit("coder", "code", "task A")
DistributedWorker.submit("writer", "writing", "task B")
tasks = DistributedWorker.list_tasks()
assert len(tasks) == 2
agents = {t["agent"] for t in tasks}
assert agents == {"coder", "writer"}
class TestSelectBackend:
def test_defaults_to_agentic_loop(self):
with patch("brain.worker.logger"):
backend = DistributedWorker._select_backend("code", "fix the bug")
assert backend == "agentic_loop"
def test_kimi_for_heavy_research_with_gitea(self):
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.paperclip_api_key = ""
with (
patch("timmy.kimi_delegation.exceeds_local_capacity", return_value=True),
patch("config.settings", mock_settings),
):
backend = DistributedWorker._select_backend("research", "comprehensive survey " * 10)
assert backend == "kimi"
def test_agentic_loop_when_no_gitea(self):
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
mock_settings.paperclip_api_key = ""
with patch("config.settings", mock_settings):
backend = DistributedWorker._select_backend("research", "comprehensive survey " * 10)
assert backend == "agentic_loop"
def test_paperclip_when_api_key_configured(self):
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
mock_settings.paperclip_api_key = "pk_test_123"
with patch("config.settings", mock_settings):
backend = DistributedWorker._select_backend("code", "build a widget")
assert backend == "paperclip"
class TestRunTask:
def test_marks_completed_on_success(self):
record = DelegatedTask(
task_id="abc12345",
agent_name="coder",
agent_role="code",
task_description="fix bug",
priority="normal",
backend="agentic_loop",
)
with patch.object(DistributedWorker, "_dispatch", return_value={"success": True}):
DistributedWorker._run_task(record)
assert record.status == "completed"
assert record.result == {"success": True}
assert record.error is None
def test_marks_failed_after_exhausting_retries(self):
record = DelegatedTask(
task_id="fail1234",
agent_name="coder",
agent_role="code",
task_description="broken task",
priority="normal",
backend="agentic_loop",
)
with patch.object(DistributedWorker, "_dispatch", side_effect=RuntimeError("boom")):
DistributedWorker._run_task(record)
assert record.status == "failed"
assert "boom" in record.error
assert record.retries == MAX_RETRIES
def test_retries_before_failing(self):
record = DelegatedTask(
task_id="retry001",
agent_name="coder",
agent_role="code",
task_description="flaky task",
priority="normal",
backend="agentic_loop",
)
call_count = 0
def flaky_dispatch(r):
nonlocal call_count
call_count += 1
if call_count < MAX_RETRIES + 1:
raise RuntimeError("transient failure")
return {"success": True}
with patch.object(DistributedWorker, "_dispatch", side_effect=flaky_dispatch):
DistributedWorker._run_task(record)
assert record.status == "completed"
assert call_count == MAX_RETRIES + 1
def test_succeeds_on_first_attempt(self):
record = DelegatedTask(
task_id="ok000001",
agent_name="writer",
agent_role="writing",
task_description="write summary",
priority="low",
backend="agentic_loop",
)
with patch.object(DistributedWorker, "_dispatch", return_value={"summary": "done"}):
DistributedWorker._run_task(record)
assert record.status == "completed"
assert record.retries == 0
class TestDelegatetaskIntegration:
"""Integration: delegate_task should wire to DistributedWorker."""
def test_delegate_task_returns_task_id(self):
from timmy.tools_delegation import delegate_task
with patch.object(DistributedWorker, "_run_task"):
result = delegate_task("researcher", "research something for me")
assert result["success"] is True
assert result["task_id"] is not None
assert result["status"] == "queued"
def test_delegate_task_status_queued_for_valid_agent(self):
from timmy.tools_delegation import delegate_task
with patch.object(DistributedWorker, "_run_task"):
result = delegate_task("coder", "implement feature X")
assert result["status"] == "queued"
assert len(result["task_id"]) == 8
def test_task_in_registry_after_delegation(self):
from timmy.tools_delegation import delegate_task
with patch.object(DistributedWorker, "_run_task"):
result = delegate_task("writer", "write documentation")
task_id = result["task_id"]
status = DistributedWorker.get_status(task_id)
assert status["found"] is True
assert status["agent"] == "writer"

10
tox.ini
View File

@@ -41,8 +41,10 @@ description = Static type checking with mypy
commands_pre =
deps =
mypy>=1.0.0
types-PyYAML
types-requests
commands =
mypy src --ignore-missing-imports --no-error-summary
mypy src
# ── Test Environments ────────────────────────────────────────────────────────
@@ -130,13 +132,17 @@ commands =
# ── Pre-push (mirrors CI exactly) ────────────────────────────────────────────
[testenv:pre-push]
description = Local gate — lint + full CI suite (same as Gitea Actions)
description = Local gate — lint + typecheck + full CI suite (same as Gitea Actions)
deps =
ruff>=0.8.0
mypy>=1.0.0
types-PyYAML
types-requests
commands =
ruff check src/ tests/
ruff format --check src/ tests/
bash -c 'files=$(grep -rl "<style" src/dashboard/templates/ --include="*.html" 2>/dev/null); if [ -n "$files" ]; then echo "ERROR: inline <style> blocks found — move CSS to static/css/mission-control.css:"; echo "$files"; exit 1; fi; echo "No inline CSS — OK"'
mypy src
mkdir -p reports
pytest tests/ \
--cov=src \