# forked from Rockachopa/Timmy-time-dashboard
"""Research Orchestrator — autonomous, sovereign research pipeline.

Chains all six steps of the research workflow with local-first execution:

Step 0 Cache   — check semantic memory (SQLite, instant, zero API cost)
Step 1 Scope   — load a research template from skills/research/
Step 2 Query   — slot-fill template + formulate 5-15 search queries via Ollama
Step 3 Search  — execute queries via web_search (SerpAPI or fallback)
Step 4 Fetch   — download + extract full pages via web_fetch (trafilatura)
Step 5 Synth   — compress findings into a structured report via cascade
Step 6 Deliver — store to semantic memory; optionally save to docs/research/

Cascade tiers for synthesis (spec §4):

Tier 4 SQLite semantic cache — instant, free, covers ~80% after warm-up
Tier 3 Ollama (qwen3:14b) — local, free, good quality
Tier 2 Claude API (haiku) — cloud fallback, cheap, set ANTHROPIC_API_KEY
Tier 1 (future) Groq — free-tier rate-limited, tracked in #980

All optional services degrade gracefully per project conventions.

Refs #972 (governing spec), #975 (ResearchOrchestrator sub-issue).
"""

from __future__ import annotations

import asyncio
import logging
import re
import textwrap
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)

# Optional memory imports — bound at module level so tests can patch them.
try:
    from timmy.memory_system import SemanticMemory, store_memory
except Exception:  # pragma: no cover
    SemanticMemory = None  # type: ignore[assignment,misc]
    store_memory = None  # type: ignore[assignment]

# Project root — two levels up from src/timmy/.
_PROJECT_ROOT = Path(__file__).parent.parent.parent
_SKILLS_ROOT = _PROJECT_ROOT / "skills" / "research"
_DOCS_ROOT = _PROJECT_ROOT / "docs" / "research"

# Minimum cosine similarity (0–1) for a semantic-cache hit.
_CACHE_HIT_THRESHOLD = 0.82

# How many search-result URLs get fetched as full pages.
_FETCH_TOP_N = 5

# Token budget requested from the synthesis LLM.
_SYNTHESIS_MAX_TOKENS = 4096


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------

@dataclass
class ResearchResult:
    """Full output of a research pipeline run."""

    topic: str                       # the research question or subject
    query_count: int                 # number of search queries executed
    sources_fetched: int             # number of full pages retrieved
    report: str                      # synthesized markdown report
    cached: bool = False             # True when served from semantic memory
    cache_similarity: float = 0.0    # cosine score of the cache hit, if any
    synthesis_backend: str = "unknown"  # which tier produced the report
    errors: list[str] = field(default_factory=list)  # non-fatal pipeline errors

    def is_empty(self) -> bool:
        """Return True when the report holds no non-whitespace text."""
        return self.report.strip() == ""


# ---------------------------------------------------------------------------
# Template loading
# ---------------------------------------------------------------------------

def list_templates() -> list[str]:
    """Return names of available research templates (without .md extension)."""
    if not _SKILLS_ROOT.exists():
        return []
    # Sort the paths themselves (original behavior), then strip extensions.
    template_files = sorted(_SKILLS_ROOT.glob("*.md"))
    return [path.stem for path in template_files]

def load_template(template_name: str, slots: dict[str, str] | None = None) -> str:
    """Load a research template and fill {slot} placeholders.

    Args:
        template_name: Stem of the .md file under skills/research/ (e.g. "tool_evaluation").
        slots: Mapping of {placeholder} → replacement value.

    Returns:
        Template text with slots filled. Unfilled slots are left as-is.

    Raises:
        FileNotFoundError: If no template with that name exists.
    """
    template_path = _SKILLS_ROOT / f"{template_name}.md"
    if not template_path.exists():
        known = ", ".join(list_templates()) or "(none)"
        raise FileNotFoundError(
            f"Research template {template_name!r} not found. "
            f"Available: {known}"
        )

    text = template_path.read_text(encoding="utf-8")

    # Strip YAML frontmatter (--- ... ---), including empty frontmatter (--- \n---).
    text = re.sub(r"^---\n.*?---\n", "", text, flags=re.DOTALL)

    # Fill placeholders; any slot not present in the mapping stays literal.
    for key, value in (slots or {}).items():
        text = text.replace(f"{{{key}}}", value)

    return text.strip()


# ---------------------------------------------------------------------------
# Query formulation (Step 2)
# ---------------------------------------------------------------------------

async def _formulate_queries(topic: str, template_context: str, n: int = 8) -> list[str]:
    """Use the local LLM to generate targeted search queries for a topic.

    Falls back to a simple heuristic if Ollama is unavailable.
    """
    prompt = textwrap.dedent(f"""\
        You are a research assistant. Generate exactly {n} targeted, specific web search
        queries to thoroughly research the following topic.

        TOPIC: {topic}

        RESEARCH CONTEXT:
        {template_context[:1000]}

        Rules:
        - One query per line, no numbering, no bullet points.
        - Vary the angle (definition, comparison, implementation, alternatives, pitfalls).
        - Prefer exact technical terms, tool names, and version numbers where relevant.
        - Output ONLY the queries, nothing else.
        """)

    raw = await _ollama_complete(prompt, max_tokens=512)

    if not raw:
        # Heuristic fallback when no LLM is reachable.
        suffixes = ("overview", "tutorial", "best practices", "alternatives", "2025")
        return [f"{topic} {suffix}" for suffix in suffixes]

    # One query per non-blank line, capped at n (slicing handles short lists).
    queries = [line.strip() for line in raw.splitlines() if line.strip()]
    return queries[:n]


# ---------------------------------------------------------------------------
# Search (Step 3)
# ---------------------------------------------------------------------------

async def _execute_search(queries: list[str]) -> list[dict[str, str]]:
    """Run each query through the available web search backend.

    Returns a flat list of {title, url, snippet} dicts, deduplicated by URL.
    Degrades gracefully if SerpAPI key is absent.
    """
    deduped: list[dict[str, str]] = []
    visited: set[str] = set()

    for query in queries:
        try:
            # Blocking SerpAPI call runs off the event loop.
            hits = await asyncio.to_thread(_run_search_sync, query)
            for hit in hits:
                url = hit.get("url", "")
                if url and url not in visited:
                    visited.add(url)
                    deduped.append(hit)
        except Exception as exc:
            # A single failed query must not abort the whole search step.
            logger.warning("Search failed for query %r: %s", query, exc)

    return deduped

def _run_search_sync(query: str) -> list[dict[str, str]]:
    """Synchronous search — wraps SerpAPI or returns empty on missing key."""
    import os

    # Graceful degradation: no key means no search, not an error.
    if not os.environ.get("SERPAPI_API_KEY"):
        logger.debug("SERPAPI_API_KEY not set — skipping web search for %r", query)
        return []

    try:
        from serpapi import GoogleSearch

        params = {"q": query, "api_key": os.environ["SERPAPI_API_KEY"], "num": 5}
        data = GoogleSearch(params).get_dict()
        # Normalize SerpAPI's organic results into {title, url, snippet}.
        return [
            {
                "title": entry.get("title", ""),
                "url": entry.get("link", ""),
                "snippet": entry.get("snippet", ""),
            }
            for entry in data.get("organic_results", [])
        ]
    except Exception as exc:
        logger.warning("SerpAPI search error: %s", exc)
        return []


# ---------------------------------------------------------------------------
# Fetch (Step 4)
# ---------------------------------------------------------------------------

async def _fetch_pages(results: list[dict[str, str]], top_n: int = _FETCH_TOP_N) -> list[str]:
    """Download and extract full text for the top search results.

    Uses web_fetch (trafilatura) from timmy.tools.system_tools.
    Returns one markdown section per successfully fetched page.
    """
    try:
        from timmy.tools.system_tools import web_fetch
    except ImportError:
        logger.warning("web_fetch not available — skipping page fetch")
        return []

    pages: list[str] = []
    for item in results[:top_n]:
        url = item.get("url", "")
        if not url:
            continue
        try:
            # Blocking fetch runs off the event loop; 6000 = extraction char cap.
            text = await asyncio.to_thread(web_fetch, url, 6000)
        except Exception as exc:
            logger.warning("Failed to fetch %s: %s", url, exc)
            continue
        # web_fetch signals failure in-band with an "Error:" prefix.
        if text and not text.startswith("Error:"):
            pages.append(f"## {item.get('title', url)}\nSource: {url}\n\n{text}")

    return pages


# ---------------------------------------------------------------------------
# Synthesis (Step 5) — cascade: Ollama → Claude fallback
# ---------------------------------------------------------------------------

async def _synthesize(topic: str, pages: list[str], snippets: list[str]) -> tuple[str, str]:
|
||
"""Compress fetched pages + snippets into a structured research report.
|
||
|
||
Returns (report_markdown, backend_used).
|
||
"""
|
||
# Build synthesis prompt
|
||
source_content = "\n\n---\n\n".join(pages[:5])
|
||
if not source_content and snippets:
|
||
source_content = "\n".join(f"- {s}" for s in snippets[:20])
|
||
|
||
if not source_content:
|
||
return (
|
||
f"# Research: {topic}\n\n*No source material was retrieved. "
|
||
"Check SERPAPI_API_KEY and network connectivity.*",
|
||
"none",
|
||
)
|
||
|
||
prompt = textwrap.dedent(f"""\
|
||
You are a senior technical researcher. Synthesize the source material below
|
||
into a structured research report on the topic: **{topic}**
|
||
|
||
FORMAT YOUR REPORT AS:
|
||
# {topic}
|
||
|
||
## Executive Summary
|
||
(2-3 sentences: what you found, top recommendation)
|
||
|
||
## Key Findings
|
||
(Bullet list of the most important facts, tools, or patterns)
|
||
|
||
## Comparison / Options
|
||
(Table or list comparing alternatives where applicable)
|
||
|
||
## Recommended Approach
|
||
(Concrete recommendation with rationale)
|
||
|
||
## Gaps & Next Steps
|
||
(What wasn't answered, what to investigate next)
|
||
|
||
---
|
||
SOURCE MATERIAL:
|
||
{source_content[:12000]}
|
||
""")
|
||
|
||
# Tier 3 — try Ollama first
|
||
report = await _ollama_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
|
||
if report:
|
||
return report, "ollama"
|
||
|
||
# Tier 2 — Claude fallback
|
||
report = await _claude_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
|
||
if report:
|
||
return report, "claude"
|
||
|
||
# Last resort — structured snippet summary
|
||
summary = f"# {topic}\n\n## Snippets\n\n" + "\n\n".join(
|
||
f"- {s}" for s in snippets[:15]
|
||
)
|
||
return summary, "fallback"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# LLM helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
async def _ollama_complete(prompt: str, max_tokens: int = 1024) -> str:
    """Send a prompt to Ollama and return the response text.

    Returns empty string on failure (graceful degradation).
    """
    try:
        import httpx

        from config import settings

        endpoint = f"{settings.normalized_ollama_url}/api/generate"
        request_body: dict[str, Any] = {
            "model": settings.ollama_model,
            "prompt": prompt,
            "stream": False,
            # Low temperature: synthesis/query tasks favor determinism.
            "options": {
                "num_predict": max_tokens,
                "temperature": 0.3,
            },
        }

        # Generous timeout — local models can be slow on long prompts.
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(endpoint, json=request_body)
            response.raise_for_status()
            return response.json().get("response", "").strip()
    except Exception as exc:
        logger.warning("Ollama completion failed: %s", exc)
        return ""

async def _claude_complete(prompt: str, max_tokens: int = 1024) -> str:
    """Send a prompt to Claude API as a last-resort fallback.

    Only active when ANTHROPIC_API_KEY is configured.
    Returns empty string on failure or missing key.
    NOTE: max_tokens is accepted for signature symmetry but not forwarded
    to the backend here.
    """
    try:
        from config import settings

        if not settings.anthropic_api_key:
            return ""

        from timmy.backends import ClaudeBackend

        # ClaudeBackend.run is synchronous — run it off the event loop.
        backend = ClaudeBackend()
        result = await asyncio.to_thread(backend.run, prompt)
        return result.content.strip()
    except Exception as exc:
        logger.warning("Claude fallback failed: %s", exc)
        return ""


# ---------------------------------------------------------------------------
# Memory cache (Step 0 + Step 6)
# ---------------------------------------------------------------------------

def _check_cache(topic: str) -> tuple[str | None, float]:
    """Search semantic memory for a prior result on this topic.

    Returns (cached_report, similarity) or (None, 0.0).
    """
    try:
        if SemanticMemory is None:
            # Memory subsystem unavailable — treat as a guaranteed miss.
            return None, 0.0
        top_hits = SemanticMemory().search(topic, top_k=1)
        if top_hits:
            content, score = top_hits[0]
            # Only serve the cache above the similarity threshold.
            if score >= _CACHE_HIT_THRESHOLD:
                return content, score
    except Exception as exc:
        logger.debug("Cache check failed: %s", exc)
    return None, 0.0

def _store_result(topic: str, report: str) -> None:
    """Index the research report into semantic memory for future retrieval."""
    try:
        if store_memory is None:
            # Memory subsystem unavailable — silently skip, per conventions.
            logger.debug("store_memory not available — skipping memory index")
            return
        store_memory(
            content=report,
            source="research_pipeline",
            context_type="research",
            metadata={"topic": topic},
        )
        logger.info("Research result indexed for topic: %r", topic)
    except Exception as exc:
        logger.warning("Failed to store research result: %s", exc)

def _save_to_disk(topic: str, report: str) -> Path | None:
    """Persist the report as a markdown file under docs/research/.

    Filename is derived from the topic (slugified). Returns the path or None.
    """
    try:
        # Slugify: lowercase, collapse non-alphanumerics to "-", cap at 60 chars.
        slug = re.sub(r"[^a-z0-9]+", "-", topic.lower()).strip("-")[:60]
        _DOCS_ROOT.mkdir(parents=True, exist_ok=True)
        target = _DOCS_ROOT / f"{slug}.md"
        target.write_text(report, encoding="utf-8")
        logger.info("Research report saved to %s", target)
        return target
    except Exception as exc:
        logger.warning("Failed to save research report to disk: %s", exc)
        return None


# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------

async def run_research(
    topic: str,
    template: str | None = None,
    slots: dict[str, str] | None = None,
    save_to_disk: bool = False,
    skip_cache: bool = False,
) -> ResearchResult:
    """Run the full 6-step autonomous research pipeline.

    Args:
        topic: The research question or subject.
        template: Name of a template from skills/research/ (e.g. "tool_evaluation").
            If None, runs without a template scaffold.
        slots: Placeholder values for the template (e.g. {"domain": "PDF parsing"}).
        save_to_disk: If True, write the report to docs/research/<slug>.md.
        skip_cache: If True, bypass the semantic memory cache.

    Returns:
        ResearchResult with report and metadata.
    """
    errors: list[str] = []

    # Step 0 — semantic-memory cache (instant, free).
    if not skip_cache:
        cached_report, similarity = _check_cache(topic)
        if cached_report:
            logger.info("Cache hit (%.2f) for topic: %r", similarity, topic)
            return ResearchResult(
                topic=topic,
                query_count=0,
                sources_fetched=0,
                report=cached_report,
                cached=True,
                cache_similarity=similarity,
                synthesis_backend="cache",
            )

    # Step 1 — optional template scaffold. A missing template is non-fatal:
    # it is recorded and the pipeline continues without a scaffold.
    template_context = ""
    if template:
        try:
            template_context = load_template(template, slots)
        except FileNotFoundError as exc:
            errors.append(str(exc))
            logger.warning("Template load failed: %s", exc)

    # Step 2 — formulate search queries.
    queries = await _formulate_queries(topic, template_context)
    logger.info("Formulated %d queries for topic: %r", len(queries), topic)

    # Step 3 — execute the searches.
    search_results = await _execute_search(queries)
    logger.info("Search returned %d results", len(search_results))
    snippets = [hit.get("snippet", "") for hit in search_results if hit.get("snippet")]

    # Step 4 — fetch full pages for the top results.
    pages = await _fetch_pages(search_results)
    logger.info("Fetched %d pages", len(pages))

    # Step 5 — synthesize the report via the LLM cascade.
    report, backend = await _synthesize(topic, pages, snippets)

    # Step 6 — deliver: index into memory, optionally persist to disk.
    _store_result(topic, report)
    if save_to_disk:
        _save_to_disk(topic, report)

    return ResearchResult(
        topic=topic,
        query_count=len(queries),
        sources_fetched=len(pages),
        report=report,
        cached=False,
        synthesis_backend=backend,
        errors=errors,
    )