diff --git a/intelligence/deepdive/telegram_command.py b/intelligence/deepdive/telegram_command.py new file mode 100644 index 0000000..3192592 --- /dev/null +++ b/intelligence/deepdive/telegram_command.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +""" +Telegram command handler for /deepdive on-demand briefings. +Issue #830 — Deep Dive: Sovereign NotebookLM + Daily AI Intelligence Briefing + +Usage (in Hermes Telegram gateway): + from telegram_command import deepdive_handler + commands.register("/deepdive", deepdive_handler) +""" + +import asyncio +import subprocess +import tempfile +from pathlib import Path +from datetime import datetime +from typing import Optional + +# Pipeline integration +try: + import sys + sys.path.insert(0, str(Path(__file__).parent)) + from pipeline import DeepDivePipeline + HAS_PIPELINE = True +except ImportError: + HAS_PIPELINE = False + + +def _load_config() -> dict: + """Load deepdive config from standard location.""" + import yaml + config_path = Path(__file__).parent / "config.yaml" + if not config_path.exists(): + raise FileNotFoundError(f"config.yaml not found at {config_path}") + with open(config_path) as f: + return yaml.safe_load(f) + + +def _run_pipeline_sync(config: dict, since_hours: int = 24) -> dict: + """Run pipeline synchronously for Telegram handler compatibility.""" + return asyncio.run(_run_pipeline_async(config, since_hours)) + + +async def _run_pipeline_async(config: dict, since_hours: int) -> dict: + pipeline = DeepDivePipeline(config) + from datetime import timedelta + since = datetime.utcnow() - timedelta(hours=since_hours) + result = await pipeline.run(since=since, dry_run=False) + return result + + +def deepdive_handler(message_text: str, chat_id: str, reply_func) -> str: + """ + Hermes-compatible Telegram command handler for /deepdive. + + Args: + message_text: Full message text (e.g. "/deepdive --since 48") + chat_id: Telegram chat/channel ID + reply_func: Callable to send replies back to Telegram + + Returns: + Status message string + """ + if not HAS_PIPELINE: + reply_func("āŒ Deep Dive pipeline not available. Check deployment.") + return "pipeline_unavailable" + + # Parse simple arguments + args = message_text.strip().split() + since_hours = 24 + for i, arg in enumerate(args): + if arg in ("--since", "-s") and i + 1 < len(args): + try: + since_hours = int(args[i + 1]) + except ValueError: + pass + + reply_func(f"šŸŽÆ Generating Deep Dive briefing (last {since_hours}h)...") + + try: + config = _load_config() + result = _run_pipeline_sync(config, since_hours) + + if result["status"] == "success": + items = result.get("items_ranked", 0) + briefing_path = result.get("briefing_path", "unknown") + audio_path = result.get("audio_path") + + reply_text = ( + f"āœ… Deep Dive complete!\n" + f"šŸ“Š {items} relevant items synthesized\n" + f"šŸ“ Briefing: {briefing_path}" + ) + if audio_path: + reply_text += f"\nšŸŽ™ Audio: {audio_path}" + + reply_func(reply_text) + + # If audio was generated, send it as voice message + if audio_path and Path(audio_path).exists(): + reply_func(f"šŸŽ§ Sending audio briefing...") + # Note: actual voice delivery depends on gateway capabilities + + return "success" + + elif result["status"] == "empty": + reply_func("āš ļø No new items found in the requested window.") + return "empty" + + else: + reply_func(f"āš ļø Pipeline returned: {result['status']}") + return result["status"] + + except Exception as e: + reply_func(f"āŒ Deep Dive failed: {type(e).__name__}: {str(e)[:200]}") + return "error" + + +def main_cli(): + """CLI entry point for testing the command handler locally.""" + import argparse + parser = argparse.ArgumentParser(description="Test /deepdive Telegram command") + parser.add_argument("--since", "-s", type=int, default=24) + args = parser.parse_args() + + def mock_reply(text): + print(f"[MOCK_REPLY] {text}") + + result = deepdive_handler(f"/deepdive --since {args.since}", "test_chat", mock_reply) + print(f"Result: {result}") + + +if __name__ == "__main__": + main_cli()