[DEEP-DIVE] Scaffold component — #830
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled

This commit is contained in:
2026-04-05 07:42:34 +00:00
parent c195ced73f
commit ce8f05d6e7

View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
Delivery Pipeline for Deep Dive
Sends audio briefings to Telegram
"""
import os
import asyncio
from pathlib import Path
from typing import Optional
# Telegram bot integration
try:
from telegram import Bot
TELEGRAM_AVAILABLE = True
except ImportError:
TELEGRAM_AVAILABLE = False
print("python-telegram-bot not installed, delivery will be stubbed")
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHANNEL_ID = os.environ.get("TELEGRAM_HOME_CHANNEL", "")
class TelegramDelivery:
def __init__(self, token: str = None, channel_id: str = None):
self.token = token or TELEGRAM_BOT_TOKEN
self.channel_id = channel_id or TELEGRAM_CHANNEL_ID
self.bot = None
if TELEGRAM_AVAILABLE and self.token:
self.bot = Bot(token=self.token)
async def send_voice_message(
self,
audio_path: Path,
caption: str = None,
duration: int = None
) -> bool:
"""Send voice message to Telegram channel."""
if not self.bot or not self.channel_id:
print(f"[STUB] Would send {audio_path} to {self.channel_id}")
print(f"[STUB] Caption: {caption}")
return True
try:
with open(audio_path, "rb") as audio:
await self.bot.send_voice(
chat_id=self.channel_id,
voice=audio,
caption=caption,
duration=duration
)
return True
except Exception as e:
print(f"Telegram delivery failed: {e}")
return False
async def send_text_summary(self, text: str) -> bool:
"""Send text summary as fallback."""
if not self.bot or not self.channel_id:
print(f"[STUB] Would send text to {self.channel_id}")
return True
try:
# Split if too long
chunks = [text[i:i+4000] for i in range(0, len(text), 4000)]
for chunk in chunks:
await self.bot.send_message(
chat_id=self.channel_id,
text=chunk,
parse_mode="Markdown"
)
return True
except Exception as e:
print(f"Text delivery failed: {e}")
return False
def deliver_briefing(
audio_path: Path,
text_summary: str = None,
dry_run: bool = False
) -> bool:
"""Convenience function for delivery."""
delivery = TelegramDelivery()
if dry_run:
print(f"[DRY RUN] Audio: {audio_path}")
print(f"[DRY RUN] Text: {text_summary[:200] if text_summary else 'None'}...")
return True
async def _send():
success = await delivery.send_voice_message(audio_path)
if text_summary and success:
await delivery.send_text_summary(text_summary)
return success
return asyncio.run(_send())
if __name__ == "__main__":
print("Delivery pipeline loaded")
print(f"Telegram available: {TELEGRAM_AVAILABLE}")