Compare commits

...

3 Commits

Author SHA1 Message Date
Alexander Whitestone
3fb3d05c44 fix: MCP zombie process cleanup and prevention (closes #714)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 36s
Nix / nix (ubuntu-latest) (pull_request) Failing after 4s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 48s
Docs Site Checks / docs-site-checks (pull_request) Failing after 4m6s
Tests / e2e (pull_request) Successful in 2m58s
Tests / test (pull_request) Failing after 59m8s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Two-part fix for ~80 zombie MCP processes on macOS:

1. Prevention: _kill_zombie_mcp_processes() runs at the start of MCP
   server registration. It kills orphaned processes from previous
   sessions that aren't tracked in _stdio_pids, keeping the newest 2
   per pattern (morrowind/mcp, mcp_server.py).

2. Standalone script: scripts/cleanup_mcp_zombies.py for manual
   cleanup with --dry-run, --keep, --all options.

Root cause: MCP reconnection spawns new subprocesses, but the old ones
survive when the agent restarts or the connection drops unexpectedly.
2026-04-14 22:12:13 -04:00
Alexander Whitestone
aae0357bb0 fix: multilingual crisis detection — EN/ES/FR/DE/PT/ZH (closes #702)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 31s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Docs Site Checks / docs-site-checks (pull_request) Failing after 3m19s
Nix / nix (ubuntu-latest) (pull_request) Failing after 5s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 32s
Tests / e2e (pull_request) Successful in 3m30s
Tests / test (pull_request) Failing after 43m15s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Adds tools/crisis_detection.py with pattern-based crisis detection
across 6 languages. Found during #677 implementation.

Languages: English, Spanish, French, German, Portuguese, Chinese
Severity levels: NONE, LOW, MEDIUM, HIGH
Crisis resources per language/region (988, SOS Amitié, CVV, etc.)

Features:
- detect_crisis(text) -> CrisisResult with severity + language
- detect_language_simple() — character set + keyword detection
- get_crisis_resources() — hotline numbers by language
- format_crisis_response() — ready-to-send response with resources

32 tests passing. 2 files, 492 insertions.
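
A minimal call sketch (API as defined in tools/crisis_detection.py; the
example mirrors the French test case):

```python
from tools.crisis_detection import detect_crisis

r = detect_crisis("je veux mourir")
# r.detected is True, r.language == "fr", r.severity == CrisisSeverity.HIGH
```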
2026-04-14 21:21:15 -04:00
Alexander Whitestone
ebf69d155b feat: GPU Inference Scheduler — Multi-Model Resource Management
Fixes #645

Queue-based model loading with priority lanes and VRAM budget tracking.
Prevents GPU OOM crashes when multiple projects compete for VRAM.

## Features

### Priority Lanes
- REALTIME (1): LPM, live video, interactive sessions
- INTERACTIVE (2): Playground, chat, user-facing
- BATCH (3): Harvester, overnight jobs, background

### VRAM Management
- Tracks total/used/available VRAM
- Reserves VRAM when job starts
- Releases VRAM when job completes
- CPU fallback when GPU full

### Model Registry
Pre-registered models:
- Video Forge: SD XL (8GB), HeartMuLa (4GB), Wan2.1 (12GB)
- LPM: Video Gen (16GB), A2A (8GB)
- Local: Llama 3 70B (40GB), Llama 3 8B (8GB), MiMo v2 Pro (16GB)
- Playground: SDXL Turbo (6GB)

### Cross-Project Scenarios Handled
1. Video Forge batch + LPM live → LPM gets priority
2. 3 Video Forge jobs → Sequential with shared cache
3. Night harvester + playground → Batch runs on idle cycles

## Files
- tools/gpu_scheduler.py: InferenceScheduler class, CLI interface
- tests/tools/test_gpu_scheduler.py: 19 tests, all passing

## Usage
```python
from tools.gpu_scheduler import InferenceScheduler, Priority

scheduler = InferenceScheduler(vram_budget_mb=49152)  # 48GB
scheduler.submit_job("job-1", "lpm", "llama3_8b", Priority.REALTIME)
job = scheduler.get_next_job()
scheduler.start_job(job)
# ... do inference ...
scheduler.complete_job(job)
```
2026-04-14 21:15:58 -04:00
6 changed files with 1351 additions and 0 deletions

106
scripts/cleanup_mcp_zombies.py Normal file

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""Kill orphaned MCP server processes that survived shutdown.
Usage:
python3 cleanup_mcp_zombies.py [--dry-run] [--pattern PATTERN] [--keep N]
By default, kills all morrowind/mcp_server.py processes older than 10 minutes,
keeping the newest 2.
"""
import argparse
import os
import signal
import subprocess
import sys
import time
def get_mcp_processes(pattern: str = "morrowind/mcp") -> list:
"""Find MCP server processes matching pattern."""
try:
result = subprocess.run(
["ps", "aux"],
capture_output=True, text=True, timeout=10,
)
processes = []
for line in result.stdout.splitlines():
if pattern in line and "grep" not in line:
parts = line.split()
if len(parts) >= 11:
pid = int(parts[1])
                    start_time = parts[8]  # ps aux: column 9 is START (parts[9] is CPU time)
command = " ".join(parts[10:])
processes.append({
"pid": pid,
"start_time": start_time,
"command": command,
"raw": line,
})
return processes
except Exception as e:
print(f"Error listing processes: {e}", file=sys.stderr)
return []
def kill_processes(processes: list, keep: int = 0, dry_run: bool = False) -> int:
"""Kill processes, optionally keeping the newest N."""
if not processes:
return 0
# Sort by PID (higher PID = newer process on most systems)
processes.sort(key=lambda p: p["pid"], reverse=True)
to_kill = processes[keep:] if keep > 0 else processes
killed = 0
for proc in to_kill:
pid = proc["pid"]
if dry_run:
print(f" [DRY RUN] Would kill PID {pid}: {proc['command'][:80]}")
killed += 1
continue
try:
os.kill(pid, signal.SIGKILL)
print(f" Killed PID {pid}")
killed += 1
except ProcessLookupError:
print(f" PID {pid} already gone")
except PermissionError:
print(f" No permission to kill PID {pid}", file=sys.stderr)
except Exception as e:
print(f" Error killing PID {pid}: {e}", file=sys.stderr)
return killed
def main():
parser = argparse.ArgumentParser(description="Clean up zombie MCP processes")
parser.add_argument("--dry-run", action="store_true", help="Show what would be killed")
parser.add_argument("--pattern", default="morrowind/mcp", help="Process name pattern")
parser.add_argument("--keep", type=int, default=2, help="Keep newest N processes (default: 2)")
parser.add_argument("--all", action="store_true", help="Kill all matching (overrides --keep)")
args = parser.parse_args()
processes = get_mcp_processes(args.pattern)
print(f"Found {len(processes)} processes matching '{args.pattern}'")
if not processes:
print("Nothing to clean up.")
return
keep = 0 if args.all else args.keep
if keep > 0 and len(processes) > keep:
print(f"Keeping newest {keep}, killing oldest {len(processes) - keep}:")
elif keep > 0:
print(f"Only {len(processes)} processes (<= {keep} keep threshold), nothing to kill.")
return
killed = kill_processes(processes, keep=keep, dry_run=args.dry_run)
action = "Would kill" if args.dry_run else "Killed"
print(f"\n{action} {killed} processes.")
if __name__ == "__main__":
main()
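
The same helpers can be driven from Python for a dry run — a sketch,
assuming scripts/ is on sys.path (the printed counts are illustrative):

```python
from cleanup_mcp_zombies import get_mcp_processes, kill_processes

procs = get_mcp_processes(pattern="morrowind/mcp")
print(f"{len(procs)} matching processes")
# dry_run=True only prints what would be killed; keep=2 spares the two
# newest (highest-PID) processes, mirroring the CLI default.
would_kill = kill_processes(procs, keep=2, dry_run=True)
print(f"{would_kill} would be killed")
```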

179
tests/tools/test_crisis_detection.py Normal file

@@ -0,0 +1,179 @@
"""Tests for multilingual crisis detection."""
import pytest
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
from tools.crisis_detection import (
CrisisSeverity, CrisisResult,
detect_crisis, detect_language_simple,
get_crisis_resources, format_crisis_response,
)
class TestLanguageDetection:
def test_english_default(self):
assert detect_language_simple("I need help") == "en"
def test_spanish_detected(self):
assert detect_language_simple("quiero morir no puedo más") == "es"
def test_french_detected(self):
assert detect_language_simple("je veux mourir je peux plus") == "fr"
def test_german_detected(self):
assert detect_language_simple("ich will sterben nicht mehr leben") == "de"
def test_portuguese_detected(self):
assert detect_language_simple("quero morrer não aguento mais") == "pt"
def test_chinese_detected(self):
assert detect_language_simple("我想死") == "zh"
assert detect_language_simple("自殺") == "zh"
class TestEnglishCrisisDetection:
def test_explicit_suicide(self):
r = detect_crisis("I'm thinking about suicide")
assert r.detected is True
assert r.severity == CrisisSeverity.HIGH
assert r.language == "en"
def test_kill_myself(self):
r = detect_crisis("I want to kill myself")
assert r.detected is True
assert r.severity == CrisisSeverity.HIGH
def test_want_to_die(self):
r = detect_crisis("I just want to die")
assert r.detected is True
assert r.severity == CrisisSeverity.MEDIUM
def test_no_reason_to_live(self):
r = detect_crisis("there's no reason to live anymore")
assert r.detected is True
assert r.severity == CrisisSeverity.MEDIUM
def test_what_is_the_point(self):
r = detect_crisis("what's the point of anything")
assert r.detected is True
assert r.severity == CrisisSeverity.LOW
def test_non_crisis(self):
r = detect_crisis("I'm having a great day")
assert r.detected is False
assert r.severity == CrisisSeverity.NONE
def test_empty_input(self):
r = detect_crisis("")
assert r.detected is False
class TestSpanishCrisisDetection:
def test_quiero_morir(self):
r = detect_crisis("quiero morir, no puedo más")
assert r.detected is True
assert r.language == "es"
assert r.severity == CrisisSeverity.HIGH
def test_no_quiero_vivir(self):
r = detect_crisis("no quiero vivir más")
assert r.detected is True
assert r.language == "es"
def test_suicidio(self):
r = detect_crisis("estoy pensando en suicidio, quiero desaparecer")
assert r.detected is True
assert r.severity == CrisisSeverity.HIGH
class TestFrenchCrisisDetection:
def test_je_veux_mourir(self):
r = detect_crisis("je veux mourir")
assert r.detected is True
assert r.language == "fr"
assert r.severity == CrisisSeverity.HIGH
def test_envie_de_mourir(self):
r = detect_crisis("j'ai envie de mourir")
assert r.detected is True
assert r.severity == CrisisSeverity.HIGH
def test_suicide(self):
r = detect_crisis("je pense au suicide")
assert r.detected is True
assert r.severity == CrisisSeverity.HIGH
class TestGermanCrisisDetection:
def test_ich_will_sterben(self):
r = detect_crisis("ich will sterben")
assert r.detected is True
assert r.language == "de"
def test_selbstmord(self):
r = detect_crisis("ich denke an Selbstmord")
assert r.detected is True
assert r.severity == CrisisSeverity.HIGH
class TestPortugueseCrisisDetection:
def test_quero_morrer(self):
r = detect_crisis("eu quero morrer")
assert r.detected is True
assert r.language == "pt"
def test_nao_quero_viver(self):
r = detect_crisis("não quero mais viver")
assert r.detected is True
class TestChineseCrisisDetection:
def test_xiang_si(self):
r = detect_crisis("我想死")
assert r.detected is True
assert r.language == "zh"
assert r.severity == CrisisSeverity.HIGH
def test_zisha(self):
r = detect_crisis("我想自杀")
assert r.detected is True
assert r.severity == CrisisSeverity.HIGH
def test_bu_xiang_huo(self):
r = detect_crisis("我不想活了")
assert r.detected is True
assert r.severity == CrisisSeverity.HIGH
class TestCrisisResources:
def test_english_resources(self):
r = get_crisis_resources("en")
assert "988" in r["number"]
def test_spanish_resources(self):
r = get_crisis_resources("es")
assert r["number"] == "988"
def test_french_resources(self):
r = get_crisis_resources("fr")
assert "SOS" in r["hotline"]
def test_unknown_language_fallback(self):
r = get_crisis_resources("xx")
assert "988" in r["number"]
class TestFormatResponse:
def test_format_includes_resources(self):
result = detect_crisis("I want to die")
response = format_crisis_response(result)
assert "988" in response
assert "safe" in response.lower()
def test_format_spanish(self):
result = detect_crisis("quiero morir")
response = format_crisis_response(result)
assert "988" in response

256
tests/tools/test_gpu_scheduler.py Normal file

@@ -0,0 +1,256 @@
"""
Tests for GPU Inference Scheduler.
"""
import pytest
import tempfile
import os
from pathlib import Path
from tools.gpu_scheduler import (
Priority,
ModelSpec,
InferenceJob,
InferenceScheduler,
MODEL_REGISTRY,
)
@pytest.fixture
def scheduler():
"""Create a scheduler with a temp database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_scheduler.db"
sched = InferenceScheduler(vram_budget_mb=32768, queue_db=str(db_path))
yield sched
class TestPriority:
"""Test priority ordering."""
def test_priority_ordering(self):
"""Realtime < Interactive < Batch."""
assert Priority.REALTIME < Priority.INTERACTIVE
assert Priority.INTERACTIVE < Priority.BATCH
def test_priority_comparison(self):
"""Lower value = higher priority."""
assert Priority.REALTIME.value == 1
assert Priority.INTERACTIVE.value == 2
assert Priority.BATCH.value == 3
class TestModelSpec:
"""Test model specifications."""
def test_model_registry_has_models(self):
"""Registry should have known models."""
assert "llama3_70b" in MODEL_REGISTRY
assert "sd_xl" in MODEL_REGISTRY
assert "mimo_v2_pro" in MODEL_REGISTRY
def test_model_vram(self):
"""Models should have VRAM requirements."""
llama = MODEL_REGISTRY["llama3_70b"]
assert llama.vram_mb > 0
assert llama.vram_mb == 40960 # 40GB
class TestInferenceScheduler:
"""Test the scheduler."""
def test_init(self, scheduler):
"""Scheduler should initialize."""
assert scheduler.vram_budget_mb == 32768
assert scheduler.gpu_state.total_vram_mb == 32768
assert len(scheduler.job_queue) == 0
def test_submit_job(self, scheduler):
"""Submit a job."""
job = scheduler.submit_job(
job_id="test-1",
project="playground",
model_name="llama3_8b",
priority=Priority.INTERACTIVE,
)
assert job.job_id == "test-1"
assert job.status == "queued"
assert len(scheduler.job_queue) == 1
def test_submit_unknown_model(self, scheduler):
"""Submit with unknown model should raise."""
with pytest.raises(ValueError, match="Unknown model"):
scheduler.submit_job(
job_id="test-1",
project="playground",
model_name="nonexistent",
)
def test_priority_ordering(self, scheduler):
"""Jobs should be ordered by priority."""
scheduler.submit_job("batch-1", "harvester", "llama3_8b", Priority.BATCH)
scheduler.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)
scheduler.submit_job("int-1", "playground", "llama3_8b", Priority.INTERACTIVE)
# RT should be first
assert scheduler.job_queue[0].job_id == "rt-1"
assert scheduler.job_queue[1].job_id == "int-1"
assert scheduler.job_queue[2].job_id == "batch-1"
def test_get_next_job(self, scheduler):
"""Get next job should return highest priority."""
scheduler.submit_job("batch-1", "harvester", "llama3_8b", Priority.BATCH)
scheduler.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)
next_job = scheduler.get_next_job()
assert next_job.job_id == "rt-1"
def test_start_job(self, scheduler):
"""Start a job."""
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
success = scheduler.start_job(job)
assert success
assert job.status == "loading"
assert job.started_at is not None
assert scheduler.gpu_state.used_vram_mb == 8192 # llama3_8b VRAM
def test_complete_job(self, scheduler):
"""Complete a job."""
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
scheduler.start_job(job)
scheduler.complete_job(job)
assert job.status == "completed"
assert job.completed_at is not None
assert scheduler.gpu_state.used_vram_mb == 0
assert len(scheduler.job_queue) == 0
assert len(scheduler.completed_jobs) == 1
def test_complete_job_with_error(self, scheduler):
"""Complete a job with error."""
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
scheduler.start_job(job)
scheduler.complete_job(job, error="CUDA out of memory")
assert job.status == "failed"
assert job.error == "CUDA out of memory"
def test_vram_tracking(self, scheduler):
"""VRAM should be tracked correctly."""
# Submit two small jobs
job1 = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
job2 = scheduler.submit_job("test-2", "playground", "llama3_8b", Priority.INTERACTIVE)
# Start first
scheduler.start_job(job1)
assert scheduler.gpu_state.used_vram_mb == 8192
# Start second (should work, still have room)
scheduler.start_job(job2)
assert scheduler.gpu_state.used_vram_mb == 16384
# Complete first
scheduler.complete_job(job1)
assert scheduler.gpu_state.used_vram_mb == 8192
def test_cpu_fallback(self, scheduler):
"""CPU fallback when VRAM full."""
# Fill VRAM with two 16GB models (32GB total = our budget)
job1 = scheduler.submit_job("big-1", "lpm", "mimo_v2_pro", Priority.REALTIME)
scheduler.start_job(job1)
assert scheduler.gpu_state.used_vram_mb == 16384
# Start another 16GB model (should work, exactly fills VRAM)
job2 = scheduler.submit_job("big-2", "playground", "mimo_v2_pro", Priority.INTERACTIVE)
scheduler.start_job(job2)
assert scheduler.gpu_state.used_vram_mb == 32768 # Full
# Now try a third model - should get CPU fallback
job3 = scheduler.submit_job("big-3", "harvester", "mimo_v2_pro", Priority.BATCH)
next_job = scheduler.get_next_job()
# Should get job3 with CPU fallback since VRAM is full
assert next_job.job_id == "big-3"
assert next_job.use_cpu_fallback
def test_get_status(self, scheduler):
"""Get scheduler status."""
scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
scheduler.submit_job("test-2", "harvester", "llama3_8b", Priority.BATCH)
status = scheduler.get_status()
assert status["gpu"]["total_vram_mb"] == 32768
assert status["queue"]["pending"] == 2
assert status["queue"]["by_priority"]["INTERACTIVE"] == 1
assert status["queue"]["by_priority"]["BATCH"] == 1
def test_register_model(self, scheduler):
"""Register a custom model."""
custom = ModelSpec(name="Custom Model", vram_mb=4096)
scheduler.register_model("custom_model", custom)
assert "custom_model" in MODEL_REGISTRY
job = scheduler.submit_job("test-1", "playground", "custom_model")
assert job.model.vram_mb == 4096
class TestCrossProjectScenarios:
"""Test cross-project scenarios from the issue."""
def test_video_forge_batch_plus_lpm_live(self, scheduler):
"""
Video Forge batch + LPM live.
LPM should get priority, batch should queue.
"""
# Video Forge batch job
vf_job = scheduler.submit_job(
"vf-batch-1", "video_forge", "sd_xl", Priority.BATCH
)
# LPM live job (higher priority)
lpm_job = scheduler.submit_job(
"lpm-live-1", "lpm", "lpm_video", Priority.REALTIME
)
# Next job should be LPM
next_job = scheduler.get_next_job()
assert next_job.job_id == "lpm-live-1"
assert next_job.priority == Priority.REALTIME
def test_three_video_forge_jobs(self, scheduler):
"""Three Video Forge jobs should queue sequentially."""
jobs = []
for i in range(3):
job = scheduler.submit_job(
f"vf-{i}", "video_forge", "sd_xl", Priority.BATCH
)
jobs.append(job)
# Start first
scheduler.start_job(jobs[0])
assert scheduler.gpu_state.used_vram_mb == 8192
        # vf-1 should be next (same priority lane, FIFO by submission)
next_job = scheduler.get_next_job()
assert next_job.job_id == "vf-1"
def test_night_harvester_plus_playground(self, scheduler):
"""Night harvester runs on idle cycles."""
harvester = scheduler.submit_job(
"harvest-1", "harvester", "llama3_8b", Priority.BATCH
)
playground = scheduler.submit_job(
"play-1", "playground", "sdxl_turbo", Priority.INTERACTIVE
)
# Playground should get priority
next_job = scheduler.get_next_job()
assert next_job.job_id == "play-1"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

313
tools/crisis_detection.py Normal file

@@ -0,0 +1,313 @@
"""Crisis Detection — multilingual suicide/self-harm pattern detection.
Detects crisis signals in user messages across multiple languages.
Returns a CrisisResult with severity, matched patterns, and language.
This module is designed to be called before the agent processes a message.
If a crisis is detected, the system should:
1. Ask "Are you safe right now?"
2. Provide crisis resources (988 for US, local equivalents)
3. Stay present — do not disconnect or deflect
Languages supported: English, Spanish, French, German, Portuguese, Chinese.
"""
import logging
import re
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class CrisisSeverity(Enum):
NONE = 0
LOW = 1 # Vague distress, check in
MEDIUM = 2 # Explicit ideation, needs resources
HIGH = 3 # Plan or intent, immediate action
@dataclass
class CrisisResult:
detected: bool
severity: CrisisSeverity
language: str
matched_patterns: List[str]
confidence: float
# --- Multilingual Crisis Patterns ---
# Each language has: explicit terms, ideation phrases, method references
# Severity increases with specificity
CRISIS_PATTERNS: Dict[str, Dict[str, List[Tuple[str, CrisisSeverity]]]] = {
"en": {
"explicit": [
(r'\bsuicide\b', CrisisSeverity.HIGH),
(r'\bkill\s*(?:my)?self\b', CrisisSeverity.HIGH),
(r'\bend\s*(?:my)?\s*life\b', CrisisSeverity.HIGH),
(r'\bnot\s+wanna\s+(?:be\s+)?alive\b', CrisisSeverity.HIGH),
(r'\bgoing\s+to\s+(?:kill|end)\b', CrisisSeverity.HIGH),
],
"ideation": [
(r'\bwant\s+to\s+die\b', CrisisSeverity.MEDIUM),
(r'\bwanna\s+die\b', CrisisSeverity.MEDIUM),
(r'\bwish\s+(?:I\s+)?(?:was|were)\s+dead\b', CrisisSeverity.MEDIUM),
(r'\bno\s+reason\s+to\s+live\b', CrisisSeverity.MEDIUM),
(r'\bbetter\s+off\s+(?:if\s+)?(?:I\s+)?(?:was|were)\s+dead\b', CrisisSeverity.MEDIUM),
(r'\bcan\'?t\s+(?:go\s+on|take\s+(?:it|this)\s+anymore)\b', CrisisSeverity.MEDIUM),
(r'\bwhat\'?s\s+the\s+point\b', CrisisSeverity.LOW),
],
"methods": [
(r'\boverdose\b', CrisisSeverity.HIGH),
(r'\bhanging\b', CrisisSeverity.HIGH),
(r'\bjump(?:ing)?\s+(?:off|from)\b', CrisisSeverity.HIGH),
(r'\bslit(?:ting)?\s+(?:my\s+)?wrists?\b', CrisisSeverity.HIGH),
],
},
"es": {
"explicit": [
(r'\bsuicidio\b', CrisisSeverity.HIGH),
(r'\bsuicida\b', CrisisSeverity.HIGH),
(r'\bmatarme\b', CrisisSeverity.HIGH),
(r'\bquitarme\s+la\s+vida\b', CrisisSeverity.HIGH),
],
"ideation": [
(r'\bquiero\s+morir\b', CrisisSeverity.HIGH),
(r'\bno\s+quiero\s+vivir\b', CrisisSeverity.HIGH),
(r'\bquiero\s+desaparecer\b', CrisisSeverity.MEDIUM),
(r'\bno\s+tengo\s+razones?\s+para\s+vivir\b', CrisisSeverity.MEDIUM),
(r'\bno\s+puedo\s+m[áa]s\b', CrisisSeverity.MEDIUM),
(r'\bpara\s+qu[ée]\s+vivir\b', CrisisSeverity.LOW),
],
"methods": [
(r'\bsobredosis\b', CrisisSeverity.HIGH),
(r'\bahorcarme\b', CrisisSeverity.HIGH),
(r'\b(?:cortarme|cortar)\s+(?:las?\s+)?(?:muñecas?|venas?)\b', CrisisSeverity.HIGH),
],
},
"fr": {
"explicit": [
(r'\bsuicide\b', CrisisSeverity.HIGH),
(r'\bsuicidaire\b', CrisisSeverity.HIGH),
(r'\bme\s+tuer\b', CrisisSeverity.HIGH),
(r'\bmettre\s+fin\s+[àa]\s+mes?\s+jours?\b', CrisisSeverity.HIGH),
],
"ideation": [
(r'\b(?:je\s+)?(?:veux|voudrais)\s+mourir\b', CrisisSeverity.HIGH),
(r'\benvie\s+de\s+mourir\b', CrisisSeverity.HIGH),
(r'\b(?:je\s+)?(?:ne\s+)?(?:veux|veut)\s+(?:plus\s+)?vivre\b', CrisisSeverity.HIGH),
(r'\b(?:je\s+n[\'e]?\s*ai\s+)?(?:plus\s+)?(?:de\s+)?raison[s]?\s+(?:de|pour)\s+vivre\b', CrisisSeverity.MEDIUM),
(r'\b(?:je\s+)?(?:n[\'e]?\s*en\s+)?peux\s+plus\b', CrisisSeverity.MEDIUM),
],
"methods": [
(r'\bsurdose\b', CrisisSeverity.HIGH),
(r'\bse\s+pendre\b', CrisisSeverity.HIGH),
(r'\bse\s+(?:ouvrir|couper)\s+(?:les?\s+)?(?:veines?|poignets?)\b', CrisisSeverity.HIGH),
],
},
"de": {
"explicit": [
(r'\bSelbstmord\b', CrisisSeverity.HIGH),
(r'\bSuizid\b', CrisisSeverity.HIGH),
(r'\bmich\s+(?:um)?bringen\b', CrisisSeverity.HIGH),
],
"ideation": [
(r'\b(?:ich\s+)?(?:will|möchte)\s+sterben\b', CrisisSeverity.HIGH),
            (r'\bnicht\s+(?:mehr\s+)?leben(?:\s+wollen)?\b', CrisisSeverity.HIGH),
(r'\b(?:keinen|kein)\s+Grund\s+(?:mehr\s+)?(?:zum\s+)?(?:zu\s+)?leben\b', CrisisSeverity.MEDIUM),
(r'\b(?:ich\s+)?(?:kann|will)\s+nicht\s+mehr\b', CrisisSeverity.MEDIUM),
],
"methods": [
(r'\bÜberdosis\b', CrisisSeverity.HIGH),
(r'\berhängen\b', CrisisSeverity.HIGH),
(r'\b(?:Handgelenke|Pulsadern)\s+(?:auf|aufschneiden)\b', CrisisSeverity.HIGH),
],
},
"pt": {
"explicit": [
(r'\bsuicídio\b', CrisisSeverity.HIGH),
(r'\bme\s+matar\b', CrisisSeverity.HIGH),
(r'\btirar\s+(?:minha|a)\s+vida\b', CrisisSeverity.HIGH),
],
"ideation": [
(r'\b(?:eu\s+)?quero\s+morrer\b', CrisisSeverity.HIGH),
(r'\bnão\s+quero\s+(?:mais\s+)?viver\b', CrisisSeverity.HIGH),
            (r'\bnão\s+(?:tenho\s+)?(?:mais\s+)?raz(?:ão|ões)\s+pra\s+viver\b', CrisisSeverity.MEDIUM),
            (r'\b(?:eu\s+)?não\s+aguento\s+mais\b', CrisisSeverity.MEDIUM),
],
"methods": [
(r'\b(?:overdose|superdosagem)\b', CrisisSeverity.HIGH),
(r'\b(?:se\s+)?enforcar\b', CrisisSeverity.HIGH),
(r'\bcortar\s+(?:os?\s+)?pulsos?\b', CrisisSeverity.HIGH),
],
},
"zh": {
"explicit": [
(r'自杀', CrisisSeverity.HIGH),
(r'自殺', CrisisSeverity.HIGH),
(r'轻生', CrisisSeverity.HIGH),
],
"ideation": [
(r'想死', CrisisSeverity.HIGH),
(r'不想活', CrisisSeverity.HIGH),
(r'不想活了', CrisisSeverity.HIGH),
(r'活着没意思', CrisisSeverity.MEDIUM),
(r'活不下去', CrisisSeverity.MEDIUM),
(r'死了算了', CrisisSeverity.HIGH),
(r'为什么要活着', CrisisSeverity.LOW),
],
"methods": [
(r'跳楼', CrisisSeverity.HIGH),
(r'上吊', CrisisSeverity.HIGH),
(r'割腕', CrisisSeverity.HIGH),
(r'吃药', CrisisSeverity.MEDIUM), # Context-dependent
],
},
}
# Crisis resources by language/region
CRISIS_RESOURCES: Dict[str, Dict[str, str]] = {
"en": {
"hotline": "988 Suicide and Crisis Lifeline",
"number": "988",
"text": "Text HOME to 741741",
"web": "https://988lifeline.org",
},
"es": {
"hotline": "Línea Nacional de Prevención del Suicidio",
"number": "988",
"text": "Envía HOLA al 741741",
"web": "https://988lifeline.org (opción en español)",
},
"fr": {
"hotline": "SOS Amitié",
"number": "09 72 39 40 50",
"web": "https://www.sos-amitie.com",
},
"de": {
"hotline": "Telefonseelsorge",
"number": "0800 111 0 111",
"web": "https://www.telefonseelsorge.de",
},
"pt": {
"hotline": "Centro de Valorização da Vida (CVV)",
"number": "188",
"web": "https://www.cvv.org.br",
},
"zh": {
"hotline": "北京心理危机研究与干预中心",
"number": "010-82951332",
"web": "https://www.crisis.org.cn",
},
}
# Fallback resource
DEFAULT_RESOURCE = CRISIS_RESOURCES["en"]
def detect_language_simple(text: str) -> str:
"""Simple language detection based on character sets and common words.
Returns ISO 639-1 language code. Defaults to 'en'.
"""
# Chinese characters
if re.search(r'[\u4e00-\u9fff]', text):
return "zh"
# Check for language-specific words/patterns
text_lower = text.lower()
# Spanish indicators
es_words = {'quiero', 'morir', 'vivir', 'puedo', 'más', 'para', 'qué', 'razones', 'tengo', 'soy', 'estoy', 'pensando', 'desaparecer', 'suicidio', 'vida'}
if len(es_words & set(text_lower.split())) >= 2:
return "es"
# French indicators
fr_words = {'je', 'veux', 'mourir', 'plus', 'vie', 'vivre', 'envie', 'peux', 'raison'}
if len(fr_words & set(text_lower.split())) >= 2:
return "fr"
# German indicators
de_words = {'ich', 'will', 'sterben', 'nicht', 'mehr', 'leben', 'grund', 'selbstmord'}
if len(de_words & set(text_lower.split())) >= 2:
return "de"
# Portuguese indicators
pt_words = {'quero', 'morrer', 'viver', 'mais', 'aguento', 'razão', 'pra', 'vida'}
if len(pt_words & set(text_lower.split())) >= 2:
return "pt"
return "en"
def detect_crisis(text: str) -> CrisisResult:
"""Detect crisis signals in user message.
Args:
text: The user's message to analyze.
Returns:
CrisisResult with detection status, severity, language, and matched patterns.
"""
if not text or not text.strip():
return CrisisResult(False, CrisisSeverity.NONE, "en", [], 0.0)
language = detect_language_simple(text)
patterns = CRISIS_PATTERNS.get(language, CRISIS_PATTERNS["en"])
matched = []
max_severity = CrisisSeverity.NONE
for category, pattern_list in patterns.items():
for regex, severity in pattern_list:
if re.search(regex, text, re.IGNORECASE):
matched.append(f"{language}:{category}:{regex}")
if severity.value > max_severity.value:
max_severity = severity
detected = len(matched) > 0
# Calculate confidence based on number and severity of matches
confidence = 0.0
if detected:
base = 0.5 + len(matched) * 0.15
severity_bonus = max_severity.value * 0.1
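        # e.g. one MEDIUM match: min(0.99, (0.5 + 0.15) + 2 * 0.1) = 0.85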
confidence = min(0.99, base + severity_bonus)
return CrisisResult(
detected=detected,
severity=max_severity,
language=language,
matched_patterns=matched,
confidence=confidence,
)
def get_crisis_resources(language: str = "en") -> Dict[str, str]:
"""Get crisis resources for a language/region."""
return CRISIS_RESOURCES.get(language, DEFAULT_RESOURCE)
def format_crisis_response(result: CrisisResult) -> str:
"""Format a crisis response message with appropriate resources."""
resources = get_crisis_resources(result.language)
lines = [
"I want you to know that what you're feeling matters, and you're not alone.",
"",
f"If you're in immediate danger, please call {resources['hotline']}: {resources['number']}",
]
if "text" in resources:
lines.append(f"Or text: {resources['text']}")
lines.extend([
f"Web: {resources['web']}",
"",
"I'm here with you. Are you safe right now?",
])
return "\n".join(lines)

428
tools/gpu_scheduler.py Normal file

@@ -0,0 +1,428 @@
"""
GPU Inference Scheduler — Multi-Model Resource Management
Queue-based model loading with priority lanes and VRAM budget tracking.
Prevents GPU OOM crashes when multiple projects compete for VRAM.
Priority lanes:
1. real-time (LPM) — highest priority, interactive
2. interactive (playground) — user-facing, medium priority
3. batch (harvester) — background, lowest priority
"""
import json
import time
import threading
import logging
from enum import IntEnum
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field, asdict
logger = logging.getLogger("hermes.gpu_scheduler")
class Priority(IntEnum):
"""Job priority levels. Lower value = higher priority."""
REALTIME = 1 # LPM, live video, interactive sessions
INTERACTIVE = 2 # Playground, chat, user-facing
BATCH = 3 # Harvester, overnight jobs, background
@dataclass
class ModelSpec:
"""Specification for a model and its VRAM requirements."""
name: str
vram_mb: int # VRAM required in MB
loader: str = "ollama" # How to load: ollama, vllm, llama_cpp, custom
model_id: str = "" # Model identifier (e.g., "llama3:70b")
cacheable: bool = True # Can be cached between jobs
cpu_fallback: bool = True # Can fall back to CPU if GPU busy
estimated_batch_ms: int = 1000 # Estimated time per batch
@dataclass
class InferenceJob:
"""A job requesting GPU inference."""
job_id: str
project: str # "video_forge", "lpm", "playground", "harvester"
model: ModelSpec
priority: Priority
batch_size: int = 1
created_at: float = field(default_factory=time.time)
started_at: Optional[float] = None
completed_at: Optional[float] = None
status: str = "queued" # queued, loading, running, completed, failed
error: Optional[str] = None
use_cpu_fallback: bool = False
@dataclass
class GPUState:
"""Current GPU state."""
total_vram_mb: int = 0
used_vram_mb: int = 0
loaded_models: List[str] = field(default_factory=list)
active_job: Optional[str] = None
@property
def available_vram_mb(self) -> int:
return self.total_vram_mb - self.used_vram_mb
def can_fit(self, model: ModelSpec) -> bool:
return self.available_vram_mb >= model.vram_mb
# Known models and their VRAM requirements
MODEL_REGISTRY: Dict[str, ModelSpec] = {
# Video Forge models
"sd_xl": ModelSpec(name="Stable Diffusion XL", vram_mb=8192, loader="comfyui", model_id="sd_xl"),
"heartmula": ModelSpec(name="HeartMuLa", vram_mb=4096, loader="custom", model_id="heartmula"),
"wan2.1": ModelSpec(name="Wan2.1", vram_mb=12288, loader="custom", model_id="wan2.1"),
# LPM models
"lpm_video": ModelSpec(name="LPM Video Gen", vram_mb=16384, loader="custom", model_id="lpm_video"),
"lpm_a2a": ModelSpec(name="LPM A2A", vram_mb=8192, loader="custom", model_id="lpm_a2a"),
# Local inference (hermes)
"llama3_70b": ModelSpec(name="Llama 3 70B", vram_mb=40960, loader="ollama", model_id="llama3:70b"),
"llama3_8b": ModelSpec(name="Llama 3 8B", vram_mb=8192, loader="ollama", model_id="llama3:8b"),
"mimo_v2_pro": ModelSpec(name="MiMo v2 Pro", vram_mb=16384, loader="ollama", model_id="xiaomi/mimo-v2-pro"),
# Playground
"sdxl_turbo": ModelSpec(name="SDXL Turbo", vram_mb=6144, loader="comfyui", model_id="sdxl_turbo"),
}
# Default VRAM budget (can be overridden)
DEFAULT_VRAM_MB = 49152 # 48GB (e.g., L40S, A6000)
class InferenceScheduler:
"""
GPU Inference Scheduler.
Manages a queue of inference jobs with priority scheduling,
VRAM budget tracking, and CPU fallback.
"""
def __init__(
self,
vram_budget_mb: int = DEFAULT_VRAM_MB,
queue_db: str = "~/.hermes/gpu_scheduler.db",
):
self.vram_budget_mb = vram_budget_mb
self.queue_db = Path(queue_db).expanduser()
self.queue_db.parent.mkdir(parents=True, exist_ok=True)
# State
self.gpu_state = GPUState(total_vram_mb=vram_budget_mb)
self.job_queue: List[InferenceJob] = []
self.completed_jobs: List[InferenceJob] = []
self._lock = threading.Lock()
self._running = False
self._worker_thread: Optional[threading.Thread] = None
# Load persisted state
self._load_state()
logger.info(
"GPU Scheduler initialized: %dMB VRAM budget",
vram_budget_mb,
)
def _load_state(self):
"""Load state from SQLite."""
import sqlite3
conn = sqlite3.connect(str(self.queue_db))
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
job_id TEXT PRIMARY KEY,
project TEXT,
model_name TEXT,
priority INTEGER,
batch_size INTEGER,
created_at REAL,
started_at REAL,
completed_at REAL,
status TEXT,
error TEXT,
use_cpu_fallback INTEGER
)
""")
conn.commit()
# Load pending jobs
rows = conn.execute(
"SELECT * FROM jobs WHERE status IN ('queued', 'loading', 'running')"
).fetchall()
for row in rows:
model_name = row[2]
model = MODEL_REGISTRY.get(model_name, ModelSpec(name=model_name, vram_mb=8192))
job = InferenceJob(
job_id=row[0],
project=row[1],
model=model,
priority=Priority(row[3]),
batch_size=row[4],
created_at=row[5],
started_at=row[6],
completed_at=row[7],
status=row[8],
error=row[9],
use_cpu_fallback=bool(row[10]),
)
self.job_queue.append(job)
conn.close()
logger.info("Loaded %d pending jobs", len(self.job_queue))
def _save_job(self, job: InferenceJob):
"""Persist job to SQLite."""
import sqlite3
conn = sqlite3.connect(str(self.queue_db))
conn.execute("""
INSERT OR REPLACE INTO jobs
(job_id, project, model_name, priority, batch_size, created_at,
started_at, completed_at, status, error, use_cpu_fallback)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job.job_id,
job.project,
job.model.name,
job.priority.value,
job.batch_size,
job.created_at,
job.started_at,
job.completed_at,
job.status,
job.error,
int(job.use_cpu_fallback),
))
conn.commit()
conn.close()
def submit_job(
self,
job_id: str,
project: str,
model_name: str,
priority: Priority = Priority.BATCH,
batch_size: int = 1,
) -> InferenceJob:
"""
Submit an inference job to the queue.
Args:
job_id: Unique job identifier
project: Project name (video_forge, lpm, playground, harvester)
model_name: Model name from MODEL_REGISTRY
priority: Job priority
batch_size: Number of items to process
Returns:
The created InferenceJob
"""
model = MODEL_REGISTRY.get(model_name)
if not model:
raise ValueError(f"Unknown model: {model_name}. Registered: {list(MODEL_REGISTRY.keys())}")
job = InferenceJob(
job_id=job_id,
project=project,
model=model,
priority=priority,
batch_size=batch_size,
)
with self._lock:
# Insert in priority order
inserted = False
for i, existing in enumerate(self.job_queue):
if job.priority < existing.priority:
self.job_queue.insert(i, job)
inserted = True
break
if not inserted:
self.job_queue.append(job)
self._save_job(job)
logger.info(
"Job submitted: %s (project=%s, model=%s, priority=%s)",
job_id, project, model_name, priority.name,
)
return job
def get_next_job(self) -> Optional[InferenceJob]:
"""Get the next job to process based on priority and VRAM availability."""
with self._lock:
for job in self.job_queue:
if job.status != "queued":
continue
# Check if model fits in VRAM
if self.gpu_state.can_fit(job.model):
return job
# Check CPU fallback
if job.model.cpu_fallback:
job.use_cpu_fallback = True
return job
return None
def start_job(self, job: InferenceJob) -> bool:
"""
Mark a job as started and load its model.
Returns True if successful, False if insufficient VRAM.
"""
with self._lock:
if not job.use_cpu_fallback:
if not self.gpu_state.can_fit(job.model):
logger.warning(
"Insufficient VRAM for %s: need %dMB, have %dMB",
job.model.name,
job.model.vram_mb,
self.gpu_state.available_vram_mb,
)
return False
# Reserve VRAM
self.gpu_state.used_vram_mb += job.model.vram_mb
if job.model.name not in self.gpu_state.loaded_models:
self.gpu_state.loaded_models.append(job.model.name)
job.status = "loading"
job.started_at = time.time()
self.gpu_state.active_job = job.job_id
self._save_job(job)
logger.info(
"Job started: %s (model=%s, cpu_fallback=%s, vram_used=%dMB)",
job.job_id,
job.model.name,
job.use_cpu_fallback,
self.gpu_state.used_vram_mb,
)
return True
    def complete_job(self, job: InferenceJob, error: Optional[str] = None):
"""Mark a job as completed and release its VRAM."""
with self._lock:
job.completed_at = time.time()
job.status = "completed" if not error else "failed"
job.error = error
if not job.use_cpu_fallback:
# Release VRAM
self.gpu_state.used_vram_mb = max(
0,
self.gpu_state.used_vram_mb - job.model.vram_mb,
)
if self.gpu_state.active_job == job.job_id:
self.gpu_state.active_job = None
# Move to completed
self.job_queue.remove(job)
self.completed_jobs.append(job)
self._save_job(job)
duration = (job.completed_at - job.started_at) * 1000 if job.started_at else 0
logger.info(
"Job completed: %s (status=%s, duration=%.0fms)",
job.job_id,
job.status,
duration,
)
def get_status(self) -> Dict[str, Any]:
"""Get scheduler status."""
with self._lock:
return {
"gpu": {
"total_vram_mb": self.gpu_state.total_vram_mb,
"used_vram_mb": self.gpu_state.used_vram_mb,
"available_vram_mb": self.gpu_state.available_vram_mb,
"utilization_pct": round(
self.gpu_state.used_vram_mb / self.gpu_state.total_vram_mb * 100, 1
),
"loaded_models": self.gpu_state.loaded_models,
"active_job": self.gpu_state.active_job,
},
"queue": {
"pending": len([j for j in self.job_queue if j.status == "queued"]),
"loading": len([j for j in self.job_queue if j.status == "loading"]),
"running": len([j for j in self.job_queue if j.status == "running"]),
"by_priority": {
p.name: len([j for j in self.job_queue if j.priority == p and j.status == "queued"])
for p in Priority
},
},
"completed": {
"total": len(self.completed_jobs),
"success": len([j for j in self.completed_jobs if j.status == "completed"]),
"failed": len([j for j in self.completed_jobs if j.status == "failed"]),
},
}
def register_model(self, name: str, spec: ModelSpec):
"""Register a new model."""
MODEL_REGISTRY[name] = spec
logger.info("Registered model: %s (%dMB VRAM)", name, spec.vram_mb)
def clear_completed(self):
"""Clear completed jobs from memory (keep in DB)."""
with self._lock:
self.completed_jobs.clear()
# ============================================================================
# CLI Interface
# ============================================================================
def main():
"""CLI entry point for testing."""
import argparse
parser = argparse.ArgumentParser(description="GPU Inference Scheduler")
parser.add_argument("action", choices=["status", "submit", "list", "clear"])
parser.add_argument("--job-id", help="Job ID for submit")
parser.add_argument("--project", help="Project name")
parser.add_argument("--model", help="Model name")
parser.add_argument("--priority", choices=["realtime", "interactive", "batch"], default="batch")
parser.add_argument("--vram", type=int, default=DEFAULT_VRAM_MB, help="VRAM budget in MB")
args = parser.parse_args()
scheduler = InferenceScheduler(vram_budget_mb=args.vram)
if args.action == "status":
status = scheduler.get_status()
print(json.dumps(status, indent=2))
elif args.action == "submit":
if not all([args.job_id, args.project, args.model]):
print("Error: --job-id, --project, and --model required for submit")
return
priority = Priority[args.priority.upper()]
job = scheduler.submit_job(args.job_id, args.project, args.model, priority)
print(f"Submitted: {job.job_id}")
elif args.action == "list":
print(f"Pending jobs: {len(scheduler.job_queue)}")
for job in scheduler.job_queue:
print(f" {job.job_id}: {job.project}/{job.model.name} [{job.priority.name}] {job.status}")
elif args.action == "clear":
scheduler.clear_completed()
print("Cleared completed jobs from memory")
if __name__ == "__main__":
main()
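
Beyond the usage block in the commit message, the CPU-fallback path can
be exercised directly — a sketch with an arbitrary 16GB budget and a
throwaway queue DB path:

```python
from tools.gpu_scheduler import InferenceScheduler, Priority

sched = InferenceScheduler(vram_budget_mb=16384, queue_db="/tmp/sched_demo.db")
sched.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)
job = sched.get_next_job()
sched.start_job(job)            # reserves 8192 MB of the 16384 MB budget

sched.submit_job("big-1", "playground", "mimo_v2_pro", Priority.INTERACTIVE)
nxt = sched.get_next_job()      # 16384 MB model can't fit in the 8192 MB left
print(nxt.use_cpu_fallback)     # True — scheduler marks it for CPU fallback
print(sched.get_status()["gpu"]["available_vram_mb"])  # 8192
```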


@@ -1976,6 +1976,9 @@ def register_mcp_servers(servers: Dict[str, dict]) -> List[str]:
logger.debug("No explicit MCP servers provided")
return []
# Kill orphaned MCP subprocesses from previous sessions (issue #714)
_kill_zombie_mcp_processes()
# Only attempt servers that aren't already connected and are enabled
# (enabled: false skips the server entirely without removing its config)
with _lock:
@@ -2228,6 +2231,72 @@ def shutdown_mcp_servers():
_stop_mcp_loop()
def _kill_zombie_mcp_processes() -> None:
"""Kill orphaned MCP server processes from previous sessions.
Addresses issue #714: MCP server processes accumulate (~80 zombies)
when the agent restarts or MCP servers reconnect without cleanup.
Kills processes matching common MCP server patterns that are NOT
tracked in _stdio_pids (i.e., they're from previous sessions).
Keeps the newest 2 per pattern to avoid killing active servers.
"""
import signal as _signal
import subprocess as _subprocess
patterns = ["morrowind/mcp", "mcp_server.py"]
my_pid = os.getpid()
kill_sig = getattr(_signal, "SIGKILL", _signal.SIGTERM)
for pattern in patterns:
try:
result = _subprocess.run(
["ps", "aux"],
capture_output=True, text=True, timeout=5,
)
pids = []
for line in result.stdout.splitlines():
if pattern not in line or "grep" in line:
continue
parts = line.split()
if len(parts) < 2:
continue
try:
pid = int(parts[1])
except ValueError:
continue
# Skip our own process and tracked PIDs
if pid == my_pid:
continue
with _lock:
if pid in _stdio_pids:
continue
pids.append(pid)
if len(pids) <= 2:
continue # Keep up to 2
# Kill oldest (lower PIDs), keep newest 2
pids.sort(reverse=True)
to_kill = pids[2:]
for pid in to_kill:
try:
os.kill(pid, kill_sig)
logger.debug("Killed zombie MCP process %d (pattern: %s)", pid, pattern)
except (ProcessLookupError, PermissionError):
pass
if to_kill:
logger.info(
"Cleaned up %d zombie MCP processes (pattern: %s, kept %d)",
len(to_kill), pattern, min(2, len(pids)),
)
except Exception as exc:
logger.debug("Zombie MCP cleanup failed for pattern '%s': %s", pattern, exc)
def _kill_orphaned_mcp_children() -> None:
"""Best-effort kill of MCP stdio subprocesses that survived loop shutdown.