Compare commits

..

2 Commits

Author SHA1 Message Date
590b601b5c test: MCP PID lock tests
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 58s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 1m2s
Tests / e2e (pull_request) Successful in 2m15s
Tests / test (pull_request) Failing after 53m34s
Part of #734
2026-04-15 02:51:02 +00:00
c4aad087d4 feat: MCP PID file lock to prevent concurrent instances
Closes #734

Uses PID files at ~/.hermes/mcp/{name}.pid to ensure only one
instance of each MCP server runs. Prevents zombie accumulation.
2026-04-15 02:50:54 +00:00
5 changed files with 233 additions and 398 deletions

View File

@@ -1,80 +0,0 @@
# Approval Tier System
Graduated safety for command approval based on risk level.
## Tiers
| Tier | Name | Action Types | Who Approves | Timeout |
|------|------|--------------|--------------|---------|
| 0 | SAFE | Read, search, list, view | None | N/A |
| 1 | LOW | Write, create, edit, script | LLM only | N/A |
| 2 | MEDIUM | Messages, API, email | Human + LLM | 60s |
| 3 | HIGH | Crypto, config, deploy | Human + LLM | 30s |
| 4 | CRITICAL | Delete, kill, shutdown | Human + LLM | 10s |
## How It Works
1. **Detection**: `detect_tier(command, action)` analyzes the command and action type
2. **Auto-approve**: SAFE and LOW tiers are automatically approved
3. **Human approval**: MEDIUM+ tiers require human confirmation
4. **Timeout handling**: If no response within timeout, escalate to next tier
5. **Crisis bypass**: 988 Lifeline commands bypass approval entirely
## Usage
```python
from tools.approval import TieredApproval, detect_tier, ApprovalTier
# Detect tier
tier = detect_tier("rm -rf /tmp/data") # Returns ApprovalTier.CRITICAL
# Request approval
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
if result["approved"]:
# Auto-approved (SAFE or LOW tier)
execute_command()
else:
# Needs human approval
show_approval_ui(result["approval_id"], result["tier"], result["timeout"])
```
## Crisis Bypass
Commands containing crisis keywords (988, suicide, self-harm, crisis hotline) automatically bypass approval to ensure immediate help:
```python
from tools.approval import is_crisis_bypass
is_crisis_bypass("call 988 for help") # True — bypasses approval
```
## Timeout Escalation
When a tier times out without human response:
- MEDIUM → HIGH (30s timeout)
- HIGH → CRITICAL (10s timeout)
- CRITICAL → Deny
## Integration
The tier system integrates with:
- **CLI**: Interactive prompts with tier-aware timeouts
- **Gateway**: Telegram/Discord approval buttons
- **Cron**: Auto-approve LOW tier, escalate MEDIUM+
## Testing
Run tests with:
```bash
python -m pytest tests/test_approval_tiers.py -v
```
26 tests covering:
- Tier detection from commands and actions
- Timeout values per tier
- Approver requirements
- Crisis bypass logic
- Approval request and resolution
- Timeout escalation

View File

@@ -1,141 +0,0 @@
"""Tests for approval tier system (Issue #670)."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools.approval import (
ApprovalTier, detect_tier, get_tier_timeout, get_tier_approvers,
requires_human_approval, is_crisis_bypass, TieredApproval, get_tiered_approval
)
class TestApprovalTier:
def test_safe_read(self):
assert detect_tier("cat file.txt") == ApprovalTier.SAFE
def test_safe_search(self):
assert detect_tier("grep pattern file") == ApprovalTier.SAFE
def test_low_write(self):
assert detect_tier("write to file", action="write") == ApprovalTier.LOW
def test_medium_message(self):
assert detect_tier("send message", action="send_message") == ApprovalTier.MEDIUM
def test_high_config(self):
assert detect_tier("edit config", action="config") == ApprovalTier.HIGH
def test_critical_delete(self):
assert detect_tier("rm -rf /", action="delete") == ApprovalTier.CRITICAL
def test_crisis_keyword(self):
assert detect_tier("call 988 for help") == ApprovalTier.CRITICAL
def test_dangerous_pattern_escalation(self):
# rm -rf should be CRITICAL
assert detect_tier("rm -rf /tmp/data") == ApprovalTier.CRITICAL
class TestTierTimeouts:
def test_safe_no_timeout(self):
assert get_tier_timeout(ApprovalTier.SAFE) == 0
def test_medium_60s(self):
assert get_tier_timeout(ApprovalTier.MEDIUM) == 60
def test_high_30s(self):
assert get_tier_timeout(ApprovalTier.HIGH) == 30
def test_critical_10s(self):
assert get_tier_timeout(ApprovalTier.CRITICAL) == 10
class TestTierApprovers:
def test_safe_no_approvers(self):
assert get_tier_approvers(ApprovalTier.SAFE) == ()
def test_low_llm_only(self):
assert get_tier_approvers(ApprovalTier.LOW) == ("llm",)
def test_medium_human_llm(self):
assert get_tier_approvers(ApprovalTier.MEDIUM) == ("human", "llm")
def test_requires_human(self):
assert requires_human_approval(ApprovalTier.SAFE) == False
assert requires_human_approval(ApprovalTier.LOW) == False
assert requires_human_approval(ApprovalTier.MEDIUM) == True
assert requires_human_approval(ApprovalTier.HIGH) == True
assert requires_human_approval(ApprovalTier.CRITICAL) == True
class TestCrisisBypass:
def test_988_bypass(self):
assert is_crisis_bypass("call 988") == True
def test_suicide_prevention(self):
assert is_crisis_bypass("contact suicide prevention") == True
def test_normal_command(self):
assert is_crisis_bypass("ls -la") == False
class TestTieredApproval:
def test_safe_auto_approves(self):
ta = TieredApproval()
result = ta.request_approval("session1", "cat file.txt")
assert result["approved"] == True
assert result["tier"] == ApprovalTier.SAFE
def test_low_auto_approves(self):
ta = TieredApproval()
result = ta.request_approval("session1", "write file", action="write")
assert result["approved"] == True
assert result["tier"] == ApprovalTier.LOW
def test_medium_needs_approval(self):
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
assert result["approved"] == False
assert result["tier"] == ApprovalTier.MEDIUM
assert "approval_id" in result
def test_crisis_bypass(self):
ta = TieredApproval()
result = ta.request_approval("session1", "call 988 for help")
assert result["approved"] == True
assert result["reason"] == "crisis_bypass"
def test_resolve_approval(self):
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
approval_id = result["approval_id"]
assert ta.resolve_approval(approval_id, True) == True
assert approval_id not in ta._pending
def test_timeout_escalation(self):
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
approval_id = result["approval_id"]
# Manually set timeout to past
ta._timeouts[approval_id] = 0
timed_out = ta.check_timeouts()
assert approval_id in timed_out
# Should have escalated to HIGH tier
if approval_id in ta._pending:
assert ta._pending[approval_id]["tier"] == ApprovalTier.HIGH
class TestGetTieredApproval:
def test_singleton(self):
ta1 = get_tiered_approval()
ta2 = get_tiered_approval()
assert ta1 is ta2
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,75 @@
"""Tests for MCP PID file lock (#734)."""
import os
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Override MCP_DIR for testing
import tools.mcp_pid_lock as lock_mod
_test_dir = Path(tempfile.mkdtemp())
lock_mod._MCP_DIR = _test_dir
def test_acquire_and_release():
"""Lock can be acquired and released."""
pid = lock_mod.acquire_lock("test_server")
assert pid == os.getpid()
assert lock_mod.is_locked("test_server")
lock_mod.release_lock("test_server")
assert not lock_mod.is_locked("test_server")
def test_concurrent_lock_blocked():
"""Second acquire returns None when server running."""
lock_mod.acquire_lock("test_concurrent")
result = lock_mod.acquire_lock("test_concurrent")
assert result is None
lock_mod.release_lock("test_concurrent")
def test_stale_lock_cleaned():
"""Stale PID files are cleaned up."""
# Write a fake stale PID
pid_file = _test_dir / "stale.pid"
pid_file.write_text("99999999")
assert not lock_mod.is_locked("stale")
assert not pid_file.exists()
def test_list_locks():
"""list_locks returns only active locks."""
lock_mod.acquire_lock("list_test")
locks = lock_mod.list_locks()
assert "list_test" in locks
assert locks["list_test"] == os.getpid()
lock_mod.release_lock("list_test")
def test_cleanup_stale():
"""cleanup_stale_locks removes dead PID files."""
(_test_dir / "dead1.pid").write_text("99999998")
(_test_dir / "dead2.pid").write_text("99999999")
count = lock_mod.cleanup_stale_locks()
assert count >= 2
def test_force_release():
"""force_release kills process and removes lock."""
lock_mod.acquire_lock("force_test")
assert lock_mod.is_locked("force_test")
lock_mod.force_release("force_test")
assert not lock_mod.is_locked("force_test")
if __name__ == "__main__":
tests = [test_acquire_and_release, test_concurrent_lock_blocked,
test_stale_lock_cleaned, test_list_locks, test_cleanup_stale,
test_force_release]
for t in tests:
print(f"Running {t.__name__}...")
t()
print(" PASS")
print("\nAll tests passed.")

View File

@@ -133,183 +133,6 @@ DANGEROUS_PATTERNS = [
]
# =========================================================================
# Approval Tier System (Issue #670)
# =========================================================================
from enum import IntEnum
import time
class ApprovalTier(IntEnum):
"""Safety tiers for command approval.
Tier 0 (SAFE): Read, search — no approval needed
Tier 1 (LOW): Write, scripts — LLM approval only
Tier 2 (MEDIUM): Messages, API — human + LLM, 60s timeout
Tier 3 (HIGH): Crypto, config — human + LLM, 30s timeout
Tier 4 (CRITICAL): Crisis — human + LLM, 10s timeout
"""
SAFE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
TIER_PATTERNS = {
# Tier 0: Safe
"read": ApprovalTier.SAFE, "search": ApprovalTier.SAFE, "list": ApprovalTier.SAFE,
"view": ApprovalTier.SAFE, "cat": ApprovalTier.SAFE, "grep": ApprovalTier.SAFE,
# Tier 1: Low
"write": ApprovalTier.LOW, "create": ApprovalTier.LOW, "edit": ApprovalTier.LOW,
"patch": ApprovalTier.LOW, "copy": ApprovalTier.LOW, "mkdir": ApprovalTier.LOW,
"script": ApprovalTier.LOW, "execute": ApprovalTier.LOW, "run": ApprovalTier.LOW,
# Tier 2: Medium
"send_message": ApprovalTier.MEDIUM, "message": ApprovalTier.MEDIUM,
"email": ApprovalTier.MEDIUM, "api": ApprovalTier.MEDIUM, "post": ApprovalTier.MEDIUM,
"telegram": ApprovalTier.MEDIUM, "discord": ApprovalTier.MEDIUM,
# Tier 3: High
"crypto": ApprovalTier.HIGH, "bitcoin": ApprovalTier.HIGH, "wallet": ApprovalTier.HIGH,
"key": ApprovalTier.HIGH, "secret": ApprovalTier.HIGH, "config": ApprovalTier.HIGH,
"deploy": ApprovalTier.HIGH, "install": ApprovalTier.HIGH, "systemctl": ApprovalTier.HIGH,
# Tier 4: Critical
"delete": ApprovalTier.CRITICAL, "remove": ApprovalTier.CRITICAL, "rm": ApprovalTier.CRITICAL,
"format": ApprovalTier.CRITICAL, "kill": ApprovalTier.CRITICAL, "shutdown": ApprovalTier.CRITICAL,
"crisis": ApprovalTier.CRITICAL, "suicide": ApprovalTier.CRITICAL,
}
TIER_TIMEOUTS = {
ApprovalTier.SAFE: 0, ApprovalTier.LOW: 0, ApprovalTier.MEDIUM: 60,
ApprovalTier.HIGH: 30, ApprovalTier.CRITICAL: 10,
}
TIER_APPROVERS = {
ApprovalTier.SAFE: (), ApprovalTier.LOW: ("llm",),
ApprovalTier.MEDIUM: ("human", "llm"), ApprovalTier.HIGH: ("human", "llm"),
ApprovalTier.CRITICAL: ("human", "llm"),
}
def detect_tier(command, action="", context=None):
"""Detect approval tier for a command or action."""
# Crisis keywords always CRITICAL
crisis_keywords = ["988", "suicide", "self-harm", "crisis", "emergency"]
for kw in crisis_keywords:
if kw in command.lower():
return ApprovalTier.CRITICAL
# Check action type
if action and action.lower() in TIER_PATTERNS:
return TIER_PATTERNS[action.lower()]
# Check command for keywords
cmd_lower = command.lower()
best_tier = ApprovalTier.SAFE
for keyword, tier in TIER_PATTERNS.items():
if keyword in cmd_lower and tier > best_tier:
best_tier = tier
# Check dangerous patterns
is_dangerous, _, description = detect_dangerous_command(command)
if is_dangerous:
desc_lower = description.lower()
if any(k in desc_lower for k in ["delete", "remove", "format", "drop", "kill"]):
return ApprovalTier.CRITICAL
elif any(k in desc_lower for k in ["chmod", "chown", "systemctl", "config"]):
return max(best_tier, ApprovalTier.HIGH)
else:
return max(best_tier, ApprovalTier.MEDIUM)
return best_tier
def get_tier_timeout(tier):
return TIER_TIMEOUTS.get(tier, 60)
def get_tier_approvers(tier):
return TIER_APPROVERS.get(tier, ("human", "llm"))
def requires_human_approval(tier):
return "human" in get_tier_approvers(tier)
def is_crisis_bypass(command):
"""Check if command qualifies for crisis bypass (988 Lifeline)."""
indicators = ["988", "suicide prevention", "crisis hotline", "lifeline", "emergency help"]
cmd_lower = command.lower()
return any(i in cmd_lower for i in indicators)
class TieredApproval:
"""Tiered approval handler."""
def __init__(self):
self._pending = {}
self._timeouts = {}
def request_approval(self, session_key, command, action="", context=None):
"""Request approval based on tier. Returns approval dict."""
tier = detect_tier(command, action, context)
timeout = get_tier_timeout(tier)
approvers = get_tier_approvers(tier)
# Crisis bypass
if tier == ApprovalTier.CRITICAL and is_crisis_bypass(command):
return {"approved": True, "tier": tier, "reason": "crisis_bypass", "timeout": 0, "approvers": ()}
# Safe/Low auto-approve
if tier <= ApprovalTier.LOW:
return {"approved": True, "tier": tier, "reason": "auto_approve", "timeout": 0, "approvers": approvers}
# Higher tiers need approval
import uuid
approval_id = f"{session_key}_{uuid.uuid4().hex[:8]}"
self._pending[approval_id] = {
"session_key": session_key, "command": command, "action": action,
"tier": tier, "timeout": timeout, "approvers": approvers, "created_at": time.time(),
}
if timeout > 0:
self._timeouts[approval_id] = time.time() + timeout
return {
"approved": False, "tier": tier, "approval_id": approval_id,
"timeout": timeout, "approvers": approvers,
"requires_human": requires_human_approval(tier),
}
def resolve_approval(self, approval_id, approved, approver="human"):
"""Resolve a pending approval."""
if approval_id not in self._pending:
return False
self._pending.pop(approval_id)
self._timeouts.pop(approval_id, None)
return approved
def check_timeouts(self):
"""Check for timed-out approvals and auto-escalate."""
now = time.time()
timed_out = []
for aid, timeout_at in list(self._timeouts.items()):
if now > timeout_at:
timed_out.append(aid)
if aid in self._pending:
pending = self._pending[aid]
current_tier = pending["tier"]
if current_tier < ApprovalTier.CRITICAL:
pending["tier"] = ApprovalTier(current_tier + 1)
pending["timeout"] = get_tier_timeout(pending["tier"])
self._timeouts[aid] = now + pending["timeout"]
else:
self._pending.pop(aid, None)
self._timeouts.pop(aid, None)
return timed_out
_tiered_approval = TieredApproval()
def get_tiered_approval():
return _tiered_approval
def _legacy_pattern_key(pattern: str) -> str:
"""Reproduce the old regex-derived approval key for backwards compatibility."""
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]

158
tools/mcp_pid_lock.py Normal file
View File

@@ -0,0 +1,158 @@
"""
MCP PID File Lock — Prevent concurrent MCP server instances.
Uses PID files at ~/.hermes/mcp/{name}.pid to ensure only one instance
of each MCP server runs at a time. Prevents zombie accumulation (#714).
Usage:
from tools.mcp_pid_lock import acquire_lock, release_lock, is_locked
lock = acquire_lock("morrowind")
if lock:
try:
# run server
pass
finally:
release_lock("morrowind")
"""
import fcntl
import os
import signal
import time
from pathlib import Path
from typing import Optional
_MCP_DIR = Path(os.getenv("HERMES_HOME", str(Path.home() / ".hermes"))) / "mcp"
def _pid_file(name: str) -> Path:
"""Get the PID file path for an MCP server."""
_MCP_DIR.mkdir(parents=True, exist_ok=True)
return _MCP_DIR / f"{name}.pid"
def _is_process_alive(pid: int) -> bool:
"""Check if a process is running."""
try:
os.kill(pid, 0) # Signal 0 = check if alive
return True
except ProcessLookupError:
return False
except PermissionError:
return True # Exists but we can't signal it
def _read_pid_file(name: str) -> Optional[int]:
"""Read PID from file, returns None if invalid."""
path = _pid_file(name)
if not path.exists():
return None
try:
content = path.read_text().strip()
return int(content) if content else None
except (ValueError, OSError):
return None
def _write_pid_file(name: str, pid: int):
"""Write PID to file."""
path = _pid_file(name)
path.write_text(str(pid))
def _remove_pid_file(name: str):
"""Remove PID file."""
path = _pid_file(name)
try:
path.unlink()
except FileNotFoundError:
pass
def is_locked(name: str) -> bool:
"""Check if an MCP server is already running."""
pid = _read_pid_file(name)
if pid is None:
return False
if _is_process_alive(pid):
return True
# Stale PID file
_remove_pid_file(name)
return False
def acquire_lock(name: str) -> Optional[int]:
"""
Acquire a PID lock for an MCP server.
Returns the PID if lock acquired, None if server already running.
"""
# Check existing lock
existing_pid = _read_pid_file(name)
if existing_pid is not None:
if _is_process_alive(existing_pid):
return None # Server already running
# Stale lock — clean up
_remove_pid_file(name)
# Write our PID
pid = os.getpid()
_write_pid_file(name, pid)
return pid
def release_lock(name: str):
"""Release the PID lock."""
# Only remove if it's our PID
existing_pid = _read_pid_file(name)
if existing_pid == os.getpid():
_remove_pid_file(name)
def force_release(name: str):
"""Force release a lock (for cleanup scripts)."""
pid = _read_pid_file(name)
if pid and _is_process_alive(pid):
try:
os.kill(pid, signal.SIGTERM)
time.sleep(0.5)
if _is_process_alive(pid):
os.kill(pid, signal.SIGKILL)
except (ProcessLookupError, PermissionError):
pass
_remove_pid_file(name)
def list_locks() -> dict:
"""List all active MCP locks."""
locks = {}
if not _MCP_DIR.exists():
return locks
for pid_file in _MCP_DIR.glob("*.pid"):
name = pid_file.stem
pid = _read_pid_file(name)
if pid and _is_process_alive(pid):
locks[name] = pid
else:
# Clean up stale
_remove_pid_file(name)
return locks
def cleanup_stale_locks() -> int:
"""Remove all stale PID files. Returns count cleaned."""
cleaned = 0
if not _MCP_DIR.exists():
return 0
for pid_file in _MCP_DIR.glob("*.pid"):
name = pid_file.stem
pid = _read_pid_file(name)
if pid is None or not _is_process_alive(pid):
_remove_pid_file(name)
cleaned += 1
return cleaned