diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py
index b28b78e7c..f854723a4 100644
--- a/gateway/platforms/base.py
+++ b/gateway/platforms/base.py
@@ -171,6 +171,74 @@ async def cache_audio_from_url(url: str, ext: str = ".ogg") -> str:
     return cache_audio_from_bytes(response.content, ext)
 
 
+# ---------------------------------------------------------------------------
+# Document cache utilities
+#
+# Same pattern as image/audio cache -- documents from platforms are downloaded
+# here so the agent can reference them by local file path.
+# ---------------------------------------------------------------------------
+
+DOCUMENT_CACHE_DIR = Path(os.path.expanduser("~/.hermes/document_cache"))
+
+SUPPORTED_DOCUMENT_TYPES = {
+    ".pdf": "application/pdf",
+    ".md": "text/markdown",
+    ".txt": "text/plain",
+    ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+    ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+    ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
+}
+
+
+def get_document_cache_dir() -> Path:
+    """Return the document cache directory, creating it if it doesn't exist."""
+    DOCUMENT_CACHE_DIR.mkdir(parents=True, exist_ok=True)
+    return DOCUMENT_CACHE_DIR
+
+
+def cache_document_from_bytes(data: bytes, filename: str) -> str:
+    """
+    Save raw document bytes to the cache and return the absolute file path.
+
+    The cached filename preserves the original human-readable name with a
+    unique prefix: ``doc_{uuid12}_{original_filename}``.
+
+    Args:
+        data: Raw document bytes.
+        filename: Original filename (e.g. "report.pdf").
+
+    Returns:
+        Absolute path to the cached document file as a string.
+    """
+    cache_dir = get_document_cache_dir()
+    safe_name = filename if filename else "document"
+    cached_name = f"doc_{uuid.uuid4().hex[:12]}_{safe_name}"
+    filepath = cache_dir / cached_name
+    filepath.write_bytes(data)
+    return str(filepath)
+
+
+def cleanup_document_cache(max_age_hours: int = 24) -> int:
+    """
+    Delete cached documents older than *max_age_hours*.
+
+    Returns the number of files removed.
+    """
+    import time
+
+    cache_dir = get_document_cache_dir()
+    cutoff = time.time() - (max_age_hours * 3600)
+    removed = 0
+    for f in cache_dir.iterdir():
+        if f.is_file() and f.stat().st_mtime < cutoff:
+            try:
+                f.unlink()
+                removed += 1
+            except OSError:
+                pass
+    return removed
+
+
 class MessageType(Enum):
     """Types of incoming messages."""
     TEXT = "text"
diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py
index 73d749bd3..2bfd5085a 100644
--- a/gateway/platforms/telegram.py
+++ b/gateway/platforms/telegram.py
@@ -8,6 +8,7 @@ Uses python-telegram-bot library for:
 """
 
 import asyncio
+import os
 import re
 from typing import Dict, List, Optional, Any
@@ -42,6 +43,8 @@ from gateway.platforms.base import (
     SendResult,
     cache_image_from_bytes,
     cache_audio_from_bytes,
+    cache_document_from_bytes,
+    SUPPORTED_DOCUMENT_TYPES,
 )
 
 
@@ -419,6 +422,8 @@ class TelegramAdapter(BasePlatformAdapter):
             msg_type = MessageType.AUDIO
         elif msg.voice:
             msg_type = MessageType.VOICE
+        elif msg.document:
+            msg_type = MessageType.DOCUMENT
         else:
             msg_type = MessageType.DOCUMENT
 
@@ -479,7 +484,70 @@ class TelegramAdapter(BasePlatformAdapter):
                 print(f"[Telegram] Cached user audio: {cached_path}", flush=True)
             except Exception as e:
                 print(f"[Telegram] Failed to cache audio: {e}", flush=True)
-
+
+        # Download document files to cache for agent processing
+        elif msg.document:
+            doc = msg.document
+            try:
+                # Determine file extension
+                ext = ""
+                original_filename = doc.file_name or ""
+                if original_filename:
+                    _, ext = os.path.splitext(original_filename)
+                    ext = ext.lower()
+
+                # If no extension from filename, reverse-lookup from MIME type
+                if not ext and doc.mime_type:
+                    mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()}
+                    ext = mime_to_ext.get(doc.mime_type, "")
+
+                # Check if supported
+                if ext not in SUPPORTED_DOCUMENT_TYPES:
+                    supported_list = ", ".join(sorted(SUPPORTED_DOCUMENT_TYPES.keys()))
+                    event.text = (
+                        f"Unsupported document type '{ext or 'unknown'}'. "
+                        f"Supported types: {supported_list}"
+                    )
+                    print(f"[Telegram] Unsupported document type: {ext or 'unknown'}", flush=True)
+                    await self.handle_message(event)
+                    return
+
+                # Check file size (Telegram Bot API limit: 20 MB)
+                if doc.file_size and doc.file_size > 20 * 1024 * 1024:
+                    event.text = (
+                        "The document is too large (over 20 MB). "
+                        "Please send a smaller file."
+                    )
+                    print(f"[Telegram] Document too large: {doc.file_size} bytes", flush=True)
+                    await self.handle_message(event)
+                    return
+
+                # Download and cache
+                file_obj = await doc.get_file()
+                doc_bytes = await file_obj.download_as_bytearray()
+                raw_bytes = bytes(doc_bytes)
+                cached_path = cache_document_from_bytes(raw_bytes, original_filename or f"document{ext}")
+                mime_type = SUPPORTED_DOCUMENT_TYPES[ext]
+                event.media_urls = [cached_path]
+                event.media_types = [mime_type]
+                print(f"[Telegram] Cached user document: {cached_path}", flush=True)
+
+                # For text files, inject content into event.text
+                if ext in (".md", ".txt"):
+                    try:
+                        text_content = raw_bytes.decode("utf-8")
+                        display_name = original_filename or f"document{ext}"
+                        injection = f"[Content of {display_name}]:\n{text_content}"
+                        if event.text:
+                            event.text = f"{injection}\n\n{event.text}"
+                        else:
+                            event.text = injection
+                    except UnicodeDecodeError:
+                        print(f"[Telegram] Could not decode text file as UTF-8, skipping content injection", flush=True)
+
+            except Exception as e:
+                print(f"[Telegram] Failed to cache document: {e}", flush=True)
+
         await self.handle_message(event)
 
     async def _handle_sticker(self, msg: Message, event: "MessageEvent") -> None:
diff --git a/gateway/run.py b/gateway/run.py
index df882d8e6..48c4b3ce2 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -742,7 +742,36 @@ class GatewayRunner:
                 message_text = await self._enrich_message_with_transcription(
                     message_text, audio_paths
                 )
-
+
+            # -----------------------------------------------------------------
+            # Enrich document messages with context notes for the agent
+            # -----------------------------------------------------------------
+            if event.media_urls and event.message_type == MessageType.DOCUMENT:
+                for i, path in enumerate(event.media_urls):
+                    mtype = event.media_types[i] if i < len(event.media_types) else ""
+                    if not (mtype.startswith("application/") or mtype.startswith("text/")):
+                        continue
+                    # Extract display filename by stripping the doc_{uuid12}_ prefix
+                    import os as _os
+                    basename = _os.path.basename(path)
+                    # Format: doc_<12hex>_
+                    parts = basename.split("_", 2)
+                    display_name = parts[2] if len(parts) >= 3 else basename
+
+                    if mtype.startswith("text/"):
+                        context_note = (
+                            f"[The user sent a text document: '{display_name}'. "
+                            f"Its content has been included below. "
+                            f"The file is also saved at: {path}]"
+                        )
+                    else:
+                        context_note = (
+                            f"[The user sent a document: '{display_name}'. "
+                            f"The file is saved at: {path}. "
+                            f"Ask the user what they'd like you to do with it.]"
+                        )
+                    message_text = f"{context_note}\n\n{message_text}"
+
         try:
             # Emit agent:start hook
             hook_ctx = {
@@ -1754,10 +1783,10 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, interval: int
     needing a separate `hermes cron daemon` or system cron entry.
 
     Also refreshes the channel directory every 5 minutes and prunes the
-    image/audio cache once per hour.
+    image/audio/document cache once per hour.
     """
     from cron.scheduler import tick as cron_tick
-    from gateway.platforms.base import cleanup_image_cache
+    from gateway.platforms.base import cleanup_image_cache, cleanup_document_cache
 
     IMAGE_CACHE_EVERY = 60  # ticks — once per hour at default 60s interval
     CHANNEL_DIR_EVERY = 5   # ticks — every 5 minutes
@@ -1786,6 +1815,12 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, interval: int
                 logger.info("Image cache cleanup: removed %d stale file(s)", removed)
             except Exception as e:
                 logger.debug("Image cache cleanup error: %s", e)
+            try:
+                removed = cleanup_document_cache(max_age_hours=24)
+                if removed:
+                    logger.info("Document cache cleanup: removed %d stale file(s)", removed)
+            except Exception as e:
+                logger.debug("Document cache cleanup error: %s", e)
             stop_event.wait(timeout=interval)
 
     logger.info("Cron ticker stopped")