[ezra] Deep Dive scaffold #830: deepdive_orchestrator.py
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled

This commit is contained in:
2026-04-05 01:51:03 +00:00
parent 9ba00b7ea8
commit 75fa66344d

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""deepdive_orchestrator.py — Deep Dive pipeline controller. Issue #830."""
import argparse
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
DEFAULT_CONFIG = {
"sources": ["arxiv_cs_ai", "arxiv_cs_cl", "arxiv_cs_lg"],
"max_items": 10,
"tts_enabled": False,
}
class Orchestrator:
def __init__(self, date: str = None):
self.date = date or datetime.now().strftime("%Y-%m-%d")
self.state_dir = Path("~/the-nexus/deepdive_state").expanduser() / self.date
self.state_dir.mkdir(parents=True, exist_ok=True)
def phase1(self, sources):
"""Aggregate from sources."""
print("[PHASE 1] Aggregating...")
output_file = self.state_dir / "raw_items.json"
subprocess.run([
sys.executable, Path(__file__).parent / "deepdive_aggregator.py",
"--sources", ",".join(sources), "--output", str(output_file)
])
return json.loads(output_file.read_text())
def phase2(self, raw_items, max_items):
"""Filter by keywords."""
print("[PHASE 2] Filtering...")
keywords = ["agent", "llm", "tool use", "rlhf", "alignment", "finetuning"]
scored = []
for item in raw_items["items"]:
content = f"{item.get('title','')} {item.get('content','')}".lower()
score = sum(1 for kw in keywords if kw in content)
scored.append({**item, "score": score})
scored.sort(key=lambda x: x["score"], reverse=True)
top = scored[:max_items]
output_file = self.state_dir / "ranked.json"
output_file.write_text(json.dumps({"items": top}, indent=2))
return top
def phase3(self, ranked_items):
"""Synthesize briefing."""
print("[PHASE 3] Synthesizing (MVP: structured text)...")
md = f"# Deep Dive — {self.date}\n\n"
for i, item in enumerate(ranked_items[:3], 1):
md += f"{i}. [{item['title']}]({item['url']}) — Score: {item['score']}\n\n"
briefing_file = self.state_dir / "briefing.md"
briefing_file.write_text(md)
return str(briefing_file)
def phase4(self, briefing_file):
"""Generate audio."""
print("[PHASE 4] ⚠ TTS decision needed — skipping")
return None
def phase5(self, briefing_file, audio_file):
"""Deliver."""
print("[PHASE 5] ⚠ Telegram delivery not integrated")
text = Path(briefing_file).read_text()
print(text[:500])
return {"status": "logged"}
def run(self, config):
raw = self.phase1(config["sources"])
ranked = self.phase2(raw, config["max_items"])
briefing = self.phase3(ranked)
audio = self.phase4(briefing)
return self.phase5(briefing, audio)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--daily", action="store_true")
parser.add_argument("--date")
args = parser.parse_args()
orch = Orchestrator(date=args.date)
result = orch.run(DEFAULT_CONFIG)
print(f"[DONE] State: {orch.state_dir}")
if __name__ == "__main__":
main()