fix(cli): add missing subprocess.run() timeouts in doctor and status (#4009)
Add timeout parameters to 4 subprocess.run() calls that could hang indefinitely if the child process blocks (e.g., unresponsive docker daemon, systemctl waiting for D-Bus): - doctor.py: docker info (timeout=10), ssh check (timeout=15) - status.py: systemctl is-active (timeout=5), launchctl list (timeout=5) Each call site now catches subprocess.TimeoutExpired and treats it as a failure, consistent with how non-zero return codes are already handled. Add AST-based regression test that verifies every subprocess.run() call in CLI modules specifies a timeout keyword argument. Co-authored-by: dieutx <dangtc94@gmail.com>
This commit is contained in:
@@ -406,8 +406,11 @@ def run_doctor(args):
|
||||
if terminal_env == "docker":
|
||||
if shutil.which("docker"):
|
||||
# Check if docker daemon is running
|
||||
result = subprocess.run(["docker", "info"], capture_output=True)
|
||||
if result.returncode == 0:
|
||||
try:
|
||||
result = subprocess.run(["docker", "info"], capture_output=True, timeout=10)
|
||||
except subprocess.TimeoutExpired:
|
||||
result = None
|
||||
if result is not None and result.returncode == 0:
|
||||
check_ok("docker", "(daemon running)")
|
||||
else:
|
||||
check_fail("docker daemon not running")
|
||||
@@ -426,12 +429,16 @@ def run_doctor(args):
|
||||
ssh_host = os.getenv("TERMINAL_SSH_HOST")
|
||||
if ssh_host:
|
||||
# Try to connect
|
||||
result = subprocess.run(
|
||||
["ssh", "-o", "ConnectTimeout=5", "-o", "BatchMode=yes", ssh_host, "echo ok"],
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
if result.returncode == 0:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ssh", "-o", "ConnectTimeout=5", "-o", "BatchMode=yes", ssh_host, "echo ok"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=15
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
result = None
|
||||
if result is not None and result.returncode == 0:
|
||||
check_ok(f"SSH connection to {ssh_host}")
|
||||
else:
|
||||
check_fail(f"SSH connection to {ssh_host}")
|
||||
|
||||
@@ -285,23 +285,31 @@ def show_status(args):
|
||||
_gw_svc = get_service_name()
|
||||
except Exception:
|
||||
_gw_svc = "hermes-gateway"
|
||||
result = subprocess.run(
|
||||
["systemctl", "--user", "is-active", _gw_svc],
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
is_active = result.stdout.strip() == "active"
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["systemctl", "--user", "is-active", _gw_svc],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
is_active = result.stdout.strip() == "active"
|
||||
except subprocess.TimeoutExpired:
|
||||
is_active = False
|
||||
print(f" Status: {check_mark(is_active)} {'running' if is_active else 'stopped'}")
|
||||
print(" Manager: systemd (user)")
|
||||
|
||||
elif sys.platform == 'darwin':
|
||||
from hermes_cli.gateway import get_launchd_label
|
||||
result = subprocess.run(
|
||||
["launchctl", "list", get_launchd_label()],
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
is_loaded = result.returncode == 0
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["launchctl", "list", get_launchd_label()],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
is_loaded = result.returncode == 0
|
||||
except subprocess.TimeoutExpired:
|
||||
is_loaded = False
|
||||
print(f" Status: {check_mark(is_loaded)} {'loaded' if is_loaded else 'not loaded'}")
|
||||
print(" Manager: launchd")
|
||||
else:
|
||||
|
||||
44
tests/hermes_cli/test_subprocess_timeouts.py
Normal file
44
tests/hermes_cli/test_subprocess_timeouts.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""Tests for subprocess.run() timeout coverage in CLI utilities."""
|
||||
import ast
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# Parameterise over every CLI module that calls subprocess.run
|
||||
_CLI_MODULES = [
|
||||
"hermes_cli/doctor.py",
|
||||
"hermes_cli/status.py",
|
||||
"hermes_cli/clipboard.py",
|
||||
"hermes_cli/banner.py",
|
||||
]
|
||||
|
||||
|
||||
def _subprocess_run_calls(filepath: str) -> list[dict]:
|
||||
"""Parse a Python file and return info about subprocess.run() calls."""
|
||||
source = Path(filepath).read_text()
|
||||
tree = ast.parse(source, filename=filepath)
|
||||
calls = []
|
||||
for node in ast.walk(tree):
|
||||
if not isinstance(node, ast.Call):
|
||||
continue
|
||||
func = node.func
|
||||
if (isinstance(func, ast.Attribute) and func.attr == "run"
|
||||
and isinstance(func.value, ast.Name)
|
||||
and func.value.id == "subprocess"):
|
||||
has_timeout = any(kw.arg == "timeout" for kw in node.keywords)
|
||||
calls.append({"line": node.lineno, "has_timeout": has_timeout})
|
||||
return calls
|
||||
|
||||
|
||||
@pytest.mark.parametrize("filepath", _CLI_MODULES)
|
||||
def test_all_subprocess_run_calls_have_timeout(filepath):
|
||||
"""Every subprocess.run() call in CLI modules must specify a timeout."""
|
||||
if not Path(filepath).exists():
|
||||
pytest.skip(f"{filepath} not found")
|
||||
calls = _subprocess_run_calls(filepath)
|
||||
missing = [c for c in calls if not c["has_timeout"]]
|
||||
assert not missing, (
|
||||
f"{filepath} has subprocess.run() without timeout at "
|
||||
f"line(s): {[c['line'] for c in missing]}"
|
||||
)
|
||||
Reference in New Issue
Block a user