diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index b4ef75f8e..978c800f3 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -118,6 +118,11 @@ class TelegramAdapter(BasePlatformAdapter): self._pending_photo_batch_tasks: Dict[str, asyncio.Task] = {} self._media_group_events: Dict[str, MessageEvent] = {} self._media_group_tasks: Dict[str, asyncio.Task] = {} + # Buffer rapid text messages so Telegram client-side splits of long + # messages are aggregated into a single MessageEvent. + self._text_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", "0.6")) + self._pending_text_batches: Dict[str, MessageEvent] = {} + self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} self._token_lock_identity: Optional[str] = None self._polling_error_task: Optional[asyncio.Task] = None @@ -795,12 +800,17 @@ class TelegramAdapter(BasePlatformAdapter): return text async def _handle_text_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Handle incoming text messages.""" + """Handle incoming text messages. + + Telegram clients split long messages into multiple updates. Buffer + rapid successive text messages from the same user/chat and aggregate + them into a single MessageEvent before dispatching. 
+ """ if not update.message or not update.message.text: return - + event = self._build_message_event(update.message, MessageType.TEXT) - await self.handle_message(event) + self._enqueue_text_event(event) async def _handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming command messages.""" @@ -845,6 +855,68 @@ class TelegramAdapter(BasePlatformAdapter): event.text = "\n".join(parts) await self.handle_message(event) + # ------------------------------------------------------------------ + # Text message aggregation (handles Telegram client-side splits) + # ------------------------------------------------------------------ + + def _text_batch_key(self, event: MessageEvent) -> str: + """Session-scoped key for text message batching.""" + from gateway.session import build_session_key + return build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + ) + + def _enqueue_text_event(self, event: MessageEvent) -> None: + """Buffer a text event and reset the flush timer. + + When Telegram splits a long user message into multiple updates, + they arrive within a few hundred milliseconds. This method + concatenates them and waits for a short quiet period before + dispatching the combined message. 
+ """ + key = self._text_batch_key(event) + existing = self._pending_text_batches.get(key) + if existing is None: + self._pending_text_batches[key] = event + else: + # Append text from the follow-up chunk + if event.text: + existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text + # Merge any media that might be attached + if event.media_urls: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + + # Cancel any pending flush and restart the timer + prior_task = self._pending_text_batch_tasks.get(key) + if prior_task and not prior_task.done(): + prior_task.cancel() + self._pending_text_batch_tasks[key] = asyncio.create_task( + self._flush_text_batch(key) + ) + + async def _flush_text_batch(self, key: str) -> None: + """Wait for the quiet period then dispatch the aggregated text.""" + current_task = asyncio.current_task() + try: + await asyncio.sleep(self._text_batch_delay_seconds) + event = self._pending_text_batches.pop(key, None) + if not event: + return + logger.info( + "[Telegram] Flushing text batch %s (%d chars)", + key, len(event.text or ""), + ) + await self.handle_message(event) + finally: + if self._pending_text_batch_tasks.get(key) is current_task: + self._pending_text_batch_tasks.pop(key, None) + + # ------------------------------------------------------------------ + # Photo batching + # ------------------------------------------------------------------ + def _photo_batch_key(self, event: MessageEvent, msg: Message) -> str: """Return a batching key for Telegram photos/albums.""" from gateway.session import build_session_key diff --git a/tests/gateway/test_telegram_text_batching.py b/tests/gateway/test_telegram_text_batching.py new file mode 100644 index 000000000..14c3f0dd6 --- /dev/null +++ b/tests/gateway/test_telegram_text_batching.py @@ -0,0 +1,121 @@ +"""Tests for Telegram text message aggregation. 
"""Tests for Telegram text message aggregation.

When a user sends a long message, Telegram clients split it into multiple
updates. The TelegramAdapter should buffer rapid successive text messages
from the same session and aggregate them before dispatching.
"""

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SessionSource

# Batch window configured on the adapter under test (fast for tests), the
# gap between chunks (well inside the window), and how long a test waits
# for a flush to have happened.
_FLUSH_DELAY = 0.1
_GAP = 0.02
_SETTLE = 0.2


def _new_adapter():
    """Build a bare TelegramAdapter wired up just enough for text batching."""
    from gateway.platforms.telegram import TelegramAdapter

    adapter = object.__new__(TelegramAdapter)
    adapter._platform = Platform.TELEGRAM
    adapter.config = PlatformConfig(enabled=True, token="test-token")
    adapter._pending_text_batches = {}
    adapter._pending_text_batch_tasks = {}
    adapter._text_batch_delay_seconds = _FLUSH_DELAY
    adapter._active_sessions = {}
    adapter._pending_messages = {}
    adapter._message_handler = AsyncMock()
    adapter.handle_message = AsyncMock()
    return adapter


def _text_event(text: str, chat_id: str = "12345") -> MessageEvent:
    """A TEXT MessageEvent from a DM chat with the given id."""
    source = SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm")
    return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)


class TestTextBatching:
    @pytest.mark.asyncio
    async def test_single_message_dispatched_after_delay(self):
        tg = _new_adapter()

        tg._enqueue_text_event(_text_event("hello world"))

        # Still buffered — nothing goes out before the quiet period ends.
        tg.handle_message.assert_not_called()

        await asyncio.sleep(_SETTLE)

        tg.handle_message.assert_called_once()
        (dispatched,), _ = tg.handle_message.call_args
        assert dispatched.text == "hello world"

    @pytest.mark.asyncio
    async def test_split_messages_aggregated(self):
        """Two rapid messages from the same chat should be merged."""
        tg = _new_adapter()

        tg._enqueue_text_event(_text_event("This is part one of a long"))
        await asyncio.sleep(_GAP)
        tg._enqueue_text_event(_text_event("message that was split by Telegram."))

        # The second chunk restarted the timer; nothing dispatched yet.
        tg.handle_message.assert_not_called()

        await asyncio.sleep(_SETTLE)

        tg.handle_message.assert_called_once()
        (dispatched,), _ = tg.handle_message.call_args
        assert "part one" in dispatched.text
        assert "split by Telegram" in dispatched.text

    @pytest.mark.asyncio
    async def test_three_way_split_aggregated(self):
        """Three rapid messages should all merge."""
        tg = _new_adapter()

        tg._enqueue_text_event(_text_event("chunk 1"))
        await asyncio.sleep(_GAP)
        tg._enqueue_text_event(_text_event("chunk 2"))
        await asyncio.sleep(_GAP)
        tg._enqueue_text_event(_text_event("chunk 3"))

        await asyncio.sleep(_SETTLE)

        tg.handle_message.assert_called_once()
        merged = tg.handle_message.call_args[0][0].text
        assert "chunk 1" in merged
        assert "chunk 2" in merged
        assert "chunk 3" in merged

    @pytest.mark.asyncio
    async def test_different_chats_not_merged(self):
        """Messages from different chats should be separate batches."""
        tg = _new_adapter()

        tg._enqueue_text_event(_text_event("from user A", chat_id="111"))
        tg._enqueue_text_event(_text_event("from user B", chat_id="222"))

        await asyncio.sleep(_SETTLE)

        assert tg.handle_message.call_count == 2

    @pytest.mark.asyncio
    async def test_batch_cleans_up_after_flush(self):
        """After flushing, internal state should be clean."""
        tg = _new_adapter()

        tg._enqueue_text_event(_text_event("test"))
        await asyncio.sleep(_SETTLE)

        assert not tg._pending_text_batches
        assert not tg._pending_text_batch_tasks