fix(telegram): self-reschedule reconnect when start_polling fails (#3268)
After a Telegram 502, _handle_polling_network_error calls updater.stop() then start_polling(). If start_polling() also raises, the old code logged a warning and returned — but the comment 'The next network error will trigger another attempt' was wrong. The updater loop is dead after stop(), so no further error callbacks ever fire. The gateway stays alive but permanently deaf to messages. Fix: when start_polling() fails in the except branch, schedule a new _handle_polling_network_error task to continue the exponential backoff retry chain. The task is tracked in _background_tasks (preventing GC). Guarded by has_fatal_error to avoid spurious retries during shutdown. Closes #3173. Salvaged from PR #3177 by Mibayy.
This commit is contained in:
@@ -219,7 +219,14 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
self._polling_network_error_count = 0
|
||||
except Exception as retry_err:
|
||||
logger.warning("[%s] Telegram polling reconnect failed: %s", self.name, retry_err)
|
||||
# The next network error will trigger another attempt.
|
||||
# start_polling failed — polling is dead and no further error
|
||||
# callbacks will fire, so schedule the next retry ourselves.
|
||||
if not self.has_fatal_error:
|
||||
task = asyncio.ensure_future(
|
||||
self._handle_polling_network_error(retry_err)
|
||||
)
|
||||
self._background_tasks.add(task)
|
||||
task.add_done_callback(self._background_tasks.discard)
|
||||
|
||||
async def _handle_polling_conflict(self, error: Exception) -> None:
|
||||
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
|
||||
|
||||
154
tests/gateway/test_telegram_network_reconnect.py
Normal file
154
tests/gateway/test_telegram_network_reconnect.py
Normal file
@@ -0,0 +1,154 @@
|
||||
"""
|
||||
Tests for Telegram polling network error recovery.
|
||||
|
||||
Specifically tests the fix for #3173 — when start_polling() fails after a
|
||||
network error, the adapter must self-reschedule the next reconnect attempt
|
||||
rather than silently leaving polling dead.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
|
||||
|
||||
def _ensure_telegram_mock():
|
||||
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
|
||||
return
|
||||
|
||||
telegram_mod = MagicMock()
|
||||
telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
|
||||
telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
|
||||
telegram_mod.constants.ChatType.GROUP = "group"
|
||||
telegram_mod.constants.ChatType.SUPERGROUP = "supergroup"
|
||||
telegram_mod.constants.ChatType.CHANNEL = "channel"
|
||||
telegram_mod.constants.ChatType.PRIVATE = "private"
|
||||
|
||||
for name in ("telegram", "telegram.ext", "telegram.constants"):
|
||||
sys.modules.setdefault(name, telegram_mod)
|
||||
|
||||
|
||||
_ensure_telegram_mock()
|
||||
|
||||
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
|
||||
|
||||
|
||||
def _make_adapter() -> TelegramAdapter:
|
||||
return TelegramAdapter(PlatformConfig(enabled=True, token="test-token"))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_self_schedules_on_start_polling_failure():
|
||||
"""
|
||||
When start_polling() raises during a network error retry, the adapter must
|
||||
schedule a new _handle_polling_network_error task — otherwise polling stays
|
||||
dead with no further error callbacks to trigger recovery.
|
||||
|
||||
Regression test for #3173: gateway becomes unresponsive after Telegram 502.
|
||||
"""
|
||||
adapter = _make_adapter()
|
||||
adapter._polling_network_error_count = 1
|
||||
|
||||
mock_updater = MagicMock()
|
||||
mock_updater.running = True
|
||||
mock_updater.stop = AsyncMock()
|
||||
mock_updater.start_polling = AsyncMock(side_effect=Exception("Timed out"))
|
||||
|
||||
mock_app = MagicMock()
|
||||
mock_app.updater = mock_updater
|
||||
adapter._app = mock_app
|
||||
|
||||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||||
await adapter._handle_polling_network_error(Exception("Bad Gateway"))
|
||||
|
||||
# A retry task must have been added to _background_tasks
|
||||
pending = [t for t in adapter._background_tasks if not t.done()]
|
||||
assert len(pending) >= 1, (
|
||||
"Expected at least one self-rescheduled retry task in _background_tasks "
|
||||
f"after start_polling failure, got {len(pending)}"
|
||||
)
|
||||
|
||||
# Clean up — cancel the pending retry so it doesn't run after the test
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
try:
|
||||
await t
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_does_not_self_schedule_when_fatal_error_set():
|
||||
"""
|
||||
When a fatal error is already set, the failed reconnect should NOT create
|
||||
another retry task — the gateway is already shutting down this adapter.
|
||||
"""
|
||||
adapter = _make_adapter()
|
||||
adapter._polling_network_error_count = 1
|
||||
adapter._set_fatal_error("telegram_network_error", "already fatal", retryable=True)
|
||||
|
||||
mock_updater = MagicMock()
|
||||
mock_updater.running = True
|
||||
mock_updater.stop = AsyncMock()
|
||||
mock_updater.start_polling = AsyncMock(side_effect=Exception("Timed out"))
|
||||
|
||||
mock_app = MagicMock()
|
||||
mock_app.updater = mock_updater
|
||||
adapter._app = mock_app
|
||||
|
||||
initial_count = len(adapter._background_tasks)
|
||||
|
||||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||||
await adapter._handle_polling_network_error(Exception("Timed out"))
|
||||
|
||||
assert len(adapter._background_tasks) == initial_count, (
|
||||
"Should not schedule a retry when a fatal error is already set"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_success_resets_error_count():
|
||||
"""
|
||||
When start_polling() succeeds, _polling_network_error_count should reset to 0.
|
||||
"""
|
||||
adapter = _make_adapter()
|
||||
adapter._polling_network_error_count = 3
|
||||
|
||||
mock_updater = MagicMock()
|
||||
mock_updater.running = True
|
||||
mock_updater.stop = AsyncMock()
|
||||
mock_updater.start_polling = AsyncMock() # succeeds
|
||||
|
||||
mock_app = MagicMock()
|
||||
mock_app.updater = mock_updater
|
||||
adapter._app = mock_app
|
||||
|
||||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||||
await adapter._handle_polling_network_error(Exception("Bad Gateway"))
|
||||
|
||||
assert adapter._polling_network_error_count == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_triggers_fatal_after_max_retries():
|
||||
"""
|
||||
After MAX_NETWORK_RETRIES attempts, the adapter should set a fatal error
|
||||
rather than retrying forever.
|
||||
"""
|
||||
adapter = _make_adapter()
|
||||
adapter._polling_network_error_count = 10 # MAX_NETWORK_RETRIES
|
||||
|
||||
fatal_handler = AsyncMock()
|
||||
adapter.set_fatal_error_handler(fatal_handler)
|
||||
|
||||
mock_app = MagicMock()
|
||||
adapter._app = mock_app
|
||||
|
||||
await adapter._handle_polling_network_error(Exception("still failing"))
|
||||
|
||||
assert adapter.has_fatal_error
|
||||
assert adapter.fatal_error_code == "telegram_network_error"
|
||||
fatal_handler.assert_called_once()
|
||||
Reference in New Issue
Block a user