fix(telegram): aggregate split text messages before dispatching (#1674)

When a user sends a long message, Telegram clients split it into
multiple updates that arrive within milliseconds of each other.
Previously each chunk was dispatched independently — the first would
start the agent, and subsequent chunks would interrupt or queue as
separate turns, causing the agent to only see part of the message.

Add text message batching to TelegramAdapter following the same pattern
as the existing photo burst batching:

- _enqueue_text_event() buffers text by session key, concatenating
  chunks that arrive in rapid succession
- _flush_text_batch() dispatches the combined message after a 0.6s
  quiet period (configurable via HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS)
- Timer resets on each new chunk, so all parts of a split arrive
  before the batch is dispatched

Reported by NulledVector on Discord.
This commit is contained in:
Teknium
2026-03-17 02:49:57 -07:00
committed by GitHub
parent 7042a748f5
commit d156942419
2 changed files with 196 additions and 3 deletions

View File

@@ -118,6 +118,11 @@ class TelegramAdapter(BasePlatformAdapter):
self._pending_photo_batch_tasks: Dict[str, asyncio.Task] = {}
self._media_group_events: Dict[str, MessageEvent] = {}
self._media_group_tasks: Dict[str, asyncio.Task] = {}
# Buffer rapid text messages so Telegram client-side splits of long
# messages are aggregated into a single MessageEvent.
self._text_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", "0.6"))
self._pending_text_batches: Dict[str, MessageEvent] = {}
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
self._token_lock_identity: Optional[str] = None
self._polling_error_task: Optional[asyncio.Task] = None
@@ -795,12 +800,17 @@ class TelegramAdapter(BasePlatformAdapter):
return text
async def _handle_text_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle incoming text messages.

    Telegram clients split long messages into multiple updates. Buffer
    rapid successive text messages from the same user/chat and aggregate
    them into a single MessageEvent before dispatching.

    Args:
        update: Incoming Telegram update; ignored unless it carries a
            message with non-empty text.
        context: PTB callback context (unused here, required by the
            handler signature).
    """
    if not update.message or not update.message.text:
        return

    event = self._build_message_event(update.message, MessageType.TEXT)
    # NOTE: the event is handed to the batcher instead of being dispatched
    # directly — _flush_text_batch() performs the actual handle_message()
    # call after the quiet period. Dispatching here as well would deliver
    # the same message twice.
    self._enqueue_text_event(event)
async def _handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming command messages."""
@@ -845,6 +855,68 @@ class TelegramAdapter(BasePlatformAdapter):
event.text = "\n".join(parts)
await self.handle_message(event)
# ------------------------------------------------------------------
# Text message aggregation (handles Telegram client-side splits)
# ------------------------------------------------------------------
def _text_batch_key(self, event: MessageEvent) -> str:
    """Return the session-scoped key under which text chunks are batched."""
    from gateway.session import build_session_key

    per_user = self.config.extra.get("group_sessions_per_user", True)
    return build_session_key(event.source, group_sessions_per_user=per_user)
def _enqueue_text_event(self, event: MessageEvent) -> None:
    """Buffer a text event and (re)arm the flush timer for its session.

    Telegram clients deliver a long user message as several updates that
    arrive within a few hundred milliseconds of each other. Each chunk is
    folded into a single pending MessageEvent per session key; the merged
    event is dispatched by _flush_text_batch() once no new chunk has
    arrived for the configured quiet period.
    """
    key = self._text_batch_key(event)

    pending = self._pending_text_batches.get(key)
    if pending is None:
        # First chunk for this session — it becomes the batch accumulator.
        self._pending_text_batches[key] = event
    else:
        # Fold the follow-up chunk into the buffered event.
        if event.text:
            pending.text = f"{pending.text}\n{event.text}" if pending.text else event.text
        # Carry over any attached media as well (assumes media_urls and
        # media_types are parallel lists on MessageEvent).
        if event.media_urls:
            pending.media_urls.extend(event.media_urls)
            pending.media_types.extend(event.media_types)

    # Restart the quiet-period timer: cancel an in-flight flush, if any,
    # then schedule a fresh one so the window extends past this chunk.
    stale = self._pending_text_batch_tasks.get(key)
    if stale is not None and not stale.done():
        stale.cancel()
    self._pending_text_batch_tasks[key] = asyncio.create_task(self._flush_text_batch(key))
async def _flush_text_batch(self, key: str) -> None:
    """Sleep for the quiet period, then dispatch the aggregated text event.

    This task is cancelled (via _pending_text_batch_tasks) whenever a new
    chunk arrives for the same key, so only the last timer of a burst
    actually fires.
    """
    me = asyncio.current_task()
    try:
        await asyncio.sleep(self._text_batch_delay_seconds)
        batched = self._pending_text_batches.pop(key, None)
        if not batched:
            # Already flushed or cleared elsewhere — nothing to dispatch.
            return
        logger.info(
            "[Telegram] Flushing text batch %s (%d chars)",
            key,
            len(batched.text or ""),
        )
        await self.handle_message(batched)
    finally:
        # Remove only our own registry entry: a newer timer may have
        # cancelled this task and already replaced the slot for this key.
        if self._pending_text_batch_tasks.get(key) is me:
            self._pending_text_batch_tasks.pop(key, None)
# ------------------------------------------------------------------
# Photo batching
# ------------------------------------------------------------------
def _photo_batch_key(self, event: MessageEvent, msg: Message) -> str:
"""Return a batching key for Telegram photos/albums."""
from gateway.session import build_session_key

View File

@@ -0,0 +1,121 @@
"""Tests for Telegram text message aggregation.
When a user sends a long message, Telegram clients split it into multiple
updates. The TelegramAdapter should buffer rapid successive text messages
from the same session and aggregate them before dispatching.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SessionSource
def _make_adapter():
    """Build a bare TelegramAdapter wired up just enough for batching tests.

    __init__ is bypassed via object.__new__ so no Telegram application or
    network client gets created; only the attributes that the text-batching
    code path touches are populated.
    """
    from gateway.platforms.telegram import TelegramAdapter

    adapter = object.__new__(TelegramAdapter)
    adapter._platform = Platform.TELEGRAM
    adapter.config = PlatformConfig(enabled=True, token="test-token")
    adapter._pending_text_batches = {}
    adapter._pending_text_batch_tasks = {}
    adapter._text_batch_delay_seconds = 0.1  # fast for tests
    adapter._active_sessions = {}
    adapter._pending_messages = {}
    adapter._message_handler = AsyncMock()
    adapter.handle_message = AsyncMock()
    return adapter
def _make_event(text: str, chat_id: str = "12345") -> MessageEvent:
    """Create a minimal DM text MessageEvent targeting the given chat."""
    source = SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm")
    return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
class TestTextBatching:
    """Behavioral tests for TelegramAdapter text-message aggregation."""

    @pytest.mark.asyncio
    async def test_single_message_dispatched_after_delay(self):
        adapter = _make_adapter()
        adapter._enqueue_text_event(_make_event("hello world"))

        # Still buffered: the quiet period has not elapsed yet.
        adapter.handle_message.assert_not_called()

        # Let the 0.1s timer fire.
        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        sent = adapter.handle_message.call_args[0][0]
        assert sent.text == "hello world"

    @pytest.mark.asyncio
    async def test_split_messages_aggregated(self):
        """Two rapid messages from the same chat should be merged."""
        adapter = _make_adapter()
        adapter._enqueue_text_event(_make_event("This is part one of a long"))
        await asyncio.sleep(0.02)  # small gap, within batch window
        adapter._enqueue_text_event(_make_event("message that was split by Telegram."))

        # The second chunk restarted the timer, so nothing is out yet.
        adapter.handle_message.assert_not_called()

        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        sent = adapter.handle_message.call_args[0][0]
        assert "part one" in sent.text
        assert "split by Telegram" in sent.text

    @pytest.mark.asyncio
    async def test_three_way_split_aggregated(self):
        """Three rapid messages should all merge."""
        adapter = _make_adapter()
        adapter._enqueue_text_event(_make_event("chunk 1"))
        await asyncio.sleep(0.02)
        adapter._enqueue_text_event(_make_event("chunk 2"))
        await asyncio.sleep(0.02)
        adapter._enqueue_text_event(_make_event("chunk 3"))
        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        combined = adapter.handle_message.call_args[0][0].text
        for piece in ("chunk 1", "chunk 2", "chunk 3"):
            assert piece in combined

    @pytest.mark.asyncio
    async def test_different_chats_not_merged(self):
        """Messages from different chats should be separate batches."""
        adapter = _make_adapter()
        adapter._enqueue_text_event(_make_event("from user A", chat_id="111"))
        adapter._enqueue_text_event(_make_event("from user B", chat_id="222"))
        await asyncio.sleep(0.2)
        # One dispatch per distinct session key.
        assert adapter.handle_message.call_count == 2

    @pytest.mark.asyncio
    async def test_batch_cleans_up_after_flush(self):
        """After flushing, internal state should be clean."""
        adapter = _make_adapter()
        adapter._enqueue_text_event(_make_event("test"))
        await asyncio.sleep(0.2)
        assert not adapter._pending_text_batches
        assert not adapter._pending_text_batch_tasks