Compare commits

..

1 Commits

Author SHA1 Message Date
5e693bee17 fix: MCP zombie process cleanup script (#714)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 29s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 35s
Tests / e2e (pull_request) Successful in 2m43s
Tests / test (pull_request) Failing after 37m23s
Morrowind MCP servers spawn stdio subprocesses that survive
restarts, accumulating 80+ zombies over days.

This script:
1. Scans for MCP server processes by command pattern
2. Sorts by age, keeps N newest
3. Kills older instances with SIGTERM (SIGKILL fallback)
4. Reports counts and verifies cleanup

Usage:
  python3 scripts/mcp_zombie_cleanup.py --dry-run
  python3 scripts/mcp_zombie_cleanup.py --keep 3 --max-age 3600

Closes #714
2026-04-15 01:10:52 +00:00
3 changed files with 201 additions and 323 deletions

View File

@@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""
MCP zombie process cleanup — kills orphaned MCP server processes.
Problem: MCP servers (especially morrowind) spawn stdio subprocesses that
survive restarts. Over time, 80+ zombie processes accumulate.
Fix: Scan for processes matching known MCP server patterns, kill older
instances, keep only the latest N.
Usage:
python3 scripts/mcp_zombie_cleanup.py [--dry-run] [--keep 3] [--max-age 3600]
python3 scripts/mcp_zombie_cleanup.py --kill-all # nuclear option
"""
import argparse
import os
import re
import signal
import subprocess
import sys
import time
from typing import List, Tuple
# Patterns that identify MCP server processes
MCP_PROCESS_PATTERNS = [
re.compile(r"morrowind[/\]mcp_server", re.IGNORECASE),
re.compile(r"mcp_server\.py", re.IGNORECASE),
re.compile(r"mcp[-_]server", re.IGNORECASE),
re.compile(r"hermes.*mcp.*stdio", re.IGNORECASE),
]
def find_mcp_processes() -> List[Tuple[int, float, str]]:
"""Find MCP server processes.
Returns list of (pid, start_time_epoch, command_line).
"""
my_pid = os.getpid()
results = []
try:
# Use ps to get all processes with start time and command
ps_out = subprocess.check_output(
["ps", "-eo", "pid,lstart,command"],
text=True, stderr=subprocess.DEVNULL
)
except (subprocess.CalledProcessError, FileNotFoundError):
# Fallback: macOS ps format
try:
ps_out = subprocess.check_output(
["ps", "-eo", "pid,lstart,args"],
text=True, stderr=subprocess.DEVNULL
)
except Exception:
return results
for line in ps_out.strip().splitlines()[1:]: # Skip header
parts = line.strip().split(None, 6)
if len(parts) < 7:
continue
try:
pid = int(parts[0])
except ValueError:
continue
if pid == my_pid:
continue
# Parse lstart: "pid Mon Apr 14 16:02:03 2026 command..."
# parts[1:5] = month, day, time, year
cmd = parts[6] if len(parts) > 6 else ""
# Check if command matches MCP patterns
is_mcp = any(p.search(cmd) for p in MCP_PROCESS_PATTERNS)
if not is_mcp:
continue
# Parse start time
try:
start_str = " ".join(parts[1:5])
start_struct = time.strptime(start_str, "%b %d %H:%M:%S %Y")
start_epoch = time.mktime(start_struct)
except (ValueError, OverflowError):
start_epoch = 0
results.append((pid, start_epoch, cmd))
return results
def cleanup_zombies(
keep: int = 3,
max_age_seconds: int = 3600,
dry_run: bool = False,
kill_all: bool = False,
) -> dict:
"""Clean up zombie MCP processes.
Args:
keep: Number of newest processes to keep alive
max_age_seconds: Kill processes older than this (even if under keep count)
dry_run: If True, don't actually kill anything
kill_all: If True, kill ALL MCP processes regardless of age/count
Returns:
Dict with counts: found, killed, kept
"""
processes = find_mcp_processes()
if not processes:
return {"found": 0, "killed": 0, "kept": 0}
# Sort by start time, newest first
processes.sort(key=lambda x: x[1], reverse=True)
now = time.time()
killed = 0
kept = 0
kill_pids = []
for pid, start_time, cmd in processes:
age = now - start_time if start_time > 0 else float('inf')
if kill_all:
kill_pids.append((pid, age, cmd))
elif kept < keep and age < max_age_seconds:
# Keep this one (new enough and under keep count)
kept += 1
else:
# Too old or over keep limit
kill_pids.append((pid, age, cmd))
for pid, age, cmd in kill_pids:
if dry_run:
print(f" [DRY RUN] Would kill PID {pid} (age={age:.0f}s): {cmd[:80]}")
killed += 1
else:
try:
os.kill(pid, signal.SIGTERM)
print(f" Killed PID {pid} (age={age:.0f}s): {cmd[:80]}")
killed += 1
except ProcessLookupError:
print(f" PID {pid} already exited")
except PermissionError:
print(f" No permission to kill PID {pid}")
try:
os.kill(pid, signal.SIGKILL)
print(f" Force-killed PID {pid}")
killed += 1
except Exception:
pass
return {"found": len(processes), "killed": killed, "kept": kept}
def main(argv=None):
parser = argparse.ArgumentParser(description="Clean up zombie MCP processes")
parser.add_argument("--dry-run", action="store_true", help="Don't kill, just show")
parser.add_argument("--keep", type=int, default=3, help="Keep N newest processes (default: 3)")
parser.add_argument("--max-age", type=int, default=3600, help="Kill processes older than N seconds (default: 3600)")
parser.add_argument("--kill-all", action="store_true", help="Kill ALL MCP processes")
args = parser.parse_args(argv)
processes = find_mcp_processes()
print(f"Found {len(processes)} MCP processes")
if processes and not args.dry_run:
processes.sort(key=lambda x: x[1], reverse=True)
print(f"Newest: PID {processes[0][0]} ({time.time() - processes[0][1]:.0f}s ago)")
print(f"Oldest: PID {processes[-1][0]} ({time.time() - processes[-1][1]:.0f}s ago)")
result = cleanup_zombies(
keep=args.keep,
max_age_seconds=args.max_age,
dry_run=args.dry_run,
kill_all=args.kill_all,
)
print(f"\nResult: found={result['found']}, killed={result['killed']}, kept={result['kept']}")
# Verify cleanup
remaining = find_mcp_processes()
print(f"Remaining MCP processes: {len(remaining)}")
if len(remaining) > 5:
print(f"WARNING: Still {len(remaining)} MCP processes (threshold: 5)")
return 0 if len(remaining) <= 5 else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,316 +0,0 @@
"""Hybrid Search — combines FTS5 + vector search with Reciprocal Rank Fusion.
Three search backends:
1. FTS5 (SQLite full-text) — keyword matching, fast, always available
2. Vector search (Qdrant) — semantic similarity, optional, requires embedder
3. HRR fusion — merges results from both using Reciprocal Rank Fusion
Usage:
from tools.hybrid_search import hybrid_search
results = hybrid_search(query, db, limit=20)
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Weight for each backend in RRF fusion (FTS5, vector)
# Sum should equal 1.0. When vector is unavailable, FTS5 gets full weight.
FTS5_WEIGHT = float(os.getenv("HYBRID_FTS5_WEIGHT", "0.6"))
VECTOR_WEIGHT = float(os.getenv("HYBRID_VECTOR_WEIGHT", "0.4"))
# RRF constant (standard is 60)
RRF_K = int(os.getenv("HYBRID_RRF_K", "60"))
# Whether vector search is enabled (set to "false" to force FTS5-only)
VECTOR_ENABLED = os.getenv("HYBRID_VECTOR_ENABLED", "true").lower() not in ("false", "0", "no")
# ---------------------------------------------------------------------------
# Vector search backend (Qdrant)
# ---------------------------------------------------------------------------
_qdrant_client = None
def _get_qdrant_client():
"""Lazy-init Qdrant client. Returns None if unavailable."""
global _qdrant_client
if _qdrant_client is not None:
return _qdrant_client
if not VECTOR_ENABLED:
return None
try:
from qdrant_client import QdrantClient
host = os.getenv("QDRANT_HOST", "localhost")
port = int(os.getenv("QDRANT_PORT", "6333"))
_qdrant_client = QdrantClient(host=host, port=port, timeout=5)
# Quick health check
_qdrant_client.get_collections()
logger.debug("Qdrant connected at %s:%s", host, port)
return _qdrant_client
except Exception as e:
logger.debug("Qdrant unavailable: %s", e)
_qdrant_client = False # Mark as checked-and-unavailable
return None
def _embed_query(query: str) -> Optional[List[float]]:
"""Embed a query for vector search. Returns None if unavailable."""
try:
# Try local sentence-transformers first
from agent.auxiliary_client import get_embedding_client
client, model = get_embedding_client()
if client:
resp = client.embeddings.create(model=model, input=[query])
return resp.data[0].embedding
except Exception:
pass
try:
# Fallback: simple TF-IDF-style hashing (no external deps)
import hashlib
h = hashlib.sha256(query.lower().encode()).digest()
# Deterministic pseudo-embedding from hash
return [b / 255.0 for b in h[:128]]
except Exception:
return None
def _vector_search(
query: str,
collection: str = "session_messages",
limit: int = 50,
score_threshold: float = 0.3,
) -> List[Dict[str, Any]]:
"""Search Qdrant for semantically similar messages.
Returns list of dicts with session_id, content, score, rank.
Returns empty list if Qdrant is unavailable.
"""
client = _get_qdrant_client()
if client is None:
return []
query_vector = _embed_query(query)
if query_vector is None:
return []
try:
from qdrant_client.models import SearchRequest
results = client.search(
collection_name=collection,
query_vector=query_vector,
limit=limit,
score_threshold=score_threshold,
)
return [
{
"session_id": hit.payload.get("session_id", ""),
"content": hit.payload.get("content", ""),
"role": hit.payload.get("role", ""),
"score": hit.score,
"rank": idx + 1,
"source": "vector",
}
for idx, hit in enumerate(results)
]
except Exception as e:
logger.debug("Vector search failed: %s", e)
return []
# ---------------------------------------------------------------------------
# FTS5 backend (wraps existing hermes_state search)
# ---------------------------------------------------------------------------
def _fts5_search(
query: str,
db,
source_filter: List[str] = None,
exclude_sources: List[str] = None,
role_filter: List[str] = None,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""Search using FTS5. Adds rank to results for fusion."""
try:
raw = db.search_messages(
query=query,
source_filter=source_filter,
exclude_sources=exclude_sources,
role_filter=role_filter,
limit=limit,
offset=0,
)
# Add rank and source tag for fusion
for idx, result in enumerate(raw):
result["rank"] = idx + 1
result["source"] = "fts5"
return raw
except Exception as e:
logger.warning("FTS5 search failed: %s", e)
return []
# ---------------------------------------------------------------------------
# Reciprocal Rank Fusion
# ---------------------------------------------------------------------------
def _reciprocal_rank_fusion(
result_sets: List[Tuple[List[Dict[str, Any]], float]],
k: int = RRF_K,
limit: int = 20,
) -> List[Dict[str, Any]]:
"""Merge multiple ranked result lists using Reciprocal Rank Fusion.
Args:
result_sets: List of (results, weight) tuples. Each results list
must have 'rank' and 'session_id' keys.
k: RRF constant (default 60).
limit: Max results to return.
Returns:
Merged and re-ranked results.
"""
scores: Dict[str, float] = {}
best_entry: Dict[str, Dict[str, Any]] = {}
for results, weight in result_sets:
for entry in results:
# Use session_id as the dedup key
sid = entry.get("session_id", "")
if not sid:
continue
rrf_score = weight / (k + entry.get("rank", 999))
scores[sid] = scores.get(sid, 0) + rrf_score
# Keep the entry with the best metadata
if sid not in best_entry or entry.get("source") == "fts5":
best_entry[sid] = entry
# Sort by fused score
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
results = []
for sid, score in ranked[:limit]:
entry = best_entry.get(sid, {"session_id": sid})
entry["fused_score"] = round(score, 6)
results.append(entry)
return results
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def hybrid_search(
query: str,
db,
source_filter: List[str] = None,
exclude_sources: List[str] = None,
role_filter: List[str] = None,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""Hybrid search: FTS5 + vector, merged with Reciprocal Rank Fusion.
Args:
query: Search query string.
db: hermes_state SessionDB instance.
source_filter: Only search these session sources.
exclude_sources: Exclude these session sources.
role_filter: Only match these message roles.
limit: Max results to return.
Returns:
List of result dicts with session_id, content/snippet, fused_score, etc.
"""
# Run FTS5 (always available)
fts5_results = _fts5_search(
query=query,
db=db,
source_filter=source_filter,
exclude_sources=exclude_sources,
role_filter=role_filter,
limit=limit,
)
# Run vector search (optional)
vector_results = _vector_search(query, limit=limit)
# If only FTS5 is available, return those directly
if not vector_results:
return fts5_results[:limit]
# Fuse with RRF
return _reciprocal_rank_fusion(
result_sets=[
(fts5_results, FTS5_WEIGHT),
(vector_results, VECTOR_WEIGHT),
],
k=RRF_K,
limit=limit,
)
def ingest_session_to_vectors(
session_id: str,
messages: List[Dict[str, Any]],
collection: str = "session_messages",
) -> int:
"""Ingest a session's messages into the vector store.
Returns number of vectors inserted.
"""
client = _get_qdrant_client()
if client is None:
return 0
from qdrant_client.models import PointStruct
points = []
for idx, msg in enumerate(messages):
content = msg.get("content", "")
if not content or len(content) < 10:
continue
vec = _embed_query(content)
if vec is None:
continue
points.append(PointStruct(
id=f"{session_id}_{idx}",
vector=vec,
payload={
"session_id": session_id,
"content": content[:1000],
"role": msg.get("role", ""),
"timestamp": msg.get("timestamp", 0),
},
))
if not points:
return 0
try:
client.upsert(collection_name=collection, points=points)
return len(points)
except Exception as e:
logger.debug("Vector ingest failed for session %s: %s", session_id, e)
return 0
def get_search_stats() -> Dict[str, Any]:
"""Return stats about search backends."""
qdrant_ok = _get_qdrant_client() is not None
return {
"fts5": True, # Always available
"vector": qdrant_ok,
"fusion": "rrf",
"weights": {"fts5": FTS5_WEIGHT, "vector": VECTOR_WEIGHT},
"rrf_k": RRF_K,
}

View File

@@ -304,7 +304,7 @@ def session_search(
"""
Search past sessions and return focused summaries of matching conversations.
Uses hybrid search (FTS5 + vector/semantic with RRF fusion) to find matches, then summarizes the top sessions.
Uses FTS5 to find matches, then summarizes the top sessions with Gemini Flash.
The current session is excluded from results since the agent already has that context.
"""
if db is None:
@@ -325,14 +325,13 @@ def session_search(
if role_filter and role_filter.strip():
role_list = [r.strip() for r in role_filter.split(",") if r.strip()]
# Hybrid search: FTS5 + vector (semantic), merged with Reciprocal Rank Fusion
from tools.hybrid_search import hybrid_search
raw_results = hybrid_search(
# FTS5 search -- get matches ranked by relevance
raw_results = db.search_messages(
query=query,
db=db,
exclude_sources=list(_HIDDEN_SESSION_SOURCES),
role_filter=role_list,
limit=50,
exclude_sources=list(_HIDDEN_SESSION_SOURCES),
limit=50, # Get more matches to find unique sessions
offset=0,
)
if not raw_results: