From ff6d62802df8ac1fa289c1c653b2fb44348e0346 Mon Sep 17 00:00:00 2001
From: 0xbyt4 <35742124+0xbyt4@users.noreply.github.com>
Date: Sat, 28 Feb 2026 21:21:03 +0300
Subject: [PATCH] fix: platform base extract_images and truncate_message bugs +
tests
- extract_images: only remove the image tags that were actually extracted;
preserve markdown image tags with non-image targets (e.g. PDFs), which were previously silently dropped
- truncate_message: walk only chunk_body (not the prepended prefix) so the
reopened code fence no longer toggles in_code off, which previously left
continuation chunks with unclosed code blocks
- Add 49 unit tests covering MessageEvent command parsing, extract_images,
extract_media, truncate_message code block handling, and _get_human_delay
---
gateway/platforms/base.py | 16 +-
tests/gateway/test_platform_base.py | 347 ++++++++++++++++++++++++++++
2 files changed, 357 insertions(+), 6 deletions(-)
create mode 100644 tests/gateway/test_platform_base.py
diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py
index 2e818b4ea..9c0ebf18c 100644
--- a/gateway/platforms/base.py
+++ b/gateway/platforms/base.py
@@ -460,10 +460,14 @@ class BasePlatformAdapter(ABC):
url = match.group(1)
images.append((url, ""))
- # Remove matched image tags from content if we found images
+ # Remove only the matched image tags from content (not all markdown images)
if images:
- cleaned = re.sub(md_pattern, '', cleaned)
- cleaned = re.sub(html_pattern, '', cleaned)
+ extracted_urls = {url for url, _ in images}
+ def _remove_if_extracted(match):
+ url = match.group(2) if match.lastindex >= 2 else match.group(1)
+ return '' if url in extracted_urls else match.group(0)
+ cleaned = re.sub(md_pattern, _remove_if_extracted, cleaned)
+ cleaned = re.sub(html_pattern, _remove_if_extracted, cleaned)
# Clean up leftover blank lines
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip()
@@ -798,11 +802,11 @@ class BasePlatformAdapter(ABC):
full_chunk = prefix + chunk_body
- # Walk the chunk line-by-line to determine whether we end
- # inside an open code block.
+ # Walk only the chunk_body (not the prefix we prepended) to
+ # determine whether we end inside an open code block.
in_code = carry_lang is not None
lang = carry_lang or ""
- for line in full_chunk.split("\n"):
+ for line in chunk_body.split("\n"):
stripped = line.strip()
if stripped.startswith("```"):
if in_code:
diff --git a/tests/gateway/test_platform_base.py b/tests/gateway/test_platform_base.py
new file mode 100644
index 000000000..b6745316e
--- /dev/null
+++ b/tests/gateway/test_platform_base.py
@@ -0,0 +1,347 @@
+"""Tests for gateway/platforms/base.py — MessageEvent, media extraction, message truncation."""
+
+import os
+from unittest.mock import patch
+
+from gateway.platforms.base import (
+ BasePlatformAdapter,
+ MessageEvent,
+ MessageType,
+)
+
+
+# ---------------------------------------------------------------------------
+# MessageEvent — command parsing
+# ---------------------------------------------------------------------------
+
+
+class TestMessageEventIsCommand:
+ def test_slash_command(self):
+ event = MessageEvent(text="/new")
+ assert event.is_command() is True
+
+ def test_regular_text(self):
+ event = MessageEvent(text="hello world")
+ assert event.is_command() is False
+
+ def test_empty_text(self):
+ event = MessageEvent(text="")
+ assert event.is_command() is False
+
+ def test_slash_only(self):
+ event = MessageEvent(text="/")
+ assert event.is_command() is True
+
+
+class TestMessageEventGetCommand:
+ def test_simple_command(self):
+ event = MessageEvent(text="/new")
+ assert event.get_command() == "new"
+
+ def test_command_with_args(self):
+ event = MessageEvent(text="/reset session")
+ assert event.get_command() == "reset"
+
+ def test_not_a_command(self):
+ event = MessageEvent(text="hello")
+ assert event.get_command() is None
+
+ def test_command_is_lowercased(self):
+ event = MessageEvent(text="/HELP")
+ assert event.get_command() == "help"
+
+ def test_slash_only_returns_empty(self):
+ event = MessageEvent(text="/")
+ assert event.get_command() == ""
+
+
+class TestMessageEventGetCommandArgs:
+ def test_command_with_args(self):
+ event = MessageEvent(text="/new session id 123")
+ assert event.get_command_args() == "session id 123"
+
+ def test_command_without_args(self):
+ event = MessageEvent(text="/new")
+ assert event.get_command_args() == ""
+
+ def test_not_a_command_returns_full_text(self):
+ event = MessageEvent(text="hello world")
+ assert event.get_command_args() == "hello world"
+
+
+# ---------------------------------------------------------------------------
+# extract_images
+# ---------------------------------------------------------------------------
+
+
+class TestExtractImages:
+ def test_no_images(self):
+ images, cleaned = BasePlatformAdapter.extract_images("Just regular text.")
+ assert images == []
+ assert cleaned == "Just regular text."
+
+ def test_markdown_image_with_image_ext(self):
+ content = "Here is a photo: ![cat](https://example.com/cat.png)"
+ images, cleaned = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+ assert images[0][0] == "https://example.com/cat.png"
+ assert images[0][1] == "cat"
+ assert "![cat]" not in cleaned
+
+ def test_markdown_image_jpg(self):
+ content = ""
+ images, _ = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+
+ def test_markdown_image_jpeg(self):
+ content = ""
+ images, _ = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+
+ def test_markdown_image_gif(self):
+ content = ""
+ images, _ = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+
+ def test_markdown_image_webp(self):
+ content = ""
+ images, _ = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+
+ def test_fal_media_cdn(self):
+ content = ""
+ images, _ = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+
+ def test_fal_cdn_url(self):
+ content = ""
+ images, _ = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+
+ def test_replicate_delivery(self):
+ content = ""
+ images, _ = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+
+ def test_non_image_ext_not_extracted(self):
+ """Markdown image with non-image extension should not be extracted."""
+ content = ""
+ images, cleaned = BasePlatformAdapter.extract_images(content)
+ assert images == []
+ assert "![doc]" in cleaned # Should be preserved
+
+ def test_html_img_tag(self):
+ content = 'Check this: <img src="https://example.com/photo.png">'
+ images, cleaned = BasePlatformAdapter.extract_images(content)
+ assert len(images) == 1
+ assert images[0][0] == "https://example.com/photo.png"
+ assert images[0][1] == "" # HTML images have no alt text
+ assert "
1
+
+ def test_chunks_have_indicators(self):
+ adapter = self._adapter()
+ msg = "word " * 200
+ chunks = adapter.truncate_message(msg, max_length=200)
+ assert "(1/" in chunks[0]
+ assert f"({len(chunks)}/{len(chunks)})" in chunks[-1]
+
+ def test_code_block_first_chunk_closed(self):
+ adapter = self._adapter()
+ msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
+ chunks = adapter.truncate_message(msg, max_length=300)
+ assert len(chunks) > 1
+ # First chunk must have a closing fence appended (code block was split)
+ first_fences = chunks[0].count("```")
+ assert first_fences == 2, "First chunk should have opening + closing fence"
+
+ def test_code_block_language_tag_carried(self):
+ adapter = self._adapter()
+ msg = "Start\n```javascript\n" + "console.log('x');\n" * 80 + "```\nEnd"
+ chunks = adapter.truncate_message(msg, max_length=300)
+ if len(chunks) > 1:
+ # At least one continuation chunk should reopen with ```javascript
+ reopened_with_lang = any(
+ "```javascript" in chunk for chunk in chunks[1:]
+ )
+ assert reopened_with_lang, "No continuation chunk reopened with language tag"
+
+ def test_continuation_chunks_have_balanced_fences(self):
+ """Regression: continuation chunks must close reopened code blocks."""
+ adapter = self._adapter()
+ msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
+ chunks = adapter.truncate_message(msg, max_length=300)
+ assert len(chunks) > 1
+ for i, chunk in enumerate(chunks):
+ fence_count = chunk.count("```")
+ assert fence_count % 2 == 0, (
+ f"Chunk {i} has unbalanced fences ({fence_count})"
+ )
+
+ def test_each_chunk_under_max_length(self):
+ adapter = self._adapter()
+ msg = "word " * 500
+ max_len = 200
+ chunks = adapter.truncate_message(msg, max_length=max_len)
+ for i, chunk in enumerate(chunks):
+ assert len(chunk) <= max_len + 20, f"Chunk {i} too long: {len(chunk)} > {max_len}"
+
+
+# ---------------------------------------------------------------------------
+# _get_human_delay
+# ---------------------------------------------------------------------------
+
+
+class TestGetHumanDelay:
+ def test_off_mode(self):
+ with patch.dict(os.environ, {"HERMES_HUMAN_DELAY_MODE": "off"}):
+ assert BasePlatformAdapter._get_human_delay() == 0.0
+
+ def test_default_is_off(self):
+ with patch.dict(os.environ, {}, clear=False):
+ os.environ.pop("HERMES_HUMAN_DELAY_MODE", None)
+ assert BasePlatformAdapter._get_human_delay() == 0.0
+
+ def test_natural_mode_range(self):
+ with patch.dict(os.environ, {"HERMES_HUMAN_DELAY_MODE": "natural"}):
+ delay = BasePlatformAdapter._get_human_delay()
+ assert 0.8 <= delay <= 2.5
+
+ def test_custom_mode_uses_env_vars(self):
+ env = {
+ "HERMES_HUMAN_DELAY_MODE": "custom",
+ "HERMES_HUMAN_DELAY_MIN_MS": "100",
+ "HERMES_HUMAN_DELAY_MAX_MS": "200",
+ }
+ with patch.dict(os.environ, env):
+ delay = BasePlatformAdapter._get_human_delay()
+ assert 0.1 <= delay <= 0.2