From fbb1923fad18eb3bba332c3bfbdcfd69dddae19e Mon Sep 17 00:00:00 2001 From: tekelala Date: Fri, 27 Feb 2026 11:53:46 -0500 Subject: [PATCH] fix(security): patch path traversal, size bypass, and prompt injection in document processing - Sanitize filenames in cache_document_from_bytes to prevent path traversal (strip directory components, null bytes, resolve check) - Reject documents with None file_size instead of silently allowing download - Cap text file injection at 100 KB to prevent oversized prompt payloads - Sanitize display_name in run.py context notes to block prompt injection via filenames - Add 35 unit tests covering document cache utilities and Telegram document handling Co-Authored-By: Claude Opus 4.6 --- gateway/platforms/base.py | 12 +- gateway/platforms/telegram.py | 12 +- gateway/run.py | 3 + tests/gateway/test_document_cache.py | 157 +++++++++++ tests/gateway/test_telegram_documents.py | 338 +++++++++++++++++++++++ 5 files changed, 516 insertions(+), 6 deletions(-) create mode 100644 tests/gateway/test_document_cache.py create mode 100644 tests/gateway/test_telegram_documents.py diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index f854723a4..2e818b4ea 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -209,11 +209,21 @@ def cache_document_from_bytes(data: bytes, filename: str) -> str: Returns: Absolute path to the cached document file as a string. + + Raises: + ValueError: If the sanitized path escapes the cache directory. """ cache_dir = get_document_cache_dir() - safe_name = filename if filename else "document" + # Sanitize: strip directory components, null bytes, and control characters + safe_name = Path(filename).name if filename else "document" + safe_name = safe_name.replace("\x00", "").strip() + if not safe_name or safe_name in (".", ".."): + safe_name = "document" cached_name = f"doc_{uuid.uuid4().hex[:12]}_{safe_name}" filepath = cache_dir / cached_name + # Final safety check: ensure path stays inside cache dir + if not filepath.resolve().is_relative_to(cache_dir.resolve()): + raise ValueError(f"Path traversal rejected: {filename!r}") filepath.write_bytes(data) return str(filepath) diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 2bfd5085a..e7c6062a1 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -513,10 +513,11 @@ class TelegramAdapter(BasePlatformAdapter): return # Check file size (Telegram Bot API limit: 20 MB) - if doc.file_size and doc.file_size > 20 * 1024 * 1024: + MAX_DOC_BYTES = 20 * 1024 * 1024 + if not doc.file_size or doc.file_size > MAX_DOC_BYTES: event.text = ( - "The document is too large (over 20 MB). " - "Please send a smaller file." + "The document is too large or its size could not be verified. " + "Maximum: 20 MB." ) print(f"[Telegram] Document too large: {doc.file_size} bytes", flush=True) await self.handle_message(event) @@ -532,8 +533,9 @@ class TelegramAdapter(BasePlatformAdapter): event.media_types = [mime_type] print(f"[Telegram] Cached user document: {cached_path}", flush=True) - # For text files, inject content into event.text - if ext in (".md", ".txt"): + # For text files, inject content into event.text (capped at 100 KB) + MAX_TEXT_INJECT_BYTES = 100 * 1024 + if ext in (".md", ".txt") and len(raw_bytes) <= MAX_TEXT_INJECT_BYTES: try: text_content = raw_bytes.decode("utf-8") display_name = original_filename or f"document{ext}" diff --git a/gateway/run.py b/gateway/run.py index 48c4b3ce2..83f781fb0 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -757,6 +757,9 @@ class GatewayRunner: # Format: doc_<12hex>_ parts = basename.split("_", 2) display_name = parts[2] if len(parts) >= 3 else basename + # Sanitize to prevent prompt injection via filenames + import re as _re + display_name = _re.sub(r'[^\w.\- ]', '_', display_name) if mtype.startswith("text/"): context_note = ( diff --git a/tests/gateway/test_document_cache.py b/tests/gateway/test_document_cache.py new file mode 100644 index 000000000..18440ed9c --- /dev/null +++ b/tests/gateway/test_document_cache.py @@ -0,0 +1,157 @@ +""" +Tests for document cache utilities in gateway/platforms/base.py. + +Covers: get_document_cache_dir, cache_document_from_bytes, + cleanup_document_cache, SUPPORTED_DOCUMENT_TYPES. +""" + +import os +import time +from pathlib import Path + +import pytest + +from gateway.platforms.base import ( + SUPPORTED_DOCUMENT_TYPES, + cache_document_from_bytes, + cleanup_document_cache, + get_document_cache_dir, +) + +# --------------------------------------------------------------------------- +# Fixture: redirect DOCUMENT_CACHE_DIR to a temp directory for every test +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True) +def _redirect_cache(tmp_path, monkeypatch): + """Point the module-level DOCUMENT_CACHE_DIR to a fresh tmp_path.""" + monkeypatch.setattr( + "gateway.platforms.base.DOCUMENT_CACHE_DIR", tmp_path / "doc_cache" + ) + + +# --------------------------------------------------------------------------- +# TestGetDocumentCacheDir +# --------------------------------------------------------------------------- + +class TestGetDocumentCacheDir: + def test_creates_directory(self, tmp_path): + cache_dir = get_document_cache_dir() + assert cache_dir.exists() + assert cache_dir.is_dir() + + def test_returns_existing_directory(self): + first = get_document_cache_dir() + second = get_document_cache_dir() + assert first == second + assert first.exists() + + +# --------------------------------------------------------------------------- +# TestCacheDocumentFromBytes +# --------------------------------------------------------------------------- + +class TestCacheDocumentFromBytes: + def test_basic_caching(self): + data = b"hello world" + path = cache_document_from_bytes(data, "test.txt") + assert os.path.exists(path) + assert Path(path).read_bytes() == data + + def test_filename_preserved_in_path(self): + path = cache_document_from_bytes(b"data", "report.pdf") + assert "report.pdf" in os.path.basename(path) + + def test_empty_filename_uses_fallback(self): + path = cache_document_from_bytes(b"data", "") + assert "document" in os.path.basename(path) + + def test_unique_filenames(self): + p1 = cache_document_from_bytes(b"a", "same.txt") + p2 = cache_document_from_bytes(b"b", "same.txt") + assert p1 != p2 + + def test_path_traversal_blocked(self): + """Malicious directory components are stripped — only the leaf name survives.""" + path = cache_document_from_bytes(b"data", "../../etc/passwd") + basename = os.path.basename(path) + assert "passwd" in basename + # Must NOT contain directory separators + assert ".." not in basename + # File must reside inside the cache directory + cache_dir = get_document_cache_dir() + assert Path(path).resolve().is_relative_to(cache_dir.resolve()) + + def test_null_bytes_stripped(self): + path = cache_document_from_bytes(b"data", "file\x00.pdf") + basename = os.path.basename(path) + assert "\x00" not in basename + assert "file.pdf" in basename + + def test_dot_dot_filename_handled(self): + """A filename that is literally '..' falls back to 'document'.""" + path = cache_document_from_bytes(b"data", "..") + basename = os.path.basename(path) + assert "document" in basename + + def test_none_filename_uses_fallback(self): + path = cache_document_from_bytes(b"data", None) + assert "document" in os.path.basename(path) + + +# --------------------------------------------------------------------------- +# TestCleanupDocumentCache +# --------------------------------------------------------------------------- + +class TestCleanupDocumentCache: + def test_removes_old_files(self, tmp_path): + cache_dir = get_document_cache_dir() + old_file = cache_dir / "old.txt" + old_file.write_text("old") + # Set modification time to 48 hours ago + old_mtime = time.time() - 48 * 3600 + os.utime(old_file, (old_mtime, old_mtime)) + + removed = cleanup_document_cache(max_age_hours=24) + assert removed == 1 + assert not old_file.exists() + + def test_keeps_recent_files(self): + cache_dir = get_document_cache_dir() + recent = cache_dir / "recent.txt" + recent.write_text("fresh") + + removed = cleanup_document_cache(max_age_hours=24) + assert removed == 0 + assert recent.exists() + + def test_returns_removed_count(self): + cache_dir = get_document_cache_dir() + old_time = time.time() - 48 * 3600 + for i in range(3): + f = cache_dir / f"old_{i}.txt" + f.write_text("x") + os.utime(f, (old_time, old_time)) + + assert cleanup_document_cache(max_age_hours=24) == 3 + + def test_empty_cache_dir(self): + assert cleanup_document_cache(max_age_hours=24) == 0 + + +# --------------------------------------------------------------------------- +# TestSupportedDocumentTypes +# --------------------------------------------------------------------------- + +class TestSupportedDocumentTypes: + def test_all_extensions_have_mime_types(self): + for ext, mime in SUPPORTED_DOCUMENT_TYPES.items(): + assert ext.startswith("."), f"{ext} missing leading dot" + assert "/" in mime, f"{mime} is not a valid MIME type" + + @pytest.mark.parametrize( + "ext", + [".pdf", ".md", ".txt", ".docx", ".xlsx", ".pptx"], + ) + def test_expected_extensions_present(self, ext): + assert ext in SUPPORTED_DOCUMENT_TYPES diff --git a/tests/gateway/test_telegram_documents.py b/tests/gateway/test_telegram_documents.py new file mode 100644 index 000000000..4aceda842 --- /dev/null +++ b/tests/gateway/test_telegram_documents.py @@ -0,0 +1,338 @@ +""" +Tests for Telegram document handling in gateway/platforms/telegram.py. + +Covers: document type detection, download/cache flow, size limits, + text injection, error handling. + +Note: python-telegram-bot may not be installed in the test environment. +We mock the telegram module at import time to avoid collection errors. +""" + +import asyncio +import importlib +import os +import sys +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import ( + MessageEvent, + MessageType, + SUPPORTED_DOCUMENT_TYPES, +) + + +# --------------------------------------------------------------------------- +# Mock the telegram package if it's not installed +# --------------------------------------------------------------------------- + +def _ensure_telegram_mock(): + """Install mock telegram modules so TelegramAdapter can be imported.""" + if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"): + # Real library is installed — no mocking needed + return + + telegram_mod = MagicMock() + # ContextTypes needs DEFAULT_TYPE as an actual attribute for the annotation + telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None) + telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2" + telegram_mod.constants.ChatType.GROUP = "group" + telegram_mod.constants.ChatType.SUPERGROUP = "supergroup" + telegram_mod.constants.ChatType.CHANNEL = "channel" + telegram_mod.constants.ChatType.PRIVATE = "private" + + for name in ("telegram", "telegram.ext", "telegram.constants"): + sys.modules.setdefault(name, telegram_mod) + + +_ensure_telegram_mock() + +# Now we can safely import +from gateway.platforms.telegram import TelegramAdapter # noqa: E402 + + +# --------------------------------------------------------------------------- +# Helpers to build mock Telegram objects +# --------------------------------------------------------------------------- + +def _make_file_obj(data: bytes = b"hello"): + """Create a mock Telegram File with download_as_bytearray.""" + f = AsyncMock() + f.download_as_bytearray = AsyncMock(return_value=bytearray(data)) + f.file_path = "documents/file.pdf" + return f + + +def _make_document( + file_name="report.pdf", + mime_type="application/pdf", + file_size=1024, + file_obj=None, +): + """Create a mock Telegram Document object.""" + doc = MagicMock() + doc.file_name = file_name + doc.mime_type = mime_type + doc.file_size = file_size + doc.get_file = AsyncMock(return_value=file_obj or _make_file_obj()) + return doc + + +def _make_message(document=None, caption=None): + """Build a mock Telegram Message with the given document.""" + msg = MagicMock() + msg.message_id = 42 + msg.text = caption or "" + msg.caption = caption + msg.date = None + # Media flags — all None except document + msg.photo = None + msg.video = None + msg.audio = None + msg.voice = None + msg.sticker = None + msg.document = document + # Chat / user + msg.chat = MagicMock() + msg.chat.id = 100 + msg.chat.type = "private" + msg.chat.title = None + msg.chat.full_name = "Test User" + msg.from_user = MagicMock() + msg.from_user.id = 1 + msg.from_user.full_name = "Test User" + msg.message_thread_id = None + return msg + + +def _make_update(msg): + """Wrap a message in a mock Update.""" + update = MagicMock() + update.message = msg + return update + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture() +def adapter(): + config = PlatformConfig(enabled=True, token="fake-token") + a = TelegramAdapter(config) + # Capture events instead of processing them + a.handle_message = AsyncMock() + return a + + +@pytest.fixture(autouse=True) +def _redirect_cache(tmp_path, monkeypatch): + """Point document cache to tmp_path so tests don't touch ~/.hermes.""" + monkeypatch.setattr( + "gateway.platforms.base.DOCUMENT_CACHE_DIR", tmp_path / "doc_cache" + ) + + +# --------------------------------------------------------------------------- +# TestDocumentTypeDetection +# --------------------------------------------------------------------------- + +class TestDocumentTypeDetection: + @pytest.mark.asyncio + async def test_document_detected_explicitly(self, adapter): + doc = _make_document() + msg = _make_message(document=doc) + update = _make_update(msg) + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert event.message_type == MessageType.DOCUMENT + + @pytest.mark.asyncio + async def test_fallback_is_document(self, adapter): + """When no specific media attr is set, message_type defaults to DOCUMENT.""" + msg = _make_message() + msg.document = None # no media at all + update = _make_update(msg) + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert event.message_type == MessageType.DOCUMENT + + +# --------------------------------------------------------------------------- +# TestDocumentDownloadBlock +# --------------------------------------------------------------------------- + +class TestDocumentDownloadBlock: + @pytest.mark.asyncio + async def test_supported_pdf_is_cached(self, adapter): + pdf_bytes = b"%PDF-1.4 fake" + file_obj = _make_file_obj(pdf_bytes) + doc = _make_document(file_name="report.pdf", file_size=1024, file_obj=file_obj) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert len(event.media_urls) == 1 + assert os.path.exists(event.media_urls[0]) + assert event.media_types == ["application/pdf"] + + @pytest.mark.asyncio + async def test_supported_txt_injects_content(self, adapter): + content = b"Hello from a text file" + file_obj = _make_file_obj(content) + doc = _make_document( + file_name="notes.txt", mime_type="text/plain", + file_size=len(content), file_obj=file_obj, + ) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert "Hello from a text file" in event.text + assert "[Content of notes.txt]" in event.text + + @pytest.mark.asyncio + async def test_supported_md_injects_content(self, adapter): + content = b"# Title\nSome markdown" + file_obj = _make_file_obj(content) + doc = _make_document( + file_name="readme.md", mime_type="text/markdown", + file_size=len(content), file_obj=file_obj, + ) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert "# Title" in event.text + + @pytest.mark.asyncio + async def test_caption_preserved_with_injection(self, adapter): + content = b"file text" + file_obj = _make_file_obj(content) + doc = _make_document( + file_name="doc.txt", mime_type="text/plain", + file_size=len(content), file_obj=file_obj, + ) + msg = _make_message(document=doc, caption="Please summarize") + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert "file text" in event.text + assert "Please summarize" in event.text + + @pytest.mark.asyncio + async def test_unsupported_type_rejected(self, adapter): + doc = _make_document(file_name="archive.zip", mime_type="application/zip", file_size=100) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert "Unsupported document type" in event.text + assert ".zip" in event.text + + @pytest.mark.asyncio + async def test_oversized_file_rejected(self, adapter): + doc = _make_document(file_name="huge.pdf", file_size=25 * 1024 * 1024) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert "too large" in event.text + + @pytest.mark.asyncio + async def test_none_file_size_rejected(self, adapter): + """Security fix: file_size=None must be rejected (not silently allowed).""" + doc = _make_document(file_name="tricky.pdf", file_size=None) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert "too large" in event.text or "could not be verified" in event.text + + @pytest.mark.asyncio + async def test_missing_filename_uses_mime_lookup(self, adapter): + """No file_name but valid mime_type should resolve to extension.""" + content = b"some pdf bytes" + file_obj = _make_file_obj(content) + doc = _make_document( + file_name=None, mime_type="application/pdf", + file_size=len(content), file_obj=file_obj, + ) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert len(event.media_urls) == 1 + assert event.media_types == ["application/pdf"] + + @pytest.mark.asyncio + async def test_missing_filename_and_mime_rejected(self, adapter): + doc = _make_document(file_name=None, mime_type=None, file_size=100) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + assert "Unsupported" in event.text + + @pytest.mark.asyncio + async def test_unicode_decode_error_handled(self, adapter): + """Binary bytes that aren't valid UTF-8 in a .txt — content not injected but file still cached.""" + binary = bytes(range(128, 256)) # not valid UTF-8 + file_obj = _make_file_obj(binary) + doc = _make_document( + file_name="binary.txt", mime_type="text/plain", + file_size=len(binary), file_obj=file_obj, + ) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + # File should still be cached + assert len(event.media_urls) == 1 + assert os.path.exists(event.media_urls[0]) + # Content NOT injected — text should be empty (no caption set) + assert "[Content of" not in (event.text or "") + + @pytest.mark.asyncio + async def test_text_injection_capped(self, adapter): + """A .txt file over 100 KB should NOT have its content injected.""" + large = b"x" * (200 * 1024) # 200 KB + file_obj = _make_file_obj(large) + doc = _make_document( + file_name="big.txt", mime_type="text/plain", + file_size=len(large), file_obj=file_obj, + ) + msg = _make_message(document=doc) + update = _make_update(msg) + + await adapter._handle_media_message(update, MagicMock()) + event = adapter.handle_message.call_args[0][0] + # File should be cached + assert len(event.media_urls) == 1 + # Content should NOT be injected + assert "[Content of" not in (event.text or "") + + @pytest.mark.asyncio + async def test_download_exception_handled(self, adapter): + """If get_file() raises, the handler logs the error without crashing.""" + doc = _make_document(file_name="crash.pdf", file_size=100) + doc.get_file = AsyncMock(side_effect=RuntimeError("Telegram API down")) + msg = _make_message(document=doc) + update = _make_update(msg) + + # Should not raise + await adapter._handle_media_message(update, MagicMock()) + # handle_message should still be called (the handler catches the exception) + adapter.handle_message.assert_called_once()