From 488a30e879d345de82e608cb319c528b731981d0 Mon Sep 17 00:00:00 2001 From: Teknium Date: Sat, 21 Mar 2026 07:11:06 -0700 Subject: [PATCH] fix(gateway): retry Telegram 409 polling conflicts before giving up MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A single Telegram 409 Conflict from getUpdates permanently killed Telegram polling with no recovery possible (retryable=False on first occurrence). This is too aggressive for production use with process supervisors. Transient 409s are expected during: - --replace handoffs where the old long-poll session lingers on Telegram servers for a few seconds after SIGTERM - systemd Restart=on-failure respawns that overlap with the dying instance cleanup Now _handle_polling_conflict() retries up to 3 times with a 10-second delay between attempts. The 30-second total retry window lets stale server-side sessions expire. If all retries fail, the error is still marked as permanently fatal — preserving the original protection against genuine dual-instance conflicts. Tests updated: split the single conflict test into two — one verifying retry on transient conflict, one verifying fatal after exhausted retries. Closes #2296 --- gateway/platforms/telegram.py | 46 +++++++++++- tests/gateway/test_telegram_conflict.py | 94 +++++++++++++++++++++++-- 2 files changed, 134 insertions(+), 6 deletions(-) diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 9587298b4..cd836b030 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -129,6 +129,8 @@ class TelegramAdapter(BasePlatformAdapter): self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} self._token_lock_identity: Optional[str] = None self._polling_error_task: Optional[asyncio.Task] = None + self._polling_conflict_count: int = 0 + self._polling_error_callback_ref = None @staticmethod def _looks_like_polling_conflict(error: Exception) -> bool: @@ -142,10 +144,49 @@ class TelegramAdapter(BasePlatformAdapter): async def _handle_polling_conflict(self, error: Exception) -> None: if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict": return + # Track consecutive conflicts — transient 409s can occur when a + # previous gateway instance hasn't fully released its long-poll + # session on Telegram's server (e.g. during --replace handoffs or + # systemd Restart=on-failure respawns). Retry a few times before + # giving up, so the old session has time to expire. + self._polling_conflict_count += 1 + + MAX_CONFLICT_RETRIES = 3 + RETRY_DELAY = 10 # seconds + + if self._polling_conflict_count <= MAX_CONFLICT_RETRIES: + logger.warning( + "[%s] Telegram polling conflict (%d/%d), will retry in %ds. Error: %s", + self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES, + RETRY_DELAY, error, + ) + try: + if self._app and self._app.updater and self._app.updater.running: + await self._app.updater.stop() + except Exception: + pass + await asyncio.sleep(RETRY_DELAY) + try: + await self._app.updater.start_polling( + allowed_updates=Update.ALL_TYPES, + drop_pending_updates=False, + error_callback=self._polling_error_callback_ref, + ) + logger.info("[%s] Telegram polling resumed after conflict retry %d", self.name, self._polling_conflict_count) + self._polling_conflict_count = 0 # reset on success + return + except Exception as retry_err: + logger.warning("[%s] Telegram polling retry failed: %s", self.name, retry_err) + # Don't fall through to fatal yet — wait for the next conflict + # to trigger another retry attempt (up to MAX_CONFLICT_RETRIES). + return + + # Exhausted retries — fatal message = ( "Another Telegram bot poller is already using this token. " - "Hermes stopped Telegram polling to avoid endless retry spam. " + "Hermes stopped Telegram polling after %d retries. " "Make sure only one gateway instance is running for this bot token." + % MAX_CONFLICT_RETRIES ) logger.error("[%s] %s Original error: %s", self.name, message, error) self._set_fatal_error("telegram_polling_conflict", message, retryable=False) @@ -242,6 +283,9 @@ class TelegramAdapter(BasePlatformAdapter): return self._polling_error_task = loop.create_task(self._handle_polling_conflict(error)) + # Store reference for retry use in _handle_polling_conflict + self._polling_error_callback_ref = _polling_error_callback + await self._app.updater.start_polling( allowed_updates=Update.ALL_TYPES, drop_pending_updates=True, diff --git a/tests/gateway/test_telegram_conflict.py b/tests/gateway/test_telegram_conflict.py index 440aa99d8..c96768de2 100644 --- a/tests/gateway/test_telegram_conflict.py +++ b/tests/gateway/test_telegram_conflict.py @@ -47,8 +47,9 @@ async def test_connect_rejects_same_host_token_lock(monkeypatch): @pytest.mark.asyncio -async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch): - adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token")) +async def test_polling_conflict_retries_before_fatal(monkeypatch): + """A single 409 should trigger a retry, not an immediate fatal error.""" + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***")) fatal_handler = AsyncMock() adapter.set_fatal_error_handler(fatal_handler) @@ -69,6 +70,7 @@ async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch): updater = SimpleNamespace( start_polling=AsyncMock(side_effect=fake_start_polling), stop=AsyncMock(), + running=True, ) bot = SimpleNamespace(set_my_commands=AsyncMock()) app = SimpleNamespace( @@ -83,20 +85,102 @@ async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch): builder.build.return_value = app monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder))) + # Speed up retries for testing + monkeypatch.setattr("asyncio.sleep", AsyncMock()) + ok = await adapter.connect() assert ok is True assert callable(captured["error_callback"]) conflict = type("Conflict", (Exception,), {}) - captured["error_callback"](conflict("Conflict: terminated by other getUpdates request; make sure that only one bot instance is running")) + # First conflict: should retry, NOT be fatal + captured["error_callback"](conflict("Conflict: terminated by other getUpdates request")) await asyncio.sleep(0) await asyncio.sleep(0) + # Give the scheduled task a chance to run + for _ in range(10): + await asyncio.sleep(0) - assert adapter.fatal_error_code == "telegram_polling_conflict" + assert adapter.has_fatal_error is False, "First conflict should not be fatal" + assert adapter._polling_conflict_count == 0, "Count should reset after successful retry" + + +@pytest.mark.asyncio +async def test_polling_conflict_becomes_fatal_after_retries(monkeypatch): + """After exhausting retries, the conflict should become fatal.""" + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***")) + fatal_handler = AsyncMock() + adapter.set_fatal_error_handler(fatal_handler) + + monkeypatch.setattr( + "gateway.status.acquire_scoped_lock", + lambda scope, identity, metadata=None: (True, None), + ) + monkeypatch.setattr( + "gateway.status.release_scoped_lock", + lambda scope, identity: None, + ) + + captured = {} + + async def fake_start_polling(**kwargs): + captured["error_callback"] = kwargs["error_callback"] + + # Make start_polling fail on retries to exhaust retries + call_count = {"n": 0} + + async def failing_start_polling(**kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + # First call (initial connect) succeeds + captured["error_callback"] = kwargs["error_callback"] + else: + # Retry calls fail + raise Exception("Connection refused") + + updater = SimpleNamespace( + start_polling=AsyncMock(side_effect=failing_start_polling), + stop=AsyncMock(), + running=True, + ) + bot = SimpleNamespace(set_my_commands=AsyncMock()) + app = SimpleNamespace( + bot=bot, + updater=updater, + add_handler=MagicMock(), + initialize=AsyncMock(), + start=AsyncMock(), + ) + builder = MagicMock() + builder.token.return_value = builder + builder.build.return_value = app + monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder))) + + # Speed up retries for testing + monkeypatch.setattr("asyncio.sleep", AsyncMock()) + + ok = await adapter.connect() + assert ok is True + + conflict = type("Conflict", (Exception,), {}) + + # Directly call _handle_polling_conflict to avoid event-loop scheduling + # complexity. Each call simulates one 409 from Telegram. + for i in range(4): + await adapter._handle_polling_conflict( + conflict("Conflict: terminated by other getUpdates request") + ) + + # After 3 failed retries (count 1-3 each enter the retry branch but + # start_polling raises), the 4th conflict pushes count to 4 which + # exceeds MAX_CONFLICT_RETRIES (3), entering the fatal branch. + assert adapter.fatal_error_code == "telegram_polling_conflict", ( + f"Expected fatal after 4 conflicts, got code={adapter.fatal_error_code}, " + f"count={adapter._polling_conflict_count}" + ) assert adapter.has_fatal_error is True - updater.stop.assert_awaited() fatal_handler.assert_awaited_once()