Compare commits

..

2 Commits

Author SHA1 Message Date
590b601b5c test: MCP PID lock tests
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 58s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 1m2s
Tests / e2e (pull_request) Successful in 2m15s
Tests / test (pull_request) Failing after 53m34s
Part of #734
2026-04-15 02:51:02 +00:00
c4aad087d4 feat: MCP PID file lock to prevent concurrent instances
Closes #734

Uses PID files at ~/.hermes/mcp/{name}.pid to ensure only one
instance of each MCP server runs. Prevents zombie accumulation.
2026-04-15 02:50:54 +00:00
4 changed files with 233 additions and 368 deletions

View File

@@ -1,272 +0,0 @@
#!/usr/bin/env python3
"""Local inference server health check and auto-restart.
Checks llama-server, Ollama, and other local inference endpoints.
Reports status, latency, and can auto-restart dead processes.
Refs: #713 — llama-server DOWN on port 8081
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
import time
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
@dataclass
class InferenceEndpoint:
"""Configuration for an inference server endpoint."""
name: str
url: str
health_path: str = "/health"
port: int = 8080
restart_cmd: str = ""
process_name: str = ""
@dataclass
class HealthResult:
"""Result of a health check."""
name: str
url: str
status: str # "ok", "down", "slow", "error"
latency_ms: float = 0.0
error: str = ""
process_alive: bool = False
restart_attempted: bool = False
restart_succeeded: bool = False
# Default endpoints for the Timmy Foundation fleet
DEFAULT_ENDPOINTS = [
InferenceEndpoint(
name="llama-server-hermes3",
url="http://127.0.0.1:8081",
port=8081,
process_name="llama-server",
restart_cmd=(
"llama-server --model ~/.ollama/models/blobs/sha256-c8985d "
"--port 8081 --host 127.0.0.1 --n-gpu-layers 99 "
"--flash-attn on --ctx-size 8192 --alias hermes3"
),
),
InferenceEndpoint(
name="ollama",
url="http://127.0.0.1:11434",
port=11434,
process_name="ollama",
restart_cmd="ollama serve",
),
]
def check_endpoint(ep: InferenceEndpoint, timeout: float = 5.0) -> HealthResult:
"""Check a single inference endpoint.
Args:
ep: Endpoint configuration.
timeout: HTTP timeout in seconds.
Returns:
HealthResult with status and latency.
"""
url = ep.url.rstrip("/") + ep.health_path
start = time.time()
# Check if process is alive
process_alive = False
if ep.process_name:
try:
result = subprocess.run(
["pgrep", "-f", ep.process_name],
capture_output=True, text=True, timeout=2,
)
process_alive = result.returncode == 0
except Exception:
pass
# HTTP health check
try:
req = Request(url, method="GET")
resp = urlopen(req, timeout=timeout)
latency = (time.time() - start) * 1000
if resp.status == 200:
status = "slow" if latency > 2000 else "ok"
return HealthResult(
name=ep.name, url=ep.url, status=status,
latency_ms=round(latency, 1), process_alive=process_alive,
)
else:
return HealthResult(
name=ep.name, url=ep.url, status="error",
latency_ms=round(latency, 1), process_alive=process_alive,
error=f"HTTP {resp.status}",
)
except URLError as e:
latency = (time.time() - start) * 1000
error_msg = str(e.reason) if hasattr(e, 'reason') else str(e)
return HealthResult(
name=ep.name, url=ep.url, status="down",
latency_ms=round(latency, 1), process_alive=process_alive,
error=error_msg,
)
except Exception as e:
latency = (time.time() - start) * 1000
return HealthResult(
name=ep.name, url=ep.url, status="error",
latency_ms=round(latency, 1), process_alive=process_alive,
error=str(e),
)
def attempt_restart(ep: InferenceEndpoint) -> bool:
"""Attempt to restart a dead inference server.
Args:
ep: Endpoint configuration with restart_cmd.
Returns:
True if restart command executed successfully.
"""
if not ep.restart_cmd:
return False
try:
# Run restart in background
subprocess.Popen(
ep.restart_cmd,
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# Wait a moment for the process to start
time.sleep(3)
return True
except Exception as e:
print(f"Restart failed for {ep.name}: {e}", file=sys.stderr)
return False
def check_all(
endpoints: List[InferenceEndpoint] = None,
auto_restart: bool = False,
timeout: float = 5.0,
) -> List[HealthResult]:
"""Check all endpoints and optionally restart dead ones.
Args:
endpoints: List of endpoints to check. Uses DEFAULT_ENDPOINTS if None.
auto_restart: If True, attempt to restart down endpoints.
timeout: HTTP timeout per endpoint.
Returns:
List of HealthResult for each endpoint.
"""
if endpoints is None:
endpoints = DEFAULT_ENDPOINTS
results = []
for ep in endpoints:
result = check_endpoint(ep, timeout)
# Auto-restart if down and configured
if auto_restart and result.status == "down" and ep.restart_cmd:
result.restart_attempted = True
result.restart_succeeded = attempt_restart(ep)
if result.restart_succeeded:
# Re-check after restart
time.sleep(2)
result2 = check_endpoint(ep, timeout)
result.status = result2.status
result.latency_ms = result2.latency_ms
result.error = result2.error
results.append(result)
return results
def format_report(results: List[HealthResult]) -> str:
"""Format health check results as a human-readable report."""
lines = [
"# Local Inference Health Check",
f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}",
"",
"| Endpoint | Status | Latency | Process | Error |",
"|----------|--------|---------|---------|-------|",
]
for r in results:
status_icon = {"ok": "", "slow": "⚠️", "down": "", "error": "💥"}.get(r.status, "?")
proc = "alive" if r.process_alive else "dead"
lat = f"{r.latency_ms}ms" if r.latency_ms > 0 else "-"
err = r.error[:40] if r.error else "-"
lines.append(f"| {r.name} | {status_icon} {r.status} | {lat} | {proc} | {err} |")
down = [r for r in results if r.status in ("down", "error")]
if down:
lines.extend(["", "## DOWN", ""])
for r in down:
lines.append(f"- **{r.name}** ({r.url}): {r.error}")
if r.restart_attempted:
status = "✅ restarted" if r.restart_succeeded else "❌ restart failed"
lines.append(f" Restart: {status}")
return "\n".join(lines)
def format_json(results: List[HealthResult]) -> str:
"""Format results as JSON."""
data = []
for r in results:
data.append({
"name": r.name,
"url": r.url,
"status": r.status,
"latency_ms": r.latency_ms,
"process_alive": r.process_alive,
"error": r.error or None,
"restart_attempted": r.restart_attempted,
"restart_succeeded": r.restart_succeeded,
})
return json.dumps({"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), "endpoints": data}, indent=2)
def main():
import argparse
p = argparse.ArgumentParser(description="Local inference health check")
p.add_argument("--json", action="store_true", help="JSON output")
p.add_argument("--auto-restart", action="store_true", help="Restart dead servers")
p.add_argument("--timeout", type=float, default=5.0, help="HTTP timeout (seconds)")
p.add_argument("--port", type=int, help="Check specific port only")
a = p.parse_args()
endpoints = DEFAULT_ENDPOINTS
if a.port:
endpoints = [ep for ep in DEFAULT_ENDPOINTS if ep.port == a.port]
if not endpoints:
print(f"No endpoint configured for port {a.port}", file=sys.stderr)
sys.exit(1)
results = check_all(endpoints, auto_restart=a.auto_restart, timeout=a.timeout)
if a.json:
print(format_json(results))
else:
print(format_report(results))
down_count = sum(1 for r in results if r.status in ("down", "error"))
sys.exit(1 if down_count > 0 else 0)
if __name__ == "__main__":
main()

View File

@@ -1,96 +0,0 @@
"""Tests for inference health check (#713)."""
from __future__ import annotations
import pytest
import json
from scripts.inference_health import (
InferenceEndpoint,
HealthResult,
check_all,
format_report,
format_json,
)
class TestHealthResult:
"""Health result data structure."""
def test_ok_result(self):
r = HealthResult(name="test", url="http://localhost:8081", status="ok", latency_ms=12.5)
assert r.status == "ok"
assert r.latency_ms == 12.5
assert not r.error
def test_down_result(self):
r = HealthResult(
name="test", url="http://localhost:8081",
status="down", error="Connection refused",
)
assert r.status == "down"
assert r.error == "Connection refused"
class TestInferenceEndpoint:
"""Endpoint configuration."""
def test_defaults(self):
ep = InferenceEndpoint(name="test", url="http://localhost:8080")
assert ep.health_path == "/health"
assert ep.port == 8080
assert ep.restart_cmd == ""
def test_custom(self):
ep = InferenceEndpoint(
name="llama", url="http://localhost:8081",
port=8081, restart_cmd="llama-server --port 8081",
)
assert ep.port == 8081
assert "llama-server" in ep.restart_cmd
class TestFormatReport:
"""Report formatting."""
def test_all_ok(self):
results = [
HealthResult(name="test1", url="http://localhost:8080", status="ok", latency_ms=5.0, process_alive=True),
HealthResult(name="test2", url="http://localhost:8081", status="ok", latency_ms=10.0, process_alive=True),
]
report = format_report(results)
assert "Health Check" in report
assert "test1" in report
assert "test2" in report
assert "DOWN" not in report
def test_with_down(self):
results = [
HealthResult(name="test1", url="http://localhost:8080", status="ok", latency_ms=5.0),
HealthResult(
name="test2", url="http://localhost:8081",
status="down", error="Connection refused", process_alive=False,
),
]
report = format_report(results)
assert "DOWN" in report
assert "Connection refused" in report
class TestFormatJson:
"""JSON output format."""
def test_valid_json(self):
results = [HealthResult(name="test", url="http://localhost:8080", status="ok", latency_ms=5.0)]
output = format_json(results)
data = json.loads(output)
assert "timestamp" in data
assert "endpoints" in data
assert len(data["endpoints"]) == 1
assert data["endpoints"][0]["name"] == "test"
def test_none_error_serializes(self):
results = [HealthResult(name="test", url="http://localhost:8080", status="ok")]
output = format_json(results)
data = json.loads(output)
assert data["endpoints"][0]["error"] is None

View File

@@ -0,0 +1,75 @@
"""Tests for MCP PID file lock (#734)."""
import os
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Override MCP_DIR for testing
import tools.mcp_pid_lock as lock_mod
_test_dir = Path(tempfile.mkdtemp())
lock_mod._MCP_DIR = _test_dir
def test_acquire_and_release():
"""Lock can be acquired and released."""
pid = lock_mod.acquire_lock("test_server")
assert pid == os.getpid()
assert lock_mod.is_locked("test_server")
lock_mod.release_lock("test_server")
assert not lock_mod.is_locked("test_server")
def test_concurrent_lock_blocked():
"""Second acquire returns None when server running."""
lock_mod.acquire_lock("test_concurrent")
result = lock_mod.acquire_lock("test_concurrent")
assert result is None
lock_mod.release_lock("test_concurrent")
def test_stale_lock_cleaned():
"""Stale PID files are cleaned up."""
# Write a fake stale PID
pid_file = _test_dir / "stale.pid"
pid_file.write_text("99999999")
assert not lock_mod.is_locked("stale")
assert not pid_file.exists()
def test_list_locks():
"""list_locks returns only active locks."""
lock_mod.acquire_lock("list_test")
locks = lock_mod.list_locks()
assert "list_test" in locks
assert locks["list_test"] == os.getpid()
lock_mod.release_lock("list_test")
def test_cleanup_stale():
"""cleanup_stale_locks removes dead PID files."""
(_test_dir / "dead1.pid").write_text("99999998")
(_test_dir / "dead2.pid").write_text("99999999")
count = lock_mod.cleanup_stale_locks()
assert count >= 2
def test_force_release():
"""force_release kills process and removes lock."""
lock_mod.acquire_lock("force_test")
assert lock_mod.is_locked("force_test")
lock_mod.force_release("force_test")
assert not lock_mod.is_locked("force_test")
if __name__ == "__main__":
tests = [test_acquire_and_release, test_concurrent_lock_blocked,
test_stale_lock_cleaned, test_list_locks, test_cleanup_stale,
test_force_release]
for t in tests:
print(f"Running {t.__name__}...")
t()
print(" PASS")
print("\nAll tests passed.")

158
tools/mcp_pid_lock.py Normal file
View File

@@ -0,0 +1,158 @@
"""
MCP PID File Lock — Prevent concurrent MCP server instances.
Uses PID files at ~/.hermes/mcp/{name}.pid to ensure only one instance
of each MCP server runs at a time. Prevents zombie accumulation (#714).
Usage:
from tools.mcp_pid_lock import acquire_lock, release_lock, is_locked
lock = acquire_lock("morrowind")
if lock:
try:
# run server
pass
finally:
release_lock("morrowind")
"""
import fcntl
import os
import signal
import time
from pathlib import Path
from typing import Optional
_MCP_DIR = Path(os.getenv("HERMES_HOME", str(Path.home() / ".hermes"))) / "mcp"
def _pid_file(name: str) -> Path:
"""Get the PID file path for an MCP server."""
_MCP_DIR.mkdir(parents=True, exist_ok=True)
return _MCP_DIR / f"{name}.pid"
def _is_process_alive(pid: int) -> bool:
"""Check if a process is running."""
try:
os.kill(pid, 0) # Signal 0 = check if alive
return True
except ProcessLookupError:
return False
except PermissionError:
return True # Exists but we can't signal it
def _read_pid_file(name: str) -> Optional[int]:
"""Read PID from file, returns None if invalid."""
path = _pid_file(name)
if not path.exists():
return None
try:
content = path.read_text().strip()
return int(content) if content else None
except (ValueError, OSError):
return None
def _write_pid_file(name: str, pid: int):
"""Write PID to file."""
path = _pid_file(name)
path.write_text(str(pid))
def _remove_pid_file(name: str):
"""Remove PID file."""
path = _pid_file(name)
try:
path.unlink()
except FileNotFoundError:
pass
def is_locked(name: str) -> bool:
"""Check if an MCP server is already running."""
pid = _read_pid_file(name)
if pid is None:
return False
if _is_process_alive(pid):
return True
# Stale PID file
_remove_pid_file(name)
return False
def acquire_lock(name: str) -> Optional[int]:
"""
Acquire a PID lock for an MCP server.
Returns the PID if lock acquired, None if server already running.
"""
# Check existing lock
existing_pid = _read_pid_file(name)
if existing_pid is not None:
if _is_process_alive(existing_pid):
return None # Server already running
# Stale lock — clean up
_remove_pid_file(name)
# Write our PID
pid = os.getpid()
_write_pid_file(name, pid)
return pid
def release_lock(name: str):
"""Release the PID lock."""
# Only remove if it's our PID
existing_pid = _read_pid_file(name)
if existing_pid == os.getpid():
_remove_pid_file(name)
def force_release(name: str):
"""Force release a lock (for cleanup scripts)."""
pid = _read_pid_file(name)
if pid and _is_process_alive(pid):
try:
os.kill(pid, signal.SIGTERM)
time.sleep(0.5)
if _is_process_alive(pid):
os.kill(pid, signal.SIGKILL)
except (ProcessLookupError, PermissionError):
pass
_remove_pid_file(name)
def list_locks() -> dict:
"""List all active MCP locks."""
locks = {}
if not _MCP_DIR.exists():
return locks
for pid_file in _MCP_DIR.glob("*.pid"):
name = pid_file.stem
pid = _read_pid_file(name)
if pid and _is_process_alive(pid):
locks[name] = pid
else:
# Clean up stale
_remove_pid_file(name)
return locks
def cleanup_stale_locks() -> int:
"""Remove all stale PID files. Returns count cleaned."""
cleaned = 0
if not _MCP_DIR.exists():
return 0
for pid_file in _MCP_DIR.glob("*.pid"):
name = pid_file.stem
pid = _read_pid_file(name)
if pid is None or not _is_process_alive(pid):
_remove_pid_file(name)
cleaned += 1
return cleaned