Files
hermes-agent/tests/gateway/test_telegram_network_reconnect.py
Teknium 75fcbc44ce feat(telegram): auto-discover fallback IPs via DoH when api.telegram.org is unreachable (#3376)
* feat(telegram): auto-discover fallback IPs via DoH when api.telegram.org is unreachable

On some networks (university, corporate), api.telegram.org resolves to a
valid Telegram IP that is unreachable due to routing/firewall rules. A
different IP in the same Telegram-owned 149.154.160.0/20 block works fine.

This adds automatic fallback IP discovery at connect time:
1. Query Google and Cloudflare DNS-over-HTTPS for api.telegram.org A records
2. Exclude the system-DNS IP (the unreachable one), use the rest as fallbacks
3. If DoH is also blocked, fall back to a seed list (149.154.167.220)
4. TelegramFallbackTransport tries primary first, sticks to whichever works

No configuration needed — works automatically. TELEGRAM_FALLBACK_IPS env var
still available as manual override. Zero impact on healthy networks (primary
path succeeds on first attempt, fallback never exercised).

No new dependencies (uses httpx already in deps + stdlib socket).

* fix: share transport instance and downgrade seed fallback log to info

- Use single TelegramFallbackTransport shared between request and
  get_updates_request so sticky IP is shared across polling and API calls
- Keep separate HTTPXRequest instances (different timeout settings)
- Downgrade "using seed fallback IPs" from warning to info to avoid
  noisy logs on healthy networks

* fix: add telegram.request mock and discovery fixture to remaining test files

The original PR missed test_dm_topics.py and
test_telegram_network_reconnect.py — both need the telegram.request
mock module. The reconnect test also needs _no_auto_discovery since
_handle_polling_network_error calls connect() which now invokes
discover_fallback_ips().

---------

Co-authored-by: Mohan Qiao <Gavin-Qiao@users.noreply.github.com>
2026-03-27 04:03:13 -07:00

163 lines
5.2 KiB
Python

"""
Tests for Telegram polling network error recovery.
Specifically tests the fix for #3173 — when start_polling() fails after a
network error, the adapter must self-reschedule the next reconnect attempt
rather than silently leaving polling dead.
"""
import asyncio
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import PlatformConfig
def _ensure_telegram_mock():
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
telegram_mod = MagicMock()
telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
telegram_mod.constants.ChatType.GROUP = "group"
telegram_mod.constants.ChatType.SUPERGROUP = "supergroup"
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, telegram_mod)
_ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
@pytest.fixture(autouse=True)
def _no_auto_discovery(monkeypatch):
"""Disable DoH auto-discovery so connect() uses the plain builder chain."""
async def _noop():
return []
monkeypatch.setattr("gateway.platforms.telegram.discover_fallback_ips", _noop)
def _make_adapter() -> TelegramAdapter:
return TelegramAdapter(PlatformConfig(enabled=True, token="test-token"))
@pytest.mark.asyncio
async def test_reconnect_self_schedules_on_start_polling_failure():
"""
When start_polling() raises during a network error retry, the adapter must
schedule a new _handle_polling_network_error task — otherwise polling stays
dead with no further error callbacks to trigger recovery.
Regression test for #3173: gateway becomes unresponsive after Telegram 502.
"""
adapter = _make_adapter()
adapter._polling_network_error_count = 1
mock_updater = MagicMock()
mock_updater.running = True
mock_updater.stop = AsyncMock()
mock_updater.start_polling = AsyncMock(side_effect=Exception("Timed out"))
mock_app = MagicMock()
mock_app.updater = mock_updater
adapter._app = mock_app
with patch("asyncio.sleep", new_callable=AsyncMock):
await adapter._handle_polling_network_error(Exception("Bad Gateway"))
# A retry task must have been added to _background_tasks
pending = [t for t in adapter._background_tasks if not t.done()]
assert len(pending) >= 1, (
"Expected at least one self-rescheduled retry task in _background_tasks "
f"after start_polling failure, got {len(pending)}"
)
# Clean up — cancel the pending retry so it doesn't run after the test
for t in pending:
t.cancel()
try:
await t
except (asyncio.CancelledError, Exception):
pass
@pytest.mark.asyncio
async def test_reconnect_does_not_self_schedule_when_fatal_error_set():
"""
When a fatal error is already set, the failed reconnect should NOT create
another retry task — the gateway is already shutting down this adapter.
"""
adapter = _make_adapter()
adapter._polling_network_error_count = 1
adapter._set_fatal_error("telegram_network_error", "already fatal", retryable=True)
mock_updater = MagicMock()
mock_updater.running = True
mock_updater.stop = AsyncMock()
mock_updater.start_polling = AsyncMock(side_effect=Exception("Timed out"))
mock_app = MagicMock()
mock_app.updater = mock_updater
adapter._app = mock_app
initial_count = len(adapter._background_tasks)
with patch("asyncio.sleep", new_callable=AsyncMock):
await adapter._handle_polling_network_error(Exception("Timed out"))
assert len(adapter._background_tasks) == initial_count, (
"Should not schedule a retry when a fatal error is already set"
)
@pytest.mark.asyncio
async def test_reconnect_success_resets_error_count():
"""
When start_polling() succeeds, _polling_network_error_count should reset to 0.
"""
adapter = _make_adapter()
adapter._polling_network_error_count = 3
mock_updater = MagicMock()
mock_updater.running = True
mock_updater.stop = AsyncMock()
mock_updater.start_polling = AsyncMock() # succeeds
mock_app = MagicMock()
mock_app.updater = mock_updater
adapter._app = mock_app
with patch("asyncio.sleep", new_callable=AsyncMock):
await adapter._handle_polling_network_error(Exception("Bad Gateway"))
assert adapter._polling_network_error_count == 0
@pytest.mark.asyncio
async def test_reconnect_triggers_fatal_after_max_retries():
"""
After MAX_NETWORK_RETRIES attempts, the adapter should set a fatal error
rather than retrying forever.
"""
adapter = _make_adapter()
adapter._polling_network_error_count = 10 # MAX_NETWORK_RETRIES
fatal_handler = AsyncMock()
adapter.set_fatal_error_handler(fatal_handler)
mock_app = MagicMock()
adapter._app = mock_app
await adapter._handle_polling_network_error(Exception("still failing"))
assert adapter.has_fatal_error
assert adapter.fatal_error_code == "telegram_network_error"
fatal_handler.assert_called_once()