Add Telegram /deepdive command handler for on-demand briefings (#830)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Hermes-compatible command handler that parses /deepdive args, runs the pipeline, and returns status + audio to Telegram.
This commit is contained in:
133
intelligence/deepdive/telegram_command.py
Normal file
133
intelligence/deepdive/telegram_command.py
Normal file
@@ -0,0 +1,133 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Telegram command handler for /deepdive on-demand briefings.
|
||||
Issue #830 — Deep Dive: Sovereign NotebookLM + Daily AI Intelligence Briefing
|
||||
|
||||
Usage (in Hermes Telegram gateway):
|
||||
from telegram_command import deepdive_handler
|
||||
commands.register("/deepdive", deepdive_handler)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
# Pipeline integration
|
||||
try:
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from pipeline import DeepDivePipeline
|
||||
HAS_PIPELINE = True
|
||||
except ImportError:
|
||||
HAS_PIPELINE = False
|
||||
|
||||
|
||||
def _load_config() -> dict:
|
||||
"""Load deepdive config from standard location."""
|
||||
import yaml
|
||||
config_path = Path(__file__).parent / "config.yaml"
|
||||
if not config_path.exists():
|
||||
raise FileNotFoundError(f"config.yaml not found at {config_path}")
|
||||
with open(config_path) as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
|
||||
def _run_pipeline_sync(config: dict, since_hours: int = 24) -> dict:
|
||||
"""Run pipeline synchronously for Telegram handler compatibility."""
|
||||
return asyncio.run(_run_pipeline_async(config, since_hours))
|
||||
|
||||
|
||||
async def _run_pipeline_async(config: dict, since_hours: int) -> dict:
|
||||
pipeline = DeepDivePipeline(config)
|
||||
from datetime import timedelta
|
||||
since = datetime.utcnow() - timedelta(hours=since_hours)
|
||||
result = await pipeline.run(since=since, dry_run=False)
|
||||
return result
|
||||
|
||||
|
||||
def deepdive_handler(message_text: str, chat_id: str, reply_func) -> str:
|
||||
"""
|
||||
Hermes-compatible Telegram command handler for /deepdive.
|
||||
|
||||
Args:
|
||||
message_text: Full message text (e.g. "/deepdive --since 48")
|
||||
chat_id: Telegram chat/channel ID
|
||||
reply_func: Callable to send replies back to Telegram
|
||||
|
||||
Returns:
|
||||
Status message string
|
||||
"""
|
||||
if not HAS_PIPELINE:
|
||||
reply_func("❌ Deep Dive pipeline not available. Check deployment.")
|
||||
return "pipeline_unavailable"
|
||||
|
||||
# Parse simple arguments
|
||||
args = message_text.strip().split()
|
||||
since_hours = 24
|
||||
for i, arg in enumerate(args):
|
||||
if arg in ("--since", "-s") and i + 1 < len(args):
|
||||
try:
|
||||
since_hours = int(args[i + 1])
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
reply_func(f"🎯 Generating Deep Dive briefing (last {since_hours}h)...")
|
||||
|
||||
try:
|
||||
config = _load_config()
|
||||
result = _run_pipeline_sync(config, since_hours)
|
||||
|
||||
if result["status"] == "success":
|
||||
items = result.get("items_ranked", 0)
|
||||
briefing_path = result.get("briefing_path", "unknown")
|
||||
audio_path = result.get("audio_path")
|
||||
|
||||
reply_text = (
|
||||
f"✅ Deep Dive complete!\n"
|
||||
f"📊 {items} relevant items synthesized\n"
|
||||
f"📝 Briefing: {briefing_path}"
|
||||
)
|
||||
if audio_path:
|
||||
reply_text += f"\n🎙 Audio: {audio_path}"
|
||||
|
||||
reply_func(reply_text)
|
||||
|
||||
# If audio was generated, send it as voice message
|
||||
if audio_path and Path(audio_path).exists():
|
||||
reply_func(f"🎧 Sending audio briefing...")
|
||||
# Note: actual voice delivery depends on gateway capabilities
|
||||
|
||||
return "success"
|
||||
|
||||
elif result["status"] == "empty":
|
||||
reply_func("⚠️ No new items found in the requested window.")
|
||||
return "empty"
|
||||
|
||||
else:
|
||||
reply_func(f"⚠️ Pipeline returned: {result['status']}")
|
||||
return result["status"]
|
||||
|
||||
except Exception as e:
|
||||
reply_func(f"❌ Deep Dive failed: {type(e).__name__}: {str(e)[:200]}")
|
||||
return "error"
|
||||
|
||||
|
||||
def main_cli():
|
||||
"""CLI entry point for testing the command handler locally."""
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Test /deepdive Telegram command")
|
||||
parser.add_argument("--since", "-s", type=int, default=24)
|
||||
args = parser.parse_args()
|
||||
|
||||
def mock_reply(text):
|
||||
print(f"[MOCK_REPLY] {text}")
|
||||
|
||||
result = deepdive_handler(f"/deepdive --since {args.since}", "test_chat", mock_reply)
|
||||
print(f"Result: {result}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main_cli()
|
||||
Reference in New Issue
Block a user