Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
110f67c567 fix: add Gitea hardening config and script to disable registration + require sign-in
Some checks failed
Tests / lint (pull_request) Failing after 5s
Tests / test (pull_request) Has been skipped
Adds deploy/gitea/app.ini with hardened [service] settings and
scripts/harden_gitea.sh to apply them on the server. The script
backs up the existing config, patches the four required settings,
restarts Gitea, and verifies the changes.

Settings applied:
- DISABLE_REGISTRATION = true
- ALLOW_ONLY_EXTERNAL_REGISTRATION = false
- SHOW_REGISTRATION_BUTTON = false
- REQUIRE_SIGNIN_VIEW = true

Fixes #988

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 18:39:05 -04:00
5 changed files with 190 additions and 1135 deletions

21
deploy/gitea/app.ini Normal file
View File

@@ -0,0 +1,21 @@
; ── Gitea Hardening — Security Overrides ─────────────────────────────────────
;
; Merge these settings into your Gitea custom/conf/app.ini.
;
; On a default Gitea install (Docker or bare-metal):
; /path/to/gitea/custom/conf/app.ini
;
; After editing, restart Gitea:
; systemctl restart gitea # bare-metal
; docker restart <gitea-container> # Docker
;
; See also: scripts/harden_gitea.sh (automated version)
[service]
; Disable public registration — only admins can create accounts
DISABLE_REGISTRATION = true
ALLOW_ONLY_EXTERNAL_REGISTRATION = false
SHOW_REGISTRATION_BUTTON = false
; Require sign-in to view any content (repos, explore, etc.)
REQUIRE_SIGNIN_VIEW = true

View File

@@ -1,83 +0,0 @@
#!/bin/bash
# Gitea backup script — run on the VPS before any hardening changes.
# Usage: sudo bash scripts/gitea_backup.sh [off-site-dest]
#
# off-site-dest: optional rsync/scp destination for off-site copy
# e.g. user@backup-host:/backups/gitea/
#
# Refs: #971, #990
set -euo pipefail
BACKUP_DIR="/opt/gitea/backups"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
GITEA_CONF="/etc/gitea/app.ini"
GITEA_WORK_DIR="/var/lib/gitea"
OFFSITE_DEST="${1:-}"
echo "=== Gitea Backup — $TIMESTAMP ==="
# Ensure backup directory exists
mkdir -p "$BACKUP_DIR"
cd "$BACKUP_DIR"
# Run the dump
echo "[1/4] Running gitea dump..."
gitea dump -c "$GITEA_CONF"
# Find the newest zip (gitea dump names it gitea-dump-*.zip)
BACKUP_FILE=$(ls -t "$BACKUP_DIR"/gitea-dump-*.zip 2>/dev/null | head -1)
if [ -z "$BACKUP_FILE" ]; then
echo "ERROR: No backup zip found in $BACKUP_DIR"
exit 1
fi
BACKUP_SIZE=$(stat -c%s "$BACKUP_FILE" 2>/dev/null || stat -f%z "$BACKUP_FILE")
echo "[2/4] Backup created: $BACKUP_FILE ($BACKUP_SIZE bytes)"
if [ "$BACKUP_SIZE" -eq 0 ]; then
echo "ERROR: Backup file is 0 bytes"
exit 1
fi
# Lock down permissions
chmod 600 "$BACKUP_FILE"
# Verify contents
echo "[3/4] Verifying backup contents..."
CONTENTS=$(unzip -l "$BACKUP_FILE" 2>/dev/null || true)
check_component() {
if echo "$CONTENTS" | grep -q "$1"; then
echo " OK: $2"
else
echo " WARN: $2 not found in backup"
fi
}
check_component "gitea-db.sql" "Database dump"
check_component "gitea-repo" "Repositories"
check_component "custom" "Custom config"
check_component "app.ini" "app.ini"
# Off-site copy
if [ -n "$OFFSITE_DEST" ]; then
echo "[4/4] Copying to off-site: $OFFSITE_DEST"
rsync -avz "$BACKUP_FILE" "$OFFSITE_DEST"
echo " Off-site copy complete."
else
echo "[4/4] No off-site destination provided. Skipping."
echo " To copy later: scp $BACKUP_FILE user@backup-host:/backups/gitea/"
fi
echo ""
echo "=== Backup complete ==="
echo "File: $BACKUP_FILE"
echo "Size: $BACKUP_SIZE bytes"
echo ""
echo "To verify restore on a clean instance:"
echo " 1. Copy zip to test machine"
echo " 2. unzip $BACKUP_FILE"
echo " 3. gitea restore --from <extracted-dir> -c /etc/gitea/app.ini"
echo " 4. Verify repos and DB are intact"

169
scripts/harden_gitea.sh Executable file
View File

@@ -0,0 +1,169 @@
#!/usr/bin/env bash
set -euo pipefail
# ── Gitea Hardening Script ──────────────────────────────────────────────────
#
# Disables public registration and requires sign-in to view content.
# Refs: Issue #988
#
# Usage (on the Gitea server):
# sudo bash scripts/harden_gitea.sh
# sudo bash scripts/harden_gitea.sh --config /path/to/custom/conf/app.ini
# sudo bash scripts/harden_gitea.sh --docker gitea # restart via docker
#
# What it does:
# 1. Patches [service] section in app.ini
# 2. Restarts Gitea so changes take effect
# 3. Verifies the changes are active
BOLD='\033[1m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m'
info() { echo -e "${GREEN}[+]${NC} $1"; }
warn() { echo -e "${YELLOW}[!]${NC} $1"; }
error() { echo -e "${RED}[x]${NC} $1"; }
# ── Defaults ────────────────────────────────────────────────────────────────
# Common Gitea config paths (checked in order)
SEARCH_PATHS=(
"/etc/gitea/app.ini"
"/opt/gitea/custom/conf/app.ini"
"/data/gitea/conf/app.ini"
"/app/gitea/conf/app.ini"
)
CONFIG_PATH=""
DOCKER_CONTAINER=""
SYSTEMD_SERVICE="gitea"
# ── Parse arguments ─────────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
case $1 in
--config) CONFIG_PATH="$2"; shift 2 ;;
--docker) DOCKER_CONTAINER="$2"; shift 2 ;;
--service) SYSTEMD_SERVICE="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 [--config /path/to/app.ini] [--docker container] [--service name]"
exit 0
;;
*) error "Unknown option: $1"; exit 1 ;;
esac
done
# ── Find config ─────────────────────────────────────────────────────────────
if [ -z "$CONFIG_PATH" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if [ -f "$path" ]; then
CONFIG_PATH="$path"
break
fi
done
fi
# If using Docker, try to find config inside the container
if [ -z "$CONFIG_PATH" ] && [ -n "$DOCKER_CONTAINER" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if docker exec "$DOCKER_CONTAINER" test -f "$path" 2>/dev/null; then
CONFIG_PATH="$path"
info "Found config inside container at $path"
break
fi
done
fi
if [ -z "$CONFIG_PATH" ]; then
error "Could not find Gitea app.ini. Use --config to specify the path."
exit 1
fi
info "Using config: $CONFIG_PATH"
# ── Backup ──────────────────────────────────────────────────────────────────
BACKUP="${CONFIG_PATH}.bak.$(date +%Y%m%d%H%M%S)"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" cp "$CONFIG_PATH" "$BACKUP"
else
cp "$CONFIG_PATH" "$BACKUP"
fi
info "Backup saved to $BACKUP"
# ── Apply settings ──────────────────────────────────────────────────────────
apply_setting() {
local key="$1"
local value="$2"
local file="$3"
if [ -n "$DOCKER_CONTAINER" ]; then
# Check if key exists (commented or not) and update, otherwise append to [service]
if docker exec "$DOCKER_CONTAINER" grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
docker exec "$DOCKER_CONTAINER" sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Append after [service] section header
docker exec "$DOCKER_CONTAINER" sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
else
if grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Ensure [service] section exists, then append
if ! grep -q '^\[service\]' "$file"; then
printf '\n[service]\n' >> "$file"
fi
sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
fi
}
info "Applying hardening settings..."
apply_setting "DISABLE_REGISTRATION" "true" "$CONFIG_PATH"
apply_setting "ALLOW_ONLY_EXTERNAL_REGISTRATION" "false" "$CONFIG_PATH"
apply_setting "SHOW_REGISTRATION_BUTTON" "false" "$CONFIG_PATH"
apply_setting "REQUIRE_SIGNIN_VIEW" "true" "$CONFIG_PATH"
info "Settings applied:"
info " DISABLE_REGISTRATION = true"
info " ALLOW_ONLY_EXTERNAL_REGISTRATION = false"
info " SHOW_REGISTRATION_BUTTON = false"
info " REQUIRE_SIGNIN_VIEW = true"
# ── Restart Gitea ───────────────────────────────────────────────────────────
echo ""
if [ -n "$DOCKER_CONTAINER" ]; then
info "Restarting Gitea container: $DOCKER_CONTAINER"
docker restart "$DOCKER_CONTAINER"
elif systemctl is-active --quiet "$SYSTEMD_SERVICE" 2>/dev/null; then
info "Restarting Gitea via systemd: $SYSTEMD_SERVICE"
systemctl restart "$SYSTEMD_SERVICE"
else
warn "Could not detect Gitea service. Restart Gitea manually to apply changes."
fi
# ── Verify ──────────────────────────────────────────────────────────────────
echo ""
info "Verification — current [service] settings:"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
else
grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
fi
echo ""
echo -e "${GREEN}${BOLD} Gitea hardening complete.${NC}"
echo ""
echo " Registration: DISABLED"
echo " Sign-in required to view: YES"
echo ""
echo " Backup: $BACKUP"
echo ""

View File

@@ -1,555 +0,0 @@
"""ResearchOrchestrator — autonomous research pipeline.
Chains: Check Local → Generate Queries → Search → Fetch → Synthesize →
Crystallize → Write Artifact into an end-to-end research workflow.
Usage:
from timmy.research import ResearchOrchestrator, run_research
orchestrator = ResearchOrchestrator(cascade=router, memory=memory_fns)
result = await orchestrator.run("Bitcoin Lightning Network scaling")
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Data structures ──────────────────────────────────────────────────────────
CONFIDENCE_THRESHOLD = 0.85
DEFAULT_QUERIES_PER_TOPIC = 8
DEFAULT_RESULTS_PER_QUERY = 5
DEFAULT_PAGES_TO_FETCH = 10
DEFAULT_FETCH_TOKEN_LIMIT = 3000
DEFAULT_SYNTHESIS_MAX_TOKENS = 4000
@dataclass
class ResearchResult:
"""Output of a completed research pipeline run."""
topic: str
report: str
queries_generated: list[str] = field(default_factory=list)
sources: list[dict[str, str]] = field(default_factory=list)
action_items: list[str] = field(default_factory=list)
cache_hit: bool = False
duration_ms: float = 0.0
metrics: dict[str, Any] = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@dataclass
class SearchSnippet:
"""A single search result snippet."""
title: str
url: str
snippet: str
relevance: float = 0.0
@dataclass
class FetchedPage:
"""A fetched and truncated web page."""
url: str
title: str
content: str
token_estimate: int = 0
# ── Memory interface ─────────────────────────────────────────────────────────
@dataclass
class MemoryInterface:
"""Abstraction over the memory system for research.
Accepts callables so the orchestrator doesn't depend on a specific
memory implementation. Defaults wire to timmy.memory_system.
"""
search_fn: Any = None # (query, limit) -> list[MemoryEntry]
store_fn: Any = None # (content, source, context_type, ...) -> MemoryEntry
def __post_init__(self):
if self.search_fn is None or self.store_fn is None:
self._load_defaults()
def _load_defaults(self):
try:
from timmy.memory_system import search_memories, store_memory
if self.search_fn is None:
self.search_fn = search_memories
if self.store_fn is None:
self.store_fn = store_memory
except ImportError:
logger.warning("Memory system not available — research will skip caching")
if self.search_fn is None:
self.search_fn = lambda query, **kw: []
if self.store_fn is None:
self.store_fn = lambda content, source, **kw: None
# ── Tool interface ───────────────────────────────────────────────────────────
@dataclass
class ResearchTools:
"""Web search and fetch callables.
These are async callables:
web_search(query: str, limit: int) -> list[dict]
web_fetch(url: str, max_tokens: int) -> str
"""
web_search: Any = None
web_fetch: Any = None
# ── Orchestrator ─────────────────────────────────────────────────────────────
class ResearchOrchestrator:
"""Pipeline that chains research steps into an autonomous workflow.
Steps:
0. CHECK LOCAL KNOWLEDGE — search memory, return cached if confident
1. GENERATE QUERIES — ask LLM to produce search queries
2. SEARCH — execute queries via web_search tool
3. FETCH — rank snippets, fetch top pages
4. SYNTHESIZE — produce structured report via LLM
5. CRYSTALLIZE — store result in semantic memory
6. WRITE ARTIFACT — create Gitea issues from action items
"""
def __init__(
self,
cascade: Any,
memory: MemoryInterface | None = None,
tools: ResearchTools | None = None,
) -> None:
self.cascade = cascade
self.memory = memory or MemoryInterface()
self.tools = tools or ResearchTools()
self._metrics: dict[str, int] = {
"research_cache_hit": 0,
"research_api_call": 0,
}
async def run(
self,
topic: str,
template: str | None = None,
context: dict[str, Any] | None = None,
) -> ResearchResult:
"""Execute the full research pipeline.
Args:
topic: The research topic or question.
template: Optional prompt template for synthesis.
context: Additional context dict (cascade_tier hint, etc.).
Returns:
ResearchResult with report, sources, and action items.
"""
start = time.monotonic()
context = context or {}
cascade_tier = context.get("cascade_tier")
# Step 0: Check local knowledge
cached = await self._check_local_knowledge(topic)
if cached is not None:
self._metrics["research_cache_hit"] += 1
cached.duration_ms = (time.monotonic() - start) * 1000
return cached
self._metrics["research_api_call"] += 1
# Step 1: Generate queries
queries = await self._generate_queries(topic, template, cascade_tier)
# Step 2: Search
snippets = await self._search(queries)
# Step 3: Fetch top pages
pages = await self._fetch(snippets)
# Step 4: Synthesize
report = await self._synthesize(topic, template, pages, cascade_tier)
# Step 5: Extract action items
action_items = _extract_action_items(report)
# Build result
sources = [{"url": p.url, "title": p.title} for p in pages]
result = ResearchResult(
topic=topic,
report=report,
queries_generated=queries,
sources=sources,
action_items=action_items,
cache_hit=False,
duration_ms=(time.monotonic() - start) * 1000,
metrics=dict(self._metrics),
)
# Step 6: Crystallize — store in memory
await self._crystallize(topic, result)
# Step 7: Write artifact — create Gitea issues
await self._write_artifact(result)
return result
# ── Pipeline steps ───────────────────────────────────────────────────
async def _check_local_knowledge(self, topic: str) -> ResearchResult | None:
"""Search semantic memory for existing research on this topic."""
try:
results = self.memory.search_fn(
query=topic, limit=10, context_type="research"
)
if not results:
return None
# Check if top result has high confidence
top = results[0]
score = getattr(top, "relevance_score", 0.0) or 0.0
if score >= CONFIDENCE_THRESHOLD:
content = getattr(top, "content", str(top))
logger.info(
"Research cache hit for '%s' (score=%.2f)", topic, score
)
return ResearchResult(
topic=topic,
report=content,
cache_hit=True,
metrics={"research_cache_hit": 1},
)
except Exception as exc:
logger.warning("Local knowledge check failed: %s", exc)
return None
async def _generate_queries(
self,
topic: str,
template: str | None,
cascade_tier: str | None,
) -> list[str]:
"""Ask the LLM to generate search queries for the topic."""
prompt = (
f"Generate {DEFAULT_QUERIES_PER_TOPIC} diverse web search queries "
f"to thoroughly research the following topic. Return ONLY the "
f"queries, one per line, no numbering or bullets.\n\n"
f"Topic: {topic}"
)
if template:
prompt += f"\n\nResearch template context:\n{template}"
messages = [
{"role": "system", "content": "You are a research query generator."},
{"role": "user", "content": prompt},
]
kwargs: dict[str, Any] = {"messages": messages, "temperature": 0.7}
if cascade_tier:
kwargs["model"] = cascade_tier
try:
response = await self.cascade.complete(**kwargs)
raw = response.get("content", "")
queries = [
line.strip()
for line in raw.strip().splitlines()
if line.strip() and not line.strip().startswith("#")
]
# Clean numbering prefixes
cleaned = []
for q in queries:
q = re.sub(r"^\d+[\.\)]\s*", "", q)
q = re.sub(r"^[-*]\s*", "", q)
if q:
cleaned.append(q)
return cleaned[:DEFAULT_QUERIES_PER_TOPIC + 4] # slight over-generate
except Exception as exc:
logger.warning("Query generation failed: %s", exc)
# Fallback: use topic itself as a single query
return [topic]
async def _search(self, queries: list[str]) -> list[SearchSnippet]:
"""Execute search queries and collect snippets."""
if not self.tools.web_search:
logger.warning("No web_search tool configured — skipping search step")
return []
all_snippets: list[SearchSnippet] = []
async def _run_query(query: str) -> list[SearchSnippet]:
try:
results = await asyncio.to_thread(
self.tools.web_search, query, DEFAULT_RESULTS_PER_QUERY
)
snippets = []
for r in (results or []):
snippets.append(
SearchSnippet(
title=r.get("title", ""),
url=r.get("url", ""),
snippet=r.get("snippet", ""),
)
)
return snippets
except Exception as exc:
logger.warning("Search failed for query '%s': %s", query, exc)
return []
# Run searches concurrently
tasks = [_run_query(q) for q in queries]
results = await asyncio.gather(*tasks)
for snippets in results:
all_snippets.extend(snippets)
# Deduplicate by URL
seen_urls: set[str] = set()
unique: list[SearchSnippet] = []
for s in all_snippets:
if s.url and s.url not in seen_urls:
seen_urls.add(s.url)
unique.append(s)
return unique
async def _fetch(self, snippets: list[SearchSnippet]) -> list[FetchedPage]:
"""Fetch top pages from search snippets."""
if not self.tools.web_fetch:
logger.warning("No web_fetch tool configured — skipping fetch step")
return []
# Take top N snippets
to_fetch = snippets[:DEFAULT_PAGES_TO_FETCH]
pages: list[FetchedPage] = []
async def _fetch_one(snippet: SearchSnippet) -> FetchedPage | None:
try:
content = await asyncio.to_thread(
self.tools.web_fetch, snippet.url, DEFAULT_FETCH_TOKEN_LIMIT
)
if content:
return FetchedPage(
url=snippet.url,
title=snippet.title,
content=content[:DEFAULT_FETCH_TOKEN_LIMIT * 4],
token_estimate=len(content.split()),
)
except Exception as exc:
logger.warning("Fetch failed for %s: %s", snippet.url, exc)
return None
tasks = [_fetch_one(s) for s in to_fetch]
results = await asyncio.gather(*tasks)
for page in results:
if page is not None:
pages.append(page)
return pages
async def _synthesize(
self,
topic: str,
template: str | None,
pages: list[FetchedPage],
cascade_tier: str | None,
) -> str:
"""Synthesize fetched pages into a structured research report."""
# Build context from fetched pages
context_parts = []
for i, page in enumerate(pages, 1):
context_parts.append(
f"--- Source {i}: {page.title} ({page.url}) ---\n"
f"{page.content[:DEFAULT_FETCH_TOKEN_LIMIT * 4]}\n"
)
sources_text = "\n".join(context_parts) if context_parts else "(no sources fetched)"
if template:
prompt = (
f"{template}\n\n"
f"Topic: {topic}\n\n"
f"Research sources:\n{sources_text}\n\n"
f"Synthesize a comprehensive report based on the sources above."
)
else:
prompt = (
f"Write a comprehensive research report on: {topic}\n\n"
f"Research sources:\n{sources_text}\n\n"
f"Structure your report with:\n"
f"- Executive summary\n"
f"- Key findings\n"
f"- Analysis\n"
f"- Action items (prefix each with 'ACTION:')\n"
f"- Sources cited"
)
messages = [
{"role": "system", "content": "You are a research analyst producing structured reports."},
{"role": "user", "content": prompt},
]
kwargs: dict[str, Any] = {
"messages": messages,
"temperature": 0.3,
"max_tokens": DEFAULT_SYNTHESIS_MAX_TOKENS,
}
if cascade_tier:
kwargs["model"] = cascade_tier
try:
response = await self.cascade.complete(**kwargs)
return response.get("content", "")
except Exception as exc:
logger.error("Synthesis failed: %s", exc)
# Fallback: return raw source summaries
return (
f"# Research: {topic}\n\n"
f"Synthesis failed ({exc}). Raw sources:\n\n{sources_text}"
)
async def _crystallize(self, topic: str, result: ResearchResult) -> None:
"""Store the research result in semantic memory."""
try:
self.memory.store_fn(
content=result.report,
source="research_orchestrator",
context_type="research",
metadata={
"topic": topic,
"sources": result.sources,
"action_items": result.action_items,
"cache_hit": result.cache_hit,
"duration_ms": result.duration_ms,
},
)
logger.info("Crystallized research on '%s' into memory", topic)
except Exception as exc:
logger.warning("Failed to crystallize research: %s", exc)
async def _write_artifact(self, result: ResearchResult) -> None:
"""Create Gitea issues from action items."""
if not result.action_items:
return
try:
await asyncio.to_thread(_create_gitea_issues, result)
except Exception as exc:
logger.warning("Failed to create Gitea issues: %s", exc)
def get_metrics(self) -> dict[str, int]:
"""Return current research pipeline metrics."""
return dict(self._metrics)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _extract_action_items(report: str) -> list[str]:
"""Extract action items from a research report.
Looks for lines prefixed with ACTION:, TODO:, or - [ ].
"""
items: list[str] = []
for line in report.splitlines():
stripped = line.strip()
# ACTION: prefix
match = re.match(r"^(?:ACTION|TODO)\s*:\s*(.+)", stripped, re.IGNORECASE)
if match:
items.append(match.group(1).strip())
continue
# Markdown checkbox
match = re.match(r"^-\s*\[\s*\]\s*(.+)", stripped)
if match:
items.append(match.group(1).strip())
return items
def _create_gitea_issues(result: ResearchResult) -> None:
"""Create Gitea issues for action items (runs in thread)."""
if not settings.gitea_token or not settings.gitea_url:
logger.debug("Gitea not configured — skipping issue creation")
return
try:
import requests
except ImportError:
logger.debug("requests not available — skipping Gitea issue creation")
return
base_url = settings.gitea_url.rstrip("/")
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
for item in result.action_items:
try:
payload = {
"title": f"[research] {item[:100]}",
"body": (
f"Auto-generated from research on: **{result.topic}**\n\n"
f"Action item: {item}\n\n"
f"---\n"
f"_Created by ResearchOrchestrator_"
),
}
resp = requests.post(
f"{base_url}/api/v1/repos/{repo}/issues",
headers=headers,
json=payload,
timeout=10,
)
if resp.status_code in (200, 201):
logger.info("Created Gitea issue: %s", item[:60])
else:
logger.warning(
"Gitea issue creation failed (%d): %s",
resp.status_code,
resp.text[:200],
)
except Exception as exc:
logger.warning("Failed to create issue '%s': %s", item[:60], exc)
# ── Convenience function ─────────────────────────────────────────────────────
async def run_research(
topic: str,
template: str | None = None,
context: dict[str, Any] | None = None,
) -> ResearchResult:
"""Convenience function to run research with default dependencies.
Creates a ResearchOrchestrator with the cascade router singleton
and default memory, then executes the pipeline.
"""
from infrastructure.router.cascade import get_router
cascade = get_router()
orchestrator = ResearchOrchestrator(cascade=cascade)
return await orchestrator.run(topic, template=template, context=context)

View File

@@ -1,497 +0,0 @@
"""Unit tests for timmy.research — ResearchOrchestrator pipeline."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.research import (
DEFAULT_QUERIES_PER_TOPIC,
MemoryInterface,
ResearchOrchestrator,
ResearchResult,
ResearchTools,
SearchSnippet,
_extract_action_items,
)
# ── Data structures ──────────────────────────────────────────────────────────
class TestResearchResult:
def test_defaults(self):
r = ResearchResult(topic="test", report="content")
assert r.topic == "test"
assert r.report == "content"
assert r.cache_hit is False
assert r.queries_generated == []
assert r.sources == []
assert r.action_items == []
assert r.duration_ms == 0.0
assert r.timestamp # non-empty
def test_with_data(self):
r = ResearchResult(
topic="AI",
report="report text",
queries_generated=["q1", "q2"],
sources=[{"url": "http://example.com", "title": "Test"}],
action_items=["Do X"],
cache_hit=True,
duration_ms=42.5,
)
assert r.cache_hit is True
assert len(r.sources) == 1
assert r.duration_ms == 42.5
class TestSearchSnippet:
def test_fields(self):
s = SearchSnippet(title="T", url="http://x.com", snippet="text")
assert s.relevance == 0.0
# ── _extract_action_items ────────────────────────────────────────────────────
class TestExtractActionItems:
def test_action_prefix(self):
report = "Some text\nACTION: Do the thing\nMore text"
items = _extract_action_items(report)
assert items == ["Do the thing"]
def test_todo_prefix(self):
report = "TODO: Fix the bug\nTodo: Also this"
items = _extract_action_items(report)
assert items == ["Fix the bug", "Also this"]
def test_checkbox(self):
report = "- [ ] Implement feature\n- [x] Already done"
items = _extract_action_items(report)
assert items == ["Implement feature"]
def test_mixed(self):
report = "ACTION: First\n- [ ] Second\nTODO: Third"
items = _extract_action_items(report)
assert items == ["First", "Second", "Third"]
def test_empty(self):
assert _extract_action_items("No actions here") == []
assert _extract_action_items("") == []
# ── MemoryInterface ──────────────────────────────────────────────────────────
class TestMemoryInterface:
def test_custom_fns(self):
search = MagicMock(return_value=[])
store = MagicMock()
mi = MemoryInterface(search_fn=search, store_fn=store)
assert mi.search_fn is search
assert mi.store_fn is store
def test_defaults_when_import_fails(self):
with patch.dict("sys.modules", {"timmy.memory_system": None}):
mi = MemoryInterface()
# Should have fallback callables
assert callable(mi.search_fn)
assert callable(mi.store_fn)
# Fallback search returns empty
assert mi.search_fn("test") == []
# ── ResearchOrchestrator ─────────────────────────────────────────────────────
def _make_cascade(**overrides):
"""Create a mock cascade router."""
cascade = AsyncMock()
cascade.complete = AsyncMock(
return_value={"content": overrides.get("content", "query1\nquery2\nquery3")}
)
return cascade
def _make_memory(search_results=None, score=0.0):
"""Create a mock memory interface."""
if search_results is None:
search_results = []
search_fn = MagicMock(return_value=search_results)
store_fn = MagicMock()
return MemoryInterface(search_fn=search_fn, store_fn=store_fn)
def _make_tools(search_results=None, fetch_content="Page content"):
"""Create mock research tools."""
web_search = MagicMock(
return_value=search_results
or [
{"title": "Result 1", "url": "http://a.com", "snippet": "Snippet 1"},
{"title": "Result 2", "url": "http://b.com", "snippet": "Snippet 2"},
]
)
web_fetch = MagicMock(return_value=fetch_content)
return ResearchTools(web_search=web_search, web_fetch=web_fetch)
class TestResearchOrchestratorInit:
def test_basic_init(self):
cascade = _make_cascade()
memory = _make_memory()
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
assert orch.cascade is cascade
assert orch.memory is memory
assert orch.tools is tools
assert orch._metrics["research_cache_hit"] == 0
assert orch._metrics["research_api_call"] == 0
class TestCheckLocalKnowledge:
@pytest.mark.asyncio
async def test_cache_hit(self):
"""High-confidence memory result returns cached ResearchResult."""
entry = MagicMock()
entry.relevance_score = 0.90
entry.content = "Cached report"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is not None
assert result.cache_hit is True
assert result.report == "Cached report"
@pytest.mark.asyncio
async def test_cache_miss_low_score(self):
"""Low-confidence result returns None."""
entry = MagicMock()
entry.relevance_score = 0.5
entry.content = "Weak match"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
@pytest.mark.asyncio
async def test_cache_miss_empty(self):
"""No memory results returns None."""
memory = _make_memory(search_results=[])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
@pytest.mark.asyncio
async def test_exception_returns_none(self):
"""Memory search exception returns None gracefully."""
memory = MemoryInterface(
search_fn=MagicMock(side_effect=RuntimeError("db error")),
store_fn=MagicMock(),
)
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
class TestGenerateQueries:
@pytest.mark.asyncio
async def test_parses_queries(self):
cascade = _make_cascade(content="query one\nquery two\nquery three")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("AI safety", None, None)
assert queries == ["query one", "query two", "query three"]
@pytest.mark.asyncio
async def test_strips_numbering(self):
cascade = _make_cascade(content="1. First query\n2. Second query\n3) Third")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("topic", None, None)
assert "First query" in queries
assert "Second query" in queries
assert "Third" in queries
@pytest.mark.asyncio
async def test_fallback_on_error(self):
cascade = AsyncMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("LLM down"))
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("fallback topic", None, None)
assert queries == ["fallback topic"]
@pytest.mark.asyncio
async def test_passes_cascade_tier(self):
cascade = _make_cascade(content="q1\nq2")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
await orch._generate_queries("topic", None, "gpt-4")
call_kwargs = cascade.complete.call_args.kwargs
assert call_kwargs.get("model") == "gpt-4"
class TestSearch:
@pytest.mark.asyncio
async def test_collects_snippets(self):
tools = _make_tools()
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1", "q2"])
# 2 results per query, 2 queries, but deduplicated by URL
assert len(snippets) == 2 # same URLs returned for both queries
@pytest.mark.asyncio
async def test_no_search_tool(self):
tools = ResearchTools(web_search=None)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1"])
assert snippets == []
@pytest.mark.asyncio
async def test_search_error_handled(self):
tools = ResearchTools(
web_search=MagicMock(side_effect=RuntimeError("network error"))
)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1"])
assert snippets == []
class TestFetch:
@pytest.mark.asyncio
async def test_fetches_pages(self):
tools = _make_tools(fetch_content="Page body here")
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = [
SearchSnippet(title="P1", url="http://a.com", snippet="s1"),
SearchSnippet(title="P2", url="http://b.com", snippet="s2"),
]
pages = await orch._fetch(snippets)
assert len(pages) == 2
assert pages[0].content == "Page body here"
@pytest.mark.asyncio
async def test_no_fetch_tool(self):
tools = ResearchTools(web_fetch=None)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
pages = await orch._fetch([SearchSnippet("T", "http://x.com", "s")])
assert pages == []
class TestSynthesize:
@pytest.mark.asyncio
async def test_produces_report(self):
cascade = _make_cascade(content="# Report\nKey findings here")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
from timmy.research import FetchedPage
pages = [FetchedPage(url="http://x.com", title="X", content="content")]
report = await orch._synthesize("topic", None, pages, None)
assert "Report" in report
@pytest.mark.asyncio
async def test_fallback_on_error(self):
cascade = AsyncMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("LLM error"))
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
from timmy.research import FetchedPage
pages = [FetchedPage(url="http://x.com", title="X", content="content")]
report = await orch._synthesize("topic", None, pages, None)
assert "Synthesis failed" in report
assert "topic" in report
class TestCrystallize:
@pytest.mark.asyncio
async def test_stores_in_memory(self):
memory = _make_memory()
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
result = ResearchResult(topic="test", report="report text")
await orch._crystallize("test", result)
memory.store_fn.assert_called_once()
call_kwargs = memory.store_fn.call_args
assert call_kwargs.kwargs.get("context_type") == "research"
assert call_kwargs.kwargs.get("source") == "research_orchestrator"
@pytest.mark.asyncio
async def test_store_error_handled(self):
memory = MemoryInterface(
search_fn=MagicMock(return_value=[]),
store_fn=MagicMock(side_effect=RuntimeError("db error")),
)
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
result = ResearchResult(topic="test", report="report")
# Should not raise
await orch._crystallize("test", result)
class TestWriteArtifact:
@pytest.mark.asyncio
async def test_no_action_items_skips(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
result = ResearchResult(topic="test", report="r", action_items=[])
# Should complete without any calls
await orch._write_artifact(result)
@pytest.mark.asyncio
async def test_creates_issues(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
result = ResearchResult(
topic="test", report="r", action_items=["Fix the thing"]
)
with patch("timmy.research._create_gitea_issues") as mock_create:
await orch._write_artifact(result)
mock_create.assert_called_once_with(result)
class TestFullPipeline:
@pytest.mark.asyncio
async def test_cache_hit_short_circuits(self):
"""When memory has a high-confidence match, skip web search."""
entry = MagicMock()
entry.relevance_score = 0.95
entry.content = "Previously researched content"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
result = await orch.run("cached topic")
assert result.cache_hit is True
assert result.report == "Previously researched content"
# Cascade should NOT have been called (no query generation or synthesis)
cascade.complete.assert_not_called()
assert orch._metrics["research_cache_hit"] == 1
@pytest.mark.asyncio
async def test_full_pipeline_no_tools(self):
"""Pipeline completes even without web tools (graceful degradation)."""
memory = _make_memory()
cascade = AsyncMock()
# First call: generate queries, second: synthesize
cascade.complete = AsyncMock(
side_effect=[
{"content": "query 1\nquery 2"},
{"content": "# Report\nACTION: Do something"},
]
)
tools = ResearchTools() # No web tools
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
with patch("timmy.research._create_gitea_issues"):
result = await orch.run("test topic")
assert result.topic == "test topic"
assert result.cache_hit is False
assert "Report" in result.report
assert result.action_items == ["Do something"]
assert result.duration_ms > 0
assert orch._metrics["research_api_call"] == 1
memory.store_fn.assert_called_once()
@pytest.mark.asyncio
async def test_full_pipeline_with_tools(self):
"""Full pipeline with search and fetch tools."""
memory = _make_memory()
cascade = AsyncMock()
cascade.complete = AsyncMock(
side_effect=[
{"content": "search query 1\nsearch query 2"},
{"content": "# Full Report\nTODO: Review findings"},
]
)
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
with patch("timmy.research._create_gitea_issues"):
result = await orch.run("test topic")
assert result.topic == "test topic"
assert result.cache_hit is False
assert len(result.queries_generated) == 2
assert len(result.sources) > 0
assert result.action_items == ["Review findings"]
@pytest.mark.asyncio
async def test_get_metrics(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
metrics = orch.get_metrics()
assert "research_cache_hit" in metrics
assert "research_api_call" in metrics
class TestCreateGiteaIssues:
def test_no_token_skips(self):
"""No Gitea token configured — silently skips."""
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="t", report="r", action_items=["item"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = ""
mock_settings.gitea_url = ""
with patch("timmy.research.settings", mock_settings):
# Should not raise
_create_gitea_issues(result)
def test_creates_issue_on_success(self):
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="AI", report="r", action_items=["Deploy model"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
mock_resp = MagicMock()
mock_resp.status_code = 201
mock_requests_mod = MagicMock()
mock_requests_mod.post.return_value = mock_resp
with (
patch("timmy.research.settings", mock_settings),
patch.dict("sys.modules", {"requests": mock_requests_mod}),
):
_create_gitea_issues(result)
mock_requests_mod.post.assert_called_once()
call_kwargs = mock_requests_mod.post.call_args
assert "[research]" in call_kwargs.kwargs["json"]["title"]