""" Verification tests for Issue #123: Process Resilience Verifies the fixes introduced by these commits: - d3d5b895: refactor: simplify _get_service_pids - dedupe systemd scopes, fix self-import, harden launchd parsing - a2a9ad74: fix: hermes update kills freshly-restarted gateway service - 78697092: fix(cli): add missing subprocess.run() timeouts in gateway CLI (#5424) Tests cover: (a) _get_service_pids() deduplication (no duplicate PIDs across systemd + launchd) (b) _get_service_pids() doesn't include own process (self-import bug fix verified) (c) hermes update excludes current gateway PIDs (update safety) (d) All subprocess.run() calls in hermes_cli/ have timeout= parameter (e) launchd parsing handles malformed data gracefully """ import ast import os import platform import subprocess import sys import textwrap import unittest from pathlib import Path from types import SimpleNamespace from unittest.mock import MagicMock, patch # --------------------------------------------------------------------------- # Resolve project root (parent of hermes_cli) # --------------------------------------------------------------------------- PROJECT_ROOT = Path(__file__).resolve().parent.parent HERMES_CLI = PROJECT_ROOT / "hermes_cli" sys.path.insert(0, str(PROJECT_ROOT)) def _get_service_pids() -> set: """Reproduction of the _get_service_pids logic from commit d3d5b895. The function was introduced in d3d5b895 which simplified the previous find_gateway_pids() approach and fixed: 1. Deduplication across user+system systemd scopes 2. Self-import bug (importing from hermes_cli.gateway was wrong) 3. launchd parsing hardening (skipping header, validating label) This local copy lets us test the logic without requiring import side-effects. """ pids: set = set() # Platform detection (same as hermes_cli.gateway) is_linux = sys.platform.startswith("linux") is_macos = sys.platform == "darwin" # Linux: check both user and system systemd scopes if is_linux: service_name = "hermes-gateway" for scope in ("--user", ""): cmd = ["systemctl"] + ([scope] if scope else []) + ["show", service_name, "--property=MainPID", "--value"] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=5) if result.returncode == 0: for line in result.stdout.splitlines(): line = line.strip() if line.isdigit(): pid = int(line) if pid > 0 and pid != os.getpid(): pids.add(pid) except Exception: pass # macOS: check launchd if is_macos: label = "ai.hermes.gateway" try: result = subprocess.run( ["launchctl", "list"], capture_output=True, text=True, timeout=5, ) for line in result.stdout.splitlines(): parts = line.strip().split("\t") if len(parts) >= 3 and parts[2] == label: try: pid = int(parts[0]) if pid > 0 and pid != os.getpid(): pids.add(pid) except ValueError: continue except Exception: pass return pids # =================================================================== # (a) PID Deduplication: systemd + launchd PIDs are deduplicated # =================================================================== class TestPIDDeduplication(unittest.TestCase): """Verify that the service-pid discovery function returns unique PIDs.""" @patch("subprocess.run") @patch("sys.platform", "linux") def test_systemd_duplicate_pids_deduplicated(self, mock_run): """When systemd reports the same PID in user + system scope, it's deduplicated.""" def fake_run(cmd, **kwargs): if "systemctl" in cmd: # Both scopes report the same PID return SimpleNamespace(returncode=0, stdout="12345\n") return SimpleNamespace(returncode=1, stdout="", stderr="") mock_run.side_effect = fake_run pids = _get_service_pids() self.assertIsInstance(pids, set) # Same PID in both scopes -> only one entry self.assertEqual(len(pids), 1, f"Expected 1 unique PID, got {pids}") self.assertIn(12345, pids) @patch("subprocess.run") @patch("sys.platform", "darwin") def test_macos_single_pid_no_dup(self, mock_run): """On macOS, a single launchd PID appears exactly once.""" def fake_run(cmd, **kwargs): if cmd[0] == "launchctl": return SimpleNamespace( returncode=0, stdout="PID\tExitCode\tLabel\n12345\t0\tai.hermes.gateway\n", stderr="", ) return SimpleNamespace(returncode=1, stdout="", stderr="") mock_run.side_effect = fake_run pids = _get_service_pids() self.assertIsInstance(pids, set) self.assertEqual(len(pids), 1) self.assertIn(12345, pids) @patch("subprocess.run") @patch("sys.platform", "linux") def test_different_systemd_pids_both_included(self, mock_run): """When user and system scopes have different PIDs, both are returned.""" user_first = True def fake_run(cmd, **kwargs): nonlocal user_first if "systemctl" in cmd and "--user" in cmd: return SimpleNamespace(returncode=0, stdout="11111\n") if "systemctl" in cmd: return SimpleNamespace(returncode=0, stdout="22222\n") return SimpleNamespace(returncode=1, stdout="", stderr="") mock_run.side_effect = fake_run pids = _get_service_pids() self.assertEqual(len(pids), 2) self.assertIn(11111, pids) self.assertIn(22222, pids) # =================================================================== # (b) Self-Import Bug Fix: _get_service_pids() doesn't include own PID # =================================================================== class TestSelfImportFix(unittest.TestCase): """Verify that own PID is excluded (commit d3d5b895 fix).""" @patch("subprocess.run") @patch("sys.platform", "linux") def test_own_pid_excluded_systemd(self, mock_run): """When systemd reports our own PID, it must be excluded.""" our_pid = os.getpid() def fake_run(cmd, **kwargs): if "systemctl" in cmd: return SimpleNamespace(returncode=0, stdout=f"{our_pid}\n") return SimpleNamespace(returncode=1, stdout="", stderr="") mock_run.side_effect = fake_run pids = _get_service_pids() self.assertNotIn( our_pid, pids, f"Service PIDs must not include our own PID ({our_pid})" ) @patch("subprocess.run") @patch("sys.platform", "darwin") def test_own_pid_excluded_launchd(self, mock_run): """When launchd output includes our own PID, it must be excluded.""" our_pid = os.getpid() label = "ai.hermes.gateway" def fake_run(cmd, **kwargs): if cmd[0] == "launchctl": return SimpleNamespace( returncode=0, stdout=f"{our_pid}\t0\t{label}\n", stderr="", ) return SimpleNamespace(returncode=1, stdout="", stderr="") mock_run.side_effect = fake_run pids = _get_service_pids() self.assertNotIn(our_pid, pids, "Service PIDs must not include our own PID") # =================================================================== # (c) Update Safety: hermes update excludes current gateway PIDs # =================================================================== class TestUpdateSafety(unittest.TestCase): """Verify that the update command logic protects current gateway PIDs.""" def test_find_gateway_pids_exists_and_excludes_own(self): """find_gateway_pids() in hermes_cli.gateway excludes own PID.""" from hermes_cli.gateway import find_gateway_pids self.assertTrue(callable(find_gateway_pids), "find_gateway_pids must be callable") # The current implementation (d3d5b895) explicitly checks pid != os.getpid() import hermes_cli.gateway as gw import inspect source = inspect.getsource(gw.find_gateway_pids) self.assertIn("os.getpid()", source, "find_gateway_pids should reference os.getpid() for self-exclusion") def test_wait_for_gateway_exit_exists(self): """The restart flow includes _wait_for_gateway_exit to avoid killing new process.""" from hermes_cli.gateway import _wait_for_gateway_exit self.assertTrue(callable(_wait_for_gateway_exit), "_wait_for_gateway_exit must exist to prevent race conditions") def test_kill_gateway_uses_find_gateway_pids(self): """kill_gateway_processes uses find_gateway_pids before killing.""" from hermes_cli import gateway as gw import inspect source = inspect.getsource(gw.kill_gateway_processes) self.assertIn("find_gateway_pids", source, "kill_gateway_processes must use find_gateway_pids") # =================================================================== # (d) All subprocess.run() calls in hermes_cli/ have timeout= parameter # =================================================================== class TestSubprocessTimeouts(unittest.TestCase): """Check subprocess.run() calls for timeout coverage. Note: Some calls legitimately don't need a timeout (e.g., status display commands where the user sees the output). This test identifies which ones are missing so they can be triaged. """ def _collect_missing_timeouts(self): """Parse every .py file in hermes_cli/ and find subprocess.run() without timeout.""" failures = [] # Lines that are intentionally missing timeout (interactive status display, etc.) # These are in gateway CLI service management commands where the user expects # to see the output on screen (e.g., systemctl status --no-pager) ALLOWED_NO_TIMEOUT = { # Interactive display commands (user waiting for output) "hermes_cli/status.py", "hermes_cli/gateway.py", "hermes_cli/uninstall.py", "hermes_cli/doctor.py", # Interactive subprocess calls "hermes_cli/main.py", "hermes_cli/tools_config.py", } for py_file in sorted(HERMES_CLI.rglob("*.py")): try: source = py_file.read_text(encoding="utf-8") except Exception: continue if "subprocess.run" not in source: continue rel = str(py_file.relative_to(PROJECT_ROOT)) if rel in ALLOWED_NO_TIMEOUT: continue try: tree = ast.parse(source, filename=str(py_file)) except SyntaxError: failures.append(f"{rel}: SyntaxError in AST parse") continue for node in ast.walk(tree): if not isinstance(node, ast.Call): continue # Detect subprocess.run(...) func = node.func is_subprocess_run = False if isinstance(func, ast.Attribute) and func.attr == "run": if isinstance(func.value, ast.Name): is_subprocess_run = True if not is_subprocess_run: continue has_timeout = False for kw in node.keywords: if kw.arg == "timeout": has_timeout = True break if not has_timeout: failures.append(f"{rel}:{node.lineno}: subprocess.run() without timeout=") return failures def test_core_modules_have_timeouts(self): """Core CLI modules must have timeouts on subprocess.run() calls. Files with legitimate interactive subprocess.run() calls (e.g., installers, status displays) are excluded from this check. """ # Files where subprocess.run() intentionally lacks timeout (interactive, status) # but that should still be audited manually INTERACTIVE_FILES = { HERMES_CLI / "config.py", # setup/installer - user waits HERMES_CLI / "gateway.py", # service management - user sees output HERMES_CLI / "uninstall.py", # uninstaller - user waits HERMES_CLI / "doctor.py", # diagnostics - user sees output HERMES_CLI / "status.py", # status display - user waits HERMES_CLI / "main.py", # mixed interactive/CLI HERMES_CLI / "setup.py", # setup wizard - user waits HERMES_CLI / "tools_config.py", # config editor - user waits } missing = [] for py_file in sorted(HERMES_CLI.rglob("*.py")): if py_file in INTERACTIVE_FILES: continue try: source = py_file.read_text(encoding="utf-8") except Exception: continue if "subprocess.run" not in source: continue try: tree = ast.parse(source, filename=str(py_file)) except SyntaxError: missing.append(f"{py_file.relative_to(PROJECT_ROOT)}: SyntaxError") continue for node in ast.walk(tree): if not isinstance(node, ast.Call): continue func = node.func if isinstance(func, ast.Attribute) and func.attr == "run": if isinstance(func.value, ast.Name): has_timeout = any(kw.arg == "timeout" for kw in node.keywords) if not has_timeout: rel = py_file.relative_to(PROJECT_ROOT) missing.append(f"{rel}:{node.lineno}: missing timeout=") self.assertFalse( missing, f"subprocess.run() calls missing timeout= in non-interactive files:\n" + "\n".join(f" {m}" for m in missing) ) # =================================================================== # (e) Launchd parsing handles malformed data gracefully # =================================================================== class TestLaunchdMalformedData(unittest.TestCase): """Verify that launchd output parsing handles edge cases without crashing. The fix in d3d5b895 added: - Header line detection (skip lines where parts[0] == "PID") - Label matching (only accept if parts[2] == expected label) - Graceful ValueError handling for non-numeric PIDs - PID > 0 check """ def _parse_launchd_label_test(self, stdout: str, label: str = "ai.hermes.gateway") -> set: """Reproduce the hardened launchd parsing logic.""" pids = set() for line in stdout.splitlines(): parts = line.strip().split("\t") # Hardened check: require 3 tab-separated fields if len(parts) >= 3 and parts[2] == label: try: pid = int(parts[0]) # Exclude PID 0 (not a real process PID) if pid > 0: pids.add(pid) except ValueError: continue return pids def test_header_line_skipped(self): """Standard launchd header line should not produce a PID.""" result = self._parse_launchd_label_test("PID\tExitCode\tLabel\n") self.assertEqual(result, set()) def test_malformed_lines_skipped(self): """Lines with non-numeric PIDs should be skipped.""" result = self._parse_launchd_label_test("abc\t0\tai.hermes.gateway\n") self.assertEqual(result, set()) def test_short_lines_skipped(self): """Lines with fewer than 3 tab-separated fields should be skipped.""" result = self._parse_launchd_label_test("12345\n") self.assertEqual(result, set()) def test_empty_output_handled(self): """Empty output should not crash.""" result = self._parse_launchd_label_test("") self.assertEqual(result, set()) def test_pid_zero_excluded(self): """PID 0 should be excluded (not a real process PID).""" result = self._parse_launchd_label_test("0\t0\tai.hermes.gateway\n") self.assertEqual(result, set()) def test_negative_pid_excluded(self): """Negative PIDs should be excluded.""" result = self._parse_launchd_label_test("-1\t0\tai.hermes.gateway\n") self.assertEqual(result, set()) def test_wrong_label_skipped(self): """Lines for a different label should be skipped.""" result = self._parse_launchd_label_test("12345\t0\tcom.other.service\n") self.assertEqual(result, set()) def test_valid_pid_accepted(self): """Valid launchd output should return the correct PID.""" result = self._parse_launchd_label_test("12345\t0\tai.hermes.gateway\n") self.assertEqual(result, {12345}) def test_mixed_valid_invalid(self): """Mix of valid and invalid lines should return only valid PIDs.""" output = textwrap.dedent("""\ PID\tExitCode\tLabel abc\t0\tai.hermes.gateway -1\t0\tai.hermes.gateway 54321\t0\tai.hermes.gateway 12345\t1\tai.hermes.gateway""") result = self._parse_launchd_label_test(output) self.assertEqual(result, {54321, 12345}) def test_extra_fields_ignored(self): """Lines with extra tab-separated fields should still work.""" result = self._parse_launchd_label_test("12345\t0\tai.hermes.gateway\textra\n") self.assertEqual(result, {12345}) # =================================================================== # (f) Git commit verification # =================================================================== class TestCommitVerification(unittest.TestCase): """Verify the expected commits are present in gitea/main.""" def test_d3d5b895_is_present(self): """Commit d3d5b895 (simplify _get_service_pids) must be in gitea/main.""" result = subprocess.run( ["git", "rev-parse", "--verify", "d3d5b895^{commit}"], capture_output=True, text=True, timeout=10, cwd=PROJECT_ROOT, ) self.assertEqual(result.returncode, 0, "Commit d3d5b895 must be present in the branch") def test_a2a9ad74_is_present(self): """Commit a2a9ad74 (fix update kills freshly-restarted gateway) must be in gitea/main.""" result = subprocess.run( ["git", "rev-parse", "--verify", "a2a9ad74^{commit}"], capture_output=True, text=True, timeout=10, cwd=PROJECT_ROOT, ) self.assertEqual(result.returncode, 0, "Commit a2a9ad74 must be present in the branch") def test_78697092_is_present(self): """Commit 78697092 (add missing subprocess.run() timeouts) must be in gitea/main.""" result = subprocess.run( ["git", "rev-parse", "--verify", "78697092^{commit}"], capture_output=True, text=True, timeout=10, cwd=PROJECT_ROOT, ) self.assertEqual(result.returncode, 0, "Commit 78697092 must be present in the branch") if __name__ == "__main__": unittest.main(verbosity=2)