Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
e55fc07f5e test: add pytestmark and full coverage for events bus (#917)
Some checks failed
Tests / lint (pull_request) Failing after 37s
Tests / test (pull_request) Has been skipped
- Add `pytestmark = pytest.mark.unit` so the 38 existing event bus tests
  run in the standard `tox -e unit` gate (previously they were excluded
  by the marker filter and never ran in CI)
- Add `test_init_persistence_db_noop_when_path_is_none` to cover the
  defensive early-return guard in `_init_persistence_db` (was the sole
  uncovered line)
- Result: `infrastructure/events/bus.py` at 100% coverage; unit suite
  grows from 474 → 513 tests

Fixes #917

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 21:47:41 -04:00
14 changed files with 23 additions and 2107 deletions

View File

@@ -150,7 +150,6 @@ async def transcribe_audio(audio: bytes) -> str:
| Service | When Unavailable | Fallback Behavior |
|---------|------------------|-------------------|
| Ollama | No local LLM | Claude backend (if ANTHROPIC_API_KEY set) |
| vLLM | Server not running | Ollama backend (cascade router fallback) |
| Redis | Cache/storage down | In-memory dict (ephemeral) |
| AirLLM | Import error or no Apple Silicon | Ollama backend |
| Voice (Piper) | Service down | Browser Web Speech API |

View File

@@ -1,122 +0,0 @@
# SOVEREIGNTY.md — Research Sovereignty Manifest
> "If this spec is implemented correctly, it is the last research document
> Alexander should need to request from a corporate AI."
> — Issue #972, March 22 2026
---
## What This Is
A machine-readable declaration of Timmy's research independence:
where we are, where we're going, and how to measure progress.
---
## The Problem We're Solving
On March 22, 2026, a single Claude session produced six deep research reports.
It consumed ~3 hours of human time and substantial corporate AI inference.
Every report was valuable — but the workflow was **linear**.
It would cost exactly the same to reproduce tomorrow.
This file tracks the pipeline that crystallizes that workflow into something
Timmy can run autonomously.
---
## The Six-Step Pipeline
| Step | What Happens | Status |
|------|-------------|--------|
| 1. Scope | Human describes knowledge gap → Gitea issue with template | ✅ Done (`skills/research/`) |
| 2. Query | LLM slot-fills template → 515 targeted queries | ✅ Done (`research.py`) |
| 3. Search | Execute queries → top result URLs | ✅ Done (`research_tools.py`) |
| 4. Fetch | Download + extract full pages (trafilatura) | ✅ Done (`tools/system_tools.py`) |
| 5. Synthesize | Compress findings → structured report | ✅ Done (`research.py` cascade) |
| 6. Deliver | Store to semantic memory + optional disk persist | ✅ Done (`research.py`) |
---
## Cascade Tiers (Synthesis Quality vs. Cost)
| Tier | Model | Cost | Quality | Status |
|------|-------|------|---------|--------|
| **4** | SQLite semantic cache | $0.00 / instant | reuses prior | ✅ Active |
| **3** | Ollama `qwen3:14b` | $0.00 / local | ★★★ | ✅ Active |
| **2** | Claude API (haiku) | ~$0.01/report | ★★★★ | ✅ Active (opt-in) |
| **1** | Groq `llama-3.3-70b` | $0.00 / rate-limited | ★★★★ | 🔲 Planned (#980) |
Set `ANTHROPIC_API_KEY` to enable Tier 2 fallback.
---
## Research Templates
Six prompt templates live in `skills/research/`:
| Template | Use Case |
|----------|----------|
| `tool_evaluation.md` | Find all shipping tools for `{domain}` |
| `architecture_spike.md` | How to connect `{system_a}` to `{system_b}` |
| `game_analysis.md` | Evaluate `{game}` for AI agent play |
| `integration_guide.md` | Wire `{tool}` into `{stack}` with code |
| `state_of_art.md` | What exists in `{field}` as of `{date}` |
| `competitive_scan.md` | How does `{project}` compare to `{alternatives}` |
---
## Sovereignty Metrics
| Metric | Target (Week 1) | Target (Month 1) | Target (Month 3) | Graduation |
|--------|-----------------|------------------|------------------|------------|
| Queries answered locally | 10% | 40% | 80% | >90% |
| API cost per report | <$1.50 | <$0.50 | <$0.10 | <$0.01 |
| Time from question to report | <3 hours | <30 min | <5 min | <1 min |
| Human involvement | 100% (review) | Review only | Approve only | None |
---
## How to Use the Pipeline
```python
from timmy.research import run_research
# Quick research (no template)
result = await run_research("best local embedding models for 36GB RAM")
# With a template and slot values
result = await run_research(
topic="PDF text extraction libraries for Python",
template="tool_evaluation",
slots={"domain": "PDF parsing", "use_case": "RAG pipeline", "focus_criteria": "accuracy"},
save_to_disk=True,
)
print(result.report)
print(f"Backend: {result.synthesis_backend}, Cached: {result.cached}")
```
---
## Implementation Status
| Component | Issue | Status |
|-----------|-------|--------|
| `web_fetch` tool (trafilatura) | #973 | ✅ Done |
| Research template library (6 templates) | #974 | ✅ Done |
| `ResearchOrchestrator` (`research.py`) | #975 | ✅ Done |
| Semantic index for outputs | #976 | 🔲 Planned |
| Auto-create Gitea issues from findings | #977 | 🔲 Planned |
| Paperclip task runner integration | #978 | 🔲 Planned |
| Kimi delegation via labels | #979 | 🔲 Planned |
| Groq free-tier cascade tier | #980 | 🔲 Planned |
| Sovereignty metrics dashboard | #981 | 🔲 Planned |
---
## Governing Spec
See [issue #972](http://143.198.27.163:3000/Rockachopa/Timmy-time-dashboard/issues/972) for the full spec and rationale.
Research artifacts committed to `docs/research/`.

View File

@@ -131,34 +131,11 @@ providers:
context_window: 32000
capabilities: [text, tools, json, streaming]
# Tertiary: vLLM (OpenAI-compatible, continuous batching, 3-10x agentic throughput)
# Runs on CUDA GPU or CPU. On Apple Silicon, prefer vllm-mlx-local (above).
# To enable: start vLLM server:
# python -m vllm.entrypoints.openai.api_server \
# --model Qwen/Qwen2.5-14B-Instruct --port 8001
# Then set enabled: true (or TIMMY_LLM_BACKEND=vllm + VLLM_URL=http://localhost:8001)
- name: vllm-local
type: vllm
enabled: false # Enable when vLLM server is running
priority: 3
tier: local
base_url: "http://localhost:8001/v1"
models:
- name: Qwen/Qwen2.5-14B-Instruct
default: true
context_window: 32000
capabilities: [text, tools, json, streaming, complex]
description: "Qwen2.5-14B on vLLM — continuous batching for agentic workloads"
- name: Qwen/Qwen2.5-7B-Instruct
context_window: 32000
capabilities: [text, tools, json, streaming, routine]
description: "Qwen2.5-7B on vLLM — fast model for routine tasks"
# Quinary: OpenAI (if API key available)
# Tertiary: OpenAI (if API key available)
- name: openai-backup
type: openai
enabled: false # Enable by setting OPENAI_API_KEY
priority: 4
priority: 3
tier: standard_cloud
api_key: "${OPENAI_API_KEY}" # Loaded from environment
base_url: null # Use default OpenAI endpoint
@@ -170,12 +147,12 @@ providers:
- name: gpt-4o
context_window: 128000
capabilities: [text, vision, tools, json, streaming]
# Senary: Anthropic (if API key available)
# Quaternary: Anthropic (if API key available)
- name: anthropic-backup
type: anthropic
enabled: false # Enable by setting ANTHROPIC_API_KEY
priority: 5
priority: 4
tier: frontier
api_key: "${ANTHROPIC_API_KEY}"
models:

View File

@@ -42,10 +42,6 @@ services:
GROK_ENABLED: "${GROK_ENABLED:-false}"
XAI_API_KEY: "${XAI_API_KEY:-}"
GROK_DEFAULT_MODEL: "${GROK_DEFAULT_MODEL:-grok-3-fast}"
# vLLM backend — set TIMMY_LLM_BACKEND=vllm to activate
TIMMY_LLM_BACKEND: "${TIMMY_LLM_BACKEND:-ollama}"
VLLM_URL: "${VLLM_URL:-http://localhost:8001}"
VLLM_MODEL: "${VLLM_MODEL:-Qwen/Qwen2.5-14B-Instruct}"
extra_hosts:
- "host.docker.internal:host-gateway" # Linux: maps to host IP
networks:
@@ -78,49 +74,6 @@ services:
profiles:
- celery
# ── vLLM — high-throughput inference server (GPU optional) ──────────────
# Requires the 'vllm' profile: docker compose --profile vllm up
#
# GPU (NVIDIA): set VLLM_MODEL and ensure nvidia-container-toolkit is installed.
# CPU-only: add --device cpu to VLLM_EXTRA_ARGS (slower, but works anywhere).
#
# The dashboard reaches vLLM at http://vllm:8001 (inside timmy-net).
# Set VLLM_URL=http://vllm:8001 in the dashboard environment when using this service.
vllm:
image: vllm/vllm-openai:latest
container_name: timmy-vllm
profiles:
- vllm
ports:
- "8001:8001"
environment:
# Model to load — override with VLLM_MODEL env var
VLLM_MODEL: "${VLLM_MODEL:-Qwen/Qwen2.5-7B-Instruct}"
command: >
--model ${VLLM_MODEL:-Qwen/Qwen2.5-7B-Instruct}
--port 8001
--host 0.0.0.0
${VLLM_EXTRA_ARGS:-}
volumes:
- vllm-cache:/root/.cache/huggingface
networks:
- timmy-net
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 120s
# GPU support — uncomment to enable NVIDIA GPU passthrough
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# count: all
# capabilities: [gpu]
# ── OpenFang — vendored agent runtime sidecar ────────────────────────────
openfang:
build:
@@ -157,8 +110,6 @@ volumes:
device: "${PWD}/data"
openfang-data:
driver: local
vllm-cache:
driver: local
# ── Internal network ────────────────────────────────────────────────────────
networks:

View File

@@ -94,18 +94,8 @@ class Settings(BaseSettings):
# ── Backend selection ────────────────────────────────────────────────────
# "ollama" — always use Ollama (default, safe everywhere)
# "vllm" — use vLLM inference server (OpenAI-compatible, faster throughput)
# "auto" — pick best available local backend, fall back to Ollama
timmy_model_backend: Literal["ollama", "vllm", "grok", "claude", "auto"] = "ollama"
# ── vLLM backend ──────────────────────────────────────────────────────────
# vLLM is an OpenAI-compatible inference server optimised for continuous
# batching — 310x higher throughput than Ollama for agentic workloads.
# Start server: python -m vllm.entrypoints.openai.api_server \
# --model Qwen/Qwen2.5-14B-Instruct --port 8001
# Then set TIMMY_LLM_BACKEND=vllm (or enable vllm-local in providers.yaml)
vllm_url: str = "http://localhost:8001"
vllm_model: str = "Qwen/Qwen2.5-14B-Instruct"
timmy_model_backend: Literal["ollama", "grok", "claude", "auto"] = "ollama"
# ── Grok (xAI) — opt-in premium cloud backend ────────────────────────
# Grok is a premium augmentation layer — local-first ethos preserved.

View File

@@ -124,73 +124,6 @@ async def check_ollama() -> bool:
return dep.status == "healthy"
# vLLM health cache (30-second TTL)
_vllm_cache: DependencyStatus | None = None
_vllm_cache_ts: float = 0.0
_VLLM_CACHE_TTL = 30.0
def _check_vllm_sync() -> DependencyStatus:
"""Synchronous vLLM check — run via asyncio.to_thread()."""
try:
import urllib.request
base_url = settings.vllm_url.rstrip("/")
# vLLM exposes /health at the server root (strip /v1 if present)
if base_url.endswith("/v1"):
base_url = base_url[:-3]
req = urllib.request.Request(
f"{base_url}/health",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=2) as response:
if response.status == 200:
return DependencyStatus(
name="vLLM",
status="healthy",
sovereignty_score=10,
details={"url": settings.vllm_url, "model": settings.vllm_model},
)
except Exception as exc:
logger.debug("vLLM health check failed: %s", exc)
return DependencyStatus(
name="vLLM",
status="unavailable",
sovereignty_score=10,
details={"url": settings.vllm_url, "error": "Cannot connect to vLLM server"},
)
async def _check_vllm() -> DependencyStatus:
"""Check vLLM backend status without blocking the event loop.
Results are cached for 30 seconds. vLLM is an optional backend;
unavailability triggers graceful fallback to Ollama.
"""
global _vllm_cache, _vllm_cache_ts # noqa: PLW0603
now = time.monotonic()
if _vllm_cache is not None and (now - _vllm_cache_ts) < _VLLM_CACHE_TTL:
return _vllm_cache
try:
result = await asyncio.to_thread(_check_vllm_sync)
except Exception as exc:
logger.debug("vLLM async check failed: %s", exc)
result = DependencyStatus(
name="vLLM",
status="unavailable",
sovereignty_score=10,
details={"url": settings.vllm_url, "error": "Cannot connect to vLLM server"},
)
_vllm_cache = result
_vllm_cache_ts = now
return result
def _check_lightning() -> DependencyStatus:
"""Check Lightning payment backend status."""
return DependencyStatus(
@@ -262,22 +195,13 @@ async def health_check():
# Legacy format for test compatibility
ollama_ok = await check_ollama()
# Check vLLM only when it is the configured backend (avoid probing unused services)
vllm_status: str | None = None
if settings.timmy_model_backend == "vllm":
vllm_dep = await _check_vllm()
vllm_status = "up" if vllm_dep.status == "healthy" else "down"
inference_ok = vllm_status == "up" if vllm_status is not None else ollama_ok
agent_status = "idle" if inference_ok else "offline"
services: dict = {"ollama": "up" if ollama_ok else "down"}
if vllm_status is not None:
services["vllm"] = vllm_status
agent_status = "idle" if ollama_ok else "offline"
return {
"status": "ok" if inference_ok else "degraded",
"services": services,
"status": "ok" if ollama_ok else "degraded",
"services": {
"ollama": "up" if ollama_ok else "down",
},
"agents": {
"agent": {"status": agent_status},
},
@@ -286,7 +210,7 @@ async def health_check():
"version": "2.0.0",
"uptime_seconds": uptime,
"llm_backend": settings.timmy_model_backend,
"llm_model": settings.vllm_model if settings.timmy_model_backend == "vllm" else settings.ollama_model,
"llm_model": settings.ollama_model,
}
@@ -328,9 +252,6 @@ async def sovereignty_check():
_check_lightning(),
_check_sqlite(),
]
# Include vLLM in the audit when it is the active backend
if settings.timmy_model_backend == "vllm":
dependencies.append(await _check_vllm())
overall = _calculate_overall_score(dependencies)
recommendations = _generate_recommendations(dependencies)

View File

@@ -186,24 +186,6 @@
<p class="chat-history-placeholder">Loading sovereignty metrics...</p>
{% endcall %}
<!-- Agent Scorecards -->
<div class="card mc-card-spaced" id="mc-scorecards-card">
<div class="card-header">
<h2 class="card-title">Agent Scorecards</h2>
<div class="d-flex align-items-center gap-2">
<select id="mc-scorecard-period" class="form-select form-select-sm" style="width: auto;"
onchange="loadMcScorecards()">
<option value="daily" selected>Daily</option>
<option value="weekly">Weekly</option>
</select>
<a href="/scorecards" class="btn btn-sm btn-outline-secondary">Full View</a>
</div>
</div>
<div id="mc-scorecards-content" class="p-2">
<p class="chat-history-placeholder">Loading scorecards...</p>
</div>
</div>
<!-- Chat History -->
<div class="card mc-card-spaced">
<div class="card-header">
@@ -520,20 +502,6 @@ async function loadSparkStatus() {
}
}
// Load agent scorecards
async function loadMcScorecards() {
var period = document.getElementById('mc-scorecard-period').value;
var container = document.getElementById('mc-scorecards-content');
container.innerHTML = '<p class="chat-history-placeholder">Loading scorecards...</p>';
try {
var response = await fetch('/scorecards/all/panels?period=' + period);
var html = await response.text();
container.innerHTML = html;
} catch (error) {
container.innerHTML = '<p class="chat-history-placeholder">Scorecards unavailable</p>';
}
}
// Initial load
loadSparkStatus();
loadSovereignty();
@@ -542,7 +510,6 @@ loadSwarmStats();
loadLightningStats();
loadGrokStats();
loadChatHistory();
loadMcScorecards();
// Periodic updates
setInterval(loadSovereignty, 30000);
@@ -551,6 +518,5 @@ setInterval(loadSwarmStats, 5000);
setInterval(updateHeartbeat, 5000);
setInterval(loadGrokStats, 10000);
setInterval(loadSparkStatus, 15000);
setInterval(loadMcScorecards, 300000);
</script>
{% endblock %}

View File

@@ -331,22 +331,6 @@ class CascadeRouter:
logger.debug("vllm-mlx provider check error: %s", exc)
return False
elif provider.type == "vllm":
# Check if standard vLLM server is running (OpenAI-compatible API)
if requests is None:
return True
try:
base_url = provider.base_url or provider.url or settings.vllm_url
# Strip /v1 suffix — health endpoint is at the server root
server_root = base_url.rstrip("/")
if server_root.endswith("/v1"):
server_root = server_root[:-3]
response = requests.get(f"{server_root}/health", timeout=5)
return response.status_code == 200
except Exception as exc:
logger.debug("vllm provider check error: %s", exc)
return False
elif provider.type in ("openai", "anthropic", "grok"):
# Check if API key is set
return provider.api_key is not None and provider.api_key != ""
@@ -809,14 +793,6 @@ class CascadeRouter:
temperature=temperature,
max_tokens=max_tokens,
)
elif provider.type == "vllm":
result = await self._call_vllm(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
else:
raise ValueError(f"Unknown provider type: {provider.type}")
@@ -1055,49 +1031,6 @@ class CascadeRouter:
"model": response.model,
}
async def _call_vllm(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call a standard vLLM server via its OpenAI-compatible API.
vLLM exposes the same /v1/chat/completions endpoint as OpenAI.
No API key is required for local deployments.
Default URL comes from settings.vllm_url (VLLM_URL env var).
"""
import openai
base_url = provider.base_url or provider.url or settings.vllm_url
# Ensure the base_url ends with /v1 as expected by the OpenAI client
if not base_url.rstrip("/").endswith("/v1"):
base_url = base_url.rstrip("/") + "/v1"
client = openai.AsyncOpenAI(
api_key=provider.api_key or "no-key-required",
base_url=base_url,
timeout=self.config.timeout_seconds,
)
kwargs: dict = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
response = await client.chat.completions.create(**kwargs)
return {
"content": response.choices[0].message.content,
"model": response.model,
}
def _record_success(self, provider: Provider, latency_ms: float) -> None:
"""Record a successful request."""
provider.metrics.total_requests += 1

View File

@@ -1,528 +0,0 @@
"""Research Orchestrator — autonomous, sovereign research pipeline.
Chains all six steps of the research workflow with local-first execution:
Step 0 Cache — check semantic memory (SQLite, instant, zero API cost)
Step 1 Scope — load a research template from skills/research/
Step 2 Query — slot-fill template + formulate 5-15 search queries via Ollama
Step 3 Search — execute queries via web_search (SerpAPI or fallback)
Step 4 Fetch — download + extract full pages via web_fetch (trafilatura)
Step 5 Synth — compress findings into a structured report via cascade
Step 6 Deliver — store to semantic memory; optionally save to docs/research/
Cascade tiers for synthesis (spec §4):
Tier 4 SQLite semantic cache — instant, free, covers ~80% after warm-up
Tier 3 Ollama (qwen3:14b) — local, free, good quality
Tier 2 Claude API (haiku) — cloud fallback, cheap, set ANTHROPIC_API_KEY
Tier 1 (future) Groq — free-tier rate-limited, tracked in #980
All optional services degrade gracefully per project conventions.
Refs #972 (governing spec), #975 (ResearchOrchestrator sub-issue).
"""
from __future__ import annotations
import asyncio
import logging
import re
import textwrap
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Optional memory imports — available at module level so tests can patch them.
try:
from timmy.memory_system import SemanticMemory, store_memory
except Exception: # pragma: no cover
SemanticMemory = None # type: ignore[assignment,misc]
store_memory = None # type: ignore[assignment]
# Root of the project — two levels up from src/timmy/
_PROJECT_ROOT = Path(__file__).parent.parent.parent
_SKILLS_ROOT = _PROJECT_ROOT / "skills" / "research"
_DOCS_ROOT = _PROJECT_ROOT / "docs" / "research"
# Similarity threshold for cache hit (01 cosine similarity)
_CACHE_HIT_THRESHOLD = 0.82
# How many search result URLs to fetch as full pages
_FETCH_TOP_N = 5
# Maximum tokens to request from the synthesis LLM
_SYNTHESIS_MAX_TOKENS = 4096
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ResearchResult:
"""Full output of a research pipeline run."""
topic: str
query_count: int
sources_fetched: int
report: str
cached: bool = False
cache_similarity: float = 0.0
synthesis_backend: str = "unknown"
errors: list[str] = field(default_factory=list)
def is_empty(self) -> bool:
return not self.report.strip()
# ---------------------------------------------------------------------------
# Template loading
# ---------------------------------------------------------------------------
def list_templates() -> list[str]:
"""Return names of available research templates (without .md extension)."""
if not _SKILLS_ROOT.exists():
return []
return [p.stem for p in sorted(_SKILLS_ROOT.glob("*.md"))]
def load_template(template_name: str, slots: dict[str, str] | None = None) -> str:
"""Load a research template and fill {slot} placeholders.
Args:
template_name: Stem of the .md file under skills/research/ (e.g. "tool_evaluation").
slots: Mapping of {placeholder} → replacement value.
Returns:
Template text with slots filled. Unfilled slots are left as-is.
"""
path = _SKILLS_ROOT / f"{template_name}.md"
if not path.exists():
available = ", ".join(list_templates()) or "(none)"
raise FileNotFoundError(
f"Research template {template_name!r} not found. "
f"Available: {available}"
)
text = path.read_text(encoding="utf-8")
# Strip YAML frontmatter (--- ... ---), including empty frontmatter (--- \n---)
text = re.sub(r"^---\n.*?---\n", "", text, flags=re.DOTALL)
if slots:
for key, value in slots.items():
text = text.replace(f"{{{key}}}", value)
return text.strip()
# ---------------------------------------------------------------------------
# Query formulation (Step 2)
# ---------------------------------------------------------------------------
async def _formulate_queries(topic: str, template_context: str, n: int = 8) -> list[str]:
"""Use the local LLM to generate targeted search queries for a topic.
Falls back to a simple heuristic if Ollama is unavailable.
"""
prompt = textwrap.dedent(f"""\
You are a research assistant. Generate exactly {n} targeted, specific web search
queries to thoroughly research the following topic.
TOPIC: {topic}
RESEARCH CONTEXT:
{template_context[:1000]}
Rules:
- One query per line, no numbering, no bullet points.
- Vary the angle (definition, comparison, implementation, alternatives, pitfalls).
- Prefer exact technical terms, tool names, and version numbers where relevant.
- Output ONLY the queries, nothing else.
""")
queries = await _ollama_complete(prompt, max_tokens=512)
if not queries:
# Minimal fallback
return [
f"{topic} overview",
f"{topic} tutorial",
f"{topic} best practices",
f"{topic} alternatives",
f"{topic} 2025",
]
lines = [ln.strip() for ln in queries.splitlines() if ln.strip()]
return lines[:n] if len(lines) >= n else lines
# ---------------------------------------------------------------------------
# Search (Step 3)
# ---------------------------------------------------------------------------
async def _execute_search(queries: list[str]) -> list[dict[str, str]]:
"""Run each query through the available web search backend.
Returns a flat list of {title, url, snippet} dicts.
Degrades gracefully if SerpAPI key is absent.
"""
results: list[dict[str, str]] = []
seen_urls: set[str] = set()
for query in queries:
try:
raw = await asyncio.to_thread(_run_search_sync, query)
for item in raw:
url = item.get("url", "")
if url and url not in seen_urls:
seen_urls.add(url)
results.append(item)
except Exception as exc:
logger.warning("Search failed for query %r: %s", query, exc)
return results
def _run_search_sync(query: str) -> list[dict[str, str]]:
"""Synchronous search — wraps SerpAPI or returns empty on missing key."""
import os
if not os.environ.get("SERPAPI_API_KEY"):
logger.debug("SERPAPI_API_KEY not set — skipping web search for %r", query)
return []
try:
from serpapi import GoogleSearch
params = {"q": query, "api_key": os.environ["SERPAPI_API_KEY"], "num": 5}
search = GoogleSearch(params)
data = search.get_dict()
items = []
for r in data.get("organic_results", []):
items.append(
{
"title": r.get("title", ""),
"url": r.get("link", ""),
"snippet": r.get("snippet", ""),
}
)
return items
except Exception as exc:
logger.warning("SerpAPI search error: %s", exc)
return []
# ---------------------------------------------------------------------------
# Fetch (Step 4)
# ---------------------------------------------------------------------------
async def _fetch_pages(results: list[dict[str, str]], top_n: int = _FETCH_TOP_N) -> list[str]:
"""Download and extract full text for the top search results.
Uses web_fetch (trafilatura) from timmy.tools.system_tools.
"""
try:
from timmy.tools.system_tools import web_fetch
except ImportError:
logger.warning("web_fetch not available — skipping page fetch")
return []
pages: list[str] = []
for item in results[:top_n]:
url = item.get("url", "")
if not url:
continue
try:
text = await asyncio.to_thread(web_fetch, url, 6000)
if text and not text.startswith("Error:"):
pages.append(f"## {item.get('title', url)}\nSource: {url}\n\n{text}")
except Exception as exc:
logger.warning("Failed to fetch %s: %s", url, exc)
return pages
# ---------------------------------------------------------------------------
# Synthesis (Step 5) — cascade: Ollama → Claude fallback
# ---------------------------------------------------------------------------
async def _synthesize(topic: str, pages: list[str], snippets: list[str]) -> tuple[str, str]:
"""Compress fetched pages + snippets into a structured research report.
Returns (report_markdown, backend_used).
"""
# Build synthesis prompt
source_content = "\n\n---\n\n".join(pages[:5])
if not source_content and snippets:
source_content = "\n".join(f"- {s}" for s in snippets[:20])
if not source_content:
return (
f"# Research: {topic}\n\n*No source material was retrieved. "
"Check SERPAPI_API_KEY and network connectivity.*",
"none",
)
prompt = textwrap.dedent(f"""\
You are a senior technical researcher. Synthesize the source material below
into a structured research report on the topic: **{topic}**
FORMAT YOUR REPORT AS:
# {topic}
## Executive Summary
(2-3 sentences: what you found, top recommendation)
## Key Findings
(Bullet list of the most important facts, tools, or patterns)
## Comparison / Options
(Table or list comparing alternatives where applicable)
## Recommended Approach
(Concrete recommendation with rationale)
## Gaps & Next Steps
(What wasn't answered, what to investigate next)
---
SOURCE MATERIAL:
{source_content[:12000]}
""")
# Tier 3 — try Ollama first
report = await _ollama_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
if report:
return report, "ollama"
# Tier 2 — Claude fallback
report = await _claude_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
if report:
return report, "claude"
# Last resort — structured snippet summary
summary = f"# {topic}\n\n## Snippets\n\n" + "\n\n".join(
f"- {s}" for s in snippets[:15]
)
return summary, "fallback"
# ---------------------------------------------------------------------------
# LLM helpers
# ---------------------------------------------------------------------------
async def _ollama_complete(prompt: str, max_tokens: int = 1024) -> str:
"""Send a prompt to Ollama and return the response text.
Returns empty string on failure (graceful degradation).
"""
try:
import httpx
from config import settings
url = f"{settings.normalized_ollama_url}/api/generate"
payload: dict[str, Any] = {
"model": settings.ollama_model,
"prompt": prompt,
"stream": False,
"options": {
"num_predict": max_tokens,
"temperature": 0.3,
},
}
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
data = resp.json()
return data.get("response", "").strip()
except Exception as exc:
logger.warning("Ollama completion failed: %s", exc)
return ""
async def _claude_complete(prompt: str, max_tokens: int = 1024) -> str:
"""Send a prompt to Claude API as a last-resort fallback.
Only active when ANTHROPIC_API_KEY is configured.
Returns empty string on failure or missing key.
"""
try:
from config import settings
if not settings.anthropic_api_key:
return ""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend()
result = await asyncio.to_thread(backend.run, prompt)
return result.content.strip()
except Exception as exc:
logger.warning("Claude fallback failed: %s", exc)
return ""
# ---------------------------------------------------------------------------
# Memory cache (Step 0 + Step 6)
# ---------------------------------------------------------------------------
def _check_cache(topic: str) -> tuple[str | None, float]:
"""Search semantic memory for a prior result on this topic.
Returns (cached_report, similarity) or (None, 0.0).
"""
try:
if SemanticMemory is None:
return None, 0.0
mem = SemanticMemory()
hits = mem.search(topic, top_k=1)
if hits:
content, score = hits[0]
if score >= _CACHE_HIT_THRESHOLD:
return content, score
except Exception as exc:
logger.debug("Cache check failed: %s", exc)
return None, 0.0
def _store_result(topic: str, report: str) -> None:
"""Index the research report into semantic memory for future retrieval."""
try:
if store_memory is None:
logger.debug("store_memory not available — skipping memory index")
return
store_memory(
content=report,
source="research_pipeline",
context_type="research",
metadata={"topic": topic},
)
logger.info("Research result indexed for topic: %r", topic)
except Exception as exc:
logger.warning("Failed to store research result: %s", exc)
def _save_to_disk(topic: str, report: str) -> Path | None:
"""Persist the report as a markdown file under docs/research/.
Filename is derived from the topic (slugified). Returns the path or None.
"""
try:
slug = re.sub(r"[^a-z0-9]+", "-", topic.lower()).strip("-")[:60]
_DOCS_ROOT.mkdir(parents=True, exist_ok=True)
path = _DOCS_ROOT / f"{slug}.md"
path.write_text(report, encoding="utf-8")
logger.info("Research report saved to %s", path)
return path
except Exception as exc:
logger.warning("Failed to save research report to disk: %s", exc)
return None
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
async def run_research(
topic: str,
template: str | None = None,
slots: dict[str, str] | None = None,
save_to_disk: bool = False,
skip_cache: bool = False,
) -> ResearchResult:
"""Run the full 6-step autonomous research pipeline.
Args:
topic: The research question or subject.
template: Name of a template from skills/research/ (e.g. "tool_evaluation").
If None, runs without a template scaffold.
slots: Placeholder values for the template (e.g. {"domain": "PDF parsing"}).
save_to_disk: If True, write the report to docs/research/<slug>.md.
skip_cache: If True, bypass the semantic memory cache.
Returns:
ResearchResult with report and metadata.
"""
errors: list[str] = []
# ------------------------------------------------------------------
# Step 0 — check cache
# ------------------------------------------------------------------
if not skip_cache:
cached, score = _check_cache(topic)
if cached:
logger.info("Cache hit (%.2f) for topic: %r", score, topic)
return ResearchResult(
topic=topic,
query_count=0,
sources_fetched=0,
report=cached,
cached=True,
cache_similarity=score,
synthesis_backend="cache",
)
# ------------------------------------------------------------------
# Step 1 — load template (optional)
# ------------------------------------------------------------------
template_context = ""
if template:
try:
template_context = load_template(template, slots)
except FileNotFoundError as exc:
errors.append(str(exc))
logger.warning("Template load failed: %s", exc)
# ------------------------------------------------------------------
# Step 2 — formulate queries
# ------------------------------------------------------------------
queries = await _formulate_queries(topic, template_context)
logger.info("Formulated %d queries for topic: %r", len(queries), topic)
# ------------------------------------------------------------------
# Step 3 — execute search
# ------------------------------------------------------------------
search_results = await _execute_search(queries)
logger.info("Search returned %d results", len(search_results))
snippets = [r.get("snippet", "") for r in search_results if r.get("snippet")]
# ------------------------------------------------------------------
# Step 4 — fetch full pages
# ------------------------------------------------------------------
pages = await _fetch_pages(search_results)
logger.info("Fetched %d pages", len(pages))
# ------------------------------------------------------------------
# Step 5 — synthesize
# ------------------------------------------------------------------
report, backend = await _synthesize(topic, pages, snippets)
# ------------------------------------------------------------------
# Step 6 — deliver
# ------------------------------------------------------------------
_store_result(topic, report)
if save_to_disk:
_save_to_disk(topic, report)
return ResearchResult(
topic=topic,
query_count=len(queries),
sources_fetched=len(pages),
report=report,
cached=False,
synthesis_backend=backend,
errors=errors,
)

View File

@@ -7,6 +7,8 @@ from unittest.mock import patch
import pytest
import infrastructure.events.bus as bus_module
pytestmark = pytest.mark.unit
from infrastructure.events.bus import (
Event,
EventBus,
@@ -352,6 +354,14 @@ class TestEventBusPersistence:
events = bus.replay()
assert events == []
def test_init_persistence_db_noop_when_path_is_none(self):
"""_init_persistence_db() is a no-op when _persistence_db_path is None."""
bus = EventBus()
# _persistence_db_path is None by default; calling _init_persistence_db
# should silently return without touching the filesystem.
bus._init_persistence_db() # must not raise
assert bus._persistence_db_path is None
async def test_wal_mode_on_persistence_db(self, persistent_bus):
"""Persistence database should use WAL mode."""
conn = sqlite3.connect(str(persistent_bus._persistence_db_path))

View File

@@ -1,411 +0,0 @@
"""Unit tests for the vLLM inference backend (issue #1281).
Covers:
- vllm provider type in CascadeRouter availability check
- _call_vllm method (mocked OpenAI client)
- providers.yaml loads vllm-local entry
- vLLM health check helpers in dashboard routes
- config.py has vllm backend option
"""
from __future__ import annotations
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import yaml
from infrastructure.router.cascade import CascadeRouter, Provider, ProviderStatus
# ── Provider availability checks ────────────────────────────────────────────
@pytest.mark.unit
class TestVllmProviderAvailability:
"""Test _check_provider_available for vllm provider type."""
def _make_vllm_provider(self, url: str = "http://localhost:8001/v1") -> Provider:
return Provider(
name="vllm-local",
type="vllm",
enabled=True,
priority=3,
base_url=url,
models=[{"name": "Qwen/Qwen2.5-14B-Instruct", "default": True}],
)
def test_available_when_health_200(self, tmp_path):
"""Provider is available when /health returns 200."""
provider = self._make_vllm_provider()
router = CascadeRouter(config_path=tmp_path / "none.yaml")
mock_response = MagicMock()
mock_response.status_code = 200
with patch("infrastructure.router.cascade.requests") as mock_requests:
mock_requests.get.return_value = mock_response
available = router._check_provider_available(provider)
assert available is True
# Verify the health endpoint was called (root, not /v1)
call_args = mock_requests.get.call_args[0][0]
assert call_args.endswith("/health")
assert "/v1" not in call_args
def test_unavailable_when_health_non_200(self, tmp_path):
"""Provider is unavailable when /health returns non-200."""
provider = self._make_vllm_provider()
router = CascadeRouter(config_path=tmp_path / "none.yaml")
mock_response = MagicMock()
mock_response.status_code = 503
with patch("infrastructure.router.cascade.requests") as mock_requests:
mock_requests.get.return_value = mock_response
available = router._check_provider_available(provider)
assert available is False
def test_unavailable_on_connection_error(self, tmp_path):
"""Provider is unavailable when connection fails."""
provider = self._make_vllm_provider()
router = CascadeRouter(config_path=tmp_path / "none.yaml")
with patch("infrastructure.router.cascade.requests") as mock_requests:
mock_requests.get.side_effect = ConnectionError("refused")
available = router._check_provider_available(provider)
assert available is False
def test_strips_v1_suffix_for_health_check(self, tmp_path):
"""Health check URL strips /v1 before appending /health."""
provider = self._make_vllm_provider(url="http://localhost:8001/v1")
router = CascadeRouter(config_path=tmp_path / "none.yaml")
mock_response = MagicMock()
mock_response.status_code = 200
with patch("infrastructure.router.cascade.requests") as mock_requests:
mock_requests.get.return_value = mock_response
router._check_provider_available(provider)
called_url = mock_requests.get.call_args[0][0]
assert called_url == "http://localhost:8001/health"
def test_assumes_available_when_requests_none(self, tmp_path):
"""Gracefully assumes available when requests library is absent."""
provider = self._make_vllm_provider()
router = CascadeRouter(config_path=tmp_path / "none.yaml")
with patch("infrastructure.router.cascade.requests", None):
available = router._check_provider_available(provider)
assert available is True
# ── _call_vllm method ────────────────────────────────────────────────────────
@pytest.mark.unit
class TestCallVllm:
"""Test CascadeRouter._call_vllm."""
def _make_router(self, tmp_path: Path) -> CascadeRouter:
return CascadeRouter(config_path=tmp_path / "none.yaml")
def _make_provider(self, base_url: str = "http://localhost:8001") -> Provider:
return Provider(
name="vllm-local",
type="vllm",
enabled=True,
priority=3,
base_url=base_url,
models=[{"name": "Qwen/Qwen2.5-14B-Instruct", "default": True}],
)
@pytest.mark.asyncio
async def test_returns_content_and_model(self, tmp_path):
"""_call_vllm returns content and model name from API response."""
router = self._make_router(tmp_path)
provider = self._make_provider()
mock_choice = MagicMock()
mock_choice.message.content = "Hello from vLLM!"
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.model = "Qwen/Qwen2.5-14B-Instruct"
mock_client = AsyncMock()
mock_client.chat.completions.create = AsyncMock(return_value=mock_response)
with patch("openai.AsyncOpenAI", return_value=mock_client):
result = await router._call_vllm(
provider=provider,
messages=[{"role": "user", "content": "hi"}],
model="Qwen/Qwen2.5-14B-Instruct",
temperature=0.7,
max_tokens=None,
)
assert result["content"] == "Hello from vLLM!"
assert result["model"] == "Qwen/Qwen2.5-14B-Instruct"
@pytest.mark.asyncio
async def test_appends_v1_to_base_url(self, tmp_path):
"""_call_vllm always points the OpenAI client at base_url/v1."""
router = self._make_router(tmp_path)
provider = self._make_provider(base_url="http://localhost:8001")
mock_choice = MagicMock()
mock_choice.message.content = "ok"
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.model = "model"
mock_client = AsyncMock()
mock_client.chat.completions.create = AsyncMock(return_value=mock_response)
with patch("openai.AsyncOpenAI", return_value=mock_client) as mock_openai:
await router._call_vllm(
provider=provider,
messages=[{"role": "user", "content": "hi"}],
model="model",
temperature=0.0,
max_tokens=None,
)
_, kwargs = mock_openai.call_args
assert kwargs["base_url"].endswith("/v1")
@pytest.mark.asyncio
async def test_does_not_double_v1(self, tmp_path):
"""_call_vllm does not append /v1 if base_url already ends with it."""
router = self._make_router(tmp_path)
provider = self._make_provider(base_url="http://localhost:8001/v1")
mock_choice = MagicMock()
mock_choice.message.content = "ok"
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.model = "model"
mock_client = AsyncMock()
mock_client.chat.completions.create = AsyncMock(return_value=mock_response)
with patch("openai.AsyncOpenAI", return_value=mock_client) as mock_openai:
await router._call_vllm(
provider=provider,
messages=[{"role": "user", "content": "hi"}],
model="model",
temperature=0.0,
max_tokens=None,
)
_, kwargs = mock_openai.call_args
assert kwargs["base_url"] == "http://localhost:8001/v1"
@pytest.mark.asyncio
async def test_max_tokens_passed_when_set(self, tmp_path):
"""max_tokens is forwarded to the API when provided."""
router = self._make_router(tmp_path)
provider = self._make_provider()
mock_choice = MagicMock()
mock_choice.message.content = "ok"
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.model = "model"
mock_client = AsyncMock()
mock_client.chat.completions.create = AsyncMock(return_value=mock_response)
with patch("openai.AsyncOpenAI", return_value=mock_client):
await router._call_vllm(
provider=provider,
messages=[{"role": "user", "content": "hi"}],
model="model",
temperature=0.0,
max_tokens=256,
)
call_kwargs = mock_client.chat.completions.create.call_args[1]
assert call_kwargs.get("max_tokens") == 256
@pytest.mark.asyncio
async def test_max_tokens_omitted_when_none(self, tmp_path):
"""max_tokens key is absent when not provided."""
router = self._make_router(tmp_path)
provider = self._make_provider()
mock_choice = MagicMock()
mock_choice.message.content = "ok"
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.model = "model"
mock_client = AsyncMock()
mock_client.chat.completions.create = AsyncMock(return_value=mock_response)
with patch("openai.AsyncOpenAI", return_value=mock_client):
await router._call_vllm(
provider=provider,
messages=[{"role": "user", "content": "hi"}],
model="model",
temperature=0.0,
max_tokens=None,
)
call_kwargs = mock_client.chat.completions.create.call_args[1]
assert "max_tokens" not in call_kwargs
# ── providers.yaml loads vllm-local ─────────────────────────────────────────
@pytest.mark.unit
class TestProvidersYamlVllm:
"""Verify providers.yaml contains a valid vllm-local entry."""
def test_vllm_local_entry_exists(self):
"""providers.yaml has a vllm-local provider of type vllm."""
config_path = Path(__file__).parents[2] / "config" / "providers.yaml"
assert config_path.exists(), "config/providers.yaml not found"
with config_path.open() as f:
config = yaml.safe_load(f)
providers = config.get("providers", [])
vllm_providers = [p for p in providers if p.get("type") == "vllm"]
assert vllm_providers, "No provider with type=vllm found in providers.yaml"
vllm_local = next((p for p in vllm_providers if p["name"] == "vllm-local"), None)
assert vllm_local is not None, "vllm-local provider not found in providers.yaml"
def test_vllm_local_disabled_by_default(self):
"""vllm-local is disabled by default so the router stays on Ollama."""
config_path = Path(__file__).parents[2] / "config" / "providers.yaml"
with config_path.open() as f:
config = yaml.safe_load(f)
providers = config.get("providers", [])
vllm_local = next((p for p in providers if p.get("name") == "vllm-local"), None)
assert vllm_local is not None
assert vllm_local.get("enabled") is False, "vllm-local should be disabled by default"
def test_vllm_local_has_default_model(self):
"""vllm-local has at least one model with a context window."""
config_path = Path(__file__).parents[2] / "config" / "providers.yaml"
with config_path.open() as f:
config = yaml.safe_load(f)
providers = config.get("providers", [])
vllm_local = next((p for p in providers if p.get("name") == "vllm-local"), None)
assert vllm_local is not None
models = vllm_local.get("models", [])
assert models, "vllm-local must declare at least one model"
default_models = [m for m in models if m.get("default")]
assert default_models, "vllm-local must have a model marked default: true"
# ── config.py backend option ─────────────────────────────────────────────────
@pytest.mark.unit
class TestConfigVllmBackend:
"""Verify config.py exposes the vllm backend option."""
def test_vllm_is_valid_backend(self):
"""timmy_model_backend accepts 'vllm' without validation errors."""
from config import Settings
s = Settings(timmy_model_backend="vllm")
assert s.timmy_model_backend == "vllm"
def test_vllm_url_default(self):
"""vllm_url has a sensible default."""
from config import Settings
s = Settings()
assert s.vllm_url.startswith("http://")
def test_vllm_model_default(self):
"""vllm_model has a sensible default."""
from config import Settings
s = Settings()
assert s.vllm_model # non-empty string
# ── Health check helpers ─────────────────────────────────────────────────────
@pytest.mark.unit
class TestVllmHealthCheck:
"""Test _check_vllm_sync and _check_vllm."""
def test_sync_returns_healthy_on_200(self):
"""_check_vllm_sync returns 'healthy' when server responds 200."""
import urllib.request
from dashboard.routes.health import _check_vllm_sync
mock_response = MagicMock()
mock_response.status = 200
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
with patch.object(urllib.request, "urlopen", return_value=mock_response):
result = _check_vllm_sync()
assert result.status == "healthy"
assert result.name == "vLLM"
def test_sync_returns_unavailable_on_connection_error(self):
"""_check_vllm_sync returns 'unavailable' when server is unreachable."""
import urllib.error
import urllib.request
from dashboard.routes.health import _check_vllm_sync
with patch.object(urllib.request, "urlopen", side_effect=urllib.error.URLError("refused")):
result = _check_vllm_sync()
assert result.status == "unavailable"
assert result.name == "vLLM"
@pytest.mark.asyncio
async def test_async_caches_result(self):
"""_check_vllm caches the result for _VLLM_CACHE_TTL seconds."""
import dashboard.routes.health as health_module
from dashboard.routes.health import _check_vllm
# Reset cache
health_module._vllm_cache = None
health_module._vllm_cache_ts = 0.0
mock_dep = MagicMock()
mock_dep.status = "healthy"
with patch("dashboard.routes.health._check_vllm_sync", return_value=mock_dep):
result1 = await _check_vllm()
result2 = await _check_vllm() # should hit cache
assert result1 is result2 # same object returned from cache
@pytest.mark.asyncio
async def test_async_refreshes_after_ttl(self):
"""_check_vllm refreshes the cache after the TTL expires."""
import dashboard.routes.health as health_module
from dashboard.routes.health import _VLLM_CACHE_TTL, _check_vllm
# Expire the cache
health_module._vllm_cache = None
health_module._vllm_cache_ts = time.monotonic() - _VLLM_CACHE_TTL - 1
mock_dep = MagicMock()
mock_dep.status = "unavailable"
with patch("dashboard.routes.health._check_vllm_sync", return_value=mock_dep) as mock_fn:
await _check_vllm()
mock_fn.assert_called_once()

View File

@@ -1,403 +0,0 @@
"""Unit tests for src/timmy/research.py — ResearchOrchestrator pipeline.
Refs #972 (governing spec), #975 (ResearchOrchestrator).
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
pytestmark = pytest.mark.unit
# ---------------------------------------------------------------------------
# list_templates
# ---------------------------------------------------------------------------
class TestListTemplates:
def test_returns_list(self, tmp_path, monkeypatch):
(tmp_path / "tool_evaluation.md").write_text("---\n---\n# T")
(tmp_path / "game_analysis.md").write_text("---\n---\n# G")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import list_templates
result = list_templates()
assert isinstance(result, list)
assert "tool_evaluation" in result
assert "game_analysis" in result
def test_returns_empty_when_dir_missing(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path / "nonexistent")
from timmy.research import list_templates
assert list_templates() == []
# ---------------------------------------------------------------------------
# load_template
# ---------------------------------------------------------------------------
class TestLoadTemplate:
def _write_template(self, path: Path, name: str, body: str) -> None:
(path / f"{name}.md").write_text(body, encoding="utf-8")
def test_loads_and_strips_frontmatter(self, tmp_path, monkeypatch):
self._write_template(
tmp_path,
"tool_evaluation",
"---\nname: Tool Evaluation\ntype: research\n---\n# Tool Eval: {domain}",
)
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
result = load_template("tool_evaluation", {"domain": "PDF parsing"})
assert "# Tool Eval: PDF parsing" in result
assert "name: Tool Evaluation" not in result
def test_fills_slots(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "arch", "Connect {system_a} to {system_b}")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
result = load_template("arch", {"system_a": "Kafka", "system_b": "Postgres"})
assert "Kafka" in result
assert "Postgres" in result
def test_unfilled_slots_preserved(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "t", "Hello {name} and {other}")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
result = load_template("t", {"name": "World"})
assert "{other}" in result
def test_raises_file_not_found_for_missing_template(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
with pytest.raises(FileNotFoundError, match="nonexistent"):
load_template("nonexistent")
def test_no_slots_returns_raw_body(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "plain", "---\n---\nJust text here")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
result = load_template("plain")
assert result == "Just text here"
# ---------------------------------------------------------------------------
# _check_cache
# ---------------------------------------------------------------------------
class TestCheckCache:
def test_returns_none_when_no_hits(self):
mock_mem = MagicMock()
mock_mem.search.return_value = []
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("some topic")
assert content is None
assert score == 0.0
def test_returns_content_above_threshold(self):
mock_mem = MagicMock()
mock_mem.search.return_value = [("cached report text", 0.91)]
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("same topic")
assert content == "cached report text"
assert score == pytest.approx(0.91)
def test_returns_none_below_threshold(self):
mock_mem = MagicMock()
mock_mem.search.return_value = [("old report", 0.60)]
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("slightly different topic")
assert content is None
assert score == 0.0
def test_degrades_gracefully_on_import_error(self):
with patch("timmy.research.SemanticMemory", None):
from timmy.research import _check_cache
content, score = _check_cache("topic")
assert content is None
assert score == 0.0
# ---------------------------------------------------------------------------
# _store_result
# ---------------------------------------------------------------------------
class TestStoreResult:
def test_calls_store_memory(self):
mock_store = MagicMock()
with patch("timmy.research.store_memory", mock_store):
from timmy.research import _store_result
_store_result("test topic", "# Report\n\nContent here.")
mock_store.assert_called_once()
call_kwargs = mock_store.call_args
assert "test topic" in str(call_kwargs)
def test_degrades_gracefully_on_error(self):
mock_store = MagicMock(side_effect=RuntimeError("db error"))
with patch("timmy.research.store_memory", mock_store):
from timmy.research import _store_result
# Should not raise
_store_result("topic", "report")
# ---------------------------------------------------------------------------
# _save_to_disk
# ---------------------------------------------------------------------------
class TestSaveToDisk:
def test_writes_file(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
from timmy.research import _save_to_disk
path = _save_to_disk("Test Topic: PDF Parsing", "# Test Report")
assert path is not None
assert path.exists()
assert path.read_text() == "# Test Report"
def test_slugifies_topic_name(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
from timmy.research import _save_to_disk
path = _save_to_disk("My Complex Topic! v2.0", "content")
assert path is not None
# Should be slugified: no special chars
assert " " not in path.name
assert "!" not in path.name
def test_returns_none_on_error(self, monkeypatch):
monkeypatch.setattr(
"timmy.research._DOCS_ROOT",
Path("/nonexistent_root/deeply/nested"),
)
with patch("pathlib.Path.mkdir", side_effect=PermissionError("denied")):
from timmy.research import _save_to_disk
result = _save_to_disk("topic", "report")
assert result is None
# ---------------------------------------------------------------------------
# run_research — end-to-end with mocks
# ---------------------------------------------------------------------------
class TestRunResearch:
@pytest.mark.asyncio
async def test_returns_cached_result_when_cache_hit(self):
cached_report = "# Cached Report\n\nPreviously computed."
with (
patch("timmy.research._check_cache", return_value=(cached_report, 0.93)),
):
from timmy.research import run_research
result = await run_research("some topic")
assert result.cached is True
assert result.cache_similarity == pytest.approx(0.93)
assert result.report == cached_report
assert result.synthesis_backend == "cache"
@pytest.mark.asyncio
async def test_skips_cache_when_requested(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=("cached", 0.99)) as mock_cache,
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Fresh report", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("topic", skip_cache=True)
mock_cache.assert_not_called()
assert result.cached is False
assert result.report == "# Fresh report"
@pytest.mark.asyncio
async def test_full_pipeline_no_search_results(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["query 1", "query 2"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Report", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("a new topic")
assert not result.cached
assert result.query_count == 2
assert result.sources_fetched == 0
assert result.report == "# Report"
assert result.synthesis_backend == "ollama"
@pytest.mark.asyncio
async def test_returns_result_with_error_on_bad_template(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Report", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("topic", template="nonexistent_template")
assert len(result.errors) == 1
assert "nonexistent_template" in result.errors[0]
@pytest.mark.asyncio
async def test_saves_to_disk_when_requested(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Saved Report", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("disk topic", save_to_disk=True)
assert result.report == "# Saved Report"
saved_files = list((tmp_path / "research").glob("*.md"))
assert len(saved_files) == 1
assert saved_files[0].read_text() == "# Saved Report"
@pytest.mark.asyncio
async def test_result_is_not_empty_after_synthesis(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["q"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Non-empty", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("topic")
assert not result.is_empty()
# ---------------------------------------------------------------------------
# ResearchResult
# ---------------------------------------------------------------------------
class TestResearchResult:
def test_is_empty_when_no_report(self):
from timmy.research import ResearchResult
r = ResearchResult(topic="t", query_count=0, sources_fetched=0, report="")
assert r.is_empty()
def test_is_not_empty_with_content(self):
from timmy.research import ResearchResult
r = ResearchResult(topic="t", query_count=1, sources_fetched=1, report="# Report")
assert not r.is_empty()
def test_default_cached_false(self):
from timmy.research import ResearchResult
r = ResearchResult(topic="t", query_count=0, sources_fetched=0, report="x")
assert r.cached is False
def test_errors_defaults_to_empty_list(self):
from timmy.research import ResearchResult
r = ResearchResult(topic="t", query_count=0, sources_fetched=0, report="x")
assert r.errors == []

View File

@@ -1,270 +0,0 @@
"""Tests for Daily Run orchestrator — health snapshot integration.
Verifies that the orchestrator runs a pre-flight health snapshot before
any coding work begins, and aborts on red status unless --force is passed.
Refs: #923
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Add timmy_automations to path for imports
_TA_PATH = Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run"
if str(_TA_PATH) not in sys.path:
sys.path.insert(0, str(_TA_PATH))
# Also add utils path
_TA_UTILS = Path(__file__).resolve().parent.parent.parent / "timmy_automations"
if str(_TA_UTILS) not in sys.path:
sys.path.insert(0, str(_TA_UTILS))
import health_snapshot as hs
import orchestrator as orch
def _make_snapshot(overall_status: str) -> hs.HealthSnapshot:
"""Build a minimal HealthSnapshot for testing."""
return hs.HealthSnapshot(
timestamp="2026-01-01T00:00:00+00:00",
overall_status=overall_status,
ci=hs.CISignal(status="pass", message="CI passing"),
issues=hs.IssueSignal(count=0, p0_count=0, p1_count=0),
flakiness=hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
),
tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"),
)
def _make_red_snapshot() -> hs.HealthSnapshot:
return hs.HealthSnapshot(
timestamp="2026-01-01T00:00:00+00:00",
overall_status="red",
ci=hs.CISignal(status="fail", message="CI failed"),
issues=hs.IssueSignal(count=1, p0_count=1, p1_count=0),
flakiness=hs.FlakinessSignal(
status="critical",
recent_failures=8,
recent_cycles=10,
failure_rate=0.8,
message="High flakiness",
),
tokens=hs.TokenEconomySignal(status="unknown", message="No data"),
)
def _default_args(**overrides) -> argparse.Namespace:
"""Build an argparse Namespace with defaults matching the orchestrator flags."""
defaults = {
"review": False,
"json": False,
"max_items": None,
"skip_health_check": False,
"force": False,
}
defaults.update(overrides)
return argparse.Namespace(**defaults)
class TestRunHealthSnapshot:
"""Test run_health_snapshot() — the pre-flight check called by main()."""
def test_green_returns_zero(self, capsys):
"""Green snapshot returns 0 (proceed)."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_snapshot("green")):
rc = orch.run_health_snapshot(args)
assert rc == 0
def test_yellow_returns_zero(self, capsys):
"""Yellow snapshot returns 0 (proceed with caution)."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_snapshot("yellow")):
rc = orch.run_health_snapshot(args)
assert rc == 0
def test_red_returns_one(self, capsys):
"""Red snapshot returns 1 (abort)."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()):
rc = orch.run_health_snapshot(args)
assert rc == 1
def test_red_with_force_returns_zero(self, capsys):
"""Red snapshot with --force returns 0 (proceed anyway)."""
args = _default_args(force=True)
with patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()):
rc = orch.run_health_snapshot(args)
assert rc == 0
def test_snapshot_exception_is_skipped(self, capsys):
"""If health snapshot raises, it degrades gracefully and returns 0."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", side_effect=RuntimeError("boom")):
rc = orch.run_health_snapshot(args)
assert rc == 0
captured = capsys.readouterr()
assert "warning" in captured.err.lower() or "skipping" in captured.err.lower()
def test_snapshot_prints_summary(self, capsys):
"""Health snapshot prints a pre-flight summary block."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_snapshot("green")):
orch.run_health_snapshot(args)
captured = capsys.readouterr()
assert "PRE-FLIGHT HEALTH CHECK" in captured.out
assert "CI" in captured.out
def test_red_prints_abort_message(self, capsys):
"""Red snapshot prints an abort message to stderr."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()):
orch.run_health_snapshot(args)
captured = capsys.readouterr()
assert "RED" in captured.err or "aborting" in captured.err.lower()
def test_p0_issues_shown_in_output(self, capsys):
"""P0 issue count is shown in the pre-flight output."""
args = _default_args()
snapshot = hs.HealthSnapshot(
timestamp="2026-01-01T00:00:00+00:00",
overall_status="red",
ci=hs.CISignal(status="pass", message="CI passing"),
issues=hs.IssueSignal(count=2, p0_count=2, p1_count=0),
flakiness=hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
),
tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"),
)
with patch.object(orch, "_generate_health_snapshot", return_value=snapshot):
orch.run_health_snapshot(args)
captured = capsys.readouterr()
assert "P0" in captured.out
class TestMainHealthCheckIntegration:
"""Test that main() runs health snapshot before any coding work."""
def _patch_gitea_unavailable(self):
return patch.object(orch.GiteaClient, "is_available", return_value=False)
def test_main_runs_health_check_before_gitea(self):
"""Health snapshot is called before Gitea client work."""
call_order = []
def fake_snapshot(*_a, **_kw):
call_order.append("health")
return _make_snapshot("green")
def fake_gitea_available(self):
call_order.append("gitea")
return False
args = _default_args()
with (
patch.object(orch, "_generate_health_snapshot", side_effect=fake_snapshot),
patch.object(orch.GiteaClient, "is_available", fake_gitea_available),
patch("sys.argv", ["orchestrator"]),
):
orch.main()
assert call_order.index("health") < call_order.index("gitea")
def test_main_aborts_on_red_before_gitea(self):
"""main() aborts with non-zero exit code when health is red."""
gitea_called = []
def fake_gitea_available(self):
gitea_called.append(True)
return True
with (
patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()),
patch.object(orch.GiteaClient, "is_available", fake_gitea_available),
patch("sys.argv", ["orchestrator"]),
):
rc = orch.main()
assert rc != 0
assert not gitea_called, "Gitea should NOT be called when health is red"
def test_main_skips_health_check_with_flag(self):
"""--skip-health-check bypasses the pre-flight snapshot."""
health_called = []
def fake_snapshot(*_a, **_kw):
health_called.append(True)
return _make_snapshot("green")
with (
patch.object(orch, "_generate_health_snapshot", side_effect=fake_snapshot),
patch.object(orch.GiteaClient, "is_available", return_value=False),
patch("sys.argv", ["orchestrator", "--skip-health-check"]),
):
orch.main()
assert not health_called, "Health snapshot should be skipped"
def test_main_force_flag_continues_despite_red(self):
"""--force allows Daily Run to continue even when health is red."""
gitea_called = []
def fake_gitea_available(self):
gitea_called.append(True)
return False # Gitea unavailable → exits early but after health check
with (
patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()),
patch.object(orch.GiteaClient, "is_available", fake_gitea_available),
patch("sys.argv", ["orchestrator", "--force"]),
):
orch.main()
# Gitea was reached despite red status because --force was passed
assert gitea_called
def test_main_json_output_on_red_includes_error(self, capsys):
"""JSON output includes error key when health is red."""
with (
patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()),
patch.object(orch.GiteaClient, "is_available", return_value=True),
patch("sys.argv", ["orchestrator", "--json"]),
):
rc = orch.main()
assert rc != 0
captured = capsys.readouterr()
data = json.loads(captured.out)
assert "error" in data

View File

@@ -4,13 +4,10 @@
Connects to local Gitea, fetches candidate issues, and produces a concise agenda
plus a day summary (review mode).
The Daily Run begins with a Quick Health Snapshot (#710) to ensure mandatory
systems are green before burning cycles on work that cannot land.
Run: python3 timmy_automations/daily_run/orchestrator.py [--review]
Env: See timmy_automations/config/daily_run.json for configuration
Refs: #703, #923
Refs: #703
"""
from __future__ import annotations
@@ -33,11 +30,6 @@ sys.path.insert(
)
from utils.token_rules import TokenRules, compute_token_reward
# Health snapshot lives in the same package
from health_snapshot import generate_snapshot as _generate_health_snapshot
from health_snapshot import get_token as _hs_get_token
from health_snapshot import load_config as _hs_load_config
# ── Configuration ─────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
@@ -503,16 +495,6 @@ def parse_args() -> argparse.Namespace:
default=None,
help="Override max agenda items",
)
p.add_argument(
"--skip-health-check",
action="store_true",
help="Skip the pre-flight health snapshot (not recommended)",
)
p.add_argument(
"--force",
action="store_true",
help="Continue even if health snapshot is red (overrides abort-on-red)",
)
return p.parse_args()
@@ -553,76 +535,6 @@ def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
}
def run_health_snapshot(args: argparse.Namespace) -> int:
"""Run pre-flight health snapshot and return 0 (ok) or 1 (abort).
Prints a concise summary of CI, issues, flakiness, and token economy.
Returns 1 if the overall status is red AND --force was not passed.
Returns 0 for green/yellow or when --force is active.
On any import/runtime error the check is skipped with a warning.
"""
try:
hs_config = _hs_load_config()
hs_token = _hs_get_token(hs_config)
snapshot = _generate_health_snapshot(hs_config, hs_token)
except Exception as exc: # noqa: BLE001
print(f"[health] Warning: health snapshot failed ({exc}) — skipping", file=sys.stderr)
return 0
# Print concise pre-flight header
status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get(
snapshot.overall_status, ""
)
print("" * 60)
print(f"PRE-FLIGHT HEALTH CHECK {status_emoji} {snapshot.overall_status.upper()}")
print("" * 60)
ci_emoji = {"pass": "", "fail": "", "unknown": "⚠️", "unavailable": ""}.get(
snapshot.ci.status, ""
)
print(f" {ci_emoji} CI: {snapshot.ci.message}")
if snapshot.issues.p0_count > 0:
issue_emoji = "🔴"
elif snapshot.issues.p1_count > 0:
issue_emoji = "🟡"
else:
issue_emoji = ""
critical_str = f"{snapshot.issues.count} critical"
if snapshot.issues.p0_count:
critical_str += f" (P0: {snapshot.issues.p0_count})"
if snapshot.issues.p1_count:
critical_str += f" (P1: {snapshot.issues.p1_count})"
print(f" {issue_emoji} Issues: {critical_str}")
flak_emoji = {"healthy": "", "degraded": "🟡", "critical": "🔴", "unknown": ""}.get(
snapshot.flakiness.status, ""
)
print(f" {flak_emoji} Flakiness: {snapshot.flakiness.message}")
token_emoji = {"balanced": "", "inflationary": "🟡", "deflationary": "🔵", "unknown": ""}.get(
snapshot.tokens.status, ""
)
print(f" {token_emoji} Tokens: {snapshot.tokens.message}")
print()
if snapshot.overall_status == "red" and not args.force:
print(
"🛑 Health status is RED — aborting Daily Run to avoid burning cycles.",
file=sys.stderr,
)
print(
" Fix the issues above or re-run with --force to override.",
file=sys.stderr,
)
return 1
if snapshot.overall_status == "red":
print("⚠️ Health is RED but --force passed — proceeding anyway.", file=sys.stderr)
return 0
def main() -> int:
args = parse_args()
config = load_config()
@@ -630,15 +542,6 @@ def main() -> int:
if args.max_items:
config["max_agenda_items"] = args.max_items
# ── Step 0: Pre-flight health snapshot ──────────────────────────────────
if not args.skip_health_check:
health_rc = run_health_snapshot(args)
if health_rc != 0:
tokens = compute_daily_run_tokens(success=False)
if args.json:
print(json.dumps({"error": "health_check_failed", "tokens": tokens}))
return health_rc
token = get_token(config)
client = GiteaClient(config, token)