[ezra] Add Phase 5 Telegram voice delivery pipeline #830
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled

This commit is contained in:
2026-04-05 03:45:07 +00:00
parent dde9c74fa7
commit a0d9a79c7d

162
bin/deepdive_delivery.py Normal file
View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""deepdive_delivery.py — Phase 5: Telegram voice message delivery.
Issue: #830 (the-nexus)
Delivers synthesized audio briefing as Telegram voice message.
"""
import argparse
import json
import os
import sys
from pathlib import Path
import urllib.request
class TelegramDeliveryAdapter:
"""Deliver audio briefing via Telegram bot as voice message."""
def __init__(self, bot_token: str, chat_id: str):
self.bot_token = bot_token
self.chat_id = chat_id
self.api_base = f"https://api.telegram.org/bot{bot_token}"
def _api_post(self, method: str, data: dict, files: dict = None):
"""Call Telegram Bot API."""
import urllib.request
import urllib.parse
url = f"{self.api_base}/{method}"
if files:
# Multipart form for file uploads
boundary = "----DeepDiveBoundary"
body_parts = []
for key, value in data.items():
body_parts.append(f'--{boundary}\r\nContent-Disposition: form-data; name="{key}"\r\n\r\n{value}\r\n')
for key, (filename, content) in files.items():
body_parts.append(
f'--{boundary}\r\n'
f'Content-Disposition: form-data; name="{key}"; filename="{filename}"\r\n'
f'Content-Type: audio/mpeg\r\n\r\n'
)
body_parts.append(content)
body_parts.append(f'\r\n')
body_parts.append(f'--{boundary}--\r\n')
body = b""
for part in body_parts:
if isinstance(part, str):
body += part.encode()
else:
body += part
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
else:
body = urllib.parse.urlencode(data).encode()
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/x-www-form-urlencoded")
try:
with urllib.request.urlopen(req, timeout=60) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
error_body = e.read().decode()
raise RuntimeError(f"Telegram API error: {e.code} - {error_body}")
def send_voice(self, audio_path: Path, caption: str = None) -> dict:
"""Send audio file as voice message."""
audio_bytes = audio_path.read_bytes()
files = {"voice": (audio_path.name, audio_bytes)}
data = {"chat_id": self.chat_id}
if caption:
data["caption"] = caption[:1024] # Telegram caption limit
result = self._api_post("sendVoice", data, files)
if not result.get("ok"):
raise RuntimeError(f"Telegram send failed: {result}")
return result
def send_text_preview(self, text: str) -> dict:
"""Send text summary before voice (optional)."""
data = {
"chat_id": self.chat_id,
"text": text[:4096] # Telegram message limit
}
return self._api_post("sendMessage", data)
def load_config():
"""Load Telegram configuration from environment."""
token = os.environ.get("DEEPDIVE_TELEGRAM_BOT_TOKEN") or os.environ.get("TELEGRAM_BOT_TOKEN")
chat_id = os.environ.get("DEEPDIVE_TELEGRAM_CHAT_ID") or os.environ.get("TELEGRAM_CHAT_ID")
if not token:
raise RuntimeError(
"Telegram bot token required. Set DEEPDIVE_TELEGRAM_BOT_TOKEN or TELEGRAM_BOT_TOKEN"
)
if not chat_id:
raise RuntimeError(
"Telegram chat ID required. Set DEEPDIVE_TELEGRAM_CHAT_ID or TELEGRAM_CHAT_ID"
)
return token, chat_id
def main():
parser = argparse.ArgumentParser(description="Deep Dive Delivery Pipeline")
parser.add_argument("--audio", "-a", required=True, help="Path to audio file (MP3)")
parser.add_argument("--caption", "-c", help="Caption for voice message")
parser.add_argument("--preview-text", help="Optional text preview sent before voice")
parser.add_argument("--dry-run", action="store_true", help="Validate config without sending")
args = parser.parse_args()
# Load config
try:
token, chat_id = load_config()
except RuntimeError as e:
print(f"[ERROR] {e}", file=sys.stderr)
sys.exit(1)
audio_path = Path(args.audio)
if not audio_path.exists():
print(f"[ERROR] Audio file not found: {audio_path}", file=sys.stderr)
sys.exit(1)
if args.dry_run:
print(f"[DRY RUN] Config valid")
print(f" Bot: {token[:10]}...")
print(f" Chat: {chat_id}")
print(f" Audio: {audio_path} ({audio_path.stat().st_size} bytes)")
sys.exit(0)
# Deliver
adapter = TelegramDeliveryAdapter(token, chat_id)
if args.preview_text:
print("[DELIVERY] Sending text preview...")
adapter.send_text_preview(args.preview_text)
print(f"[DELIVERY] Sending voice message: {audio_path}...")
result = adapter.send_voice(audio_path, args.caption)
message_id = result["result"]["message_id"]
print(f"[DELIVERY] Sent! Message ID: {message_id}")
print(json.dumps({
"success": True,
"message_id": message_id,
"chat_id": chat_id,
"audio_size_bytes": audio_path.stat().st_size
}))
if __name__ == "__main__":
main()