feat(telegram): add document file processing for PDF, text, and Office files
Download, cache, and enrich document files sent via Telegram. Supports .pdf, .md, .txt, .docx, .xlsx, .pptx with size validation, unsupported type rejection, text content injection for .md/.txt, and hourly cache cleanup. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -171,6 +171,74 @@ async def cache_audio_from_url(url: str, ext: str = ".ogg") -> str:
|
||||
return cache_audio_from_bytes(response.content, ext)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Document cache utilities
|
||||
#
|
||||
# Same pattern as image/audio cache -- documents from platforms are downloaded
|
||||
# here so the agent can reference them by local file path.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
DOCUMENT_CACHE_DIR = Path(os.path.expanduser("~/.hermes/document_cache"))
|
||||
|
||||
SUPPORTED_DOCUMENT_TYPES = {
|
||||
".pdf": "application/pdf",
|
||||
".md": "text/markdown",
|
||||
".txt": "text/plain",
|
||||
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
||||
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
||||
".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
|
||||
}
|
||||
|
||||
|
||||
def get_document_cache_dir() -> Path:
|
||||
"""Return the document cache directory, creating it if it doesn't exist."""
|
||||
DOCUMENT_CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
return DOCUMENT_CACHE_DIR
|
||||
|
||||
|
||||
def cache_document_from_bytes(data: bytes, filename: str) -> str:
|
||||
"""
|
||||
Save raw document bytes to the cache and return the absolute file path.
|
||||
|
||||
The cached filename preserves the original human-readable name with a
|
||||
unique prefix: ``doc_{uuid12}_{original_filename}``.
|
||||
|
||||
Args:
|
||||
data: Raw document bytes.
|
||||
filename: Original filename (e.g. "report.pdf").
|
||||
|
||||
Returns:
|
||||
Absolute path to the cached document file as a string.
|
||||
"""
|
||||
cache_dir = get_document_cache_dir()
|
||||
safe_name = filename if filename else "document"
|
||||
cached_name = f"doc_{uuid.uuid4().hex[:12]}_{safe_name}"
|
||||
filepath = cache_dir / cached_name
|
||||
filepath.write_bytes(data)
|
||||
return str(filepath)
|
||||
|
||||
|
||||
def cleanup_document_cache(max_age_hours: int = 24) -> int:
|
||||
"""
|
||||
Delete cached documents older than *max_age_hours*.
|
||||
|
||||
Returns the number of files removed.
|
||||
"""
|
||||
import time
|
||||
|
||||
cache_dir = get_document_cache_dir()
|
||||
cutoff = time.time() - (max_age_hours * 3600)
|
||||
removed = 0
|
||||
for f in cache_dir.iterdir():
|
||||
if f.is_file() and f.stat().st_mtime < cutoff:
|
||||
try:
|
||||
f.unlink()
|
||||
removed += 1
|
||||
except OSError:
|
||||
pass
|
||||
return removed
|
||||
|
||||
|
||||
class MessageType(Enum):
|
||||
"""Types of incoming messages."""
|
||||
TEXT = "text"
|
||||
|
||||
@@ -8,6 +8,7 @@ Uses python-telegram-bot library for:
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
from typing import Dict, List, Optional, Any
|
||||
|
||||
@@ -42,6 +43,8 @@ from gateway.platforms.base import (
|
||||
SendResult,
|
||||
cache_image_from_bytes,
|
||||
cache_audio_from_bytes,
|
||||
cache_document_from_bytes,
|
||||
SUPPORTED_DOCUMENT_TYPES,
|
||||
)
|
||||
|
||||
|
||||
@@ -419,6 +422,8 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
msg_type = MessageType.AUDIO
|
||||
elif msg.voice:
|
||||
msg_type = MessageType.VOICE
|
||||
elif msg.document:
|
||||
msg_type = MessageType.DOCUMENT
|
||||
else:
|
||||
msg_type = MessageType.DOCUMENT
|
||||
|
||||
@@ -479,7 +484,70 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
print(f"[Telegram] Cached user audio: {cached_path}", flush=True)
|
||||
except Exception as e:
|
||||
print(f"[Telegram] Failed to cache audio: {e}", flush=True)
|
||||
|
||||
|
||||
# Download document files to cache for agent processing
|
||||
elif msg.document:
|
||||
doc = msg.document
|
||||
try:
|
||||
# Determine file extension
|
||||
ext = ""
|
||||
original_filename = doc.file_name or ""
|
||||
if original_filename:
|
||||
_, ext = os.path.splitext(original_filename)
|
||||
ext = ext.lower()
|
||||
|
||||
# If no extension from filename, reverse-lookup from MIME type
|
||||
if not ext and doc.mime_type:
|
||||
mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()}
|
||||
ext = mime_to_ext.get(doc.mime_type, "")
|
||||
|
||||
# Check if supported
|
||||
if ext not in SUPPORTED_DOCUMENT_TYPES:
|
||||
supported_list = ", ".join(sorted(SUPPORTED_DOCUMENT_TYPES.keys()))
|
||||
event.text = (
|
||||
f"Unsupported document type '{ext or 'unknown'}'. "
|
||||
f"Supported types: {supported_list}"
|
||||
)
|
||||
print(f"[Telegram] Unsupported document type: {ext or 'unknown'}", flush=True)
|
||||
await self.handle_message(event)
|
||||
return
|
||||
|
||||
# Check file size (Telegram Bot API limit: 20 MB)
|
||||
if doc.file_size and doc.file_size > 20 * 1024 * 1024:
|
||||
event.text = (
|
||||
"The document is too large (over 20 MB). "
|
||||
"Please send a smaller file."
|
||||
)
|
||||
print(f"[Telegram] Document too large: {doc.file_size} bytes", flush=True)
|
||||
await self.handle_message(event)
|
||||
return
|
||||
|
||||
# Download and cache
|
||||
file_obj = await doc.get_file()
|
||||
doc_bytes = await file_obj.download_as_bytearray()
|
||||
raw_bytes = bytes(doc_bytes)
|
||||
cached_path = cache_document_from_bytes(raw_bytes, original_filename or f"document{ext}")
|
||||
mime_type = SUPPORTED_DOCUMENT_TYPES[ext]
|
||||
event.media_urls = [cached_path]
|
||||
event.media_types = [mime_type]
|
||||
print(f"[Telegram] Cached user document: {cached_path}", flush=True)
|
||||
|
||||
# For text files, inject content into event.text
|
||||
if ext in (".md", ".txt"):
|
||||
try:
|
||||
text_content = raw_bytes.decode("utf-8")
|
||||
display_name = original_filename or f"document{ext}"
|
||||
injection = f"[Content of {display_name}]:\n{text_content}"
|
||||
if event.text:
|
||||
event.text = f"{injection}\n\n{event.text}"
|
||||
else:
|
||||
event.text = injection
|
||||
except UnicodeDecodeError:
|
||||
print(f"[Telegram] Could not decode text file as UTF-8, skipping content injection", flush=True)
|
||||
|
||||
except Exception as e:
|
||||
print(f"[Telegram] Failed to cache document: {e}", flush=True)
|
||||
|
||||
await self.handle_message(event)
|
||||
|
||||
async def _handle_sticker(self, msg: Message, event: "MessageEvent") -> None:
|
||||
|
||||
@@ -742,7 +742,36 @@ class GatewayRunner:
|
||||
message_text = await self._enrich_message_with_transcription(
|
||||
message_text, audio_paths
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------
|
||||
# Enrich document messages with context notes for the agent
|
||||
# -----------------------------------------------------------------
|
||||
if event.media_urls and event.message_type == MessageType.DOCUMENT:
|
||||
for i, path in enumerate(event.media_urls):
|
||||
mtype = event.media_types[i] if i < len(event.media_types) else ""
|
||||
if not (mtype.startswith("application/") or mtype.startswith("text/")):
|
||||
continue
|
||||
# Extract display filename by stripping the doc_{uuid12}_ prefix
|
||||
import os as _os
|
||||
basename = _os.path.basename(path)
|
||||
# Format: doc_<12hex>_<original_filename>
|
||||
parts = basename.split("_", 2)
|
||||
display_name = parts[2] if len(parts) >= 3 else basename
|
||||
|
||||
if mtype.startswith("text/"):
|
||||
context_note = (
|
||||
f"[The user sent a text document: '{display_name}'. "
|
||||
f"Its content has been included below. "
|
||||
f"The file is also saved at: {path}]"
|
||||
)
|
||||
else:
|
||||
context_note = (
|
||||
f"[The user sent a document: '{display_name}'. "
|
||||
f"The file is saved at: {path}. "
|
||||
f"Ask the user what they'd like you to do with it.]"
|
||||
)
|
||||
message_text = f"{context_note}\n\n{message_text}"
|
||||
|
||||
try:
|
||||
# Emit agent:start hook
|
||||
hook_ctx = {
|
||||
@@ -1754,10 +1783,10 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, interval: int
|
||||
needing a separate `hermes cron daemon` or system cron entry.
|
||||
|
||||
Also refreshes the channel directory every 5 minutes and prunes the
|
||||
image/audio cache once per hour.
|
||||
image/audio/document cache once per hour.
|
||||
"""
|
||||
from cron.scheduler import tick as cron_tick
|
||||
from gateway.platforms.base import cleanup_image_cache
|
||||
from gateway.platforms.base import cleanup_image_cache, cleanup_document_cache
|
||||
|
||||
IMAGE_CACHE_EVERY = 60 # ticks — once per hour at default 60s interval
|
||||
CHANNEL_DIR_EVERY = 5 # ticks — every 5 minutes
|
||||
@@ -1786,6 +1815,12 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, interval: int
|
||||
logger.info("Image cache cleanup: removed %d stale file(s)", removed)
|
||||
except Exception as e:
|
||||
logger.debug("Image cache cleanup error: %s", e)
|
||||
try:
|
||||
removed = cleanup_document_cache(max_age_hours=24)
|
||||
if removed:
|
||||
logger.info("Document cache cleanup: removed %d stale file(s)", removed)
|
||||
except Exception as e:
|
||||
logger.debug("Document cache cleanup error: %s", e)
|
||||
|
||||
stop_event.wait(timeout=interval)
|
||||
logger.info("Cron ticker stopped")
|
||||
|
||||
Reference in New Issue
Block a user