#!/usr/bin/env python3
"""deepdive_orchestrator.py — Deep Dive pipeline controller. Issue #830.

Runs the five-phase Deep Dive pipeline for a single date:

1. aggregate raw items from the configured sources (``deepdive_aggregator.py``),
2. rank them by keyword relevance,
3. synthesize a Markdown briefing (``deepdive_synthesis.py``, with a local
   fallback when synthesis fails),
4. optionally render the briefing to audio (``deepdive_tts.py``),
5. deliver the text summary and audio to Telegram (``deepdive_delivery.py``).

Intermediate artifacts are written to ``~/the-nexus/deepdive_state/<date>/``
by default.
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

DEFAULT_CONFIG = {
    "sources": ["arxiv_cs_ai", "arxiv_cs_cl", "arxiv_cs_lg"],
    "max_items": 10,
    "tts_enabled": True,
    "tts_provider": "openai",
}

# Lower-case substrings used by phase 2; each one found in an item's
# title + content adds one point to that item's relevance score.
RELEVANCE_KEYWORDS = [
    "agent", "llm", "tool use", "rlhf", "alignment", "finetuning",
    "reasoning", "chain-of-thought", "mcp", "hermes",
]

# Default parent of the per-date state directories ("~" is expanded).
DEFAULT_STATE_ROOT = Path("~/the-nexus/deepdive_state")

# Briefings contain non-ASCII text (e.g. the em dash in the title), so every
# file read/write uses an explicit encoding rather than the locale default.
ENCODING = "utf-8"


class Orchestrator:
    """Drive one Deep Dive run: aggregate -> rank -> synthesize -> TTS -> deliver.

    Each phase writes its artifacts under ``state_dir`` so a run can be
    inspected afterwards. In dry-run mode no sibling scripts are executed and
    nothing is sent; sample items and placeholder files are used instead.
    """

    def __init__(self, date: Optional[str] = None, dry_run: bool = False,
                 state_root=None):
        """Create (if needed) the state directory for ``date``.

        Args:
            date: Briefing date, expected as ``YYYY-MM-DD``; defaults to today
                in local time.
            dry_run: If true, skip external scripts and Telegram delivery.
            state_root: Directory under which ``<date>/`` is created; defaults
                to ``~/the-nexus/deepdive_state``. ``~`` is expanded.
        """
        self.date = date or datetime.now().strftime("%Y-%m-%d")
        self.dry_run = dry_run
        root = Path(state_root) if state_root is not None else DEFAULT_STATE_ROOT
        self.state_dir = root.expanduser() / self.date
        self.state_dir.mkdir(parents=True, exist_ok=True)
        self.script_dir = Path(__file__).parent

    def _run_script(self, script_name: str, *args: str,
                    check: bool = False) -> subprocess.CompletedProcess:
        """Run a sibling pipeline script with the current interpreter.

        Raises ``subprocess.CalledProcessError`` on a non-zero exit only when
        ``check`` is true; otherwise the caller inspects ``returncode``.
        """
        cmd = [sys.executable, str(self.script_dir / script_name), *args]
        return subprocess.run(cmd, check=check)

    def phase1_aggregate(self, sources):
        """Aggregate raw items from ``sources``.

        Returns the JSON the aggregator wrote to ``raw_items.json`` (the
        pipeline expects a dict with an ``items`` list). In dry-run mode two
        sample items are returned instead.

        Raises:
            subprocess.CalledProcessError: if the aggregator exits non-zero.
        """
        print("[PHASE 1] Aggregating from sources...")
        output_file = self.state_dir / "raw_items.json"

        if self.dry_run:
            print(f" [DRY RUN] Would aggregate from: {sources}")
            return {
                "items": [
                    {"title": "[Dry Run] Sample arXiv Item 1",
                     "url": "https://arxiv.org/abs/0000.00001",
                     "content": "Sample content for dry run testing."},
                    {"title": "[Dry Run] Sample Blog Post",
                     "url": "https://example.com/blog",
                     "content": "Another sample for pipeline verification."},
                ],
                "metadata": {"count": 2, "dry_run": True},
            }

        self._run_script(
            "deepdive_aggregator.py",
            "--sources", ",".join(sources),
            "--output", str(output_file),
            check=True,
        )
        return json.loads(output_file.read_text(encoding=ENCODING))

    def phase2_filter(self, raw_items, max_items):
        """Rank items by keyword relevance and keep the top ``max_items``.

        Each item is copied with a ``score`` equal to the number of
        ``RELEVANCE_KEYWORDS`` found (case-insensitively) in its title and
        content. Items are sorted highest score first (ties keep their input
        order); zero-score items are kept, not dropped. The selection is also
        written to ``ranked.json``.
        """
        print("[PHASE 2] Filtering by relevance...")
        scored = []
        for item in raw_items.get("items", []):
            content = f"{item.get('title','')} {item.get('content','')}".lower()
            score = sum(1 for kw in RELEVANCE_KEYWORDS if kw in content)
            scored.append({**item, "score": score})
        scored.sort(key=lambda x: x["score"], reverse=True)
        top = scored[:max_items]

        output_file = self.state_dir / "ranked.json"
        output_file.write_text(json.dumps({"items": top}, indent=2),
                               encoding=ENCODING)
        print(f" Selected top {len(top)} items")
        return top

    def phase3_synthesize(self, ranked_items):
        """Write ``briefing.md`` for the ranked items and return its path.

        Runs ``deepdive_synthesis.py``; if it exits non-zero or produces no
        briefing file, a headline list from ``_fallback_briefing`` is written
        instead. In dry-run mode a placeholder briefing is written.
        """
        print("[PHASE 3] Synthesizing intelligence briefing...")
        briefing_file = self.state_dir / "briefing.md"

        if self.dry_run:
            print(" [DRY RUN] Would synthesize briefing")
            briefing_file.write_text(
                f"# Deep Dive — {self.date}\n\n[Dry run - no LLM call]\n",
                encoding=ENCODING,
            )
            return str(briefing_file)

        # Write ranked items for synthesis script
        ranked_file = self.state_dir / "ranked.json"
        ranked_file.write_text(json.dumps({"items": ranked_items}, indent=2),
                               encoding=ENCODING)

        result = self._run_script(
            "deepdive_synthesis.py",
            "--input", str(ranked_file),
            "--output", str(briefing_file),
            "--date", self.date,
        )
        # A zero exit without an output file would make phase 4/5 crash on
        # read, so treat it the same as a failed synthesis.
        if result.returncode != 0 or not briefing_file.exists():
            print(" [WARN] Synthesis failed, using fallback")
            briefing_file.write_text(self._fallback_briefing(ranked_items),
                                     encoding=ENCODING)
        return str(briefing_file)

    def phase4_tts(self, briefing_file, config=None):
        """Render the briefing to ``briefing.mp3``; return its path or None.

        Args:
            briefing_file: Path to the Markdown briefing.
            config: Pipeline config. ``tts_enabled`` and ``tts_provider`` are
                read from it; keys it lacks (or a missing config) fall back
                to ``DEFAULT_CONFIG``.

        Returns None when TTS is disabled or ``deepdive_tts.py`` exits
        non-zero. In dry-run mode the would-be audio path is returned without
        the file being created.
        """
        print("[PHASE 4] Generating audio...")
        settings = {**DEFAULT_CONFIG, **(config or {})}
        if not settings["tts_enabled"]:
            print(" [SKIP] TTS disabled in config")
            return None

        audio_file = self.state_dir / "briefing.mp3"
        if self.dry_run:
            print(" [DRY RUN] Would generate audio")
            return str(audio_file)

        # Strip Markdown syntax so the TTS engine does not read it aloud, and
        # hand the result to the TTS script through a temporary text file.
        briefing_text = Path(briefing_file).read_text(encoding=ENCODING)
        clean_text = self._markdown_to_speech(briefing_text)
        text_file = self.state_dir / "briefing.txt"
        text_file.write_text(clean_text, encoding=ENCODING)

        result = self._run_script(
            "deepdive_tts.py",
            "--input", str(text_file),
            "--output", str(audio_file),
            "--provider", settings["tts_provider"],
        )
        if result.returncode != 0:
            print(" [WARN] TTS generation failed")
            return None
        print(f" Audio: {audio_file}")
        return str(audio_file)

    def phase5_deliver(self, briefing_file, audio_file):
        """Send the briefing summary (and audio, if present) to Telegram.

        Credentials are read from ``DEEPDIVE_TELEGRAM_BOT_TOKEN`` /
        ``DEEPDIVE_TELEGRAM_CHAT_ID``, falling back to ``TELEGRAM_BOT_TOKEN``
        / ``TELEGRAM_CHAT_ID``.

        Returns:
            ``{"status": "dry_run"}`` in dry-run mode (after printing a
            preview); ``{"status": "error", "reason": "missing_credentials"}``
            without credentials; ``{"status": "error", "reason":
            "delivery_failed", "failed": [...]}`` if the text message failed;
            otherwise ``{"status": "delivered"}``, plus ``"failed": ["audio"]``
            when only the audio upload failed.
        """
        print("[PHASE 5] Delivering to Telegram...")

        if self.dry_run:
            print(" [DRY RUN] Would deliver briefing")
            briefing_text = Path(briefing_file).read_text(encoding=ENCODING)
            print("\n--- BRIEFING PREVIEW ---")
            print(briefing_text[:800] + "..." if len(briefing_text) > 800 else briefing_text)
            print("--- END PREVIEW ---\n")
            return {"status": "dry_run"}

        # Delivery configuration
        bot_token = os.environ.get("DEEPDIVE_TELEGRAM_BOT_TOKEN") or os.environ.get("TELEGRAM_BOT_TOKEN")
        chat_id = os.environ.get("DEEPDIVE_TELEGRAM_CHAT_ID") or os.environ.get("TELEGRAM_CHAT_ID")
        if not bot_token or not chat_id:
            print(" [ERROR] Telegram credentials not configured")
            print(" Set DEEPDIVE_TELEGRAM_BOT_TOKEN and DEEPDIVE_TELEGRAM_CHAT_ID")
            return {"status": "error", "reason": "missing_credentials"}

        # NOTE(review): the bot token is passed on the command line, where
        # other local users can read it (e.g. via `ps`). If deepdive_delivery.py
        # can read it from the inherited environment instead, prefer that.
        failed = []

        # Send text summary
        briefing_text = Path(briefing_file).read_text(encoding=ENCODING)
        summary = self._extract_summary(briefing_text)
        result = self._run_script(
            "deepdive_delivery.py",
            "--text", summary,
            "--chat-id", chat_id,
            "--bot-token", bot_token,
        )
        if result.returncode != 0:
            print(" [WARN] Text delivery failed")
            failed.append("text")

        # Send audio if available
        if audio_file and Path(audio_file).exists():
            print(" Sending audio briefing...")
            audio_result = self._run_script(
                "deepdive_delivery.py",
                "--audio", str(audio_file),
                "--caption", f"🎙️ Deep Dive — {self.date}",
                "--chat-id", chat_id,
                "--bot-token", bot_token,
            )
            if audio_result.returncode != 0:
                print(" [WARN] Audio delivery failed")
                failed.append("audio")

        # The text summary is the primary payload: if it did not go out, the
        # run failed. A lost audio upload alone is reported but not fatal.
        if "text" in failed:
            return {"status": "error", "reason": "delivery_failed", "failed": failed}
        if failed:
            return {"status": "delivered", "failed": failed}
        return {"status": "delivered"}

    def _fallback_briefing(self, items):
        """Build a plain Markdown headline list (up to 5 items) without an LLM."""
        lines = [
            f"# Deep Dive Intelligence Brief — {self.date}",
            "",
            "## Headlines",
            "",
        ]
        for i, item in enumerate(items[:5], 1):
            lines.append(f"{i}. [{item.get('title', 'Untitled')}]({item.get('url', '')})")
            lines.append(f" Score: {item.get('score', 0)}")
        lines.append("")
        return "\n".join(lines)

    def _markdown_to_speech(self, text: str) -> str:
        """Convert Markdown to speech-friendly plain text.

        Replaces ``[text](url)`` links with their text, deletes every ``#``,
        ``*``, ``_`` and backtick character (including underscores inside
        words), collapses runs of newlines, and strips surrounding whitespace.
        """
        # Remove markdown links but keep text
        text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
        # Remove other markdown
        text = re.sub(r'[#*_`]', '', text)
        # Clean up whitespace
        text = re.sub(r'\n+', '\n', text)
        return text.strip()

    def _extract_summary(self, text: str) -> str:
        """Return the opening section of ``text`` for the Telegram message.

        Collects lines until a heading line (starting with ``#`` after
        stripping) is met once more than five lines have been collected,
        then caps the result at 30 lines.
        """
        summary_lines = []
        for line in text.split('\n'):
            if line.strip().startswith('#') and len(summary_lines) > 5:
                break
            summary_lines.append(line)
        return '\n'.join(summary_lines[:30])  # Limit length

    def run(self, config):
        """Execute all five phases with ``config`` and return phase 5's result.

        Stops early with ``{"status": "error", "phase": 1}`` if aggregation
        yields no items, or ``{"status": "error", "phase": 2}`` if ranking
        yields none.
        """
        print(f"\n{'='*60}")
        print(f" DEEP DIVE — {self.date}")
        print(f"{'='*60}\n")

        raw = self.phase1_aggregate(config["sources"])
        if not raw.get("items"):
            print("[ERROR] No items aggregated")
            return {"status": "error", "phase": 1}

        ranked = self.phase2_filter(raw, config["max_items"])
        if not ranked:
            print("[ERROR] No items after filtering")
            return {"status": "error", "phase": 2}

        briefing = self.phase3_synthesize(ranked)
        audio = self.phase4_tts(briefing, config)
        result = self.phase5_deliver(briefing, audio)

        print(f"\n{'='*60}")
        print(f" COMPLETE — State: {self.state_dir}")
        print(f"{'='*60}\n")
        return result


def _parse_date(value):
    """argparse ``type`` for ``--date``: accept ``YYYY-MM-DD`` and normalize it.

    Rejecting anything else also keeps the value safe to use as a directory
    name (no path separators or ``..``).
    """
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}; expected YYYY-MM-DD") from None
    return parsed.strftime("%Y-%m-%d")


def main():
    """CLI entry point. Returns the process exit code (1 on error, else 0)."""
    parser = argparse.ArgumentParser(description="Deep Dive Intelligence Pipeline")
    # NOTE: --daily is parsed but not read below; the pipeline behaves the
    # same with or without it.
    parser.add_argument("--daily", action="store_true", help="Run daily briefing")
    parser.add_argument("--date", type=_parse_date, help="Specific date (YYYY-MM-DD)")
    parser.add_argument("--dry-run", action="store_true", help="Preview without sending")
    parser.add_argument("--config", help="Path to config JSON file")
    args = parser.parse_args()

    # Load custom config if provided; its keys override the defaults.
    config = DEFAULT_CONFIG.copy()
    if args.config:
        config_path = Path(args.config)
        if config_path.exists():
            config.update(json.loads(config_path.read_text(encoding=ENCODING)))
        else:
            print(f"[WARN] Config file not found: {config_path}; using defaults")

    orch = Orchestrator(date=args.date, dry_run=args.dry_run)
    result = orch.run(config)
    return 0 if result.get("status") != "error" else 1


if __name__ == "__main__":
    sys.exit(main())