
Compare commits


1 commit

Author SHA1 Message Date
Alexander Whitestone
e831176dec feat: add Nginx + Let's Encrypt deploy config for Gitea TLS
Stage reverse proxy configuration and automated deploy script
for securing the Gitea instance with TLS. Includes:

- Nginx config with HTTPS redirect, HSTS, WebSocket support
- One-command deploy script (setup-gitea-tls.sh) that installs
  Nginx + Certbot, obtains cert, patches app.ini, blocks port 3000
- app.ini hardening reference from security audit (#971)

Requires DNS A record for git.alexanderwhitestone.com -> 143.198.27.163
before running the deploy script on the server.
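To confirm the record is live before deploying (illustrative check, the same
lookup the script performs):

    dig +short git.alexanderwhitestone.com   # expect 143.198.27.163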

Fixes #989

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 18:39:11 -04:00
6 changed files with 424 additions and 1135 deletions


@@ -0,0 +1,50 @@
# ── Gitea app.ini Hardening Patch ────────────────────────────────────────────
#
# Apply these changes to /etc/gitea/app.ini (or custom/conf/app.ini)
# AFTER running setup-gitea-tls.sh, or apply manually.
#
# The deploy script handles DOMAIN, ROOT_URL, HTTP_ADDR, and COOKIE_SECURE
# automatically. This file documents the FULL recommended hardening config
# from the security audit (#971).
#
# ── Instructions ────────────────────────────────────────────────────────────
#
# 1. Back up your current app.ini:
# cp /etc/gitea/app.ini /etc/gitea/app.ini.bak
#
# 2. Apply each section below by editing app.ini.
#
# 3. Restart Gitea:
# systemctl restart gitea
# # or: docker restart gitea
# ── [server] section ───────────────────────────────────────────────────────
# These are set automatically by setup-gitea-tls.sh:
#
# DOMAIN = git.alexanderwhitestone.com
# HTTP_ADDR = 127.0.0.1
# HTTP_PORT = 3000
# PROTOCOL = http
# ROOT_URL = https://git.alexanderwhitestone.com/
#
# Additionally recommended:
# ENABLE_PPROF = false
# OFFLINE_MODE = true
# ── [security] section ─────────────────────────────────────────────────────
# INSTALL_LOCK = true
# SECRET_KEY = <generate with: gitea generate secret SECRET_KEY>
# REVERSE_PROXY_TRUST_LOCAL = true
# COOKIE_SECURE = true (set by deploy script)
# SET_COOKIE_HTTP_ONLY = true
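#
# Example (illustrative): generating and applying a fresh SECRET_KEY —
#   gitea generate secret SECRET_KEY
#   # paste the output into SECRET_KEY under [security], then restart Gitea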
# ── [service] section ──────────────────────────────────────────────────────
# DISABLE_REGISTRATION = true
# ALLOW_ONLY_EXTERNAL_REGISTRATION = false
# SHOW_REGISTRATION_BUTTON = false
# ENABLE_REVERSE_PROXY_AUTHENTICATION = false
# REQUIRE_SIGNIN_VIEW = true
# ── [repository] section ───────────────────────────────────────────────────
# FORCE_PRIVATE = true
# DEFAULT_PRIVATE = private
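# ── Assembled example ────────────────────────────────────────────────────────
# Illustrative only — how the [server] block should read after the deploy
# script has run, with the settings documented above filled in:
#
#   [server]
#   DOMAIN       = git.alexanderwhitestone.com
#   HTTP_ADDR    = 127.0.0.1
#   HTTP_PORT    = 3000
#   PROTOCOL     = http
#   ROOT_URL     = https://git.alexanderwhitestone.com/
#   ENABLE_PPROF = false
#   OFFLINE_MODE = true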

deploy/nginx-gitea.conf Normal file

@@ -0,0 +1,75 @@
# ── Gitea Reverse Proxy — TLS via Let's Encrypt ─────────────────────────────
#
# Install path: /etc/nginx/sites-available/gitea
# Symlink: ln -s /etc/nginx/sites-available/gitea /etc/nginx/sites-enabled/
#
# Prerequisites:
# - DNS A record: git.alexanderwhitestone.com -> 143.198.27.163
# - certbot + python3-certbot-nginx installed
# - Certificate obtained via: certbot --nginx -d git.alexanderwhitestone.com
#
# After certbot runs, it will auto-modify the ssl lines below.
# This config is the pre-certbot template that certbot enhances.
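#
# Example install sequence (illustrative — setup-gitea-tls.sh automates this):
#   cp deploy/nginx-gitea.conf /etc/nginx/sites-available/gitea
#   ln -s /etc/nginx/sites-available/gitea /etc/nginx/sites-enabled/
#   nginx -t && systemctl reload nginx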
# ── HTTP → HTTPS redirect ───────────────────────────────────────────────────
server {
listen 80;
listen [::]:80;
server_name git.alexanderwhitestone.com;
# Let's Encrypt ACME challenge
location /.well-known/acme-challenge/ {
root /var/www/html;
}
location / {
return 301 https://$host$request_uri;
}
}
# ── HTTPS — reverse proxy to Gitea ──────────────────────────────────────────
server {
listen 443 ssl http2;
listen [::]:443 ssl http2;
server_name git.alexanderwhitestone.com;
# ── TLS (managed by certbot) ────────────────────────────────────────────
ssl_certificate /etc/letsencrypt/live/git.alexanderwhitestone.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/git.alexanderwhitestone.com/privkey.pem;
# ── TLS hardening ───────────────────────────────────────────────────────
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
# ── Security headers ────────────────────────────────────────────────────
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
add_header X-Content-Type-Options nosniff always;
add_header X-Frame-Options SAMEORIGIN always;
add_header Referrer-Policy strict-origin-when-cross-origin always;
# ── Proxy to Gitea ──────────────────────────────────────────────────────
location / {
proxy_pass http://127.0.0.1:3000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket support (for live updates)
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
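# Note: a map of $http_upgrade to $connection_upgrade in the http{} context is
# a common alternative that only sends "Upgrade" when the client asks for it;
# the hardcoded value above is the simpler form and works for Gitea.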
# Large repo pushes
client_max_body_size 512m;
# Timeouts for large git operations
proxy_connect_timeout 300;
proxy_send_timeout 300;
proxy_read_timeout 300;
}
}

deploy/setup-gitea-tls.sh Executable file

@@ -0,0 +1,299 @@
#!/usr/bin/env bash
set -euo pipefail
# ── Gitea TLS Setup — Nginx + Let's Encrypt ─────────────────────────────────
#
# Sets up a reverse proxy with automatic TLS for the Gitea instance.
#
# Prerequisites:
# - Ubuntu/Debian server with root access
# - DNS A record pointing to this server's IP
# - Gitea running on localhost:3000
#
# Usage:
#   sudo bash deploy/setup-gitea-tls.sh git.alexanderwhitestone.com
#   sudo bash deploy/setup-gitea-tls.sh git.alexanderwhitestone.com --email admin@alexanderwhitestone.com
#
# Options:
#   --email <addr>   Let's Encrypt contact email (default: admin@<apex domain>)
#   --ini <path>     Path to Gitea's app.ini (default: /etc/gitea/app.ini)
#   --dry-run        Run preflight checks only; make no changes
#
# What it does:
# 1. Installs Nginx + Certbot
# 2. Deploys the Nginx reverse proxy config
# 3. Obtains a Let's Encrypt TLS certificate
# 4. Patches Gitea app.ini for HTTPS
# 5. Blocks direct access to port 3000
# 6. Restarts services
BOLD='\033[1m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
CYAN='\033[0;36m'
NC='\033[0m'
info() { echo -e "${GREEN}[+]${NC} $1"; }
warn() { echo -e "${YELLOW}[!]${NC} $1"; }
error() { echo -e "${RED}[x]${NC} $1"; }
step() { echo -e "\n${BOLD}── $1 ──${NC}"; }
# ── Parse arguments ─────────────────────────────────────────────────────────
DOMAIN=""
EMAIL=""
GITEA_INI="/etc/gitea/app.ini"
DRY_RUN=false
while [[ $# -gt 0 ]]; do
case $1 in
--email) EMAIL="$2"; shift 2 ;;
--ini) GITEA_INI="$2"; shift 2 ;;
--dry-run) DRY_RUN=true; shift ;;
-*) error "Unknown option: $1"; exit 1 ;;
*) DOMAIN="$1"; shift ;;
esac
done
if [ -z "$DOMAIN" ]; then
error "Usage: $0 <domain> [--email you@example.com] [--ini /path/to/app.ini]"
exit 1
fi
if [ -z "$EMAIL" ]; then
EMAIL="admin@${DOMAIN#*.}"
fi
echo -e "${CYAN}${BOLD}"
echo " ╔══════════════════════════════════════════╗"
echo " ║ Gitea TLS Setup ║"
echo " ║ Nginx + Let's Encrypt ║"
echo " ╚══════════════════════════════════════════╝"
echo -e "${NC}"
echo " Domain: $DOMAIN"
echo " Email: $EMAIL"
echo " Gitea INI: $GITEA_INI"
echo " Dry run: $DRY_RUN"
echo ""
# ── Preflight checks ───────────────────────────────────────────────────────
if [ "$(id -u)" -ne 0 ]; then
error "This script must be run as root (or with sudo)"
exit 1
fi
# Verify DNS resolves to this server
step "Checking DNS"
RESOLVED_IP=$(dig +short "$DOMAIN" 2>/dev/null | head -1)
LOCAL_IP=$(curl -4sf https://ifconfig.me 2>/dev/null || hostname -I 2>/dev/null | awk '{print $1}')
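# LOCAL_IP: public IPv4 via ifconfig.me, falling back to the first address from
# `hostname -I` if outbound HTTPS is unavailable.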
if [ -z "$RESOLVED_IP" ]; then
error "DNS record for $DOMAIN not found."
error "Create an A record pointing $DOMAIN to $LOCAL_IP first."
exit 1
fi
if [ "$RESOLVED_IP" != "$LOCAL_IP" ]; then
warn "DNS for $DOMAIN resolves to $RESOLVED_IP but this server is $LOCAL_IP"
warn "Let's Encrypt will fail if DNS doesn't point here. Continue anyway? [y/N]"
read -r CONTINUE
if [ "$CONTINUE" != "y" ] && [ "$CONTINUE" != "Y" ]; then
exit 1
fi
fi
info "DNS OK: $DOMAIN -> $RESOLVED_IP"
# Verify Gitea is running
step "Checking Gitea"
if curl -sf http://127.0.0.1:3000/ > /dev/null 2>&1; then
info "Gitea is running on localhost:3000"
else
warn "Gitea not responding on localhost:3000 — continuing anyway"
fi
if $DRY_RUN; then
info "Dry run — would install nginx, certbot, configure TLS for $DOMAIN"
exit 0
fi
# ── Step 1: Install Nginx + Certbot ────────────────────────────────────────
step "Installing Nginx + Certbot"
apt-get update -qq
apt-get install -y -qq nginx certbot python3-certbot-nginx
info "Nginx + Certbot installed"
# ── Step 2: Deploy Nginx config ────────────────────────────────────────────
step "Deploying Nginx Configuration"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
NGINX_CONF="$SCRIPT_DIR/nginx-gitea.conf"
if [ ! -f "$NGINX_CONF" ]; then
error "nginx-gitea.conf not found at $NGINX_CONF"
exit 1
fi
# Install config (replacing domain if different)
sed "s/git\.alexanderwhitestone\.com/$DOMAIN/g" "$NGINX_CONF" \
> /etc/nginx/sites-available/gitea
ln -sf /etc/nginx/sites-available/gitea /etc/nginx/sites-enabled/gitea
# Remove default site if it conflicts
if [ -L /etc/nginx/sites-enabled/default ]; then
rm /etc/nginx/sites-enabled/default
info "Removed default Nginx site"
fi
# The full config references certificate files that don't exist yet, so
# `nginx -t` would fail at this point. Deploy a temporary HTTP-only config
# first, obtain the certificate, then swap in the full TLS config below.
cat > /etc/nginx/sites-available/gitea <<PRESSL
# Temporary HTTP-only config for certbot initial setup
server {
listen 80;
listen [::]:80;
server_name $DOMAIN;
location /.well-known/acme-challenge/ {
root /var/www/html;
}
location / {
proxy_pass http://127.0.0.1:3000;
proxy_set_header Host \$host;
proxy_set_header X-Real-IP \$remote_addr;
proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto \$scheme;
}
}
PRESSL
nginx -t && systemctl reload nginx
info "Temporary HTTP proxy deployed"
# ── Step 3: Obtain TLS Certificate ─────────────────────────────────────────
step "Obtaining TLS Certificate"
certbot --nginx \
-d "$DOMAIN" \
--email "$EMAIL" \
--agree-tos \
--non-interactive \
--redirect
info "TLS certificate obtained and Nginx configured"
# Now deploy the full config (certbot may have already modified it, but
# let's ensure our hardened version is in place)
sed "s/git\.alexanderwhitestone\.com/$DOMAIN/g" "$NGINX_CONF" \
> /etc/nginx/sites-available/gitea
nginx -t && systemctl reload nginx
info "Full TLS proxy config deployed"
# ── Step 4: Patch Gitea app.ini ─────────────────────────────────────────────
step "Patching Gitea Configuration"
if [ -f "$GITEA_INI" ]; then
# Backup first
cp "$GITEA_INI" "${GITEA_INI}.bak.$(date +%Y%m%d%H%M%S)"
info "Backed up app.ini"
# Patch server section
sed -i "s|^DOMAIN\s*=.*|DOMAIN = $DOMAIN|" "$GITEA_INI"
sed -i "s|^ROOT_URL\s*=.*|ROOT_URL = https://$DOMAIN/|" "$GITEA_INI"
sed -i "s|^HTTP_ADDR\s*=.*|HTTP_ADDR = 127.0.0.1|" "$GITEA_INI"
# Enable secure cookies
if grep -q "^COOKIE_SECURE" "$GITEA_INI"; then
sed -i "s|^COOKIE_SECURE\s*=.*|COOKIE_SECURE = true|" "$GITEA_INI"
else
sed -i "/^\[security\]/a COOKIE_SECURE = true" "$GITEA_INI"
fi
info "Gitea config patched: DOMAIN=$DOMAIN, ROOT_URL=https://$DOMAIN/, HTTP_ADDR=127.0.0.1"
else
warn "Gitea config not found at $GITEA_INI"
warn "Update manually:"
warn " DOMAIN = $DOMAIN"
warn " ROOT_URL = https://$DOMAIN/"
warn " HTTP_ADDR = 127.0.0.1"
fi
# ── Step 5: Block direct port 3000 access ───────────────────────────────────
step "Blocking Direct Port 3000 Access"
if command -v ufw &> /dev/null; then
ufw deny 3000/tcp 2>/dev/null || true
info "Port 3000 blocked via ufw"
else
# Use iptables as fallback — skip loopback so Nginx can still reach 127.0.0.1:3000
iptables -A INPUT -p tcp --dport 3000 ! -i lo -j DROP 2>/dev/null || true
info "Port 3000 blocked via iptables (not persistent — install ufw for persistence)"
fi
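# To persist the iptables rule across reboots without ufw (illustrative,
# Debian/Ubuntu): apt-get install -y iptables-persistent && netfilter-persistent save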
# Ensure HTTP/HTTPS are allowed
if command -v ufw &> /dev/null; then
ufw allow 80/tcp 2>/dev/null || true
ufw allow 443/tcp 2>/dev/null || true
ufw allow 22/tcp 2>/dev/null || true
fi
# ── Step 6: Restart Gitea ───────────────────────────────────────────────────
step "Restarting Gitea"
if systemctl is-active --quiet gitea; then
systemctl restart gitea
info "Gitea restarted"
elif docker ps --format '{{.Names}}' | grep -q gitea; then
docker restart "$(docker ps --format '{{.Names}}' | grep gitea | head -1)"
info "Gitea container restarted"
else
warn "Could not auto-restart Gitea — restart it manually"
fi
# ── Step 7: Verify ──────────────────────────────────────────────────────────
step "Verifying Deployment"
sleep 3
# Check HTTPS
if curl -sf "https://$DOMAIN" > /dev/null 2>&1; then
info "HTTPS is working: https://$DOMAIN"
else
warn "HTTPS check failed — may need a moment to propagate"
fi
# Check HSTS
HSTS=$(curl -sI "https://$DOMAIN" 2>/dev/null | grep -i "strict-transport-security" || true)
if [ -n "$HSTS" ]; then
info "HSTS header present: $HSTS"
else
warn "HSTS header not detected — check Nginx config"
fi
# Check HTTP redirect
HTTP_STATUS=$(curl -sI "http://$DOMAIN" 2>/dev/null | head -1 | awk '{print $2}')
if [ "$HTTP_STATUS" = "301" ]; then
info "HTTP->HTTPS redirect working (301)"
else
warn "HTTP redirect returned $HTTP_STATUS (expected 301)"
fi
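# Optional (illustrative): from a machine outside the server, confirm that the
# direct Gitea port is no longer reachable —
#   curl -m 5 http://git.alexanderwhitestone.com:3000/   # should time out or be refused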
# ── Summary ─────────────────────────────────────────────────────────────────
echo ""
echo -e "${GREEN}${BOLD}"
echo " ╔══════════════════════════════════════════╗"
echo " ║ Gitea TLS Setup Complete! ║"
echo " ╚══════════════════════════════════════════╝"
echo -e "${NC}"
echo ""
echo " Gitea: https://$DOMAIN"
echo ""
echo " Certbot auto-renewal is enabled by default."
echo " Test it: certbot renew --dry-run"
echo ""
echo " To check status:"
echo " nginx -t # test config"
echo " systemctl status nginx # proxy status"
echo " certbot certificates # TLS cert info"
echo ""


@@ -1,83 +0,0 @@
#!/bin/bash
# Gitea backup script — run on the VPS before any hardening changes.
# Usage: sudo bash scripts/gitea_backup.sh [off-site-dest]
#
# off-site-dest: optional rsync/scp destination for off-site copy
# e.g. user@backup-host:/backups/gitea/
#
# Refs: #971, #990
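#
# Example /etc/cron.d entry for a nightly run (illustrative — path and
# schedule are placeholders):
#   0 3 * * * root /bin/bash /path/to/scripts/gitea_backup.sh user@backup-host:/backups/gitea/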
set -euo pipefail
BACKUP_DIR="/opt/gitea/backups"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
GITEA_CONF="/etc/gitea/app.ini"
GITEA_WORK_DIR="/var/lib/gitea"
OFFSITE_DEST="${1:-}"
echo "=== Gitea Backup — $TIMESTAMP ==="
# Ensure backup directory exists
mkdir -p "$BACKUP_DIR"
cd "$BACKUP_DIR"
# Run the dump
echo "[1/4] Running gitea dump..."
gitea dump -c "$GITEA_CONF"
# Find the newest zip (gitea dump names it gitea-dump-*.zip)
BACKUP_FILE=$(ls -t "$BACKUP_DIR"/gitea-dump-*.zip 2>/dev/null | head -1)
if [ -z "$BACKUP_FILE" ]; then
echo "ERROR: No backup zip found in $BACKUP_DIR"
exit 1
fi
BACKUP_SIZE=$(stat -c%s "$BACKUP_FILE" 2>/dev/null || stat -f%z "$BACKUP_FILE")
echo "[2/4] Backup created: $BACKUP_FILE ($BACKUP_SIZE bytes)"
if [ "$BACKUP_SIZE" -eq 0 ]; then
echo "ERROR: Backup file is 0 bytes"
exit 1
fi
# Lock down permissions
chmod 600 "$BACKUP_FILE"
# Verify contents
echo "[3/4] Verifying backup contents..."
CONTENTS=$(unzip -l "$BACKUP_FILE" 2>/dev/null || true)
check_component() {
if echo "$CONTENTS" | grep -q "$1"; then
echo " OK: $2"
else
echo " WARN: $2 not found in backup"
fi
}
check_component "gitea-db.sql" "Database dump"
check_component "gitea-repo" "Repositories"
check_component "custom" "Custom config"
check_component "app.ini" "app.ini"
# Off-site copy
if [ -n "$OFFSITE_DEST" ]; then
echo "[4/4] Copying to off-site: $OFFSITE_DEST"
rsync -avz "$BACKUP_FILE" "$OFFSITE_DEST"
echo " Off-site copy complete."
else
echo "[4/4] No off-site destination provided. Skipping."
echo " To copy later: scp $BACKUP_FILE user@backup-host:/backups/gitea/"
fi
echo ""
echo "=== Backup complete ==="
echo "File: $BACKUP_FILE"
echo "Size: $BACKUP_SIZE bytes"
echo ""
echo "To verify restore on a clean instance:"
echo " 1. Copy zip to test machine"
echo " 2. unzip $BACKUP_FILE"
echo " 3. gitea restore --from <extracted-dir> -c /etc/gitea/app.ini"
echo " 4. Verify repos and DB are intact"


@@ -1,555 +0,0 @@
"""ResearchOrchestrator — autonomous research pipeline.
Chains: Check Local → Generate Queries → Search → Fetch → Synthesize →
Crystallize → Write Artifact into an end-to-end research workflow.
Usage:
from timmy.research import ResearchOrchestrator, run_research
orchestrator = ResearchOrchestrator(cascade=router, memory=memory_fns)
result = await orchestrator.run("Bitcoin Lightning Network scaling")
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Data structures ──────────────────────────────────────────────────────────
CONFIDENCE_THRESHOLD = 0.85
DEFAULT_QUERIES_PER_TOPIC = 8
DEFAULT_RESULTS_PER_QUERY = 5
DEFAULT_PAGES_TO_FETCH = 10
DEFAULT_FETCH_TOKEN_LIMIT = 3000
DEFAULT_SYNTHESIS_MAX_TOKENS = 4000
@dataclass
class ResearchResult:
"""Output of a completed research pipeline run."""
topic: str
report: str
queries_generated: list[str] = field(default_factory=list)
sources: list[dict[str, str]] = field(default_factory=list)
action_items: list[str] = field(default_factory=list)
cache_hit: bool = False
duration_ms: float = 0.0
metrics: dict[str, Any] = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@dataclass
class SearchSnippet:
"""A single search result snippet."""
title: str
url: str
snippet: str
relevance: float = 0.0
@dataclass
class FetchedPage:
"""A fetched and truncated web page."""
url: str
title: str
content: str
token_estimate: int = 0
# ── Memory interface ─────────────────────────────────────────────────────────
@dataclass
class MemoryInterface:
"""Abstraction over the memory system for research.
Accepts callables so the orchestrator doesn't depend on a specific
memory implementation. Defaults wire to timmy.memory_system.
"""
search_fn: Any = None # (query, limit) -> list[MemoryEntry]
store_fn: Any = None # (content, source, context_type, ...) -> MemoryEntry
def __post_init__(self):
if self.search_fn is None or self.store_fn is None:
self._load_defaults()
def _load_defaults(self):
try:
from timmy.memory_system import search_memories, store_memory
if self.search_fn is None:
self.search_fn = search_memories
if self.store_fn is None:
self.store_fn = store_memory
except ImportError:
logger.warning("Memory system not available — research will skip caching")
if self.search_fn is None:
self.search_fn = lambda query, **kw: []
if self.store_fn is None:
self.store_fn = lambda content, source, **kw: None
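# Example (illustrative): wiring MemoryInterface to custom callables, as the
# unit tests do — useful for an alternative backend or for disabling caching:
#
#   memory = MemoryInterface(
#       search_fn=lambda query, **kw: [],             # no cached research
#       store_fn=lambda content, source, **kw: None,  # discard crystallized results
#   )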
# ── Tool interface ───────────────────────────────────────────────────────────
@dataclass
class ResearchTools:
"""Web search and fetch callables.
These are async callables:
web_search(query: str, limit: int) -> list[dict]
web_fetch(url: str, max_tokens: int) -> str
"""
web_search: Any = None
web_fetch: Any = None
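# Example (illustrative): plugging in synchronous tool functions, each run in a
# worker thread by the orchestrator:
#
#   tools = ResearchTools(
#       web_search=lambda query, limit: [],    # -> list of {"title", "url", "snippet"}
#       web_fetch=lambda url, max_tokens: "",  # -> page text (plain string)
#   )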
# ── Orchestrator ─────────────────────────────────────────────────────────────
class ResearchOrchestrator:
"""Pipeline that chains research steps into an autonomous workflow.
Steps:
0. CHECK LOCAL KNOWLEDGE — search memory, return cached if confident
1. GENERATE QUERIES — ask LLM to produce search queries
2. SEARCH — execute queries via web_search tool
3. FETCH — rank snippets, fetch top pages
4. SYNTHESIZE — produce structured report via LLM
5. CRYSTALLIZE — store result in semantic memory
6. WRITE ARTIFACT — create Gitea issues from action items
"""
def __init__(
self,
cascade: Any,
memory: MemoryInterface | None = None,
tools: ResearchTools | None = None,
) -> None:
self.cascade = cascade
self.memory = memory or MemoryInterface()
self.tools = tools or ResearchTools()
self._metrics: dict[str, int] = {
"research_cache_hit": 0,
"research_api_call": 0,
}
async def run(
self,
topic: str,
template: str | None = None,
context: dict[str, Any] | None = None,
) -> ResearchResult:
"""Execute the full research pipeline.
Args:
topic: The research topic or question.
template: Optional prompt template for synthesis.
context: Additional context dict (cascade_tier hint, etc.).
Returns:
ResearchResult with report, sources, and action items.
"""
start = time.monotonic()
context = context or {}
cascade_tier = context.get("cascade_tier")
# Step 0: Check local knowledge
cached = await self._check_local_knowledge(topic)
if cached is not None:
self._metrics["research_cache_hit"] += 1
cached.duration_ms = (time.monotonic() - start) * 1000
return cached
self._metrics["research_api_call"] += 1
# Step 1: Generate queries
queries = await self._generate_queries(topic, template, cascade_tier)
# Step 2: Search
snippets = await self._search(queries)
# Step 3: Fetch top pages
pages = await self._fetch(snippets)
# Step 4: Synthesize
report = await self._synthesize(topic, template, pages, cascade_tier)
# Step 5: Extract action items
action_items = _extract_action_items(report)
# Build result
sources = [{"url": p.url, "title": p.title} for p in pages]
result = ResearchResult(
topic=topic,
report=report,
queries_generated=queries,
sources=sources,
action_items=action_items,
cache_hit=False,
duration_ms=(time.monotonic() - start) * 1000,
metrics=dict(self._metrics),
)
# Step 6: Crystallize — store in memory
await self._crystallize(topic, result)
# Step 7: Write artifact — create Gitea issues
await self._write_artifact(result)
return result
# ── Pipeline steps ───────────────────────────────────────────────────
async def _check_local_knowledge(self, topic: str) -> ResearchResult | None:
"""Search semantic memory for existing research on this topic."""
try:
results = self.memory.search_fn(
query=topic, limit=10, context_type="research"
)
if not results:
return None
# Check if top result has high confidence
top = results[0]
score = getattr(top, "relevance_score", 0.0) or 0.0
if score >= CONFIDENCE_THRESHOLD:
content = getattr(top, "content", str(top))
logger.info(
"Research cache hit for '%s' (score=%.2f)", topic, score
)
return ResearchResult(
topic=topic,
report=content,
cache_hit=True,
metrics={"research_cache_hit": 1},
)
except Exception as exc:
logger.warning("Local knowledge check failed: %s", exc)
return None
async def _generate_queries(
self,
topic: str,
template: str | None,
cascade_tier: str | None,
) -> list[str]:
"""Ask the LLM to generate search queries for the topic."""
prompt = (
f"Generate {DEFAULT_QUERIES_PER_TOPIC} diverse web search queries "
f"to thoroughly research the following topic. Return ONLY the "
f"queries, one per line, no numbering or bullets.\n\n"
f"Topic: {topic}"
)
if template:
prompt += f"\n\nResearch template context:\n{template}"
messages = [
{"role": "system", "content": "You are a research query generator."},
{"role": "user", "content": prompt},
]
kwargs: dict[str, Any] = {"messages": messages, "temperature": 0.7}
if cascade_tier:
kwargs["model"] = cascade_tier
try:
response = await self.cascade.complete(**kwargs)
raw = response.get("content", "")
queries = [
line.strip()
for line in raw.strip().splitlines()
if line.strip() and not line.strip().startswith("#")
]
# Clean numbering prefixes
cleaned = []
for q in queries:
q = re.sub(r"^\d+[\.\)]\s*", "", q)
q = re.sub(r"^[-*]\s*", "", q)
if q:
cleaned.append(q)
return cleaned[:DEFAULT_QUERIES_PER_TOPIC + 4] # slight over-generate
except Exception as exc:
logger.warning("Query generation failed: %s", exc)
# Fallback: use topic itself as a single query
return [topic]
async def _search(self, queries: list[str]) -> list[SearchSnippet]:
"""Execute search queries and collect snippets."""
if not self.tools.web_search:
logger.warning("No web_search tool configured — skipping search step")
return []
all_snippets: list[SearchSnippet] = []
async def _run_query(query: str) -> list[SearchSnippet]:
try:
results = await asyncio.to_thread(
self.tools.web_search, query, DEFAULT_RESULTS_PER_QUERY
)
snippets = []
for r in (results or []):
snippets.append(
SearchSnippet(
title=r.get("title", ""),
url=r.get("url", ""),
snippet=r.get("snippet", ""),
)
)
return snippets
except Exception as exc:
logger.warning("Search failed for query '%s': %s", query, exc)
return []
# Run searches concurrently
tasks = [_run_query(q) for q in queries]
results = await asyncio.gather(*tasks)
for snippets in results:
all_snippets.extend(snippets)
# Deduplicate by URL
seen_urls: set[str] = set()
unique: list[SearchSnippet] = []
for s in all_snippets:
if s.url and s.url not in seen_urls:
seen_urls.add(s.url)
unique.append(s)
return unique
async def _fetch(self, snippets: list[SearchSnippet]) -> list[FetchedPage]:
"""Fetch top pages from search snippets."""
if not self.tools.web_fetch:
logger.warning("No web_fetch tool configured — skipping fetch step")
return []
# Take top N snippets
to_fetch = snippets[:DEFAULT_PAGES_TO_FETCH]
pages: list[FetchedPage] = []
async def _fetch_one(snippet: SearchSnippet) -> FetchedPage | None:
try:
content = await asyncio.to_thread(
self.tools.web_fetch, snippet.url, DEFAULT_FETCH_TOKEN_LIMIT
)
if content:
return FetchedPage(
url=snippet.url,
title=snippet.title,
content=content[:DEFAULT_FETCH_TOKEN_LIMIT * 4],
token_estimate=len(content.split()),
)
except Exception as exc:
logger.warning("Fetch failed for %s: %s", snippet.url, exc)
return None
tasks = [_fetch_one(s) for s in to_fetch]
results = await asyncio.gather(*tasks)
for page in results:
if page is not None:
pages.append(page)
return pages
async def _synthesize(
self,
topic: str,
template: str | None,
pages: list[FetchedPage],
cascade_tier: str | None,
) -> str:
"""Synthesize fetched pages into a structured research report."""
# Build context from fetched pages
context_parts = []
for i, page in enumerate(pages, 1):
context_parts.append(
f"--- Source {i}: {page.title} ({page.url}) ---\n"
f"{page.content[:DEFAULT_FETCH_TOKEN_LIMIT * 4]}\n"
)
sources_text = "\n".join(context_parts) if context_parts else "(no sources fetched)"
if template:
prompt = (
f"{template}\n\n"
f"Topic: {topic}\n\n"
f"Research sources:\n{sources_text}\n\n"
f"Synthesize a comprehensive report based on the sources above."
)
else:
prompt = (
f"Write a comprehensive research report on: {topic}\n\n"
f"Research sources:\n{sources_text}\n\n"
f"Structure your report with:\n"
f"- Executive summary\n"
f"- Key findings\n"
f"- Analysis\n"
f"- Action items (prefix each with 'ACTION:')\n"
f"- Sources cited"
)
messages = [
{"role": "system", "content": "You are a research analyst producing structured reports."},
{"role": "user", "content": prompt},
]
kwargs: dict[str, Any] = {
"messages": messages,
"temperature": 0.3,
"max_tokens": DEFAULT_SYNTHESIS_MAX_TOKENS,
}
if cascade_tier:
kwargs["model"] = cascade_tier
try:
response = await self.cascade.complete(**kwargs)
return response.get("content", "")
except Exception as exc:
logger.error("Synthesis failed: %s", exc)
# Fallback: return raw source summaries
return (
f"# Research: {topic}\n\n"
f"Synthesis failed ({exc}). Raw sources:\n\n{sources_text}"
)
async def _crystallize(self, topic: str, result: ResearchResult) -> None:
"""Store the research result in semantic memory."""
try:
self.memory.store_fn(
content=result.report,
source="research_orchestrator",
context_type="research",
metadata={
"topic": topic,
"sources": result.sources,
"action_items": result.action_items,
"cache_hit": result.cache_hit,
"duration_ms": result.duration_ms,
},
)
logger.info("Crystallized research on '%s' into memory", topic)
except Exception as exc:
logger.warning("Failed to crystallize research: %s", exc)
async def _write_artifact(self, result: ResearchResult) -> None:
"""Create Gitea issues from action items."""
if not result.action_items:
return
try:
await asyncio.to_thread(_create_gitea_issues, result)
except Exception as exc:
logger.warning("Failed to create Gitea issues: %s", exc)
def get_metrics(self) -> dict[str, int]:
"""Return current research pipeline metrics."""
return dict(self._metrics)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _extract_action_items(report: str) -> list[str]:
"""Extract action items from a research report.
Looks for lines prefixed with ACTION:, TODO:, or - [ ].
"""
items: list[str] = []
for line in report.splitlines():
stripped = line.strip()
# ACTION: prefix
match = re.match(r"^(?:ACTION|TODO)\s*:\s*(.+)", stripped, re.IGNORECASE)
if match:
items.append(match.group(1).strip())
continue
# Markdown checkbox
match = re.match(r"^-\s*\[\s*\]\s*(.+)", stripped)
if match:
items.append(match.group(1).strip())
return items
def _create_gitea_issues(result: ResearchResult) -> None:
"""Create Gitea issues for action items (runs in thread)."""
if not settings.gitea_token or not settings.gitea_url:
logger.debug("Gitea not configured — skipping issue creation")
return
try:
import requests
except ImportError:
logger.debug("requests not available — skipping Gitea issue creation")
return
base_url = settings.gitea_url.rstrip("/")
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
for item in result.action_items:
try:
payload = {
"title": f"[research] {item[:100]}",
"body": (
f"Auto-generated from research on: **{result.topic}**\n\n"
f"Action item: {item}\n\n"
f"---\n"
f"_Created by ResearchOrchestrator_"
),
}
resp = requests.post(
f"{base_url}/api/v1/repos/{repo}/issues",
headers=headers,
json=payload,
timeout=10,
)
if resp.status_code in (200, 201):
logger.info("Created Gitea issue: %s", item[:60])
else:
logger.warning(
"Gitea issue creation failed (%d): %s",
resp.status_code,
resp.text[:200],
)
except Exception as exc:
logger.warning("Failed to create issue '%s': %s", item[:60], exc)
# ── Convenience function ─────────────────────────────────────────────────────
async def run_research(
topic: str,
template: str | None = None,
context: dict[str, Any] | None = None,
) -> ResearchResult:
"""Convenience function to run research with default dependencies.
Creates a ResearchOrchestrator with the cascade router singleton
and default memory, then executes the pipeline.
"""
from infrastructure.router.cascade import get_router
cascade = get_router()
orchestrator = ResearchOrchestrator(cascade=cascade)
return await orchestrator.run(topic, template=template, context=context)
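# Example (illustrative):
#   import asyncio
#   result = asyncio.run(run_research("Bitcoin Lightning Network scaling"))
#   print(result.report)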


@@ -1,497 +0,0 @@
"""Unit tests for timmy.research — ResearchOrchestrator pipeline."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.research import (
DEFAULT_QUERIES_PER_TOPIC,
MemoryInterface,
ResearchOrchestrator,
ResearchResult,
ResearchTools,
SearchSnippet,
_extract_action_items,
)
# ── Data structures ──────────────────────────────────────────────────────────
class TestResearchResult:
def test_defaults(self):
r = ResearchResult(topic="test", report="content")
assert r.topic == "test"
assert r.report == "content"
assert r.cache_hit is False
assert r.queries_generated == []
assert r.sources == []
assert r.action_items == []
assert r.duration_ms == 0.0
assert r.timestamp # non-empty
def test_with_data(self):
r = ResearchResult(
topic="AI",
report="report text",
queries_generated=["q1", "q2"],
sources=[{"url": "http://example.com", "title": "Test"}],
action_items=["Do X"],
cache_hit=True,
duration_ms=42.5,
)
assert r.cache_hit is True
assert len(r.sources) == 1
assert r.duration_ms == 42.5
class TestSearchSnippet:
def test_fields(self):
s = SearchSnippet(title="T", url="http://x.com", snippet="text")
assert s.relevance == 0.0
# ── _extract_action_items ────────────────────────────────────────────────────
class TestExtractActionItems:
def test_action_prefix(self):
report = "Some text\nACTION: Do the thing\nMore text"
items = _extract_action_items(report)
assert items == ["Do the thing"]
def test_todo_prefix(self):
report = "TODO: Fix the bug\nTodo: Also this"
items = _extract_action_items(report)
assert items == ["Fix the bug", "Also this"]
def test_checkbox(self):
report = "- [ ] Implement feature\n- [x] Already done"
items = _extract_action_items(report)
assert items == ["Implement feature"]
def test_mixed(self):
report = "ACTION: First\n- [ ] Second\nTODO: Third"
items = _extract_action_items(report)
assert items == ["First", "Second", "Third"]
def test_empty(self):
assert _extract_action_items("No actions here") == []
assert _extract_action_items("") == []
# ── MemoryInterface ──────────────────────────────────────────────────────────
class TestMemoryInterface:
def test_custom_fns(self):
search = MagicMock(return_value=[])
store = MagicMock()
mi = MemoryInterface(search_fn=search, store_fn=store)
assert mi.search_fn is search
assert mi.store_fn is store
def test_defaults_when_import_fails(self):
with patch.dict("sys.modules", {"timmy.memory_system": None}):
mi = MemoryInterface()
# Should have fallback callables
assert callable(mi.search_fn)
assert callable(mi.store_fn)
# Fallback search returns empty
assert mi.search_fn("test") == []
# ── ResearchOrchestrator ─────────────────────────────────────────────────────
def _make_cascade(**overrides):
"""Create a mock cascade router."""
cascade = AsyncMock()
cascade.complete = AsyncMock(
return_value={"content": overrides.get("content", "query1\nquery2\nquery3")}
)
return cascade
def _make_memory(search_results=None, score=0.0):
"""Create a mock memory interface."""
if search_results is None:
search_results = []
search_fn = MagicMock(return_value=search_results)
store_fn = MagicMock()
return MemoryInterface(search_fn=search_fn, store_fn=store_fn)
def _make_tools(search_results=None, fetch_content="Page content"):
"""Create mock research tools."""
web_search = MagicMock(
return_value=search_results
or [
{"title": "Result 1", "url": "http://a.com", "snippet": "Snippet 1"},
{"title": "Result 2", "url": "http://b.com", "snippet": "Snippet 2"},
]
)
web_fetch = MagicMock(return_value=fetch_content)
return ResearchTools(web_search=web_search, web_fetch=web_fetch)
class TestResearchOrchestratorInit:
def test_basic_init(self):
cascade = _make_cascade()
memory = _make_memory()
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
assert orch.cascade is cascade
assert orch.memory is memory
assert orch.tools is tools
assert orch._metrics["research_cache_hit"] == 0
assert orch._metrics["research_api_call"] == 0
class TestCheckLocalKnowledge:
@pytest.mark.asyncio
async def test_cache_hit(self):
"""High-confidence memory result returns cached ResearchResult."""
entry = MagicMock()
entry.relevance_score = 0.90
entry.content = "Cached report"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is not None
assert result.cache_hit is True
assert result.report == "Cached report"
@pytest.mark.asyncio
async def test_cache_miss_low_score(self):
"""Low-confidence result returns None."""
entry = MagicMock()
entry.relevance_score = 0.5
entry.content = "Weak match"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
@pytest.mark.asyncio
async def test_cache_miss_empty(self):
"""No memory results returns None."""
memory = _make_memory(search_results=[])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
@pytest.mark.asyncio
async def test_exception_returns_none(self):
"""Memory search exception returns None gracefully."""
memory = MemoryInterface(
search_fn=MagicMock(side_effect=RuntimeError("db error")),
store_fn=MagicMock(),
)
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
class TestGenerateQueries:
@pytest.mark.asyncio
async def test_parses_queries(self):
cascade = _make_cascade(content="query one\nquery two\nquery three")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("AI safety", None, None)
assert queries == ["query one", "query two", "query three"]
@pytest.mark.asyncio
async def test_strips_numbering(self):
cascade = _make_cascade(content="1. First query\n2. Second query\n3) Third")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("topic", None, None)
assert "First query" in queries
assert "Second query" in queries
assert "Third" in queries
@pytest.mark.asyncio
async def test_fallback_on_error(self):
cascade = AsyncMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("LLM down"))
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("fallback topic", None, None)
assert queries == ["fallback topic"]
@pytest.mark.asyncio
async def test_passes_cascade_tier(self):
cascade = _make_cascade(content="q1\nq2")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
await orch._generate_queries("topic", None, "gpt-4")
call_kwargs = cascade.complete.call_args.kwargs
assert call_kwargs.get("model") == "gpt-4"
class TestSearch:
@pytest.mark.asyncio
async def test_collects_snippets(self):
tools = _make_tools()
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1", "q2"])
# 2 results per query, 2 queries, but deduplicated by URL
assert len(snippets) == 2 # same URLs returned for both queries
@pytest.mark.asyncio
async def test_no_search_tool(self):
tools = ResearchTools(web_search=None)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1"])
assert snippets == []
@pytest.mark.asyncio
async def test_search_error_handled(self):
tools = ResearchTools(
web_search=MagicMock(side_effect=RuntimeError("network error"))
)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1"])
assert snippets == []
class TestFetch:
@pytest.mark.asyncio
async def test_fetches_pages(self):
tools = _make_tools(fetch_content="Page body here")
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = [
SearchSnippet(title="P1", url="http://a.com", snippet="s1"),
SearchSnippet(title="P2", url="http://b.com", snippet="s2"),
]
pages = await orch._fetch(snippets)
assert len(pages) == 2
assert pages[0].content == "Page body here"
@pytest.mark.asyncio
async def test_no_fetch_tool(self):
tools = ResearchTools(web_fetch=None)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
pages = await orch._fetch([SearchSnippet("T", "http://x.com", "s")])
assert pages == []
class TestSynthesize:
@pytest.mark.asyncio
async def test_produces_report(self):
cascade = _make_cascade(content="# Report\nKey findings here")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
from timmy.research import FetchedPage
pages = [FetchedPage(url="http://x.com", title="X", content="content")]
report = await orch._synthesize("topic", None, pages, None)
assert "Report" in report
@pytest.mark.asyncio
async def test_fallback_on_error(self):
cascade = AsyncMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("LLM error"))
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
from timmy.research import FetchedPage
pages = [FetchedPage(url="http://x.com", title="X", content="content")]
report = await orch._synthesize("topic", None, pages, None)
assert "Synthesis failed" in report
assert "topic" in report
class TestCrystallize:
@pytest.mark.asyncio
async def test_stores_in_memory(self):
memory = _make_memory()
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
result = ResearchResult(topic="test", report="report text")
await orch._crystallize("test", result)
memory.store_fn.assert_called_once()
call_kwargs = memory.store_fn.call_args
assert call_kwargs.kwargs.get("context_type") == "research"
assert call_kwargs.kwargs.get("source") == "research_orchestrator"
@pytest.mark.asyncio
async def test_store_error_handled(self):
memory = MemoryInterface(
search_fn=MagicMock(return_value=[]),
store_fn=MagicMock(side_effect=RuntimeError("db error")),
)
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
result = ResearchResult(topic="test", report="report")
# Should not raise
await orch._crystallize("test", result)
class TestWriteArtifact:
@pytest.mark.asyncio
async def test_no_action_items_skips(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
result = ResearchResult(topic="test", report="r", action_items=[])
# Should complete without any calls
await orch._write_artifact(result)
@pytest.mark.asyncio
async def test_creates_issues(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
result = ResearchResult(
topic="test", report="r", action_items=["Fix the thing"]
)
with patch("timmy.research._create_gitea_issues") as mock_create:
await orch._write_artifact(result)
mock_create.assert_called_once_with(result)
class TestFullPipeline:
@pytest.mark.asyncio
async def test_cache_hit_short_circuits(self):
"""When memory has a high-confidence match, skip web search."""
entry = MagicMock()
entry.relevance_score = 0.95
entry.content = "Previously researched content"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
result = await orch.run("cached topic")
assert result.cache_hit is True
assert result.report == "Previously researched content"
# Cascade should NOT have been called (no query generation or synthesis)
cascade.complete.assert_not_called()
assert orch._metrics["research_cache_hit"] == 1
@pytest.mark.asyncio
async def test_full_pipeline_no_tools(self):
"""Pipeline completes even without web tools (graceful degradation)."""
memory = _make_memory()
cascade = AsyncMock()
# First call: generate queries, second: synthesize
cascade.complete = AsyncMock(
side_effect=[
{"content": "query 1\nquery 2"},
{"content": "# Report\nACTION: Do something"},
]
)
tools = ResearchTools() # No web tools
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
with patch("timmy.research._create_gitea_issues"):
result = await orch.run("test topic")
assert result.topic == "test topic"
assert result.cache_hit is False
assert "Report" in result.report
assert result.action_items == ["Do something"]
assert result.duration_ms > 0
assert orch._metrics["research_api_call"] == 1
memory.store_fn.assert_called_once()
@pytest.mark.asyncio
async def test_full_pipeline_with_tools(self):
"""Full pipeline with search and fetch tools."""
memory = _make_memory()
cascade = AsyncMock()
cascade.complete = AsyncMock(
side_effect=[
{"content": "search query 1\nsearch query 2"},
{"content": "# Full Report\nTODO: Review findings"},
]
)
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
with patch("timmy.research._create_gitea_issues"):
result = await orch.run("test topic")
assert result.topic == "test topic"
assert result.cache_hit is False
assert len(result.queries_generated) == 2
assert len(result.sources) > 0
assert result.action_items == ["Review findings"]
@pytest.mark.asyncio
async def test_get_metrics(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
metrics = orch.get_metrics()
assert "research_cache_hit" in metrics
assert "research_api_call" in metrics
class TestCreateGiteaIssues:
def test_no_token_skips(self):
"""No Gitea token configured — silently skips."""
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="t", report="r", action_items=["item"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = ""
mock_settings.gitea_url = ""
with patch("timmy.research.settings", mock_settings):
# Should not raise
_create_gitea_issues(result)
def test_creates_issue_on_success(self):
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="AI", report="r", action_items=["Deploy model"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
mock_resp = MagicMock()
mock_resp.status_code = 201
mock_requests_mod = MagicMock()
mock_requests_mod.post.return_value = mock_resp
with (
patch("timmy.research.settings", mock_settings),
patch.dict("sys.modules", {"requests": mock_requests_mod}),
):
_create_gitea_issues(result)
mock_requests_mod.post.assert_called_once()
call_kwargs = mock_requests_mod.post.call_args
assert "[research]" in call_kwargs.kwargs["json"]["title"]