* refactor: add shared helper modules for code deduplication New modules: - gateway/platforms/helpers.py: MessageDeduplicator, TextBatchAggregator, strip_markdown, ThreadParticipationTracker, redact_phone - hermes_cli/cli_output.py: print_info/success/warning/error, prompt helpers - tools/path_security.py: validate_within_dir, has_traversal_component - utils.py additions: safe_json_loads, read_json_file, read_jsonl, append_jsonl, env_str/lower/int/bool helpers - hermes_constants.py additions: get_config_path, get_skills_dir, get_logs_dir, get_env_path * refactor: migrate gateway adapters to shared helpers - MessageDeduplicator: discord, slack, dingtalk, wecom, weixin, mattermost - strip_markdown: bluebubbles, feishu, sms - redact_phone: sms, signal - ThreadParticipationTracker: discord, matrix - _acquire/_release_platform_lock: telegram, discord, slack, whatsapp, signal, weixin Net -316 lines across 19 files. * refactor: migrate CLI modules to shared helpers - tools_config.py: use cli_output print/prompt + curses_radiolist (-117 lines) - setup.py: use cli_output print helpers + curses_radiolist (-101 lines) - mcp_config.py: use cli_output prompt (-15 lines) - memory_setup.py: use curses_radiolist (-86 lines) Net -263 lines across 5 files. 
* refactor: migrate to shared utility helpers - safe_json_loads: agent/display.py (4 sites) - get_config_path: skill_utils.py, hermes_logging.py, hermes_time.py - get_skills_dir: skill_utils.py, prompt_builder.py - Token estimation dedup: skills_tool.py imports from model_metadata - Path security: skills_tool, cronjob_tools, skill_manager_tool, credential_files - Non-atomic YAML writes: doctor.py, config.py now use atomic_yaml_write - Platform dict: new platforms.py, skills_config + tools_config derive from it - Anthropic key: new get_anthropic_key() in auth.py, used by doctor/status/config/main * test: update tests for shared helper migrations - test_dingtalk: use _dedup.is_duplicate() instead of _is_duplicate() - test_mattermost: use _dedup instead of _seen_posts/_prune_seen - test_signal: import redact_phone from helpers instead of signal - test_discord_connect: _platform_lock_identity instead of _token_lock_identity - test_telegram_conflict: updated lock error message format - test_skill_manager_tool: 'escapes' instead of 'boundary' in error msgs
312 lines
10 KiB
Python
312 lines
10 KiB
Python
import asyncio
|
|
import sys
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from gateway.config import PlatformConfig
|
|
|
|
|
|
def _ensure_telegram_mock():
    """Register a ``MagicMock`` stand-in for ``telegram`` in ``sys.modules``.

    Does nothing when ``sys.modules["telegram"]`` already exposes a
    ``__file__`` attribute (a ``MagicMock`` raises ``AttributeError`` for
    unconfigured dunder names, so a previously installed stub does not
    short-circuit this function).

    The names ``telegram``, ``telegram.ext``, ``telegram.constants`` and
    ``telegram.request`` are all mapped to the *same* top-level mock object,
    while ``telegram.error`` is mapped to the mock's ``error`` attribute.
    ``setdefault`` is used throughout, so entries that already exist in
    ``sys.modules`` are left untouched.
    """
    if hasattr(sys.modules.get("telegram"), "__file__"):
        return

    stub = MagicMock()
    stub.ext.ContextTypes.DEFAULT_TYPE = type(None)
    stub.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"

    chat_types = (
        ("GROUP", "group"),
        ("SUPERGROUP", "supergroup"),
        ("CHANNEL", "channel"),
        ("PRIVATE", "private"),
    )
    for attr_name, attr_value in chat_types:
        setattr(stub.constants.ChatType, attr_name, attr_value)

    # Provide real exception classes so ``except (NetworkError, ...)`` in
    # connect() doesn't blow up with "catching classes that do not inherit
    # from BaseException" when another xdist worker pollutes sys.modules.
    error_bases = (
        ("NetworkError", OSError),
        ("TimedOut", OSError),
        ("BadRequest", Exception),
    )
    for exc_name, exc_base in error_bases:
        setattr(stub.error, exc_name, type(exc_name, (exc_base,), {}))

    registrations = {
        module_name: stub
        for module_name in (
            "telegram",
            "telegram.ext",
            "telegram.constants",
            "telegram.request",
        )
    }
    registrations["telegram.error"] = stub.error
    for module_name, module_obj in registrations.items():
        sys.modules.setdefault(module_name, module_obj)
|
|
|
|
|
|
# Must run before the adapter import below (hence that import's E402 waiver);
# presumably gateway.platforms.telegram imports ``telegram`` at module load
# time — TODO confirm against the adapter module.
_ensure_telegram_mock()
|
|
|
|
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _no_auto_discovery(monkeypatch):
    """Disable DoH auto-discovery so connect() uses the plain builder chain.

    Applied automatically to every test in this module. Two attributes of
    ``gateway.platforms.telegram`` are replaced:

    * ``discover_fallback_ips`` becomes a coroutine function returning ``[]``.
    * ``HTTPXRequest`` becomes a factory that accepts any keyword arguments
      and returns a fresh ``MagicMock``, so the builder chain doesn't fail.
    """

    async def _discover_nothing():
        return []

    def _stub_httpx_request(**kwargs):
        return MagicMock()

    replacements = (
        ("gateway.platforms.telegram.discover_fallback_ips", _discover_nothing),
        ("gateway.platforms.telegram.HTTPXRequest", _stub_httpx_request),
    )
    for dotted_target, replacement in replacements:
        monkeypatch.setattr(dotted_target, replacement)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_connect_rejects_same_host_token_lock(monkeypatch):
    """connect() must fail with a fatal error when the scoped lock is refused.

    ``gateway.status.acquire_scoped_lock`` is stubbed to return
    ``(False, {"pid": 4242})``. The test then checks that ``connect()``
    returns ``False`` and that the adapter records the
    ``telegram-bot-token_lock`` fatal error with an "already in use" message.
    """
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))

    def _refuse_lock(scope, identity, metadata=None):
        return False, {"pid": 4242}

    monkeypatch.setattr("gateway.status.acquire_scoped_lock", _refuse_lock)

    connected = await adapter.connect()

    assert connected is False
    assert adapter.fatal_error_code == "telegram-bot-token_lock"
    assert adapter.has_fatal_error is True
    assert "already in use" in adapter.fatal_error_message
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_polling_conflict_retries_before_fatal(monkeypatch):
    """A single 409 should trigger a retry, not an immediate fatal error.

    The adapter is connected against a fully mocked ``Application`` builder
    chain, the ``error_callback`` passed to ``start_polling`` is captured, and
    one conflict error is fed into it. After letting the event loop run, the
    adapter must not be in a fatal state and its conflict counter must be 0.
    """
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
    fatal_handler = AsyncMock()
    adapter.set_fatal_error_handler(fatal_handler)

    monkeypatch.setattr(
        "gateway.status.acquire_scoped_lock",
        lambda scope, identity, metadata=None: (True, None),
    )
    monkeypatch.setattr(
        "gateway.status.release_scoped_lock",
        lambda scope, identity: None,
    )

    captured = {}

    async def fake_start_polling(**kwargs):
        # Remember the callback the adapter hands to the updater so the test
        # can invoke it directly.
        captured["error_callback"] = kwargs["error_callback"]

    updater = SimpleNamespace(
        start_polling=AsyncMock(side_effect=fake_start_polling),
        stop=AsyncMock(),
        running=True,
    )
    bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock())
    app = SimpleNamespace(
        bot=bot,
        updater=updater,
        add_handler=MagicMock(),
        initialize=AsyncMock(),
        start=AsyncMock(),
    )
    builder = MagicMock()
    builder.token.return_value = builder
    builder.request.return_value = builder
    builder.get_updates_request.return_value = builder
    builder.build.return_value = app
    monkeypatch.setattr(
        "gateway.platforms.telegram.Application",
        SimpleNamespace(builder=MagicMock(return_value=builder)),
    )

    # Keep a handle on the genuine ``asyncio.sleep`` before patching it.
    # Awaiting the AsyncMock replacement completes without suspending, so it
    # can never hand control back to the event loop; the real ``sleep(0)`` is
    # needed below to let the scheduled conflict handler actually run.
    real_sleep = asyncio.sleep

    # Speed up retries for testing
    monkeypatch.setattr("asyncio.sleep", AsyncMock())

    ok = await adapter.connect()

    assert ok is True
    bot.delete_webhook.assert_awaited_once_with(drop_pending_updates=False)
    assert callable(captured["error_callback"])

    conflict = type("Conflict", (Exception,), {})

    # First conflict: should retry, NOT be fatal
    captured["error_callback"](conflict("Conflict: terminated by other getUpdates request"))

    # Give the scheduled task a chance to run (same number of yields as
    # before, but through the unpatched sleep so they really yield).
    for _ in range(12):
        await real_sleep(0)

    assert adapter.has_fatal_error is False, "First conflict should not be fatal"
    assert adapter._polling_conflict_count == 0, "Count should reset after successful retry"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_polling_conflict_becomes_fatal_after_retries(monkeypatch):
    """After exhausting retries, the conflict should become fatal.

    ``start_polling`` succeeds on the initial ``connect()`` call and raises on
    every later call. ``_handle_polling_conflict`` is then invoked four times
    directly; the adapter must end up with the ``telegram_polling_conflict``
    fatal error and the fatal-error handler must have been awaited once.
    """
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
    fatal_handler = AsyncMock()
    adapter.set_fatal_error_handler(fatal_handler)

    monkeypatch.setattr(
        "gateway.status.acquire_scoped_lock",
        lambda scope, identity, metadata=None: (True, None),
    )
    monkeypatch.setattr(
        "gateway.status.release_scoped_lock",
        lambda scope, identity: None,
    )

    captured = {}

    # Make start_polling fail on retries to exhaust retries
    call_count = {"n": 0}

    async def failing_start_polling(**kwargs):
        call_count["n"] += 1
        if call_count["n"] == 1:
            # First call (initial connect) succeeds
            captured["error_callback"] = kwargs["error_callback"]
        else:
            # Retry calls fail
            raise Exception("Connection refused")

    updater = SimpleNamespace(
        start_polling=AsyncMock(side_effect=failing_start_polling),
        stop=AsyncMock(),
        running=True,
    )
    bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock())
    app = SimpleNamespace(
        bot=bot,
        updater=updater,
        add_handler=MagicMock(),
        initialize=AsyncMock(),
        start=AsyncMock(),
    )
    builder = MagicMock()
    builder.token.return_value = builder
    builder.request.return_value = builder
    builder.get_updates_request.return_value = builder
    builder.build.return_value = app
    monkeypatch.setattr(
        "gateway.platforms.telegram.Application",
        SimpleNamespace(builder=MagicMock(return_value=builder)),
    )

    # Speed up retries for testing
    monkeypatch.setattr("asyncio.sleep", AsyncMock())

    ok = await adapter.connect()
    assert ok is True

    conflict = type("Conflict", (Exception,), {})

    # Directly call _handle_polling_conflict to avoid event-loop scheduling
    # complexity. Each call simulates one 409 from Telegram.
    for _ in range(4):
        await adapter._handle_polling_conflict(
            conflict("Conflict: terminated by other getUpdates request")
        )

    # After 3 failed retries (count 1-3 each enter the retry branch but
    # start_polling raises), the 4th conflict pushes count to 4 which
    # exceeds MAX_CONFLICT_RETRIES (3), entering the fatal branch.
    assert adapter.fatal_error_code == "telegram_polling_conflict", (
        f"Expected fatal after 4 conflicts, got code={adapter.fatal_error_code}, "
        f"count={adapter._polling_conflict_count}"
    )
    assert adapter.has_fatal_error is True
    fatal_handler.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_connect_marks_retryable_fatal_error_for_startup_network_failure(monkeypatch):
    """A failure in ``app.initialize()`` must surface as a retryable fatal error.

    ``initialize`` is mocked to raise ``RuntimeError("Temporary failure in
    name resolution")``. The test checks that ``connect()`` returns ``False``,
    that the fatal error code is ``telegram_connect_error``, that the error is
    flagged retryable, and that the original message is carried through.
    """
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))

    def _grant_lock(scope, identity, metadata=None):
        return True, None

    def _release_lock(scope, identity):
        return None

    monkeypatch.setattr("gateway.status.acquire_scoped_lock", _grant_lock)
    monkeypatch.setattr("gateway.status.release_scoped_lock", _release_lock)

    # Every fluent builder step hands back the builder itself.
    builder = MagicMock()
    for step in ("token", "request", "get_updates_request"):
        getattr(builder, step).return_value = builder

    startup_failure = RuntimeError("Temporary failure in name resolution")
    fake_bot = SimpleNamespace(delete_webhook=AsyncMock(), set_my_commands=AsyncMock())
    app = SimpleNamespace(
        bot=fake_bot,
        updater=SimpleNamespace(),
        add_handler=MagicMock(),
        initialize=AsyncMock(side_effect=startup_failure),
        start=AsyncMock(),
    )
    builder.build.return_value = app

    application_stub = SimpleNamespace(builder=MagicMock(return_value=builder))
    monkeypatch.setattr("gateway.platforms.telegram.Application", application_stub)

    connected = await adapter.connect()

    assert connected is False
    assert adapter.fatal_error_code == "telegram_connect_error"
    assert adapter.fatal_error_retryable is True
    assert "Temporary failure in name resolution" in adapter.fatal_error_message
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_connect_clears_webhook_before_polling(monkeypatch):
    """connect() must await ``bot.delete_webhook(drop_pending_updates=False)`` once.

    The whole ``Application`` builder chain is mocked so that ``connect()``
    succeeds. Only the single ``delete_webhook`` await and the ``True``
    return value are asserted; ordering relative to ``start_polling`` is not
    checked here.
    """
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))

    def _grant_lock(scope, identity, metadata=None):
        return True, None

    def _release_lock(scope, identity):
        return None

    monkeypatch.setattr("gateway.status.acquire_scoped_lock", _grant_lock)
    monkeypatch.setattr("gateway.status.release_scoped_lock", _release_lock)

    fake_updater = SimpleNamespace(
        start_polling=AsyncMock(),
        stop=AsyncMock(),
        running=True,
    )
    fake_bot = SimpleNamespace(
        delete_webhook=AsyncMock(),
        set_my_commands=AsyncMock(),
    )
    fake_app = SimpleNamespace(
        bot=fake_bot,
        updater=fake_updater,
        add_handler=MagicMock(),
        initialize=AsyncMock(),
        start=AsyncMock(),
    )

    # Every fluent builder step hands back the builder itself.
    builder = MagicMock()
    for step in ("token", "request", "get_updates_request"):
        getattr(builder, step).return_value = builder
    builder.build.return_value = fake_app

    application_stub = SimpleNamespace(builder=MagicMock(return_value=builder))
    monkeypatch.setattr("gateway.platforms.telegram.Application", application_stub)

    connected = await adapter.connect()

    assert connected is True
    fake_bot.delete_webhook.assert_awaited_once_with(drop_pending_updates=False)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_disconnect_skips_inactive_updater_and_app(monkeypatch):
    """disconnect() must not stop an updater or app whose ``running`` is False.

    With both ``running`` flags set to ``False``, neither ``stop`` coroutine
    may be awaited, ``shutdown`` must be awaited exactly once, and the
    adapter module's ``logger.warning`` must not be called.
    """
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))

    idle_updater = SimpleNamespace(running=False, stop=AsyncMock())
    idle_app = SimpleNamespace(
        updater=idle_updater,
        running=False,
        stop=AsyncMock(),
        shutdown=AsyncMock(),
    )
    adapter._app = idle_app

    warning_spy = MagicMock()
    monkeypatch.setattr("gateway.platforms.telegram.logger.warning", warning_spy)

    await adapter.disconnect()

    idle_updater.stop.assert_not_awaited()
    idle_app.stop.assert_not_awaited()
    idle_app.shutdown.assert_awaited_once()
    warning_spy.assert_not_called()
|