diff --git a/bin/deepdive_delivery.py b/bin/deepdive_delivery.py index daccfb2..2553155 100644 --- a/bin/deepdive_delivery.py +++ b/bin/deepdive_delivery.py @@ -112,50 +112,74 @@ def load_config(): def main(): parser = argparse.ArgumentParser(description="Deep Dive Delivery Pipeline") - parser.add_argument("--audio", "-a", required=True, help="Path to audio file (MP3)") + parser.add_argument("--audio", "-a", help="Path to audio file (MP3)") + parser.add_argument("--text", "-t", help="Text message to send") parser.add_argument("--caption", "-c", help="Caption for voice message") parser.add_argument("--preview-text", help="Optional text preview sent before voice") + parser.add_argument("--bot-token", help="Telegram bot token (overrides env)") + parser.add_argument("--chat-id", help="Telegram chat ID (overrides env)") parser.add_argument("--dry-run", action="store_true", help="Validate config without sending") args = parser.parse_args() # Load config try: - token, chat_id = load_config() + if args.bot_token and args.chat_id: + token, chat_id = args.bot_token, args.chat_id + else: + token, chat_id = load_config() except RuntimeError as e: print(f"[ERROR] {e}", file=sys.stderr) sys.exit(1) - audio_path = Path(args.audio) - if not audio_path.exists(): - print(f"[ERROR] Audio file not found: {audio_path}", file=sys.stderr) + # Validate input + if not args.audio and not args.text: + print("[ERROR] Either --audio or --text required", file=sys.stderr) sys.exit(1) if args.dry_run: print(f"[DRY RUN] Config valid") print(f" Bot: {token[:10]}...") print(f" Chat: {chat_id}") - print(f" Audio: {audio_path} ({audio_path.stat().st_size} bytes)") + if args.audio: + audio_path = Path(args.audio) + print(f" Audio: {audio_path} ({audio_path.stat().st_size} bytes)") + if args.text: + print(f" Text: {args.text[:100]}...") sys.exit(0) # Deliver adapter = TelegramDeliveryAdapter(token, chat_id) - if args.preview_text: - print("[DELIVERY] Sending text preview...") - adapter.send_text_preview(args.preview_text) + # Send text if provided + if args.text: + print("[DELIVERY] Sending text message...") + result = adapter.send_text_preview(args.text) + message_id = result["result"]["message_id"] + print(f"[DELIVERY] Text sent! Message ID: {message_id}") - print(f"[DELIVERY] Sending voice message: {audio_path}...") - result = adapter.send_voice(audio_path, args.caption) - - message_id = result["result"]["message_id"] - print(f"[DELIVERY] Sent! Message ID: {message_id}") - - print(json.dumps({ - "success": True, - "message_id": message_id, - "chat_id": chat_id, - "audio_size_bytes": audio_path.stat().st_size - })) + # Send audio if provided + if args.audio: + audio_path = Path(args.audio) + if not audio_path.exists(): + print(f"[ERROR] Audio file not found: {audio_path}", file=sys.stderr) + sys.exit(1) + + if args.preview_text: + print("[DELIVERY] Sending text preview...") + adapter.send_text_preview(args.preview_text) + + print(f"[DELIVERY] Sending voice message: {audio_path}...") + result = adapter.send_voice(audio_path, args.caption) + + message_id = result["result"]["message_id"] + print(f"[DELIVERY] Voice sent! Message ID: {message_id}") + + print(json.dumps({ + "success": True, + "message_id": message_id, + "chat_id": chat_id, + "audio_size_bytes": audio_path.stat().st_size + })) if __name__ == "__main__": diff --git a/bin/deepdive_orchestrator.py b/bin/deepdive_orchestrator.py index f2613b6..328ba99 100644 --- a/bin/deepdive_orchestrator.py +++ b/bin/deepdive_orchestrator.py @@ -3,6 +3,7 @@ import argparse import json +import os import subprocess import sys from datetime import datetime @@ -11,33 +12,42 @@ from pathlib import Path DEFAULT_CONFIG = { "sources": ["arxiv_cs_ai", "arxiv_cs_cl", "arxiv_cs_lg"], "max_items": 10, - "tts_enabled": False, + "tts_enabled": True, + "tts_provider": "openai", } class Orchestrator: - def __init__(self, date: str = None): + def __init__(self, date: str = None, dry_run: bool = False): self.date = date or datetime.now().strftime("%Y-%m-%d") + self.dry_run = dry_run self.state_dir = Path("~/the-nexus/deepdive_state").expanduser() / self.date self.state_dir.mkdir(parents=True, exist_ok=True) + self.script_dir = Path(__file__).parent - def phase1(self, sources): + def phase1_aggregate(self, sources): """Aggregate from sources.""" - print("[PHASE 1] Aggregating...") + print("[PHASE 1] Aggregating from sources...") output_file = self.state_dir / "raw_items.json" + + if self.dry_run: + print(f" [DRY RUN] Would aggregate from: {sources}") + return {"items": [], "metadata": {"count": 0}} + subprocess.run([ - sys.executable, Path(__file__).parent / "deepdive_aggregator.py", + sys.executable, self.script_dir / "deepdive_aggregator.py", "--sources", ",".join(sources), "--output", str(output_file) - ]) + ], check=True) return json.loads(output_file.read_text()) - def phase2(self, raw_items, max_items): + def phase2_filter(self, raw_items, max_items): """Filter by keywords.""" - print("[PHASE 2] Filtering...") - keywords = ["agent", "llm", "tool use", "rlhf", "alignment", "finetuning"] + print("[PHASE 2] Filtering by relevance...") + keywords = ["agent", "llm", "tool use", "rlhf", "alignment", "finetuning", + "reasoning", "chain-of-thought", "mcp", "hermes"] scored = [] - for item in raw_items["items"]: + for item in raw_items.get("items", []): content = f"{item.get('title','')} {item.get('content','')}".lower() score = sum(1 for kw in keywords if kw in content) scored.append({**item, "score": score}) @@ -47,49 +57,204 @@ class Orchestrator: output_file = self.state_dir / "ranked.json" output_file.write_text(json.dumps({"items": top}, indent=2)) + print(f" Selected top {len(top)} items") return top - def phase3(self, ranked_items): - """Synthesize briefing.""" - print("[PHASE 3] Synthesizing (MVP: structured text)...") - md = f"# Deep Dive — {self.date}\n\n" - for i, item in enumerate(ranked_items[:3], 1): - md += f"{i}. [{item['title']}]({item['url']}) — Score: {item['score']}\n\n" + def phase3_synthesize(self, ranked_items): + """Synthesize briefing with LLM.""" + print("[PHASE 3] Synthesizing intelligence briefing...") + + if self.dry_run: + print(" [DRY RUN] Would synthesize briefing") + briefing_file = self.state_dir / "briefing.md" + briefing_file.write_text(f"# Deep Dive — {self.date}\n\n[Dry run - no LLM call]\n") + return str(briefing_file) + + # Write ranked items for synthesis script + ranked_file = self.state_dir / "ranked.json" + ranked_file.write_text(json.dumps({"items": ranked_items}, indent=2)) briefing_file = self.state_dir / "briefing.md" - briefing_file.write_text(md) + + result = subprocess.run([ + sys.executable, self.script_dir / "deepdive_synthesis.py", + "--input", str(ranked_file), + "--output", str(briefing_file), + "--date", self.date + ]) + + if result.returncode != 0: + print(" [WARN] Synthesis failed, using fallback") + fallback = self._fallback_briefing(ranked_items) + briefing_file.write_text(fallback) + return str(briefing_file) - def phase4(self, briefing_file): + def phase4_tts(self, briefing_file): """Generate audio.""" - print("[PHASE 4] ⚠ TTS decision needed — skipping") - return None + print("[PHASE 4] Generating audio...") + + if not DEFAULT_CONFIG["tts_enabled"]: + print(" [SKIP] TTS disabled in config") + return None + + if self.dry_run: + print(" [DRY RUN] Would generate audio") + return str(self.state_dir / "briefing.mp3") + + audio_file = self.state_dir / "briefing.mp3" + + # Read briefing and convert to speech-suitable text + briefing_text = Path(briefing_file).read_text() + # Remove markdown formatting for TTS + clean_text = self._markdown_to_speech(briefing_text) + + # Write temp text file for TTS + text_file = self.state_dir / "briefing.txt" + text_file.write_text(clean_text) + + result = subprocess.run([ + sys.executable, self.script_dir / "deepdive_tts.py", + "--input", str(text_file), + "--output", str(audio_file), + "--provider", DEFAULT_CONFIG["tts_provider"] + ]) + + if result.returncode != 0: + print(" [WARN] TTS generation failed") + return None + + print(f" Audio: {audio_file}") + return str(audio_file) - def phase5(self, briefing_file, audio_file): - """Deliver.""" - print("[PHASE 5] ⚠ Telegram delivery not integrated") - text = Path(briefing_file).read_text() - print(text[:500]) - return {"status": "logged"} + def phase5_deliver(self, briefing_file, audio_file): + """Deliver to Telegram.""" + print("[PHASE 5] Delivering to Telegram...") + + if self.dry_run: + print(" [DRY RUN] Would deliver briefing") + briefing_text = Path(briefing_file).read_text() + print("\n--- BRIEFING PREVIEW ---") + print(briefing_text[:800] + "..." if len(briefing_text) > 800 else briefing_text) + print("--- END PREVIEW ---\n") + return {"status": "dry_run"} + + # Delivery configuration + bot_token = os.environ.get("DEEPDIVE_TELEGRAM_BOT_TOKEN") or os.environ.get("TELEGRAM_BOT_TOKEN") + chat_id = os.environ.get("DEEPDIVE_TELEGRAM_CHAT_ID") or os.environ.get("TELEGRAM_CHAT_ID") + + if not bot_token or not chat_id: + print(" [ERROR] Telegram credentials not configured") + print(" Set DEEPDIVE_TELEGRAM_BOT_TOKEN and DEEPDIVE_TELEGRAM_CHAT_ID") + return {"status": "error", "reason": "missing_credentials"} + + # Send text summary + briefing_text = Path(briefing_file).read_text() + summary = self._extract_summary(briefing_text) + + result = subprocess.run([ + sys.executable, self.script_dir / "deepdive_delivery.py", + "--text", summary, + "--chat-id", chat_id, + "--bot-token", bot_token + ]) + + if result.returncode != 0: + print(" [WARN] Text delivery failed") + + # Send audio if available + if audio_file and Path(audio_file).exists(): + print(" Sending audio briefing...") + subprocess.run([ + sys.executable, self.script_dir / "deepdive_delivery.py", + "--audio", audio_file, + "--caption", f"🎙️ Deep Dive — {self.date}", + "--chat-id", chat_id, + "--bot-token", bot_token + ]) + + return {"status": "delivered"} + + def _fallback_briefing(self, items): + """Generate basic briefing without LLM.""" + lines = [ + f"# Deep Dive Intelligence Brief — {self.date}", + "", + "## Headlines", + "" + ] + for i, item in enumerate(items[:5], 1): + lines.append(f"{i}. [{item.get('title', 'Untitled')}]({item.get('url', '')})") + lines.append(f" Score: {item.get('score', 0)}") + lines.append("") + return "\n".join(lines) + + def _markdown_to_speech(self, text: str) -> str: + """Convert markdown to speech-friendly text.""" + import re + # Remove markdown links but keep text + text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text) + # Remove other markdown + text = re.sub(r'[#*_`]', '', text) + # Clean up whitespace + text = re.sub(r'\n+', '\n', text) + return text.strip() + + def _extract_summary(self, text: str) -> str: + """Extract first section for text delivery.""" + lines = text.split('\n') + summary_lines = [] + for line in lines: + if line.strip().startswith('#') and len(summary_lines) > 5: + break + summary_lines.append(line) + return '\n'.join(summary_lines[:30]) # Limit length def run(self, config): - raw = self.phase1(config["sources"]) - ranked = self.phase2(raw, config["max_items"]) - briefing = self.phase3(ranked) - audio = self.phase4(briefing) - return self.phase5(briefing, audio) + """Execute full pipeline.""" + print(f"\n{'='*60}") + print(f" DEEP DIVE — {self.date}") + print(f"{'='*60}\n") + + raw = self.phase1_aggregate(config["sources"]) + if not raw.get("items"): + print("[ERROR] No items aggregated") + return {"status": "error", "phase": 1} + + ranked = self.phase2_filter(raw, config["max_items"]) + if not ranked: + print("[ERROR] No items after filtering") + return {"status": "error", "phase": 2} + + briefing = self.phase3_synthesize(ranked) + audio = self.phase4_tts(briefing) + result = self.phase5_deliver(briefing, audio) + + print(f"\n{'='*60}") + print(f" COMPLETE — State: {self.state_dir}") + print(f"{'='*60}\n") + + return result def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--daily", action="store_true") - parser.add_argument("--date") + parser = argparse.ArgumentParser(description="Deep Dive Intelligence Pipeline") + parser.add_argument("--daily", action="store_true", help="Run daily briefing") + parser.add_argument("--date", help="Specific date (YYYY-MM-DD)") + parser.add_argument("--dry-run", action="store_true", help="Preview without sending") + parser.add_argument("--config", help="Path to config JSON file") args = parser.parse_args() - orch = Orchestrator(date=args.date) - result = orch.run(DEFAULT_CONFIG) - print(f"[DONE] State: {orch.state_dir}") + # Load custom config if provided + config = DEFAULT_CONFIG.copy() + if args.config and Path(args.config).exists(): + config.update(json.loads(Path(args.config).read_text())) + + orch = Orchestrator(date=args.date, dry_run=args.dry_run) + result = orch.run(config) + + return 0 if result.get("status") != "error" else 1 if __name__ == "__main__": - main() + exit(main()) diff --git a/bin/deepdive_synthesis.py b/bin/deepdive_synthesis.py new file mode 100644 index 0000000..d2e33f0 --- /dev/null +++ b/bin/deepdive_synthesis.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +"""deepdive_synthesis.py — Phase 3: LLM-powered intelligence briefing synthesis. Issue #830.""" + +import argparse +import json +import os +from datetime import datetime +from pathlib import Path +from typing import List, Dict + + +BRIEFING_PROMPT = """You are Deep Dive, an AI intelligence analyst for the Timmy Foundation fleet. + +Your task: Synthesize the following research papers into a tight, actionable intelligence briefing for Alexander Whitestone, founder of Timmy. + +CONTEXT: +- Timmy Foundation builds autonomous AI agents using the Hermes framework +- Focus areas: LLM architecture, tool use, RL training, agent systems +- Alexander prefers: Plain speech, evidence over vibes, concrete implications + +SOURCES: +{sources} + +OUTPUT FORMAT: +# Deep Dive Intelligence Brief — {date} + +## Headlines (3 items) +For each top paper: +- **Title**: Paper name +- **Why It Matters**: One sentence on relevance to Hermes/Timmy +- **Key Insight**: The actionable takeaway + +## Deep Dive (1 item) +Expand on the most relevant paper: +- Problem it solves +- Method/approach +- Implications for our agent work +- Suggested follow-up (if any) + +## Bottom Line +3 bullets on what to know/do this week + +Write in tight, professional intelligence style. No fluff.""" + + +class SynthesisEngine: + def __init__(self, provider: str = None): + self.provider = provider or os.environ.get("DEEPDIVE_LLM_PROVIDER", "openai") + self.api_key = os.environ.get("OPENAI_API_KEY") or os.environ.get("ANTHROPIC_API_KEY") + + def synthesize(self, items: List[Dict], date: str) -> str: + """Generate briefing from ranked items.""" + sources_text = self._format_sources(items) + prompt = BRIEFING_PROMPT.format(sources=sources_text, date=date) + + if self.provider == "openai": + return self._call_openai(prompt) + elif self.provider == "anthropic": + return self._call_anthropic(prompt) + else: + return self._fallback_synthesis(items, date) + + def _format_sources(self, items: List[Dict]) -> str: + lines = [] + for i, item in enumerate(items[:10], 1): + lines.append(f"\n{i}. {item.get('title', 'Untitled')}") + lines.append(f" URL: {item.get('url', 'N/A')}") + lines.append(f" Abstract: {item.get('content', 'No abstract')[:500]}...") + lines.append(f" Relevance Score: {item.get('score', 0)}") + return "\n".join(lines) + + def _call_openai(self, prompt: str) -> str: + """Call OpenAI API for synthesis.""" + try: + import openai + client = openai.OpenAI(api_key=self.api_key) + + response = client.chat.completions.create( + model="gpt-4o-mini", # Cost-effective for daily briefings + messages=[ + {"role": "system", "content": "You are an expert AI research analyst. Be concise and actionable."}, + {"role": "user", "content": prompt} + ], + temperature=0.3, + max_tokens=2000 + ) + return response.choices[0].message.content + except Exception as e: + print(f"[WARN] OpenAI synthesis failed: {e}") + return self._fallback_synthesis_from_prompt(prompt) + + def _call_anthropic(self, prompt: str) -> str: + """Call Anthropic API for synthesis.""" + try: + import anthropic + client = anthropic.Anthropic(api_key=self.api_key) + + response = client.messages.create( + model="claude-3-haiku-20240307", # Cost-effective + max_tokens=2000, + temperature=0.3, + system="You are an expert AI research analyst. Be concise and actionable.", + messages=[{"role": "user", "content": prompt}] + ) + return response.content[0].text + except Exception as e: + print(f"[WARN] Anthropic synthesis failed: {e}") + return self._fallback_synthesis_from_prompt(prompt) + + def _fallback_synthesis(self, items: List[Dict], date: str) -> str: + """Generate basic briefing without LLM.""" + lines = [ + f"# Deep Dive Intelligence Brief — {date}", + "", + "## Headlines", + "" + ] + for i, item in enumerate(items[:3], 1): + lines.append(f"{i}. [{item.get('title', 'Untitled')}]({item.get('url', '')})") + lines.append(f" Relevance Score: {item.get('score', 0)}") + lines.append("") + + lines.extend([ + "## Bottom Line", + "", + f"- Reviewed {len(items)} papers from arXiv", + "- Run with LLM API key for full synthesis" + ]) + + return "\n".join(lines) + + def _fallback_synthesis_from_prompt(self, prompt: str) -> str: + """Extract items from prompt and do basic synthesis.""" + # Simple extraction for fallback + return "# Deep Dive\n\n[LLM synthesis unavailable - check API key]\n\n" + prompt[:1000] + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--input", required=True, help="Path to ranked.json") + parser.add_argument("--output", required=True, help="Path to write briefing.md") + parser.add_argument("--date", default=None) + parser.add_argument("--provider", default=None) + args = parser.parse_args() + + date = args.date or datetime.now().strftime("%Y-%m-%d") + + # Load ranked items + ranked_data = json.loads(Path(args.input).read_text()) + items = ranked_data.get("items", []) + + if not items: + print("[ERROR] No items to synthesize") + return 1 + + print(f"[INFO] Synthesizing {len(items)} items...") + + # Generate briefing + engine = SynthesisEngine(provider=args.provider) + briefing = engine.synthesize(items, date) + + # Write output + Path(args.output).write_text(briefing) + print(f"[INFO] Briefing written to {args.output}") + + return 0 + + +if __name__ == "__main__": + exit(main())