Merge pull request #1339 from NousResearch/hermes/hermes-048e6599
Merging Telegram gateway conflict hardening: same-host token lock, clean shutdown on getUpdates conflict, persisted runtime health, and clearer gateway status diagnostics.
This commit is contained in:
@@ -346,6 +346,10 @@ class BasePlatformAdapter(ABC):
|
||||
self.platform = platform
|
||||
self._message_handler: Optional[MessageHandler] = None
|
||||
self._running = False
|
||||
self._fatal_error_code: Optional[str] = None
|
||||
self._fatal_error_message: Optional[str] = None
|
||||
self._fatal_error_retryable = True
|
||||
self._fatal_error_handler: Optional[Callable[["BasePlatformAdapter"], Awaitable[None] | None]] = None
|
||||
|
||||
# Track active message handlers per session for interrupt support
|
||||
# Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt)
|
||||
@@ -353,6 +357,70 @@ class BasePlatformAdapter(ABC):
|
||||
self._pending_messages: Dict[str, MessageEvent] = {}
|
||||
# Chats where auto-TTS on voice input is disabled (set by /voice off)
|
||||
self._auto_tts_disabled_chats: set = set()
|
||||
|
||||
@property
|
||||
def has_fatal_error(self) -> bool:
|
||||
return self._fatal_error_message is not None
|
||||
|
||||
@property
|
||||
def fatal_error_message(self) -> Optional[str]:
|
||||
return self._fatal_error_message
|
||||
|
||||
@property
|
||||
def fatal_error_code(self) -> Optional[str]:
|
||||
return self._fatal_error_code
|
||||
|
||||
@property
|
||||
def fatal_error_retryable(self) -> bool:
|
||||
return self._fatal_error_retryable
|
||||
|
||||
def set_fatal_error_handler(self, handler: Callable[["BasePlatformAdapter"], Awaitable[None] | None]) -> None:
|
||||
self._fatal_error_handler = handler
|
||||
|
||||
def _mark_connected(self) -> None:
|
||||
self._running = True
|
||||
self._fatal_error_code = None
|
||||
self._fatal_error_message = None
|
||||
self._fatal_error_retryable = True
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(platform=self.platform.value, platform_state="connected", error_code=None, error_message=None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _mark_disconnected(self) -> None:
|
||||
self._running = False
|
||||
if self.has_fatal_error:
|
||||
return
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(platform=self.platform.value, platform_state="disconnected", error_code=None, error_message=None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _set_fatal_error(self, code: str, message: str, *, retryable: bool) -> None:
|
||||
self._running = False
|
||||
self._fatal_error_code = code
|
||||
self._fatal_error_message = message
|
||||
self._fatal_error_retryable = retryable
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(
|
||||
platform=self.platform.value,
|
||||
platform_state="fatal",
|
||||
error_code=code,
|
||||
error_message=message,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _notify_fatal_error(self) -> None:
|
||||
handler = self._fatal_error_handler
|
||||
if not handler:
|
||||
return
|
||||
result = handler(self)
|
||||
if asyncio.iscoroutine(result):
|
||||
await result
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
||||
@@ -110,7 +110,35 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
super().__init__(config, Platform.TELEGRAM)
|
||||
self._app: Optional[Application] = None
|
||||
self._bot: Optional[Bot] = None
|
||||
|
||||
self._token_lock_identity: Optional[str] = None
|
||||
self._polling_error_task: Optional[asyncio.Task] = None
|
||||
|
||||
@staticmethod
|
||||
def _looks_like_polling_conflict(error: Exception) -> bool:
|
||||
text = str(error).lower()
|
||||
return (
|
||||
error.__class__.__name__.lower() == "conflict"
|
||||
or "terminated by other getupdates request" in text
|
||||
or "another bot instance is running" in text
|
||||
)
|
||||
|
||||
async def _handle_polling_conflict(self, error: Exception) -> None:
|
||||
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
|
||||
return
|
||||
message = (
|
||||
"Another Telegram bot poller is already using this token. "
|
||||
"Hermes stopped Telegram polling to avoid endless retry spam. "
|
||||
"Make sure only one gateway instance is running for this bot token."
|
||||
)
|
||||
logger.error("[%s] %s Original error: %s", self.name, message, error)
|
||||
self._set_fatal_error("telegram_polling_conflict", message, retryable=False)
|
||||
try:
|
||||
if self._app and self._app.updater:
|
||||
await self._app.updater.stop()
|
||||
except Exception as stop_error:
|
||||
logger.warning("[%s] Failed stopping Telegram polling after conflict: %s", self.name, stop_error, exc_info=True)
|
||||
await self._notify_fatal_error()
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Telegram and start polling for updates."""
|
||||
if not TELEGRAM_AVAILABLE:
|
||||
@@ -125,6 +153,25 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
return False
|
||||
|
||||
try:
|
||||
from gateway.status import acquire_scoped_lock
|
||||
|
||||
self._token_lock_identity = self.config.token
|
||||
acquired, existing = acquire_scoped_lock(
|
||||
"telegram-bot-token",
|
||||
self._token_lock_identity,
|
||||
metadata={"platform": self.platform.value},
|
||||
)
|
||||
if not acquired:
|
||||
owner_pid = existing.get("pid") if isinstance(existing, dict) else None
|
||||
message = (
|
||||
"Another local Hermes gateway is already using this Telegram bot token"
|
||||
+ (f" (PID {owner_pid})." if owner_pid else ".")
|
||||
+ " Stop the other gateway before starting a second Telegram poller."
|
||||
)
|
||||
logger.error("[%s] %s", self.name, message)
|
||||
self._set_fatal_error("telegram_token_lock", message, retryable=False)
|
||||
return False
|
||||
|
||||
# Build the application
|
||||
self._app = Application.builder().token(self.config.token).build()
|
||||
self._bot = self._app.bot
|
||||
@@ -150,9 +197,20 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
# Start polling in background
|
||||
await self._app.initialize()
|
||||
await self._app.start()
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def _polling_error_callback(error: Exception) -> None:
|
||||
if not self._looks_like_polling_conflict(error):
|
||||
logger.error("[%s] Telegram polling error: %s", self.name, error, exc_info=True)
|
||||
return
|
||||
if self._polling_error_task and not self._polling_error_task.done():
|
||||
return
|
||||
self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
|
||||
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
drop_pending_updates=True,
|
||||
error_callback=_polling_error_callback,
|
||||
)
|
||||
|
||||
# Register bot commands so Telegram shows a hint menu when users type /
|
||||
@@ -188,11 +246,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
self._running = True
|
||||
self._mark_connected()
|
||||
logger.info("[%s] Connected and polling for Telegram updates", self.name)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
if self._token_lock_identity:
|
||||
try:
|
||||
from gateway.status import release_scoped_lock
|
||||
release_scoped_lock("telegram-bot-token", self._token_lock_identity)
|
||||
except Exception:
|
||||
pass
|
||||
logger.error("[%s] Failed to connect to Telegram: %s", self.name, e, exc_info=True)
|
||||
return False
|
||||
|
||||
@@ -205,10 +269,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
await self._app.shutdown()
|
||||
except Exception as e:
|
||||
logger.warning("[%s] Error during Telegram disconnect: %s", self.name, e, exc_info=True)
|
||||
if self._token_lock_identity:
|
||||
try:
|
||||
from gateway.status import release_scoped_lock
|
||||
release_scoped_lock("telegram-bot-token", self._token_lock_identity)
|
||||
except Exception as e:
|
||||
logger.warning("[%s] Error releasing Telegram token lock: %s", self.name, e, exc_info=True)
|
||||
|
||||
self._running = False
|
||||
self._mark_disconnected()
|
||||
self._app = None
|
||||
self._bot = None
|
||||
self._token_lock_identity = None
|
||||
logger.info("[%s] Disconnected from Telegram", self.name)
|
||||
|
||||
async def send(
|
||||
|
||||
@@ -245,6 +245,8 @@ class GatewayRunner:
|
||||
self.delivery_router = DeliveryRouter(self.config)
|
||||
self._running = False
|
||||
self._shutdown_event = asyncio.Event()
|
||||
self._exit_cleanly = False
|
||||
self._exit_reason: Optional[str] = None
|
||||
|
||||
# Track running agents per session for interrupt support
|
||||
# Key: session_key, Value: AIAgent instance
|
||||
@@ -463,6 +465,41 @@ class GatewayRunner:
|
||||
"""Run the sync memory flush in a thread pool so it won't block the event loop."""
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, self._flush_memories_for_session, old_session_id)
|
||||
|
||||
@property
|
||||
def should_exit_cleanly(self) -> bool:
|
||||
return self._exit_cleanly
|
||||
|
||||
@property
|
||||
def exit_reason(self) -> Optional[str]:
|
||||
return self._exit_reason
|
||||
|
||||
async def _handle_adapter_fatal_error(self, adapter: BasePlatformAdapter) -> None:
|
||||
"""React to a non-retryable adapter failure after startup."""
|
||||
logger.error(
|
||||
"Fatal %s adapter error (%s): %s",
|
||||
adapter.platform.value,
|
||||
adapter.fatal_error_code or "unknown",
|
||||
adapter.fatal_error_message or "unknown error",
|
||||
)
|
||||
|
||||
existing = self.adapters.get(adapter.platform)
|
||||
if existing is adapter:
|
||||
try:
|
||||
await adapter.disconnect()
|
||||
finally:
|
||||
self.adapters.pop(adapter.platform, None)
|
||||
self.delivery_router.adapters = self.adapters
|
||||
|
||||
if not self.adapters:
|
||||
self._exit_reason = adapter.fatal_error_message or "All messaging adapters disconnected"
|
||||
logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.")
|
||||
await self.stop()
|
||||
|
||||
def _request_clean_exit(self, reason: str) -> None:
|
||||
self._exit_cleanly = True
|
||||
self._exit_reason = reason
|
||||
self._shutdown_event.set()
|
||||
|
||||
@staticmethod
|
||||
def _load_prefill_messages() -> List[Dict[str, Any]]:
|
||||
@@ -647,6 +684,11 @@ class GatewayRunner:
|
||||
"""
|
||||
logger.info("Starting Hermes Gateway...")
|
||||
logger.info("Session storage: %s", self.config.sessions_dir)
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(gateway_state="starting", exit_reason=None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Warn if no user allowlists are configured and open access is not opted in
|
||||
_any_allowlist = any(
|
||||
@@ -676,6 +718,7 @@ class GatewayRunner:
|
||||
logger.warning("Process checkpoint recovery: %s", e)
|
||||
|
||||
connected_count = 0
|
||||
startup_nonretryable_errors: list[str] = []
|
||||
|
||||
# Initialize and connect each configured platform
|
||||
for platform, platform_config in self.config.platforms.items():
|
||||
@@ -687,8 +730,9 @@ class GatewayRunner:
|
||||
logger.warning("No adapter available for %s", platform.value)
|
||||
continue
|
||||
|
||||
# Set up message handler
|
||||
# Set up message + fatal error handlers
|
||||
adapter.set_message_handler(self._handle_message)
|
||||
adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
|
||||
|
||||
# Try to connect
|
||||
logger.info("Connecting to %s...", platform.value)
|
||||
@@ -701,10 +745,24 @@ class GatewayRunner:
|
||||
logger.info("✓ %s connected", platform.value)
|
||||
else:
|
||||
logger.warning("✗ %s failed to connect", platform.value)
|
||||
if adapter.has_fatal_error and not adapter.fatal_error_retryable:
|
||||
startup_nonretryable_errors.append(
|
||||
f"{platform.value}: {adapter.fatal_error_message}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("✗ %s error: %s", platform.value, e)
|
||||
|
||||
if connected_count == 0:
|
||||
if startup_nonretryable_errors:
|
||||
reason = "; ".join(startup_nonretryable_errors)
|
||||
logger.error("Gateway hit a non-retryable startup conflict: %s", reason)
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(gateway_state="startup_failed", exit_reason=reason)
|
||||
except Exception:
|
||||
pass
|
||||
self._request_clean_exit(reason)
|
||||
return True
|
||||
logger.warning("No messaging platforms connected.")
|
||||
logger.info("Gateway will continue running for cron job execution.")
|
||||
|
||||
@@ -712,6 +770,11 @@ class GatewayRunner:
|
||||
self.delivery_router.adapters = self.adapters
|
||||
|
||||
self._running = True
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(gateway_state="running", exit_reason=None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Emit gateway:startup hook
|
||||
hook_count = len(self.hooks.loaded_hooks)
|
||||
@@ -806,8 +869,12 @@ class GatewayRunner:
|
||||
self._shutdown_all_gateway_honcho()
|
||||
self._shutdown_event.set()
|
||||
|
||||
from gateway.status import remove_pid_file
|
||||
from gateway.status import remove_pid_file, write_runtime_status
|
||||
remove_pid_file()
|
||||
try:
|
||||
write_runtime_status(gateway_state="stopped", exit_reason=self._exit_reason)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
logger.info("Gateway stopped")
|
||||
|
||||
@@ -4340,6 +4407,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||
success = await runner.start()
|
||||
if not success:
|
||||
return False
|
||||
if runner.should_exit_cleanly:
|
||||
if runner.exit_reason:
|
||||
logger.error("Gateway exiting cleanly: %s", runner.exit_reason)
|
||||
return True
|
||||
|
||||
# Write PID file so CLI can detect gateway is running
|
||||
import atexit
|
||||
|
||||
@@ -11,13 +11,17 @@ that will be useful when we add named profiles (multiple agents running
|
||||
concurrently under distinct configurations).
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
_GATEWAY_KIND = "hermes-gateway"
|
||||
_RUNTIME_STATUS_FILE = "gateway_state.json"
|
||||
_LOCKS_DIRNAME = "gateway-locks"
|
||||
|
||||
|
||||
def _get_pid_path() -> Path:
|
||||
@@ -26,6 +30,32 @@ def _get_pid_path() -> Path:
|
||||
return home / "gateway.pid"
|
||||
|
||||
|
||||
def _get_runtime_status_path() -> Path:
|
||||
"""Return the persisted runtime health/status file path."""
|
||||
return _get_pid_path().with_name(_RUNTIME_STATUS_FILE)
|
||||
|
||||
|
||||
def _get_lock_dir() -> Path:
|
||||
"""Return the machine-local directory for token-scoped gateway locks."""
|
||||
override = os.getenv("HERMES_GATEWAY_LOCK_DIR")
|
||||
if override:
|
||||
return Path(override)
|
||||
state_home = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local" / "state"))
|
||||
return state_home / "hermes" / _LOCKS_DIRNAME
|
||||
|
||||
|
||||
def _utc_now_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _scope_hash(identity: str) -> str:
|
||||
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
|
||||
def _get_scope_lock_path(scope: str, identity: str) -> Path:
|
||||
return _get_lock_dir() / f"{scope}-{_scope_hash(identity)}.lock"
|
||||
|
||||
|
||||
def _get_process_start_time(pid: int) -> Optional[int]:
|
||||
"""Return the kernel start time for a process when available."""
|
||||
stat_path = Path(f"/proc/{pid}/stat")
|
||||
@@ -73,6 +103,38 @@ def _build_pid_record() -> dict:
|
||||
}
|
||||
|
||||
|
||||
def _build_runtime_status_record() -> dict[str, Any]:
|
||||
payload = _build_pid_record()
|
||||
payload.update({
|
||||
"gateway_state": "starting",
|
||||
"exit_reason": None,
|
||||
"platforms": {},
|
||||
"updated_at": _utc_now_iso(),
|
||||
})
|
||||
return payload
|
||||
|
||||
|
||||
def _read_json_file(path: Path) -> Optional[dict[str, Any]]:
|
||||
if not path.exists():
|
||||
return None
|
||||
try:
|
||||
raw = path.read_text().strip()
|
||||
except OSError:
|
||||
return None
|
||||
if not raw:
|
||||
return None
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
return payload if isinstance(payload, dict) else None
|
||||
|
||||
|
||||
def _write_json_file(path: Path, payload: dict[str, Any]) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(payload))
|
||||
|
||||
|
||||
def _read_pid_record() -> Optional[dict]:
|
||||
pid_path = _get_pid_path()
|
||||
if not pid_path.exists():
|
||||
@@ -99,9 +161,49 @@ def _read_pid_record() -> Optional[dict]:
|
||||
|
||||
def write_pid_file() -> None:
|
||||
"""Write the current process PID and metadata to the gateway PID file."""
|
||||
pid_path = _get_pid_path()
|
||||
pid_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
pid_path.write_text(json.dumps(_build_pid_record()))
|
||||
_write_json_file(_get_pid_path(), _build_pid_record())
|
||||
|
||||
|
||||
def write_runtime_status(
|
||||
*,
|
||||
gateway_state: Optional[str] = None,
|
||||
exit_reason: Optional[str] = None,
|
||||
platform: Optional[str] = None,
|
||||
platform_state: Optional[str] = None,
|
||||
error_code: Optional[str] = None,
|
||||
error_message: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Persist gateway runtime health information for diagnostics/status."""
|
||||
path = _get_runtime_status_path()
|
||||
payload = _read_json_file(path) or _build_runtime_status_record()
|
||||
payload.setdefault("platforms", {})
|
||||
payload.setdefault("kind", _GATEWAY_KIND)
|
||||
payload.setdefault("pid", os.getpid())
|
||||
payload.setdefault("start_time", _get_process_start_time(os.getpid()))
|
||||
payload["updated_at"] = _utc_now_iso()
|
||||
|
||||
if gateway_state is not None:
|
||||
payload["gateway_state"] = gateway_state
|
||||
if exit_reason is not None:
|
||||
payload["exit_reason"] = exit_reason
|
||||
|
||||
if platform is not None:
|
||||
platform_payload = payload["platforms"].get(platform, {})
|
||||
if platform_state is not None:
|
||||
platform_payload["state"] = platform_state
|
||||
if error_code is not None:
|
||||
platform_payload["error_code"] = error_code
|
||||
if error_message is not None:
|
||||
platform_payload["error_message"] = error_message
|
||||
platform_payload["updated_at"] = _utc_now_iso()
|
||||
payload["platforms"][platform] = platform_payload
|
||||
|
||||
_write_json_file(path, payload)
|
||||
|
||||
|
||||
def read_runtime_status() -> Optional[dict[str, Any]]:
|
||||
"""Read the persisted gateway runtime health/status information."""
|
||||
return _read_json_file(_get_runtime_status_path())
|
||||
|
||||
|
||||
def remove_pid_file() -> None:
|
||||
@@ -112,6 +214,87 @@ def remove_pid_file() -> None:
|
||||
pass
|
||||
|
||||
|
||||
def acquire_scoped_lock(scope: str, identity: str, metadata: Optional[dict[str, Any]] = None) -> tuple[bool, Optional[dict[str, Any]]]:
|
||||
"""Acquire a machine-local lock keyed by scope + identity.
|
||||
|
||||
Used to prevent multiple local gateways from using the same external identity
|
||||
at once (e.g. the same Telegram bot token across different HERMES_HOME dirs).
|
||||
"""
|
||||
lock_path = _get_scope_lock_path(scope, identity)
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
record = {
|
||||
**_build_pid_record(),
|
||||
"scope": scope,
|
||||
"identity_hash": _scope_hash(identity),
|
||||
"metadata": metadata or {},
|
||||
"updated_at": _utc_now_iso(),
|
||||
}
|
||||
|
||||
existing = _read_json_file(lock_path)
|
||||
if existing:
|
||||
try:
|
||||
existing_pid = int(existing["pid"])
|
||||
except (KeyError, TypeError, ValueError):
|
||||
existing_pid = None
|
||||
|
||||
if existing_pid == os.getpid() and existing.get("start_time") == record.get("start_time"):
|
||||
_write_json_file(lock_path, record)
|
||||
return True, existing
|
||||
|
||||
stale = existing_pid is None
|
||||
if not stale:
|
||||
try:
|
||||
os.kill(existing_pid, 0)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
stale = True
|
||||
else:
|
||||
current_start = _get_process_start_time(existing_pid)
|
||||
if (
|
||||
existing.get("start_time") is not None
|
||||
and current_start is not None
|
||||
and current_start != existing.get("start_time")
|
||||
):
|
||||
stale = True
|
||||
if stale:
|
||||
try:
|
||||
lock_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
return False, existing
|
||||
|
||||
try:
|
||||
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
|
||||
except FileExistsError:
|
||||
return False, _read_json_file(lock_path)
|
||||
try:
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as handle:
|
||||
json.dump(record, handle)
|
||||
except Exception:
|
||||
try:
|
||||
lock_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
return True, None
|
||||
|
||||
|
||||
def release_scoped_lock(scope: str, identity: str) -> None:
|
||||
"""Release a previously-acquired scope lock when owned by this process."""
|
||||
lock_path = _get_scope_lock_path(scope, identity)
|
||||
existing = _read_json_file(lock_path)
|
||||
if not existing:
|
||||
return
|
||||
if existing.get("pid") != os.getpid():
|
||||
return
|
||||
if existing.get("start_time") != _get_process_start_time(os.getpid()):
|
||||
return
|
||||
try:
|
||||
lock_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def get_running_pid() -> Optional[int]:
|
||||
"""Return the PID of a running gateway instance, or ``None``.
|
||||
|
||||
|
||||
@@ -425,6 +425,13 @@ def systemd_status(deep: bool = False):
|
||||
print("✗ Gateway service is stopped")
|
||||
print(" Run: hermes gateway start")
|
||||
|
||||
runtime_lines = _runtime_health_lines()
|
||||
if runtime_lines:
|
||||
print()
|
||||
print("Recent gateway health:")
|
||||
for line in runtime_lines:
|
||||
print(f" {line}")
|
||||
|
||||
if deep:
|
||||
print_systemd_linger_guidance()
|
||||
else:
|
||||
@@ -751,6 +758,35 @@ def _platform_status(platform: dict) -> str:
|
||||
return "not configured"
|
||||
|
||||
|
||||
def _runtime_health_lines() -> list[str]:
|
||||
"""Summarize the latest persisted gateway runtime health state."""
|
||||
try:
|
||||
from gateway.status import read_runtime_status
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
state = read_runtime_status()
|
||||
if not state:
|
||||
return []
|
||||
|
||||
lines: list[str] = []
|
||||
gateway_state = state.get("gateway_state")
|
||||
exit_reason = state.get("exit_reason")
|
||||
platforms = state.get("platforms", {}) or {}
|
||||
|
||||
for platform, pdata in platforms.items():
|
||||
if pdata.get("state") == "fatal":
|
||||
message = pdata.get("error_message") or "unknown error"
|
||||
lines.append(f"⚠ {platform}: {message}")
|
||||
|
||||
if gateway_state == "startup_failed" and exit_reason:
|
||||
lines.append(f"⚠ Last startup issue: {exit_reason}")
|
||||
elif gateway_state == "stopped" and exit_reason:
|
||||
lines.append(f"⚠ Last shutdown reason: {exit_reason}")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
def _setup_standard_platform(platform: dict):
|
||||
"""Interactive setup for Telegram, Discord, or Slack."""
|
||||
emoji = platform["emoji"]
|
||||
@@ -1244,11 +1280,23 @@ def gateway_command(args):
|
||||
if pids:
|
||||
print(f"✓ Gateway is running (PID: {', '.join(map(str, pids))})")
|
||||
print(" (Running manually, not as a system service)")
|
||||
runtime_lines = _runtime_health_lines()
|
||||
if runtime_lines:
|
||||
print()
|
||||
print("Recent gateway health:")
|
||||
for line in runtime_lines:
|
||||
print(f" {line}")
|
||||
print()
|
||||
print("To install as a service:")
|
||||
print(" hermes gateway install")
|
||||
else:
|
||||
print("✗ Gateway is not running")
|
||||
runtime_lines = _runtime_health_lines()
|
||||
if runtime_lines:
|
||||
print()
|
||||
print("Recent gateway health:")
|
||||
for line in runtime_lines:
|
||||
print(f" {line}")
|
||||
print()
|
||||
print("To start:")
|
||||
print(" hermes gateway # Run in foreground")
|
||||
|
||||
46
tests/gateway/test_runner_fatal_adapter.py
Normal file
46
tests/gateway/test_runner_fatal_adapter.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
||||
from gateway.platforms.base import BasePlatformAdapter
|
||||
from gateway.run import GatewayRunner
|
||||
|
||||
|
||||
class _FatalAdapter(BasePlatformAdapter):
|
||||
def __init__(self):
|
||||
super().__init__(PlatformConfig(enabled=True, token="token"), Platform.TELEGRAM)
|
||||
|
||||
async def connect(self) -> bool:
|
||||
self._set_fatal_error(
|
||||
"telegram_token_lock",
|
||||
"Another local Hermes gateway is already using this Telegram bot token.",
|
||||
retryable=False,
|
||||
)
|
||||
return False
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
self._mark_disconnected()
|
||||
|
||||
async def send(self, chat_id, content, reply_to=None, metadata=None):
|
||||
raise NotImplementedError
|
||||
|
||||
async def get_chat_info(self, chat_id):
|
||||
return {"id": chat_id}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monkeypatch, tmp_path):
|
||||
config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.TELEGRAM: PlatformConfig(enabled=True, token="token")
|
||||
},
|
||||
sessions_dir=tmp_path / "sessions",
|
||||
)
|
||||
runner = GatewayRunner(config)
|
||||
|
||||
monkeypatch.setattr(runner, "_create_adapter", lambda platform, platform_config: _FatalAdapter())
|
||||
|
||||
ok = await runner.start()
|
||||
|
||||
assert ok is True
|
||||
assert runner.should_exit_cleanly is True
|
||||
assert "already using this Telegram bot token" in runner.exit_reason
|
||||
@@ -25,3 +25,77 @@ class TestGatewayPidState:
|
||||
|
||||
assert status.get_running_pid() is None
|
||||
assert not pid_path.exists()
|
||||
|
||||
|
||||
class TestGatewayRuntimeStatus:
|
||||
def test_write_runtime_status_records_platform_failure(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
|
||||
status.write_runtime_status(
|
||||
gateway_state="startup_failed",
|
||||
exit_reason="telegram conflict",
|
||||
platform="telegram",
|
||||
platform_state="fatal",
|
||||
error_code="telegram_polling_conflict",
|
||||
error_message="another poller is active",
|
||||
)
|
||||
|
||||
payload = status.read_runtime_status()
|
||||
assert payload["gateway_state"] == "startup_failed"
|
||||
assert payload["exit_reason"] == "telegram conflict"
|
||||
assert payload["platforms"]["telegram"]["state"] == "fatal"
|
||||
assert payload["platforms"]["telegram"]["error_code"] == "telegram_polling_conflict"
|
||||
assert payload["platforms"]["telegram"]["error_message"] == "another poller is active"
|
||||
|
||||
|
||||
class TestScopedLocks:
|
||||
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
|
||||
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
lock_path.write_text(json.dumps({
|
||||
"pid": 99999,
|
||||
"start_time": 123,
|
||||
"kind": "hermes-gateway",
|
||||
}))
|
||||
|
||||
monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
|
||||
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
|
||||
|
||||
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
|
||||
|
||||
assert acquired is False
|
||||
assert existing["pid"] == 99999
|
||||
|
||||
def test_acquire_scoped_lock_replaces_stale_record(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
|
||||
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
lock_path.write_text(json.dumps({
|
||||
"pid": 99999,
|
||||
"start_time": 123,
|
||||
"kind": "hermes-gateway",
|
||||
}))
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
raise ProcessLookupError
|
||||
|
||||
monkeypatch.setattr(status.os, "kill", fake_kill)
|
||||
|
||||
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
|
||||
|
||||
assert acquired is True
|
||||
payload = json.loads(lock_path.read_text())
|
||||
assert payload["pid"] == os.getpid()
|
||||
assert payload["metadata"]["platform"] == "telegram"
|
||||
|
||||
def test_release_scoped_lock_only_removes_current_owner(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
|
||||
|
||||
acquired, _ = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
|
||||
assert acquired is True
|
||||
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
|
||||
assert lock_path.exists()
|
||||
|
||||
status.release_scoped_lock("telegram-bot-token", "secret")
|
||||
assert not lock_path.exists()
|
||||
|
||||
100
tests/gateway/test_telegram_conflict.py
Normal file
100
tests/gateway/test_telegram_conflict.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import asyncio
|
||||
import sys
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
|
||||
|
||||
def _ensure_telegram_mock():
|
||||
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
|
||||
return
|
||||
|
||||
telegram_mod = MagicMock()
|
||||
telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
|
||||
telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
|
||||
telegram_mod.constants.ChatType.GROUP = "group"
|
||||
telegram_mod.constants.ChatType.SUPERGROUP = "supergroup"
|
||||
telegram_mod.constants.ChatType.CHANNEL = "channel"
|
||||
telegram_mod.constants.ChatType.PRIVATE = "private"
|
||||
|
||||
for name in ("telegram", "telegram.ext", "telegram.constants"):
|
||||
sys.modules.setdefault(name, telegram_mod)
|
||||
|
||||
|
||||
_ensure_telegram_mock()
|
||||
|
||||
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_rejects_same_host_token_lock(monkeypatch):
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))
|
||||
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.acquire_scoped_lock",
|
||||
lambda scope, identity, metadata=None: (False, {"pid": 4242}),
|
||||
)
|
||||
|
||||
ok = await adapter.connect()
|
||||
|
||||
assert ok is False
|
||||
assert adapter.fatal_error_code == "telegram_token_lock"
|
||||
assert adapter.has_fatal_error is True
|
||||
assert "already using this Telegram bot token" in adapter.fatal_error_message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch):
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))
|
||||
fatal_handler = AsyncMock()
|
||||
adapter.set_fatal_error_handler(fatal_handler)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.acquire_scoped_lock",
|
||||
lambda scope, identity, metadata=None: (True, None),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.release_scoped_lock",
|
||||
lambda scope, identity: None,
|
||||
)
|
||||
|
||||
captured = {}
|
||||
|
||||
async def fake_start_polling(**kwargs):
|
||||
captured["error_callback"] = kwargs["error_callback"]
|
||||
|
||||
updater = SimpleNamespace(
|
||||
start_polling=AsyncMock(side_effect=fake_start_polling),
|
||||
stop=AsyncMock(),
|
||||
)
|
||||
bot = SimpleNamespace(set_my_commands=AsyncMock())
|
||||
app = SimpleNamespace(
|
||||
bot=bot,
|
||||
updater=updater,
|
||||
add_handler=MagicMock(),
|
||||
initialize=AsyncMock(),
|
||||
start=AsyncMock(),
|
||||
)
|
||||
builder = MagicMock()
|
||||
builder.token.return_value = builder
|
||||
builder.build.return_value = app
|
||||
monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder)))
|
||||
|
||||
ok = await adapter.connect()
|
||||
|
||||
assert ok is True
|
||||
assert callable(captured["error_callback"])
|
||||
|
||||
conflict = type("Conflict", (Exception,), {})
|
||||
captured["error_callback"](conflict("Conflict: terminated by other getUpdates request; make sure that only one bot instance is running"))
|
||||
|
||||
await asyncio.sleep(0)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert adapter.fatal_error_code == "telegram_polling_conflict"
|
||||
assert adapter.has_fatal_error is True
|
||||
updater.stop.assert_awaited()
|
||||
fatal_handler.assert_awaited_once()
|
||||
22
tests/hermes_cli/test_gateway_runtime_health.py
Normal file
22
tests/hermes_cli/test_gateway_runtime_health.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from hermes_cli.gateway import _runtime_health_lines
|
||||
|
||||
|
||||
def test_runtime_health_lines_include_fatal_platform_and_startup_reason(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.read_runtime_status",
|
||||
lambda: {
|
||||
"gateway_state": "startup_failed",
|
||||
"exit_reason": "telegram conflict",
|
||||
"platforms": {
|
||||
"telegram": {
|
||||
"state": "fatal",
|
||||
"error_message": "another poller is active",
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
lines = _runtime_health_lines()
|
||||
|
||||
assert "⚠ telegram: another poller is active" in lines
|
||||
assert "⚠ Last startup issue: telegram conflict" in lines
|
||||
Reference in New Issue
Block a user