Centralizes two widely-duplicated patterns into hermes_constants.py:
1. get_hermes_home() — Path resolution for ~/.hermes (HERMES_HOME env var)
- Was copy-pasted inline across 30+ files as:
Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
- Now defined once in hermes_constants.py (zero-dependency module)
- hermes_cli/config.py re-exports it for backward compatibility
- Removed local wrapper functions in honcho_integration/client.py,
tools/website_policy.py, tools/tirith_security.py, hermes_cli/uninstall.py
2. parse_reasoning_effort() — Reasoning effort string validation
- Was copy-pasted in cli.py, gateway/run.py, cron/scheduler.py
- Same validation logic: check against (xhigh, high, medium, low, minimal, none)
- Now defined once in hermes_constants.py, called from all 3 locations
- Warning log for unknown values kept at call sites (context-specific)
31 files changed, net +31 lines (125 insertions, 94 deletions)
Full test suite: 6179 passed, 0 failed
392 lines
12 KiB
Python
392 lines
12 KiB
Python
"""
|
|
Gateway runtime status helpers.
|
|
|
|
Provides PID-file based detection of whether the gateway daemon is running,
|
|
used by send_message's check_fn to gate availability in the CLI.
|
|
|
|
The PID file lives at ``{HERMES_HOME}/gateway.pid``. HERMES_HOME defaults to
|
|
``~/.hermes`` but can be overridden via the environment variable. This means
|
|
separate HERMES_HOME directories naturally get separate PID files — a property
|
|
that will be useful when we add named profiles (multiple agents running
|
|
concurrently under distinct configurations).
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from hermes_constants import get_hermes_home
|
|
from typing import Any, Optional
|
|
|
|
_GATEWAY_KIND = "hermes-gateway"
|
|
_RUNTIME_STATUS_FILE = "gateway_state.json"
|
|
_LOCKS_DIRNAME = "gateway-locks"
|
|
|
|
|
|
def _get_pid_path() -> Path:
|
|
"""Return the path to the gateway PID file, respecting HERMES_HOME."""
|
|
home = get_hermes_home()
|
|
return home / "gateway.pid"
|
|
|
|
|
|
def _get_runtime_status_path() -> Path:
|
|
"""Return the persisted runtime health/status file path."""
|
|
return _get_pid_path().with_name(_RUNTIME_STATUS_FILE)
|
|
|
|
|
|
def _get_lock_dir() -> Path:
|
|
"""Return the machine-local directory for token-scoped gateway locks."""
|
|
override = os.getenv("HERMES_GATEWAY_LOCK_DIR")
|
|
if override:
|
|
return Path(override)
|
|
state_home = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local" / "state"))
|
|
return state_home / "hermes" / _LOCKS_DIRNAME
|
|
|
|
|
|
def _utc_now_iso() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _scope_hash(identity: str) -> str:
|
|
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
|
|
|
|
|
|
def _get_scope_lock_path(scope: str, identity: str) -> Path:
|
|
return _get_lock_dir() / f"{scope}-{_scope_hash(identity)}.lock"
|
|
|
|
|
|
def _get_process_start_time(pid: int) -> Optional[int]:
|
|
"""Return the kernel start time for a process when available."""
|
|
stat_path = Path(f"/proc/{pid}/stat")
|
|
try:
|
|
# Field 22 in /proc/<pid>/stat is process start time (clock ticks).
|
|
return int(stat_path.read_text().split()[21])
|
|
except (FileNotFoundError, IndexError, PermissionError, ValueError, OSError):
|
|
return None
|
|
|
|
|
|
def _read_process_cmdline(pid: int) -> Optional[str]:
|
|
"""Return the process command line as a space-separated string."""
|
|
cmdline_path = Path(f"/proc/{pid}/cmdline")
|
|
try:
|
|
raw = cmdline_path.read_bytes()
|
|
except (FileNotFoundError, PermissionError, OSError):
|
|
return None
|
|
|
|
if not raw:
|
|
return None
|
|
return raw.replace(b"\x00", b" ").decode("utf-8", errors="ignore").strip()
|
|
|
|
|
|
def _looks_like_gateway_process(pid: int) -> bool:
|
|
"""Return True when the live PID still looks like the Hermes gateway."""
|
|
cmdline = _read_process_cmdline(pid)
|
|
if not cmdline:
|
|
return False
|
|
|
|
patterns = (
|
|
"hermes_cli.main gateway",
|
|
"hermes_cli/main.py gateway",
|
|
"hermes gateway",
|
|
"gateway/run.py",
|
|
)
|
|
return any(pattern in cmdline for pattern in patterns)
|
|
|
|
|
|
def _record_looks_like_gateway(record: dict[str, Any]) -> bool:
|
|
"""Validate gateway identity from PID-file metadata when cmdline is unavailable."""
|
|
if record.get("kind") != _GATEWAY_KIND:
|
|
return False
|
|
|
|
argv = record.get("argv")
|
|
if not isinstance(argv, list) or not argv:
|
|
return False
|
|
|
|
cmdline = " ".join(str(part) for part in argv)
|
|
patterns = (
|
|
"hermes_cli.main gateway",
|
|
"hermes_cli/main.py gateway",
|
|
"hermes gateway",
|
|
"gateway/run.py",
|
|
)
|
|
return any(pattern in cmdline for pattern in patterns)
|
|
|
|
|
|
def _build_pid_record() -> dict:
|
|
return {
|
|
"pid": os.getpid(),
|
|
"kind": _GATEWAY_KIND,
|
|
"argv": list(sys.argv),
|
|
"start_time": _get_process_start_time(os.getpid()),
|
|
}
|
|
|
|
|
|
def _build_runtime_status_record() -> dict[str, Any]:
|
|
payload = _build_pid_record()
|
|
payload.update({
|
|
"gateway_state": "starting",
|
|
"exit_reason": None,
|
|
"platforms": {},
|
|
"updated_at": _utc_now_iso(),
|
|
})
|
|
return payload
|
|
|
|
|
|
def _read_json_file(path: Path) -> Optional[dict[str, Any]]:
|
|
if not path.exists():
|
|
return None
|
|
try:
|
|
raw = path.read_text().strip()
|
|
except OSError:
|
|
return None
|
|
if not raw:
|
|
return None
|
|
try:
|
|
payload = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
return payload if isinstance(payload, dict) else None
|
|
|
|
|
|
def _write_json_file(path: Path, payload: dict[str, Any]) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(json.dumps(payload))
|
|
|
|
|
|
def _read_pid_record() -> Optional[dict]:
|
|
pid_path = _get_pid_path()
|
|
if not pid_path.exists():
|
|
return None
|
|
|
|
raw = pid_path.read_text().strip()
|
|
if not raw:
|
|
return None
|
|
|
|
try:
|
|
payload = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
try:
|
|
return {"pid": int(raw)}
|
|
except ValueError:
|
|
return None
|
|
|
|
if isinstance(payload, int):
|
|
return {"pid": payload}
|
|
if isinstance(payload, dict):
|
|
return payload
|
|
return None
|
|
|
|
|
|
def write_pid_file() -> None:
|
|
"""Write the current process PID and metadata to the gateway PID file."""
|
|
_write_json_file(_get_pid_path(), _build_pid_record())
|
|
|
|
|
|
def write_runtime_status(
|
|
*,
|
|
gateway_state: Optional[str] = None,
|
|
exit_reason: Optional[str] = None,
|
|
platform: Optional[str] = None,
|
|
platform_state: Optional[str] = None,
|
|
error_code: Optional[str] = None,
|
|
error_message: Optional[str] = None,
|
|
) -> None:
|
|
"""Persist gateway runtime health information for diagnostics/status."""
|
|
path = _get_runtime_status_path()
|
|
payload = _read_json_file(path) or _build_runtime_status_record()
|
|
payload.setdefault("platforms", {})
|
|
payload.setdefault("kind", _GATEWAY_KIND)
|
|
payload["pid"] = os.getpid()
|
|
payload["start_time"] = _get_process_start_time(os.getpid())
|
|
payload["updated_at"] = _utc_now_iso()
|
|
|
|
if gateway_state is not None:
|
|
payload["gateway_state"] = gateway_state
|
|
if exit_reason is not None:
|
|
payload["exit_reason"] = exit_reason
|
|
|
|
if platform is not None:
|
|
platform_payload = payload["platforms"].get(platform, {})
|
|
if platform_state is not None:
|
|
platform_payload["state"] = platform_state
|
|
if error_code is not None:
|
|
platform_payload["error_code"] = error_code
|
|
if error_message is not None:
|
|
platform_payload["error_message"] = error_message
|
|
platform_payload["updated_at"] = _utc_now_iso()
|
|
payload["platforms"][platform] = platform_payload
|
|
|
|
_write_json_file(path, payload)
|
|
|
|
|
|
def read_runtime_status() -> Optional[dict[str, Any]]:
|
|
"""Read the persisted gateway runtime health/status information."""
|
|
return _read_json_file(_get_runtime_status_path())
|
|
|
|
|
|
def remove_pid_file() -> None:
|
|
"""Remove the gateway PID file if it exists."""
|
|
try:
|
|
_get_pid_path().unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def acquire_scoped_lock(scope: str, identity: str, metadata: Optional[dict[str, Any]] = None) -> tuple[bool, Optional[dict[str, Any]]]:
|
|
"""Acquire a machine-local lock keyed by scope + identity.
|
|
|
|
Used to prevent multiple local gateways from using the same external identity
|
|
at once (e.g. the same Telegram bot token across different HERMES_HOME dirs).
|
|
"""
|
|
lock_path = _get_scope_lock_path(scope, identity)
|
|
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
|
record = {
|
|
**_build_pid_record(),
|
|
"scope": scope,
|
|
"identity_hash": _scope_hash(identity),
|
|
"metadata": metadata or {},
|
|
"updated_at": _utc_now_iso(),
|
|
}
|
|
|
|
existing = _read_json_file(lock_path)
|
|
if existing:
|
|
try:
|
|
existing_pid = int(existing["pid"])
|
|
except (KeyError, TypeError, ValueError):
|
|
existing_pid = None
|
|
|
|
if existing_pid == os.getpid() and existing.get("start_time") == record.get("start_time"):
|
|
_write_json_file(lock_path, record)
|
|
return True, existing
|
|
|
|
stale = existing_pid is None
|
|
if not stale:
|
|
try:
|
|
os.kill(existing_pid, 0)
|
|
except (ProcessLookupError, PermissionError):
|
|
stale = True
|
|
else:
|
|
current_start = _get_process_start_time(existing_pid)
|
|
if (
|
|
existing.get("start_time") is not None
|
|
and current_start is not None
|
|
and current_start != existing.get("start_time")
|
|
):
|
|
stale = True
|
|
# Check if process is stopped (Ctrl+Z / SIGTSTP) — stopped
|
|
# processes still respond to os.kill(pid, 0) but are not
|
|
# actually running. Treat them as stale so --replace works.
|
|
if not stale:
|
|
try:
|
|
_proc_status = Path(f"/proc/{existing_pid}/status")
|
|
if _proc_status.exists():
|
|
for _line in _proc_status.read_text().splitlines():
|
|
if _line.startswith("State:"):
|
|
_state = _line.split()[1]
|
|
if _state in ("T", "t"): # stopped or tracing stop
|
|
stale = True
|
|
break
|
|
except (OSError, PermissionError):
|
|
pass
|
|
if stale:
|
|
try:
|
|
lock_path.unlink(missing_ok=True)
|
|
except OSError:
|
|
pass
|
|
else:
|
|
return False, existing
|
|
|
|
try:
|
|
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
|
|
except FileExistsError:
|
|
return False, _read_json_file(lock_path)
|
|
try:
|
|
with os.fdopen(fd, "w", encoding="utf-8") as handle:
|
|
json.dump(record, handle)
|
|
except Exception:
|
|
try:
|
|
lock_path.unlink(missing_ok=True)
|
|
except OSError:
|
|
pass
|
|
raise
|
|
return True, None
|
|
|
|
|
|
def release_scoped_lock(scope: str, identity: str) -> None:
|
|
"""Release a previously-acquired scope lock when owned by this process."""
|
|
lock_path = _get_scope_lock_path(scope, identity)
|
|
existing = _read_json_file(lock_path)
|
|
if not existing:
|
|
return
|
|
if existing.get("pid") != os.getpid():
|
|
return
|
|
if existing.get("start_time") != _get_process_start_time(os.getpid()):
|
|
return
|
|
try:
|
|
lock_path.unlink(missing_ok=True)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def release_all_scoped_locks() -> int:
|
|
"""Remove all scoped lock files in the lock directory.
|
|
|
|
Called during --replace to clean up stale locks left by stopped/killed
|
|
gateway processes that did not release their locks gracefully.
|
|
Returns the number of lock files removed.
|
|
"""
|
|
lock_dir = _get_lock_dir()
|
|
removed = 0
|
|
if lock_dir.exists():
|
|
for lock_file in lock_dir.glob("*.lock"):
|
|
try:
|
|
lock_file.unlink(missing_ok=True)
|
|
removed += 1
|
|
except OSError:
|
|
pass
|
|
return removed
|
|
|
|
|
|
def get_running_pid() -> Optional[int]:
|
|
"""Return the PID of a running gateway instance, or ``None``.
|
|
|
|
Checks the PID file and verifies the process is actually alive.
|
|
Cleans up stale PID files automatically.
|
|
"""
|
|
record = _read_pid_record()
|
|
if not record:
|
|
remove_pid_file()
|
|
return None
|
|
|
|
try:
|
|
pid = int(record["pid"])
|
|
except (KeyError, TypeError, ValueError):
|
|
remove_pid_file()
|
|
return None
|
|
|
|
try:
|
|
os.kill(pid, 0) # signal 0 = existence check, no actual signal sent
|
|
except (ProcessLookupError, PermissionError):
|
|
remove_pid_file()
|
|
return None
|
|
|
|
recorded_start = record.get("start_time")
|
|
current_start = _get_process_start_time(pid)
|
|
if recorded_start is not None and current_start is not None and current_start != recorded_start:
|
|
remove_pid_file()
|
|
return None
|
|
|
|
if not _looks_like_gateway_process(pid):
|
|
if not _record_looks_like_gateway(record):
|
|
remove_pid_file()
|
|
return None
|
|
|
|
return pid
|
|
|
|
|
|
def is_gateway_running() -> bool:
|
|
"""Check if the gateway daemon is currently running."""
|
|
return get_running_pid() is not None
|