Merge pull request 'feat: Phase 3.5 — DPO training pair generation from Deep Dive pipeline' (#1347) from feature/deepdive-dpo-phase-3.5 into main
This commit was merged in pull request #1347.
This commit is contained in:
@@ -88,6 +88,18 @@ deepdive:
|
||||
speed: 1.0
|
||||
output_format: "mp3" # piper outputs WAV, convert for Telegram
|
||||
|
||||
# Phase 3.5: DPO Training Pair Generation
|
||||
training:
|
||||
dpo:
|
||||
enabled: true
|
||||
output_dir: "~/.timmy/training-data/dpo-pairs"
|
||||
min_score: 0.5 # Only generate pairs from items above this relevance score
|
||||
max_pairs_per_run: 30 # Cap pairs per pipeline execution
|
||||
pair_types: # Which pair strategies to use
|
||||
- "summarize" # Paper summary → fleet-grounded analysis
|
||||
- "relevance" # Relevance analysis → scored fleet context
|
||||
- "implication" # Implications → actionable insight
|
||||
|
||||
# Phase 0: Fleet Context Grounding
|
||||
fleet_context:
|
||||
enabled: true
|
||||
|
||||
368
intelligence/deepdive/dpo_generator.py
Normal file
368
intelligence/deepdive/dpo_generator.py
Normal file
@@ -0,0 +1,368 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Deep Dive DPO Training Pair Generator — Phase 3.5
|
||||
|
||||
Transforms ranked research items + synthesis output into DPO preference
|
||||
pairs for overnight Hermes training. Closes the loop between arXiv
|
||||
intelligence gathering and sovereign model improvement.
|
||||
|
||||
Pair strategy:
|
||||
1. summarize — "Summarize this paper" → fleet-grounded analysis (chosen) vs generic abstract (rejected)
|
||||
2. relevance — "What's relevant to Hermes?" → scored relevance analysis (chosen) vs vague (rejected)
|
||||
3. implication — "What are the implications?" → actionable insight (chosen) vs platitude (rejected)
|
||||
|
||||
Output format matches timmy-home training-data convention:
|
||||
{"prompt", "chosen", "rejected", "source_session", "task_type", "evidence_ids", "safety_flags"}
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger("deepdive.dpo_generator")
|
||||
|
||||
|
||||
@dataclass
|
||||
class DPOPair:
|
||||
"""Single DPO training pair."""
|
||||
prompt: str
|
||||
chosen: str
|
||||
rejected: str
|
||||
task_type: str
|
||||
evidence_ids: List[str] = field(default_factory=list)
|
||||
source_session: Dict[str, Any] = field(default_factory=dict)
|
||||
safety_flags: List[str] = field(default_factory=list)
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"prompt": self.prompt,
|
||||
"chosen": self.chosen,
|
||||
"rejected": self.rejected,
|
||||
"task_type": self.task_type,
|
||||
"evidence_ids": self.evidence_ids,
|
||||
"source_session": self.source_session,
|
||||
"safety_flags": self.safety_flags,
|
||||
"metadata": self.metadata,
|
||||
}
|
||||
|
||||
|
||||
class DPOPairGenerator:
|
||||
"""Generate DPO training pairs from Deep Dive pipeline output.
|
||||
|
||||
Sits between Phase 3 (Synthesis) and Phase 4 (Audio) as Phase 3.5.
|
||||
Takes ranked items + synthesis briefing and produces training pairs
|
||||
that teach Hermes to produce fleet-grounded research analysis.
|
||||
"""
|
||||
|
||||
def __init__(self, config: Optional[Dict[str, Any]] = None):
|
||||
cfg = config or {}
|
||||
self.output_dir = Path(
|
||||
cfg.get("output_dir", str(Path.home() / ".timmy" / "training-data" / "dpo-pairs"))
|
||||
)
|
||||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.min_score = cfg.get("min_score", 0.5)
|
||||
self.max_pairs_per_run = cfg.get("max_pairs_per_run", 30)
|
||||
self.pair_types = cfg.get("pair_types", ["summarize", "relevance", "implication"])
|
||||
|
||||
logger.info(
|
||||
f"DPOPairGenerator: output_dir={self.output_dir}, "
|
||||
f"pair_types={self.pair_types}, max_pairs={self.max_pairs_per_run}"
|
||||
)
|
||||
|
||||
def _content_hash(self, text: str) -> str:
|
||||
return hashlib.sha256(text.encode()).hexdigest()[:12]
|
||||
|
||||
def _build_summarize_pair(self, item, score: float,
|
||||
synthesis_excerpt: str) -> DPOPair:
|
||||
"""Type 1: 'Summarize this paper' → fleet-grounded analysis vs generic abstract."""
|
||||
prompt = (
|
||||
f"Summarize the following research paper and explain its significance "
|
||||
f"for a team building sovereign LLM agents:\n\n"
|
||||
f"Title: {item.title}\n"
|
||||
f"Abstract: {item.summary[:500]}\n"
|
||||
f"Source: {item.source}\n"
|
||||
f"URL: {item.url}"
|
||||
)
|
||||
|
||||
chosen = (
|
||||
f"{synthesis_excerpt}\n\n"
|
||||
f"Relevance score: {score:.2f}/5.0 — "
|
||||
f"This work directly impacts our agent architecture and training pipeline."
|
||||
)
|
||||
|
||||
# Rejected: generic, unhelpful summary without fleet context
|
||||
rejected = (
|
||||
f"This paper titled \"{item.title}\" presents research findings in the area "
|
||||
f"of artificial intelligence. The authors discuss various methods and present "
|
||||
f"results. This may be of interest to researchers in the field."
|
||||
)
|
||||
|
||||
return DPOPair(
|
||||
prompt=prompt,
|
||||
chosen=chosen,
|
||||
rejected=rejected,
|
||||
task_type="summarize",
|
||||
evidence_ids=[self._content_hash(item.url or item.title)],
|
||||
source_session={
|
||||
"pipeline": "deepdive",
|
||||
"phase": "3.5_dpo",
|
||||
"relevance_score": score,
|
||||
"source_url": item.url,
|
||||
},
|
||||
safety_flags=["auto-generated", "deepdive-pipeline"],
|
||||
metadata={
|
||||
"source_feed": item.source,
|
||||
"item_title": item.title,
|
||||
"score": score,
|
||||
},
|
||||
)
|
||||
|
||||
def _build_relevance_pair(self, item, score: float,
|
||||
fleet_context_text: str) -> DPOPair:
|
||||
"""Type 2: 'What's relevant to Hermes?' → scored analysis vs vague response."""
|
||||
prompt = (
|
||||
f"Analyze this research for relevance to the Hermes agent fleet — "
|
||||
f"a sovereign AI system using local Gemma models, Ollama inference, "
|
||||
f"and GRPO/DPO training:\n\n"
|
||||
f"Title: {item.title}\n"
|
||||
f"Abstract: {item.summary[:400]}"
|
||||
)
|
||||
|
||||
# Build keyword match explanation
|
||||
keywords_matched = []
|
||||
text_lower = f"{item.title} {item.summary}".lower()
|
||||
relevance_terms = [
|
||||
"agent", "tool use", "function calling", "reinforcement learning",
|
||||
"RLHF", "GRPO", "fine-tuning", "LoRA", "quantization", "inference",
|
||||
"reasoning", "chain of thought", "transformer", "local"
|
||||
]
|
||||
for term in relevance_terms:
|
||||
if term.lower() in text_lower:
|
||||
keywords_matched.append(term)
|
||||
|
||||
keyword_str = ", ".join(keywords_matched[:5]) if keywords_matched else "general AI/ML"
|
||||
|
||||
chosen = (
|
||||
f"**Relevance: {score:.2f}/5.0**\n\n"
|
||||
f"This paper is relevant to our fleet because it touches on: {keyword_str}.\n\n"
|
||||
)
|
||||
if fleet_context_text:
|
||||
chosen += (
|
||||
f"In the context of our current fleet state:\n"
|
||||
f"{fleet_context_text[:300]}\n\n"
|
||||
)
|
||||
chosen += (
|
||||
f"**Actionable takeaway:** Review this work for techniques applicable to "
|
||||
f"our overnight training loop and agent architecture improvements."
|
||||
)
|
||||
|
||||
rejected = (
|
||||
f"This paper might be relevant. It discusses some AI topics. "
|
||||
f"It could potentially be useful for various AI projects. "
|
||||
f"Further reading may be needed to determine its applicability."
|
||||
)
|
||||
|
||||
return DPOPair(
|
||||
prompt=prompt,
|
||||
chosen=chosen,
|
||||
rejected=rejected,
|
||||
task_type="relevance",
|
||||
evidence_ids=[self._content_hash(item.url or item.title)],
|
||||
source_session={
|
||||
"pipeline": "deepdive",
|
||||
"phase": "3.5_dpo",
|
||||
"relevance_score": score,
|
||||
"keywords_matched": keywords_matched,
|
||||
},
|
||||
safety_flags=["auto-generated", "deepdive-pipeline"],
|
||||
metadata={
|
||||
"source_feed": item.source,
|
||||
"item_title": item.title,
|
||||
"score": score,
|
||||
},
|
||||
)
|
||||
|
||||
def _build_implication_pair(self, item, score: float,
|
||||
synthesis_excerpt: str) -> DPOPair:
|
||||
"""Type 3: 'What are the implications?' → actionable insight vs platitude."""
|
||||
prompt = (
|
||||
f"What are the practical implications of this research for a team "
|
||||
f"running sovereign LLM agents with local training infrastructure?\n\n"
|
||||
f"Title: {item.title}\n"
|
||||
f"Summary: {item.summary[:400]}"
|
||||
)
|
||||
|
||||
chosen = (
|
||||
f"**Immediate implications for our fleet:**\n\n"
|
||||
f"1. **Training pipeline:** {synthesis_excerpt[:200] if synthesis_excerpt else 'This work suggests improvements to our GRPO/DPO training approach.'}\n\n"
|
||||
f"2. **Agent architecture:** Techniques described here could enhance "
|
||||
f"our tool-use and reasoning capabilities in Hermes agents.\n\n"
|
||||
f"3. **Deployment consideration:** With a relevance score of {score:.2f}, "
|
||||
f"this should be flagged for the next tightening cycle. "
|
||||
f"Consider adding these techniques to the overnight R&D queue.\n\n"
|
||||
f"**Priority:** {'HIGH — review before next deploy' if score >= 2.0 else 'MEDIUM — queue for weekly review'}"
|
||||
)
|
||||
|
||||
rejected = (
|
||||
f"This research has some implications for AI development. "
|
||||
f"Teams working on AI projects should be aware of these developments. "
|
||||
f"The field is moving quickly and it's important to stay up to date."
|
||||
)
|
||||
|
||||
return DPOPair(
|
||||
prompt=prompt,
|
||||
chosen=chosen,
|
||||
rejected=rejected,
|
||||
task_type="implication",
|
||||
evidence_ids=[self._content_hash(item.url or item.title)],
|
||||
source_session={
|
||||
"pipeline": "deepdive",
|
||||
"phase": "3.5_dpo",
|
||||
"relevance_score": score,
|
||||
},
|
||||
safety_flags=["auto-generated", "deepdive-pipeline"],
|
||||
metadata={
|
||||
"source_feed": item.source,
|
||||
"item_title": item.title,
|
||||
"score": score,
|
||||
},
|
||||
)
|
||||
|
||||
def generate(
|
||||
self,
|
||||
ranked_items: List[tuple],
|
||||
briefing: Dict[str, Any],
|
||||
fleet_context_text: str = "",
|
||||
) -> List[DPOPair]:
|
||||
"""Generate DPO pairs from ranked items and synthesis output.
|
||||
|
||||
Args:
|
||||
ranked_items: List of (FeedItem, score) tuples from Phase 2
|
||||
briefing: Structured briefing dict from Phase 3
|
||||
fleet_context_text: Optional fleet context markdown string
|
||||
|
||||
Returns:
|
||||
List of DPOPair objects
|
||||
"""
|
||||
if not ranked_items:
|
||||
logger.info("No ranked items — skipping DPO generation")
|
||||
return []
|
||||
|
||||
synthesis_text = briefing.get("briefing", "")
|
||||
pairs: List[DPOPair] = []
|
||||
|
||||
for item, score in ranked_items:
|
||||
if score < self.min_score:
|
||||
continue
|
||||
|
||||
# Extract a synthesis excerpt relevant to this item
|
||||
excerpt = self._extract_relevant_excerpt(synthesis_text, item.title)
|
||||
|
||||
if "summarize" in self.pair_types:
|
||||
pairs.append(self._build_summarize_pair(item, score, excerpt))
|
||||
|
||||
if "relevance" in self.pair_types:
|
||||
pairs.append(self._build_relevance_pair(item, score, fleet_context_text))
|
||||
|
||||
if "implication" in self.pair_types:
|
||||
pairs.append(self._build_implication_pair(item, score, excerpt))
|
||||
|
||||
if len(pairs) >= self.max_pairs_per_run:
|
||||
break
|
||||
|
||||
logger.info(f"Generated {len(pairs)} DPO pairs from {len(ranked_items)} ranked items")
|
||||
return pairs
|
||||
|
||||
def _extract_relevant_excerpt(self, synthesis_text: str, title: str) -> str:
|
||||
"""Extract the portion of synthesis most relevant to a given item title."""
|
||||
if not synthesis_text:
|
||||
return ""
|
||||
|
||||
# Try to find a paragraph mentioning key words from the title
|
||||
title_words = [w.lower() for w in title.split() if len(w) > 4]
|
||||
paragraphs = synthesis_text.split("\n\n")
|
||||
|
||||
best_para = ""
|
||||
best_overlap = 0
|
||||
|
||||
for para in paragraphs:
|
||||
para_lower = para.lower()
|
||||
overlap = sum(1 for w in title_words if w in para_lower)
|
||||
if overlap > best_overlap:
|
||||
best_overlap = overlap
|
||||
best_para = para
|
||||
|
||||
if best_overlap > 0:
|
||||
return best_para.strip()[:500]
|
||||
|
||||
# Fallback: first substantive paragraph
|
||||
for para in paragraphs:
|
||||
stripped = para.strip()
|
||||
if len(stripped) > 100 and not stripped.startswith("#"):
|
||||
return stripped[:500]
|
||||
|
||||
return synthesis_text[:500]
|
||||
|
||||
def export(self, pairs: List[DPOPair], session_id: Optional[str] = None) -> Path:
|
||||
"""Write DPO pairs to JSONL file.
|
||||
|
||||
Args:
|
||||
pairs: List of DPOPair objects
|
||||
session_id: Optional session identifier for the filename
|
||||
|
||||
Returns:
|
||||
Path to the written JSONL file
|
||||
"""
|
||||
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
|
||||
suffix = f"_{session_id}" if session_id else ""
|
||||
filename = f"deepdive_{timestamp}{suffix}.jsonl"
|
||||
output_path = self.output_dir / filename
|
||||
|
||||
written = 0
|
||||
with open(output_path, "w") as f:
|
||||
for pair in pairs:
|
||||
f.write(json.dumps(pair.to_dict()) + "\n")
|
||||
written += 1
|
||||
|
||||
logger.info(f"Exported {written} DPO pairs to {output_path}")
|
||||
return output_path
|
||||
|
||||
def run(
|
||||
self,
|
||||
ranked_items: List[tuple],
|
||||
briefing: Dict[str, Any],
|
||||
fleet_context_text: str = "",
|
||||
session_id: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Full Phase 3.5: generate + export DPO pairs.
|
||||
|
||||
Returns summary dict for pipeline result aggregation.
|
||||
"""
|
||||
pairs = self.generate(ranked_items, briefing, fleet_context_text)
|
||||
|
||||
if not pairs:
|
||||
return {
|
||||
"status": "skipped",
|
||||
"pairs_generated": 0,
|
||||
"output_path": None,
|
||||
}
|
||||
|
||||
output_path = self.export(pairs, session_id)
|
||||
|
||||
# Summary by task type
|
||||
type_counts = {}
|
||||
for p in pairs:
|
||||
type_counts[p.task_type] = type_counts.get(p.task_type, 0) + 1
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"pairs_generated": len(pairs),
|
||||
"output_path": str(output_path),
|
||||
"pair_types": type_counts,
|
||||
"output_dir": str(self.output_dir),
|
||||
}
|
||||
@@ -61,6 +61,14 @@ except ImportError:
|
||||
build_fleet_context = None
|
||||
FleetContext = None
|
||||
|
||||
# Phase 3.5: DPO pair generation
|
||||
try:
|
||||
from dpo_generator import DPOPairGenerator
|
||||
HAS_DPO_GENERATOR = True
|
||||
except ImportError:
|
||||
HAS_DPO_GENERATOR = False
|
||||
DPOPairGenerator = None
|
||||
|
||||
# Setup logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -622,6 +630,17 @@ class DeepDivePipeline:
|
||||
|
||||
self.aggregator = RSSAggregator(self.cache_dir)
|
||||
|
||||
# Phase 3.5: DPO pair generator
|
||||
training_config = self.cfg.get('training', {})
|
||||
self.dpo_generator = None
|
||||
if HAS_DPO_GENERATOR and training_config.get('dpo', {}).get('enabled', False):
|
||||
self.dpo_generator = DPOPairGenerator(training_config.get('dpo', {}))
|
||||
logger.info("DPO pair generator enabled")
|
||||
elif not HAS_DPO_GENERATOR:
|
||||
logger.info("DPO generator not available (dpo_generator module not found)")
|
||||
else:
|
||||
logger.info("DPO pair generation disabled in config")
|
||||
|
||||
relevance_config = self.cfg.get('relevance', {})
|
||||
self.scorer = RelevanceScorer(relevance_config.get('model', 'all-MiniLM-L6-v2'))
|
||||
|
||||
@@ -701,6 +720,28 @@ class DeepDivePipeline:
|
||||
json.dump(briefing, f, indent=2)
|
||||
logger.info(f"Briefing saved: {briefing_path}")
|
||||
|
||||
# Phase 3.5: DPO Training Pair Generation
|
||||
dpo_result = None
|
||||
if self.dpo_generator:
|
||||
logger.info("Phase 3.5: DPO Training Pair Generation")
|
||||
fleet_ctx_text = fleet_ctx.to_prompt_text() if fleet_ctx else ""
|
||||
try:
|
||||
dpo_result = self.dpo_generator.run(
|
||||
ranked_items=ranked,
|
||||
briefing=briefing,
|
||||
fleet_context_text=fleet_ctx_text,
|
||||
session_id=timestamp,
|
||||
)
|
||||
logger.info(
|
||||
f"Phase 3.5 complete: {dpo_result.get('pairs_generated', 0)} pairs → "
|
||||
f"{dpo_result.get('output_path', 'none')}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Phase 3.5 DPO generation failed: {e}")
|
||||
dpo_result = {"status": "error", "error": str(e)}
|
||||
else:
|
||||
logger.info("Phase 3.5: DPO generation skipped (not configured)")
|
||||
|
||||
# Phase 4
|
||||
if self.cfg.get('tts', {}).get('enabled', False) or self.cfg.get('audio', {}).get('enabled', False):
|
||||
logger.info("Phase 4: Audio Generation")
|
||||
@@ -721,14 +762,17 @@ class DeepDivePipeline:
|
||||
else:
|
||||
logger.info("Phase 5: Telegram not configured")
|
||||
|
||||
return {
|
||||
result = {
|
||||
'status': 'success',
|
||||
'items_aggregated': len(items),
|
||||
'items_ranked': len(ranked),
|
||||
'briefing_path': str(briefing_path),
|
||||
'audio_path': str(audio_path) if audio_path else None,
|
||||
'top_items': [item[0].to_dict() for item in ranked[:3]]
|
||||
'top_items': [item[0].to_dict() for item in ranked[:3]],
|
||||
}
|
||||
if dpo_result:
|
||||
result['dpo'] = dpo_result
|
||||
return result
|
||||
|
||||
|
||||
# ============================================================================
|
||||
|
||||
Reference in New Issue
Block a user