From 5a2fcaab39a8f2765c724b4ae01d2c0afd0a6b1d Mon Sep 17 00:00:00 2001 From: teknium1 Date: Sat, 14 Mar 2026 12:11:23 -0700 Subject: [PATCH] fix(gateway): harden Telegram polling conflict handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - detect Telegram getUpdates conflicts and stop polling cleanly instead of retry-spamming forever - add a machine-local token-scoped lock so different HERMES_HOME profiles on the same host can't poll the same bot token at once - persist gateway runtime health/fatal adapter state and surface it in `hermes gateway status` 
- cleanly exit non-retryable startup conflicts without triggering service restart loops Tests: - gateway status runtime-state helpers - Telegram token-lock and polling-conflict behavior - GatewayRunner clean exit on non-retryable startup conflict - CLI runtime health summary --- gateway/platforms/base.py | 68 +++++++ gateway/platforms/telegram.py | 77 ++++++- gateway/run.py | 75 ++++++- gateway/status.py | 191 +++++++++++++++++- hermes_cli/gateway.py | 48 +++++ tests/gateway/test_runner_fatal_adapter.py | 46 +++++ tests/gateway/test_status.py | 74 +++++++ tests/gateway/test_telegram_conflict.py | 100 +++++++++ .../hermes_cli/test_gateway_runtime_health.py | 22 ++ 9 files changed, 692 insertions(+), 9 deletions(-) create mode 100644 tests/gateway/test_runner_fatal_adapter.py create mode 100644 tests/gateway/test_telegram_conflict.py create mode 100644 tests/hermes_cli/test_gateway_runtime_health.py diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 67a8323a7..e523d9390 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -346,6 +346,10 @@ class BasePlatformAdapter(ABC): self.platform = platform self._message_handler: Optional[MessageHandler] = None self._running = False + self._fatal_error_code: Optional[str] = None + self._fatal_error_message: Optional[str] = None + self._fatal_error_retryable = True + self._fatal_error_handler: Optional[Callable[["BasePlatformAdapter"], Awaitable[None] | None]] = None # Track active message handlers per session for interrupt support # Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt) @@ -353,6 +357,70 @@ class BasePlatformAdapter(ABC): self._pending_messages: Dict[str, MessageEvent] = {} # Chats where auto-TTS on voice input is disabled (set by /voice off) 
self._auto_tts_disabled_chats: set = set() + + @property + def has_fatal_error(self) -> bool: + return self._fatal_error_message is not None + + @property + def fatal_error_message(self) -> Optional[str]: + return self._fatal_error_message + + @property + def fatal_error_code(self) -> Optional[str]: + return self._fatal_error_code + + @property + def fatal_error_retryable(self) -> bool: + return self._fatal_error_retryable + + def set_fatal_error_handler(self, handler: Callable[["BasePlatformAdapter"], Awaitable[None] | None]) -> None: + self._fatal_error_handler = handler + + def _mark_connected(self) -> None: + self._running = True + self._fatal_error_code = None + self._fatal_error_message = None + self._fatal_error_retryable = True + try: + from gateway.status import write_runtime_status + write_runtime_status(platform=self.platform.value, platform_state="connected", error_code=None, error_message=None) + except Exception: + pass + + def _mark_disconnected(self) -> None: + self._running = False + if self.has_fatal_error: + return + try: + from gateway.status import write_runtime_status + write_runtime_status(platform=self.platform.value, platform_state="disconnected", error_code=None, error_message=None) + except Exception: + pass + + def _set_fatal_error(self, code: str, message: str, *, retryable: bool) -> None: + self._running = False + self._fatal_error_code = code + self._fatal_error_message = message + self._fatal_error_retryable = retryable + try: + from gateway.status import write_runtime_status + write_runtime_status( + platform=self.platform.value, + platform_state="fatal", + error_code=code, + error_message=message, + ) + except Exception: + pass + + async def _notify_fatal_error(self) -> None: + handler = self._fatal_error_handler + if not handler: + return + result = handler(self) + if asyncio.iscoroutine(result): + await result @property def name(self) -> str: diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 
df44733e3..8ad3e00b4 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -110,7 +110,35 @@ class TelegramAdapter(BasePlatformAdapter): super().__init__(config, Platform.TELEGRAM) self._app: Optional[Application] = None self._bot: Optional[Bot] = None - + self._token_lock_identity: Optional[str] = None + self._polling_error_task: Optional[asyncio.Task] = None + + @staticmethod + def _looks_like_polling_conflict(error: Exception) -> bool: + text = str(error).lower() + return ( + error.__class__.__name__.lower() == "conflict" + or "terminated by other getupdates request" in text + or "another bot instance is running" in text + ) + + async def _handle_polling_conflict(self, error: Exception) -> None: + if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict": + return + message = ( + "Another Telegram bot poller is already using this token. " + "Hermes stopped Telegram polling to avoid endless retry spam. " + "Make sure only one gateway instance is running for this bot token." 
+ ) + logger.error("[%s] %s Original error: %s", self.name, message, error) + self._set_fatal_error("telegram_polling_conflict", message, retryable=False) + try: + if self._app and self._app.updater: + await self._app.updater.stop() + except Exception as stop_error: + logger.warning("[%s] Failed stopping Telegram polling after conflict: %s", self.name, stop_error, exc_info=True) + await self._notify_fatal_error() + async def connect(self) -> bool: """Connect to Telegram and start polling for updates.""" if not TELEGRAM_AVAILABLE: @@ -125,6 +153,25 @@ class TelegramAdapter(BasePlatformAdapter): return False try: + from gateway.status import acquire_scoped_lock + + self._token_lock_identity = self.config.token + acquired, existing = acquire_scoped_lock( + "telegram-bot-token", + self._token_lock_identity, + metadata={"platform": self.platform.value}, + ) + if not acquired: + owner_pid = existing.get("pid") if isinstance(existing, dict) else None + message = ( + "Another local Hermes gateway is already using this Telegram bot token" + + (f" (PID {owner_pid})." if owner_pid else ".") + + " Stop the other gateway before starting a second Telegram poller." 
+ ) + logger.error("[%s] %s", self.name, message) + self._set_fatal_error("telegram_token_lock", message, retryable=False) + return False + # Build the application self._app = Application.builder().token(self.config.token).build() self._bot = self._app.bot @@ -150,9 +197,20 @@ class TelegramAdapter(BasePlatformAdapter): # Start polling in background await self._app.initialize() await self._app.start() + loop = asyncio.get_running_loop() + + def _polling_error_callback(error: Exception) -> None: + if not self._looks_like_polling_conflict(error): + logger.error("[%s] Telegram polling error: %s", self.name, error, exc_info=True) + return + if self._polling_error_task and not self._polling_error_task.done(): + return + self._polling_error_task = loop.create_task(self._handle_polling_conflict(error)) + await self._app.updater.start_polling( allowed_updates=Update.ALL_TYPES, drop_pending_updates=True, + error_callback=_polling_error_callback, ) # Register bot commands so Telegram shows a hint menu when users type / @@ -188,11 +246,17 @@ class TelegramAdapter(BasePlatformAdapter): exc_info=True, ) - self._running = True + self._mark_connected() logger.info("[%s] Connected and polling for Telegram updates", self.name) return True except Exception as e: + if self._token_lock_identity: + try: + from gateway.status import release_scoped_lock + release_scoped_lock("telegram-bot-token", self._token_lock_identity) + except Exception: + pass logger.error("[%s] Failed to connect to Telegram: %s", self.name, e, exc_info=True) return False @@ -205,10 +269,17 @@ class TelegramAdapter(BasePlatformAdapter): await self._app.shutdown() except Exception as e: logger.warning("[%s] Error during Telegram disconnect: %s", self.name, e, exc_info=True) + if self._token_lock_identity: + try: + from gateway.status import release_scoped_lock + release_scoped_lock("telegram-bot-token", self._token_lock_identity) + except Exception as e: + logger.warning("[%s] Error releasing Telegram token lock: 
%s", self.name, e, exc_info=True) - self._running = False + self._mark_disconnected() self._app = None self._bot = None + self._token_lock_identity = None logger.info("[%s] Disconnected from Telegram", self.name) async def send( diff --git a/gateway/run.py b/gateway/run.py index 5ab74972a..8b58d2eb3 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -245,6 +245,8 @@ class GatewayRunner: self.delivery_router = DeliveryRouter(self.config) self._running = False self._shutdown_event = asyncio.Event() + self._exit_cleanly = False + self._exit_reason: Optional[str] = None # Track running agents per session for interrupt support # Key: session_key, Value: AIAgent instance @@ -463,6 +465,41 @@ class GatewayRunner: """Run the sync memory flush in a thread pool so it won't block the event loop.""" loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._flush_memories_for_session, old_session_id) + + @property + def should_exit_cleanly(self) -> bool: + return self._exit_cleanly + + @property + def exit_reason(self) -> Optional[str]: + return self._exit_reason + + async def _handle_adapter_fatal_error(self, adapter: BasePlatformAdapter) -> None: + """React to a non-retryable adapter failure after startup.""" + logger.error( + "Fatal %s adapter error (%s): %s", + adapter.platform.value, + adapter.fatal_error_code or "unknown", + adapter.fatal_error_message or "unknown error", + ) + + existing = self.adapters.get(adapter.platform) + if existing is adapter: + try: + await adapter.disconnect() + finally: + self.adapters.pop(adapter.platform, None) + self.delivery_router.adapters = self.adapters + + if not self.adapters: + self._exit_reason = adapter.fatal_error_message or "All messaging adapters disconnected" + logger.error("No connected messaging platforms remain. 
Shutting down gateway cleanly.") + await self.stop() + + def _request_clean_exit(self, reason: str) -> None: + self._exit_cleanly = True + self._exit_reason = reason + self._shutdown_event.set() @staticmethod def _load_prefill_messages() -> List[Dict[str, Any]]: @@ -647,6 +684,11 @@ class GatewayRunner: """ logger.info("Starting Hermes Gateway...") logger.info("Session storage: %s", self.config.sessions_dir) + try: + from gateway.status import write_runtime_status + write_runtime_status(gateway_state="starting", exit_reason=None) + except Exception: + pass # Warn if no user allowlists are configured and open access is not opted in _any_allowlist = any( @@ -676,6 +718,7 @@ class GatewayRunner: logger.warning("Process checkpoint recovery: %s", e) connected_count = 0 + startup_nonretryable_errors: list[str] = [] # Initialize and connect each configured platform for platform, platform_config in self.config.platforms.items(): @@ -687,8 +730,9 @@ class GatewayRunner: logger.warning("No adapter available for %s", platform.value) continue - # Set up message handler + # Set up message + fatal error handlers adapter.set_message_handler(self._handle_message) + adapter.set_fatal_error_handler(self._handle_adapter_fatal_error) # Try to connect logger.info("Connecting to %s...", platform.value) @@ -701,10 +745,24 @@ class GatewayRunner: logger.info("✓ %s connected", platform.value) else: logger.warning("✗ %s failed to connect", platform.value) + if adapter.has_fatal_error and not adapter.fatal_error_retryable: + startup_nonretryable_errors.append( + f"{platform.value}: {adapter.fatal_error_message}" + ) except Exception as e: logger.error("✗ %s error: %s", platform.value, e) if connected_count == 0: + if startup_nonretryable_errors: + reason = "; ".join(startup_nonretryable_errors) + logger.error("Gateway hit a non-retryable startup conflict: %s", reason) + try: + from gateway.status import write_runtime_status + write_runtime_status(gateway_state="startup_failed", 
exit_reason=reason) + except Exception: + pass + self._request_clean_exit(reason) + return True logger.warning("No messaging platforms connected.") logger.info("Gateway will continue running for cron job execution.") @@ -712,6 +770,11 @@ class GatewayRunner: self.delivery_router.adapters = self.adapters self._running = True + try: + from gateway.status import write_runtime_status + write_runtime_status(gateway_state="running", exit_reason=None) + except Exception: + pass # Emit gateway:startup hook hook_count = len(self.hooks.loaded_hooks) @@ -806,8 +869,12 @@ class GatewayRunner: self._shutdown_all_gateway_honcho() self._shutdown_event.set() - from gateway.status import remove_pid_file + from gateway.status import remove_pid_file, write_runtime_status remove_pid_file() + try: + write_runtime_status(gateway_state="stopped", exit_reason=self._exit_reason) + except Exception: + pass logger.info("Gateway stopped") @@ -4340,6 +4407,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = success = await runner.start() if not success: return False + if runner.should_exit_cleanly: + if runner.exit_reason: + logger.error("Gateway exiting cleanly: %s", runner.exit_reason) + return True # Write PID file so CLI can detect gateway is running import atexit diff --git a/gateway/status.py b/gateway/status.py index db72f1fed..3362a7786 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -11,13 +11,17 @@ that will be useful when we add named profiles (multiple agents running concurrently under distinct configurations). 
""" +import hashlib import json import os import sys +from datetime import datetime, timezone from pathlib import Path -from typing import Optional +from typing import Any, Optional _GATEWAY_KIND = "hermes-gateway" +_RUNTIME_STATUS_FILE = "gateway_state.json" +_LOCKS_DIRNAME = "gateway-locks" def _get_pid_path() -> Path: @@ -26,6 +30,32 @@ def _get_pid_path() -> Path: return home / "gateway.pid" +def _get_runtime_status_path() -> Path: + """Return the persisted runtime health/status file path.""" + return _get_pid_path().with_name(_RUNTIME_STATUS_FILE) + + +def _get_lock_dir() -> Path: + """Return the machine-local directory for token-scoped gateway locks.""" + override = os.getenv("HERMES_GATEWAY_LOCK_DIR") + if override: + return Path(override) + state_home = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local" / "state")) + return state_home / "hermes" / _LOCKS_DIRNAME + + +def _utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _scope_hash(identity: str) -> str: + return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16] + + +def _get_scope_lock_path(scope: str, identity: str) -> Path: + return _get_lock_dir() / f"{scope}-{_scope_hash(identity)}.lock" + + def _get_process_start_time(pid: int) -> Optional[int]: """Return the kernel start time for a process when available.""" stat_path = Path(f"/proc/{pid}/stat") @@ -73,6 +103,38 @@ def _build_pid_record() -> dict: } +def _build_runtime_status_record() -> dict[str, Any]: + payload = _build_pid_record() + payload.update({ + "gateway_state": "starting", + "exit_reason": None, + "platforms": {}, + "updated_at": _utc_now_iso(), + }) + return payload + + +def _read_json_file(path: Path) -> Optional[dict[str, Any]]: + if not path.exists(): + return None + try: + raw = path.read_text().strip() + except OSError: + return None + if not raw: + return None + try: + payload = json.loads(raw) + except json.JSONDecodeError: + return None + return payload if isinstance(payload, dict) else 
None + + +def _write_json_file(path: Path, payload: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload)) + + def _read_pid_record() -> Optional[dict]: pid_path = _get_pid_path() if not pid_path.exists(): @@ -99,9 +161,49 @@ def _read_pid_record() -> Optional[dict]: def write_pid_file() -> None: """Write the current process PID and metadata to the gateway PID file.""" - pid_path = _get_pid_path() - pid_path.parent.mkdir(parents=True, exist_ok=True) - pid_path.write_text(json.dumps(_build_pid_record())) + _write_json_file(_get_pid_path(), _build_pid_record()) + + +def write_runtime_status( + *, + gateway_state: Optional[str] = None, + exit_reason: Optional[str] = None, + platform: Optional[str] = None, + platform_state: Optional[str] = None, + error_code: Optional[str] = None, + error_message: Optional[str] = None, +) -> None: + """Persist gateway runtime health information for diagnostics/status.""" + path = _get_runtime_status_path() + payload = _read_json_file(path) or _build_runtime_status_record() + payload.setdefault("platforms", {}) + payload.setdefault("kind", _GATEWAY_KIND) + payload.setdefault("pid", os.getpid()) + payload.setdefault("start_time", _get_process_start_time(os.getpid())) + payload["updated_at"] = _utc_now_iso() + + if gateway_state is not None: + payload["gateway_state"] = gateway_state + if exit_reason is not None: + payload["exit_reason"] = exit_reason + + if platform is not None: + platform_payload = payload["platforms"].get(platform, {}) + if platform_state is not None: + platform_payload["state"] = platform_state + if error_code is not None: + platform_payload["error_code"] = error_code + if error_message is not None: + platform_payload["error_message"] = error_message + platform_payload["updated_at"] = _utc_now_iso() + payload["platforms"][platform] = platform_payload + + _write_json_file(path, payload) + + +def read_runtime_status() -> Optional[dict[str, Any]]: + """Read the 
persisted gateway runtime health/status information.""" + return _read_json_file(_get_runtime_status_path()) def remove_pid_file() -> None: @@ -112,6 +214,87 @@ def remove_pid_file() -> None: pass +def acquire_scoped_lock(scope: str, identity: str, metadata: Optional[dict[str, Any]] = None) -> tuple[bool, Optional[dict[str, Any]]]: + """Acquire a machine-local lock keyed by scope + identity. + + Used to prevent multiple local gateways from using the same external identity + at once (e.g. the same Telegram bot token across different HERMES_HOME dirs). + """ + lock_path = _get_scope_lock_path(scope, identity) + lock_path.parent.mkdir(parents=True, exist_ok=True) + record = { + **_build_pid_record(), + "scope": scope, + "identity_hash": _scope_hash(identity), + "metadata": metadata or {}, + "updated_at": _utc_now_iso(), + } + + existing = _read_json_file(lock_path) + if existing: + try: + existing_pid = int(existing["pid"]) + except (KeyError, TypeError, ValueError): + existing_pid = None + + if existing_pid == os.getpid() and existing.get("start_time") == record.get("start_time"): + _write_json_file(lock_path, record) + return True, existing + + stale = existing_pid is None + if not stale: + try: + os.kill(existing_pid, 0) + except (ProcessLookupError, PermissionError): + stale = True + else: + current_start = _get_process_start_time(existing_pid) + if ( + existing.get("start_time") is not None + and current_start is not None + and current_start != existing.get("start_time") + ): + stale = True + if stale: + try: + lock_path.unlink(missing_ok=True) + except OSError: + pass + else: + return False, existing + + try: + fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) + except FileExistsError: + return False, _read_json_file(lock_path) + try: + with os.fdopen(fd, "w", encoding="utf-8") as handle: + json.dump(record, handle) + except Exception: + try: + lock_path.unlink(missing_ok=True) + except OSError: + pass + raise + return True, None + + +def 
release_scoped_lock(scope: str, identity: str) -> None: + """Release a previously-acquired scope lock when owned by this process.""" + lock_path = _get_scope_lock_path(scope, identity) + existing = _read_json_file(lock_path) + if not existing: + return + if existing.get("pid") != os.getpid(): + return + if existing.get("start_time") != _get_process_start_time(os.getpid()): + return + try: + lock_path.unlink(missing_ok=True) + except OSError: + pass + + def get_running_pid() -> Optional[int]: """Return the PID of a running gateway instance, or ``None``. diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index 4d3ed8845..ea9496052 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -367,6 +367,13 @@ def systemd_status(deep: bool = False): print("✗ Gateway service is stopped") print(" Run: hermes gateway start") + runtime_lines = _runtime_health_lines() + if runtime_lines: + print() + print("Recent gateway health:") + for line in runtime_lines: + print(f" {line}") + if deep: print_systemd_linger_guidance() else: @@ -693,6 +700,35 @@ def _platform_status(platform: dict) -> str: return "not configured" +def _runtime_health_lines() -> list[str]: + """Summarize the latest persisted gateway runtime health state.""" + try: + from gateway.status import read_runtime_status + except Exception: + return [] + + state = read_runtime_status() + if not state: + return [] + + lines: list[str] = [] + gateway_state = state.get("gateway_state") + exit_reason = state.get("exit_reason") + platforms = state.get("platforms", {}) or {} + + for platform, pdata in platforms.items(): + if pdata.get("state") == "fatal": + message = pdata.get("error_message") or "unknown error" + lines.append(f"⚠ {platform}: {message}") + + if gateway_state == "startup_failed" and exit_reason: + lines.append(f"⚠ Last startup issue: {exit_reason}") + elif gateway_state == "stopped" and exit_reason: + lines.append(f"⚠ Last shutdown reason: {exit_reason}") + + return lines + + def 
_setup_standard_platform(platform: dict): """Interactive setup for Telegram, Discord, or Slack.""" emoji = platform["emoji"] @@ -1186,11 +1222,23 @@ def gateway_command(args): if pids: print(f"✓ Gateway is running (PID: {', '.join(map(str, pids))})") print(" (Running manually, not as a system service)") + runtime_lines = _runtime_health_lines() + if runtime_lines: + print() + print("Recent gateway health:") + for line in runtime_lines: + print(f" {line}") print() print("To install as a service:") print(" hermes gateway install") else: print("✗ Gateway is not running") + runtime_lines = _runtime_health_lines() + if runtime_lines: + print() + print("Recent gateway health:") + for line in runtime_lines: + print(f" {line}") print() print("To start:") print(" hermes gateway # Run in foreground") diff --git a/tests/gateway/test_runner_fatal_adapter.py b/tests/gateway/test_runner_fatal_adapter.py new file mode 100644 index 000000000..aa414d72f --- /dev/null +++ b/tests/gateway/test_runner_fatal_adapter.py @@ -0,0 +1,46 @@ +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.platforms.base import BasePlatformAdapter +from gateway.run import GatewayRunner + + +class _FatalAdapter(BasePlatformAdapter): + def __init__(self): + super().__init__(PlatformConfig(enabled=True, token="token"), Platform.TELEGRAM) + + async def connect(self) -> bool: + self._set_fatal_error( + "telegram_token_lock", + "Another local Hermes gateway is already using this Telegram bot token.", + retryable=False, + ) + return False + + async def disconnect(self) -> None: + self._mark_disconnected() + + async def send(self, chat_id, content, reply_to=None, metadata=None): + raise NotImplementedError + + async def get_chat_info(self, chat_id): + return {"id": chat_id} + + +@pytest.mark.asyncio +async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monkeypatch, tmp_path): + config = GatewayConfig( + platforms={ + Platform.TELEGRAM: 
PlatformConfig(enabled=True, token="token") + }, + sessions_dir=tmp_path / "sessions", + ) + runner = GatewayRunner(config) + + monkeypatch.setattr(runner, "_create_adapter", lambda platform, platform_config: _FatalAdapter()) + + ok = await runner.start() + + assert ok is True + assert runner.should_exit_cleanly is True + assert "already using this Telegram bot token" in runner.exit_reason diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index 025708a53..fdf1b57c5 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -25,3 +25,77 @@ class TestGatewayPidState: assert status.get_running_pid() is None assert not pid_path.exists() + + +class TestGatewayRuntimeStatus: + def test_write_runtime_status_records_platform_failure(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + status.write_runtime_status( + gateway_state="startup_failed", + exit_reason="telegram conflict", + platform="telegram", + platform_state="fatal", + error_code="telegram_polling_conflict", + error_message="another poller is active", + ) + + payload = status.read_runtime_status() + assert payload["gateway_state"] == "startup_failed" + assert payload["exit_reason"] == "telegram conflict" + assert payload["platforms"]["telegram"]["state"] == "fatal" + assert payload["platforms"]["telegram"]["error_code"] == "telegram_polling_conflict" + assert payload["platforms"]["telegram"]["error_message"] == "another poller is active" + + +class TestScopedLocks: + def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks")) + lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock" + lock_path.parent.mkdir(parents=True, exist_ok=True) + lock_path.write_text(json.dumps({ + "pid": 99999, + "start_time": 123, + "kind": "hermes-gateway", + })) + + monkeypatch.setattr(status.os, "kill", lambda pid, sig: None) + 
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123) + + acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"}) + + assert acquired is False + assert existing["pid"] == 99999 + + def test_acquire_scoped_lock_replaces_stale_record(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks")) + lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock" + lock_path.parent.mkdir(parents=True, exist_ok=True) + lock_path.write_text(json.dumps({ + "pid": 99999, + "start_time": 123, + "kind": "hermes-gateway", + })) + + def fake_kill(pid, sig): + raise ProcessLookupError + + monkeypatch.setattr(status.os, "kill", fake_kill) + + acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"}) + + assert acquired is True + payload = json.loads(lock_path.read_text()) + assert payload["pid"] == os.getpid() + assert payload["metadata"]["platform"] == "telegram" + + def test_release_scoped_lock_only_removes_current_owner(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks")) + + acquired, _ = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"}) + assert acquired is True + lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock" + assert lock_path.exists() + + status.release_scoped_lock("telegram-bot-token", "secret") + assert not lock_path.exists() diff --git a/tests/gateway/test_telegram_conflict.py b/tests/gateway/test_telegram_conflict.py new file mode 100644 index 000000000..f2e212812 --- /dev/null +++ b/tests/gateway/test_telegram_conflict.py @@ -0,0 +1,100 @@ +import asyncio +import sys +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.config import PlatformConfig + + +def _ensure_telegram_mock(): + if "telegram" in 
sys.modules and hasattr(sys.modules["telegram"], "__file__"): + return + + telegram_mod = MagicMock() + telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None) + telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2" + telegram_mod.constants.ChatType.GROUP = "group" + telegram_mod.constants.ChatType.SUPERGROUP = "supergroup" + telegram_mod.constants.ChatType.CHANNEL = "channel" + telegram_mod.constants.ChatType.PRIVATE = "private" + + for name in ("telegram", "telegram.ext", "telegram.constants"): + sys.modules.setdefault(name, telegram_mod) + + +_ensure_telegram_mock() + +from gateway.platforms.telegram import TelegramAdapter # noqa: E402 + + +@pytest.mark.asyncio +async def test_connect_rejects_same_host_token_lock(monkeypatch): + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token")) + + monkeypatch.setattr( + "gateway.status.acquire_scoped_lock", + lambda scope, identity, metadata=None: (False, {"pid": 4242}), + ) + + ok = await adapter.connect() + + assert ok is False + assert adapter.fatal_error_code == "telegram_token_lock" + assert adapter.has_fatal_error is True + assert "already using this Telegram bot token" in adapter.fatal_error_message + + +@pytest.mark.asyncio +async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch): + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token")) + fatal_handler = AsyncMock() + adapter.set_fatal_error_handler(fatal_handler) + + monkeypatch.setattr( + "gateway.status.acquire_scoped_lock", + lambda scope, identity, metadata=None: (True, None), + ) + monkeypatch.setattr( + "gateway.status.release_scoped_lock", + lambda scope, identity: None, + ) + + captured = {} + + async def fake_start_polling(**kwargs): + captured["error_callback"] = kwargs["error_callback"] + + updater = SimpleNamespace( + start_polling=AsyncMock(side_effect=fake_start_polling), + stop=AsyncMock(), + ) + bot = SimpleNamespace(set_my_commands=AsyncMock()) + app = SimpleNamespace( + 
bot=bot, + updater=updater, + add_handler=MagicMock(), + initialize=AsyncMock(), + start=AsyncMock(), + ) + builder = MagicMock() + builder.token.return_value = builder + builder.build.return_value = app + monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder))) + + ok = await adapter.connect() + + assert ok is True + assert callable(captured["error_callback"]) + + conflict = type("Conflict", (Exception,), {}) + captured["error_callback"](conflict("Conflict: terminated by other getUpdates request; make sure that only one bot instance is running")) + + await asyncio.sleep(0) + await asyncio.sleep(0) + + assert adapter.fatal_error_code == "telegram_polling_conflict" + assert adapter.has_fatal_error is True + updater.stop.assert_awaited() + fatal_handler.assert_awaited_once() diff --git a/tests/hermes_cli/test_gateway_runtime_health.py b/tests/hermes_cli/test_gateway_runtime_health.py new file mode 100644 index 000000000..15c0705cf --- /dev/null +++ b/tests/hermes_cli/test_gateway_runtime_health.py @@ -0,0 +1,22 @@ +from hermes_cli.gateway import _runtime_health_lines + + +def test_runtime_health_lines_include_fatal_platform_and_startup_reason(monkeypatch): + monkeypatch.setattr( + "gateway.status.read_runtime_status", + lambda: { + "gateway_state": "startup_failed", + "exit_reason": "telegram conflict", + "platforms": { + "telegram": { + "state": "fatal", + "error_message": "another poller is active", + } + }, + }, + ) + + lines = _runtime_health_lines() + + assert "⚠ telegram: another poller is active" in lines + assert "⚠ Last startup issue: telegram conflict" in lines