fix(gateway): retry Telegram 409 polling conflicts before giving up
A single Telegram 409 Conflict from getUpdates permanently killed Telegram polling with no recovery possible (retryable=False on first occurrence). This is too aggressive for production use with process supervisors. Transient 409s are expected during: - --replace handoffs where the old long-poll session lingers on Telegram servers for a few seconds after SIGTERM - systemd Restart=on-failure respawns that overlap with the dying instance cleanup Now _handle_polling_conflict() retries up to 3 times with a 10-second delay between attempts. The 30-second total retry window lets stale server-side sessions expire. If all retries fail, the error is still marked as permanently fatal — preserving the original protection against genuine dual-instance conflicts. Tests updated: split the single conflict test into two — one verifying retry on transient conflict, one verifying fatal after exhausted retries. Closes #2296
This commit is contained in:
@@ -129,6 +129,8 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
|
||||
self._token_lock_identity: Optional[str] = None
|
||||
self._polling_error_task: Optional[asyncio.Task] = None
|
||||
self._polling_conflict_count: int = 0
|
||||
self._polling_error_callback_ref = None
|
||||
|
||||
@staticmethod
|
||||
def _looks_like_polling_conflict(error: Exception) -> bool:
|
||||
@@ -142,10 +144,49 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
async def _handle_polling_conflict(self, error: Exception) -> None:
|
||||
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
|
||||
return
|
||||
# Track consecutive conflicts — transient 409s can occur when a
|
||||
# previous gateway instance hasn't fully released its long-poll
|
||||
# session on Telegram's server (e.g. during --replace handoffs or
|
||||
# systemd Restart=on-failure respawns). Retry a few times before
|
||||
# giving up, so the old session has time to expire.
|
||||
self._polling_conflict_count += 1
|
||||
|
||||
MAX_CONFLICT_RETRIES = 3
|
||||
RETRY_DELAY = 10 # seconds
|
||||
|
||||
if self._polling_conflict_count <= MAX_CONFLICT_RETRIES:
|
||||
logger.warning(
|
||||
"[%s] Telegram polling conflict (%d/%d), will retry in %ds. Error: %s",
|
||||
self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES,
|
||||
RETRY_DELAY, error,
|
||||
)
|
||||
try:
|
||||
if self._app and self._app.updater and self._app.updater.running:
|
||||
await self._app.updater.stop()
|
||||
except Exception:
|
||||
pass
|
||||
await asyncio.sleep(RETRY_DELAY)
|
||||
try:
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
drop_pending_updates=False,
|
||||
error_callback=self._polling_error_callback_ref,
|
||||
)
|
||||
logger.info("[%s] Telegram polling resumed after conflict retry %d", self.name, self._polling_conflict_count)
|
||||
self._polling_conflict_count = 0 # reset on success
|
||||
return
|
||||
except Exception as retry_err:
|
||||
logger.warning("[%s] Telegram polling retry failed: %s", self.name, retry_err)
|
||||
# Don't fall through to fatal yet — wait for the next conflict
|
||||
# to trigger another retry attempt (up to MAX_CONFLICT_RETRIES).
|
||||
return
|
||||
|
||||
# Exhausted retries — fatal
|
||||
message = (
|
||||
"Another Telegram bot poller is already using this token. "
|
||||
"Hermes stopped Telegram polling to avoid endless retry spam. "
|
||||
"Hermes stopped Telegram polling after %d retries. "
|
||||
"Make sure only one gateway instance is running for this bot token."
|
||||
% MAX_CONFLICT_RETRIES
|
||||
)
|
||||
logger.error("[%s] %s Original error: %s", self.name, message, error)
|
||||
self._set_fatal_error("telegram_polling_conflict", message, retryable=False)
|
||||
@@ -242,6 +283,9 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
return
|
||||
self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
|
||||
|
||||
# Store reference for retry use in _handle_polling_conflict
|
||||
self._polling_error_callback_ref = _polling_error_callback
|
||||
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
drop_pending_updates=True,
|
||||
|
||||
@@ -47,8 +47,9 @@ async def test_connect_rejects_same_host_token_lock(monkeypatch):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch):
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))
|
||||
async def test_polling_conflict_retries_before_fatal(monkeypatch):
|
||||
"""A single 409 should trigger a retry, not an immediate fatal error."""
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
|
||||
fatal_handler = AsyncMock()
|
||||
adapter.set_fatal_error_handler(fatal_handler)
|
||||
|
||||
@@ -69,6 +70,7 @@ async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch):
|
||||
updater = SimpleNamespace(
|
||||
start_polling=AsyncMock(side_effect=fake_start_polling),
|
||||
stop=AsyncMock(),
|
||||
running=True,
|
||||
)
|
||||
bot = SimpleNamespace(set_my_commands=AsyncMock())
|
||||
app = SimpleNamespace(
|
||||
@@ -83,20 +85,102 @@ async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch):
|
||||
builder.build.return_value = app
|
||||
monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder)))
|
||||
|
||||
# Speed up retries for testing
|
||||
monkeypatch.setattr("asyncio.sleep", AsyncMock())
|
||||
|
||||
ok = await adapter.connect()
|
||||
|
||||
assert ok is True
|
||||
assert callable(captured["error_callback"])
|
||||
|
||||
conflict = type("Conflict", (Exception,), {})
|
||||
captured["error_callback"](conflict("Conflict: terminated by other getUpdates request; make sure that only one bot instance is running"))
|
||||
|
||||
# First conflict: should retry, NOT be fatal
|
||||
captured["error_callback"](conflict("Conflict: terminated by other getUpdates request"))
|
||||
await asyncio.sleep(0)
|
||||
await asyncio.sleep(0)
|
||||
# Give the scheduled task a chance to run
|
||||
for _ in range(10):
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert adapter.fatal_error_code == "telegram_polling_conflict"
|
||||
assert adapter.has_fatal_error is False, "First conflict should not be fatal"
|
||||
assert adapter._polling_conflict_count == 0, "Count should reset after successful retry"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_polling_conflict_becomes_fatal_after_retries(monkeypatch):
|
||||
"""After exhausting retries, the conflict should become fatal."""
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
|
||||
fatal_handler = AsyncMock()
|
||||
adapter.set_fatal_error_handler(fatal_handler)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.acquire_scoped_lock",
|
||||
lambda scope, identity, metadata=None: (True, None),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.release_scoped_lock",
|
||||
lambda scope, identity: None,
|
||||
)
|
||||
|
||||
captured = {}
|
||||
|
||||
async def fake_start_polling(**kwargs):
|
||||
captured["error_callback"] = kwargs["error_callback"]
|
||||
|
||||
# Make start_polling fail on retries to exhaust retries
|
||||
call_count = {"n": 0}
|
||||
|
||||
async def failing_start_polling(**kwargs):
|
||||
call_count["n"] += 1
|
||||
if call_count["n"] == 1:
|
||||
# First call (initial connect) succeeds
|
||||
captured["error_callback"] = kwargs["error_callback"]
|
||||
else:
|
||||
# Retry calls fail
|
||||
raise Exception("Connection refused")
|
||||
|
||||
updater = SimpleNamespace(
|
||||
start_polling=AsyncMock(side_effect=failing_start_polling),
|
||||
stop=AsyncMock(),
|
||||
running=True,
|
||||
)
|
||||
bot = SimpleNamespace(set_my_commands=AsyncMock())
|
||||
app = SimpleNamespace(
|
||||
bot=bot,
|
||||
updater=updater,
|
||||
add_handler=MagicMock(),
|
||||
initialize=AsyncMock(),
|
||||
start=AsyncMock(),
|
||||
)
|
||||
builder = MagicMock()
|
||||
builder.token.return_value = builder
|
||||
builder.build.return_value = app
|
||||
monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder)))
|
||||
|
||||
# Speed up retries for testing
|
||||
monkeypatch.setattr("asyncio.sleep", AsyncMock())
|
||||
|
||||
ok = await adapter.connect()
|
||||
assert ok is True
|
||||
|
||||
conflict = type("Conflict", (Exception,), {})
|
||||
|
||||
# Directly call _handle_polling_conflict to avoid event-loop scheduling
|
||||
# complexity. Each call simulates one 409 from Telegram.
|
||||
for i in range(4):
|
||||
await adapter._handle_polling_conflict(
|
||||
conflict("Conflict: terminated by other getUpdates request")
|
||||
)
|
||||
|
||||
# After 3 failed retries (count 1-3 each enter the retry branch but
|
||||
# start_polling raises), the 4th conflict pushes count to 4 which
|
||||
# exceeds MAX_CONFLICT_RETRIES (3), entering the fatal branch.
|
||||
assert adapter.fatal_error_code == "telegram_polling_conflict", (
|
||||
f"Expected fatal after 4 conflicts, got code={adapter.fatal_error_code}, "
|
||||
f"count={adapter._polling_conflict_count}"
|
||||
)
|
||||
assert adapter.has_fatal_error is True
|
||||
updater.stop.assert_awaited()
|
||||
fatal_handler.assert_awaited_once()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user