Compare commits

..

2 Commits

Author SHA1 Message Date
590b601b5c test: MCP PID lock tests
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 58s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 1m2s
Tests / e2e (pull_request) Successful in 2m15s
Tests / test (pull_request) Failing after 53m34s
Part of #734
2026-04-15 02:51:02 +00:00
c4aad087d4 feat: MCP PID file lock to prevent concurrent instances
Closes #734

Uses PID files at ~/.hermes/mcp/{name}.pid to ensure only one
instance of each MCP server runs. Prevents zombie accumulation.
2026-04-15 02:50:54 +00:00
4 changed files with 233 additions and 288 deletions

View File

@@ -1,55 +0,0 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
    """Behavioral checks for classify_error's category and retry decisions."""

    def test_timeout_is_retryable(self):
        verdict = classify_error(Exception("Connection timed out"))
        assert verdict.category == ErrorCategory.RETRYABLE
        assert verdict.should_retry is True

    def test_429_is_retryable(self):
        verdict = classify_error(Exception("Rate limit exceeded"), response_code=429)
        assert verdict.category == ErrorCategory.RETRYABLE
        assert verdict.should_retry is True

    def test_404_is_permanent(self):
        verdict = classify_error(Exception("Not found"), response_code=404)
        assert verdict.category == ErrorCategory.PERMANENT
        assert verdict.should_retry is False

    def test_403_is_permanent(self):
        verdict = classify_error(Exception("Forbidden"), response_code=403)
        assert verdict.category == ErrorCategory.PERMANENT
        assert verdict.should_retry is False

    def test_500_is_retryable(self):
        verdict = classify_error(Exception("Internal server error"), response_code=500)
        assert verdict.category == ErrorCategory.RETRYABLE
        assert verdict.should_retry is True

    def test_schema_error_is_permanent(self):
        verdict = classify_error(Exception("Schema validation failed"))
        assert verdict.category == ErrorCategory.PERMANENT
        assert verdict.should_retry is False

    def test_unknown_is_retryable_with_caution(self):
        verdict = classify_error(Exception("Some unknown error"))
        assert verdict.category == ErrorCategory.UNKNOWN
        assert verdict.should_retry is True
        assert verdict.max_retries == 1
if __name__ == "__main__":
    # Allow running this test module directly without invoking pytest itself.
    pytest.main([__file__])

View File

@@ -0,0 +1,75 @@
"""Tests for MCP PID file lock (#734)."""
import os
import sys
import tempfile
from pathlib import Path
# Make the repository root importable so `tools.*` resolves when this file
# is run directly (the tests live one directory below the repo root).
sys.path.insert(0, str(Path(__file__).parent.parent))
# Override MCP_DIR for testing
# Point the lock module at a throwaway temp directory so these tests never
# touch the real ~/.hermes/mcp state. NOTE: this is module-level shared
# state — every test in this file operates on the same _test_dir.
import tools.mcp_pid_lock as lock_mod
_test_dir = Path(tempfile.mkdtemp())
lock_mod._MCP_DIR = _test_dir
def test_acquire_and_release():
    """A lock can be taken, observed as held, and then released."""
    owner = lock_mod.acquire_lock("test_server")
    assert owner == os.getpid()
    assert lock_mod.is_locked("test_server")
    lock_mod.release_lock("test_server")
    assert not lock_mod.is_locked("test_server")
def test_concurrent_lock_blocked():
    """While a name is locked, a second acquire yields None."""
    lock_mod.acquire_lock("test_concurrent")
    assert lock_mod.acquire_lock("test_concurrent") is None
    lock_mod.release_lock("test_concurrent")
def test_stale_lock_cleaned():
    """A PID file pointing at a dead process is removed on inspection."""
    stale = _test_dir / "stale.pid"
    stale.write_text("99999999")  # far above any plausible live PID
    assert lock_mod.is_locked("stale") is False
    assert not stale.exists()
def test_list_locks():
    """Only locks backed by live processes appear in list_locks()."""
    lock_mod.acquire_lock("list_test")
    active = lock_mod.list_locks()
    assert active.get("list_test") == os.getpid()
    lock_mod.release_lock("list_test")
def test_cleanup_stale():
    """cleanup_stale_locks removes every PID file whose process is gone."""
    for fake_name, fake_pid in (("dead1", "99999998"), ("dead2", "99999999")):
        (_test_dir / f"{fake_name}.pid").write_text(fake_pid)
    assert lock_mod.cleanup_stale_locks() >= 2
def test_force_release():
    """force_release terminates the lock's owner and removes the lock.

    Bug fix: the original test acquired the lock with the *test runner's own
    PID* and then called force_release, which sends SIGTERM (and SIGKILL) to
    that PID — killing the test process itself. We instead register a
    disposable child process as the lock owner and let force_release kill it.
    """
    import subprocess
    # A child that would sleep far longer than the test unless killed.
    victim = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    try:
        (_test_dir / "force_test.pid").write_text(str(victim.pid))
        assert lock_mod.is_locked("force_test")
        lock_mod.force_release("force_test")
        # PID file removed, so the lock reads as free regardless of reaping.
        assert not lock_mod.is_locked("force_test")
    finally:
        # Reap the child (already signalled by force_release, or kill it here
        # if the assertions failed before force_release ran).
        if victim.poll() is None:
            victim.kill()
        victim.wait(timeout=5)
if __name__ == "__main__":
    # Minimal stdlib runner so the file also works without pytest.
    for case in (test_acquire_and_release, test_concurrent_lock_blocked,
                 test_stale_lock_cleaned, test_list_locks, test_cleanup_stale,
                 test_force_release):
        print(f"Running {case.__name__}...")
        case()
        print(" PASS")
    print("\nAll tests passed.")

View File

@@ -1,233 +0,0 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
    """Error category classification."""
    # Transient failure — retrying may succeed.
    RETRYABLE = "retryable"
    # Definitive failure — retrying cannot help.
    PERMANENT = "permanent"
    # Unclassified — callers retry once, cautiously (see classify_error).
    UNKNOWN = "unknown"
@dataclass
class ErrorClassification:
    """Result of error classification."""
    category: ErrorCategory            # retryable / permanent / unknown
    reason: str                        # human-readable explanation of the verdict
    should_retry: bool                 # whether the caller should retry at all
    max_retries: int                   # suggested retry budget (0 = never retry)
    backoff_seconds: float             # suggested initial backoff delay
    error_code: Optional[int] = None   # HTTP status code, when one was supplied
    error_type: Optional[str] = None   # exception class name (type(e).__name__)
# Retryable error patterns.
# Each entry is (regex, reason, max_retries, backoff_seconds); the regex is
# matched case-insensitively against the stringified exception message.
_RETRYABLE_PATTERNS = [
    # HTTP status codes
    (r"\b429\b", "rate limit", 3, 5.0),
    (r"\b500\b", "server error", 3, 2.0),
    (r"\b502\b", "bad gateway", 3, 2.0),
    (r"\b503\b", "service unavailable", 3, 5.0),
    (r"\b504\b", "gateway timeout", 3, 5.0),
    # Timeout patterns
    (r"timeout", "timeout", 3, 2.0),
    (r"timed out", "timeout", 3, 2.0),
    (r"TimeoutExpired", "timeout", 3, 2.0),
    # Connection errors
    (r"connection refused", "connection refused", 2, 5.0),
    (r"connection reset", "connection reset", 2, 2.0),
    (r"network unreachable", "network unreachable", 2, 10.0),
    (r"DNS", "DNS error", 2, 5.0),
    # Transient errors
    (r"temporary", "temporary error", 2, 2.0),
    (r"transient", "transient error", 2, 2.0),
    (r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns.
# Each entry is (regex, short_label, human_reason) — NOTE the tuple shape
# differs from _RETRYABLE_PATTERNS: classify_error unpacks the second field
# as a short code label and uses the THIRD field as the reported reason.
_PERMANENT_PATTERNS = [
    # HTTP status codes
    (r"\b400\b", "bad request", "Invalid request parameters"),
    (r"\b401\b", "unauthorized", "Authentication failed"),
    (r"\b403\b", "forbidden", "Access denied"),
    (r"\b404\b", "not found", "Resource not found"),
    (r"\b405\b", "method not allowed", "HTTP method not supported"),
    (r"\b409\b", "conflict", "Resource conflict"),
    (r"\b422\b", "unprocessable", "Validation error"),
    # Schema/validation errors
    (r"schema", "schema error", "Invalid data schema"),
    (r"validation", "validation error", "Input validation failed"),
    (r"invalid.*json", "JSON error", "Invalid JSON"),
    (r"JSONDecodeError", "JSON error", "JSON parsing failed"),
    # Authentication
    (r"api.?key", "API key error", "Invalid or missing API key"),
    (r"token.*expir", "token expired", "Authentication token expired"),
    (r"permission", "permission error", "Insufficient permissions"),
    # Not found patterns
    (r"not found", "not found", "Resource does not exist"),
    (r"does not exist", "not found", "Resource does not exist"),
    (r"no such file", "file not found", "File does not exist"),
    # Quota/billing
    (r"quota", "quota exceeded", "Usage quota exceeded"),
    (r"billing", "billing error", "Billing issue"),
    (r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
    """
    Classify an error as retryable or permanent.

    Args:
        error: The exception that occurred
        response_code: HTTP response code if available

    Returns:
        ErrorClassification with retry guidance
    """
    message = str(error).lower()
    exc_name = type(error).__name__

    # An explicit HTTP status code is the most reliable signal, so it takes
    # precedence over message-pattern matching.
    if response_code:
        if response_code in (429, 500, 502, 503, 504):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=f"HTTP {response_code} - transient server error",
                should_retry=True,
                max_retries=3,
                backoff_seconds=5.0 if response_code == 429 else 2.0,
                error_code=response_code,
                error_type=exc_name,
            )
        if response_code in (400, 401, 403, 404, 405, 409, 422):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=f"HTTP {response_code} - client error",
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_code=response_code,
                error_type=exc_name,
            )

    # Fall back to the message tables; retryable patterns are tried first.
    for pattern, why, retries, wait in _RETRYABLE_PATTERNS:
        if re.search(pattern, message, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=why,
                should_retry=True,
                max_retries=retries,
                backoff_seconds=wait,
                error_type=exc_name,
            )

    for pattern, _label, why in _PERMANENT_PATTERNS:
        if re.search(pattern, message, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=why,
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_type=exc_name,
            )

    # Nothing matched: treat as unknown and allow one cautious retry.
    return ErrorClassification(
        category=ErrorCategory.UNKNOWN,
        reason=f"Unknown error type: {exc_name}",
        should_retry=True,
        max_retries=1,
        backoff_seconds=1.0,
        error_type=exc_name,
    )
def execute_with_retry(
    func,
    *args,
    max_retries: int = 3,
    backoff_base: float = 1.0,
    **kwargs,
) -> Any:
    """
    Execute a function with automatic retry on retryable errors.

    Args:
        func: Function to execute
        *args: Function arguments
        max_retries: Maximum retry attempts
        backoff_base: Base backoff time in seconds
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        Exception: If permanent error or max retries exceeded
    """
    last_error = None
    attempt = 0
    while attempt <= max_retries:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            last_error = exc
            verdict = classify_error(exc)
            logger.info(
                "Attempt %d/%d failed: %s (%s, retryable: %s)",
                attempt + 1, max_retries + 1,
                verdict.reason,
                verdict.category.value,
                verdict.should_retry,
            )
            # Permanent errors fail immediately; the last attempt re-raises too.
            if not verdict.should_retry:
                logger.error("Permanent error: %s", verdict.reason)
                raise
            if attempt >= max_retries:
                logger.error("Max retries (%d) exceeded", max_retries)
                raise
            # Exponential backoff before the next attempt.
            delay = backoff_base * (2 ** attempt)
            logger.info("Retrying in %.1fs...", delay)
            time.sleep(delay)
        attempt += 1
    # Unreachable in practice (the loop returns or raises), kept as a guard.
    raise last_error
def format_error_report(classification: ErrorClassification) -> str:
    """Format an error classification as a one-line report string.

    Fix: the original used an empty-string icon for non-retryable errors,
    so permanent reports came out with a stray leading space
    (" permanent: ..."). The separator is now emitted only when an icon
    is actually present.
    """
    icon = "🔄" if classification.should_retry else ""
    prefix = f"{icon} " if icon else ""
    return f"{prefix}{classification.category.value}: {classification.reason}"

158
tools/mcp_pid_lock.py Normal file
View File

@@ -0,0 +1,158 @@
"""
MCP PID File Lock — Prevent concurrent MCP server instances.
Uses PID files at ~/.hermes/mcp/{name}.pid to ensure only one instance
of each MCP server runs at a time. Prevents zombie accumulation (#714).
Usage:
from tools.mcp_pid_lock import acquire_lock, release_lock, is_locked
lock = acquire_lock("morrowind")
if lock:
try:
# run server
pass
finally:
release_lock("morrowind")
"""
import fcntl
import os
import signal
import time
from pathlib import Path
from typing import Optional
# Directory holding one `{name}.pid` file per running MCP server.
# The root defaults to ~/.hermes but is overridable via HERMES_HOME.
_MCP_DIR = Path(os.getenv("HERMES_HOME", str(Path.home() / ".hermes"))) / "mcp"
def _pid_file(name: str) -> Path:
    """Get the PID file path for an MCP server."""
    # Created lazily here (not at import time) so merely importing the
    # module has no filesystem side effects.
    _MCP_DIR.mkdir(parents=True, exist_ok=True)
    return _MCP_DIR / f"{name}.pid"
def _is_process_alive(pid: int) -> bool:
    """Return True when *pid* refers to a running process.

    Signal 0 performs the existence check without delivering anything;
    a PermissionError means the process exists but is owned by someone
    we cannot signal.
    """
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    return True
def _read_pid_file(name: str) -> Optional[int]:
    """Read PID from file, returns None if invalid."""
    path = _pid_file(name)
    if not path.exists():
        return None
    try:
        text = path.read_text().strip()
    except OSError:
        return None
    try:
        return int(text) if text else None
    except ValueError:
        # File exists but holds something that is not an integer.
        return None
def _write_pid_file(name: str, pid: int):
    """Persist *pid* as the recorded owner of the named lock."""
    _pid_file(name).write_text(str(pid))
def _remove_pid_file(name: str):
    """Delete the named lock file; a missing file is not an error."""
    target = _pid_file(name)
    try:
        target.unlink()
    except FileNotFoundError:
        # Already gone — nothing to do.
        pass
def is_locked(name: str) -> bool:
    """Check whether the named MCP server currently holds a live lock."""
    owner = _read_pid_file(name)
    if owner is None:
        return False
    if not _is_process_alive(owner):
        # Stale PID file left behind by a dead process — drop it.
        _remove_pid_file(name)
        return False
    return True
def acquire_lock(name: str) -> Optional[int]:
    """
    Acquire a PID lock for an MCP server.

    Returns the PID if lock acquired, None if server already running.

    Fix: the original check-then-write sequence had a TOCTOU race — two
    processes could both see no live lock and both write their PID,
    each believing it won. The lock file is now created atomically with
    O_CREAT | O_EXCL, so exactly one racer can create it.
    """
    path = _pid_file(name)
    pid = os.getpid()
    # Two attempts: the second runs after clearing a stale lock file.
    for _ in range(2):
        try:
            fd = os.open(str(path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            existing_pid = _read_pid_file(name)
            if existing_pid is not None and _is_process_alive(existing_pid):
                return None  # Server already running
            # Stale (or unreadable) lock — clean up and retry the atomic create.
            _remove_pid_file(name)
            continue
        with os.fdopen(fd, "w") as f:
            f.write(str(pid))
        return pid
    # Lost the creation race twice in a row; treat as already running.
    return None
def release_lock(name: str):
    """Release the PID lock."""
    # Only the process recorded in the file may remove it, so a late
    # release cannot delete a lock now owned by another instance.
    if _read_pid_file(name) == os.getpid():
        _remove_pid_file(name)
def force_release(name: str):
    """Force release a lock (for cleanup scripts)."""
    victim = _read_pid_file(name)
    if victim and _is_process_alive(victim):
        try:
            # Ask politely first, then escalate if the process is still up.
            os.kill(victim, signal.SIGTERM)
            time.sleep(0.5)
            if _is_process_alive(victim):
                os.kill(victim, signal.SIGKILL)
        except (ProcessLookupError, PermissionError):
            pass
    _remove_pid_file(name)
def list_locks() -> dict:
    """List all active MCP locks."""
    active = {}
    if not _MCP_DIR.exists():
        return active
    for entry in _MCP_DIR.glob("*.pid"):
        server = entry.stem
        owner = _read_pid_file(server)
        if owner and _is_process_alive(owner):
            active[server] = owner
        else:
            # Dead or unreadable owner — clean up the stale file.
            _remove_pid_file(server)
    return active
def cleanup_stale_locks() -> int:
    """Remove all stale PID files. Returns count cleaned."""
    if not _MCP_DIR.exists():
        return 0
    removed = 0
    for entry in _MCP_DIR.glob("*.pid"):
        server = entry.stem
        owner = _read_pid_file(server)
        # A missing/invalid PID or a dead process both count as stale.
        if owner is None or not _is_process_alive(owner):
            _remove_pid_file(server)
            removed += 1
    return removed