From 4920c5940fe09c97b0ab15ff2076583399d94a67 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 17 Mar 2026 01:47:34 -0700 Subject: [PATCH] feat: auto-detect local file paths in gateway responses for native media delivery (#1640) Small models (7B-14B) can't reliably use MEDIA: or IMAGE: syntax. This adds extract_local_files() to BasePlatformAdapter that regex-detects bare local file paths ending in image/video extensions, validates them with os.path.isfile(), and delivers them as native platform attachments. Hardened over the original PR: - Code-block exclusion: paths inside fenced blocks and inline code are skipped so code samples are never mutilated - URL rejection: negative lookbehind prevents matching path segments inside HTTP URLs - Relative path rejection: ./foo.png no longer matches - Tilde path cleanup: raw ~/... form is removed from response text - Deduplication by expanded path - Added .webm to _VIDEO_EXTS - Fallback to send_document for unrecognized media extensions Based on PR #1636 by sudoingX. Co-authored-by: sudoingX --- gateway/platforms/base.py | 107 +++++++- tests/gateway/test_extract_local_files.py | 317 ++++++++++++++++++++++ 2 files changed, 421 insertions(+), 3 deletions(-) create mode 100644 tests/gateway/test_extract_local_files.py diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 1ec60f762..2e34242e4 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -727,7 +727,75 @@ class BasePlatformAdapter(ABC): cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip() return media, cleaned - + + @staticmethod + def extract_local_files(content: str) -> Tuple[List[str], str]: + """ + Detect bare local file paths in response text for native media delivery. + + Matches absolute paths (/...) and tilde paths (~/) ending in common + image or video extensions. Validates each candidate with + ``os.path.isfile()`` to avoid false positives from URLs or + non-existent paths. + + Paths inside fenced code blocks (``` ... ```) and inline code + (`...`) are ignored so that code samples are never mutilated. + + Returns: + Tuple of (list of expanded file paths, cleaned text with the + raw path strings removed). + """ + _LOCAL_MEDIA_EXTS = ( + '.png', '.jpg', '.jpeg', '.gif', '.webp', + '.mp4', '.mov', '.avi', '.mkv', '.webm', + ) + ext_part = '|'.join(e.lstrip('.') for e in _LOCAL_MEDIA_EXTS) + + # (? bool: + return any(s <= pos < e for s, e in code_spans) + + found: list = [] # (raw_match_text, expanded_path) + for match in path_re.finditer(content): + if _in_code(match.start()): + continue + raw = match.group(0) + expanded = os.path.expanduser(raw) + if os.path.isfile(expanded): + found.append((raw, expanded)) + + # Deduplicate by expanded path, preserving discovery order + seen: set = set() + unique: list = [] + for raw, expanded in found: + if expanded not in seen: + seen.add(expanded) + unique.append((raw, expanded)) + + paths = [expanded for _, expanded in unique] + + cleaned = content + if unique: + for raw, _exp in unique: + cleaned = cleaned.replace(raw, '') + cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip() + + return paths, cleaned + async def _keep_typing(self, chat_id: str, interval: float = 2.0, metadata=None) -> None: """ Continuously send typing indicator until cancelled. @@ -842,6 +910,12 @@ class BasePlatformAdapter(ABC): images, text_content = self.extract_images(response) if images: logger.info("[%s] extract_images found %d image(s) in response (%d chars)", self.name, len(images), len(response)) + + # Auto-detect bare local file paths for native media delivery + # (helps small models that don't use MEDIA: syntax) + local_files, text_content = self.extract_local_files(text_content) + if local_files: + logger.info("[%s] extract_local_files found %d file(s) in response", self.name, len(local_files)) # Auto-TTS: if voice message, generate audio FIRST (before sending text) # Skipped when the chat has voice mode disabled (/voice off) @@ -935,7 +1009,7 @@ class BasePlatformAdapter(ABC): # Send extracted media files — route by file type _AUDIO_EXTS = {'.ogg', '.opus', '.mp3', '.wav', '.m4a'} - _VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.3gp'} + _VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'} _IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.webp', '.gif'} for media_path, is_voice in media_files: @@ -972,7 +1046,34 @@ class BasePlatformAdapter(ABC): print(f"[{self.name}] Failed to send media ({ext}): {media_result.error}") except Exception as media_err: print(f"[{self.name}] Error sending media: {media_err}") - + + # Send auto-detected local files as native attachments + for file_path in local_files: + if human_delay > 0: + await asyncio.sleep(human_delay) + try: + ext = Path(file_path).suffix.lower() + if ext in _IMAGE_EXTS: + await self.send_image_file( + chat_id=event.source.chat_id, + image_path=file_path, + metadata=_thread_metadata, + ) + elif ext in _VIDEO_EXTS: + await self.send_video( + chat_id=event.source.chat_id, + video_path=file_path, + metadata=_thread_metadata, + ) + else: + await self.send_document( + chat_id=event.source.chat_id, + file_path=file_path, + metadata=_thread_metadata, + ) + except Exception as file_err: + logger.error("[%s] Error sending local file %s: %s", self.name, file_path, file_err) + # Check if there's a pending message that was queued during our processing if session_key in self._pending_messages: pending_event = self._pending_messages.pop(session_key) diff --git a/tests/gateway/test_extract_local_files.py b/tests/gateway/test_extract_local_files.py new file mode 100644 index 000000000..dd93e6370 --- /dev/null +++ b/tests/gateway/test_extract_local_files.py @@ -0,0 +1,317 @@ +""" +Tests for extract_local_files() — auto-detection of bare local file paths +in model response text for native media delivery. + +Covers: path matching, code-block exclusion, URL rejection, tilde expansion, +deduplication, text cleanup, and extension routing. + +Based on PR #1636 by sudoingX (salvaged + hardened). +""" + +import os +from unittest.mock import patch + +import pytest + +from gateway.platforms.base import BasePlatformAdapter + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _extract(content: str, existing_files: set[str] | None = None): + """ + Run extract_local_files with os.path.isfile mocked to return True + for any path in *existing_files* (expanded form). If *existing_files* + is None every path passes. + """ + existing = existing_files + + def fake_isfile(p): + if existing is None: + return True + return p in existing + + def fake_expanduser(p): + if p.startswith("~/"): + return "/home/user" + p[1:] + return p + + with patch("os.path.isfile", side_effect=fake_isfile), \ + patch("os.path.expanduser", side_effect=fake_expanduser): + return BasePlatformAdapter.extract_local_files(content) + + +# --------------------------------------------------------------------------- +# Basic detection +# --------------------------------------------------------------------------- + +class TestBasicDetection: + + def test_absolute_path_image(self): + paths, cleaned = _extract("Here is the screenshot /root/screenshots/game.png enjoy") + assert paths == ["/root/screenshots/game.png"] + assert "/root/screenshots/game.png" not in cleaned + assert "Here is the screenshot" in cleaned + + def test_tilde_path_image(self): + paths, cleaned = _extract("Check out ~/photos/cat.jpg for the cat") + assert paths == ["/home/user/photos/cat.jpg"] + assert "~/photos/cat.jpg" not in cleaned + + def test_video_extensions(self): + for ext in (".mp4", ".mov", ".avi", ".mkv", ".webm"): + text = f"Video at /tmp/clip{ext} here" + paths, _ = _extract(text) + assert len(paths) == 1, f"Failed for {ext}" + assert paths[0] == f"/tmp/clip{ext}" + + def test_image_extensions(self): + for ext in (".png", ".jpg", ".jpeg", ".gif", ".webp"): + text = f"Image at /tmp/pic{ext} here" + paths, _ = _extract(text) + assert len(paths) == 1, f"Failed for {ext}" + assert paths[0] == f"/tmp/pic{ext}" + + def test_case_insensitive_extension(self): + paths, _ = _extract("See /tmp/PHOTO.PNG and /tmp/vid.MP4 now") + assert len(paths) == 2 + + def test_multiple_paths(self): + text = "First /tmp/a.png then /tmp/b.jpg and /tmp/c.mp4 done" + paths, cleaned = _extract(text) + assert len(paths) == 3 + assert "/tmp/a.png" in paths + assert "/tmp/b.jpg" in paths + assert "/tmp/c.mp4" in paths + for p in paths: + assert p not in cleaned + + def test_path_at_line_start(self): + paths, _ = _extract("/var/data/image.png") + assert paths == ["/var/data/image.png"] + + def test_path_at_end_of_line(self): + paths, _ = _extract("saved to /var/data/image.png") + assert paths == ["/var/data/image.png"] + + def test_path_with_dots_in_directory(self): + paths, _ = _extract("See /opt/my.app/assets/logo.png here") + assert paths == ["/opt/my.app/assets/logo.png"] + + def test_path_with_hyphens(self): + paths, _ = _extract("File at /tmp/my-screenshot-2024.png done") + assert paths == ["/tmp/my-screenshot-2024.png"] + + +# --------------------------------------------------------------------------- +# Non-existent files are skipped +# --------------------------------------------------------------------------- + +class TestIsfileGuard: + + def test_nonexistent_path_skipped(self): + """Paths that don't exist on disk are not extracted.""" + paths, cleaned = _extract( + "See /tmp/nope.png here", + existing_files=set(), # nothing exists + ) + assert paths == [] + assert "/tmp/nope.png" in cleaned # not stripped + + def test_only_existing_paths_extracted(self): + """Mix of existing and non-existing — only existing are returned.""" + paths, cleaned = _extract( + "A /tmp/real.png and /tmp/fake.jpg end", + existing_files={"/tmp/real.png"}, + ) + assert paths == ["/tmp/real.png"] + assert "/tmp/real.png" not in cleaned + assert "/tmp/fake.jpg" in cleaned + + +# --------------------------------------------------------------------------- +# URL false-positive prevention +# --------------------------------------------------------------------------- + +class TestURLRejection: + + def test_https_url_not_matched(self): + """Paths embedded in HTTP URLs must not be extracted.""" + paths, cleaned = _extract("Visit https://example.com/images/photo.png for details") + # The regex lookbehind should prevent matching the URL's path segment + # Even if it did match, isfile would be False for /images/photo.png + # (we mock isfile to True-for-all here, so the lookbehind is the guard) + assert paths == [] + assert "https://example.com/images/photo.png" in cleaned + + def test_http_url_not_matched(self): + paths, _ = _extract("See http://cdn.example.com/assets/banner.jpg here") + assert paths == [] + + def test_file_url_not_matched(self): + paths, _ = _extract("Open file:///home/user/doc.png in browser") + # file:// has :// before /home so lookbehind blocks it + assert paths == [] + + +# --------------------------------------------------------------------------- +# Code block exclusion +# --------------------------------------------------------------------------- + +class TestCodeBlockExclusion: + + def test_fenced_code_block_skipped(self): + text = "Here's how:\n```python\nimg = open('/tmp/image.png')\n```\nDone." + paths, cleaned = _extract(text) + assert paths == [] + assert "/tmp/image.png" in cleaned # not stripped + + def test_inline_code_skipped(self): + text = "Use the path `/tmp/image.png` in your config" + paths, cleaned = _extract(text) + assert paths == [] + assert "`/tmp/image.png`" in cleaned + + def test_path_outside_code_block_still_matched(self): + text = ( + "```\ncode: /tmp/inside.png\n```\n" + "But this one is real: /tmp/outside.png" + ) + paths, _ = _extract(text, existing_files={"/tmp/outside.png"}) + assert paths == ["/tmp/outside.png"] + + def test_mixed_inline_code_and_bare_path(self): + text = "Config uses `/etc/app/bg.png` but output is /tmp/result.jpg" + paths, cleaned = _extract(text, existing_files={"/tmp/result.jpg"}) + assert paths == ["/tmp/result.jpg"] + assert "`/etc/app/bg.png`" in cleaned + assert "/tmp/result.jpg" not in cleaned + + def test_multiline_fenced_block(self): + text = ( + "```bash\n" + "cp /source/a.png /dest/b.png\n" + "mv /source/c.mp4 /dest/d.mp4\n" + "```\n" + "Files are ready." + ) + paths, _ = _extract(text) + assert paths == [] + + +# --------------------------------------------------------------------------- +# Deduplication +# --------------------------------------------------------------------------- + +class TestDeduplication: + + def test_duplicate_paths_deduplicated(self): + text = "See /tmp/img.png and also /tmp/img.png again" + paths, _ = _extract(text) + assert paths == ["/tmp/img.png"] + + def test_tilde_and_expanded_same_file(self): + """~/photos/a.png and /home/user/photos/a.png are the same file.""" + text = "See ~/photos/a.png and /home/user/photos/a.png here" + paths, _ = _extract(text, existing_files={"/home/user/photos/a.png"}) + assert len(paths) == 1 + assert paths[0] == "/home/user/photos/a.png" + + +# --------------------------------------------------------------------------- +# Text cleanup +# --------------------------------------------------------------------------- + +class TestTextCleanup: + + def test_path_removed_from_text(self): + paths, cleaned = _extract("Before /tmp/x.png after") + assert "Before" in cleaned + assert "after" in cleaned + assert "/tmp/x.png" not in cleaned + + def test_excessive_blank_lines_collapsed(self): + text = "Before\n\n\n/tmp/x.png\n\n\nAfter" + _, cleaned = _extract(text) + assert "\n\n\n" not in cleaned + + def test_no_paths_text_unchanged(self): + text = "This is a normal response with no file paths." + paths, cleaned = _extract(text) + assert paths == [] + assert cleaned == text + + def test_tilde_form_cleaned_from_text(self): + """The raw ~/... form should be removed, not the expanded /home/user/... form.""" + text = "Output saved to ~/result.png for review" + paths, cleaned = _extract(text) + assert paths == ["/home/user/result.png"] + assert "~/result.png" not in cleaned + + def test_only_path_in_text(self): + """If the response is just a path, cleaned text is empty.""" + paths, cleaned = _extract("/tmp/screenshot.png") + assert paths == ["/tmp/screenshot.png"] + assert cleaned == "" + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + +class TestEdgeCases: + + def test_empty_string(self): + paths, cleaned = _extract("") + assert paths == [] + assert cleaned == "" + + def test_no_media_extensions(self): + """Non-media extensions should not be matched.""" + paths, _ = _extract("See /tmp/data.csv and /tmp/script.py and /tmp/notes.txt") + assert paths == [] + + def test_path_with_spaces_not_matched(self): + """Paths with spaces are intentionally not matched (avoids false positives).""" + paths, _ = _extract("File at /tmp/my file.png here") + assert paths == [] + + def test_windows_path_not_matched(self): + """Windows-style paths should not match.""" + paths, _ = _extract("See C:\\Users\\test\\image.png") + assert paths == [] + + def test_relative_path_not_matched(self): + """Relative paths like ./image.png should not match.""" + paths, _ = _extract("File at ./screenshots/image.png here") + assert paths == [] + + def test_bare_filename_not_matched(self): + """Just 'image.png' without a path should not match.""" + paths, _ = _extract("Open image.png to see") + assert paths == [] + + def test_path_followed_by_punctuation(self): + """Path followed by comma, period, paren should still match.""" + for suffix in [",", ".", ")", ":", ";"]: + text = f"See /tmp/img.png{suffix} details" + paths, _ = _extract(text) + assert len(paths) == 1, f"Failed with suffix '{suffix}'" + + def test_path_in_parentheses(self): + paths, _ = _extract("(see /tmp/img.png)") + assert paths == ["/tmp/img.png"] + + def test_path_in_quotes(self): + paths, _ = _extract('The file is "/tmp/img.png" right here') + assert paths == ["/tmp/img.png"] + + def test_deep_nested_path(self): + paths, _ = _extract("At /a/b/c/d/e/f/g/h/image.png end") + assert paths == ["/a/b/c/d/e/f/g/h/image.png"] + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])