From bf27768ba64849e568b2ae0c2502a5ccbdc7fefb Mon Sep 17 00:00:00 2001 From: Ezra Date: Sun, 5 Apr 2026 01:48:59 +0000 Subject: [PATCH] [ezra] Deep Dive scaffold for #830: deepdive_orchestrator.py --- bin/deepdive_orchestrator.py | 95 ++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 bin/deepdive_orchestrator.py diff --git a/bin/deepdive_orchestrator.py b/bin/deepdive_orchestrator.py new file mode 100644 index 0000000..f2613b6 --- /dev/null +++ b/bin/deepdive_orchestrator.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +"""deepdive_orchestrator.py — Deep Dive pipeline controller. Issue #830.""" + +import argparse +import json +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +DEFAULT_CONFIG = { + "sources": ["arxiv_cs_ai", "arxiv_cs_cl", "arxiv_cs_lg"], + "max_items": 10, + "tts_enabled": False, +} + + +class Orchestrator: + def __init__(self, date: str = None): + self.date = date or datetime.now().strftime("%Y-%m-%d") + self.state_dir = Path("~/the-nexus/deepdive_state").expanduser() / self.date + self.state_dir.mkdir(parents=True, exist_ok=True) + + def phase1(self, sources): + """Aggregate from sources.""" + print("[PHASE 1] Aggregating...") + output_file = self.state_dir / "raw_items.json" + subprocess.run([ + sys.executable, Path(__file__).parent / "deepdive_aggregator.py", + "--sources", ",".join(sources), "--output", str(output_file) + ]) + return json.loads(output_file.read_text()) + + def phase2(self, raw_items, max_items): + """Filter by keywords.""" + print("[PHASE 2] Filtering...") + keywords = ["agent", "llm", "tool use", "rlhf", "alignment", "finetuning"] + + scored = [] + for item in raw_items["items"]: + content = f"{item.get('title','')} {item.get('content','')}".lower() + score = sum(1 for kw in keywords if kw in content) + scored.append({**item, "score": score}) + + scored.sort(key=lambda x: x["score"], reverse=True) + top = scored[:max_items] + + output_file = self.state_dir / "ranked.json" + output_file.write_text(json.dumps({"items": top}, indent=2)) + return top + + def phase3(self, ranked_items): + """Synthesize briefing.""" + print("[PHASE 3] Synthesizing (MVP: structured text)...") + md = f"# Deep Dive — {self.date}\n\n" + for i, item in enumerate(ranked_items[:3], 1): + md += f"{i}. [{item['title']}]({item['url']}) — Score: {item['score']}\n\n" + + briefing_file = self.state_dir / "briefing.md" + briefing_file.write_text(md) + return str(briefing_file) + + def phase4(self, briefing_file): + """Generate audio.""" + print("[PHASE 4] ⚠ TTS decision needed — skipping") + return None + + def phase5(self, briefing_file, audio_file): + """Deliver.""" + print("[PHASE 5] ⚠ Telegram delivery not integrated") + text = Path(briefing_file).read_text() + print(text[:500]) + return {"status": "logged"} + + def run(self, config): + raw = self.phase1(config["sources"]) + ranked = self.phase2(raw, config["max_items"]) + briefing = self.phase3(ranked) + audio = self.phase4(briefing) + return self.phase5(briefing, audio) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--daily", action="store_true") + parser.add_argument("--date") + args = parser.parse_args() + + orch = Orchestrator(date=args.date) + result = orch.run(DEFAULT_CONFIG) + print(f"[DONE] State: {orch.state_dir}") + + +if __name__ == "__main__": + main()