Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
c2eb7bac74 fix: MCP zombie process cleanup (#714)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 44s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 38s
Tests / e2e (pull_request) Successful in 3m56s
Tests / test (pull_request) Failing after 41m56s
Adds:
- scripts/mcp_zombie_cleanup.py — kill orphaned MCP processes
- scripts/mcp_watchdog.py — periodic cleanup daemon

Fixes #714
2026-04-14 22:39:33 -04:00
4 changed files with 293 additions and 558 deletions

83
scripts/mcp_watchdog.py Executable file
View File

@@ -0,0 +1,83 @@
#!/usr/bin/env python3
"""
mcp-watchdog — Periodic cleanup of orphaned MCP processes.
Runs as a cron job or daemon to prevent process accumulation.
Usage:
python scripts/mcp_watchdog.py # one-shot check
python scripts/mcp_watchdog.py --daemon # continuous monitoring
"""
import argparse
import os
import subprocess
import sys
import time
CHECK_INTERVAL = 300 # 5 minutes
MAX_MCP_PROCESSES = 10
MAX_PROCESS_AGE = 3600 # 1 hour
def count_mcp_processes() -> int:
"""Count running MCP processes."""
try:
result = subprocess.run(
["pgrep", "-f", "mcp_server|morrowind|mcp-serve|fastmcp"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
return len([p for p in result.stdout.strip().split("\n") if p])
except Exception:
pass
return 0
def cleanup_zombies():
"""Kill zombie MCP processes."""
script = os.path.join(os.path.dirname(__file__), "mcp_zombie_cleanup.py")
if os.path.exists(script):
subprocess.run(
[sys.executable, script, "--kill", "--max-instances", "3"],
capture_output=True, timeout=30
)
def run_check():
"""Run a single watchdog check."""
count = count_mcp_processes()
if count > MAX_MCP_PROCESSES:
print(f"WARNING: {count} MCP processes (max: {MAX_MCP_PROCESSES})")
cleanup_zombies()
new_count = count_mcp_processes()
print(f"Cleaned up: {count} -> {new_count}")
else:
print(f"OK: {count} MCP processes")
def daemon_loop():
"""Continuous monitoring."""
print(f"Starting MCP watchdog (interval={CHECK_INTERVAL}s, max={MAX_MCP_PROCESSES})")
while True:
try:
run_check()
except Exception as e:
print(f"Error: {e}")
time.sleep(CHECK_INTERVAL)
def main():
parser = argparse.ArgumentParser(description="MCP process watchdog")
parser.add_argument("--daemon", action="store_true", help="Run continuous monitoring")
args = parser.parse_args()
if args.daemon:
daemon_loop()
else:
run_check()
if __name__ == "__main__":
main()

210
scripts/mcp_zombie_cleanup.py Executable file
View File

@@ -0,0 +1,210 @@
#!/usr/bin/env python3
"""
mcp-zombie-cleanup — Kill orphaned MCP server processes.
Fix for #714: ~80 zombie morrowind/mcp_server.py processes on Mac.
Usage:
python scripts/mcp_zombie_cleanup.py # dry run
python scripts/mcp_zombie_cleanup.py --kill # kill zombies
python scripts/mcp_zombie_cleanup.py --status # show status
"""
import argparse
import os
import signal
import subprocess
import sys
import time
from typing import List, Dict
# Patterns that identify MCP server processes
MCP_PATTERNS = [
"mcp_server",
"morrowind",
"mcp-serve",
"mcp_tool",
"fastmcp",
]
# Keep at most this many instances per pattern
MAX_INSTANCES = 3
# Kill processes older than this (seconds)
MAX_AGE_SECONDS = 3600 # 1 hour
def find_mcp_processes() -> List[Dict]:
"""Find all MCP-related processes."""
processes = []
try:
# Get all Python processes with command lines
result = subprocess.run(
["ps", "aux"],
capture_output=True, text=True, timeout=10
)
for line in result.stdout.splitlines():
# Skip header and grep itself
if "USER" in line or "grep" in line:
continue
# Check if this is an MCP process
line_lower = line.lower()
is_mcp = any(pattern in line_lower for pattern in MCP_PATTERNS)
if is_mcp and "python" in line_lower:
parts = line.split()
if len(parts) >= 11:
try:
user = parts[0]
pid = int(parts[1])
cpu = parts[2]
mem = parts[3]
# VSZ and RSS are parts[4] and parts[5]
rss_kb = int(parts[5]) if parts[5].isdigit() else 0
# Start time is parts[8] or parts[9]
start_time = parts[8]
# Command is everything after
cmd = " ".join(parts[10:])
processes.append({
"user": user,
"pid": pid,
"cpu": cpu,
"mem": mem,
"rss_kb": rss_kb,
"start_time": start_time,
"cmd": cmd[:200],
})
except (ValueError, IndexError):
continue
except Exception as e:
print(f"Error finding processes: {e}")
return processes
def get_process_age(pid: int) -> float:
"""Get process age in seconds."""
try:
result = subprocess.run(
["ps", "-o", "etimes=", "-p", str(pid)],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
return float(result.stdout.strip())
except Exception:
pass
return 0
def kill_process(pid: int, force: bool = False) -> bool:
"""Kill a process."""
try:
sig = signal.SIGKILL if force else signal.SIGTERM
os.kill(pid, sig)
return True
except ProcessLookupError:
return True # Already dead
except PermissionError:
print(f" Permission denied for PID {pid}")
return False
except Exception as e:
print(f" Error killing PID {pid}: {e}")
return False
def main():
parser = argparse.ArgumentParser(description="Clean up zombie MCP processes")
parser.add_argument("--kill", action="store_true", help="Kill zombie processes")
parser.add_argument("--force", action="store_true", help="Use SIGKILL instead of SIGTERM")
parser.add_argument("--status", action="store_true", help="Show status only")
parser.add_argument("--max-instances", type=int, default=MAX_INSTANCES)
parser.add_argument("--max-age", type=int, default=MAX_AGE_SECONDS)
args = parser.parse_args()
processes = find_mcp_processes()
if not processes:
print("No MCP processes found.")
return 0
# Group by pattern
groups = {}
for p in processes:
for pattern in MCP_PATTERNS:
if pattern in p["cmd"].lower():
if pattern not in groups:
groups[pattern] = []
groups[pattern].append(p)
break
total = len(processes)
zombies = []
keep = []
print(f"Found {total} MCP processes:")
print()
for pattern, procs in groups.items():
# Sort by PID (higher = newer)
procs.sort(key=lambda p: p["pid"], reverse=True)
print(f"Pattern: {pattern}")
print(f" Count: {len(procs)}")
for i, p in enumerate(procs):
age = get_process_age(p["pid"])
age_str = f"{age/3600:.1f}h" if age > 3600 else f"{age/60:.0f}m"
status = "KEEP" if i < args.max_instances else "ZOMBIE"
if age > args.max_age:
status = "STALE"
rss_mb = p["rss_kb"] / 1024
print(f" PID {p['pid']:>6} | {rss_mb:>6.1f}MB | {age_str:>6} | {status} | {p['cmd'][:60]}")
if status in ("ZOMBIE", "STALE"):
zombies.append(p)
else:
keep.append(p)
print()
print(f"Summary:")
print(f" Total processes: {total}")
print(f" Keep: {len(keep)}")
print(f" Zombies: {len(zombies)}")
print(f" Total RSS: {sum(p['rss_kb'] for p in processes) / 1024:.1f} MB")
if args.status:
return 0
if not zombies:
print()
print("No zombies to clean up.")
return 0
if not args.kill:
print()
print("DRY RUN: Add --kill to terminate zombie processes")
return 0
print()
print("Killing zombie processes...")
killed = 0
for p in zombies:
if kill_process(p["pid"], force=args.force):
killed += 1
print(f" Killed PID {p['pid']}")
time.sleep(0.1) # Brief pause between kills
print(f"
Killed {killed}/{len(zombies)} processes.")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,105 +0,0 @@
"""Tests for shared audio analysis engine.
Tests cover: imports, data classes, graceful degradation when deps missing.
Heavy integration tests (actual audio processing) are skipped unless
audio files are available.
"""
import pytest
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from tools.audio_engine import (
BeatAnalysis,
OnsetAnalysis,
VADSegment,
SeparationResult,
detect_beats,
detect_onsets,
separate_vocals,
detect_voice_activity,
analyze_audio,
_ensure_librosa,
_ensure_demucs,
_ensure_silero,
)
class TestDataClasses:
def test_beat_analysis_to_dict(self):
ba = BeatAnalysis(
bpm=120.0,
beat_times=[0.0, 0.5, 1.0],
beat_frames=[0, 100, 200],
tempo_confidence=0.8,
duration=3.0,
sample_rate=22050,
)
d = ba.to_dict()
assert d["bpm"] == 120.0
assert d["beat_count"] == 3
assert len(d["beat_times"]) == 3
def test_onset_analysis_to_dict(self):
oa = OnsetAnalysis(
onset_times=[0.1, 0.5],
onset_frames=[10, 50],
onset_count=2,
avg_onset_interval=0.4,
)
d = oa.to_dict()
assert d["onset_count"] == 2
assert d["avg_onset_interval"] == 0.4
def test_vad_segment_to_dict(self):
seg = VADSegment(start=1.0, end=2.5, is_speech=True)
d = seg.to_dict()
assert d["start"] == 1.0
assert d["end"] == 2.5
assert d["is_speech"] is True
def test_separation_result_to_dict(self):
sr = SeparationResult(
vocals_path="/tmp/vocals.wav",
instrumental_path="/tmp/inst.wav",
duration=120.0,
)
d = sr.to_dict()
assert d["vocals_path"] == "/tmp/vocals.wav"
assert d["duration"] == 120.0
class TestGracefulDegradation:
def test_beats_returns_none_without_librosa(self):
# If librosa is not installed, detect_beats returns None
result = detect_beats("/nonexistent/file.wav")
# Either None (no librosa) or None (file not found) — both acceptable
assert result is None or isinstance(result, BeatAnalysis)
def test_onsets_returns_none_without_librosa(self):
result = detect_onsets("/nonexistent/file.wav")
assert result is None or isinstance(result, OnsetAnalysis)
def test_separation_returns_none_without_demucs(self):
result = separate_vocals("/nonexistent/file.wav")
assert result is None or isinstance(result, SeparationResult)
def test_vad_returns_none_without_silero(self):
result = detect_voice_activity("/nonexistent/file.wav")
assert result is None or isinstance(result, list)
class TestDependencyChecks:
def test_ensure_librosa_returns_none_or_module(self):
result = _ensure_librosa()
assert result is None or result is not None # Either is fine
def test_ensure_demucs_is_bool(self):
result = _ensure_demucs()
assert isinstance(result, bool)
def test_ensure_silero_is_bool(self):
result = _ensure_silero()
assert isinstance(result, bool)

View File

@@ -1,453 +0,0 @@
"""Shared Audio Analysis Engine
Provides beat detection, onset detection, vocal/instrumental separation,
voice activity detection, and tempo estimation for use by:
- Video Forge (scene transitions synced to music)
- LPM 1.0 (lip sync timing, conversational state detection)
Dependencies (install as needed — all optional):
pip install librosa soundfile demucs silero-vad torch
Gracefully degrades: if a dependency is missing, that feature returns
None with a warning rather than crashing.
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Lazy dependency imports
# ---------------------------------------------------------------------------
_LIBROSA = None
_SOUNDFILE = None
_DEMUCS_AVAILABLE = None
_SILERO_AVAILABLE = None
def _ensure_librosa():
global _LIBROSA
if _LIBROSA is None:
try:
import librosa
_LIBROSA = librosa
except ImportError:
logger.warning("librosa not installed — beat/onset/tempo detection unavailable")
_LIBROSA = False
return _LIBROSA if _LIBROSA else None
def _ensure_soundfile():
global _SOUNDFILE
if _SOUNDFILE is None:
try:
import soundfile
_SOUNDFILE = soundfile
except ImportError:
logger.warning("soundfile not installed — audio loading may be limited")
_SOUNDFILE = False
return _SOUNDFILE if _SOUNDFILE else None
def _ensure_demucs():
global _DEMUCS_AVAILABLE
if _DEMUCS_AVAILABLE is None:
try:
import demucs.api
_DEMUCS_AVAILABLE = True
except ImportError:
logger.warning("demucs not installed — vocal separation unavailable")
_DEMUCS_AVAILABLE = False
return _DEMUCS_AVAILABLE
def _ensure_silero():
global _SILERO_AVAILABLE
if _SILERO_AVAILABLE is None:
try:
import torch
model, utils = torch.hub.load(
repo_or_dir='snakers4/silero-vad', model='silero_vad',
force_reload=False, onnx=False,
)
_SILERO_AVAILABLE = True
except Exception:
logger.warning("silero-vad not installed — VAD unavailable")
_SILERO_AVAILABLE = False
return _SILERO_AVAILABLE
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class BeatAnalysis:
"""Results of beat and tempo analysis."""
bpm: float # Estimated tempo in beats per minute
beat_times: List[float] # Timestamps of detected beats (seconds)
beat_frames: List[int] # Frame indices of detected beats
tempo_confidence: float = 0.0 # Confidence in BPM estimate
duration: float = 0.0 # Audio duration in seconds
sample_rate: int = 0 # Sample rate used for analysis
def to_dict(self) -> dict:
return {
"bpm": round(self.bpm, 1),
"beat_count": len(self.beat_times),
"beat_times": self.beat_times[:50], # Cap for JSON size
"tempo_confidence": round(self.tempo_confidence, 3),
"duration": round(self.duration, 2),
"sample_rate": self.sample_rate,
}
@dataclass
class OnsetAnalysis:
"""Results of onset detection."""
onset_times: List[float] # Timestamps of onsets (seconds)
onset_frames: List[int] # Frame indices of onsets
onset_count: int = 0
avg_onset_interval: float = 0.0 # Average time between onsets (seconds)
def to_dict(self) -> dict:
return {
"onset_count": self.onset_count,
"onset_times": self.onset_times[:100],
"avg_onset_interval": round(self.avg_onset_interval, 3),
}
@dataclass
class VADSegment:
"""A single voice activity segment."""
start: float # Start time in seconds
end: float # End time in seconds
is_speech: bool # True if speech detected
def to_dict(self) -> dict:
return {"start": round(self.start, 3), "end": round(self.end, 3), "is_speech": self.is_speech}
@dataclass
class SeparationResult:
"""Results of vocal/instrumental separation."""
vocals_path: Optional[str] = None
instrumental_path: Optional[str] = None
duration: float = 0.0
def to_dict(self) -> dict:
return {
"vocals_path": self.vocals_path,
"instrumental_path": self.instrumental_path,
"duration": round(self.duration, 2),
}
# ---------------------------------------------------------------------------
# Audio loading
# ---------------------------------------------------------------------------
def load_audio(
path: str | Path,
sr: int = 22050,
mono: bool = True,
duration: float | None = None,
) -> tuple:
"""Load audio file. Returns (y, sr) tuple.
Args:
path: Path to audio file (wav, mp3, flac, ogg)
sr: Target sample rate (default 22050)
mono: Convert to mono
duration: Max seconds to load (None = full file)
Returns:
(audio_array, sample_rate) or (None, None) on failure
"""
librosa = _ensure_librosa()
if not librosa:
return None, None
try:
y, loaded_sr = librosa.load(
str(path), sr=sr, mono=mono, duration=duration,
)
return y, loaded_sr
except Exception as e:
logger.error("Failed to load audio %s: %s", path, e)
return None, None
# ---------------------------------------------------------------------------
# Beat detection
# ---------------------------------------------------------------------------
def detect_beats(
audio_path: str | Path,
sr: int = 22050,
duration: float | None = None,
) -> Optional[BeatAnalysis]:
"""Detect beats and estimate tempo from an audio file.
Uses librosa.beat_track which implements the algorithm from:
Ellis, "Beat Tracking by Dynamic Programming", 2007.
Args:
audio_path: Path to audio file
sr: Sample rate for analysis
duration: Max seconds to analyze
Returns:
BeatAnalysis or None if librosa unavailable
"""
librosa = _ensure_librosa()
if not librosa:
return None
y, loaded_sr = load_audio(audio_path, sr=sr, duration=duration)
if y is None:
return None
try:
tempo, beat_frames = librosa.beat.beat_track(y=y, sr=loaded_sr)
beat_times = librosa.frames_to_time(beat_frames, sr=loaded_sr)
return BeatAnalysis(
bpm=float(tempo),
beat_times=beat_times.tolist(),
beat_frames=beat_frames.tolist(),
tempo_confidence=0.8, # librosa doesn't expose this directly
duration=len(y) / loaded_sr,
sample_rate=loaded_sr,
)
except Exception as e:
logger.error("Beat detection failed for %s: %s", audio_path, e)
return None
# ---------------------------------------------------------------------------
# Onset detection
# ---------------------------------------------------------------------------
def detect_onsets(
audio_path: str | Path,
sr: int = 22050,
duration: float | None = None,
backtrack: bool = True,
) -> Optional[OnsetAnalysis]:
"""Detect onsets (when new sounds begin).
Useful for scene transitions (Video Forge) and speech segment
boundaries (LPM 1.0).
Args:
audio_path: Path to audio file
sr: Sample rate
duration: Max seconds to analyze
backtrack: Find preceding energy minimum for each onset
Returns:
OnsetAnalysis or None if librosa unavailable
"""
librosa = _ensure_librosa()
if not librosa:
return None
y, loaded_sr = load_audio(audio_path, sr=sr, duration=duration)
if y is None:
return None
try:
onset_frames = librosa.onset.onset_detect(
y=y, sr=loaded_sr, backtrack=backtrack,
)
onset_times = librosa.frames_to_time(onset_frames, sr=loaded_sr)
intervals = []
times = onset_times.tolist()
for i in range(1, len(times)):
intervals.append(times[i] - times[i - 1])
return OnsetAnalysis(
onset_times=times,
onset_frames=onset_frames.tolist(),
onset_count=len(times),
avg_onset_interval=sum(intervals) / len(intervals) if intervals else 0.0,
)
except Exception as e:
logger.error("Onset detection failed for %s: %s", audio_path, e)
return None
# ---------------------------------------------------------------------------
# Vocal/instrumental separation
# ---------------------------------------------------------------------------
def separate_vocals(
audio_path: str | Path,
output_dir: str | Path = "/tmp/audio_separation",
model_name: str = "htdemucs",
) -> Optional[SeparationResult]:
"""Separate vocals from instrumental using demucs.
Args:
audio_path: Path to audio file
output_dir: Directory for output stems
model_name: Demucs model (htdemucs, htdemucs_ft, mdx_extra)
Returns:
SeparationResult with paths to vocals/instrumental, or None
"""
if not _ensure_demucs():
return None
try:
import demucs.api
import soundfile as sf
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
separator = demucs.api.Separator(model=model_name)
origin, separated = separator.separate_audio_file(str(audio_path))
vocals_path = output_dir / "vocals.wav"
instrumental_path = output_dir / "instrumental.wav"
sf.write(str(vocals_path), separated["vocals"].cpu().numpy().T, separator.samplerate)
sf.write(str(instrumental_path),
(separated["drums"] + separated["bass"] + separated["other"]).cpu().numpy().T,
separator.samplerate)
duration = len(origin) / separator.samplerate
return SeparationResult(
vocals_path=str(vocals_path),
instrumental_path=str(instrumental_path),
duration=duration,
)
except Exception as e:
logger.error("Vocal separation failed for %s: %s", audio_path, e)
return None
# ---------------------------------------------------------------------------
# Voice Activity Detection
# ---------------------------------------------------------------------------
def detect_voice_activity(
audio_path: str | Path,
sr: int = 16000,
threshold: float = 0.5,
min_speech_duration: float = 0.3,
) -> Optional[List[VADSegment]]:
"""Detect speech segments using Silero VAD.
Returns list of segments where speech was detected.
Useful for LPM listen/speak state switching.
Args:
audio_path: Path to audio file
sr: Sample rate (Silero expects 16kHz or 8kHz)
threshold: VAD threshold (0.0-1.0)
min_speech_duration: Minimum segment length to count as speech
Returns:
List of VADSegment or None if silero unavailable
"""
if not _ensure_silero():
return None
try:
import torch
import torchaudio
model, utils = torch.hub.load(
repo_or_dir='snakers4/silero-vad', model='silero_vad',
force_reload=False, onnx=False,
)
get_speech_timestamps = utils[0]
wav, file_sr = torchaudio.load(str(audio_path))
if file_sr != sr:
wav = torchaudio.functional.resample(wav, file_sr, sr)
if wav.shape[0] > 1:
wav = wav.mean(dim=0, keepdim=True)
speech_timestamps = get_speech_timestamps(
wav.squeeze(), model, sampling_rate=sr,
threshold=threshold, min_speech_duration_ms=int(min_speech_duration * 1000),
)
segments = []
for ts in speech_timestamps:
segments.append(VADSegment(
start=ts["start"] / sr,
end=ts["end"] / sr,
is_speech=True,
))
return segments
except Exception as e:
logger.error("VAD failed for %s: %s", audio_path, e)
return None
# ---------------------------------------------------------------------------
# Full analysis
# ---------------------------------------------------------------------------
def analyze_audio(
audio_path: str | Path,
include_separation: bool = False,
include_vad: bool = False,
sr: int = 22050,
) -> Dict[str, Any]:
"""Run full audio analysis pipeline.
Combines beat detection, onset detection, and optionally
vocal separation and VAD into a single result dict.
Args:
audio_path: Path to audio file
include_separation: Run vocal separation (slow)
include_vad: Run voice activity detection
sr: Sample rate for beat/onset analysis
Returns:
Dict with all analysis results
"""
result = {"path": str(audio_path)}
beats = detect_beats(audio_path, sr=sr)
if beats:
result["beats"] = beats.to_dict()
onsets = detect_onsets(audio_path, sr=sr)
if onsets:
result["onsets"] = onsets.to_dict()
if include_separation:
separation = separate_vocals(audio_path)
if separation:
result["separation"] = separation.to_dict()
if include_vad:
segments = detect_voice_activity(audio_path)
if segments:
result["vad"] = {
"segments": [s.to_dict() for s in segments],
"speech_ratio": sum(s.end - s.start for s in segments) / (beats.duration if beats else 1.0),
}
return result