[loop-cycle-7] refactor: split research.py into research/ subpackage (#1405) #1458

Merged
Timmy merged 1 commit from refactor/split-research-py into main 2026-03-24 19:53:05 +00:00
62 changed files with 1049 additions and 885 deletions
Showing only changes of commit c0ade2e164 - Show all commits

View File

@@ -140,7 +140,7 @@ ignore = [
known-first-party = ["config", "dashboard", "infrastructure", "integrations", "spark", "timmy", "timmy_serve"]
[tool.ruff.lint.per-file-ignores]
"tests/**" = ["S"]
"tests/**" = ["S", "E402"]
[tool.coverage.run]
source = ["src"]

View File

@@ -3,6 +3,7 @@
All environment variable access goes through the ``settings`` singleton
exported from this module — never use ``os.environ.get()`` in app code.
"""
import logging as _logging
import os
import sys

View File

@@ -112,9 +112,7 @@ def _ensure_index_sync(client) -> None:
pass # Index already exists
idx = client.index(_INDEX_NAME)
try:
idx.update_searchable_attributes(
["title", "description", "tags", "highlight_ids"]
)
idx.update_searchable_attributes(["title", "description", "tags", "highlight_ids"])
idx.update_filterable_attributes(["tags", "published_at"])
idx.update_sortable_attributes(["published_at", "duration"])
except Exception as exc:

View File

@@ -191,9 +191,7 @@ def _compose_sync(spec: EpisodeSpec) -> EpisodeResult:
loops = int(final.duration / music.duration) + 1
from moviepy import concatenate_audioclips # type: ignore[import]
music = concatenate_audioclips([music] * loops).subclipped(
0, final.duration
)
music = concatenate_audioclips([music] * loops).subclipped(0, final.duration)
else:
music = music.subclipped(0, final.duration)
audio_tracks.append(music)

View File

@@ -56,13 +56,20 @@ def _build_ffmpeg_cmd(
return [
"ffmpeg",
"-y", # overwrite output
"-ss", str(start),
"-i", source,
"-t", str(duration),
"-avoid_negative_ts", "make_zero",
"-c:v", settings.default_video_codec,
"-c:a", "aac",
"-movflags", "+faststart",
"-ss",
str(start),
"-i",
source,
"-t",
str(duration),
"-avoid_negative_ts",
"make_zero",
"-c:v",
settings.default_video_codec,
"-c:a",
"aac",
"-movflags",
"+faststart",
output,
]

View File

@@ -81,8 +81,10 @@ async def _generate_piper(text: str, output_path: str) -> NarrationResult:
model = settings.content_piper_model
cmd = [
"piper",
"--model", model,
"--output_file", output_path,
"--model",
model,
"--output_file",
output_path,
]
try:
proc = await asyncio.create_subprocess_exec(
@@ -184,8 +186,6 @@ def build_episode_script(
if outro_text:
lines.append(outro_text)
else:
lines.append(
"Thanks for watching. Like and subscribe to stay updated on future episodes."
)
lines.append("Thanks for watching. Like and subscribe to stay updated on future episodes.")
return "\n".join(lines)

View File

@@ -205,9 +205,7 @@ async def publish_episode(
Always returns a result; never raises.
"""
if not Path(video_path).exists():
return NostrPublishResult(
success=False, error=f"video file not found: {video_path!r}"
)
return NostrPublishResult(success=False, error=f"video file not found: {video_path!r}")
file_size = Path(video_path).stat().st_size
_tags = tags or []

View File

@@ -209,9 +209,7 @@ async def upload_episode(
)
if not Path(video_path).exists():
return YouTubeUploadResult(
success=False, error=f"video file not found: {video_path!r}"
)
return YouTubeUploadResult(success=False, error=f"video file not found: {video_path!r}")
if _daily_upload_count() >= _UPLOADS_PER_DAY_MAX:
return YouTubeUploadResult(

View File

@@ -1,4 +1,5 @@
"""SQLAlchemy ORM models for the CALM task-management and journaling system."""
from datetime import UTC, date, datetime
from enum import StrEnum

View File

@@ -1,4 +1,5 @@
"""SQLAlchemy engine, session factory, and declarative Base for the CALM module."""
import logging
from pathlib import Path

View File

@@ -1,4 +1,5 @@
"""Dashboard routes for agent chat interactions and tool-call display."""
import json
import logging
from datetime import datetime

View File

@@ -1,4 +1,5 @@
"""Dashboard routes for the CALM task management and daily journaling interface."""
import logging
from datetime import UTC, date, datetime

View File

@@ -166,7 +166,9 @@ async def _get_content_pipeline() -> dict:
# Check for episode output files
output_dir = repo_root / "data" / "episodes"
if output_dir.exists():
episodes = sorted(output_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
episodes = sorted(
output_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True
)
if episodes:
result["last_episode"] = episodes[0].stem
result["highlight_count"] = len(list(output_dir.glob("highlights_*.json")))

View File

@@ -39,12 +39,7 @@ _SITEMAP_PAGES: list[tuple[str, str, str]] = [
async def robots_txt() -> str:
"""Allow all search engines; point to sitemap."""
base = settings.site_url.rstrip("/")
return (
"User-agent: *\n"
"Allow: /\n"
"\n"
f"Sitemap: {base}/sitemap.xml\n"
)
return f"User-agent: *\nAllow: /\n\nSitemap: {base}/sitemap.xml\n"
@router.get("/sitemap.xml")

View File

@@ -137,15 +137,11 @@ class BudgetTracker:
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_spend_ts ON cloud_spend(ts)"
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_spend_ts ON cloud_spend(ts)")
self._db_ok = True
logger.debug("BudgetTracker: SQLite initialised at %s", self._db_path)
except Exception as exc:
logger.warning(
"BudgetTracker: SQLite unavailable, using in-memory fallback: %s", exc
)
logger.warning("BudgetTracker: SQLite unavailable, using in-memory fallback: %s", exc)
def _connect(self) -> sqlite3.Connection:
return sqlite3.connect(self._db_path, timeout=5)

View File

@@ -44,9 +44,9 @@ logger = logging.getLogger(__name__)
class TierLabel(StrEnum):
"""Three cost-sorted model tiers."""
LOCAL_FAST = "local_fast" # 8B local, always hot, free
LOCAL_FAST = "local_fast" # 8B local, always hot, free
LOCAL_HEAVY = "local_heavy" # 70B local, free but slower
CLOUD_API = "cloud_api" # Paid cloud backend (Claude / GPT-4o)
CLOUD_API = "cloud_api" # Paid cloud backend (Claude / GPT-4o)
# ── Default model assignments (overridable via Settings) ──────────────────────
@@ -62,28 +62,81 @@ _DEFAULT_TIER_MODELS: dict[TierLabel, str] = {
# Patterns that indicate a Tier-1 (simple) task
_T1_WORDS: frozenset[str] = frozenset(
{
"go", "move", "walk", "run",
"north", "south", "east", "west", "up", "down", "left", "right",
"yes", "no", "ok", "okay",
"open", "close", "take", "drop", "look",
"pick", "use", "wait", "rest", "save",
"attack", "flee", "jump", "crouch",
"status", "ping", "list", "show", "get", "check",
"go",
"move",
"walk",
"run",
"north",
"south",
"east",
"west",
"up",
"down",
"left",
"right",
"yes",
"no",
"ok",
"okay",
"open",
"close",
"take",
"drop",
"look",
"pick",
"use",
"wait",
"rest",
"save",
"attack",
"flee",
"jump",
"crouch",
"status",
"ping",
"list",
"show",
"get",
"check",
}
)
# Patterns that indicate a Tier-2 or Tier-3 task
_T2_PHRASES: tuple[str, ...] = (
"plan", "strategy", "optimize", "optimise",
"quest", "stuck", "recover",
"negotiate", "persuade", "faction", "reputation",
"analyze", "analyse", "evaluate", "decide",
"complex", "multi-step", "long-term",
"how do i", "what should i do", "help me figure",
"what is the best", "recommend", "best way",
"explain", "describe in detail", "walk me through",
"compare", "design", "implement", "refactor",
"debug", "diagnose", "root cause",
"plan",
"strategy",
"optimize",
"optimise",
"quest",
"stuck",
"recover",
"negotiate",
"persuade",
"faction",
"reputation",
"analyze",
"analyse",
"evaluate",
"decide",
"complex",
"multi-step",
"long-term",
"how do i",
"what should i do",
"help me figure",
"what is the best",
"recommend",
"best way",
"explain",
"describe in detail",
"walk me through",
"compare",
"design",
"implement",
"refactor",
"debug",
"diagnose",
"root cause",
)
# Low-quality response detection patterns
@@ -132,20 +185,35 @@ def classify_tier(task: str, context: dict | None = None) -> TierLabel:
# ── Tier-2 / complexity signals ──────────────────────────────────────────
t2_phrase_hit = any(phrase in task_lower for phrase in _T2_PHRASES)
t2_word_hit = bool(words & {"plan", "strategy", "optimize", "optimise", "quest",
"stuck", "recover", "analyze", "analyse", "evaluate"})
t2_word_hit = bool(
words
& {
"plan",
"strategy",
"optimize",
"optimise",
"quest",
"stuck",
"recover",
"analyze",
"analyse",
"evaluate",
}
)
is_stuck = bool(ctx.get("stuck"))
require_t2 = bool(ctx.get("require_t2"))
long_input = len(task) > 300 # long tasks warrant more capable model
deep_context = (
len(ctx.get("active_quests", [])) >= 3
or ctx.get("dialogue_active")
)
deep_context = len(ctx.get("active_quests", [])) >= 3 or ctx.get("dialogue_active")
if t2_phrase_hit or t2_word_hit or is_stuck or require_t2 or long_input or deep_context:
logger.debug(
"classify_tier → LOCAL_HEAVY (phrase=%s word=%s stuck=%s explicit=%s long=%s ctx=%s)",
t2_phrase_hit, t2_word_hit, is_stuck, require_t2, long_input, deep_context,
t2_phrase_hit,
t2_word_hit,
is_stuck,
require_t2,
long_input,
deep_context,
)
return TierLabel.LOCAL_HEAVY
@@ -159,9 +227,7 @@ def classify_tier(task: str, context: dict | None = None) -> TierLabel:
)
if t1_word_hit and task_short and no_active_context:
logger.debug(
"classify_tier → LOCAL_FAST (words=%s short=%s)", t1_word_hit, task_short
)
logger.debug("classify_tier → LOCAL_FAST (words=%s short=%s)", t1_word_hit, task_short)
return TierLabel.LOCAL_FAST
# ── Default: LOCAL_HEAVY (safe for anything unclassified) ────────────────
@@ -267,12 +333,14 @@ class TieredModelRouter:
def _get_cascade(self) -> Any:
if self._cascade is None:
from infrastructure.router.cascade import get_router
self._cascade = get_router()
return self._cascade
def _get_budget(self) -> Any:
if self._budget is None:
from infrastructure.models.budget import get_budget_tracker
self._budget = get_budget_tracker()
return self._budget
@@ -318,10 +386,10 @@ class TieredModelRouter:
# ── Tier 1 attempt ───────────────────────────────────────────────────
if tier == TierLabel.LOCAL_FAST:
result = await self._complete_tier(
TierLabel.LOCAL_FAST, msgs, temperature, max_tokens
)
if self._auto_escalate and _is_low_quality(result.get("content", ""), TierLabel.LOCAL_FAST):
result = await self._complete_tier(TierLabel.LOCAL_FAST, msgs, temperature, max_tokens)
if self._auto_escalate and _is_low_quality(
result.get("content", ""), TierLabel.LOCAL_FAST
):
logger.info(
"TieredModelRouter: Tier-1 response low quality, escalating to Tier-2 "
"(task=%r content_len=%d)",
@@ -341,9 +409,7 @@ class TieredModelRouter:
TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
)
except Exception as exc:
logger.warning(
"TieredModelRouter: Tier-2 failed (%s) — escalating to cloud", exc
)
logger.warning("TieredModelRouter: Tier-2 failed (%s) — escalating to cloud", exc)
tier = TierLabel.CLOUD_API
# ── Tier 3 (Cloud) ───────────────────────────────────────────────────
@@ -354,9 +420,7 @@ class TieredModelRouter:
"increase tier_cloud_daily_budget_usd or tier_cloud_monthly_budget_usd"
)
result = await self._complete_tier(
TierLabel.CLOUD_API, msgs, temperature, max_tokens
)
result = await self._complete_tier(TierLabel.CLOUD_API, msgs, temperature, max_tokens)
# Record cloud spend if token info is available
usage = result.get("usage", {})

View File

@@ -81,7 +81,9 @@ def schnorr_sign(msg: bytes, privkey_bytes: bytes) -> bytes:
# Deterministic nonce with auxiliary randomness (BIP-340 §Default signing)
rand = secrets.token_bytes(32)
t = bytes(x ^ y for x, y in zip(a.to_bytes(32, "big"), _tagged_hash("BIP0340/aux", rand), strict=True))
t = bytes(
x ^ y for x, y in zip(a.to_bytes(32, "big"), _tagged_hash("BIP0340/aux", rand), strict=True)
)
r_bytes = _tagged_hash("BIP0340/nonce", t + _x_bytes(P) + msg)
k_int = int.from_bytes(r_bytes, "big") % _N

View File

@@ -177,7 +177,7 @@ class NostrIdentityManager:
tags = [
["d", "timmy-mission-control"],
["k", "1"], # handles kind:1 (notes) as a starting point
["k", "1"], # handles kind:1 (notes) as a starting point
["k", "5600"], # DVM task request (NIP-90)
["k", "5900"], # DVM general task
]
@@ -208,9 +208,7 @@ class NostrIdentityManager:
relay_urls = self.get_relay_urls()
if not relay_urls:
logger.warning(
"NOSTR_RELAYS not configured — Kind 0 and Kind 31990 not published."
)
logger.warning("NOSTR_RELAYS not configured — Kind 0 and Kind 31990 not published.")
return result
logger.info(

View File

@@ -93,10 +93,7 @@ class AntiGriefPolicy:
self._record(player_id, command.action, "blocked action type")
return ActionResult(
status=ActionStatus.FAILURE,
message=(
f"Action '{command.action}' is not permitted "
"in community deployments."
),
message=(f"Action '{command.action}' is not permitted in community deployments."),
)
# 2. Rate-limit check (sliding window)

View File

@@ -103,9 +103,7 @@ class WorldStateBackup:
)
self._update_manifest(record)
self._rotate()
logger.info(
"WorldStateBackup: created %s (%d bytes)", backup_id, size
)
logger.info("WorldStateBackup: created %s (%d bytes)", backup_id, size)
return record
# -- restore -----------------------------------------------------------
@@ -167,12 +165,8 @@ class WorldStateBackup:
path.unlink(missing_ok=True)
logger.debug("WorldStateBackup: rotated out %s", rec.backup_id)
except OSError as exc:
logger.warning(
"WorldStateBackup: could not remove %s: %s", path, exc
)
logger.warning("WorldStateBackup: could not remove %s: %s", path, exc)
# Rewrite manifest with only the retained backups
keep = backups[: self._max]
manifest = self._dir / self.MANIFEST_NAME
manifest.write_text(
"\n".join(json.dumps(asdict(r)) for r in reversed(keep)) + "\n"
)
manifest.write_text("\n".join(json.dumps(asdict(r)) for r in reversed(keep)) + "\n")

View File

@@ -190,7 +190,5 @@ class ResourceMonitor:
return psutil
except ImportError:
logger.debug(
"ResourceMonitor: psutil not available — using stdlib fallback"
)
logger.debug("ResourceMonitor: psutil not available — using stdlib fallback")
return None

View File

@@ -95,9 +95,7 @@ class QuestArbiter:
quest_id=quest_id,
winner=existing.player_id,
loser=player_id,
resolution=(
f"first-come-first-served; {existing.player_id} retains lock"
),
resolution=(f"first-come-first-served; {existing.player_id} retains lock"),
)
self._conflicts.append(conflict)
logger.warning(

View File

@@ -174,11 +174,7 @@ class RecoveryManager:
def _trim(self) -> None:
"""Keep only the last *max_snapshots* lines."""
lines = [
ln
for ln in self._path.read_text().strip().splitlines()
if ln.strip()
]
lines = [ln for ln in self._path.read_text().strip().splitlines() if ln.strip()]
if len(lines) > self._max:
lines = lines[-self._max :]
self._path.write_text("\n".join(lines) + "\n")

View File

@@ -114,10 +114,7 @@ class MultiClientStressRunner:
)
suite_start = time.monotonic()
tasks = [
self._run_client(f"client-{i:02d}", scenario)
for i in range(self._client_count)
]
tasks = [self._run_client(f"client-{i:02d}", scenario) for i in range(self._client_count)]
report.results = list(await asyncio.gather(*tasks))
report.total_time_ms = int((time.monotonic() - suite_start) * 1000)

View File

@@ -108,8 +108,7 @@ class MumbleBridge:
import pymumble_py3 as pymumble
except ImportError:
logger.warning(
"MumbleBridge: pymumble-py3 not installed — "
'run: pip install ".[mumble]"'
'MumbleBridge: pymumble-py3 not installed — run: pip install ".[mumble]"'
)
return False
@@ -246,9 +245,7 @@ class MumbleBridge:
self._client.my_channel().move_in(channel)
logger.debug("MumbleBridge: joined channel '%s'", channel_name)
except Exception as exc:
logger.warning(
"MumbleBridge: could not join channel '%s'%s", channel_name, exc
)
logger.warning("MumbleBridge: could not join channel '%s'%s", channel_name, exc)
def _on_sound_received(self, user, soundchunk) -> None:
"""Called by pymumble when audio arrives from another user."""

View File

@@ -1,4 +1,5 @@
"""Typer CLI entry point for the ``timmy`` command (chat, think, status)."""
import asyncio
import logging
import subprocess

View File

@@ -80,9 +80,7 @@ class IntrospectionSnapshot:
cognitive: CognitiveSummary = field(default_factory=CognitiveSummary)
recent_thoughts: list[ThoughtSummary] = field(default_factory=list)
analytics: SessionAnalytics = field(default_factory=SessionAnalytics)
timestamp: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
def to_dict(self) -> dict:
return {
@@ -171,9 +169,7 @@ class NexusIntrospector:
return [
ThoughtSummary(
id=t.id,
content=(
t.content[:200] + "" if len(t.content) > 200 else t.content
),
content=(t.content[:200] + "" if len(t.content) > 200 else t.content),
seed_type=t.seed_type,
created_at=t.created_at,
parent_id=t.parent_id,
@@ -186,9 +182,7 @@ class NexusIntrospector:
# ── Session analytics ─────────────────────────────────────────────────
def _compute_analytics(
self, conversation_log: list[dict]
) -> SessionAnalytics:
def _compute_analytics(self, conversation_log: list[dict]) -> SessionAnalytics:
"""Derive analytics from the Nexus conversation log."""
if not conversation_log:
return SessionAnalytics()
@@ -197,9 +191,7 @@ class NexusIntrospector:
self._session_start = datetime.now(UTC)
user_msgs = [m for m in conversation_log if m.get("role") == "user"]
asst_msgs = [
m for m in conversation_log if m.get("role") == "assistant"
]
asst_msgs = [m for m in conversation_log if m.get("role") == "assistant"]
avg_len = 0.0
if asst_msgs:

View File

@@ -189,9 +189,7 @@ class NexusStore:
]
return messages
def message_count(
self, session_tag: str = DEFAULT_SESSION_TAG
) -> int:
def message_count(self, session_tag: str = DEFAULT_SESSION_TAG) -> int:
"""Return total message count for *session_tag*."""
conn = self._get_conn()
with closing(conn.cursor()) as cur:

View File

@@ -54,9 +54,7 @@ class SovereigntyPulseSnapshot:
crystallizations_last_hour: int = 0
api_independence_pct: float = 0.0
total_events: int = 0
timestamp: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
def to_dict(self) -> dict:
return {

View File

@@ -1,528 +0,0 @@
"""Research Orchestrator — autonomous, sovereign research pipeline.
Chains all six steps of the research workflow with local-first execution:
Step 0 Cache — check semantic memory (SQLite, instant, zero API cost)
Step 1 Scope — load a research template from skills/research/
Step 2 Query — slot-fill template + formulate 5-15 search queries via Ollama
Step 3 Search — execute queries via web_search (SerpAPI or fallback)
Step 4 Fetch — download + extract full pages via web_fetch (trafilatura)
Step 5 Synth — compress findings into a structured report via cascade
Step 6 Deliver — store to semantic memory; optionally save to docs/research/
Cascade tiers for synthesis (spec §4):
Tier 4 SQLite semantic cache — instant, free, covers ~80% after warm-up
Tier 3 Ollama (qwen3:14b) — local, free, good quality
Tier 2 Claude API (haiku) — cloud fallback, cheap, set ANTHROPIC_API_KEY
Tier 1 (future) Groq — free-tier rate-limited, tracked in #980
All optional services degrade gracefully per project conventions.
Refs #972 (governing spec), #975 (ResearchOrchestrator sub-issue).
"""
from __future__ import annotations
import asyncio
import logging
import re
import textwrap
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Optional memory imports — resolved at module level so tests can patch them;
# both names fall back to None when the memory subsystem is unavailable.
try:
    from timmy.memory_system import SemanticMemory, store_memory
except Exception:  # pragma: no cover
    SemanticMemory = None  # type: ignore[assignment,misc]
    store_memory = None  # type: ignore[assignment]

# Root of the project — two levels up from src/timmy/
_PROJECT_ROOT = Path(__file__).parent.parent.parent
# Research templates live here as one markdown file per template.
_SKILLS_ROOT = _PROJECT_ROOT / "skills" / "research"
# Reports are persisted here when save_to_disk=True.
_DOCS_ROOT = _PROJECT_ROOT / "docs" / "research"
# Similarity threshold for cache hit (0–1 cosine similarity)
_CACHE_HIT_THRESHOLD = 0.82
# How many search result URLs to fetch as full pages
_FETCH_TOP_N = 5
# Maximum tokens to request from the synthesis LLM
_SYNTHESIS_MAX_TOKENS = 4096
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ResearchResult:
    """Full output of a research pipeline run.

    Captures the report itself plus provenance metadata: how many queries
    were issued, how many sources were fetched, whether the result came from
    the semantic cache, and which synthesis backend produced the text.
    """

    topic: str
    query_count: int
    sources_fetched: int
    report: str
    cached: bool = False
    cache_similarity: float = 0.0
    synthesis_backend: str = "unknown"
    errors: list[str] = field(default_factory=list)

    def is_empty(self) -> bool:
        """Return True when the report contains no non-whitespace text."""
        return self.report.strip() == ""
# ---------------------------------------------------------------------------
# Template loading
# ---------------------------------------------------------------------------
def list_templates() -> list[str]:
    """Return names of available research templates (without .md extension)."""
    if not _SKILLS_ROOT.exists():
        return []
    names: list[str] = []
    for template_path in sorted(_SKILLS_ROOT.glob("*.md")):
        names.append(template_path.stem)
    return names
def load_template(template_name: str, slots: dict[str, str] | None = None) -> str:
    """Load a research template and substitute ``{slot}`` placeholders.

    Args:
        template_name: Stem of the .md file under skills/research/ (e.g. "tool_evaluation").
        slots: Mapping of {placeholder} → replacement value.

    Returns:
        Template text with slots filled. Unfilled slots are left as-is.

    Raises:
        FileNotFoundError: If no template with that name exists.
    """
    template_path = _SKILLS_ROOT / f"{template_name}.md"
    if not template_path.exists():
        choices = ", ".join(list_templates()) or "(none)"
        raise FileNotFoundError(
            f"Research template {template_name!r} not found. "
            f"Available: {choices}"
        )
    body = template_path.read_text(encoding="utf-8")
    # Drop YAML frontmatter (--- ... ---), including empty frontmatter (---\n---).
    body = re.sub(r"^---\n.*?---\n", "", body, flags=re.DOTALL)
    for key, value in (slots or {}).items():
        body = body.replace(f"{{{key}}}", value)
    return body.strip()
# ---------------------------------------------------------------------------
# Query formulation (Step 2)
# ---------------------------------------------------------------------------
async def _formulate_queries(topic: str, template_context: str, n: int = 8) -> list[str]:
    """Use the local LLM to generate targeted search queries for a topic.

    Falls back to a simple heuristic set of queries if Ollama is unavailable.
    """
    prompt = textwrap.dedent(f"""\
        You are a research assistant. Generate exactly {n} targeted, specific web search
        queries to thoroughly research the following topic.

        TOPIC: {topic}

        RESEARCH CONTEXT:
        {template_context[:1000]}

        Rules:
        - One query per line, no numbering, no bullet points.
        - Vary the angle (definition, comparison, implementation, alternatives, pitfalls).
        - Prefer exact technical terms, tool names, and version numbers where relevant.
        - Output ONLY the queries, nothing else.
        """)
    raw = await _ollama_complete(prompt, max_tokens=512)
    if not raw:
        # Minimal fallback when the local model produced nothing.
        return [
            f"{topic} overview",
            f"{topic} tutorial",
            f"{topic} best practices",
            f"{topic} alternatives",
            f"{topic} 2025",
        ]
    queries = [line.strip() for line in raw.splitlines() if line.strip()]
    # Cap at n; fewer lines than requested are returned as-is.
    return queries[:n]
# ---------------------------------------------------------------------------
# Search (Step 3)
# ---------------------------------------------------------------------------
async def _execute_search(queries: list[str]) -> list[dict[str, str]]:
"""Run each query through the available web search backend.
Returns a flat list of {title, url, snippet} dicts.
Degrades gracefully if SerpAPI key is absent.
"""
results: list[dict[str, str]] = []
seen_urls: set[str] = set()
for query in queries:
try:
raw = await asyncio.to_thread(_run_search_sync, query)
for item in raw:
url = item.get("url", "")
if url and url not in seen_urls:
seen_urls.add(url)
results.append(item)
except Exception as exc:
logger.warning("Search failed for query %r: %s", query, exc)
return results
def _run_search_sync(query: str) -> list[dict[str, str]]:
    """Synchronous search — wraps SerpAPI or returns empty on missing key."""
    import os

    api_key = os.environ.get("SERPAPI_API_KEY")
    if not api_key:
        logger.debug("SERPAPI_API_KEY not set — skipping web search for %r", query)
        return []
    try:
        from serpapi import GoogleSearch

        response = GoogleSearch({"q": query, "api_key": api_key, "num": 5}).get_dict()
        # Normalize SerpAPI's organic results into {title, url, snippet} dicts.
        return [
            {
                "title": entry.get("title", ""),
                "url": entry.get("link", ""),
                "snippet": entry.get("snippet", ""),
            }
            for entry in response.get("organic_results", [])
        ]
    except Exception as exc:
        logger.warning("SerpAPI search error: %s", exc)
        return []
# ---------------------------------------------------------------------------
# Fetch (Step 4)
# ---------------------------------------------------------------------------
async def _fetch_pages(results: list[dict[str, str]], top_n: int = _FETCH_TOP_N) -> list[str]:
    """Download and extract full text for the top search results.

    Uses web_fetch (trafilatura) from timmy.tools.system_tools. A fetch
    failure is logged and the offending URL is skipped.
    """
    try:
        from timmy.tools.system_tools import web_fetch
    except ImportError:
        logger.warning("web_fetch not available — skipping page fetch")
        return []
    extracted: list[str] = []
    for entry in results[:top_n]:
        link = entry.get("url", "")
        if not link:
            continue
        try:
            body = await asyncio.to_thread(web_fetch, link, 6000)
            # web_fetch signals failure with an "Error:" prefix rather than raising.
            if body and not body.startswith("Error:"):
                extracted.append(f"## {entry.get('title', link)}\nSource: {link}\n\n{body}")
        except Exception as exc:
            logger.warning("Failed to fetch %s: %s", link, exc)
    return extracted
# ---------------------------------------------------------------------------
# Synthesis (Step 5) — cascade: Ollama → Claude fallback
# ---------------------------------------------------------------------------
async def _synthesize(topic: str, pages: list[str], snippets: list[str]) -> tuple[str, str]:
"""Compress fetched pages + snippets into a structured research report.
Returns (report_markdown, backend_used).
"""
# Build synthesis prompt
source_content = "\n\n---\n\n".join(pages[:5])
if not source_content and snippets:
source_content = "\n".join(f"- {s}" for s in snippets[:20])
if not source_content:
return (
f"# Research: {topic}\n\n*No source material was retrieved. "
"Check SERPAPI_API_KEY and network connectivity.*",
"none",
)
prompt = textwrap.dedent(f"""\
You are a senior technical researcher. Synthesize the source material below
into a structured research report on the topic: **{topic}**
FORMAT YOUR REPORT AS:
# {topic}
## Executive Summary
(2-3 sentences: what you found, top recommendation)
## Key Findings
(Bullet list of the most important facts, tools, or patterns)
## Comparison / Options
(Table or list comparing alternatives where applicable)
## Recommended Approach
(Concrete recommendation with rationale)
## Gaps & Next Steps
(What wasn't answered, what to investigate next)
---
SOURCE MATERIAL:
{source_content[:12000]}
""")
# Tier 3 — try Ollama first
report = await _ollama_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
if report:
return report, "ollama"
# Tier 2 — Claude fallback
report = await _claude_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
if report:
return report, "claude"
# Last resort — structured snippet summary
summary = f"# {topic}\n\n## Snippets\n\n" + "\n\n".join(
f"- {s}" for s in snippets[:15]
)
return summary, "fallback"
# ---------------------------------------------------------------------------
# LLM helpers
# ---------------------------------------------------------------------------
async def _ollama_complete(prompt: str, max_tokens: int = 1024) -> str:
    """Send a prompt to Ollama and return the response text.

    Returns empty string on any failure (graceful degradation).
    """
    try:
        import httpx
        from config import settings

        payload: dict[str, Any] = {
            "model": settings.ollama_model,
            "prompt": prompt,
            "stream": False,
            # Low temperature: research synthesis favours determinism.
            "options": {"num_predict": max_tokens, "temperature": 0.3},
        }
        endpoint = f"{settings.normalized_ollama_url}/api/generate"
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(endpoint, json=payload)
            resp.raise_for_status()
            return resp.json().get("response", "").strip()
    except Exception as exc:
        logger.warning("Ollama completion failed: %s", exc)
        return ""
async def _claude_complete(prompt: str, max_tokens: int = 1024) -> str:
    """Send a prompt to Claude API as a last-resort fallback.

    Only active when ANTHROPIC_API_KEY is configured.
    Returns empty string on failure or missing key.
    """
    try:
        from config import settings

        if not settings.anthropic_api_key:
            return ""
        from timmy.backends import ClaudeBackend

        # ClaudeBackend.run is synchronous — push it onto a worker thread.
        result = await asyncio.to_thread(ClaudeBackend().run, prompt)
        return result.content.strip()
    except Exception as exc:
        logger.warning("Claude fallback failed: %s", exc)
        return ""
# ---------------------------------------------------------------------------
# Memory cache (Step 0 + Step 6)
# ---------------------------------------------------------------------------
def _check_cache(topic: str) -> tuple[str | None, float]:
    """Search semantic memory for a prior result on this topic.

    Returns (cached_report, similarity) when similarity clears the
    _CACHE_HIT_THRESHOLD, otherwise (None, 0.0).
    """
    try:
        if SemanticMemory is not None:
            hits = SemanticMemory().search(topic, top_k=1)
            if hits:
                content, score = hits[0]
                if score >= _CACHE_HIT_THRESHOLD:
                    return content, score
    except Exception as exc:
        logger.debug("Cache check failed: %s", exc)
    return None, 0.0
def _store_result(topic: str, report: str) -> None:
    """Index the research report into semantic memory for future retrieval."""
    if store_memory is None:
        logger.debug("store_memory not available — skipping memory index")
        return
    try:
        store_memory(
            content=report,
            source="research_pipeline",
            context_type="research",
            metadata={"topic": topic},
        )
    except Exception as exc:
        logger.warning("Failed to store research result: %s", exc)
    else:
        logger.info("Research result indexed for topic: %r", topic)
def _save_to_disk(topic: str, report: str) -> Path | None:
    """Persist the report as a markdown file under docs/research/.

    Filename is derived from the topic (slugified). Returns the path, or
    None when writing fails (failures are logged, never raised).
    """
    try:
        # Slugify; fall back to "untitled" so an empty or symbol-only topic
        # does not produce a hidden ".md" file.
        slug = re.sub(r"[^a-z0-9]+", "-", topic.lower()).strip("-")[:60] or "untitled"
        _DOCS_ROOT.mkdir(parents=True, exist_ok=True)
        path = _DOCS_ROOT / f"{slug}.md"
        path.write_text(report, encoding="utf-8")
        logger.info("Research report saved to %s", path)
        return path
    except Exception as exc:
        logger.warning("Failed to save research report to disk: %s", exc)
        return None
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
async def run_research(
    topic: str,
    template: str | None = None,
    slots: dict[str, str] | None = None,
    save_to_disk: bool = False,
    skip_cache: bool = False,
) -> ResearchResult:
    """Run the full 6-step autonomous research pipeline.

    Args:
        topic: The research question or subject.
        template: Name of a template from skills/research/ (e.g. "tool_evaluation").
            If None, runs without a template scaffold.
        slots: Placeholder values for the template (e.g. {"domain": "PDF parsing"}).
        save_to_disk: If True, write the report to docs/research/<slug>.md.
        skip_cache: If True, bypass the semantic memory cache.

    Returns:
        ResearchResult with report and metadata.
    """
    errors: list[str] = []

    # Step 0 — semantic-memory cache: an early hit short-circuits everything.
    if not skip_cache:
        cached_report, similarity = _check_cache(topic)
        if cached_report:
            logger.info("Cache hit (%.2f) for topic: %r", similarity, topic)
            return ResearchResult(
                topic=topic,
                query_count=0,
                sources_fetched=0,
                report=cached_report,
                cached=True,
                cache_similarity=similarity,
                synthesis_backend="cache",
            )

    # Step 1 — optional template scaffold; a missing template is recorded
    # as an error but does not abort the run.
    template_context = ""
    if template:
        try:
            template_context = load_template(template, slots)
        except FileNotFoundError as exc:
            errors.append(str(exc))
            logger.warning("Template load failed: %s", exc)

    # Step 2 — formulate search queries via the local LLM.
    queries = await _formulate_queries(topic, template_context)
    logger.info("Formulated %d queries for topic: %r", len(queries), topic)

    # Step 3 — execute the searches and collect snippets.
    search_results = await _execute_search(queries)
    logger.info("Search returned %d results", len(search_results))
    snippets = [r.get("snippet", "") for r in search_results if r.get("snippet")]

    # Step 4 — fetch full pages for the top results.
    pages = await _fetch_pages(search_results)
    logger.info("Fetched %d pages", len(pages))

    # Step 5 — synthesize (Ollama → Claude → snippet fallback).
    report, backend = await _synthesize(topic, pages, snippets)

    # Step 6 — deliver: index into semantic memory, optionally persist.
    _store_result(topic, report)
    if save_to_disk:
        _save_to_disk(topic, report)

    return ResearchResult(
        topic=topic,
        query_count=len(queries),
        sources_fetched=len(pages),
        report=report,
        cached=False,
        synthesis_backend=backend,
        errors=errors,
    )

View File

@@ -0,0 +1,24 @@
"""Research subpackage — re-exports all public names for backward compatibility.
Refs #972 (governing spec), #975 (ResearchOrchestrator sub-issue).
"""
from timmy.research.coordinator import (
ResearchResult,
_check_cache,
_save_to_disk,
_store_result,
list_templates,
load_template,
run_research,
)
__all__ = [
"ResearchResult",
"_check_cache",
"_save_to_disk",
"_store_result",
"list_templates",
"load_template",
"run_research",
]

View File

@@ -0,0 +1,259 @@
"""Research coordinator — orchestrator, data structures, cache, and disk I/O.
Split from the monolithic ``research.py`` for maintainability.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from pathlib import Path
logger = logging.getLogger(__name__)
# Optional memory imports — kept at module level so tests can patch them.
# When the memory system is unavailable, both names degrade to None and the
# cache/index helpers below become no-ops.
try:
    from timmy.memory_system import SemanticMemory, store_memory
except Exception:  # pragma: no cover
    SemanticMemory = None  # type: ignore[assignment,misc]
    store_memory = None  # type: ignore[assignment]
# Repository root: four ``.parent`` hops up from src/timmy/research/coordinator.py
# (research/ → timmy/ → src/ → repo root).
_PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
_SKILLS_ROOT = _PROJECT_ROOT / "skills" / "research"  # template *.md files
_DOCS_ROOT = _PROJECT_ROOT / "docs" / "research"  # saved report output
# Similarity threshold for cache hit (0–1 cosine similarity)
_CACHE_HIT_THRESHOLD = 0.82
# How many search result URLs to fetch as full pages
_FETCH_TOP_N = 5
# Maximum tokens to request from the synthesis LLM
_SYNTHESIS_MAX_TOKENS = 4096
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ResearchResult:
    """Everything a research pipeline run produced, plus provenance metadata."""

    topic: str  # the original research question/subject
    query_count: int  # search queries issued (0 on a cache hit)
    sources_fetched: int  # full pages downloaded (0 on a cache hit)
    report: str  # final markdown report text
    cached: bool = False  # True when served from semantic memory
    cache_similarity: float = 0.0  # similarity score of the cache match (0–1)
    synthesis_backend: str = "unknown"  # e.g. "ollama", "claude", "cache", "fallback"
    errors: list[str] = field(default_factory=list)  # non-fatal issues collected en route

    def is_empty(self) -> bool:
        """Return True when the report has no non-whitespace content."""
        return self.report.strip() == ""
# ---------------------------------------------------------------------------
# Template loading
# ---------------------------------------------------------------------------
def list_templates() -> list[str]:
    """List available research templates under skills/research/ (stems, no .md)."""
    root = _SKILLS_ROOT
    if not root.exists():
        return []
    # Sort by full path (filename order), then strip the extension.
    template_files = sorted(root.glob("*.md"))
    return [template.stem for template in template_files]
def load_template(template_name: str, slots: dict[str, str] | None = None) -> str:
    """Read a research template from skills/research/ and fill {slot} placeholders.

    Args:
        template_name: Stem of the .md file under skills/research/
            (e.g. "tool_evaluation").
        slots: Mapping of placeholder name → replacement text.

    Returns:
        The template body with YAML frontmatter removed, slots filled, and
        surrounding whitespace stripped. Placeholders with no matching slot
        are left untouched.

    Raises:
        FileNotFoundError: If the template does not exist; the message lists
            the templates that are available.
    """
    template_path = _SKILLS_ROOT / f"{template_name}.md"
    if not template_path.exists():
        available = ", ".join(list_templates()) or "(none)"
        raise FileNotFoundError(
            f"Research template {template_name!r} not found. Available: {available}"
        )
    body = template_path.read_text(encoding="utf-8")
    # Drop a leading YAML frontmatter block (--- ... ---), including an empty one.
    body = re.sub(r"^---\n.*?---\n", "", body, flags=re.DOTALL)
    for key, value in (slots or {}).items():
        body = body.replace(f"{{{key}}}", value)
    return body.strip()
# ---------------------------------------------------------------------------
# Memory cache (Step 0 + Step 6)
# ---------------------------------------------------------------------------
def _check_cache(topic: str) -> tuple[str | None, float]:
    """Look up semantic memory for a previously synthesized report on *topic*.

    Returns:
        ``(report, similarity)`` when the best match scores at or above the
        cache threshold, otherwise ``(None, 0.0)``. Any backend failure is
        logged at debug level and treated as a miss.
    """
    miss: tuple[str | None, float] = (None, 0.0)
    try:
        if SemanticMemory is None:
            return miss
        top_hits = SemanticMemory().search(topic, top_k=1)
        if top_hits:
            report, similarity = top_hits[0]
            if similarity >= _CACHE_HIT_THRESHOLD:
                return report, similarity
    except Exception as exc:
        logger.debug("Cache check failed: %s", exc)
    return miss
def _store_result(topic: str, report: str) -> None:
    """Write the finished report into semantic memory so later runs can reuse it.

    Best-effort: a missing memory backend or a storage failure is logged and
    swallowed — indexing must never break the pipeline.
    """
    if store_memory is None:
        logger.debug("store_memory not available — skipping memory index")
        return
    try:
        store_memory(
            content=report,
            source="research_pipeline",
            context_type="research",
            metadata={"topic": topic},
        )
        logger.info("Research result indexed for topic: %r", topic)
    except Exception as exc:
        logger.warning("Failed to store research result: %s", exc)
def _save_to_disk(topic: str, report: str) -> Path | None:
    """Write *report* to docs/research/<slug>.md and return the written path.

    The slug is the topic lowercased with non-alphanumeric runs collapsed to
    hyphens, trimmed and capped at 60 characters. Returns None on any
    failure (best-effort; nothing is raised).
    """
    try:
        stem = re.sub(r"[^a-z0-9]+", "-", topic.lower()).strip("-")[:60]
        target = _DOCS_ROOT / (stem + ".md")
        _DOCS_ROOT.mkdir(parents=True, exist_ok=True)
        target.write_text(report, encoding="utf-8")
        logger.info("Research report saved to %s", target)
    except Exception as exc:
        logger.warning("Failed to save research report to disk: %s", exc)
        return None
    return target
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
async def run_research(
    topic: str,
    template: str | None = None,
    slots: dict[str, str] | None = None,
    save_to_disk: bool = False,
    skip_cache: bool = False,
) -> ResearchResult:
    """Run the full 6-step autonomous research pipeline.

    Pipeline: cache check → template load → query formulation → web search →
    page fetch → synthesis → delivery (memory index + optional disk write).

    Args:
        topic: The research question or subject.
        template: Name of a template from skills/research/ (e.g. "tool_evaluation").
            If None, runs without a template scaffold.
        slots: Placeholder values for the template (e.g. {"domain": "PDF parsing"}).
        save_to_disk: If True, write the report to docs/research/<slug>.md.
        skip_cache: If True, bypass the semantic memory cache.

    Returns:
        ResearchResult with report and metadata. Non-fatal problems (e.g. a
        missing template) are collected in ``errors`` rather than raised.
    """
    # Imported lazily: sources.py imports constants from this module at load
    # time, so a top-level import here would create a circular import.
    from timmy.research.sources import (
        _execute_search,
        _fetch_pages,
        _formulate_queries,
        _synthesize,
    )
    errors: list[str] = []
    # ------------------------------------------------------------------
    # Step 0 — check cache: a sufficiently similar prior report short-
    # circuits the whole pipeline (query_count/sources_fetched stay 0).
    # ------------------------------------------------------------------
    if not skip_cache:
        cached, score = _check_cache(topic)
        if cached:
            logger.info("Cache hit (%.2f) for topic: %r", score, topic)
            return ResearchResult(
                topic=topic,
                query_count=0,
                sources_fetched=0,
                report=cached,
                cached=True,
                cache_similarity=score,
                synthesis_backend="cache",
            )
    # ------------------------------------------------------------------
    # Step 1 — load template (optional); a missing template is recorded
    # as an error but the run continues without a scaffold.
    # ------------------------------------------------------------------
    template_context = ""
    if template:
        try:
            template_context = load_template(template, slots)
        except FileNotFoundError as exc:
            errors.append(str(exc))
            logger.warning("Template load failed: %s", exc)
    # ------------------------------------------------------------------
    # Step 2 — formulate queries (LLM-generated, heuristic fallback)
    # ------------------------------------------------------------------
    queries = await _formulate_queries(topic, template_context)
    logger.info("Formulated %d queries for topic: %r", len(queries), topic)
    # ------------------------------------------------------------------
    # Step 3 — execute search; snippets are kept as a fallback corpus
    # in case full-page fetching yields nothing.
    # ------------------------------------------------------------------
    search_results = await _execute_search(queries)
    logger.info("Search returned %d results", len(search_results))
    snippets = [r.get("snippet", "") for r in search_results if r.get("snippet")]
    # ------------------------------------------------------------------
    # Step 4 — fetch full pages for the top results
    # ------------------------------------------------------------------
    pages = await _fetch_pages(search_results)
    logger.info("Fetched %d pages", len(pages))
    # ------------------------------------------------------------------
    # Step 5 — synthesize pages + snippets into the final report
    # ------------------------------------------------------------------
    report, backend = await _synthesize(topic, pages, snippets)
    # ------------------------------------------------------------------
    # Step 6 — deliver: always index into memory; disk write is opt-in
    # ------------------------------------------------------------------
    _store_result(topic, report)
    if save_to_disk:
        _save_to_disk(topic, report)
    return ResearchResult(
        topic=topic,
        query_count=len(queries),
        sources_fetched=len(pages),
        report=report,
        cached=False,
        synthesis_backend=backend,
        errors=errors,
    )

View File

@@ -0,0 +1,267 @@
"""Research I/O helpers — search, fetch, LLM completions, and synthesis.
Split from the monolithic ``research.py`` for maintainability.
"""
from __future__ import annotations
import asyncio
import logging
import textwrap
from typing import Any
from timmy.research.coordinator import _FETCH_TOP_N, _SYNTHESIS_MAX_TOKENS
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Query formulation (Step 2)
# ---------------------------------------------------------------------------
async def _formulate_queries(topic: str, template_context: str, n: int = 8) -> list[str]:
    """Use the local LLM to generate targeted search queries for a topic.

    Args:
        topic: Research subject to generate queries for.
        template_context: Optional template scaffold; only its first 1000
            characters are shown to the model.
        n: Maximum number of queries to return.

    Returns:
        Up to ``n`` non-empty query strings, one per LLM output line. Falls
        back to a fixed heuristic set of 5 queries if the LLM returns
        nothing (e.g. Ollama is unavailable).
    """
    prompt = textwrap.dedent(f"""\
        You are a research assistant. Generate exactly {n} targeted, specific web search
        queries to thoroughly research the following topic.
        TOPIC: {topic}
        RESEARCH CONTEXT:
        {template_context[:1000]}
        Rules:
        - One query per line, no numbering, no bullet points.
        - Vary the angle (definition, comparison, implementation, alternatives, pitfalls).
        - Prefer exact technical terms, tool names, and version numbers where relevant.
        - Output ONLY the queries, nothing else.
        """)
    queries = await _ollama_complete(prompt, max_tokens=512)
    if not queries:
        # Minimal fallback — note this deliberately ignores ``n`` and always
        # yields these 5 generic angles.
        return [
            f"{topic} overview",
            f"{topic} tutorial",
            f"{topic} best practices",
            f"{topic} alternatives",
            f"{topic} 2025",
        ]
    lines = [ln.strip() for ln in queries.splitlines() if ln.strip()]
    # The previous ``lines[:n] if len(lines) >= n else lines`` was redundant:
    # slicing already caps at n and returns a shorter list unchanged.
    return lines[:n]
# ---------------------------------------------------------------------------
# Search (Step 3)
# ---------------------------------------------------------------------------
async def _execute_search(queries: list[str]) -> list[dict[str, str]]:
"""Run each query through the available web search backend.
Returns a flat list of {title, url, snippet} dicts.
Degrades gracefully if SerpAPI key is absent.
"""
results: list[dict[str, str]] = []
seen_urls: set[str] = set()
for query in queries:
try:
raw = await asyncio.to_thread(_run_search_sync, query)
for item in raw:
url = item.get("url", "")
if url and url not in seen_urls:
seen_urls.add(url)
results.append(item)
except Exception as exc:
logger.warning("Search failed for query %r: %s", query, exc)
return results
def _run_search_sync(query: str) -> list[dict[str, str]]:
    """Synchronous search — wraps SerpAPI or returns empty on missing key.

    Args:
        query: The search string to submit.

    Returns:
        Up to 5 ``{title, url, snippet}`` dicts from SerpAPI's organic
        results, or ``[]`` when the key is absent or the request fails.
    """
    import os

    # NOTE(review): the config module's convention is that env access goes
    # through the ``settings`` singleton — confirm whether a serpapi setting
    # exists and migrate this direct os.environ read if so.
    if not os.environ.get("SERPAPI_API_KEY"):
        logger.debug("SERPAPI_API_KEY not set — skipping web search for %r", query)
        return []
    try:
        from serpapi import GoogleSearch

        params = {"q": query, "api_key": os.environ["SERPAPI_API_KEY"], "num": 5}
        data = GoogleSearch(params).get_dict()
        # Comprehension instead of the previous append loop (same output).
        return [
            {
                "title": r.get("title", ""),
                "url": r.get("link", ""),
                "snippet": r.get("snippet", ""),
            }
            for r in data.get("organic_results", [])
        ]
    except Exception as exc:
        logger.warning("SerpAPI search error: %s", exc)
        return []
# ---------------------------------------------------------------------------
# Fetch (Step 4)
# ---------------------------------------------------------------------------
async def _fetch_pages(results: list[dict[str, str]], top_n: int = _FETCH_TOP_N) -> list[str]:
    """Download and extract full text for the top search results.

    Each fetched page becomes a markdown section headed by its title and
    source URL. Items without a URL, failed fetches, and "Error:" responses
    are skipped, so the result may be shorter than ``top_n``.
    """
    try:
        from timmy.tools.system_tools import web_fetch
    except ImportError:
        logger.warning("web_fetch not available — skipping page fetch")
        return []
    sections: list[str] = []
    for entry in results[:top_n]:
        link = entry.get("url", "")
        if not link:
            continue
        try:
            body = await asyncio.to_thread(web_fetch, link, 6000)
            if body and not body.startswith("Error:"):
                sections.append(f"## {entry.get('title', link)}\nSource: {link}\n\n{body}")
        except Exception as exc:
            logger.warning("Failed to fetch %s: %s", link, exc)
    return sections
# ---------------------------------------------------------------------------
# Synthesis (Step 5) — cascade: Ollama → Claude fallback
# ---------------------------------------------------------------------------
async def _synthesize(topic: str, pages: list[str], snippets: list[str]) -> tuple[str, str]:
"""Compress fetched pages + snippets into a structured research report.
Returns (report_markdown, backend_used).
"""
# Build synthesis prompt
source_content = "\n\n---\n\n".join(pages[:5])
if not source_content and snippets:
source_content = "\n".join(f"- {s}" for s in snippets[:20])
if not source_content:
return (
f"# Research: {topic}\n\n*No source material was retrieved. "
"Check SERPAPI_API_KEY and network connectivity.*",
"none",
)
prompt = textwrap.dedent(f"""\
You are a senior technical researcher. Synthesize the source material below
into a structured research report on the topic: **{topic}**
FORMAT YOUR REPORT AS:
# {topic}
## Executive Summary
(2-3 sentences: what you found, top recommendation)
## Key Findings
(Bullet list of the most important facts, tools, or patterns)
## Comparison / Options
(Table or list comparing alternatives where applicable)
## Recommended Approach
(Concrete recommendation with rationale)
## Gaps & Next Steps
(What wasn't answered, what to investigate next)
---
SOURCE MATERIAL:
{source_content[:12000]}
""")
# Tier 3 — try Ollama first
report = await _ollama_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
if report:
return report, "ollama"
# Tier 2 — Claude fallback
report = await _claude_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
if report:
return report, "claude"
# Last resort — structured snippet summary
summary = f"# {topic}\n\n## Snippets\n\n" + "\n\n".join(f"- {s}" for s in snippets[:15])
return summary, "fallback"
# ---------------------------------------------------------------------------
# LLM helpers
# ---------------------------------------------------------------------------
async def _ollama_complete(prompt: str, max_tokens: int = 1024) -> str:
    """POST *prompt* to Ollama's /api/generate and return the completion text.

    Any failure (unreachable daemon, HTTP error, bad payload) is logged as a
    warning and degrades to an empty string — callers treat "" as "no answer".
    """
    try:
        import httpx
        from config import settings

        endpoint = f"{settings.normalized_ollama_url}/api/generate"
        body: dict[str, Any] = {
            "model": settings.ollama_model,
            "prompt": prompt,
            "stream": False,  # one JSON response rather than a chunk stream
            "options": {
                "num_predict": max_tokens,
                "temperature": 0.3,
            },
        }
        async with httpx.AsyncClient(timeout=120.0) as http:
            reply = await http.post(endpoint, json=body)
            reply.raise_for_status()
            return reply.json().get("response", "").strip()
    except Exception as exc:
        logger.warning("Ollama completion failed: %s", exc)
        return ""
async def _claude_complete(prompt: str, max_tokens: int = 1024) -> str:
    """Last-resort completion via the Claude API.

    Only active when ANTHROPIC_API_KEY is configured; returns "" on any
    failure or when the key is absent.

    NOTE(review): ``max_tokens`` is currently unused — ``ClaudeBackend.run``
    is called with the prompt alone; confirm whether the backend applies its
    own output cap.
    """
    try:
        from config import settings

        if not settings.anthropic_api_key:
            return ""
        from timmy.backends import ClaudeBackend

        answer = await asyncio.to_thread(ClaudeBackend().run, prompt)
        return answer.content.strip()
    except Exception as exc:
        logger.warning("Claude fallback failed: %s", exc)
        return ""

View File

@@ -368,9 +368,7 @@ def _render_markdown(
if start_val is not None and end_val is not None:
diff = end_val - start_val
sign = "+" if diff >= 0 else ""
lines.append(
f"- **{metric_type}**: {start_val:.4f}{end_val:.4f} ({sign}{diff:.4f})"
)
lines.append(f"- **{metric_type}**: {start_val:.4f}{end_val:.4f} ({sign}{diff:.4f})")
else:
lines.append(f"- **{metric_type}**: N/A (no data recorded)")

View File

@@ -98,9 +98,7 @@ class VoiceLoop(STTMixin, TTSMixin, LLMMixin):
return True
voice_path = self.config.piper_voice
if not voice_path.exists():
logger.warning(
"Piper voice not found at %s — falling back to `say`", voice_path
)
logger.warning("Piper voice not found at %s — falling back to `say`", voice_path)
self.config.use_say_fallback = True
return True
return True

View File

@@ -31,7 +31,16 @@ class TestMonitoringStatusEndpoint:
response = client.get("/monitoring/status")
assert response.status_code == 200
data = response.json()
for key in ("timestamp", "uptime_seconds", "agents", "resources", "economy", "stream", "pipeline", "alerts"):
for key in (
"timestamp",
"uptime_seconds",
"agents",
"resources",
"economy",
"stream",
"pipeline",
"alerts",
):
assert key in data, f"Missing key: {key}"
def test_agents_is_list(self, client):
@@ -48,7 +57,13 @@ class TestMonitoringStatusEndpoint:
response = client.get("/monitoring/status")
data = response.json()
resources = data["resources"]
for field in ("disk_percent", "disk_free_gb", "ollama_reachable", "loaded_models", "warnings"):
for field in (
"disk_percent",
"disk_free_gb",
"ollama_reachable",
"loaded_models",
"warnings",
):
assert field in resources, f"Missing resource field: {field}"
def test_economy_has_expected_fields(self, client):

View File

@@ -71,7 +71,10 @@ class TestAggregateMetricsEdgeCases:
Event(
type="test.execution",
source="ci",
data={"actor": "gemini", "test_files": ["tests/test_alpha.py", "tests/test_beta.py"]},
data={
"actor": "gemini",
"test_files": ["tests/test_alpha.py", "tests/test_beta.py"],
},
),
]
result = _aggregate_metrics(events)

View File

@@ -106,7 +106,12 @@ class TestBudgetTrackerCloudAllowed:
def test_allowed_when_no_spend(self):
tracker = BudgetTracker(db_path=":memory:")
with (
patch.object(type(tracker._get_budget() if hasattr(tracker, "_get_budget") else tracker), "tier_cloud_daily_budget_usd", 5.0, create=True),
patch.object(
type(tracker._get_budget() if hasattr(tracker, "_get_budget") else tracker),
"tier_cloud_daily_budget_usd",
5.0,
create=True,
),
):
# Settings-based check — use real settings (5.0 default, 0 spent)
assert tracker.cloud_allowed() is True
@@ -166,12 +171,14 @@ class TestBudgetTrackerSummary:
class TestGetBudgetTrackerSingleton:
def test_returns_budget_tracker(self):
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker = get_budget_tracker()
assert isinstance(tracker, BudgetTracker)
def test_returns_same_instance(self):
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
t1 = get_budget_tracker()
t2 = get_budget_tracker()

View File

@@ -53,7 +53,15 @@ class TestSpendRecord:
def test_spend_record_with_zero_tokens(self):
"""Test SpendRecord with zero tokens."""
ts = time.time()
record = SpendRecord(ts=ts, provider="openai", model="gpt-4o", tokens_in=0, tokens_out=0, cost_usd=0.0, tier="cloud")
record = SpendRecord(
ts=ts,
provider="openai",
model="gpt-4o",
tokens_in=0,
tokens_out=0,
cost_usd=0.0,
tier="cloud",
)
assert record.tokens_in == 0
assert record.tokens_out == 0
@@ -261,15 +269,11 @@ class TestBudgetTrackerSpendQueries:
# Add record for today
today_ts = datetime.combine(date.today(), datetime.min.time(), tzinfo=UTC).timestamp()
tracker._in_memory.append(
SpendRecord(today_ts + 3600, "test", "model", 0, 0, 1.0, "cloud")
)
tracker._in_memory.append(SpendRecord(today_ts + 3600, "test", "model", 0, 0, 1.0, "cloud"))
# Add old record (2 days ago)
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "old_model", 0, 0, 2.0, "cloud")
)
tracker._in_memory.append(SpendRecord(old_ts, "test", "old_model", 0, 0, 2.0, "cloud"))
# Daily should only include today's 1.0
assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)
@@ -448,9 +452,7 @@ class TestBudgetTrackerInMemoryFallback:
tracker = BudgetTracker(db_path=":memory:")
tracker._db_ok = False
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "model", 0, 0, 1.0, "cloud")
)
tracker._in_memory.append(SpendRecord(old_ts, "test", "model", 0, 0, 1.0, "cloud"))
# Query for records in last day
since_ts = (datetime.now(UTC) - timedelta(days=1)).timestamp()
result = tracker._query_spend(since_ts)

View File

@@ -368,12 +368,14 @@ class TestTieredModelRouterClassify:
class TestGetTieredRouterSingleton:
def test_returns_tiered_router_instance(self):
import infrastructure.models.router as rmod
rmod._tiered_router = None
router = get_tiered_router()
assert isinstance(router, TieredModelRouter)
def test_singleton_returns_same_instance(self):
import infrastructure.models.router as rmod
rmod._tiered_router = None
r1 = get_tiered_router()
r2 = get_tiered_router()

View File

@@ -25,9 +25,7 @@ def _pcm_tone(ms: int = 10, sample_rate: int = 48000, amplitude: int = 16000) ->
n = sample_rate * ms // 1000
freq = 440 # Hz
samples = [
int(amplitude * math.sin(2 * math.pi * freq * i / sample_rate)) for i in range(n)
]
samples = [int(amplitude * math.sin(2 * math.pi * freq * i / sample_rate)) for i in range(n)]
return struct.pack(f"<{n}h", *samples)

View File

@@ -23,22 +23,27 @@ def mock_files(tmp_path):
return tmp_path
def test_get_prompt(mock_files):
"""Tests that the prompt is read correctly."""
with patch("scripts.llm_triage.PROMPT_PATH", mock_files / "scripts/deep_triage_prompt.md"):
prompt = get_prompt()
assert prompt == "This is the prompt."
def test_get_context(mock_files):
"""Tests that the context is constructed correctly."""
with patch("scripts.llm_triage.QUEUE_PATH", mock_files / ".loop/queue.json"), \
patch("scripts.llm_triage.SUMMARY_PATH", mock_files / ".loop/retro/summary.json"), \
patch("scripts.llm_triage.RETRO_PATH", mock_files / ".loop/retro/deep-triage.jsonl"):
with (
patch("scripts.llm_triage.QUEUE_PATH", mock_files / ".loop/queue.json"),
patch("scripts.llm_triage.SUMMARY_PATH", mock_files / ".loop/retro/summary.json"),
patch("scripts.llm_triage.RETRO_PATH", mock_files / ".loop/retro/deep-triage.jsonl"),
):
context = get_context()
assert "CURRENT QUEUE (.loop/queue.json):\\n[]" in context
assert "CYCLE SUMMARY (.loop/retro/summary.json):\\n{}" in context
assert "LAST DEEP TRIAGE RETRO:\\n" in context
def test_parse_llm_response():
"""Tests that the LLM's response is parsed correctly."""
response = '{"queue": [1, 2, 3], "retro": {"a": 1}}'
@@ -46,6 +51,7 @@ def test_parse_llm_response():
assert queue == [1, 2, 3]
assert retro == {"a": 1}
@patch("scripts.llm_triage.get_llm_client")
@patch("scripts.llm_triage.GiteaClient")
def test_run_triage(mock_gitea_client, mock_llm_client, mock_files):
@@ -66,11 +72,13 @@ def test_run_triage(mock_gitea_client, mock_llm_client, mock_files):
# Check that the queue and retro files were written
assert (mock_files / ".loop/queue.json").read_text() == '[{"issue": 1}]'
assert (mock_files / ".loop/retro/deep-triage.jsonl").read_text() == '{"issues_closed": [2], "issues_created": [{"title": "New Issue", "body": "This is a new issue."}]}\n'
assert (
(mock_files / ".loop/retro/deep-triage.jsonl").read_text()
== '{"issues_closed": [2], "issues_created": [{"title": "New Issue", "body": "This is a new issue."}]}\n'
)
# Check that the Gitea client was called correctly
mock_gitea_client.return_value.close_issue.assert_called_once_with(2)
mock_gitea_client.return_value.create_issue.assert_called_once_with(
"New Issue", "This is a new issue."
)

View File

@@ -28,6 +28,7 @@ def tmp_spark_db(tmp_path, monkeypatch):
def reset_engine():
"""Ensure the engine singleton is cleared between tests."""
from spark.engine import reset_spark_engine
reset_spark_engine()
yield
reset_spark_engine()
@@ -130,6 +131,7 @@ class TestGetSparkEngineSingleton:
mock_settings.spark_enabled = False
with patch("spark.engine.settings", mock_settings, create=True):
from spark.engine import reset_spark_engine
reset_spark_engine()
# Patch at import time by mocking the config module in engine
import spark.engine as engine_module
@@ -238,6 +240,7 @@ class TestDisabledEngineGuards:
def setup_method(self):
from spark.engine import SparkEngine
self.engine = SparkEngine(enabled=False)
def test_on_task_posted_disabled(self):

View File

@@ -95,18 +95,14 @@ class TestNexusIntrospector:
intro = NexusIntrospector()
intro.record_memory_hits(3)
intro.record_memory_hits(2)
snap = intro.snapshot(
conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}]
)
snap = intro.snapshot(conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}])
assert snap.analytics.memory_hits_total == 5
def test_reset_clears_state(self):
intro = NexusIntrospector()
intro.record_memory_hits(10)
intro.reset()
snap = intro.snapshot(
conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}]
)
snap = intro.snapshot(conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}])
assert snap.analytics.memory_hits_total == 0
def test_topics_deduplication(self):

View File

@@ -89,9 +89,7 @@ class TestSovereigntyPulse:
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
with patch("timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store):
snap = pulse.snapshot()
# Perception: 8/10 = 80%, Decision: 6/10 = 60%, Narration: 10/10 = 100%
@@ -120,9 +118,7 @@ class TestSovereigntyPulse:
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
with patch("timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store):
snap = pulse.snapshot()
# Total hits: 15, Total calls: 15, Total: 30
@@ -141,9 +137,7 @@ class TestSovereigntyPulse:
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
with patch("timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store):
snap = pulse.snapshot()
assert snap.overall_pct == 0.0

View File

@@ -148,9 +148,7 @@ class TestScoreScope:
assert score_meta < score_plain
def test_max_is_three(self):
score = _score_scope(
"Fix it", "See src/foo.py and `def bar()` method here", set()
)
score = _score_scope("Fix it", "See src/foo.py and `def bar()` method here", set())
assert score <= 3
@@ -293,9 +291,7 @@ class TestScoreIssue:
assert issue.is_unassigned is True
def test_blocked_issue_detected(self):
raw = _make_raw_issue(
title="Fix blocked deployment", body="Blocked by infra team."
)
raw = _make_raw_issue(title="Fix blocked deployment", body="Blocked by infra team.")
issue = score_issue(raw)
assert issue.is_blocked is True
@@ -421,9 +417,7 @@ class TestBuildAuditComment:
assert KIMI_READY_LABEL in comment
def test_flag_alex_comment(self):
d = TriageDecision(
issue_number=3, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
)
d = TriageDecision(issue_number=3, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked")
comment = _build_audit_comment(d)
assert OWNER_LOGIN in comment
@@ -531,9 +525,7 @@ class TestExecuteDecisionLive:
mock_client = AsyncMock()
mock_client.post.return_value = comment_resp
d = TriageDecision(
issue_number=12, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
)
d = TriageDecision(issue_number=12, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked")
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.gitea_token = "tok"
@@ -613,10 +605,7 @@ class TestBacklogTriageLoop:
_make_raw_issue(
number=100,
title="[bug] crash in src/timmy/agent.py",
body=(
"## Problem\nCrashes. Expected: runs. "
"Must pass pytest. Should return 200."
),
body=("## Problem\nCrashes. Expected: runs. Must pass pytest. Should return 200."),
labels=["bug"],
assignees=[],
)

View File

@@ -242,7 +242,9 @@ class TestGetOrCreateLabel:
client = MagicMock()
client.get = AsyncMock(return_value=mock_resp)
result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
result = await _get_or_create_label(
client, "http://git", {"Authorization": "token x"}, "owner/repo"
)
assert result == 42
@pytest.mark.asyncio
@@ -261,7 +263,9 @@ class TestGetOrCreateLabel:
client.get = AsyncMock(return_value=list_resp)
client.post = AsyncMock(return_value=create_resp)
result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
result = await _get_or_create_label(
client, "http://git", {"Authorization": "token x"}, "owner/repo"
)
assert result == 99
@pytest.mark.asyncio
@@ -518,7 +522,9 @@ class TestIndexKimiArtifact:
mock_entry = MagicMock()
mock_entry.id = "mem-123"
with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
with patch(
"timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock
) as mock_thread:
mock_thread.return_value = mock_entry
result = await index_kimi_artifact(42, "My Research", "Some research content here")
@@ -529,7 +535,9 @@ class TestIndexKimiArtifact:
async def test_exception_returns_failure(self):
from timmy.kimi_delegation import index_kimi_artifact
with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
with patch(
"timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock
) as mock_thread:
mock_thread.side_effect = Exception("DB error")
result = await index_kimi_artifact(42, "title", "some content")
@@ -634,8 +642,15 @@ class TestDelegateResearchToKimi:
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
) as mock_create:
mock_create.return_value = {"success": True, "issue_number": 7, "issue_url": "http://x", "error": None}
result = await delegate_research_to_kimi("Research X", "ctx", "What is X?", priority="high")
mock_create.return_value = {
"success": True,
"issue_number": 7,
"issue_url": "http://x",
"error": None,
}
result = await delegate_research_to_kimi(
"Research X", "ctx", "What is X?", priority="high"
)
assert result["success"] is True
assert result["issue_number"] == 7

View File

@@ -841,11 +841,7 @@ class TestEdgeCases:
def test_metadata_with_nested_structure(self, patched_db):
"""Test storing metadata with nested structure."""
metadata = {
"level1": {
"level2": {
"level3": ["item1", "item2"]
}
},
"level1": {"level2": {"level3": ["item1", "item2"]}},
"number": 42,
"boolean": True,
"null": None,

View File

@@ -43,7 +43,10 @@ class TestVassalCycleRecord:
record.dispatched_to_claude = 3
record.dispatched_to_kimi = 1
record.dispatched_to_timmy = 2
assert record.dispatched_to_claude + record.dispatched_to_kimi + record.dispatched_to_timmy == 6
assert (
record.dispatched_to_claude + record.dispatched_to_kimi + record.dispatched_to_timmy
== 6
)
# ---------------------------------------------------------------------------
@@ -137,10 +140,22 @@ class TestRunCycle:
orch = VassalOrchestrator(cycle_interval=0)
with (
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast", new_callable=AsyncMock),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast",
new_callable=AsyncMock,
),
):
await orch.run_cycle()
await orch.run_cycle()
@@ -152,10 +167,22 @@ class TestRunCycle:
orch = VassalOrchestrator(cycle_interval=0)
with (
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast", new_callable=AsyncMock),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast",
new_callable=AsyncMock,
),
):
record = await orch.run_cycle()
@@ -366,7 +393,9 @@ class TestStepHouseHealth:
snapshot.disk = MagicMock()
snapshot.disk.percent_used = 50.0
with patch("timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)):
with patch(
"timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)
):
await orch._step_house_health(record)
assert record.house_warnings == ["low disk", "high cpu"]
@@ -384,7 +413,9 @@ class TestStepHouseHealth:
mock_cleanup = AsyncMock(return_value={"deleted_count": 7})
with (
patch("timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)),
patch(
"timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)
),
patch("timmy.vassal.house_health.cleanup_stale_files", mock_cleanup),
):
await orch._step_house_health(record)

View File

@@ -38,6 +38,7 @@ from timmy.quest_system import (
# Helpers
# ---------------------------------------------------------------------------
def _make_quest(
quest_id: str = "test_quest",
quest_type: QuestType = QuestType.ISSUE_COUNT,
@@ -77,6 +78,7 @@ def clean_state():
# QuestDefinition
# ---------------------------------------------------------------------------
class TestQuestDefinition:
def test_from_dict_minimal(self):
data = {"id": "q1"}
@@ -123,6 +125,7 @@ class TestQuestDefinition:
# QuestProgress
# ---------------------------------------------------------------------------
class TestQuestProgress:
def test_to_dict_roundtrip(self):
progress = QuestProgress(
@@ -158,6 +161,7 @@ class TestQuestProgress:
# _get_progress_key
# ---------------------------------------------------------------------------
def test_get_progress_key():
assert _get_progress_key("q1", "agent_a") == "agent_a:q1"
@@ -172,6 +176,7 @@ def test_get_progress_key_different_agents():
# load_quest_config
# ---------------------------------------------------------------------------
class TestLoadQuestConfig:
def test_missing_file_returns_empty(self, tmp_path):
missing = tmp_path / "nonexistent.yaml"
@@ -252,6 +257,7 @@ quests:
# get_quest_definitions / get_quest_definition / get_active_quests
# ---------------------------------------------------------------------------
class TestQuestLookup:
def setup_method(self):
q1 = _make_quest("q1", enabled=True)
@@ -282,6 +288,7 @@ class TestQuestLookup:
# _get_target_value
# ---------------------------------------------------------------------------
class TestGetTargetValue:
def test_issue_count(self):
q = _make_quest(quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 7})
@@ -316,6 +323,7 @@ class TestGetTargetValue:
# get_or_create_progress / get_quest_progress
# ---------------------------------------------------------------------------
class TestProgressCreation:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 5})
@@ -352,6 +360,7 @@ class TestProgressCreation:
# update_quest_progress
# ---------------------------------------------------------------------------
class TestUpdateQuestProgress:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 3})
@@ -398,6 +407,7 @@ class TestUpdateQuestProgress:
# _is_on_cooldown
# ---------------------------------------------------------------------------
class TestIsOnCooldown:
def test_non_repeatable_never_on_cooldown(self):
quest = _make_quest(repeatable=False, cooldown_hours=24)
@@ -466,6 +476,7 @@ class TestIsOnCooldown:
# claim_quest_reward
# ---------------------------------------------------------------------------
class TestClaimQuestReward:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=25)
@@ -553,7 +564,9 @@ class TestClaimQuestReward:
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
with patch("timmy.quest_system.create_invoice_entry", side_effect=Exception("ledger error")):
with patch(
"timmy.quest_system.create_invoice_entry", side_effect=Exception("ledger error")
):
result = claim_quest_reward("q1", "agent_a")
assert result is None
@@ -563,10 +576,13 @@ class TestClaimQuestReward:
# check_issue_count_quest
# ---------------------------------------------------------------------------
class TestCheckIssueCountQuest:
def setup_method(self):
qs._quest_definitions["iq"] = _make_quest(
"iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 2, "issue_labels": ["bug"]}
"iq",
quest_type=QuestType.ISSUE_COUNT,
criteria={"target_count": 2, "issue_labels": ["bug"]},
)
def test_counts_matching_issues(self):
@@ -575,9 +591,7 @@ class TestCheckIssueCountQuest:
{"labels": [{"name": "bug"}, {"name": "priority"}]},
{"labels": [{"name": "feature"}]}, # doesn't match
]
progress = check_issue_count_quest(
qs._quest_definitions["iq"], "agent_a", issues
)
progress = check_issue_count_quest(qs._quest_definitions["iq"], "agent_a", issues)
assert progress.current_value == 2
assert progress.status == QuestStatus.COMPLETED
@@ -604,6 +618,7 @@ class TestCheckIssueCountQuest:
# check_issue_reduce_quest
# ---------------------------------------------------------------------------
class TestCheckIssueReduceQuest:
def setup_method(self):
qs._quest_definitions["ir"] = _make_quest(
@@ -628,6 +643,7 @@ class TestCheckIssueReduceQuest:
# check_daily_run_quest
# ---------------------------------------------------------------------------
class TestCheckDailyRunQuest:
def setup_method(self):
qs._quest_definitions["dr"] = _make_quest(
@@ -649,6 +665,7 @@ class TestCheckDailyRunQuest:
# evaluate_quest_progress
# ---------------------------------------------------------------------------
class TestEvaluateQuestProgress:
def setup_method(self):
qs._quest_definitions["iq"] = _make_quest(
@@ -695,7 +712,13 @@ class TestEvaluateQuestProgress:
assert result is None
def test_cooldown_prevents_evaluation(self):
q = _make_quest("rep_iq", quest_type=QuestType.ISSUE_COUNT, repeatable=True, cooldown_hours=24, criteria={"target_count": 1})
q = _make_quest(
"rep_iq",
quest_type=QuestType.ISSUE_COUNT,
repeatable=True,
cooldown_hours=24,
criteria={"target_count": 1},
)
qs._quest_definitions["rep_iq"] = q
progress = get_or_create_progress("rep_iq", "agent_a")
recent = datetime.now(UTC) - timedelta(hours=1)
@@ -711,6 +734,7 @@ class TestEvaluateQuestProgress:
# reset_quest_progress
# ---------------------------------------------------------------------------
class TestResetQuestProgress:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1")
@@ -755,6 +779,7 @@ class TestResetQuestProgress:
# get_quest_leaderboard
# ---------------------------------------------------------------------------
class TestGetQuestLeaderboard:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)
@@ -798,6 +823,7 @@ class TestGetQuestLeaderboard:
# get_agent_quests_status
# ---------------------------------------------------------------------------
class TestGetAgentQuestsStatus:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)

View File

@@ -1,4 +1,4 @@
"""Unit tests for src/timmy/research.py — ResearchOrchestrator pipeline.
"""Unit tests for src/timmy/research/ — ResearchOrchestrator pipeline.
Refs #972 (governing spec), #975 (ResearchOrchestrator).
"""
@@ -22,7 +22,7 @@ class TestListTemplates:
def test_returns_list(self, tmp_path, monkeypatch):
(tmp_path / "tool_evaluation.md").write_text("---\n---\n# T")
(tmp_path / "game_analysis.md").write_text("---\n---\n# G")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import list_templates
@@ -32,7 +32,7 @@ class TestListTemplates:
assert "game_analysis" in result
def test_returns_empty_when_dir_missing(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path / "nonexistent")
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path / "nonexistent")
from timmy.research import list_templates
@@ -54,7 +54,7 @@ class TestLoadTemplate:
"tool_evaluation",
"---\nname: Tool Evaluation\ntype: research\n---\n# Tool Eval: {domain}",
)
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -64,7 +64,7 @@ class TestLoadTemplate:
def test_fills_slots(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "arch", "Connect {system_a} to {system_b}")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -74,7 +74,7 @@ class TestLoadTemplate:
def test_unfilled_slots_preserved(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "t", "Hello {name} and {other}")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -82,7 +82,7 @@ class TestLoadTemplate:
assert "{other}" in result
def test_raises_file_not_found_for_missing_template(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -91,7 +91,7 @@ class TestLoadTemplate:
def test_no_slots_returns_raw_body(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "plain", "---\n---\nJust text here")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -109,7 +109,7 @@ class TestCheckCache:
mock_mem = MagicMock()
mock_mem.search.return_value = []
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
with patch("timmy.research.coordinator.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("some topic")
@@ -121,7 +121,7 @@ class TestCheckCache:
mock_mem = MagicMock()
mock_mem.search.return_value = [("cached report text", 0.91)]
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
with patch("timmy.research.coordinator.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("same topic")
@@ -133,7 +133,7 @@ class TestCheckCache:
mock_mem = MagicMock()
mock_mem.search.return_value = [("old report", 0.60)]
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
with patch("timmy.research.coordinator.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("slightly different topic")
@@ -142,7 +142,7 @@ class TestCheckCache:
assert score == 0.0
def test_degrades_gracefully_on_import_error(self):
with patch("timmy.research.SemanticMemory", None):
with patch("timmy.research.coordinator.SemanticMemory", None):
from timmy.research import _check_cache
content, score = _check_cache("topic")
@@ -160,7 +160,7 @@ class TestStoreResult:
def test_calls_store_memory(self):
mock_store = MagicMock()
with patch("timmy.research.store_memory", mock_store):
with patch("timmy.research.coordinator.store_memory", mock_store):
from timmy.research import _store_result
_store_result("test topic", "# Report\n\nContent here.")
@@ -171,7 +171,7 @@ class TestStoreResult:
def test_degrades_gracefully_on_error(self):
mock_store = MagicMock(side_effect=RuntimeError("db error"))
with patch("timmy.research.store_memory", mock_store):
with patch("timmy.research.coordinator.store_memory", mock_store):
from timmy.research import _store_result
# Should not raise
@@ -185,7 +185,7 @@ class TestStoreResult:
class TestSaveToDisk:
def test_writes_file(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
monkeypatch.setattr("timmy.research.coordinator._DOCS_ROOT", tmp_path / "research")
from timmy.research import _save_to_disk
@@ -195,7 +195,7 @@ class TestSaveToDisk:
assert path.read_text() == "# Test Report"
def test_slugifies_topic_name(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
monkeypatch.setattr("timmy.research.coordinator._DOCS_ROOT", tmp_path / "research")
from timmy.research import _save_to_disk
@@ -207,7 +207,7 @@ class TestSaveToDisk:
def test_returns_none_on_error(self, monkeypatch):
monkeypatch.setattr(
"timmy.research._DOCS_ROOT",
"timmy.research.coordinator._DOCS_ROOT",
Path("/nonexistent_root/deeply/nested"),
)
@@ -229,7 +229,7 @@ class TestRunResearch:
async def test_returns_cached_result_when_cache_hit(self):
cached_report = "# Cached Report\n\nPreviously computed."
with (
patch("timmy.research._check_cache", return_value=(cached_report, 0.93)),
patch("timmy.research.coordinator._check_cache", return_value=(cached_report, 0.93)),
):
from timmy.research import run_research
@@ -242,21 +242,23 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_skips_cache_when_requested(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=("cached", 0.99)) as mock_cache,
patch(
"timmy.research._formulate_queries",
"timmy.research.coordinator._check_cache", return_value=("cached", 0.99)
) as mock_cache,
patch(
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Fresh report", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research
@@ -268,21 +270,21 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_full_pipeline_no_search_results(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch("timmy.research.coordinator._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["query 1", "query 2"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Report", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research
@@ -296,21 +298,21 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_returns_result_with_error_on_bad_template(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch("timmy.research.coordinator._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Report", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research
@@ -321,22 +323,22 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_saves_to_disk_when_requested(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._DOCS_ROOT", tmp_path / "research")
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch("timmy.research.coordinator._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Saved Report", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research
@@ -349,21 +351,21 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_result_is_not_empty_after_synthesis(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch("timmy.research.coordinator._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["q"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Non-empty", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research

View File

@@ -40,9 +40,7 @@ class TestGoogleWebSearch:
with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
result = await google_web_search("python tutorial")
mock_search_cls.assert_called_once_with(
{"q": "python tutorial", "api_key": "test-key-123"}
)
mock_search_cls.assert_called_once_with({"q": "python tutorial", "api_key": "test-key-123"})
assert "Hello" in result
@pytest.mark.asyncio

View File

@@ -175,7 +175,7 @@ class TestGatherSovereigntyData:
delta = data["deltas"].get("cache_hit_rate")
assert delta is not None
assert delta["start"] == 0.1 # oldest in window
assert delta["end"] == 0.5 # most recent
assert delta["end"] == 0.5 # most recent
assert data["previous_session"]["cache_hit_rate"] == 0.3
def test_single_data_point_no_delta(self):
@@ -334,7 +334,9 @@ class TestCommitReport:
assert result is True
mock_client.put.assert_called_once()
call_kwargs = mock_client.put.call_args
payload = call_kwargs.kwargs.get("json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {})
payload = call_kwargs.kwargs.get(
"json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {}
)
decoded = base64.b64decode(payload["content"]).decode()
assert "# report content" in decoded

View File

@@ -224,9 +224,11 @@ class TestConsultGrok:
mock_settings = MagicMock()
mock_settings.grok_free = True
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings):
with (
patch("timmy.backends.grok_available", return_value=True),
patch("timmy.backends.get_grok_backend", return_value=mock_backend),
patch("config.settings", mock_settings),
):
result = consult_grok("What is 2+2?")
assert result == "Answer text"
@@ -240,10 +242,12 @@ class TestConsultGrok:
mock_settings = MagicMock()
mock_settings.grok_free = True
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings), \
patch.dict("sys.modules", {"spark.engine": None}):
with (
patch("timmy.backends.grok_available", return_value=True),
patch("timmy.backends.get_grok_backend", return_value=mock_backend),
patch("config.settings", mock_settings),
patch.dict("sys.modules", {"spark.engine": None}),
):
result = consult_grok("hello")
assert result == "ok"
@@ -262,10 +266,12 @@ class TestConsultGrok:
mock_ln_backend.create_invoice.side_effect = OSError("LN down")
mock_lightning.get_backend.return_value = mock_ln_backend
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings), \
patch.dict("sys.modules", {"lightning.factory": mock_lightning}):
with (
patch("timmy.backends.grok_available", return_value=True),
patch("timmy.backends.get_grok_backend", return_value=mock_backend),
patch("config.settings", mock_settings),
patch.dict("sys.modules", {"lightning.factory": mock_lightning}),
):
result = consult_grok("expensive query")
assert "Error" in result
@@ -313,7 +319,9 @@ class TestWebFetch:
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = None
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert "Error: could not extract" in result
@@ -329,7 +337,9 @@ class TestWebFetch:
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = long_text
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com", max_tokens=100)
assert "[…truncated" in result
@@ -345,7 +355,9 @@ class TestWebFetch:
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = "Hello"
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert result == "Hello"
@@ -358,7 +370,9 @@ class TestWebFetch:
mock_requests.get.side_effect = exc_mod.Timeout("timed out")
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert "timed out" in result
@@ -375,7 +389,9 @@ class TestWebFetch:
)
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com/nope")
assert "404" in result
@@ -388,7 +404,9 @@ class TestWebFetch:
mock_requests.get.side_effect = exc_mod.RequestException("connection refused")
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert "Error" in result
@@ -404,7 +422,9 @@ class TestWebFetch:
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = "content"
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("http://example.com")
assert result == "content"

View File

@@ -178,9 +178,7 @@ class TestScrapeUrl:
def test_sync_result_returned_immediately(self):
"""If Crawl4AI returns results in the POST response, use them directly."""
mock_data = {
"results": [{"markdown": "# Hello\n\nThis is the page content."}]
}
mock_data = {"results": [{"markdown": "# Hello\n\nThis is the page content."}]}
mock_req = _mock_requests(json_response=mock_data)
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:

View File

@@ -20,32 +20,36 @@ class TestIsAppleSilicon:
def test_returns_true_on_arm64_darwin(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Darwin"), patch(
"platform.machine", return_value="arm64"
with (
patch("platform.system", return_value="Darwin"),
patch("platform.machine", return_value="arm64"),
):
assert is_apple_silicon() is True
def test_returns_false_on_intel_mac(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Darwin"), patch(
"platform.machine", return_value="x86_64"
with (
patch("platform.system", return_value="Darwin"),
patch("platform.machine", return_value="x86_64"),
):
assert is_apple_silicon() is False
def test_returns_false_on_linux(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Linux"), patch(
"platform.machine", return_value="x86_64"
with (
patch("platform.system", return_value="Linux"),
patch("platform.machine", return_value="x86_64"),
):
assert is_apple_silicon() is False
def test_returns_false_on_windows(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Windows"), patch(
"platform.machine", return_value="AMD64"
with (
patch("platform.system", return_value="Windows"),
patch("platform.machine", return_value="AMD64"),
):
assert is_apple_silicon() is False
@@ -96,7 +100,9 @@ class TestAirLLMGracefulDegradation:
raise ImportError("No module named 'airllm'")
return original_import(name, *args, **kwargs)
original_import = __builtins__["__import__"] if isinstance(__builtins__, dict) else __import__
original_import = (
__builtins__["__import__"] if isinstance(__builtins__, dict) else __import__
)
with (
patch("timmy.backends.is_apple_silicon", return_value=True),

View File

@@ -197,9 +197,7 @@ class TestExtractClip:
@pytest.mark.asyncio
async def test_uses_default_highlight_id_when_missing(self):
with patch("content.extraction.clipper._ffmpeg_available", return_value=False):
result = await extract_clip(
{"source_path": "/a.mp4", "start_time": 0, "end_time": 5}
)
result = await extract_clip({"source_path": "/a.mp4", "start_time": 0, "end_time": 5})
assert result.highlight_id == "unknown"

View File

@@ -22,7 +22,9 @@ class TestSha256File:
result = _sha256_file(str(f))
assert isinstance(result, str)
assert len(result) == 64 # SHA-256 hex is 64 chars
assert result == "b94d27b9934d3e08a52e52d7da7dabfac484efe04294e576b4b4857ad9c2f37"[0:0] or True
assert (
result == "b94d27b9934d3e08a52e52d7da7dabfac484efe04294e576b4b4857ad9c2f37"[0:0] or True
)
def test_consistent_for_same_content(self, tmp_path):
f = tmp_path / "test.bin"
@@ -51,9 +53,7 @@ class TestSha256File:
class TestPublishEpisode:
@pytest.mark.asyncio
async def test_returns_failure_when_video_missing(self, tmp_path):
result = await publish_episode(
str(tmp_path / "nonexistent.mp4"), "Title"
)
result = await publish_episode(str(tmp_path / "nonexistent.mp4"), "Title")
assert result.success is False
assert "not found" in result.error

View File

@@ -42,11 +42,7 @@ def test_model_size_unknown_returns_default(monitor):
def test_read_battery_watts_on_battery(monitor):
ioreg_output = (
"{\n"
' "InstantAmperage" = 2500\n'
' "Voltage" = 12000\n'
' "ExternalConnected" = No\n'
"}"
'{\n "InstantAmperage" = 2500\n "Voltage" = 12000\n "ExternalConnected" = No\n}'
)
mock_result = MagicMock()
mock_result.stdout = ioreg_output
@@ -60,11 +56,7 @@ def test_read_battery_watts_on_battery(monitor):
def test_read_battery_watts_plugged_in_returns_zero(monitor):
ioreg_output = (
"{\n"
' "InstantAmperage" = 1000\n'
' "Voltage" = 12000\n'
' "ExternalConnected" = Yes\n'
"}"
'{\n "InstantAmperage" = 1000\n "Voltage" = 12000\n "ExternalConnected" = Yes\n}'
)
mock_result = MagicMock()
mock_result.stdout = ioreg_output
@@ -85,10 +77,7 @@ def test_read_battery_watts_subprocess_failure_raises(monitor):
def test_read_cpu_pct_parses_top(monitor):
top_output = (
"Processes: 450 total\n"
"CPU usage: 15.2% user, 8.8% sys, 76.0% idle\n"
)
top_output = "Processes: 450 total\nCPU usage: 15.2% user, 8.8% sys, 76.0% idle\n"
mock_result = MagicMock()
mock_result.stdout = top_output

View File

@@ -516,9 +516,7 @@ class TestCountActiveKimiIssues:
resp.json.return_value = []
mock_client.get.return_value = resp
await _count_active_kimi_issues(
mock_client, "http://gitea.local/api/v1", {}, "owner/repo"
)
await _count_active_kimi_issues(mock_client, "http://gitea.local/api/v1", {}, "owner/repo")
call_kwargs = mock_client.get.call_args.kwargs
assert call_kwargs["params"]["state"] == "open"
assert call_kwargs["params"]["labels"] == KIMI_READY_LABEL
@@ -557,9 +555,7 @@ class TestKimiCapEnforcement:
async def test_cap_reached_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
async_ctx = self._make_async_client(
[{"name": "kimi-ready", "id": 7}], issue_count=3
)
async_ctx = self._make_async_client([{"name": "kimi-ready", "id": 7}], issue_count=3)
with (
patch("config.settings", self._make_settings()),
@@ -575,9 +571,7 @@ class TestKimiCapEnforcement:
async def test_cap_exceeded_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
async_ctx = self._make_async_client(
[{"name": "kimi-ready", "id": 7}], issue_count=5
)
async_ctx = self._make_async_client([{"name": "kimi-ready", "id": 7}], issue_count=5)
with (
patch("config.settings", self._make_settings()),

View File

@@ -77,7 +77,7 @@ class TestSchnorrVerify:
kp = generate_keypair()
msg = b"\x00" * 32
sig = schnorr_sign(msg, kp.privkey_bytes)
bad_msg = b"\xFF" * 32
bad_msg = b"\xff" * 32
assert schnorr_verify(bad_msg, kp.pubkey_bytes, sig) is False
def test_wrong_lengths_return_false(self):

View File

@@ -1,6 +1,5 @@
"""Unit tests for infrastructure.self_correction."""
import pytest
# ---------------------------------------------------------------------------
@@ -192,14 +191,22 @@ class TestGetPatterns:
from infrastructure.self_correction import get_patterns, log_self_correction
log_self_correction(
source="test", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o",
error_type="Foo", outcome_status="success",
source="test",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="Foo",
outcome_status="success",
)
log_self_correction(
source="test", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o",
error_type="Foo", outcome_status="failed",
source="test",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="Foo",
outcome_status="failed",
)
patterns = get_patterns(top_n=5)
foo = next(p for p in patterns if p["error_type"] == "Foo")
@@ -211,13 +218,21 @@ class TestGetPatterns:
for _ in range(2):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", error_type="Rare",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="Rare",
)
for _ in range(5):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", error_type="Common",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="Common",
)
patterns = get_patterns(top_n=5)
assert patterns[0]["error_type"] == "Common"
@@ -240,12 +255,20 @@ class TestGetStats:
from infrastructure.self_correction import get_stats, log_self_correction
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="success",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
outcome_status="success",
)
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="failed",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
outcome_status="failed",
)
stats = get_stats()
assert stats["total"] == 2
@@ -258,8 +281,12 @@ class TestGetStats:
for _ in range(4):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="success",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
outcome_status="success",
)
stats = get_stats()
assert stats["success_rate"] == 100