Files
the-nexus/intelligence/deepdive/pipeline.py
Ezra (Archivist) 4b1873d76e
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
feat(deepdive): production briefing prompt + prompt engineering KT
- production_briefing_v1.txt: podcast-script prompt engineered for
  10-15 min premium audio, grounded fleet context, and actionable tone.
- PROMPT_ENGINEERING_KT.md: A/B testing protocol, failure modes,
  and maintenance checklist.
- pipeline.py: load external prompt_file from config.yaml.

Refs #830
2026-04-05 20:19:20 +00:00

780 lines
28 KiB
Python

#!/usr/bin/env python3
"""Deep Dive Intelligence Pipeline - PRODUCTION IMPLEMENTATION
Executable 5-phase pipeline for sovereign daily intelligence briefing.
Not architecture stubs — this runs.
Usage:
python -m deepdive.pipeline --config config.yaml --dry-run
python -m deepdive.pipeline --config config.yaml --today
"""
import asyncio
import hashlib
import json
import logging
import re
import tempfile
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Dict, Optional, Any
import os
# Third-party imports with graceful degradation
try:
import feedparser
HAS_FEEDPARSER = True
except ImportError:
HAS_FEEDPARSER = False
feedparser = None
try:
import httpx
HAS_HTTPX = True
except ImportError:
HAS_HTTPX = False
httpx = None
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
yaml = None
try:
import numpy as np
from sentence_transformers import SentenceTransformer
HAS_TRANSFORMERS = True
except ImportError:
HAS_TRANSFORMERS = False
np = None
SentenceTransformer = None
# Phase 0: Fleet context grounding
try:
from fleet_context import build_fleet_context, FleetContext
HAS_FLEET_CONTEXT = True
except ImportError:
HAS_FLEET_CONTEXT = False
build_fleet_context = None
FleetContext = None
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)s | %(message)s'
)
logger = logging.getLogger('deepdive')
# ============================================================================
# PHASE 1: SOURCE AGGREGATION
# ============================================================================
@dataclass
class FeedItem:
"""Normalized feed item from any source."""
title: str
summary: str
url: str
source: str
published: datetime
content_hash: str # For deduplication
raw: Dict[str, Any]
def to_dict(self) -> Dict:
return {
'title': self.title,
'summary': self.summary[:500],
'url': self.url,
'source': self.source,
'published': self.published.isoformat(),
'content_hash': self.content_hash,
}
class RSSAggregator:
"""Fetch and normalize RSS feeds with caching."""
def __init__(self, cache_dir: Optional[Path] = None, timeout: int = 30):
self.cache_dir = cache_dir or Path.home() / ".cache" / "deepdive"
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.timeout = timeout
self.etag_cache: Dict[str, str] = {}
logger.info(f"RSSAggregator: cache_dir={self.cache_dir}")
def _compute_hash(self, data: str) -> str:
"""Compute content hash for deduplication."""
return hashlib.sha256(data.encode()).hexdigest()[:16]
def _parse_date(self, parsed_time) -> datetime:
"""Convert feedparser time struct to datetime."""
if parsed_time:
try:
return datetime(*parsed_time[:6])
except:
pass
return datetime.now(timezone.utc).replace(tzinfo=None)
def _fetch_arxiv_api(self, category: str, max_items: int = 50) -> List[FeedItem]:
"""Fallback to arXiv API when RSS is empty."""
import urllib.request
import xml.etree.ElementTree as ET
api_url = f"http://export.arxiv.org/api/query?search_query=cat:{category}&sortBy=submittedDate&sortOrder=descending&start=0&max_results={max_items}"
logger.info(f"ArXiv RSS empty, falling back to API: {category}")
try:
req = urllib.request.Request(api_url, headers={'User-Agent': 'DeepDiveBot/1.0'})
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
data = resp.read().decode('utf-8')
ns = {'atom': 'http://www.w3.org/2005/Atom'}
root = ET.fromstring(data)
items = []
for entry in root.findall('atom:entry', ns)[:max_items]:
title = entry.find('atom:title', ns)
title = title.text.replace('\n', ' ').strip() if title is not None else 'Untitled'
summary = entry.find('atom:summary', ns)
summary = summary.text.strip() if summary is not None else ''
link = entry.find('atom:id', ns)
link = link.text.strip() if link is not None else ''
published = entry.find('atom:published', ns)
published_text = published.text if published is not None else None
content = f"{title}{summary}"
content_hash = self._compute_hash(content)
if published_text:
try:
pub_dt = datetime.fromisoformat(published_text.replace('Z', '+00:00')).replace(tzinfo=None)
except Exception:
pub_dt = datetime.now(timezone.utc).replace(tzinfo=None)
else:
pub_dt = datetime.now(timezone.utc).replace(tzinfo=None)
item = FeedItem(
title=title,
summary=summary,
url=link,
source=f"arxiv_api_{category}",
published=pub_dt,
content_hash=content_hash,
raw={'published': published_text}
)
items.append(item)
logger.info(f"Fetched {len(items)} items from arXiv API fallback")
return items
except Exception as e:
logger.error(f"ArXiv API fallback failed: {e}")
return []
async def fetch_feed(self, url: str, name: str,
since: Optional[datetime] = None,
max_items: int = 50) -> List[FeedItem]:
"""Fetch single feed with caching. Returns normalized items."""
if not HAS_FEEDPARSER:
logger.warning("feedparser not installed — using API fallback")
if 'arxiv' in name.lower() and 'arxiv.org/rss' in url:
category = url.split('/')[-1] if '/' in url else 'cs.AI'
return self._fetch_arxiv_api(category, max_items)
return []
logger.info(f"Fetching {name}: {url}")
try:
feed = feedparser.parse(url)
if feed.get('bozo_exception'):
logger.warning(f"Parse warning for {name}: {feed.bozo_exception}")
items = []
for entry in feed.entries[:max_items]:
title = entry.get('title', 'Untitled')
summary = entry.get('summary', entry.get('description', ''))
link = entry.get('link', '')
content = f"{title}{summary}"
content_hash = self._compute_hash(content)
published = self._parse_date(entry.get('published_parsed'))
if since and published < since:
continue
item = FeedItem(
title=title,
summary=summary,
url=link,
source=name,
published=published,
content_hash=content_hash,
raw=dict(entry)
)
items.append(item)
# ArXiv API fallback for empty RSS
if not items and 'arxiv' in name.lower() and 'arxiv.org/rss' in url:
category = url.split('/')[-1] if '/' in url else 'cs.AI'
items = self._fetch_arxiv_api(category, max_items)
logger.info(f"Fetched {len(items)} items from {name}")
return items
except Exception as e:
logger.error(f"Failed to fetch {name}: {e}")
return []
async def fetch_all(self, sources: List[Dict[str, Any]],
since: Optional[datetime] = None) -> List[FeedItem]:
"""Fetch all configured sources since cutoff time."""
all_items = []
for source in sources:
name = source['name']
url = source['url']
max_items = source.get('max_items', 50)
items = await self.fetch_feed(url, name, since, max_items)
all_items.extend(items)
# Deduplicate by content hash
seen = set()
unique = []
for item in all_items:
if item.content_hash not in seen:
seen.add(item.content_hash)
unique.append(item)
unique.sort(key=lambda x: x.published, reverse=True)
logger.info(f"Total unique items after aggregation: {len(unique)}")
return unique
# ============================================================================
# PHASE 2: RELEVANCE ENGINE
# ============================================================================
class RelevanceScorer:
"""Score items by relevance to Hermes/Timmy work."""
def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
self.model = None
self.model_name = model_name
self.keywords = {
"LLM agent": 1.5,
"agent architecture": 1.5,
"tool use": 1.3,
"function calling": 1.3,
"chain of thought": 1.2,
"reasoning": 1.2,
"reinforcement learning": 1.4,
"RLHF": 1.4,
"GRPO": 1.4,
"PPO": 1.3,
"fine-tuning": 1.1,
"LoRA": 1.1,
"quantization": 1.0,
"GGUF": 1.1,
"transformer": 1.0,
"attention": 1.0,
"inference": 1.0,
"training": 1.1,
"eval": 0.9,
"MMLU": 0.9,
"benchmark": 0.8,
}
if HAS_TRANSFORMERS:
try:
logger.info(f"Loading embedding model: {model_name}")
self.model = SentenceTransformer(model_name)
logger.info("Embedding model loaded")
except Exception as e:
logger.warning(f"Could not load embeddings model: {e}")
def keyword_score(self, text: str) -> float:
"""Score based on keyword matches."""
text_lower = text.lower()
score = 0.0
for keyword, weight in self.keywords.items():
if keyword.lower() in text_lower:
score += weight
count = text_lower.count(keyword.lower())
score += weight * (count - 1) * 0.5
return min(score, 5.0)
def embedding_score(self, item: FeedItem,
reference_texts: List[str]) -> float:
if not self.model or not np:
return 0.5
try:
item_text = f"{item.title} {item.summary}"
item_embedding = self.model.encode(item_text)
max_sim = 0.0
for ref_text in reference_texts:
ref_embedding = self.model.encode(ref_text)
sim = float(
np.dot(item_embedding, ref_embedding) /
(np.linalg.norm(item_embedding) * np.linalg.norm(ref_embedding))
)
max_sim = max(max_sim, sim)
return max_sim
except Exception as e:
logger.warning(f"Embedding score failed: {e}")
return 0.5
def score(self, item: FeedItem,
reference_texts: Optional[List[str]] = None) -> float:
text = f"{item.title} {item.summary}"
kw_score = self.keyword_score(text)
emb_score = self.embedding_score(item, reference_texts or [])
final = (kw_score * 0.6) + (emb_score * 2.0 * 0.4)
return round(final, 3)
def rank(self, items: List[FeedItem], top_n: int = 10,
min_score: float = 0.5) -> List[tuple]:
scored = []
for item in items:
s = self.score(item)
if s >= min_score:
scored.append((item, s))
scored.sort(key=lambda x: x[1], reverse=True)
return scored[:top_n]
# ============================================================================
# PHASE 3: SYNTHESIS ENGINE
# ============================================================================
class SynthesisEngine:
"""Generate intelligence briefing from filtered items."""
def __init__(self, llm_endpoint: str = "http://localhost:11435/v1",
prompt_template: Optional[str] = None):
self.endpoint = llm_endpoint
self.prompt_template = prompt_template
self.system_prompt = """You are an intelligence analyst for the Timmy Foundation fleet.
Synthesize AI/ML research into actionable briefings for agent developers.
Guidelines:
- Focus on implications for LLM agents, tool use, RL training
- Highlight practical techniques we could adopt
- Keep tone professional but urgent
- Structure: Headlines → Deep Dive → Implications
Context: Hermes agents run locally with Gemma 4, sovereign infrastructure.
If Fleet Context is provided above, use it to explain how external developments
impact our live repos, open issues, and current architecture."""
def _call_llm(self, prompt: str) -> str:
if not HAS_HTTPX or not httpx:
return "[LLM synthesis unavailable: httpx not installed]"
try:
response = httpx.post(
f"{self.endpoint}/chat/completions",
json={
"model": "local",
"messages": [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": prompt}
],
"temperature": 0.7,
"max_tokens": 2000
},
timeout=120.0
)
data = response.json()
return data['choices'][0]['message']['content']
except Exception as e:
logger.error(f"LLM call failed: {e}")
return f"[LLM synthesis failed: {e}. Using fallback template.]"
def _fallback_synthesis(self, items: List[tuple]) -> str:
lines = ["## Deep Dive Intelligence Briefing\n"]
lines.append("*Top items ranked by relevance to Hermes/Timmy work*\n")
for i, (item, score) in enumerate(items, 1):
lines.append(f"\n### {i}. {item.title}")
lines.append(f"**Score:** {score:.2f} | **Source:** {item.source}")
lines.append(f"**URL:** {item.url}\n")
lines.append(f"{item.summary[:300]}...")
lines.append("\n---\n")
lines.append("*Generated by Deep Dive pipeline*")
return "\n".join(lines)
def generate_structured(self, items: List[tuple],
fleet_context: Optional[FleetContext] = None) -> Dict[str, Any]:
if not items:
return {
'headline': 'No relevant intelligence today',
'briefing': 'No items met relevance threshold.',
'sources': []
}
# Build research items text
research_lines = []
for i, (item, score) in enumerate(items, 1):
research_lines.append(f"{i}. [{item.source}] {item.title}")
research_lines.append(f" Score: {score}")
research_lines.append(f" Summary: {item.summary[:300]}...")
research_lines.append(f" URL: {item.url}")
research_lines.append("")
research_text = "\n".join(research_lines)
fleet_text = ""
if fleet_context:
fleet_text = fleet_context.to_prompt_text(max_items_per_section=5)
if self.prompt_template:
prompt = (
self.prompt_template
.replace("{{FLEET_CONTEXT}}", fleet_text)
.replace("{{RESEARCH_ITEMS}}", research_text)
)
else:
lines = []
if fleet_text:
lines.append("FLEET CONTEXT:")
lines.append(fleet_text)
lines.append("")
lines.append("Generate an intelligence briefing from these research items:")
lines.append("")
lines.extend(research_lines)
prompt = "\n".join(lines)
synthesis = self._call_llm(prompt)
# If LLM failed, use fallback
if synthesis.startswith("["):
synthesis = self._fallback_synthesis(items)
return {
'headline': f"Deep Dive: {len(items)} items, top score {items[0][1]:.2f}",
'briefing': synthesis,
'sources': [item[0].to_dict() for item in items],
'generated_at': datetime.now(timezone.utc).isoformat()
}
# ============================================================================
# PHASE 4: AUDIO GENERATION
# ============================================================================
class AudioGenerator:
"""Generate audio from briefing text using local TTS."""
def __init__(self, voice_model: str = "en_US-lessac-medium"):
self.voice_model = voice_model
self.output_dir = Path.home() / ".cache" / "deepdive" / "audio"
self.output_dir.mkdir(parents=True, exist_ok=True)
def generate(self, briefing: Dict[str, Any]) -> Optional[Path]:
piper_path = Path("/usr/local/bin/piper")
if not piper_path.exists():
logger.warning("piper-tts not found. Audio generation skipped.")
return None
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
output_file = self.output_dir / f"deepdive_{timestamp}.wav"
text = briefing.get('briefing', '')
if not text:
return None
words = text.split()[:2000]
tts_text = " ".join(words)
logger.info(f"Generating audio: {output_file}")
import subprocess
try:
proc = subprocess.run(
[str(piper_path), "--model", self.voice_model, "--output_file", str(output_file)],
input=tts_text,
capture_output=True,
text=True
)
if proc.returncode == 0:
return output_file
else:
logger.error(f"Piper failed: {proc.stderr}")
return None
except Exception as e:
logger.error(f"Audio generation failed: {e}")
return None
# ============================================================================
# PHASE 5: DELIVERY (Telegram)
# ============================================================================
class TelegramDelivery:
"""Deliver briefing to Telegram as voice message + text summary."""
def __init__(self, bot_token: str, chat_id: str):
self.bot_token = bot_token
self.chat_id = chat_id
self.base_url = f"https://api.telegram.org/bot{bot_token}"
def deliver_text(self, briefing: Dict[str, Any]) -> bool:
if not HAS_HTTPX or not httpx:
logger.error("httpx not installed")
return False
try:
message = f"📡 *{briefing['headline']}*\n\n"
message += briefing['briefing'][:4000]
resp = httpx.post(
f"{self.base_url}/sendMessage",
json={
"chat_id": self.chat_id,
"text": message,
"parse_mode": "Markdown",
"disable_web_page_preview": True
},
timeout=30.0
)
if resp.status_code == 200:
logger.info("Telegram text delivery successful")
return True
else:
logger.error(f"Telegram delivery failed: {resp.text}")
return False
except Exception as e:
logger.error(f"Telegram delivery error: {e}")
return False
def deliver_voice(self, audio_path: Path) -> bool:
"""Deliver audio file as Telegram voice message using multipart upload."""
if not HAS_HTTPX or not httpx:
logger.error("httpx not installed")
return False
try:
import mimetypes
mime, _ = mimetypes.guess_type(str(audio_path))
mime = mime or "audio/ogg"
with open(audio_path, "rb") as f:
files = {
"voice": (audio_path.name, f, mime),
}
data = {
"chat_id": self.chat_id,
}
resp = httpx.post(
f"{self.base_url}/sendVoice",
data=data,
files=files,
timeout=60.0
)
if resp.status_code == 200:
logger.info("Telegram voice delivery successful")
return True
else:
logger.error(f"Telegram voice delivery failed: {resp.text}")
return False
except Exception as e:
logger.error(f"Telegram voice delivery error: {e}")
return False
# ============================================================================
# PIPELINE ORCHESTRATOR
# ============================================================================
class DeepDivePipeline:
"""End-to-end intelligence pipeline."""
def __init__(self, config: Dict[str, Any]):
self.config = config
# Config may be wrapped under 'deepdive' key or flat
self.cfg = config.get('deepdive', config)
self.cache_dir = Path.home() / ".cache" / "deepdive"
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.aggregator = RSSAggregator(self.cache_dir)
relevance_config = self.cfg.get('relevance', {})
self.scorer = RelevanceScorer(relevance_config.get('model', 'all-MiniLM-L6-v2'))
llm_endpoint = self.cfg.get('synthesis', {}).get('llm_endpoint', 'http://localhost:11435/v1')
prompt_file = self.cfg.get('synthesis', {}).get('prompt_file')
prompt_template = None
if prompt_file:
pf = Path(prompt_file)
if not pf.is_absolute():
pf = Path(__file__).parent / prompt_file
if pf.exists():
prompt_template = pf.read_text()
logger.info(f"Loaded prompt template: {pf}")
else:
logger.warning(f"Prompt file not found: {pf}")
self.synthesizer = SynthesisEngine(llm_endpoint, prompt_template=prompt_template)
self.audio_gen = AudioGenerator()
delivery_config = self.cfg.get('delivery', {})
self.telegram = None
bot_token = delivery_config.get('bot_token') or delivery_config.get('telegram_bot_token')
chat_id = delivery_config.get('channel_id') or delivery_config.get('telegram_chat_id')
if bot_token and chat_id:
self.telegram = TelegramDelivery(bot_token, str(chat_id))
async def run(self, since: Optional[datetime] = None,
dry_run: bool = False, force: bool = False) -> Dict[str, Any]:
logger.info("="*60)
logger.info("DEEP DIVE INTELLIGENCE PIPELINE")
logger.info("="*60)
# Phase 1
logger.info("Phase 1: Source Aggregation")
sources = self.cfg.get('sources', [])
items = await self.aggregator.fetch_all(sources, since)
if not items:
logger.warning("No items fetched")
if not force:
return {'status': 'empty', 'items_count': 0}
logger.info("Force mode enabled — continuing with empty dataset")
# Phase 2
logger.info("Phase 2: Relevance Scoring")
relevance_config = self.cfg.get('relevance', {})
top_n = relevance_config.get('top_n', 10)
min_score = relevance_config.get('min_score', 0.5)
ranked = self.scorer.rank(items, top_n=top_n, min_score=min_score)
logger.info(f"Selected {len(ranked)} items above threshold {min_score}")
if not ranked and not force:
return {'status': 'filtered', 'items_count': len(items), 'ranked_count': 0}
# Phase 0 — injected before Phase 3
logger.info("Phase 0: Fleet Context Grounding")
fleet_ctx = None
if HAS_FLEET_CONTEXT:
try:
fleet_ctx = build_fleet_context(self.cfg)
if fleet_ctx:
logger.info(f"Fleet context built: {len(fleet_ctx.repos)} repos, "
f"{len(fleet_ctx.open_issues)} issues/PRs, "
f"{len(fleet_ctx.recent_commits)} recent commits")
except Exception as e:
logger.warning(f"Fleet context build failed: {e}")
# Phase 3
logger.info("Phase 3: Synthesis")
briefing = self.synthesizer.generate_structured(ranked, fleet_context=fleet_ctx)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
briefing_path = self.cache_dir / f"briefing_{timestamp}.json"
with open(briefing_path, 'w') as f:
json.dump(briefing, f, indent=2)
logger.info(f"Briefing saved: {briefing_path}")
# Phase 4
if self.cfg.get('tts', {}).get('enabled', False) or self.cfg.get('audio', {}).get('enabled', False):
logger.info("Phase 4: Audio Generation")
audio_path = self.audio_gen.generate(briefing)
else:
audio_path = None
logger.info("Phase 4: Audio disabled")
# Phase 5
if not dry_run and self.telegram:
logger.info("Phase 5: Delivery")
self.telegram.deliver_text(briefing)
if audio_path:
self.telegram.deliver_voice(audio_path)
else:
if dry_run:
logger.info("Phase 5: DRY RUN - delivery skipped")
else:
logger.info("Phase 5: Telegram not configured")
return {
'status': 'success',
'items_aggregated': len(items),
'items_ranked': len(ranked),
'briefing_path': str(briefing_path),
'audio_path': str(audio_path) if audio_path else None,
'top_items': [item[0].to_dict() for item in ranked[:3]]
}
# ============================================================================
# CLI
# ============================================================================
async def main():
import argparse
parser = argparse.ArgumentParser(description="Deep Dive Intelligence Pipeline")
parser.add_argument('--config', '-c', default='config.yaml',
help='Configuration file path')
parser.add_argument('--dry-run', '-n', action='store_true',
help='Run without delivery')
parser.add_argument('--today', '-t', action='store_true',
help="Fetch only today's items")
parser.add_argument('--since', '-s', type=int, default=24,
help='Hours back to fetch (default: 24)')
parser.add_argument('--force', '-f', action='store_true',
help='Run pipeline even if no items are fetched (for testing)')
args = parser.parse_args()
if not HAS_YAML:
print("ERROR: PyYAML not installed. Run: pip install pyyaml")
return 1
with open(args.config) as f:
config = yaml.safe_load(f)
if args.today:
since = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None)
else:
since = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=args.since)
pipeline = DeepDivePipeline(config)
result = await pipeline.run(since=since, dry_run=args.dry_run, force=args.force)
print("\n" + "="*60)
print("PIPELINE RESULT")
print("="*60)
print(json.dumps(result, indent=2))
return 0 if result['status'] == 'success' else 1
if __name__ == '__main__':
exit(asyncio.run(main()))