1
0

Compare commits

...

6 Commits

Author SHA1 Message Date
Alexander Whitestone
03f9a42fbc feat: add ResearchOrchestrator pipeline (src/timmy/research.py)
Implements autonomous research pipeline that chains:
- Check local knowledge (semantic memory cache, confidence > 0.85)
- Generate queries via LLM cascade
- Web search (concurrent, deduplicated)
- Fetch top pages
- Synthesize structured report via LLM
- Crystallize results in semantic memory
- Write artifact (create Gitea issues from action items)

Includes full unit test suite (25 tests) covering all pipeline steps,
cache hits, graceful degradation, and Gitea integration.

Fixes #975

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 18:44:20 -04:00
Alexander Whitestone
3ab180b8a7 [claude] Add Gitea backup script (#990) (#996)
Co-authored-by: Alexander Whitestone <alexpaynex@gmail.com>
Co-committed-by: Alexander Whitestone <alexpaynex@gmail.com>
2026-03-22 22:36:51 +00:00
e24f49e58d [kimi] Add JSON validation guard to queue.json writes (#952) (#995) 2026-03-22 22:33:40 +00:00
1fa5cff5dc [kimi] Fix GITEA_API configuration in triage scripts (#951) (#994) 2026-03-22 22:28:23 +00:00
e255e7eb2a [kimi] Add docstrings to system.py route handlers (#940) (#992) 2026-03-22 22:12:36 +00:00
c3b6eb71c0 [kimi] Add docstrings to src/dashboard/routes/tasks.py (#939) (#991) 2026-03-22 22:08:28 +00:00
18 changed files with 1524 additions and 46 deletions

View File

@@ -17,8 +17,23 @@ REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
GITEA_API = "http://localhost:3000/api/v1"
REPO_SLUG = "rockachopa/Timmy-time-dashboard"
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
TAG_RE = re.compile(r"\[([^\]]+)\]")

83
scripts/gitea_backup.sh Executable file
View File

@@ -0,0 +1,83 @@
#!/bin/bash
# Gitea backup script — run on the VPS before any hardening changes.
# Usage: sudo bash scripts/gitea_backup.sh [off-site-dest]
#
# off-site-dest: optional rsync/scp destination for off-site copy
# e.g. user@backup-host:/backups/gitea/
#
# Refs: #971, #990
set -euo pipefail
BACKUP_DIR="/opt/gitea/backups"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
GITEA_CONF="/etc/gitea/app.ini"
GITEA_WORK_DIR="/var/lib/gitea"
OFFSITE_DEST="${1:-}"
echo "=== Gitea Backup — $TIMESTAMP ==="
# Ensure backup directory exists
mkdir -p "$BACKUP_DIR"
cd "$BACKUP_DIR"
# Run the dump
echo "[1/4] Running gitea dump..."
gitea dump -c "$GITEA_CONF"
# Find the newest zip (gitea dump names it gitea-dump-*.zip)
BACKUP_FILE=$(ls -t "$BACKUP_DIR"/gitea-dump-*.zip 2>/dev/null | head -1)
if [ -z "$BACKUP_FILE" ]; then
echo "ERROR: No backup zip found in $BACKUP_DIR"
exit 1
fi
BACKUP_SIZE=$(stat -c%s "$BACKUP_FILE" 2>/dev/null || stat -f%z "$BACKUP_FILE")
echo "[2/4] Backup created: $BACKUP_FILE ($BACKUP_SIZE bytes)"
if [ "$BACKUP_SIZE" -eq 0 ]; then
echo "ERROR: Backup file is 0 bytes"
exit 1
fi
# Lock down permissions
chmod 600 "$BACKUP_FILE"
# Verify contents
echo "[3/4] Verifying backup contents..."
CONTENTS=$(unzip -l "$BACKUP_FILE" 2>/dev/null || true)
check_component() {
if echo "$CONTENTS" | grep -q "$1"; then
echo " OK: $2"
else
echo " WARN: $2 not found in backup"
fi
}
check_component "gitea-db.sql" "Database dump"
check_component "gitea-repo" "Repositories"
check_component "custom" "Custom config"
check_component "app.ini" "app.ini"
# Off-site copy
if [ -n "$OFFSITE_DEST" ]; then
echo "[4/4] Copying to off-site: $OFFSITE_DEST"
rsync -avz "$BACKUP_FILE" "$OFFSITE_DEST"
echo " Off-site copy complete."
else
echo "[4/4] No off-site destination provided. Skipping."
echo " To copy later: scp $BACKUP_FILE user@backup-host:/backups/gitea/"
fi
echo ""
echo "=== Backup complete ==="
echo "File: $BACKUP_FILE"
echo "Size: $BACKUP_SIZE bytes"
echo ""
echo "To verify restore on a clean instance:"
echo " 1. Copy zip to test machine"
echo " 2. unzip $BACKUP_FILE"
echo " 3. gitea restore --from <extracted-dir> -c /etc/gitea/app.ini"
echo " 4. Verify repos and DB are intact"

View File

@@ -30,7 +30,22 @@ IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Default cycle duration in seconds (5 min); stale threshold = 2× this
@@ -187,7 +202,11 @@ def load_queue() -> list[dict]:
# Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers)
return ready
except (json.JSONDecodeError, OSError):
except json.JSONDecodeError as exc:
print(f"[loop-guard] WARNING: Corrupt queue.json ({exc}) — returning empty queue")
return []
except OSError as exc:
print(f"[loop-guard] WARNING: Cannot read queue.json ({exc}) — returning empty queue")
return []

View File

@@ -20,11 +20,28 @@ from datetime import datetime, timezone
from pathlib import Path
# ── Config ──────────────────────────────────────────────────────────────
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
QUEUE_BACKUP_FILE = REPO_ROOT / ".loop" / "queue.json.bak"
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
CYCLE_RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
@@ -326,9 +343,38 @@ def run_triage() -> list[dict]:
ready = [s for s in scored if s["ready"]]
not_ready = [s for s in scored if not s["ready"]]
# Save backup before writing (if current file exists and is valid)
if QUEUE_FILE.exists():
try:
json.loads(QUEUE_FILE.read_text()) # Validate current file
QUEUE_BACKUP_FILE.write_text(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Current file is corrupt, don't overwrite backup
# Write new queue file
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
# Validate the write by re-reading and parsing
try:
json.loads(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError) as exc:
print(f"[triage] ERROR: queue.json validation failed: {exc}", file=sys.stderr)
# Restore from backup if available
if QUEUE_BACKUP_FILE.exists():
try:
backup_data = QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
QUEUE_FILE.write_text(backup_data)
print(f"[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError) as restore_exc:
print(f"[triage] ERROR: Backup restore failed: {restore_exc}", file=sys.stderr)
# Write empty list as last resort
QUEUE_FILE.write_text("[]\n")
else:
# No backup, write empty list
QUEUE_FILE.write_text("[]\n")
# Write retro entry
retro_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),

View File

@@ -56,11 +56,13 @@ async def self_modify_queue(request: Request):
@router.get("/swarm/mission-control", response_class=HTMLResponse)
async def mission_control(request: Request):
"""Render the swarm mission control dashboard page."""
return templates.TemplateResponse(request, "mission_control.html", {})
@router.get("/bugs", response_class=HTMLResponse)
async def bugs_page(request: Request):
"""Render the bug tracking page."""
return templates.TemplateResponse(
request,
"bugs.html",
@@ -75,16 +77,19 @@ async def bugs_page(request: Request):
@router.get("/self-coding", response_class=HTMLResponse)
async def self_coding(request: Request):
"""Render the self-coding automation status page."""
return templates.TemplateResponse(request, "self_coding.html", {"stats": {}})
@router.get("/hands", response_class=HTMLResponse)
async def hands_page(request: Request):
"""Render the hands (automation executions) page."""
return templates.TemplateResponse(request, "hands.html", {"executions": []})
@router.get("/creative/ui", response_class=HTMLResponse)
async def creative_ui(request: Request):
"""Render the creative UI playground page."""
return templates.TemplateResponse(request, "creative.html", {})

View File

@@ -145,6 +145,7 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
"""Return HTMX partial for pending approval tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
@@ -164,6 +165,7 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
"""Return HTMX partial for active (approved/running/paused) tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
@@ -183,6 +185,7 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
@@ -241,26 +244,31 @@ async def create_task_form(
@router.post("/tasks/{task_id}/approve", response_class=HTMLResponse)
async def approve_task(request: Request, task_id: str):
"""Approve a pending task and move it to active queue."""
return await _set_status(request, task_id, "approved")
@router.post("/tasks/{task_id}/veto", response_class=HTMLResponse)
async def veto_task(request: Request, task_id: str):
"""Veto a task, marking it as rejected."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/pause", response_class=HTMLResponse)
async def pause_task(request: Request, task_id: str):
"""Pause a running or approved task."""
return await _set_status(request, task_id, "paused")
@router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse)
async def cancel_task(request: Request, task_id: str):
"""Cancel a task (marks as vetoed)."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/retry", response_class=HTMLResponse)
async def retry_task(request: Request, task_id: str):
"""Retry a failed/vetoed task by moving it back to approved."""
return await _set_status(request, task_id, "approved")
@@ -271,6 +279,7 @@ async def modify_task(
title: str = Form(...),
description: str = Form(""),
):
"""Update task title and description."""
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",

View File

@@ -7,7 +7,7 @@ without a running game server.
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import UTC, datetime
from infrastructure.world.interface import WorldInterface
@@ -81,9 +81,7 @@ class MockWorldAdapter(WorldInterface):
def act(self, command: CommandInput) -> ActionResult:
logger.debug("MockWorldAdapter.act(%s)", command.action)
self.action_log.append(
_ActionLog(command=command, timestamp=datetime.now(UTC))
)
self.action_log.append(_ActionLog(command=command, timestamp=datetime.now(UTC)))
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Mock executed: {command.action}",
@@ -92,8 +90,10 @@ class MockWorldAdapter(WorldInterface):
def speak(self, message: str, target: str | None = None) -> None:
logger.debug("MockWorldAdapter.speak(%r, target=%r)", message, target)
self.speech_log.append({
"message": message,
"target": target,
"timestamp": datetime.now(UTC).isoformat(),
})
self.speech_log.append(
{
"message": message,
"target": target,
"timestamp": datetime.now(UTC).isoformat(),
}
)

View File

@@ -35,14 +35,10 @@ class TES3MPWorldAdapter(WorldInterface):
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
raise NotImplementedError(
"TES3MPWorldAdapter.connect() — wire up TES3MP server socket"
)
raise NotImplementedError("TES3MPWorldAdapter.connect() — wire up TES3MP server socket")
def disconnect(self) -> None:
raise NotImplementedError(
"TES3MPWorldAdapter.disconnect() — close TES3MP server socket"
)
raise NotImplementedError("TES3MPWorldAdapter.disconnect() — close TES3MP server socket")
@property
def is_connected(self) -> bool:
@@ -51,9 +47,7 @@ class TES3MPWorldAdapter(WorldInterface):
# -- core contract (stubs) ---------------------------------------------
def observe(self) -> PerceptionOutput:
raise NotImplementedError(
"TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state"
)
raise NotImplementedError("TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state")
def act(self, command: CommandInput) -> ActionResult:
raise NotImplementedError(
@@ -61,6 +55,4 @@ class TES3MPWorldAdapter(WorldInterface):
)
def speak(self, message: str, target: str | None = None) -> None:
raise NotImplementedError(
"TES3MPWorldAdapter.speak() — send chat message via TES3MP"
)
raise NotImplementedError("TES3MPWorldAdapter.speak() — send chat message via TES3MP")

View File

@@ -27,14 +27,14 @@ class WorldInterface(ABC):
# -- lifecycle (optional overrides) ------------------------------------
def connect(self) -> None:
def connect(self) -> None: # noqa: B027
"""Establish connection to the game world.
Default implementation is a no-op. Override to open sockets,
authenticate, etc.
"""
def disconnect(self) -> None:
def disconnect(self) -> None: # noqa: B027
"""Tear down the connection.
Default implementation is a no-op.

View File

@@ -10,10 +10,10 @@ from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from enum import StrEnum
class ActionStatus(str, Enum):
class ActionStatus(StrEnum):
"""Outcome of an action dispatched to the world."""
SUCCESS = "success"

View File

@@ -17,7 +17,7 @@ from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import asdict, dataclass, field
from dataclasses import dataclass, field
from datetime import UTC, datetime
from loop.phase1_gather import gather
@@ -32,6 +32,7 @@ logger = logging.getLogger(__name__)
# Cycle log entry
# ---------------------------------------------------------------------------
@dataclass
class CycleRecord:
"""One observe → reason → act → reflect cycle."""
@@ -50,6 +51,7 @@ class CycleRecord:
# Heartbeat
# ---------------------------------------------------------------------------
class Heartbeat:
"""Manages the recurring cognitive loop with optional world adapter.
@@ -268,14 +270,17 @@ class Heartbeat:
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast("heartbeat.cycle", {
"cycle_id": record.cycle_id,
"timestamp": record.timestamp,
"action": record.action_taken,
"action_status": record.action_status,
"reasoning_summary": record.reasoning_summary[:300],
"observation": record.observation,
"duration_ms": record.duration_ms,
})
await ws_manager.broadcast(
"heartbeat.cycle",
{
"cycle_id": record.cycle_id,
"timestamp": record.timestamp,
"action": record.action_taken,
"action_status": record.action_status,
"reasoning_summary": record.reasoning_summary[:300],
"observation": record.observation,
"duration_ms": record.duration_ms,
},
)
except (ImportError, AttributeError, ConnectionError, RuntimeError) as exc:
logger.debug("Heartbeat broadcast skipped: %s", exc)

555
src/timmy/research.py Normal file
View File

@@ -0,0 +1,555 @@
"""ResearchOrchestrator — autonomous research pipeline.
Chains: Check Local → Generate Queries → Search → Fetch → Synthesize →
Crystallize → Write Artifact into an end-to-end research workflow.
Usage:
from timmy.research import ResearchOrchestrator, run_research
orchestrator = ResearchOrchestrator(cascade=router, memory=memory_fns)
result = await orchestrator.run("Bitcoin Lightning Network scaling")
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Data structures ──────────────────────────────────────────────────────────
CONFIDENCE_THRESHOLD = 0.85
DEFAULT_QUERIES_PER_TOPIC = 8
DEFAULT_RESULTS_PER_QUERY = 5
DEFAULT_PAGES_TO_FETCH = 10
DEFAULT_FETCH_TOKEN_LIMIT = 3000
DEFAULT_SYNTHESIS_MAX_TOKENS = 4000
@dataclass
class ResearchResult:
"""Output of a completed research pipeline run."""
topic: str
report: str
queries_generated: list[str] = field(default_factory=list)
sources: list[dict[str, str]] = field(default_factory=list)
action_items: list[str] = field(default_factory=list)
cache_hit: bool = False
duration_ms: float = 0.0
metrics: dict[str, Any] = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@dataclass
class SearchSnippet:
"""A single search result snippet."""
title: str
url: str
snippet: str
relevance: float = 0.0
@dataclass
class FetchedPage:
"""A fetched and truncated web page."""
url: str
title: str
content: str
token_estimate: int = 0
# ── Memory interface ─────────────────────────────────────────────────────────
@dataclass
class MemoryInterface:
"""Abstraction over the memory system for research.
Accepts callables so the orchestrator doesn't depend on a specific
memory implementation. Defaults wire to timmy.memory_system.
"""
search_fn: Any = None # (query, limit) -> list[MemoryEntry]
store_fn: Any = None # (content, source, context_type, ...) -> MemoryEntry
def __post_init__(self):
if self.search_fn is None or self.store_fn is None:
self._load_defaults()
def _load_defaults(self):
try:
from timmy.memory_system import search_memories, store_memory
if self.search_fn is None:
self.search_fn = search_memories
if self.store_fn is None:
self.store_fn = store_memory
except ImportError:
logger.warning("Memory system not available — research will skip caching")
if self.search_fn is None:
self.search_fn = lambda query, **kw: []
if self.store_fn is None:
self.store_fn = lambda content, source, **kw: None
# ── Tool interface ───────────────────────────────────────────────────────────
@dataclass
class ResearchTools:
"""Web search and fetch callables.
These are async callables:
web_search(query: str, limit: int) -> list[dict]
web_fetch(url: str, max_tokens: int) -> str
"""
web_search: Any = None
web_fetch: Any = None
# ── Orchestrator ─────────────────────────────────────────────────────────────
class ResearchOrchestrator:
"""Pipeline that chains research steps into an autonomous workflow.
Steps:
0. CHECK LOCAL KNOWLEDGE — search memory, return cached if confident
1. GENERATE QUERIES — ask LLM to produce search queries
2. SEARCH — execute queries via web_search tool
3. FETCH — rank snippets, fetch top pages
4. SYNTHESIZE — produce structured report via LLM
5. CRYSTALLIZE — store result in semantic memory
6. WRITE ARTIFACT — create Gitea issues from action items
"""
def __init__(
self,
cascade: Any,
memory: MemoryInterface | None = None,
tools: ResearchTools | None = None,
) -> None:
self.cascade = cascade
self.memory = memory or MemoryInterface()
self.tools = tools or ResearchTools()
self._metrics: dict[str, int] = {
"research_cache_hit": 0,
"research_api_call": 0,
}
async def run(
self,
topic: str,
template: str | None = None,
context: dict[str, Any] | None = None,
) -> ResearchResult:
"""Execute the full research pipeline.
Args:
topic: The research topic or question.
template: Optional prompt template for synthesis.
context: Additional context dict (cascade_tier hint, etc.).
Returns:
ResearchResult with report, sources, and action items.
"""
start = time.monotonic()
context = context or {}
cascade_tier = context.get("cascade_tier")
# Step 0: Check local knowledge
cached = await self._check_local_knowledge(topic)
if cached is not None:
self._metrics["research_cache_hit"] += 1
cached.duration_ms = (time.monotonic() - start) * 1000
return cached
self._metrics["research_api_call"] += 1
# Step 1: Generate queries
queries = await self._generate_queries(topic, template, cascade_tier)
# Step 2: Search
snippets = await self._search(queries)
# Step 3: Fetch top pages
pages = await self._fetch(snippets)
# Step 4: Synthesize
report = await self._synthesize(topic, template, pages, cascade_tier)
# Step 5: Extract action items
action_items = _extract_action_items(report)
# Build result
sources = [{"url": p.url, "title": p.title} for p in pages]
result = ResearchResult(
topic=topic,
report=report,
queries_generated=queries,
sources=sources,
action_items=action_items,
cache_hit=False,
duration_ms=(time.monotonic() - start) * 1000,
metrics=dict(self._metrics),
)
# Step 6: Crystallize — store in memory
await self._crystallize(topic, result)
# Step 7: Write artifact — create Gitea issues
await self._write_artifact(result)
return result
# ── Pipeline steps ───────────────────────────────────────────────────
async def _check_local_knowledge(self, topic: str) -> ResearchResult | None:
"""Search semantic memory for existing research on this topic."""
try:
results = self.memory.search_fn(
query=topic, limit=10, context_type="research"
)
if not results:
return None
# Check if top result has high confidence
top = results[0]
score = getattr(top, "relevance_score", 0.0) or 0.0
if score >= CONFIDENCE_THRESHOLD:
content = getattr(top, "content", str(top))
logger.info(
"Research cache hit for '%s' (score=%.2f)", topic, score
)
return ResearchResult(
topic=topic,
report=content,
cache_hit=True,
metrics={"research_cache_hit": 1},
)
except Exception as exc:
logger.warning("Local knowledge check failed: %s", exc)
return None
async def _generate_queries(
self,
topic: str,
template: str | None,
cascade_tier: str | None,
) -> list[str]:
"""Ask the LLM to generate search queries for the topic."""
prompt = (
f"Generate {DEFAULT_QUERIES_PER_TOPIC} diverse web search queries "
f"to thoroughly research the following topic. Return ONLY the "
f"queries, one per line, no numbering or bullets.\n\n"
f"Topic: {topic}"
)
if template:
prompt += f"\n\nResearch template context:\n{template}"
messages = [
{"role": "system", "content": "You are a research query generator."},
{"role": "user", "content": prompt},
]
kwargs: dict[str, Any] = {"messages": messages, "temperature": 0.7}
if cascade_tier:
kwargs["model"] = cascade_tier
try:
response = await self.cascade.complete(**kwargs)
raw = response.get("content", "")
queries = [
line.strip()
for line in raw.strip().splitlines()
if line.strip() and not line.strip().startswith("#")
]
# Clean numbering prefixes
cleaned = []
for q in queries:
q = re.sub(r"^\d+[\.\)]\s*", "", q)
q = re.sub(r"^[-*]\s*", "", q)
if q:
cleaned.append(q)
return cleaned[:DEFAULT_QUERIES_PER_TOPIC + 4] # slight over-generate
except Exception as exc:
logger.warning("Query generation failed: %s", exc)
# Fallback: use topic itself as a single query
return [topic]
async def _search(self, queries: list[str]) -> list[SearchSnippet]:
"""Execute search queries and collect snippets."""
if not self.tools.web_search:
logger.warning("No web_search tool configured — skipping search step")
return []
all_snippets: list[SearchSnippet] = []
async def _run_query(query: str) -> list[SearchSnippet]:
try:
results = await asyncio.to_thread(
self.tools.web_search, query, DEFAULT_RESULTS_PER_QUERY
)
snippets = []
for r in (results or []):
snippets.append(
SearchSnippet(
title=r.get("title", ""),
url=r.get("url", ""),
snippet=r.get("snippet", ""),
)
)
return snippets
except Exception as exc:
logger.warning("Search failed for query '%s': %s", query, exc)
return []
# Run searches concurrently
tasks = [_run_query(q) for q in queries]
results = await asyncio.gather(*tasks)
for snippets in results:
all_snippets.extend(snippets)
# Deduplicate by URL
seen_urls: set[str] = set()
unique: list[SearchSnippet] = []
for s in all_snippets:
if s.url and s.url not in seen_urls:
seen_urls.add(s.url)
unique.append(s)
return unique
async def _fetch(self, snippets: list[SearchSnippet]) -> list[FetchedPage]:
"""Fetch top pages from search snippets."""
if not self.tools.web_fetch:
logger.warning("No web_fetch tool configured — skipping fetch step")
return []
# Take top N snippets
to_fetch = snippets[:DEFAULT_PAGES_TO_FETCH]
pages: list[FetchedPage] = []
async def _fetch_one(snippet: SearchSnippet) -> FetchedPage | None:
try:
content = await asyncio.to_thread(
self.tools.web_fetch, snippet.url, DEFAULT_FETCH_TOKEN_LIMIT
)
if content:
return FetchedPage(
url=snippet.url,
title=snippet.title,
content=content[:DEFAULT_FETCH_TOKEN_LIMIT * 4],
token_estimate=len(content.split()),
)
except Exception as exc:
logger.warning("Fetch failed for %s: %s", snippet.url, exc)
return None
tasks = [_fetch_one(s) for s in to_fetch]
results = await asyncio.gather(*tasks)
for page in results:
if page is not None:
pages.append(page)
return pages
async def _synthesize(
self,
topic: str,
template: str | None,
pages: list[FetchedPage],
cascade_tier: str | None,
) -> str:
"""Synthesize fetched pages into a structured research report."""
# Build context from fetched pages
context_parts = []
for i, page in enumerate(pages, 1):
context_parts.append(
f"--- Source {i}: {page.title} ({page.url}) ---\n"
f"{page.content[:DEFAULT_FETCH_TOKEN_LIMIT * 4]}\n"
)
sources_text = "\n".join(context_parts) if context_parts else "(no sources fetched)"
if template:
prompt = (
f"{template}\n\n"
f"Topic: {topic}\n\n"
f"Research sources:\n{sources_text}\n\n"
f"Synthesize a comprehensive report based on the sources above."
)
else:
prompt = (
f"Write a comprehensive research report on: {topic}\n\n"
f"Research sources:\n{sources_text}\n\n"
f"Structure your report with:\n"
f"- Executive summary\n"
f"- Key findings\n"
f"- Analysis\n"
f"- Action items (prefix each with 'ACTION:')\n"
f"- Sources cited"
)
messages = [
{"role": "system", "content": "You are a research analyst producing structured reports."},
{"role": "user", "content": prompt},
]
kwargs: dict[str, Any] = {
"messages": messages,
"temperature": 0.3,
"max_tokens": DEFAULT_SYNTHESIS_MAX_TOKENS,
}
if cascade_tier:
kwargs["model"] = cascade_tier
try:
response = await self.cascade.complete(**kwargs)
return response.get("content", "")
except Exception as exc:
logger.error("Synthesis failed: %s", exc)
# Fallback: return raw source summaries
return (
f"# Research: {topic}\n\n"
f"Synthesis failed ({exc}). Raw sources:\n\n{sources_text}"
)
async def _crystallize(self, topic: str, result: ResearchResult) -> None:
"""Store the research result in semantic memory."""
try:
self.memory.store_fn(
content=result.report,
source="research_orchestrator",
context_type="research",
metadata={
"topic": topic,
"sources": result.sources,
"action_items": result.action_items,
"cache_hit": result.cache_hit,
"duration_ms": result.duration_ms,
},
)
logger.info("Crystallized research on '%s' into memory", topic)
except Exception as exc:
logger.warning("Failed to crystallize research: %s", exc)
async def _write_artifact(self, result: ResearchResult) -> None:
"""Create Gitea issues from action items."""
if not result.action_items:
return
try:
await asyncio.to_thread(_create_gitea_issues, result)
except Exception as exc:
logger.warning("Failed to create Gitea issues: %s", exc)
def get_metrics(self) -> dict[str, int]:
"""Return current research pipeline metrics."""
return dict(self._metrics)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _extract_action_items(report: str) -> list[str]:
"""Extract action items from a research report.
Looks for lines prefixed with ACTION:, TODO:, or - [ ].
"""
items: list[str] = []
for line in report.splitlines():
stripped = line.strip()
# ACTION: prefix
match = re.match(r"^(?:ACTION|TODO)\s*:\s*(.+)", stripped, re.IGNORECASE)
if match:
items.append(match.group(1).strip())
continue
# Markdown checkbox
match = re.match(r"^-\s*\[\s*\]\s*(.+)", stripped)
if match:
items.append(match.group(1).strip())
return items
def _create_gitea_issues(result: ResearchResult) -> None:
"""Create Gitea issues for action items (runs in thread)."""
if not settings.gitea_token or not settings.gitea_url:
logger.debug("Gitea not configured — skipping issue creation")
return
try:
import requests
except ImportError:
logger.debug("requests not available — skipping Gitea issue creation")
return
base_url = settings.gitea_url.rstrip("/")
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
for item in result.action_items:
try:
payload = {
"title": f"[research] {item[:100]}",
"body": (
f"Auto-generated from research on: **{result.topic}**\n\n"
f"Action item: {item}\n\n"
f"---\n"
f"_Created by ResearchOrchestrator_"
),
}
resp = requests.post(
f"{base_url}/api/v1/repos/{repo}/issues",
headers=headers,
json=payload,
timeout=10,
)
if resp.status_code in (200, 201):
logger.info("Created Gitea issue: %s", item[:60])
else:
logger.warning(
"Gitea issue creation failed (%d): %s",
resp.status_code,
resp.text[:200],
)
except Exception as exc:
logger.warning("Failed to create issue '%s': %s", item[:60], exc)
# ── Convenience function ─────────────────────────────────────────────────────
async def run_research(
topic: str,
template: str | None = None,
context: dict[str, Any] | None = None,
) -> ResearchResult:
"""Convenience function to run research with default dependencies.
Creates a ResearchOrchestrator with the cascade router singleton
and default memory, then executes the pipeline.
"""
from infrastructure.router.cascade import get_router
cascade = get_router()
orchestrator = ResearchOrchestrator(cascade=cascade)
return await orchestrator.run(topic, template=template, context=context)

View File

@@ -10,7 +10,6 @@ from infrastructure.world.types import (
PerceptionOutput,
)
# ---------------------------------------------------------------------------
# Type construction
# ---------------------------------------------------------------------------

View File

@@ -3,7 +3,6 @@
import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.interface import WorldInterface
from infrastructure.world.registry import AdapterRegistry

View File

@@ -6,7 +6,6 @@ Acceptance criteria:
- WebSocket broadcasts include current action and reasoning summary
"""
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
@@ -81,6 +80,7 @@ class TestHeartbeatWithAdapter:
@pytest.mark.asyncio
async def test_on_cycle_callback(self, mock_adapter):
received = []
async def callback(record):
received.append(record)
@@ -145,9 +145,7 @@ class TestHeartbeatBroadcast:
) as mock_ws:
mock_ws.broadcast = AsyncMock()
# Patch the import inside heartbeat
with patch(
"infrastructure.ws_manager.handler.ws_manager"
) as ws_mod:
with patch("infrastructure.ws_manager.handler.ws_manager") as ws_mod:
ws_mod.broadcast = AsyncMock()
hb = Heartbeat(world=mock_adapter)
await hb.run_once()

View File

@@ -0,0 +1,97 @@
"""Tests for load_queue corrupt JSON handling in loop_guard.py."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
import scripts.loop_guard as lg
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
"""Redirect loop_guard paths to tmp_path for isolation."""
monkeypatch.setattr(lg, "QUEUE_FILE", tmp_path / "queue.json")
monkeypatch.setattr(lg, "IDLE_STATE_FILE", tmp_path / "idle_state.json")
monkeypatch.setattr(lg, "CYCLE_RESULT_FILE", tmp_path / "cycle_result.json")
monkeypatch.setattr(lg, "GITEA_API", "http://test:3000/api/v1")
monkeypatch.setattr(lg, "REPO_SLUG", "owner/repo")
def test_load_queue_missing_file(tmp_path):
"""Missing queue file returns empty list."""
result = lg.load_queue()
assert result == []
def test_load_queue_valid_data(tmp_path):
"""Valid queue.json returns ready items."""
data = [
{"issue": 1, "title": "Ready issue", "ready": True},
{"issue": 2, "title": "Not ready", "ready": False},
]
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
result = lg.load_queue()
assert len(result) == 1
assert result[0]["issue"] == 1
def test_load_queue_corrupt_json_logs_warning(tmp_path, capsys):
"""Corrupt queue.json returns empty list and logs warning."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text("not valid json {{{")
result = lg.load_queue()
assert result == []
captured = capsys.readouterr()
assert "WARNING" in captured.out
assert "Corrupt queue.json" in captured.out
def test_load_queue_not_a_list(tmp_path):
"""Queue.json that is not a list returns empty list."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps({"not": "a list"}))
result = lg.load_queue()
assert result == []
def test_load_queue_no_ready_items(tmp_path):
"""Queue with no ready items returns empty list."""
data = [
{"issue": 1, "title": "Not ready 1", "ready": False},
{"issue": 2, "title": "Not ready 2", "ready": False},
]
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
result = lg.load_queue()
assert result == []
def test_load_queue_oserror_logs_warning(tmp_path, monkeypatch, capsys):
"""OSError when reading queue.json returns empty list and logs warning."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text("[]")
# Mock Path.read_text to raise OSError
original_read_text = Path.read_text
def mock_read_text(self, *args, **kwargs):
if self.name == "queue.json":
raise OSError("Permission denied")
return original_read_text(self, *args, **kwargs)
monkeypatch.setattr(Path, "read_text", mock_read_text)
result = lg.load_queue()
assert result == []
captured = capsys.readouterr()
assert "WARNING" in captured.out
assert "Cannot read queue.json" in captured.out

View File

@@ -0,0 +1,159 @@
"""Tests for queue.json validation and backup in triage_score.py."""
from __future__ import annotations
import json
import pytest
import scripts.triage_score as ts
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
"""Redirect triage_score paths to tmp_path for isolation."""
monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json")
monkeypatch.setattr(ts, "QUEUE_BACKUP_FILE", tmp_path / "queue.json.bak")
monkeypatch.setattr(ts, "RETRO_FILE", tmp_path / "retro" / "triage.jsonl")
monkeypatch.setattr(ts, "QUARANTINE_FILE", tmp_path / "quarantine.json")
monkeypatch.setattr(ts, "CYCLE_RETRO_FILE", tmp_path / "retro" / "cycles.jsonl")
def test_backup_created_on_write(tmp_path):
"""When writing queue.json, a backup should be created from previous valid file."""
# Create initial valid queue file
initial_data = [{"issue": 1, "title": "Test", "ready": True}]
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text(json.dumps(initial_data))
# Write new data
new_data = [{"issue": 2, "title": "New", "ready": True}]
ts.QUEUE_FILE.write_text(json.dumps(new_data, indent=2) + "\n")
# Manually run the backup logic as run_triage would
if ts.QUEUE_FILE.exists():
try:
json.loads(ts.QUEUE_FILE.read_text())
ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass
# Both files should exist with same content
assert ts.QUEUE_BACKUP_FILE.exists()
assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == new_data
def test_corrupt_queue_restored_from_backup(tmp_path, capsys):
"""If queue.json is corrupt, it should be restored from backup."""
# Create a valid backup
valid_data = [{"issue": 1, "title": "Backup", "ready": True}]
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_data, indent=2) + "\n")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
ts.QUEUE_FILE.write_text(backup_data)
print("[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Queue should be restored from backup
assert json.loads(ts.QUEUE_FILE.read_text()) == valid_data
captured = capsys.readouterr()
assert "Restored queue.json from backup" in captured.out
def test_corrupt_queue_no_backup_writes_empty_list(tmp_path):
"""If queue.json is corrupt and no backup exists, write empty list."""
# Ensure no backup exists
assert not ts.QUEUE_BACKUP_FILE.exists()
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data)
ts.QUEUE_FILE.write_text(backup_data)
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Should have empty list
assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_corrupt_backup_writes_empty_list(tmp_path):
"""If both queue.json and backup are corrupt, write empty list."""
# Create a corrupt backup
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text("also corrupt backup")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data)
ts.QUEUE_FILE.write_text(backup_data)
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Should have empty list
assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_valid_queue_not_corrupt_no_backup_overwrite(tmp_path):
"""Don't overwrite backup if current queue.json is corrupt."""
# Create a valid backup
valid_backup = [{"issue": 99, "title": "Old Backup", "ready": True}]
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_backup, indent=2) + "\n")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("corrupt data")
# Try to save backup (should skip because current is corrupt)
if ts.QUEUE_FILE.exists():
try:
json.loads(ts.QUEUE_FILE.read_text()) # This will fail
ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Should hit this branch
# Backup should still have original valid data
assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == valid_backup
def test_backup_path_configuration():
"""Ensure backup file path is properly configured relative to queue file."""
assert ts.QUEUE_BACKUP_FILE.parent == ts.QUEUE_FILE.parent
assert ts.QUEUE_BACKUP_FILE.name == "queue.json.bak"
assert ts.QUEUE_FILE.name == "queue.json"

497
tests/unit/test_research.py Normal file
View File

@@ -0,0 +1,497 @@
"""Unit tests for timmy.research — ResearchOrchestrator pipeline."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.research import (
DEFAULT_QUERIES_PER_TOPIC,
MemoryInterface,
ResearchOrchestrator,
ResearchResult,
ResearchTools,
SearchSnippet,
_extract_action_items,
)
# ── Data structures ──────────────────────────────────────────────────────────
class TestResearchResult:
def test_defaults(self):
r = ResearchResult(topic="test", report="content")
assert r.topic == "test"
assert r.report == "content"
assert r.cache_hit is False
assert r.queries_generated == []
assert r.sources == []
assert r.action_items == []
assert r.duration_ms == 0.0
assert r.timestamp # non-empty
def test_with_data(self):
r = ResearchResult(
topic="AI",
report="report text",
queries_generated=["q1", "q2"],
sources=[{"url": "http://example.com", "title": "Test"}],
action_items=["Do X"],
cache_hit=True,
duration_ms=42.5,
)
assert r.cache_hit is True
assert len(r.sources) == 1
assert r.duration_ms == 42.5
class TestSearchSnippet:
def test_fields(self):
s = SearchSnippet(title="T", url="http://x.com", snippet="text")
assert s.relevance == 0.0
# ── _extract_action_items ────────────────────────────────────────────────────
class TestExtractActionItems:
def test_action_prefix(self):
report = "Some text\nACTION: Do the thing\nMore text"
items = _extract_action_items(report)
assert items == ["Do the thing"]
def test_todo_prefix(self):
report = "TODO: Fix the bug\nTodo: Also this"
items = _extract_action_items(report)
assert items == ["Fix the bug", "Also this"]
def test_checkbox(self):
report = "- [ ] Implement feature\n- [x] Already done"
items = _extract_action_items(report)
assert items == ["Implement feature"]
def test_mixed(self):
report = "ACTION: First\n- [ ] Second\nTODO: Third"
items = _extract_action_items(report)
assert items == ["First", "Second", "Third"]
def test_empty(self):
assert _extract_action_items("No actions here") == []
assert _extract_action_items("") == []
# ── MemoryInterface ──────────────────────────────────────────────────────────
class TestMemoryInterface:
def test_custom_fns(self):
search = MagicMock(return_value=[])
store = MagicMock()
mi = MemoryInterface(search_fn=search, store_fn=store)
assert mi.search_fn is search
assert mi.store_fn is store
def test_defaults_when_import_fails(self):
with patch.dict("sys.modules", {"timmy.memory_system": None}):
mi = MemoryInterface()
# Should have fallback callables
assert callable(mi.search_fn)
assert callable(mi.store_fn)
# Fallback search returns empty
assert mi.search_fn("test") == []
# ── ResearchOrchestrator ─────────────────────────────────────────────────────
def _make_cascade(**overrides):
"""Create a mock cascade router."""
cascade = AsyncMock()
cascade.complete = AsyncMock(
return_value={"content": overrides.get("content", "query1\nquery2\nquery3")}
)
return cascade
def _make_memory(search_results=None, score=0.0):
"""Create a mock memory interface."""
if search_results is None:
search_results = []
search_fn = MagicMock(return_value=search_results)
store_fn = MagicMock()
return MemoryInterface(search_fn=search_fn, store_fn=store_fn)
def _make_tools(search_results=None, fetch_content="Page content"):
"""Create mock research tools."""
web_search = MagicMock(
return_value=search_results
or [
{"title": "Result 1", "url": "http://a.com", "snippet": "Snippet 1"},
{"title": "Result 2", "url": "http://b.com", "snippet": "Snippet 2"},
]
)
web_fetch = MagicMock(return_value=fetch_content)
return ResearchTools(web_search=web_search, web_fetch=web_fetch)
class TestResearchOrchestratorInit:
def test_basic_init(self):
cascade = _make_cascade()
memory = _make_memory()
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
assert orch.cascade is cascade
assert orch.memory is memory
assert orch.tools is tools
assert orch._metrics["research_cache_hit"] == 0
assert orch._metrics["research_api_call"] == 0
class TestCheckLocalKnowledge:
@pytest.mark.asyncio
async def test_cache_hit(self):
"""High-confidence memory result returns cached ResearchResult."""
entry = MagicMock()
entry.relevance_score = 0.90
entry.content = "Cached report"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is not None
assert result.cache_hit is True
assert result.report == "Cached report"
@pytest.mark.asyncio
async def test_cache_miss_low_score(self):
"""Low-confidence result returns None."""
entry = MagicMock()
entry.relevance_score = 0.5
entry.content = "Weak match"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
@pytest.mark.asyncio
async def test_cache_miss_empty(self):
"""No memory results returns None."""
memory = _make_memory(search_results=[])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
@pytest.mark.asyncio
async def test_exception_returns_none(self):
"""Memory search exception returns None gracefully."""
memory = MemoryInterface(
search_fn=MagicMock(side_effect=RuntimeError("db error")),
store_fn=MagicMock(),
)
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
class TestGenerateQueries:
@pytest.mark.asyncio
async def test_parses_queries(self):
cascade = _make_cascade(content="query one\nquery two\nquery three")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("AI safety", None, None)
assert queries == ["query one", "query two", "query three"]
@pytest.mark.asyncio
async def test_strips_numbering(self):
cascade = _make_cascade(content="1. First query\n2. Second query\n3) Third")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("topic", None, None)
assert "First query" in queries
assert "Second query" in queries
assert "Third" in queries
@pytest.mark.asyncio
async def test_fallback_on_error(self):
cascade = AsyncMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("LLM down"))
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("fallback topic", None, None)
assert queries == ["fallback topic"]
@pytest.mark.asyncio
async def test_passes_cascade_tier(self):
cascade = _make_cascade(content="q1\nq2")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
await orch._generate_queries("topic", None, "gpt-4")
call_kwargs = cascade.complete.call_args.kwargs
assert call_kwargs.get("model") == "gpt-4"
class TestSearch:
@pytest.mark.asyncio
async def test_collects_snippets(self):
tools = _make_tools()
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1", "q2"])
# 2 results per query, 2 queries, but deduplicated by URL
assert len(snippets) == 2 # same URLs returned for both queries
@pytest.mark.asyncio
async def test_no_search_tool(self):
tools = ResearchTools(web_search=None)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1"])
assert snippets == []
@pytest.mark.asyncio
async def test_search_error_handled(self):
tools = ResearchTools(
web_search=MagicMock(side_effect=RuntimeError("network error"))
)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1"])
assert snippets == []
class TestFetch:
@pytest.mark.asyncio
async def test_fetches_pages(self):
tools = _make_tools(fetch_content="Page body here")
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = [
SearchSnippet(title="P1", url="http://a.com", snippet="s1"),
SearchSnippet(title="P2", url="http://b.com", snippet="s2"),
]
pages = await orch._fetch(snippets)
assert len(pages) == 2
assert pages[0].content == "Page body here"
@pytest.mark.asyncio
async def test_no_fetch_tool(self):
tools = ResearchTools(web_fetch=None)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
pages = await orch._fetch([SearchSnippet("T", "http://x.com", "s")])
assert pages == []
class TestSynthesize:
@pytest.mark.asyncio
async def test_produces_report(self):
cascade = _make_cascade(content="# Report\nKey findings here")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
from timmy.research import FetchedPage
pages = [FetchedPage(url="http://x.com", title="X", content="content")]
report = await orch._synthesize("topic", None, pages, None)
assert "Report" in report
@pytest.mark.asyncio
async def test_fallback_on_error(self):
cascade = AsyncMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("LLM error"))
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
from timmy.research import FetchedPage
pages = [FetchedPage(url="http://x.com", title="X", content="content")]
report = await orch._synthesize("topic", None, pages, None)
assert "Synthesis failed" in report
assert "topic" in report
class TestCrystallize:
@pytest.mark.asyncio
async def test_stores_in_memory(self):
memory = _make_memory()
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
result = ResearchResult(topic="test", report="report text")
await orch._crystallize("test", result)
memory.store_fn.assert_called_once()
call_kwargs = memory.store_fn.call_args
assert call_kwargs.kwargs.get("context_type") == "research"
assert call_kwargs.kwargs.get("source") == "research_orchestrator"
@pytest.mark.asyncio
async def test_store_error_handled(self):
memory = MemoryInterface(
search_fn=MagicMock(return_value=[]),
store_fn=MagicMock(side_effect=RuntimeError("db error")),
)
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
result = ResearchResult(topic="test", report="report")
# Should not raise
await orch._crystallize("test", result)
class TestWriteArtifact:
@pytest.mark.asyncio
async def test_no_action_items_skips(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
result = ResearchResult(topic="test", report="r", action_items=[])
# Should complete without any calls
await orch._write_artifact(result)
@pytest.mark.asyncio
async def test_creates_issues(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
result = ResearchResult(
topic="test", report="r", action_items=["Fix the thing"]
)
with patch("timmy.research._create_gitea_issues") as mock_create:
await orch._write_artifact(result)
mock_create.assert_called_once_with(result)
class TestFullPipeline:
@pytest.mark.asyncio
async def test_cache_hit_short_circuits(self):
"""When memory has a high-confidence match, skip web search."""
entry = MagicMock()
entry.relevance_score = 0.95
entry.content = "Previously researched content"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
result = await orch.run("cached topic")
assert result.cache_hit is True
assert result.report == "Previously researched content"
# Cascade should NOT have been called (no query generation or synthesis)
cascade.complete.assert_not_called()
assert orch._metrics["research_cache_hit"] == 1
@pytest.mark.asyncio
async def test_full_pipeline_no_tools(self):
"""Pipeline completes even without web tools (graceful degradation)."""
memory = _make_memory()
cascade = AsyncMock()
# First call: generate queries, second: synthesize
cascade.complete = AsyncMock(
side_effect=[
{"content": "query 1\nquery 2"},
{"content": "# Report\nACTION: Do something"},
]
)
tools = ResearchTools() # No web tools
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
with patch("timmy.research._create_gitea_issues"):
result = await orch.run("test topic")
assert result.topic == "test topic"
assert result.cache_hit is False
assert "Report" in result.report
assert result.action_items == ["Do something"]
assert result.duration_ms > 0
assert orch._metrics["research_api_call"] == 1
memory.store_fn.assert_called_once()
@pytest.mark.asyncio
async def test_full_pipeline_with_tools(self):
"""Full pipeline with search and fetch tools."""
memory = _make_memory()
cascade = AsyncMock()
cascade.complete = AsyncMock(
side_effect=[
{"content": "search query 1\nsearch query 2"},
{"content": "# Full Report\nTODO: Review findings"},
]
)
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
with patch("timmy.research._create_gitea_issues"):
result = await orch.run("test topic")
assert result.topic == "test topic"
assert result.cache_hit is False
assert len(result.queries_generated) == 2
assert len(result.sources) > 0
assert result.action_items == ["Review findings"]
@pytest.mark.asyncio
async def test_get_metrics(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
metrics = orch.get_metrics()
assert "research_cache_hit" in metrics
assert "research_api_call" in metrics
class TestCreateGiteaIssues:
def test_no_token_skips(self):
"""No Gitea token configured — silently skips."""
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="t", report="r", action_items=["item"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = ""
mock_settings.gitea_url = ""
with patch("timmy.research.settings", mock_settings):
# Should not raise
_create_gitea_issues(result)
def test_creates_issue_on_success(self):
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="AI", report="r", action_items=["Deploy model"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
mock_resp = MagicMock()
mock_resp.status_code = 201
mock_requests_mod = MagicMock()
mock_requests_mod.post.return_value = mock_resp
with (
patch("timmy.research.settings", mock_settings),
patch.dict("sys.modules", {"requests": mock_requests_mod}),
):
_create_gitea_issues(result)
mock_requests_mod.post.assert_called_once()
call_kwargs = mock_requests_mod.post.call_args
assert "[research]" in call_kwargs.kwargs["json"]["title"]