#!/usr/bin/env python3 """deepdive_orchestrator.py — Deep Dive pipeline controller. Issue #830.""" import argparse import json import subprocess import sys from datetime import datetime from pathlib import Path DEFAULT_CONFIG = { "sources": ["arxiv_cs_ai", "arxiv_cs_cl", "arxiv_cs_lg"], "max_items": 10, "tts_enabled": False, } class Orchestrator: def __init__(self, date: str = None): self.date = date or datetime.now().strftime("%Y-%m-%d") self.state_dir = Path("~/the-nexus/deepdive_state").expanduser() / self.date self.state_dir.mkdir(parents=True, exist_ok=True) def phase1(self, sources): """Aggregate from sources.""" print("[PHASE 1] Aggregating...") output_file = self.state_dir / "raw_items.json" subprocess.run([ sys.executable, Path(__file__).parent / "deepdive_aggregator.py", "--sources", ",".join(sources), "--output", str(output_file) ]) return json.loads(output_file.read_text()) def phase2(self, raw_items, max_items): """Filter by keywords.""" print("[PHASE 2] Filtering...") keywords = ["agent", "llm", "tool use", "rlhf", "alignment", "finetuning"] scored = [] for item in raw_items["items"]: content = f"{item.get('title','')} {item.get('content','')}".lower() score = sum(1 for kw in keywords if kw in content) scored.append({**item, "score": score}) scored.sort(key=lambda x: x["score"], reverse=True) top = scored[:max_items] output_file = self.state_dir / "ranked.json" output_file.write_text(json.dumps({"items": top}, indent=2)) return top def phase3(self, ranked_items): """Synthesize briefing.""" print("[PHASE 3] Synthesizing (MVP: structured text)...") md = f"# Deep Dive — {self.date}\n\n" for i, item in enumerate(ranked_items[:3], 1): md += f"{i}. [{item['title']}]({item['url']}) — Score: {item['score']}\n\n" briefing_file = self.state_dir / "briefing.md" briefing_file.write_text(md) return str(briefing_file) def phase4(self, briefing_file): """Generate audio.""" print("[PHASE 4] ⚠ TTS decision needed — skipping") return None def phase5(self, briefing_file, audio_file): """Deliver.""" print("[PHASE 5] ⚠ Telegram delivery not integrated") text = Path(briefing_file).read_text() print(text[:500]) return {"status": "logged"} def run(self, config): raw = self.phase1(config["sources"]) ranked = self.phase2(raw, config["max_items"]) briefing = self.phase3(ranked) audio = self.phase4(briefing) return self.phase5(briefing, audio) def main(): parser = argparse.ArgumentParser() parser.add_argument("--daily", action="store_true") parser.add_argument("--date") args = parser.parse_args() orch = Orchestrator(date=args.date) result = orch.run(DEFAULT_CONFIG) print(f"[DONE] State: {orch.state_dir}") if __name__ == "__main__": main()