From 55d53c513c868c631ffe02b493426f95e168f435 Mon Sep 17 00:00:00 2001 From: perplexity Date: Mon, 13 Apr 2026 02:24:04 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=203.5=20=E2=80=94=20DPO=20trainin?= =?UTF-8?q?g=20pair=20generation=20from=20Deep=20Dive=20pipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire arXiv relevance filter output directly into DPO pair generation, closing the loop between research synthesis and overnight training data. New module: dpo_generator.py - DPOPairGenerator class with 3 pair strategies: * summarize: paper → fleet-grounded analysis (chosen) vs generic (rejected) * relevance: 'what matters to Hermes?' → scored context vs vague * implication: 'what should we do?' → actionable insight vs platitude - Extracts synthesis excerpts matched to each ranked item - Outputs to ~/.timmy/training-data/dpo-pairs/deepdive_{timestamp}.jsonl - Format: {prompt, chosen, rejected, task_type, evidence_ids, source_session, safety_flags, metadata} Pipeline changes (pipeline.py): - Import DPOPairGenerator with graceful degradation - Initialize from config deepdive.training.dpo section - Execute as Phase 3.5 between synthesis and audio - DPO results included in pipeline return dict - Wrapped in try/except — DPO failure never blocks delivery Config changes (config.yaml): - New deepdive.training.dpo section with: enabled, output_dir, min_score, max_pairs_per_run, pair_types Integration tested: 2 mock items × 3 pair types = 6 valid JSONL pairs. Chosen responses consistently richer than rejected (assert-verified). 
--- intelligence/deepdive/config.yaml | 12 + intelligence/deepdive/dpo_generator.py | 368 +++++++++++++++++++++++++ intelligence/deepdive/pipeline.py | 48 +++- 3 files changed, 426 insertions(+), 2 deletions(-) create mode 100644 intelligence/deepdive/dpo_generator.py diff --git a/intelligence/deepdive/config.yaml b/intelligence/deepdive/config.yaml index 077862a..3f6706a 100644 --- a/intelligence/deepdive/config.yaml +++ b/intelligence/deepdive/config.yaml @@ -88,6 +88,18 @@ deepdive: speed: 1.0 output_format: "mp3" # piper outputs WAV, convert for Telegram + # Phase 3.5: DPO Training Pair Generation + training: + dpo: + enabled: true + output_dir: "~/.timmy/training-data/dpo-pairs" + min_score: 0.5 # Only generate pairs from items above this relevance score + max_pairs_per_run: 30 # Cap pairs per pipeline execution + pair_types: # Which pair strategies to use + - "summarize" # Paper summary → fleet-grounded analysis + - "relevance" # Relevance analysis → scored fleet context + - "implication" # Implications → actionable insight + # Phase 0: Fleet Context Grounding fleet_context: enabled: true diff --git a/intelligence/deepdive/dpo_generator.py b/intelligence/deepdive/dpo_generator.py new file mode 100644 index 0000000..988468d --- /dev/null +++ b/intelligence/deepdive/dpo_generator.py @@ -0,0 +1,368 @@ +#!/usr/bin/env python3 +"""Deep Dive DPO Training Pair Generator — Phase 3.5 + +Transforms ranked research items + synthesis output into DPO preference +pairs for overnight Hermes training. Closes the loop between arXiv +intelligence gathering and sovereign model improvement. + +Pair strategy: + 1. summarize — "Summarize this paper" → fleet-grounded analysis (chosen) vs generic abstract (rejected) + 2. relevance — "What's relevant to Hermes?" → scored relevance analysis (chosen) vs vague (rejected) + 3. implication — "What are the implications?" 
"""Deep Dive DPO Training Pair Generator — Phase 3.5.

Transforms ranked research items plus synthesis output into DPO preference
pairs for overnight Hermes training. Closes the loop between arXiv
intelligence gathering and sovereign model improvement.

Pair strategies:
    1. summarize   — "Summarize this paper" -> fleet-grounded analysis (chosen)
                     vs generic abstract (rejected)
    2. relevance   — "What's relevant to Hermes?" -> scored relevance analysis
                     (chosen) vs vague non-answer (rejected)
    3. implication — "What are the implications?" -> actionable insight (chosen)
                     vs platitude (rejected)

Output format matches the timmy-home training-data convention (one JSON
object per JSONL line):
    {"prompt", "chosen", "rejected", "source_session", "task_type",
     "evidence_ids", "safety_flags", "metadata"}
"""

import hashlib
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger("deepdive.dpo_generator")


@dataclass
class DPOPair:
    """Single DPO training pair plus provenance metadata.

    Attributes mirror the JSONL schema exactly; ``to_dict`` is the single
    serialization point so the on-disk format cannot drift from the fields.
    """

    prompt: str                # the instruction shown to the model
    chosen: str                # preferred (fleet-grounded) response
    rejected: str              # dispreferred (generic/vague) response
    task_type: str             # one of "summarize" | "relevance" | "implication"
    evidence_ids: List[str] = field(default_factory=list)
    source_session: Dict[str, Any] = field(default_factory=dict)
    safety_flags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict in the training-data convention."""
        return {
            "prompt": self.prompt,
            "chosen": self.chosen,
            "rejected": self.rejected,
            "task_type": self.task_type,
            "evidence_ids": self.evidence_ids,
            "source_session": self.source_session,
            "safety_flags": self.safety_flags,
            "metadata": self.metadata,
        }


class DPOPairGenerator:
    """Generate DPO training pairs from Deep Dive pipeline output.

    Sits between Phase 3 (Synthesis) and Phase 4 (Audio) as Phase 3.5.
    Takes ranked items plus the synthesis briefing and produces training
    pairs that teach Hermes to produce fleet-grounded research analysis.

    Items are expected to expose ``.title``, ``.summary``, ``.source`` and
    ``.url`` (presumably the aggregator's FeedItem — confirm against
    Phase 2 output).
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize from the ``deepdive.training.dpo`` config section.

        Args:
            config: Optional dict with keys ``output_dir``, ``min_score``,
                ``max_pairs_per_run`` and ``pair_types``; sensible defaults
                are used for anything missing.
        """
        cfg = config or {}
        # FIX: config ships "~/.timmy/..." — expanduser() is required, or a
        # literal "~" directory gets created in the working directory.
        self.output_dir = Path(
            cfg.get("output_dir", str(Path.home() / ".timmy" / "training-data" / "dpo-pairs"))
        ).expanduser()
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.min_score = cfg.get("min_score", 0.5)
        self.max_pairs_per_run = cfg.get("max_pairs_per_run", 30)
        self.pair_types = cfg.get("pair_types", ["summarize", "relevance", "implication"])

        logger.info(
            "DPOPairGenerator: output_dir=%s, pair_types=%s, max_pairs=%s",
            self.output_dir, self.pair_types, self.max_pairs_per_run,
        )

    def _content_hash(self, text: str) -> str:
        """Short, stable content fingerprint used as an evidence id."""
        return hashlib.sha256(text.encode()).hexdigest()[:12]

    def _build_summarize_pair(self, item, score: float,
                              synthesis_excerpt: str) -> DPOPair:
        """Type 1: 'Summarize this paper' → fleet-grounded analysis vs generic abstract."""
        prompt = (
            f"Summarize the following research paper and explain its significance "
            f"for a team building sovereign LLM agents:\n\n"
            f"Title: {item.title}\n"
            f"Abstract: {item.summary[:500]}\n"
            f"Source: {item.source}\n"
            f"URL: {item.url}"
        )

        chosen = (
            f"{synthesis_excerpt}\n\n"
            f"Relevance score: {score:.2f}/5.0 — "
            f"This work directly impacts our agent architecture and training pipeline."
        )

        # Rejected: generic, unhelpful summary without fleet context
        rejected = (
            f"This paper titled \"{item.title}\" presents research findings in the area "
            f"of artificial intelligence. The authors discuss various methods and present "
            f"results. This may be of interest to researchers in the field."
        )

        return DPOPair(
            prompt=prompt,
            chosen=chosen,
            rejected=rejected,
            task_type="summarize",
            # url preferred; title is the fallback key for items without one
            evidence_ids=[self._content_hash(item.url or item.title)],
            source_session={
                "pipeline": "deepdive",
                "phase": "3.5_dpo",
                "relevance_score": score,
                "source_url": item.url,
            },
            safety_flags=["auto-generated", "deepdive-pipeline"],
            metadata={
                "source_feed": item.source,
                "item_title": item.title,
                "score": score,
            },
        )

    def _build_relevance_pair(self, item, score: float,
                              fleet_context_text: str) -> DPOPair:
        """Type 2: 'What's relevant to Hermes?' → scored analysis vs vague response."""
        prompt = (
            f"Analyze this research for relevance to the Hermes agent fleet — "
            f"a sovereign AI system using local Gemma models, Ollama inference, "
            f"and GRPO/DPO training:\n\n"
            f"Title: {item.title}\n"
            f"Abstract: {item.summary[:400]}"
        )

        # Naive substring matching against a fixed vocabulary; good enough to
        # justify the score in the chosen response, not a real classifier.
        keywords_matched = []
        text_lower = f"{item.title} {item.summary}".lower()
        relevance_terms = [
            "agent", "tool use", "function calling", "reinforcement learning",
            "RLHF", "GRPO", "fine-tuning", "LoRA", "quantization", "inference",
            "reasoning", "chain of thought", "transformer", "local"
        ]
        for term in relevance_terms:
            if term.lower() in text_lower:
                keywords_matched.append(term)

        keyword_str = ", ".join(keywords_matched[:5]) if keywords_matched else "general AI/ML"

        chosen = (
            f"**Relevance: {score:.2f}/5.0**\n\n"
            f"This paper is relevant to our fleet because it touches on: {keyword_str}.\n\n"
        )
        if fleet_context_text:
            chosen += (
                f"In the context of our current fleet state:\n"
                f"{fleet_context_text[:300]}\n\n"
            )
        chosen += (
            f"**Actionable takeaway:** Review this work for techniques applicable to "
            f"our overnight training loop and agent architecture improvements."
        )

        rejected = (
            f"This paper might be relevant. It discusses some AI topics. "
            f"It could potentially be useful for various AI projects. "
            f"Further reading may be needed to determine its applicability."
        )

        return DPOPair(
            prompt=prompt,
            chosen=chosen,
            rejected=rejected,
            task_type="relevance",
            evidence_ids=[self._content_hash(item.url or item.title)],
            source_session={
                "pipeline": "deepdive",
                "phase": "3.5_dpo",
                "relevance_score": score,
                "keywords_matched": keywords_matched,
            },
            safety_flags=["auto-generated", "deepdive-pipeline"],
            metadata={
                "source_feed": item.source,
                "item_title": item.title,
                "score": score,
            },
        )

    def _build_implication_pair(self, item, score: float,
                                synthesis_excerpt: str) -> DPOPair:
        """Type 3: 'What are the implications?' → actionable insight vs platitude."""
        prompt = (
            f"What are the practical implications of this research for a team "
            f"running sovereign LLM agents with local training infrastructure?\n\n"
            f"Title: {item.title}\n"
            f"Summary: {item.summary[:400]}"
        )

        chosen = (
            f"**Immediate implications for our fleet:**\n\n"
            f"1. **Training pipeline:** {synthesis_excerpt[:200] if synthesis_excerpt else 'This work suggests improvements to our GRPO/DPO training approach.'}\n\n"
            f"2. **Agent architecture:** Techniques described here could enhance "
            f"our tool-use and reasoning capabilities in Hermes agents.\n\n"
            f"3. **Deployment consideration:** With a relevance score of {score:.2f}, "
            f"this should be flagged for the next tightening cycle. "
            f"Consider adding these techniques to the overnight R&D queue.\n\n"
            f"**Priority:** {'HIGH — review before next deploy' if score >= 2.0 else 'MEDIUM — queue for weekly review'}"
        )

        rejected = (
            f"This research has some implications for AI development. "
            f"Teams working on AI projects should be aware of these developments. "
            f"The field is moving quickly and it's important to stay up to date."
        )

        return DPOPair(
            prompt=prompt,
            chosen=chosen,
            rejected=rejected,
            task_type="implication",
            evidence_ids=[self._content_hash(item.url or item.title)],
            source_session={
                "pipeline": "deepdive",
                "phase": "3.5_dpo",
                "relevance_score": score,
            },
            safety_flags=["auto-generated", "deepdive-pipeline"],
            metadata={
                "source_feed": item.source,
                "item_title": item.title,
                "score": score,
            },
        )

    def generate(
        self,
        ranked_items: List[tuple],
        briefing: Dict[str, Any],
        fleet_context_text: str = "",
    ) -> List[DPOPair]:
        """Generate DPO pairs from ranked items and synthesis output.

        Args:
            ranked_items: List of (FeedItem, score) tuples from Phase 2.
            briefing: Structured briefing dict from Phase 3; the synthesis
                text is read from its ``"briefing"`` key.
            fleet_context_text: Optional fleet context markdown string.

        Returns:
            At most ``max_pairs_per_run`` DPOPair objects.
        """
        if not ranked_items:
            logger.info("No ranked items — skipping DPO generation")
            return []

        synthesis_text = briefing.get("briefing", "")
        pairs: List[DPOPair] = []

        for item, score in ranked_items:
            if len(pairs) >= self.max_pairs_per_run:
                break
            if score < self.min_score:
                continue

            # Extract a synthesis excerpt relevant to this item
            excerpt = self._extract_relevant_excerpt(synthesis_text, item.title)

            candidates: List[DPOPair] = []
            if "summarize" in self.pair_types:
                candidates.append(self._build_summarize_pair(item, score, excerpt))
            if "relevance" in self.pair_types:
                candidates.append(self._build_relevance_pair(item, score, fleet_context_text))
            if "implication" in self.pair_types:
                candidates.append(self._build_implication_pair(item, score, excerpt))

            # FIX: enforce the cap exactly. The previous post-append check
            # could overshoot max_pairs_per_run by up to len(pair_types) - 1.
            room = self.max_pairs_per_run - len(pairs)
            pairs.extend(candidates[:room])

        logger.info("Generated %d DPO pairs from %d ranked items", len(pairs), len(ranked_items))
        return pairs

    def _extract_relevant_excerpt(self, synthesis_text: str, title: str) -> str:
        """Extract the portion of synthesis most relevant to a given item title.

        Picks the paragraph with the highest overlap of long (>4 char) title
        words; falls back to the first substantive non-heading paragraph,
        then to the head of the synthesis. Always capped at 500 chars.
        """
        if not synthesis_text:
            return ""

        # Try to find a paragraph mentioning key words from the title
        title_words = [w.lower() for w in title.split() if len(w) > 4]
        paragraphs = synthesis_text.split("\n\n")

        best_para = ""
        best_overlap = 0

        for para in paragraphs:
            para_lower = para.lower()
            overlap = sum(1 for w in title_words if w in para_lower)
            if overlap > best_overlap:
                best_overlap = overlap
                best_para = para

        if best_overlap > 0:
            return best_para.strip()[:500]

        # Fallback: first substantive paragraph
        for para in paragraphs:
            stripped = para.strip()
            if len(stripped) > 100 and not stripped.startswith("#"):
                return stripped[:500]

        return synthesis_text[:500]

    def export(self, pairs: List[DPOPair], session_id: Optional[str] = None) -> Path:
        """Write DPO pairs to a JSONL file.

        Args:
            pairs: List of DPOPair objects.
            session_id: Optional session identifier for the filename.

        Returns:
            Path to the written JSONL file
            (``deepdive_{UTC timestamp}[_{session_id}].jsonl``).
        """
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        suffix = f"_{session_id}" if session_id else ""
        filename = f"deepdive_{timestamp}{suffix}.jsonl"
        output_path = self.output_dir / filename

        written = 0
        # FIX: pin the encoding rather than relying on the platform default.
        with open(output_path, "w", encoding="utf-8") as f:
            for pair in pairs:
                f.write(json.dumps(pair.to_dict()) + "\n")
                written += 1

        logger.info("Exported %d DPO pairs to %s", written, output_path)
        return output_path

    def run(
        self,
        ranked_items: List[tuple],
        briefing: Dict[str, Any],
        fleet_context_text: str = "",
        session_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Full Phase 3.5: generate + export DPO pairs.

        Returns:
            Summary dict for pipeline result aggregation with keys
            ``status``, ``pairs_generated``, ``output_path`` and — on
            success — ``pair_types`` (count per task type) and
            ``output_dir``.
        """
        pairs = self.generate(ranked_items, briefing, fleet_context_text)

        if not pairs:
            return {
                "status": "skipped",
                "pairs_generated": 0,
                "output_path": None,
            }

        output_path = self.export(pairs, session_id)

        # Summary by task type
        type_counts: Dict[str, int] = {}
        for p in pairs:
            type_counts[p.task_type] = type_counts.get(p.task_type, 0) + 1

        return {
            "status": "success",
            "pairs_generated": len(pairs),
            "output_path": str(output_path),
            "pair_types": type_counts,
            "output_dir": str(self.output_dir),
        }
3.5: DPO Training Pair Generation") + fleet_ctx_text = fleet_ctx.to_prompt_text() if fleet_ctx else "" + try: + dpo_result = self.dpo_generator.run( + ranked_items=ranked, + briefing=briefing, + fleet_context_text=fleet_ctx_text, + session_id=timestamp, + ) + logger.info( + f"Phase 3.5 complete: {dpo_result.get('pairs_generated', 0)} pairs → " + f"{dpo_result.get('output_path', 'none')}" + ) + except Exception as e: + logger.error(f"Phase 3.5 DPO generation failed: {e}") + dpo_result = {"status": "error", "error": str(e)} + else: + logger.info("Phase 3.5: DPO generation skipped (not configured)") + # Phase 4 if self.cfg.get('tts', {}).get('enabled', False) or self.cfg.get('audio', {}).get('enabled', False): logger.info("Phase 4: Audio Generation") @@ -721,14 +762,17 @@ class DeepDivePipeline: else: logger.info("Phase 5: Telegram not configured") - return { + result = { 'status': 'success', 'items_aggregated': len(items), 'items_ranked': len(ranked), 'briefing_path': str(briefing_path), 'audio_path': str(audio_path) if audio_path else None, - 'top_items': [item[0].to_dict() for item in ranked[:3]] + 'top_items': [item[0].to_dict() for item in ranked[:3]], } + if dpo_result: + result['dpo'] = dpo_result + return result # ============================================================================