Files
the-nexus/scaffold/deep-dive/orchestrator.py
Ezra 763380d657
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
[DEEP-DIVE] Automated intelligence briefing scaffold — supports #830
2026-04-05 07:42:19 +00:00

109 lines
3.4 KiB
Python

#!/usr/bin/env python3
"""
Deep Dive Orchestrator
Main entry point for daily briefing generation
"""
import os
import sys
import asyncio
import argparse
from datetime import datetime
from pathlib import Path
# Make the pipeline stage packages importable regardless of the directory
# the script is launched from (e.g. cron runs with an arbitrary cwd).
# Relative "./aggregator"-style entries only worked when cwd == script dir.
_BASE_DIR = Path(__file__).resolve().parent
for _stage in ("aggregator", "relevance", "synthesis", "tts", "delivery"):
    sys.path.insert(0, str(_BASE_DIR / _stage))
from arxiv_fetcher import fetch_all_sources, keyword_score
from blog_fetcher import fetch_all_blogs
from relevance_engine import RelevanceEngine
from synthesis_engine import generate_briefing
from tts_pipeline import generate_briefing_audio
from delivery_pipeline import deliver_briefing
def run_deep_dive(dry_run: bool = False, skip_tts: bool = False) -> bool:
    """Run the full Deep Dive pipeline.

    Phases: aggregate sources, rank relevance, synthesize the briefing,
    optionally generate audio, then deliver.

    Args:
        dry_run: Forwarded to delivery; when True nothing is actually sent.
        skip_tts: When True, Phase 4 (audio generation) is skipped entirely.

    Returns:
        True when delivery reported success, False otherwise.
    """
    print(f"\n{'='*60}")
    print(f"Deep Dive Briefing — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"{'='*60}\n")

    # Phase 1: Aggregate
    print("📚 Phase 1: Aggregating sources...")
    papers = fetch_all_sources(days_back=1)
    blogs = fetch_all_blogs()
    print(f" Fetched {len(papers)} papers, {len(blogs)} blog posts")

    # Phase 2: Relevance
    print("\n🎯 Phase 2: Ranking relevance...")
    engine = RelevanceEngine()
    # Rank papers; keep only the top 10 for synthesis.
    ranked_papers = engine.rank_items(
        papers,
        text_fn=lambda p: f"{p.title} {p.abstract}",
        top_k=10
    )
    # Filter blogs by keywords for now
    blog_keywords = ["agent", "llm", "model", "research", "ai"]
    filtered_blogs = engine.filter_by_keywords(
        blogs,
        text_fn=lambda b: f"{b.title} {b.summary}",
        keywords=blog_keywords
    )[:5]
    print(f" Top paper: {ranked_papers[0][0].title if ranked_papers else 'None'}")

    # Phase 3: Synthesis
    print("\n🧠 Phase 3: Synthesizing briefing...")
    briefing = generate_briefing(ranked_papers, filtered_blogs)

    # Save text version. Explicit UTF-8: the briefing contains emoji and
    # em-dashes, which would crash open() under a non-UTF-8 default locale.
    output_dir = Path("./output")
    output_dir.mkdir(parents=True, exist_ok=True)
    text_path = output_dir / f"briefing_{datetime.now().strftime('%Y%m%d')}.md"
    text_path.write_text(briefing.raw_text, encoding="utf-8")
    print(f" Saved: {text_path}")

    # Phase 4: TTS (optional, best-effort — a TTS failure must not block delivery)
    audio_path = None
    if not skip_tts:
        print("\n🔊 Phase 4: Generating audio...")
        try:
            audio_path = generate_briefing_audio(briefing.raw_text, str(output_dir))
            print(f" Generated: {audio_path}")
        except Exception as e:
            print(f" TTS skipped: {e}")

    # Phase 5: Delivery
    print("\n📤 Phase 5: Delivering...")
    raw = briefing.raw_text
    # Only append an ellipsis when the summary was actually truncated.
    text_summary = raw[:1000] + "..." if len(raw) > 1000 else raw
    success = deliver_briefing(
        audio_path=audio_path,
        text_summary=text_summary,
        dry_run=dry_run
    )

    print(f"\n{'='*60}")
    print(f"Complete! Status: {'✅ Success' if success else '❌ Failed'}")
    print(f"{'='*60}")
    return success
if __name__ == "__main__":
    import contextlib

    parser = argparse.ArgumentParser(description="Deep Dive Daily Briefing")
    parser.add_argument("--dry-run", action="store_true", help="Don't actually send")
    parser.add_argument("--skip-tts", action="store_true", help="Skip audio generation")
    parser.add_argument("--cron", action="store_true", help="Run in cron mode (minimal output)")
    args = parser.parse_args()

    if args.cron:
        # Cron mode was previously parsed but ignored. Honor the documented
        # behavior: suppress progress banners so cron mail stays quiet;
        # the process exit code still reports success/failure.
        with open(os.devnull, "w") as devnull, contextlib.redirect_stdout(devnull):
            success = run_deep_dive(dry_run=args.dry_run, skip_tts=args.skip_tts)
    else:
        success = run_deep_dive(dry_run=args.dry_run, skip_tts=args.skip_tts)
    sys.exit(0 if success else 1)