Files
hermes-agent/tests/gateway/test_telegram_photo_interrupts.py
teknium1 fef710aca8 test(gateway): cover photo burst interrupt regressions
Add regression coverage for non-album Telegram photo burst batching, photo follow-ups that should queue without interrupting active runs, and the gateway priority-interrupt path for photo events.
2026-03-15 03:50:45 -07:00

50 lines
1.5 KiB
Python

import asyncio
from unittest.mock import MagicMock
import pytest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource, build_session_key
from gateway.run import GatewayRunner
class _PendingAdapter:
def __init__(self):
self._pending_messages = {}
def _make_runner():
runner = object.__new__(GatewayRunner)
runner.config = GatewayConfig(platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")})
runner.adapters = {Platform.TELEGRAM: _PendingAdapter()}
runner._running_agents = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._voice_mode = {}
runner._is_user_authorized = lambda _source: True
return runner
@pytest.mark.asyncio
async def test_handle_message_does_not_priority_interrupt_photo_followup():
runner = _make_runner()
source = SessionSource(platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm")
session_key = build_session_key(source)
running_agent = MagicMock()
runner._running_agents[session_key] = running_agent
event = MessageEvent(
text="caption",
message_type=MessageType.PHOTO,
source=source,
media_urls=["/tmp/photo-a.jpg"],
media_types=["image/jpeg"],
)
result = await runner._handle_message(event)
assert result is None
running_agent.interrupt.assert_not_called()
assert runner.adapters[Platform.TELEGRAM]._pending_messages[session_key] is event