""" Gateway runtime status helpers. Provides PID-file based detection of whether the gateway daemon is running, used by send_message's check_fn to gate availability in the CLI. The PID file lives at ``{HERMES_HOME}/gateway.pid``. HERMES_HOME defaults to ``~/.hermes`` but can be overridden via the environment variable. This means separate HERMES_HOME directories naturally get separate PID files — a property that will be useful when we add named profiles (multiple agents running concurrently under distinct configurations). """ import hashlib import json import os import sys from datetime import datetime, timezone from pathlib import Path from hermes_constants import get_hermes_home from typing import Any, Optional _GATEWAY_KIND = "hermes-gateway" _RUNTIME_STATUS_FILE = "gateway_state.json" _LOCKS_DIRNAME = "gateway-locks" def _get_pid_path() -> Path: """Return the path to the gateway PID file, respecting HERMES_HOME.""" home = get_hermes_home() return home / "gateway.pid" def _get_runtime_status_path() -> Path: """Return the persisted runtime health/status file path.""" return _get_pid_path().with_name(_RUNTIME_STATUS_FILE) def _get_lock_dir() -> Path: """Return the machine-local directory for token-scoped gateway locks.""" override = os.getenv("HERMES_GATEWAY_LOCK_DIR") if override: return Path(override) state_home = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local" / "state")) return state_home / "hermes" / _LOCKS_DIRNAME def _utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _scope_hash(identity: str) -> str: return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16] def _get_scope_lock_path(scope: str, identity: str) -> Path: return _get_lock_dir() / f"{scope}-{_scope_hash(identity)}.lock" def _get_process_start_time(pid: int) -> Optional[int]: """Return the kernel start time for a process when available.""" stat_path = Path(f"/proc/{pid}/stat") try: # Field 22 in /proc//stat is process start time (clock ticks). return int(stat_path.read_text().split()[21]) except (FileNotFoundError, IndexError, PermissionError, ValueError, OSError): return None def _read_process_cmdline(pid: int) -> Optional[str]: """Return the process command line as a space-separated string.""" cmdline_path = Path(f"/proc/{pid}/cmdline") try: raw = cmdline_path.read_bytes() except (FileNotFoundError, PermissionError, OSError): return None if not raw: return None return raw.replace(b"\x00", b" ").decode("utf-8", errors="ignore").strip() def _looks_like_gateway_process(pid: int) -> bool: """Return True when the live PID still looks like the Hermes gateway.""" cmdline = _read_process_cmdline(pid) if not cmdline: return False patterns = ( "hermes_cli.main gateway", "hermes_cli/main.py gateway", "hermes gateway", "gateway/run.py", ) return any(pattern in cmdline for pattern in patterns) def _record_looks_like_gateway(record: dict[str, Any]) -> bool: """Validate gateway identity from PID-file metadata when cmdline is unavailable.""" if record.get("kind") != _GATEWAY_KIND: return False argv = record.get("argv") if not isinstance(argv, list) or not argv: return False cmdline = " ".join(str(part) for part in argv) patterns = ( "hermes_cli.main gateway", "hermes_cli/main.py gateway", "hermes gateway", "gateway/run.py", ) return any(pattern in cmdline for pattern in patterns) def _build_pid_record() -> dict: return { "pid": os.getpid(), "kind": _GATEWAY_KIND, "argv": list(sys.argv), "start_time": _get_process_start_time(os.getpid()), } def _build_runtime_status_record() -> dict[str, Any]: payload = _build_pid_record() payload.update({ "gateway_state": "starting", "exit_reason": None, "platforms": {}, "updated_at": _utc_now_iso(), }) return payload def _read_json_file(path: Path) -> Optional[dict[str, Any]]: if not path.exists(): return None try: raw = path.read_text().strip() except OSError: return None if not raw: return None try: payload = json.loads(raw) except json.JSONDecodeError: return None return payload if isinstance(payload, dict) else None def _write_json_file(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload)) def _read_pid_record() -> Optional[dict]: pid_path = _get_pid_path() if not pid_path.exists(): return None raw = pid_path.read_text().strip() if not raw: return None try: payload = json.loads(raw) except json.JSONDecodeError: try: return {"pid": int(raw)} except ValueError: return None if isinstance(payload, int): return {"pid": payload} if isinstance(payload, dict): return payload return None def write_pid_file() -> None: """Write the current process PID and metadata to the gateway PID file.""" _write_json_file(_get_pid_path(), _build_pid_record()) def write_runtime_status( *, gateway_state: Optional[str] = None, exit_reason: Optional[str] = None, platform: Optional[str] = None, platform_state: Optional[str] = None, error_code: Optional[str] = None, error_message: Optional[str] = None, ) -> None: """Persist gateway runtime health information for diagnostics/status.""" path = _get_runtime_status_path() payload = _read_json_file(path) or _build_runtime_status_record() payload.setdefault("platforms", {}) payload.setdefault("kind", _GATEWAY_KIND) payload["pid"] = os.getpid() payload["start_time"] = _get_process_start_time(os.getpid()) payload["updated_at"] = _utc_now_iso() if gateway_state is not None: payload["gateway_state"] = gateway_state if exit_reason is not None: payload["exit_reason"] = exit_reason if platform is not None: platform_payload = payload["platforms"].get(platform, {}) if platform_state is not None: platform_payload["state"] = platform_state if error_code is not None: platform_payload["error_code"] = error_code if error_message is not None: platform_payload["error_message"] = error_message platform_payload["updated_at"] = _utc_now_iso() payload["platforms"][platform] = platform_payload _write_json_file(path, payload) def read_runtime_status() -> Optional[dict[str, Any]]: """Read the persisted gateway runtime health/status information.""" return _read_json_file(_get_runtime_status_path()) def remove_pid_file() -> None: """Remove the gateway PID file if it exists.""" try: _get_pid_path().unlink(missing_ok=True) except Exception: pass def acquire_scoped_lock(scope: str, identity: str, metadata: Optional[dict[str, Any]] = None) -> tuple[bool, Optional[dict[str, Any]]]: """Acquire a machine-local lock keyed by scope + identity. Used to prevent multiple local gateways from using the same external identity at once (e.g. the same Telegram bot token across different HERMES_HOME dirs). """ lock_path = _get_scope_lock_path(scope, identity) lock_path.parent.mkdir(parents=True, exist_ok=True) record = { **_build_pid_record(), "scope": scope, "identity_hash": _scope_hash(identity), "metadata": metadata or {}, "updated_at": _utc_now_iso(), } existing = _read_json_file(lock_path) if existing: try: existing_pid = int(existing["pid"]) except (KeyError, TypeError, ValueError): existing_pid = None if existing_pid == os.getpid() and existing.get("start_time") == record.get("start_time"): _write_json_file(lock_path, record) return True, existing stale = existing_pid is None if not stale: try: os.kill(existing_pid, 0) except (ProcessLookupError, PermissionError): stale = True else: current_start = _get_process_start_time(existing_pid) if ( existing.get("start_time") is not None and current_start is not None and current_start != existing.get("start_time") ): stale = True # Check if process is stopped (Ctrl+Z / SIGTSTP) — stopped # processes still respond to os.kill(pid, 0) but are not # actually running. Treat them as stale so --replace works. if not stale: try: _proc_status = Path(f"/proc/{existing_pid}/status") if _proc_status.exists(): for _line in _proc_status.read_text().splitlines(): if _line.startswith("State:"): _state = _line.split()[1] if _state in ("T", "t"): # stopped or tracing stop stale = True break except (OSError, PermissionError): pass if stale: try: lock_path.unlink(missing_ok=True) except OSError: pass else: return False, existing try: fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) except FileExistsError: return False, _read_json_file(lock_path) try: with os.fdopen(fd, "w", encoding="utf-8") as handle: json.dump(record, handle) except Exception: try: lock_path.unlink(missing_ok=True) except OSError: pass raise return True, None def release_scoped_lock(scope: str, identity: str) -> None: """Release a previously-acquired scope lock when owned by this process.""" lock_path = _get_scope_lock_path(scope, identity) existing = _read_json_file(lock_path) if not existing: return if existing.get("pid") != os.getpid(): return if existing.get("start_time") != _get_process_start_time(os.getpid()): return try: lock_path.unlink(missing_ok=True) except OSError: pass def release_all_scoped_locks() -> int: """Remove all scoped lock files in the lock directory. Called during --replace to clean up stale locks left by stopped/killed gateway processes that did not release their locks gracefully. Returns the number of lock files removed. """ lock_dir = _get_lock_dir() removed = 0 if lock_dir.exists(): for lock_file in lock_dir.glob("*.lock"): try: lock_file.unlink(missing_ok=True) removed += 1 except OSError: pass return removed def get_running_pid() -> Optional[int]: """Return the PID of a running gateway instance, or ``None``. Checks the PID file and verifies the process is actually alive. Cleans up stale PID files automatically. """ record = _read_pid_record() if not record: remove_pid_file() return None try: pid = int(record["pid"]) except (KeyError, TypeError, ValueError): remove_pid_file() return None try: os.kill(pid, 0) # signal 0 = existence check, no actual signal sent except (ProcessLookupError, PermissionError): remove_pid_file() return None recorded_start = record.get("start_time") current_start = _get_process_start_time(pid) if recorded_start is not None and current_start is not None and current_start != recorded_start: remove_pid_file() return None if not _looks_like_gateway_process(pid): if not _record_looks_like_gateway(record): remove_pid_file() return None return pid def is_gateway_running() -> bool: """Check if the gateway daemon is currently running.""" return get_running_pid() is not None