fix(gateway): prevent telegram photo burst interrupts

This commit is contained in:
CoinDegen
2026-03-15 11:58:19 +05:30
committed by teknium1
parent db3e3aa6c5
commit 4ae1334287
5 changed files with 198 additions and 11 deletions

View File

@@ -288,6 +288,7 @@ class MessageEvent:
message_id: Optional[str] = None message_id: Optional[str] = None
# Media attachments # Media attachments
# media_urls: local file paths (for vision tool access)
media_urls: List[str] = field(default_factory=list) media_urls: List[str] = field(default_factory=list)
media_types: List[str] = field(default_factory=list) media_types: List[str] = field(default_factory=list)
@@ -751,7 +752,25 @@ class BasePlatformAdapter(ABC):
# Check if there's already an active handler for this session # Check if there's already an active handler for this session
if session_key in self._active_sessions: if session_key in self._active_sessions:
# Store this as a pending message - it will interrupt the running agent # Special case: photo bursts/albums frequently arrive as multiple near-
# simultaneous messages. Queue them without interrupting the active run,
# then process them immediately after the current task finishes.
if event.message_type == MessageType.PHOTO:
print(f"[{self.name}] 🖼️ Queuing photo follow-up for session {session_key} without interrupt")
existing = self._pending_messages.get(session_key)
if existing and existing.message_type == MessageType.PHOTO:
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
if event.text:
if not existing.text:
existing.text = event.text
elif event.text not in existing.text:
existing.text = f"{existing.text}\n\n{event.text}".strip()
else:
self._pending_messages[session_key] = event
return # Don't interrupt now - will run after current task completes
# Default behavior for non-photo follow-ups: interrupt the running agent
print(f"[{self.name}] ⚡ New message while session {session_key} is active - triggering interrupt") print(f"[{self.name}] ⚡ New message while session {session_key} is active - triggering interrupt")
self._pending_messages[session_key] = event self._pending_messages[session_key] = event
# Signal the interrupt (the processing task checks this) # Signal the interrupt (the processing task checks this)

View File

@@ -111,6 +111,11 @@ class TelegramAdapter(BasePlatformAdapter):
super().__init__(config, Platform.TELEGRAM) super().__init__(config, Platform.TELEGRAM)
self._app: Optional[Application] = None self._app: Optional[Application] = None
self._bot: Optional[Bot] = None self._bot: Optional[Bot] = None
# Buffer rapid/album photo updates so Telegram image bursts are handled
# as a single MessageEvent instead of self-interrupting multiple turns.
self._media_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_MEDIA_BATCH_DELAY_SECONDS", "0.8"))
self._pending_photo_batches: Dict[str, MessageEvent] = {}
self._pending_photo_batch_tasks: Dict[str, asyncio.Task] = {}
self._media_group_events: Dict[str, MessageEvent] = {} self._media_group_events: Dict[str, MessageEvent] = {}
self._media_group_tasks: Dict[str, asyncio.Task] = {} self._media_group_tasks: Dict[str, asyncio.Task] = {}
self._token_lock_identity: Optional[str] = None self._token_lock_identity: Optional[str] = None
@@ -289,13 +294,19 @@ class TelegramAdapter(BasePlatformAdapter):
release_scoped_lock("telegram-bot-token", self._token_lock_identity) release_scoped_lock("telegram-bot-token", self._token_lock_identity)
except Exception as e: except Exception as e:
logger.warning("[%s] Error releasing Telegram token lock: %s", self.name, e, exc_info=True) logger.warning("[%s] Error releasing Telegram token lock: %s", self.name, e, exc_info=True)
for task in self._pending_photo_batch_tasks.values():
if task and not task.done():
task.cancel()
self._pending_photo_batch_tasks.clear()
self._pending_photo_batches.clear()
self._mark_disconnected() self._mark_disconnected()
self._app = None self._app = None
self._bot = None self._bot = None
self._token_lock_identity = None self._token_lock_identity = None
logger.info("[%s] Disconnected from Telegram", self.name) logger.info("[%s] Disconnected from Telegram", self.name)
async def send( async def send(
self, self,
chat_id: str, chat_id: str,
@@ -807,6 +818,49 @@ class TelegramAdapter(BasePlatformAdapter):
event.text = "\n".join(parts) event.text = "\n".join(parts)
await self.handle_message(event) await self.handle_message(event)
def _photo_batch_key(self, event: MessageEvent, msg: Message) -> str:
"""Return a batching key for Telegram photos/albums."""
from gateway.session import build_session_key
session_key = build_session_key(event.source)
media_group_id = getattr(msg, "media_group_id", None)
if media_group_id:
return f"{session_key}:album:{media_group_id}"
return f"{session_key}:photo-burst"
async def _flush_photo_batch(self, batch_key: str) -> None:
"""Send a buffered photo burst/album as a single MessageEvent."""
current_task = asyncio.current_task()
try:
await asyncio.sleep(self._media_batch_delay_seconds)
event = self._pending_photo_batches.pop(batch_key, None)
if not event:
return
logger.info("[Telegram] Flushing photo batch %s with %d image(s)", batch_key, len(event.media_urls))
await self.handle_message(event)
finally:
if self._pending_photo_batch_tasks.get(batch_key) is current_task:
self._pending_photo_batch_tasks.pop(batch_key, None)
def _enqueue_photo_event(self, batch_key: str, event: MessageEvent) -> None:
"""Merge photo events into a pending batch and schedule flush."""
existing = self._pending_photo_batches.get(batch_key)
if existing is None:
self._pending_photo_batches[batch_key] = event
else:
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
if event.text:
if not existing.text:
existing.text = event.text
elif event.text not in existing.text:
existing.text = f"{existing.text}\n\n{event.text}".strip()
prior_task = self._pending_photo_batch_tasks.get(batch_key)
if prior_task and not prior_task.done():
prior_task.cancel()
self._pending_photo_batch_tasks[batch_key] = asyncio.create_task(self._flush_photo_batch(batch_key))
async def _handle_media_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def _handle_media_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming media messages, downloading images to local cache.""" """Handle incoming media messages, downloading images to local cache."""
if not update.message: if not update.message:
@@ -858,14 +912,22 @@ class TelegramAdapter(BasePlatformAdapter):
if file_obj.file_path.lower().endswith(candidate): if file_obj.file_path.lower().endswith(candidate):
ext = candidate ext = candidate
break break
# Save to cache and populate media_urls with the local path # Save to local cache (for vision tool access)
cached_path = cache_image_from_bytes(bytes(image_bytes), ext=ext) cached_path = cache_image_from_bytes(bytes(image_bytes), ext=ext)
event.media_urls = [cached_path] event.media_urls = [cached_path]
event.media_types = [f"image/{ext.lstrip('.')}"] event.media_types = [f"image/{ext.lstrip('.')}" ]
logger.info("[Telegram] Cached user photo at %s", cached_path) logger.info("[Telegram] Cached user photo at %s", cached_path)
media_group_id = getattr(msg, "media_group_id", None)
if media_group_id:
await self._queue_media_group_event(str(media_group_id), event)
else:
batch_key = self._photo_batch_key(event, msg)
self._enqueue_photo_event(batch_key, event)
return
except Exception as e: except Exception as e:
logger.warning("[Telegram] Failed to cache photo: %s", e, exc_info=True) logger.warning("[Telegram] Failed to cache photo: %s", e, exc_info=True)
# Download voice/audio messages to cache for STT transcription # Download voice/audio messages to cache for STT transcription
if msg.voice: if msg.voice:
try: try:

View File

@@ -1095,11 +1095,36 @@ class GatewayRunner:
) )
return None return None
# PRIORITY: If an agent is already running for this session, interrupt it # PRIORITY handling when an agent is already running for this session.
# immediately. This is before command parsing to minimize latency -- the # Default behavior is to interrupt immediately so user text/stop messages
# user's "stop" message reaches the agent as fast as possible. # are handled with minimal latency.
#
# Special case: Telegram/photo bursts often arrive as multiple near-
# simultaneous updates. Do NOT interrupt for photo-only follow-ups here;
# let the adapter-level batching/queueing logic absorb them.
_quick_key = build_session_key(source) _quick_key = build_session_key(source)
if _quick_key in self._running_agents: if _quick_key in self._running_agents:
if event.message_type == MessageType.PHOTO:
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
adapter = self.adapters.get(source.platform)
if adapter:
# Reuse adapter queue semantics so photo bursts merge cleanly.
if _quick_key in adapter._pending_messages:
existing = adapter._pending_messages[_quick_key]
if getattr(existing, "message_type", None) == MessageType.PHOTO:
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
if event.text:
if not existing.text:
existing.text = event.text
elif event.text not in existing.text:
existing.text = f"{existing.text}\n\n{event.text}".strip()
else:
adapter._pending_messages[_quick_key] = event
else:
adapter._pending_messages[_quick_key] = event
return None
running_agent = self._running_agents[_quick_key] running_agent = self._running_agents[_quick_key]
logger.debug("PRIORITY interrupt for session %s", _quick_key[:20]) logger.debug("PRIORITY interrupt for session %s", _quick_key[:20])
running_agent.interrupt(event.text) running_agent.interrupt(event.text)
@@ -3490,9 +3515,13 @@ class GatewayRunner:
1. Immediately understand what the user sent (no extra tool call). 1. Immediately understand what the user sent (no extra tool call).
2. Re-examine the image with vision_analyze if it needs more detail. 2. Re-examine the image with vision_analyze if it needs more detail.
Athabasca persistence should happen through Athabasca's own POST
/api/uploads flow, using the returned asset.publicUrl rather than local
cache paths.
Args: Args:
user_text: The user's original caption / message text. user_text: The user's original caption / message text.
image_paths: List of local file paths to cached images. image_paths: List of local file paths to cached images.
Returns: Returns:
The enriched message string with vision descriptions prepended. The enriched message string with vision descriptions prepended.
@@ -3517,10 +3546,16 @@ class GatewayRunner:
result = _json.loads(result_json) result = _json.loads(result_json)
if result.get("success"): if result.get("success"):
description = result.get("analysis", "") description = result.get("analysis", "")
athabasca_note = (
"\n[If this image needs to persist in Athabasca state, upload the cached file "
"through Athabasca POST /api/uploads and use the returned asset.publicUrl. "
"Do not store the local cache path as the canonical imageUrl.]"
)
enriched_parts.append( enriched_parts.append(
f"[The user sent an image~ Here's what I can see:\n{description}]\n" f"[The user sent an image~ Here's what I can see:\n{description}]\n"
f"[If you need a closer look, use vision_analyze with " f"[If you need a closer look, use vision_analyze with "
f"image_url: {path} ~]" f"image_url: {path} ~]"
f"{athabasca_note}"
) )
else: else:
enriched_parts.append( enriched_parts.append(

View File

@@ -0,0 +1,25 @@
from unittest.mock import patch
import pytest
@pytest.mark.asyncio
async def test_image_enrichment_uses_athabasca_upload_guidance_without_stale_r2_warning():
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
with patch(
"tools.vision_tools.vision_analyze_tool",
return_value='{"success": true, "analysis": "A painted serpent warrior."}',
):
enriched = await runner._enrich_message_with_vision(
"caption",
["/tmp/test.jpg"],
)
assert "R2 not configured" not in enriched
assert "Gateway media URL available for reference" not in enriched
assert "POST /api/uploads" in enriched
assert "Do not store the local cache path" in enriched
assert "caption" in enriched

View File

@@ -12,6 +12,7 @@ import asyncio
import importlib import importlib
import os import os
import sys import sys
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
@@ -537,6 +538,51 @@ class TestSendDocument:
assert call_kwargs["reply_to_message_id"] == 50 assert call_kwargs["reply_to_message_id"] == 50
class TestTelegramPhotoBatching:
@pytest.mark.asyncio
async def test_flush_photo_batch_does_not_drop_newer_scheduled_task(self, adapter):
old_task = MagicMock()
new_task = MagicMock()
batch_key = "session:photo-burst"
adapter._pending_photo_batch_tasks[batch_key] = new_task
adapter._pending_photo_batches[batch_key] = MessageEvent(
text="",
message_type=MessageType.PHOTO,
source=SimpleNamespace(channel_id="chat-1"),
media_urls=["/tmp/a.jpg"],
media_types=["image/jpeg"],
)
with (
patch("gateway.platforms.telegram.asyncio.current_task", return_value=old_task),
patch("gateway.platforms.telegram.asyncio.sleep", new=AsyncMock()),
):
await adapter._flush_photo_batch(batch_key)
assert adapter._pending_photo_batch_tasks[batch_key] is new_task
@pytest.mark.asyncio
async def test_disconnect_cancels_pending_photo_batch_tasks(self, adapter):
task = MagicMock()
task.done.return_value = False
adapter._pending_photo_batch_tasks["session:photo-burst"] = task
adapter._pending_photo_batches["session:photo-burst"] = MessageEvent(
text="",
message_type=MessageType.PHOTO,
source=SimpleNamespace(channel_id="chat-1"),
)
adapter._app = MagicMock()
adapter._app.updater.stop = AsyncMock()
adapter._app.stop = AsyncMock()
adapter._app.shutdown = AsyncMock()
await adapter.disconnect()
task.cancel.assert_called_once()
assert adapter._pending_photo_batch_tasks == {}
assert adapter._pending_photo_batches == {}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# TestSendVideo — outbound video delivery # TestSendVideo — outbound video delivery
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------