Files
the-nexus/bin/deepdive_orchestrator.py
Ezra (Archivist) 2b06e179d1
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
[deep-dive] Complete #830 implementation scaffold
Phase 3 (Synthesis):
- deepdive_synthesis.py: LLM-powered briefing generation
- Supports OpenAI (gpt-4o-mini) and Anthropic (claude-3-haiku)
- Fallback to keyword summary if LLM unavailable
- Intelligence briefing format: Headlines, Deep Dive, Bottom Line

Phase 4 (TTS):
- TTS integration in orchestrator
- Converts markdown to speech-friendly text
- Configurable provider (openai/elevenlabs/piper)

Phase 5 (Delivery):
- Enhanced delivery.py with --text and --chat-id/--bot-token overrides
- Supports text-only and audio+text delivery
- Full Telegram Bot API integration

Orchestrator:
- Complete 5-phase pipeline
- --dry-run mode for testing
- State management in ~/the-nexus/deepdive_state/
- Error handling with fallbacks

Progresses #830 to implementation-ready status
2026-04-05 04:43:22 +00:00

261 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""deepdive_orchestrator.py — Deep Dive pipeline controller. Issue #830."""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
# Pipeline defaults; can be overridden per-run via a --config JSON file.
DEFAULT_CONFIG = {
    # Source identifiers handed to the aggregator phase (arXiv category feeds).
    "sources": ["arxiv_cs_ai", "arxiv_cs_cl", "arxiv_cs_lg"],
    # Maximum number of items kept after relevance filtering (phase 2).
    "max_items": 10,
    # Whether phase 4 produces an audio version of the briefing.
    "tts_enabled": True,
    # TTS backend name passed to deepdive_tts.py ("openai"/"elevenlabs"/"piper").
    "tts_provider": "openai",
}
class Orchestrator:
    """Controller for the five-phase Deep Dive pipeline (issue #830).

    Phases: aggregate -> filter -> synthesize -> TTS -> deliver.
    Every intermediate artifact of a run is written under
    ``~/the-nexus/deepdive_state/<date>/`` so failed runs can be inspected.
    """

    def __init__(self, date: Optional[str] = None, dry_run: bool = False):
        """
        Args:
            date: Briefing date as YYYY-MM-DD; defaults to today.
            dry_run: When True, no helper script is invoked and nothing is
                delivered; each phase prints what it would do instead.
        """
        self.date = date or datetime.now().strftime("%Y-%m-%d")
        self.dry_run = dry_run
        # Per-run working directory; created eagerly so every phase may
        # assume it exists.
        self.state_dir = Path("~/the-nexus/deepdive_state").expanduser() / self.date
        self.state_dir.mkdir(parents=True, exist_ok=True)
        # Helper scripts (aggregator / synthesis / tts / delivery) live
        # next to this file.
        self.script_dir = Path(__file__).parent

    def phase1_aggregate(self, sources):
        """Phase 1: collect raw items from the configured sources.

        Runs ``deepdive_aggregator.py`` and returns its JSON payload
        (``{"items": [...], "metadata": {...}}``).

        Bug fix: dry-run mode previously returned an empty item list, which
        made ``run()`` abort with "No items aggregated" before any later
        phase could be previewed.  It now returns one placeholder item per
        source so the whole pipeline can be exercised offline.
        """
        print("[PHASE 1] Aggregating from sources...")
        output_file = self.state_dir / "raw_items.json"
        if self.dry_run:
            print(f" [DRY RUN] Would aggregate from: {sources}")
            # Placeholder items keep the dry-run pipeline flowing end to end.
            items = [
                {"title": f"[dry-run] placeholder item from {src}",
                 "content": "", "url": ""}
                for src in sources
            ]
            return {"items": items, "metadata": {"count": len(items)}}
        subprocess.run(
            [
                sys.executable,
                str(self.script_dir / "deepdive_aggregator.py"),
                "--sources", ",".join(sources),
                "--output", str(output_file),
            ],
            check=True,  # aggregation failure should abort the whole run
        )
        return json.loads(output_file.read_text())

    def phase2_filter(self, raw_items, max_items):
        """Phase 2: rank items by keyword relevance and keep the top N.

        Score is a simple count of matched keywords over title+content.
        The ranked list is persisted to ``ranked.json`` for inspection.
        Returns the top ``max_items`` items (each annotated with "score").
        """
        print("[PHASE 2] Filtering by relevance...")
        keywords = ["agent", "llm", "tool use", "rlhf", "alignment", "finetuning",
                    "reasoning", "chain-of-thought", "mcp", "hermes"]
        scored = []
        for item in raw_items.get("items", []):
            content = f"{item.get('title','')} {item.get('content','')}".lower()
            score = sum(1 for kw in keywords if kw in content)
            scored.append({**item, "score": score})
        scored.sort(key=lambda x: x["score"], reverse=True)
        top = scored[:max_items]
        output_file = self.state_dir / "ranked.json"
        output_file.write_text(json.dumps({"items": top}, indent=2))
        print(f" Selected top {len(top)} items")
        return top

    def phase3_synthesize(self, ranked_items):
        """Phase 3: turn ranked items into a markdown intelligence briefing.

        Delegates to ``deepdive_synthesis.py`` (LLM-backed); falls back to a
        locally generated keyword briefing when that script fails.  Returns
        the path of the briefing markdown file as a string.
        """
        print("[PHASE 3] Synthesizing intelligence briefing...")
        if self.dry_run:
            print(" [DRY RUN] Would synthesize briefing")
            briefing_file = self.state_dir / "briefing.md"
            briefing_file.write_text(f"# Deep Dive — {self.date}\n\n[Dry run - no LLM call]\n")
            return str(briefing_file)
        # Re-write ranked items so the synthesis script always sees exactly
        # the list passed in, even if phase 2's file is stale or was skipped.
        ranked_file = self.state_dir / "ranked.json"
        ranked_file.write_text(json.dumps({"items": ranked_items}, indent=2))
        briefing_file = self.state_dir / "briefing.md"
        result = subprocess.run([
            sys.executable, str(self.script_dir / "deepdive_synthesis.py"),
            "--input", str(ranked_file),
            "--output", str(briefing_file),
            "--date", self.date,
        ])
        if result.returncode != 0:
            print(" [WARN] Synthesis failed, using fallback")
            briefing_file.write_text(self._fallback_briefing(ranked_items))
        return str(briefing_file)

    def phase4_tts(self, briefing_file):
        """Phase 4: render the briefing as audio via ``deepdive_tts.py``.

        Returns the path of the generated mp3 as a string, or None when TTS
        is disabled, skipped, or fails (audio is best-effort).
        """
        print("[PHASE 4] Generating audio...")
        if not DEFAULT_CONFIG["tts_enabled"]:
            print(" [SKIP] TTS disabled in config")
            return None
        if self.dry_run:
            print(" [DRY RUN] Would generate audio")
            return str(self.state_dir / "briefing.mp3")
        audio_file = self.state_dir / "briefing.mp3"
        # Strip markdown so the TTS engine does not read out formatting.
        briefing_text = Path(briefing_file).read_text()
        clean_text = self._markdown_to_speech(briefing_text)
        # The TTS script consumes a plain-text file, not stdin.
        text_file = self.state_dir / "briefing.txt"
        text_file.write_text(clean_text)
        result = subprocess.run([
            sys.executable, str(self.script_dir / "deepdive_tts.py"),
            "--input", str(text_file),
            "--output", str(audio_file),
            "--provider", DEFAULT_CONFIG["tts_provider"],
        ])
        if result.returncode != 0:
            print(" [WARN] TTS generation failed")
            return None
        print(f" Audio: {audio_file}")
        return str(audio_file)

    def phase5_deliver(self, briefing_file, audio_file):
        """Phase 5: deliver the briefing (and optional audio) via Telegram.

        Credentials come from DEEPDIVE_TELEGRAM_* env vars, falling back to
        the generic TELEGRAM_* names.  Returns a status dict with keys
        "status" and, on error, "reason".
        """
        print("[PHASE 5] Delivering to Telegram...")
        if self.dry_run:
            print(" [DRY RUN] Would deliver briefing")
            briefing_text = Path(briefing_file).read_text()
            print("\n--- BRIEFING PREVIEW ---")
            print(briefing_text[:800] + "..." if len(briefing_text) > 800 else briefing_text)
            print("--- END PREVIEW ---\n")
            return {"status": "dry_run"}
        # Delivery configuration (deep-dive-specific vars take precedence).
        bot_token = os.environ.get("DEEPDIVE_TELEGRAM_BOT_TOKEN") or os.environ.get("TELEGRAM_BOT_TOKEN")
        chat_id = os.environ.get("DEEPDIVE_TELEGRAM_CHAT_ID") or os.environ.get("TELEGRAM_CHAT_ID")
        if not bot_token or not chat_id:
            print(" [ERROR] Telegram credentials not configured")
            print(" Set DEEPDIVE_TELEGRAM_BOT_TOKEN and DEEPDIVE_TELEGRAM_CHAT_ID")
            return {"status": "error", "reason": "missing_credentials"}
        # Send the text summary first; audio is sent separately below.
        briefing_text = Path(briefing_file).read_text()
        summary = self._extract_summary(briefing_text)
        result = subprocess.run([
            sys.executable, str(self.script_dir / "deepdive_delivery.py"),
            "--text", summary,
            "--chat-id", chat_id,
            "--bot-token", bot_token,
        ])
        if result.returncode != 0:
            print(" [WARN] Text delivery failed")
        # Audio is optional: deliver it only when phase 4 produced a file.
        if audio_file and Path(audio_file).exists():
            print(" Sending audio briefing...")
            subprocess.run([
                sys.executable, str(self.script_dir / "deepdive_delivery.py"),
                "--audio", audio_file,
                "--caption", f"🎙️ Deep Dive — {self.date}",
                "--chat-id", chat_id,
                "--bot-token", bot_token,
            ])
        return {"status": "delivered"}

    def _fallback_briefing(self, items):
        """Build a minimal markdown briefing (top 5 items) without an LLM."""
        lines = [
            f"# Deep Dive Intelligence Brief — {self.date}",
            "",
            "## Headlines",
            ""
        ]
        for i, item in enumerate(items[:5], 1):
            lines.append(f"{i}. [{item.get('title', 'Untitled')}]({item.get('url', '')})")
            lines.append(f" Score: {item.get('score', 0)}")
            lines.append("")
        return "\n".join(lines)

    def _markdown_to_speech(self, text: str) -> str:
        """Strip markdown syntax so the text reads naturally when spoken."""
        import re
        # Keep the link text, drop the URL.
        text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
        # Drop heading/emphasis/code markers.
        text = re.sub(r'[#*_`]', '', text)
        # Collapse blank-line runs into single newlines.
        text = re.sub(r'\n+', '\n', text)
        return text.strip()

    def _extract_summary(self, text: str) -> str:
        """Return the briefing's first section, capped at 30 lines.

        Stops at the first markdown heading encountered after more than
        five lines have been collected (i.e. the second section header).
        """
        lines = text.split('\n')
        summary_lines = []
        for line in lines:
            if line.strip().startswith('#') and len(summary_lines) > 5:
                break
            summary_lines.append(line)
        return '\n'.join(summary_lines[:30])  # Limit length

    def run(self, config):
        """Execute the full five-phase pipeline.

        Args:
            config: dict with "sources", "max_items" (see DEFAULT_CONFIG).
        Returns:
            The phase-5 status dict, or {"status": "error", "phase": n}
            when an early phase produced no items.
        """
        print(f"\n{'='*60}")
        print(f" DEEP DIVE — {self.date}")
        print(f"{'='*60}\n")
        raw = self.phase1_aggregate(config["sources"])
        if not raw.get("items"):
            print("[ERROR] No items aggregated")
            return {"status": "error", "phase": 1}
        ranked = self.phase2_filter(raw, config["max_items"])
        if not ranked:
            print("[ERROR] No items after filtering")
            return {"status": "error", "phase": 2}
        briefing = self.phase3_synthesize(ranked)
        audio = self.phase4_tts(briefing)
        result = self.phase5_deliver(briefing, audio)
        print(f"\n{'='*60}")
        print(f" COMPLETE — State: {self.state_dir}")
        print(f"{'='*60}\n")
        return result
def main():
    """CLI entry point for the Deep Dive pipeline.

    Returns 0 on success, 1 when the pipeline reports an error status.
    """
    parser = argparse.ArgumentParser(description="Deep Dive Intelligence Pipeline")
    parser.add_argument("--daily", action="store_true", help="Run daily briefing")
    parser.add_argument("--date", help="Specific date (YYYY-MM-DD)")
    parser.add_argument("--dry-run", action="store_true", help="Preview without sending")
    parser.add_argument("--config", help="Path to config JSON file")
    args = parser.parse_args()
    # Overlay user config on the defaults.  Previously a bad --config path
    # was silently ignored; warn so misconfiguration is visible.
    config = DEFAULT_CONFIG.copy()
    if args.config:
        config_path = Path(args.config)
        if config_path.exists():
            config.update(json.loads(config_path.read_text()))
        else:
            print(f"[WARN] Config file not found: {args.config} (using defaults)")
    orch = Orchestrator(date=args.date, dry_run=args.dry_run)
    result = orch.run(config)
    return 0 if result.get("status") != "error" else 1


if __name__ == "__main__":
    # sys.exit is the idiomatic script exit; builtin exit() is REPL-only.
    sys.exit(main())