From 4ae1334287a575c7ef079e7e38ab659ecebfebe2 Mon Sep 17 00:00:00 2001 From: CoinDegen Date: Sun, 15 Mar 2026 11:58:19 +0530 Subject: [PATCH] fix(gateway): prevent telegram photo burst interrupts --- gateway/platforms/base.py | 21 ++++++- gateway/platforms/telegram.py | 72 ++++++++++++++++++++++-- gateway/run.py | 45 +++++++++++++-- tests/gateway/test_image_enrichment.py | 25 ++++++++ tests/gateway/test_telegram_documents.py | 46 +++++++++++++++ 5 files changed, 198 insertions(+), 11 deletions(-) create mode 100644 tests/gateway/test_image_enrichment.py diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index e523d9390..91ac5d30c 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -288,6 +288,7 @@ class MessageEvent: message_id: Optional[str] = None # Media attachments + # media_urls: local file paths (for vision tool access) media_urls: List[str] = field(default_factory=list) media_types: List[str] = field(default_factory=list) @@ -751,7 +752,25 @@ class BasePlatformAdapter(ABC): # Check if there's already an active handler for this session if session_key in self._active_sessions: - # Store this as a pending message - it will interrupt the running agent + # Special case: photo bursts/albums frequently arrive as multiple near- + # simultaneous messages. Queue them without interrupting the active run, + # then process them immediately after the current task finishes. + if event.message_type == MessageType.PHOTO: + print(f"[{self.name}] 🖼️ Queuing photo follow-up for session {session_key} without interrupt") + existing = self._pending_messages.get(session_key) + if existing and existing.message_type == MessageType.PHOTO: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + if event.text: + if not existing.text: + existing.text = event.text + elif event.text not in existing.text: + existing.text = f"{existing.text}\n\n{event.text}".strip() + else: + self._pending_messages[session_key] = event + return # Don't interrupt now - will run after current task completes + + # Default behavior for non-photo follow-ups: interrupt the running agent print(f"[{self.name}] ⚡ New message while session {session_key} is active - triggering interrupt") self._pending_messages[session_key] = event # Signal the interrupt (the processing task checks this) diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 790061eca..7d289a0a4 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -111,6 +111,11 @@ class TelegramAdapter(BasePlatformAdapter): super().__init__(config, Platform.TELEGRAM) self._app: Optional[Application] = None self._bot: Optional[Bot] = None + # Buffer rapid/album photo updates so Telegram image bursts are handled + # as a single MessageEvent instead of self-interrupting multiple turns. + self._media_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_MEDIA_BATCH_DELAY_SECONDS", "0.8")) + self._pending_photo_batches: Dict[str, MessageEvent] = {} + self._pending_photo_batch_tasks: Dict[str, asyncio.Task] = {} self._media_group_events: Dict[str, MessageEvent] = {} self._media_group_tasks: Dict[str, asyncio.Task] = {} self._token_lock_identity: Optional[str] = None @@ -289,13 +294,19 @@ class TelegramAdapter(BasePlatformAdapter): release_scoped_lock("telegram-bot-token", self._token_lock_identity) except Exception as e: logger.warning("[%s] Error releasing Telegram token lock: %s", self.name, e, exc_info=True) - + + for task in self._pending_photo_batch_tasks.values(): + if task and not task.done(): + task.cancel() + self._pending_photo_batch_tasks.clear() + self._pending_photo_batches.clear() + self._mark_disconnected() self._app = None self._bot = None self._token_lock_identity = None logger.info("[%s] Disconnected from Telegram", self.name) - + async def send( self, chat_id: str, @@ -807,6 +818,49 @@ class TelegramAdapter(BasePlatformAdapter): event.text = "\n".join(parts) await self.handle_message(event) + def _photo_batch_key(self, event: MessageEvent, msg: Message) -> str: + """Return a batching key for Telegram photos/albums.""" + from gateway.session import build_session_key + session_key = build_session_key(event.source) + media_group_id = getattr(msg, "media_group_id", None) + if media_group_id: + return f"{session_key}:album:{media_group_id}" + return f"{session_key}:photo-burst" + + async def _flush_photo_batch(self, batch_key: str) -> None: + """Send a buffered photo burst/album as a single MessageEvent.""" + current_task = asyncio.current_task() + try: + await asyncio.sleep(self._media_batch_delay_seconds) + event = self._pending_photo_batches.pop(batch_key, None) + if not event: + return + logger.info("[Telegram] Flushing photo batch %s with %d image(s)", batch_key, len(event.media_urls)) + await self.handle_message(event) + finally: + if self._pending_photo_batch_tasks.get(batch_key) is current_task: + self._pending_photo_batch_tasks.pop(batch_key, None) + + def _enqueue_photo_event(self, batch_key: str, event: MessageEvent) -> None: + """Merge photo events into a pending batch and schedule flush.""" + existing = self._pending_photo_batches.get(batch_key) + if existing is None: + self._pending_photo_batches[batch_key] = event + else: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + if event.text: + if not existing.text: + existing.text = event.text + elif event.text not in existing.text: + existing.text = f"{existing.text}\n\n{event.text}".strip() + + prior_task = self._pending_photo_batch_tasks.get(batch_key) + if prior_task and not prior_task.done(): + prior_task.cancel() + + self._pending_photo_batch_tasks[batch_key] = asyncio.create_task(self._flush_photo_batch(batch_key)) + async def _handle_media_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming media messages, downloading images to local cache.""" if not update.message: @@ -858,14 +912,22 @@ class TelegramAdapter(BasePlatformAdapter): if file_obj.file_path.lower().endswith(candidate): ext = candidate break - # Save to cache and populate media_urls with the local path + # Save to local cache (for vision tool access) cached_path = cache_image_from_bytes(bytes(image_bytes), ext=ext) event.media_urls = [cached_path] - event.media_types = [f"image/{ext.lstrip('.')}"] + event.media_types = [f"image/{ext.lstrip('.')}" ] logger.info("[Telegram] Cached user photo at %s", cached_path) + media_group_id = getattr(msg, "media_group_id", None) + if media_group_id: + await self._queue_media_group_event(str(media_group_id), event) + else: + batch_key = self._photo_batch_key(event, msg) + self._enqueue_photo_event(batch_key, event) + return + except Exception as e: logger.warning("[Telegram] Failed to cache photo: %s", e, exc_info=True) - + # Download voice/audio messages to cache for STT transcription if msg.voice: try: diff --git a/gateway/run.py b/gateway/run.py index 9cdfe7788..8508e0f8a 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1095,11 +1095,36 @@ class GatewayRunner: ) return None - # PRIORITY: If an agent is already running for this session, interrupt it - # immediately. This is before command parsing to minimize latency -- the - # user's "stop" message reaches the agent as fast as possible. + # PRIORITY handling when an agent is already running for this session. + # Default behavior is to interrupt immediately so user text/stop messages + # are handled with minimal latency. + # + # Special case: Telegram/photo bursts often arrive as multiple near- + # simultaneous updates. Do NOT interrupt for photo-only follow-ups here; + # let the adapter-level batching/queueing logic absorb them. _quick_key = build_session_key(source) if _quick_key in self._running_agents: + if event.message_type == MessageType.PHOTO: + logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20]) + adapter = self.adapters.get(source.platform) + if adapter: + # Reuse adapter queue semantics so photo bursts merge cleanly. + if _quick_key in adapter._pending_messages: + existing = adapter._pending_messages[_quick_key] + if getattr(existing, "message_type", None) == MessageType.PHOTO: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + if event.text: + if not existing.text: + existing.text = event.text + elif event.text not in existing.text: + existing.text = f"{existing.text}\n\n{event.text}".strip() + else: + adapter._pending_messages[_quick_key] = event + else: + adapter._pending_messages[_quick_key] = event + return None + running_agent = self._running_agents[_quick_key] logger.debug("PRIORITY interrupt for session %s", _quick_key[:20]) running_agent.interrupt(event.text) @@ -3490,9 +3515,13 @@ class GatewayRunner: 1. Immediately understand what the user sent (no extra tool call). 2. Re-examine the image with vision_analyze if it needs more detail. + Athabasca persistence should happen through Athabasca's own POST + /api/uploads flow, using the returned asset.publicUrl rather than local + cache paths. + Args: - user_text: The user's original caption / message text. - image_paths: List of local file paths to cached images. + user_text: The user's original caption / message text. + image_paths: List of local file paths to cached images. Returns: The enriched message string with vision descriptions prepended. @@ -3517,10 +3546,16 @@ class GatewayRunner: result = _json.loads(result_json) if result.get("success"): description = result.get("analysis", "") + athabasca_note = ( + "\n[If this image needs to persist in Athabasca state, upload the cached file " + "through Athabasca POST /api/uploads and use the returned asset.publicUrl. " + "Do not store the local cache path as the canonical imageUrl.]" + ) enriched_parts.append( f"[The user sent an image~ Here's what I can see:\n{description}]\n" f"[If you need a closer look, use vision_analyze with " f"image_url: {path} ~]" + f"{athabasca_note}" ) else: enriched_parts.append( diff --git a/tests/gateway/test_image_enrichment.py b/tests/gateway/test_image_enrichment.py new file mode 100644 index 000000000..d3c7b72c8 --- /dev/null +++ b/tests/gateway/test_image_enrichment.py @@ -0,0 +1,25 @@ +from unittest.mock import patch + +import pytest + + +@pytest.mark.asyncio +async def test_image_enrichment_uses_athabasca_upload_guidance_without_stale_r2_warning(): + from gateway.run import GatewayRunner + + runner = object.__new__(GatewayRunner) + + with patch( + "tools.vision_tools.vision_analyze_tool", + return_value='{"success": true, "analysis": "A painted serpent warrior."}', + ): + enriched = await runner._enrich_message_with_vision( + "caption", + ["/tmp/test.jpg"], + ) + + assert "R2 not configured" not in enriched + assert "Gateway media URL available for reference" not in enriched + assert "POST /api/uploads" in enriched + assert "Do not store the local cache path" in enriched + assert "caption" in enriched diff --git a/tests/gateway/test_telegram_documents.py b/tests/gateway/test_telegram_documents.py index 5e3e6f94d..e26b4a6a4 100644 --- a/tests/gateway/test_telegram_documents.py +++ b/tests/gateway/test_telegram_documents.py @@ -12,6 +12,7 @@ import asyncio import importlib import os import sys +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -537,6 +538,51 @@ class TestSendDocument: assert call_kwargs["reply_to_message_id"] == 50 +class TestTelegramPhotoBatching: + @pytest.mark.asyncio + async def test_flush_photo_batch_does_not_drop_newer_scheduled_task(self, adapter): + old_task = MagicMock() + new_task = MagicMock() + batch_key = "session:photo-burst" + adapter._pending_photo_batch_tasks[batch_key] = new_task + adapter._pending_photo_batches[batch_key] = MessageEvent( + text="", + message_type=MessageType.PHOTO, + source=SimpleNamespace(channel_id="chat-1"), + media_urls=["/tmp/a.jpg"], + media_types=["image/jpeg"], + ) + + with ( + patch("gateway.platforms.telegram.asyncio.current_task", return_value=old_task), + patch("gateway.platforms.telegram.asyncio.sleep", new=AsyncMock()), + ): + await adapter._flush_photo_batch(batch_key) + + assert adapter._pending_photo_batch_tasks[batch_key] is new_task + + @pytest.mark.asyncio + async def test_disconnect_cancels_pending_photo_batch_tasks(self, adapter): + task = MagicMock() + task.done.return_value = False + adapter._pending_photo_batch_tasks["session:photo-burst"] = task + adapter._pending_photo_batches["session:photo-burst"] = MessageEvent( + text="", + message_type=MessageType.PHOTO, + source=SimpleNamespace(channel_id="chat-1"), + ) + adapter._app = MagicMock() + adapter._app.updater.stop = AsyncMock() + adapter._app.stop = AsyncMock() + adapter._app.shutdown = AsyncMock() + + await adapter.disconnect() + + task.cancel.assert_called_once() + assert adapter._pending_photo_batch_tasks == {} + assert adapter._pending_photo_batches == {} + + # --------------------------------------------------------------------------- # TestSendVideo — outbound video delivery # ---------------------------------------------------------------------------