Complete production-ready scaffold for automated daily AI intelligence briefings: - Phase 1: Source aggregation (arXiv + lab blogs) - Phase 2: Relevance ranking (keyword + source authority scoring) - Phase 3: LLM synthesis (Hermes-context briefing generation) - Phase 4: TTS audio (edge-tts/OpenAI/ElevenLabs) - Phase 5: Telegram delivery (voice message) Deliverables: - docs/ARCHITECTURE.md (9000+ lines) - system design - docs/OPERATIONS.md - runbook and troubleshooting - 5 executable phase scripts (bin/) - Full pipeline orchestrator (run_full_pipeline.py) - requirements.txt, README.md Addresses all 9 acceptance criteria from #830. Ready for host selection, credential config, and cron activation. Author: Ezra | Burn mode | 2026-04-05
231 lines
8.6 KiB
Python
231 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Deep Dive Phase 5: Delivery Pipeline
|
|
Delivers briefing via Telegram voice message or text digest.
|
|
|
|
Usage:
|
|
python phase5_deliver.py [--date YYYY-MM-DD] [--output-dir DIR] [--text-only]
|
|
|
|
Issue: the-nexus#830
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import asyncio
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import aiohttp
|
|
|
|
|
|
class TelegramDelivery:
    """Deliver briefing via Telegram Bot API.

    Every public ``send_*`` coroutine performs a Bot API request and returns
    ``True`` only when Telegram replies with ``"ok": true``. Transport errors,
    unreadable files and API rejections are printed and reported as ``False``
    (never raised), so the caller can fall back to another delivery mode.
    """

    API_BASE = "https://api.telegram.org/bot{token}"

    # Telegram rejects messages over 4096 characters; cutting at 4000 leaves
    # room for the truncation notice that send_text() appends.
    TEXT_LIMIT = 4000

    def __init__(self, bot_token: str, chat_id: str):
        """Store the credentials and pre-build the bot's API base URL.

        Args:
            bot_token: Telegram bot token; it is embedded in every request URL.
            chat_id: Destination chat, sent as ``chat_id`` with every request.
        """
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.api_url = self.API_BASE.format(token=bot_token)

    async def _post(self, session: "aiohttp.ClientSession", method: str, *,
                    data=None, json_body=None) -> dict:
        """POST to Bot API ``method`` and return the decoded JSON reply.

        Exceptions from the HTTP request or JSON decoding propagate; the public
        ``send_*`` methods turn them into a printed error and ``False``.
        """
        async with session.post(f"{self.api_url}/{method}", data=data, json=json_body) as resp:
            return await resp.json()

    @staticmethod
    def _report(result: dict, label: str) -> bool:
        """Print the outcome of a Bot API reply and return whether it succeeded."""
        if result.get('ok'):
            print(f"[Phase 5] {label} sent: {result['result']['message_id']}")
            return True
        print(f"[ERROR] Telegram API: {result.get('description')}")
        return False

    async def _send_file(self, session: "aiohttp.ClientSession", method: str, field: str,
                         file_path: Path, label: str, extra_fields=(),
                         content_type=None) -> bool:
        """Upload ``file_path`` as multipart field ``field`` via Bot API ``method``.

        ``extra_fields`` are ``(name, value)`` pairs added after ``chat_id``.
        Raises on file or transport errors; the public callers catch them.
        """
        data = aiohttp.FormData()
        data.add_field('chat_id', self.chat_id)
        for name, value in extra_fields:
            data.add_field(name, value)
        # aiohttp streams file objects when the request body is written, so the
        # request must be sent while the file is still open.
        with open(file_path, 'rb') as f:
            data.add_field(field, f, filename=file_path.name, content_type=content_type)
            result = await self._post(session, method, data=data)
        return self._report(result, label)

    async def send_voice(self, session: "aiohttp.ClientSession", audio_path: Path) -> bool:
        """Send an MP3 as a Telegram voice message (the preferred briefing format)."""
        try:
            return await self._send_file(
                session, 'sendVoice', 'voice', audio_path, 'Voice message',
                extra_fields=[('caption', f"🎙️ Deep Dive — {audio_path.stem}")],
                content_type='audio/mpeg',
            )
        except Exception as e:
            print(f"[ERROR] Send voice failed: {e}")
            return False

    async def send_audio(self, session: "aiohttp.ClientSession", audio_path: Path) -> bool:
        """Send an MP3 as a regular audio track (fallback when sendVoice fails)."""
        try:
            return await self._send_file(
                session, 'sendAudio', 'audio', audio_path, 'Audio',
                extra_fields=[
                    ('title', f"Deep Dive — {audio_path.stem}"),
                    ('performer', "Hermes Deep Dive"),
                ],
                content_type='audio/mpeg',
            )
        except Exception as e:
            print(f"[ERROR] Send audio failed: {e}")
            return False

    async def send_text(self, session: "aiohttp.ClientSession", text: str) -> bool:
        """Send ``text`` as a Markdown message, truncated to fit Telegram's limit.

        If Telegram rejects the Markdown entities (``can't parse entities``),
        the same text is resent once without ``parse_mode`` so the digest
        still arrives, just unformatted.
        """
        if len(text) > self.TEXT_LIMIT:
            text = text[:self.TEXT_LIMIT] + "...\n\n[Message truncated. Full briefing in files.]"

        payload = {
            'chat_id': self.chat_id,
            'text': text,
            'parse_mode': 'Markdown',
            'disable_web_page_preview': True,
        }

        try:
            result = await self._post(session, 'sendMessage', json_body=payload)
            description = result.get('description') or ''
            if not result.get('ok') and "can't parse entities" in description.lower():
                # Generated Markdown (or a cut made by truncation above) can leave
                # unbalanced '*', '_' or '[' that the legacy Markdown parser rejects.
                print(f"[WARN] Telegram rejected Markdown ({description}); retrying as plain text")
                plain = {key: value for key, value in payload.items() if key != 'parse_mode'}
                result = await self._post(session, 'sendMessage', json_body=plain)
            return self._report(result, 'Text message')
        except Exception as e:
            print(f"[ERROR] Send text failed: {e}")
            return False

    async def send_document(self, session: "aiohttp.ClientSession", doc_path: Path) -> bool:
        """Send a file (e.g. the full Markdown briefing) as a document attachment."""
        try:
            return await self._send_file(
                session, 'sendDocument', 'document', doc_path, 'Document',
                extra_fields=[('caption', f"📄 Deep Dive Briefing — {doc_path.stem}")],
            )
        except Exception as e:
            print(f"[ERROR] Send document failed: {e}")
            return False
|
|
|
|
|
|
class DeliveryPipeline:
    """Orchestrate delivery of daily briefing.

    Looks for ``<output_dir>/audio/<date>.mp3`` and
    ``<output_dir>/briefings/<date>.md`` and delivers them to Telegram, trying
    in order: voice message, regular audio upload, then a text digest followed
    by the full Markdown file as a document.
    """

    def __init__(self, output_dir: Path, date: str, text_only: bool = False):
        """Configure paths and read Telegram credentials from the environment.

        Args:
            output_dir: Root data directory containing ``audio/`` and ``briefings/``.
            date: Briefing date string; used verbatim as the file stem.
            text_only: Skip audio delivery and go straight to the text digest.

        Credentials come from ``DEEPDIVE_TELEGRAM_BOT_TOKEN`` and
        ``DEEPDIVE_TELEGRAM_CHAT_ID``; either may be ``None`` if unset.
        """
        self.output_dir = output_dir
        self.date = date
        self.text_only = text_only
        self.audio_dir = output_dir / "audio"
        self.briefings_dir = output_dir / "briefings"

        # Load credentials from environment
        self.bot_token = os.environ.get('DEEPDIVE_TELEGRAM_BOT_TOKEN')
        self.chat_id = os.environ.get('DEEPDIVE_TELEGRAM_CHAT_ID')

    def load_briefing_text(self) -> str:
        """Return the Markdown briefing for ``self.date``.

        Raises:
            FileNotFoundError: If ``briefings/<date>.md`` does not exist.
        """
        briefing_file = self.briefings_dir / f"{self.date}.md"
        if not briefing_file.exists():
            raise FileNotFoundError(f"Briefing not found: {briefing_file}")

        # Explicit UTF-8 so generated (possibly non-ASCII) text decodes the same
        # regardless of the host's locale encoding.
        return briefing_file.read_text(encoding='utf-8')

    async def run(self) -> bool:
        """Run full delivery pipeline; return True if any delivery mode succeeded."""
        print(f"[Phase 5] Delivering briefing for {self.date}")

        if not self.bot_token or not self.chat_id:
            print("[ERROR] Telegram credentials not configured")
            print(" Set DEEPDIVE_TELEGRAM_BOT_TOKEN and DEEPDIVE_TELEGRAM_CHAT_ID")
            return False

        telegram = TelegramDelivery(self.bot_token, self.chat_id)

        async with aiohttp.ClientSession() as session:
            if not self.text_only and await self._deliver_audio(telegram, session):
                return True
            return await self._deliver_text(telegram, session)

    async def _deliver_audio(self, telegram: "TelegramDelivery",
                             session: "aiohttp.ClientSession") -> bool:
        """Send the day's MP3 as a voice message, falling back to an audio upload."""
        audio_file = self.audio_dir / f"{self.date}.mp3"
        if not audio_file.exists():
            print(f"[WARN] Audio not found: {audio_file}")
            return False

        print(f"[Phase 5] Sending audio: {audio_file}")
        if await telegram.send_voice(session, audio_file):
            return True
        if await telegram.send_audio(session, audio_file):
            return True

        print("[WARN] Audio delivery failed, falling back to text")
        return False

    async def _deliver_text(self, telegram: "TelegramDelivery",
                            session: "aiohttp.ClientSession") -> bool:
        """Send the text digest, then attach the full Markdown briefing."""
        print("[Phase 5] Sending text digest...")
        try:
            briefing_text = self.load_briefing_text()
        except (OSError, UnicodeDecodeError) as e:
            # Report a missing/unreadable briefing as a failed delivery rather
            # than crashing the pipeline with a traceback.
            print(f"[ERROR] {e}")
            return False

        # Telegram's legacy Markdown marks bold with single asterisks.
        header = f"🎙️ *Deep Dive — {self.date}*\n\n"
        if not await telegram.send_text(session, header + briefing_text):
            return False

        # Best effort: the text digest may have been truncated, so also attach
        # the full Markdown file. Its result does not affect overall success.
        await telegram.send_document(session, self.briefings_dir / f"{self.date}.md")
        return True
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run delivery, exit with status 1 on failure."""

    def _iso_date(value: str) -> str:
        # Reject malformed dates up front with a clean argparse error instead of
        # "Audio not found"/"Briefing not found" noise; the string is returned
        # unchanged because it is used verbatim as the file stem.
        try:
            datetime.strptime(value, '%Y-%m-%d')
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid date {value!r}, expected YYYY-MM-DD") from None
        return value

    # Resolve the data directory relative to this script, not the current
    # working directory: identical to the old '../data' default when launched
    # from the script's own directory, but also correct under cron (cwd=$HOME).
    default_output_dir = Path(__file__).resolve().parent.parent / 'data'

    parser = argparse.ArgumentParser(description='Deep Dive Phase 5: Delivery')
    parser.add_argument('--date', type=_iso_date,
                        default=datetime.now().strftime('%Y-%m-%d'),
                        help='Target date (YYYY-MM-DD)')
    parser.add_argument('--output-dir', type=Path, default=default_output_dir,
                        help='Output directory for data')
    parser.add_argument('--text-only', action='store_true',
                        help='Skip audio, send text only')
    args = parser.parse_args()

    pipeline = DeliveryPipeline(args.output_dir, args.date, args.text_only)
    success = asyncio.run(pipeline.run())

    if success:
        print("[DONE] Delivery complete")
    else:
        print("[FAIL] Delivery failed")
        # The builtin exit() is injected by the site module and is not
        # guaranteed to exist (python -S, frozen apps); SystemExit always is.
        raise SystemExit(1)
|
|
|
|
|
|
# Only run the CLI when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
|