Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
c2eb7bac74 fix: MCP zombie process cleanup (#714)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 44s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 38s
Tests / e2e (pull_request) Successful in 3m56s
Tests / test (pull_request) Failing after 41m56s
Adds:
- scripts/mcp_zombie_cleanup.py — kill orphaned MCP processes
- scripts/mcp_watchdog.py — periodic cleanup daemon

Fixes #714
2026-04-14 22:39:33 -04:00
3 changed files with 293 additions and 162 deletions

83
scripts/mcp_watchdog.py Executable file
View File

@@ -0,0 +1,83 @@
#!/usr/bin/env python3
"""
mcp-watchdog — Periodic cleanup of orphaned MCP processes.
Runs as a cron job or daemon to prevent process accumulation.
Usage:
python scripts/mcp_watchdog.py # one-shot check
python scripts/mcp_watchdog.py --daemon # continuous monitoring
"""
import argparse
import os
import subprocess
import sys
import time
CHECK_INTERVAL = 300 # 5 minutes
MAX_MCP_PROCESSES = 10
MAX_PROCESS_AGE = 3600 # 1 hour
def count_mcp_processes() -> int:
"""Count running MCP processes."""
try:
result = subprocess.run(
["pgrep", "-f", "mcp_server|morrowind|mcp-serve|fastmcp"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
return len([p for p in result.stdout.strip().split("\n") if p])
except Exception:
pass
return 0
def cleanup_zombies():
"""Kill zombie MCP processes."""
script = os.path.join(os.path.dirname(__file__), "mcp_zombie_cleanup.py")
if os.path.exists(script):
subprocess.run(
[sys.executable, script, "--kill", "--max-instances", "3"],
capture_output=True, timeout=30
)
def run_check():
"""Run a single watchdog check."""
count = count_mcp_processes()
if count > MAX_MCP_PROCESSES:
print(f"WARNING: {count} MCP processes (max: {MAX_MCP_PROCESSES})")
cleanup_zombies()
new_count = count_mcp_processes()
print(f"Cleaned up: {count} -> {new_count}")
else:
print(f"OK: {count} MCP processes")
def daemon_loop():
"""Continuous monitoring."""
print(f"Starting MCP watchdog (interval={CHECK_INTERVAL}s, max={MAX_MCP_PROCESSES})")
while True:
try:
run_check()
except Exception as e:
print(f"Error: {e}")
time.sleep(CHECK_INTERVAL)
def main():
parser = argparse.ArgumentParser(description="MCP process watchdog")
parser.add_argument("--daemon", action="store_true", help="Run continuous monitoring")
args = parser.parse_args()
if args.daemon:
daemon_loop()
else:
run_check()
if __name__ == "__main__":
main()

210
scripts/mcp_zombie_cleanup.py Executable file
View File

@@ -0,0 +1,210 @@
#!/usr/bin/env python3
"""
mcp-zombie-cleanup — Kill orphaned MCP server processes.
Fix for #714: ~80 zombie morrowind/mcp_server.py processes on Mac.
Usage:
python scripts/mcp_zombie_cleanup.py # dry run
python scripts/mcp_zombie_cleanup.py --kill # kill zombies
python scripts/mcp_zombie_cleanup.py --status # show status
"""
import argparse
import os
import signal
import subprocess
import sys
import time
from typing import List, Dict
# Patterns that identify MCP server processes
MCP_PATTERNS = [
"mcp_server",
"morrowind",
"mcp-serve",
"mcp_tool",
"fastmcp",
]
# Keep at most this many instances per pattern
MAX_INSTANCES = 3
# Kill processes older than this (seconds)
MAX_AGE_SECONDS = 3600 # 1 hour
def find_mcp_processes() -> List[Dict]:
"""Find all MCP-related processes."""
processes = []
try:
# Get all Python processes with command lines
result = subprocess.run(
["ps", "aux"],
capture_output=True, text=True, timeout=10
)
for line in result.stdout.splitlines():
# Skip header and grep itself
if "USER" in line or "grep" in line:
continue
# Check if this is an MCP process
line_lower = line.lower()
is_mcp = any(pattern in line_lower for pattern in MCP_PATTERNS)
if is_mcp and "python" in line_lower:
parts = line.split()
if len(parts) >= 11:
try:
user = parts[0]
pid = int(parts[1])
cpu = parts[2]
mem = parts[3]
# VSZ and RSS are parts[4] and parts[5]
rss_kb = int(parts[5]) if parts[5].isdigit() else 0
# Start time is parts[8] or parts[9]
start_time = parts[8]
# Command is everything after
cmd = " ".join(parts[10:])
processes.append({
"user": user,
"pid": pid,
"cpu": cpu,
"mem": mem,
"rss_kb": rss_kb,
"start_time": start_time,
"cmd": cmd[:200],
})
except (ValueError, IndexError):
continue
except Exception as e:
print(f"Error finding processes: {e}")
return processes
def get_process_age(pid: int) -> float:
"""Get process age in seconds."""
try:
result = subprocess.run(
["ps", "-o", "etimes=", "-p", str(pid)],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
return float(result.stdout.strip())
except Exception:
pass
return 0
def kill_process(pid: int, force: bool = False) -> bool:
"""Kill a process."""
try:
sig = signal.SIGKILL if force else signal.SIGTERM
os.kill(pid, sig)
return True
except ProcessLookupError:
return True # Already dead
except PermissionError:
print(f" Permission denied for PID {pid}")
return False
except Exception as e:
print(f" Error killing PID {pid}: {e}")
return False
def main():
parser = argparse.ArgumentParser(description="Clean up zombie MCP processes")
parser.add_argument("--kill", action="store_true", help="Kill zombie processes")
parser.add_argument("--force", action="store_true", help="Use SIGKILL instead of SIGTERM")
parser.add_argument("--status", action="store_true", help="Show status only")
parser.add_argument("--max-instances", type=int, default=MAX_INSTANCES)
parser.add_argument("--max-age", type=int, default=MAX_AGE_SECONDS)
args = parser.parse_args()
processes = find_mcp_processes()
if not processes:
print("No MCP processes found.")
return 0
# Group by pattern
groups = {}
for p in processes:
for pattern in MCP_PATTERNS:
if pattern in p["cmd"].lower():
if pattern not in groups:
groups[pattern] = []
groups[pattern].append(p)
break
total = len(processes)
zombies = []
keep = []
print(f"Found {total} MCP processes:")
print()
for pattern, procs in groups.items():
# Sort by PID (higher = newer)
procs.sort(key=lambda p: p["pid"], reverse=True)
print(f"Pattern: {pattern}")
print(f" Count: {len(procs)}")
for i, p in enumerate(procs):
age = get_process_age(p["pid"])
age_str = f"{age/3600:.1f}h" if age > 3600 else f"{age/60:.0f}m"
status = "KEEP" if i < args.max_instances else "ZOMBIE"
if age > args.max_age:
status = "STALE"
rss_mb = p["rss_kb"] / 1024
print(f" PID {p['pid']:>6} | {rss_mb:>6.1f}MB | {age_str:>6} | {status} | {p['cmd'][:60]}")
if status in ("ZOMBIE", "STALE"):
zombies.append(p)
else:
keep.append(p)
print()
print(f"Summary:")
print(f" Total processes: {total}")
print(f" Keep: {len(keep)}")
print(f" Zombies: {len(zombies)}")
print(f" Total RSS: {sum(p['rss_kb'] for p in processes) / 1024:.1f} MB")
if args.status:
return 0
if not zombies:
print()
print("No zombies to clean up.")
return 0
if not args.kill:
print()
print("DRY RUN: Add --kill to terminate zombie processes")
return 0
print()
print("Killing zombie processes...")
killed = 0
for p in zombies:
if kill_process(p["pid"], force=args.force):
killed += 1
print(f" Killed PID {p['pid']}")
time.sleep(0.1) # Brief pause between kills
print(f"
Killed {killed}/{len(zombies)} processes.")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,162 +0,0 @@
"""Tests for Telegram thread-aware session routing.
Verifies that messages in different threads/topics get independent
conversation histories.
"""
import pytest
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
from gateway.session import build_session_key, SessionSource
from gateway.platforms.base import Platform
class TestThreadSessionKey:
"""Verify session keys include thread_id for isolation."""
def test_dm_with_thread_gets_unique_key(self):
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123456",
chat_type="dm",
thread_id="100",
)
key = build_session_key(source)
assert "123456" in key
assert "100" in key
assert key == "agent:main:telegram:dm:123456:100"
def test_dm_without_thread_uses_chat_only(self):
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123456",
chat_type="dm",
)
key = build_session_key(source)
assert key == "agent:main:telegram:dm:123456"
assert ":100" not in key
def test_different_threads_different_keys(self):
source_a = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123456",
chat_type="dm",
thread_id="100",
)
source_b = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123456",
chat_type="dm",
thread_id="200",
)
key_a = build_session_key(source_a)
key_b = build_session_key(source_b)
assert key_a != key_b
def test_same_thread_same_key(self):
source_a = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123456",
chat_type="dm",
thread_id="100",
)
source_b = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123456",
chat_type="dm",
thread_id="100",
)
assert build_session_key(source_a) == build_session_key(source_b)
def test_group_with_thread_includes_thread(self):
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="789",
chat_type="group",
thread_id="50",
user_id="user1",
)
key = build_session_key(source)
assert "789" in key
assert "50" in key
def test_group_without_thread_isolates_by_user(self):
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="789",
chat_type="group",
user_id="user1",
)
key = build_session_key(source, group_sessions_per_user=True)
assert "789" in key
assert "user1" in key
def test_group_thread_shared_across_users(self):
"""In threads, all participants share the same session by default."""
source_a = SessionSource(
platform=Platform.TELEGRAM,
chat_id="789",
chat_type="group",
thread_id="50",
user_id="user1",
)
source_b = SessionSource(
platform=Platform.TELEGRAM,
chat_id="789",
chat_type="group",
thread_id="50",
user_id="user2",
)
key_a = build_session_key(source_a, thread_sessions_per_user=False)
key_b = build_session_key(source_b, thread_sessions_per_user=False)
assert key_a == key_b # Shared session in thread
def test_group_thread_per_user_when_enabled(self):
"""With thread_sessions_per_user=True, users get isolated sessions."""
source_a = SessionSource(
platform=Platform.TELEGRAM,
chat_id="789",
chat_type="group",
thread_id="50",
user_id="user1",
)
source_b = SessionSource(
platform=Platform.TELEGRAM,
chat_id="789",
chat_type="group",
thread_id="50",
user_id="user2",
)
key_a = build_session_key(source_a, thread_sessions_per_user=True)
key_b = build_session_key(source_b, thread_sessions_per_user=True)
assert key_a != key_b
class TestSessionSourceSerialization:
"""Verify SessionSource round-trips correctly with thread_id."""
def test_thread_id_preserved_in_dict(self):
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123",
chat_type="dm",
thread_id="456",
)
d = source.to_dict()
assert d["thread_id"] == "456"
restored = SessionSource.from_dict(d)
assert restored.thread_id == "456"
def test_none_thread_id_preserved(self):
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123",
chat_type="dm",
)
d = source.to_dict()
assert d.get("thread_id") is None
restored = SessionSource.from_dict(d)
assert restored.thread_id is None