Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
- Fix fleet_context.py env-var substitution for 0c16baadaebaaabc2c8390f35ef5e9aa2f4db671
- Remove non-existent wizard-checkpoints from config.yaml
- Fix bin/deepdive_orchestrator.py dry-run mock items
- Add PROOF_OF_LIFE.md with live execution output including fleet context

Progresses #830
267 lines
9.9 KiB
Python
267 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
|
"""deepdive_orchestrator.py — Deep Dive pipeline controller. Issue #830."""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Baseline pipeline settings. main() copies this dict and overlays the JSON
# file given via --config, so every key here can be overridden per run.
DEFAULT_CONFIG = {
    # Source identifiers, joined with "," and handed to the aggregator script.
    "sources": ["arxiv_cs_ai", "arxiv_cs_cl", "arxiv_cs_lg"],
    # Maximum number of ranked items kept after phase 2.
    "max_items": 10,
    # When False, phase 4 (audio generation) is skipped.
    "tts_enabled": True,
    # Value passed to the TTS script as --provider.
    "tts_provider": "openai",
}
|
|
|
|
|
|
class Orchestrator:
    """Run the five Deep Dive phases for one briefing date.

    The phases are aggregate -> filter -> synthesize -> TTS -> deliver.
    Every phase except filtering shells out to a sibling ``deepdive_*.py``
    script located next to this file. Intermediate artifacts are written to
    ``~/the-nexus/deepdive_state/<date>/``.
    """

    # Substrings counted (case-insensitively) in title + content by phase 2.
    _KEYWORDS = (
        "agent", "llm", "tool use", "rlhf", "alignment", "finetuning",
        "reasoning", "chain-of-thought", "mcp", "hermes",
    )

    # Effective config for the current run. run() stores the dict it receives
    # here so phase 4 honours overrides; while it is None (phases called
    # directly) the module-level DEFAULT_CONFIG is used.
    config = None

    def __init__(self, date: str = None, dry_run: bool = False):
        """Prepare the per-date state directory.

        Args:
            date: Briefing date, used verbatim as the state sub-directory
                name and in headings. Defaults to today's local date
                formatted as YYYY-MM-DD.
            dry_run: When True, no helper script is executed and nothing is
                delivered; phases return placeholder results instead.
        """
        self.date = date or datetime.now().strftime("%Y-%m-%d")
        self.dry_run = dry_run
        self.state_dir = Path("~/the-nexus/deepdive_state").expanduser() / self.date
        self.state_dir.mkdir(parents=True, exist_ok=True)
        self.script_dir = Path(__file__).parent

    def _script_cmd(self, script_name, *args):
        """Build the argv that runs a sibling script with this interpreter."""
        return [sys.executable, str(self.script_dir / script_name),
                *(str(arg) for arg in args)]

    def _tts_setting(self, key):
        """Return a TTS setting from the run config, else from DEFAULT_CONFIG."""
        overrides = self.config or {}
        if key in overrides:
            return overrides[key]
        return DEFAULT_CONFIG[key]

    def phase1_aggregate(self, sources):
        """Collect raw items by running ``deepdive_aggregator.py``.

        Args:
            sources: Iterable of source identifier strings; they are joined
                with "," for the aggregator's ``--sources`` argument.

        Returns:
            The parsed JSON the aggregator wrote to ``raw_items.json`` (the
            rest of the pipeline reads its ``"items"`` list). In dry-run
            mode, a fixed two-item sample instead.

        Raises:
            subprocess.CalledProcessError: If the aggregator exits non-zero.
        """
        print("[PHASE 1] Aggregating from sources...")
        output_file = self.state_dir / "raw_items.json"

        if self.dry_run:
            print(f" [DRY RUN] Would aggregate from: {sources}")
            return {
                "items": [
                    {"title": "[Dry Run] Sample arXiv Item 1", "url": "https://arxiv.org/abs/0000.00001", "content": "Sample content for dry run testing."},
                    {"title": "[Dry Run] Sample Blog Post", "url": "https://example.com/blog", "content": "Another sample for pipeline verification."},
                ],
                "metadata": {"count": 2, "dry_run": True}
            }

        subprocess.run(
            self._script_cmd(
                "deepdive_aggregator.py",
                "--sources", ",".join(sources),
                "--output", output_file,
            ),
            check=True,
        )
        return json.loads(output_file.read_text())

    def phase2_filter(self, raw_items, max_items):
        """Score items by keyword hits and keep the best ``max_items``.

        An item's score is the number of keywords that occur as a substring
        of its lower-cased title + content. Items scoring 0 are ranked last
        but not removed. The selection is also written to ``ranked.json``.

        Args:
            raw_items: Dict with an ``"items"`` list of item dicts.
            max_items: Upper bound on the number of items returned.

        Returns:
            List of item dicts, each extended with a ``"score"`` key, sorted
            by descending score (ties keep their input order).
        """
        print("[PHASE 2] Filtering by relevance...")

        scored = []
        for item in raw_items.get("items", []):
            haystack = f"{item.get('title','')} {item.get('content','')}".lower()
            hits = sum(1 for keyword in self._KEYWORDS if keyword in haystack)
            scored.append({**item, "score": hits})

        scored.sort(key=lambda entry: entry["score"], reverse=True)
        top = scored[:max_items]

        output_file = self.state_dir / "ranked.json"
        output_file.write_text(json.dumps({"items": top}, indent=2))
        print(f" Selected top {len(top)} items")
        return top

    def phase3_synthesize(self, ranked_items):
        """Produce ``briefing.md`` by running ``deepdive_synthesis.py``.

        If the script exits non-zero, or exits cleanly without creating the
        briefing file, a basic headline list is written instead so the later
        phases always have a file to read.

        Args:
            ranked_items: Item dicts as returned by ``phase2_filter``.

        Returns:
            Path of the briefing markdown file, as a string.
        """
        print("[PHASE 3] Synthesizing intelligence briefing...")
        briefing_file = self.state_dir / "briefing.md"

        if self.dry_run:
            print(" [DRY RUN] Would synthesize briefing")
            briefing_file.write_text(f"# Deep Dive — {self.date}\n\n[Dry run - no LLM call]\n")
            return str(briefing_file)

        # Write ranked items for synthesis script
        ranked_file = self.state_dir / "ranked.json"
        ranked_file.write_text(json.dumps({"items": ranked_items}, indent=2))

        result = subprocess.run(self._script_cmd(
            "deepdive_synthesis.py",
            "--input", ranked_file,
            "--output", briefing_file,
            "--date", self.date,
        ))

        # A zero exit code alone does not guarantee the output was written.
        if result.returncode != 0 or not briefing_file.exists():
            print(" [WARN] Synthesis failed, using fallback")
            briefing_file.write_text(self._fallback_briefing(ranked_items))

        return str(briefing_file)

    def phase4_tts(self, briefing_file):
        """Turn the briefing into audio by running ``deepdive_tts.py``.

        ``tts_enabled`` and ``tts_provider`` come from the config given to
        ``run()``; keys missing there fall back to ``DEFAULT_CONFIG``.

        Args:
            briefing_file: Path of the markdown briefing to read aloud.

        Returns:
            Path of ``briefing.mp3`` as a string, or None when TTS is
            disabled or the TTS script exits non-zero. In dry-run mode the
            path is returned without the file being created.
        """
        print("[PHASE 4] Generating audio...")

        if not self._tts_setting("tts_enabled"):
            print(" [SKIP] TTS disabled in config")
            return None

        audio_file = self.state_dir / "briefing.mp3"

        if self.dry_run:
            print(" [DRY RUN] Would generate audio")
            return str(audio_file)

        # Strip markdown so the engine does not read markup characters aloud.
        clean_text = self._markdown_to_speech(Path(briefing_file).read_text())

        # The TTS script takes its input from a text file.
        text_file = self.state_dir / "briefing.txt"
        text_file.write_text(clean_text)

        result = subprocess.run(self._script_cmd(
            "deepdive_tts.py",
            "--input", text_file,
            "--output", audio_file,
            "--provider", self._tts_setting("tts_provider"),
        ))

        if result.returncode != 0:
            print(" [WARN] TTS generation failed")
            return None

        print(f" Audio: {audio_file}")
        return str(audio_file)

    def phase5_deliver(self, briefing_file, audio_file):
        """Send the briefing summary, and the audio if present, via
        ``deepdive_delivery.py``.

        Credentials are read from ``DEEPDIVE_TELEGRAM_BOT_TOKEN`` /
        ``DEEPDIVE_TELEGRAM_CHAT_ID``, falling back to ``TELEGRAM_BOT_TOKEN``
        / ``TELEGRAM_CHAT_ID``.

        Args:
            briefing_file: Path of the markdown briefing.
            audio_file: Path of the audio file, or None; audio is only sent
                when the path exists on disk.

        Returns:
            ``{"status": "dry_run"}`` in dry-run mode (after printing a
            preview), ``{"status": "error", "reason": ...}`` when credentials
            are missing or the text delivery fails, otherwise
            ``{"status": "delivered"}``. A failed audio delivery is only
            warned about.
        """
        print("[PHASE 5] Delivering to Telegram...")

        if self.dry_run:
            print(" [DRY RUN] Would deliver briefing")
            briefing_text = Path(briefing_file).read_text()
            if len(briefing_text) > 800:
                preview = briefing_text[:800] + "..."
            else:
                preview = briefing_text
            print("\n--- BRIEFING PREVIEW ---")
            print(preview)
            print("--- END PREVIEW ---\n")
            return {"status": "dry_run"}

        # Delivery configuration
        bot_token = os.environ.get("DEEPDIVE_TELEGRAM_BOT_TOKEN") or os.environ.get("TELEGRAM_BOT_TOKEN")
        chat_id = os.environ.get("DEEPDIVE_TELEGRAM_CHAT_ID") or os.environ.get("TELEGRAM_CHAT_ID")

        if not bot_token or not chat_id:
            print(" [ERROR] Telegram credentials not configured")
            print(" Set DEEPDIVE_TELEGRAM_BOT_TOKEN and DEEPDIVE_TELEGRAM_CHAT_ID")
            return {"status": "error", "reason": "missing_credentials"}

        # NOTE(review): the bot token is passed as a command-line argument and
        # is therefore visible in the process list while the script runs.
        # Kept because deepdive_delivery.py's interface is not visible here;
        # consider letting it read the token from the environment instead.
        credentials = ("--chat-id", chat_id, "--bot-token", bot_token)

        # Send text summary
        summary = self._extract_summary(Path(briefing_file).read_text())
        text_result = subprocess.run(self._script_cmd(
            "deepdive_delivery.py", "--text", summary, *credentials,
        ))
        text_ok = text_result.returncode == 0
        if not text_ok:
            print(" [WARN] Text delivery failed")

        # Send audio if available
        if audio_file and Path(audio_file).exists():
            print(" Sending audio briefing...")
            audio_result = subprocess.run(self._script_cmd(
                "deepdive_delivery.py",
                "--audio", audio_file,
                "--caption", f"🎙️ Deep Dive — {self.date}",
                *credentials,
            ))
            if audio_result.returncode != 0:
                print(" [WARN] Audio delivery failed")

        # Do not report success when the primary (text) message never went out.
        if not text_ok:
            return {"status": "error", "reason": "text_delivery_failed"}
        return {"status": "delivered"}

    def _fallback_briefing(self, items):
        """Build a plain headline list from the first five items (no LLM)."""
        lines = [
            f"# Deep Dive Intelligence Brief — {self.date}",
            "",
            "## Headlines",
            "",
        ]
        for position, item in enumerate(items[:5], 1):
            lines.append(f"{position}. [{item.get('title', 'Untitled')}]({item.get('url', '')})")
            lines.append(f" Score: {item.get('score', 0)}")
            lines.append("")
        return "\n".join(lines)

    def _markdown_to_speech(self, text: str) -> str:
        """Strip markdown markup so the text can be read aloud."""
        import re

        # Replace [label](target) links with just the label.
        text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
        # Drop heading, emphasis and code markers.
        text = re.sub(r'[#*_`]', '', text)
        # Collapse runs of newlines into a single newline.
        text = re.sub(r'\n+', '\n', text)
        return text.strip()

    def _extract_summary(self, text: str) -> str:
        """Return the leading part of the briefing for the text message.

        Lines are collected until a heading line (starting with ``#``) is
        met after more than five lines have been gathered; the result is
        capped at 30 lines.
        """
        summary_lines = []
        for line in text.split('\n'):
            if line.strip().startswith('#') and len(summary_lines) > 5:
                break
            summary_lines.append(line)
        return '\n'.join(summary_lines[:30])  # Limit length

    def run(self, config):
        """Execute the full pipeline.

        Args:
            config: Dict providing at least ``"sources"`` and
                ``"max_items"``; ``"tts_enabled"`` / ``"tts_provider"`` are
                honoured by phase 4 when present.

        Returns:
            ``{"status": "error", "phase": N}`` when phase 1 or 2 yields no
            items, otherwise the result dict of ``phase5_deliver``.
        """
        # Remember the effective config so phase4_tts sees overrides.
        self.config = config

        banner = '=' * 60
        print(f"\n{banner}")
        print(f" DEEP DIVE — {self.date}")
        print(f"{banner}\n")

        raw = self.phase1_aggregate(config["sources"])
        if not raw.get("items"):
            print("[ERROR] No items aggregated")
            return {"status": "error", "phase": 1}

        ranked = self.phase2_filter(raw, config["max_items"])
        if not ranked:
            print("[ERROR] No items after filtering")
            return {"status": "error", "phase": 2}

        briefing = self.phase3_synthesize(ranked)
        audio = self.phase4_tts(briefing)
        result = self.phase5_deliver(briefing, audio)

        print(f"\n{banner}")
        print(f" COMPLETE — State: {self.state_dir}")
        print(f"{banner}\n")

        return result
|
|
|
|
|
|
def main():
    """Parse CLI arguments, build the effective config and run the pipeline.

    The config starts as a copy of DEFAULT_CONFIG and is overlaid with the
    JSON object read from ``--config`` when that file exists. A ``--config``
    path that does not exist is reported on stderr and the defaults are used.

    Returns:
        Process exit code: 1 when the pipeline result has status "error",
        otherwise 0.
    """
    parser = argparse.ArgumentParser(description="Deep Dive Intelligence Pipeline")
    # NOTE: --daily is accepted but not consulted anywhere in this function.
    parser.add_argument("--daily", action="store_true", help="Run daily briefing")
    parser.add_argument("--date", help="Specific date (YYYY-MM-DD)")
    parser.add_argument("--dry-run", action="store_true", help="Preview without sending")
    parser.add_argument("--config", help="Path to config JSON file")
    args = parser.parse_args()

    # Load custom config if provided
    config = DEFAULT_CONFIG.copy()
    if args.config:
        config_path = Path(args.config)
        if config_path.exists():
            config.update(json.loads(config_path.read_text()))
        else:
            # Make a mistyped path visible instead of silently running on defaults.
            print(f"[WARN] Config file not found: {config_path}; using defaults",
                  file=sys.stderr)

    orch = Orchestrator(date=args.date, dry_run=args.dry_run)
    result = orch.run(config)

    return 0 if result.get("status") != "error" else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # sys.exit rather than the bare exit(): the latter is injected by the
    # site module for interactive use and is missing under `python -S`.
    sys.exit(main())
|