Compare commits
2 Commits
fix/749-ba
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aae0357bb0 | ||
|
|
ebf69d155b |
179
tests/tools/test_crisis_detection.py
Normal file
179
tests/tools/test_crisis_detection.py
Normal file
@@ -0,0 +1,179 @@
|
||||
"""Tests for multilingual crisis detection."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
from tools.crisis_detection import (
|
||||
CrisisSeverity, CrisisResult,
|
||||
detect_crisis, detect_language_simple,
|
||||
get_crisis_resources, format_crisis_response,
|
||||
)
|
||||
|
||||
|
||||
class TestLanguageDetection:
|
||||
def test_english_default(self):
|
||||
assert detect_language_simple("I need help") == "en"
|
||||
|
||||
def test_spanish_detected(self):
|
||||
assert detect_language_simple("quiero morir no puedo más") == "es"
|
||||
|
||||
def test_french_detected(self):
|
||||
assert detect_language_simple("je veux mourir je peux plus") == "fr"
|
||||
|
||||
def test_german_detected(self):
|
||||
assert detect_language_simple("ich will sterben nicht mehr leben") == "de"
|
||||
|
||||
def test_portuguese_detected(self):
|
||||
assert detect_language_simple("quero morrer não aguento mais") == "pt"
|
||||
|
||||
def test_chinese_detected(self):
|
||||
assert detect_language_simple("我想死") == "zh"
|
||||
assert detect_language_simple("自殺") == "zh"
|
||||
|
||||
|
||||
class TestEnglishCrisisDetection:
|
||||
def test_explicit_suicide(self):
|
||||
r = detect_crisis("I'm thinking about suicide")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
assert r.language == "en"
|
||||
|
||||
def test_kill_myself(self):
|
||||
r = detect_crisis("I want to kill myself")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
def test_want_to_die(self):
|
||||
r = detect_crisis("I just want to die")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.MEDIUM
|
||||
|
||||
def test_no_reason_to_live(self):
|
||||
r = detect_crisis("there's no reason to live anymore")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.MEDIUM
|
||||
|
||||
def test_what_is_the_point(self):
|
||||
r = detect_crisis("what's the point of anything")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.LOW
|
||||
|
||||
def test_non_crisis(self):
|
||||
r = detect_crisis("I'm having a great day")
|
||||
assert r.detected is False
|
||||
assert r.severity == CrisisSeverity.NONE
|
||||
|
||||
def test_empty_input(self):
|
||||
r = detect_crisis("")
|
||||
assert r.detected is False
|
||||
|
||||
|
||||
class TestSpanishCrisisDetection:
|
||||
def test_quiero_morir(self):
|
||||
r = detect_crisis("quiero morir, no puedo más")
|
||||
assert r.detected is True
|
||||
assert r.language == "es"
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
def test_no_quiero_vivir(self):
|
||||
r = detect_crisis("no quiero vivir más")
|
||||
assert r.detected is True
|
||||
assert r.language == "es"
|
||||
|
||||
def test_suicidio(self):
|
||||
r = detect_crisis("estoy pensando en suicidio, quiero desaparecer")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
|
||||
class TestFrenchCrisisDetection:
|
||||
def test_je_veux_mourir(self):
|
||||
r = detect_crisis("je veux mourir")
|
||||
assert r.detected is True
|
||||
assert r.language == "fr"
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
def test_envie_de_mourir(self):
|
||||
r = detect_crisis("j'ai envie de mourir")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
def test_suicide(self):
|
||||
r = detect_crisis("je pense au suicide")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
|
||||
class TestGermanCrisisDetection:
|
||||
def test_ich_will_sterben(self):
|
||||
r = detect_crisis("ich will sterben")
|
||||
assert r.detected is True
|
||||
assert r.language == "de"
|
||||
|
||||
def test_selbstmord(self):
|
||||
r = detect_crisis("ich denke an Selbstmord")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
|
||||
class TestPortugueseCrisisDetection:
|
||||
def test_quero_morrer(self):
|
||||
r = detect_crisis("eu quero morrer")
|
||||
assert r.detected is True
|
||||
assert r.language == "pt"
|
||||
|
||||
def test_nao_quero_viver(self):
|
||||
r = detect_crisis("não quero mais viver")
|
||||
assert r.detected is True
|
||||
|
||||
|
||||
class TestChineseCrisisDetection:
|
||||
def test_xiang_si(self):
|
||||
r = detect_crisis("我想死")
|
||||
assert r.detected is True
|
||||
assert r.language == "zh"
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
def test_zisha(self):
|
||||
r = detect_crisis("我想自杀")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
def test_bu_xiang_huo(self):
|
||||
r = detect_crisis("我不想活了")
|
||||
assert r.detected is True
|
||||
assert r.severity == CrisisSeverity.HIGH
|
||||
|
||||
|
||||
class TestCrisisResources:
|
||||
def test_english_resources(self):
|
||||
r = get_crisis_resources("en")
|
||||
assert "988" in r["number"]
|
||||
|
||||
def test_spanish_resources(self):
|
||||
r = get_crisis_resources("es")
|
||||
assert r["number"] == "988"
|
||||
|
||||
def test_french_resources(self):
|
||||
r = get_crisis_resources("fr")
|
||||
assert "SOS" in r["hotline"]
|
||||
|
||||
def test_unknown_language_fallback(self):
|
||||
r = get_crisis_resources("xx")
|
||||
assert "988" in r["number"]
|
||||
|
||||
|
||||
class TestFormatResponse:
|
||||
def test_format_includes_resources(self):
|
||||
result = detect_crisis("I want to die")
|
||||
response = format_crisis_response(result)
|
||||
assert "988" in response
|
||||
assert "safe" in response.lower()
|
||||
|
||||
def test_format_spanish(self):
|
||||
result = detect_crisis("quiero morir")
|
||||
response = format_crisis_response(result)
|
||||
assert "988" in response
|
||||
256
tests/tools/test_gpu_scheduler.py
Normal file
256
tests/tools/test_gpu_scheduler.py
Normal file
@@ -0,0 +1,256 @@
|
||||
"""
|
||||
Tests for GPU Inference Scheduler.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from tools.gpu_scheduler import (
|
||||
Priority,
|
||||
ModelSpec,
|
||||
InferenceJob,
|
||||
InferenceScheduler,
|
||||
MODEL_REGISTRY,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scheduler():
|
||||
"""Create a scheduler with a temp database."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = Path(tmpdir) / "test_scheduler.db"
|
||||
sched = InferenceScheduler(vram_budget_mb=32768, queue_db=str(db_path))
|
||||
yield sched
|
||||
|
||||
|
||||
class TestPriority:
|
||||
"""Test priority ordering."""
|
||||
|
||||
def test_priority_ordering(self):
|
||||
"""Realtime < Interactive < Batch."""
|
||||
assert Priority.REALTIME < Priority.INTERACTIVE
|
||||
assert Priority.INTERACTIVE < Priority.BATCH
|
||||
|
||||
def test_priority_comparison(self):
|
||||
"""Lower value = higher priority."""
|
||||
assert Priority.REALTIME.value == 1
|
||||
assert Priority.INTERACTIVE.value == 2
|
||||
assert Priority.BATCH.value == 3
|
||||
|
||||
|
||||
class TestModelSpec:
|
||||
"""Test model specifications."""
|
||||
|
||||
def test_model_registry_has_models(self):
|
||||
"""Registry should have known models."""
|
||||
assert "llama3_70b" in MODEL_REGISTRY
|
||||
assert "sd_xl" in MODEL_REGISTRY
|
||||
assert "mimo_v2_pro" in MODEL_REGISTRY
|
||||
|
||||
def test_model_vram(self):
|
||||
"""Models should have VRAM requirements."""
|
||||
llama = MODEL_REGISTRY["llama3_70b"]
|
||||
assert llama.vram_mb > 0
|
||||
assert llama.vram_mb == 40960 # 40GB
|
||||
|
||||
|
||||
class TestInferenceScheduler:
|
||||
"""Test the scheduler."""
|
||||
|
||||
def test_init(self, scheduler):
|
||||
"""Scheduler should initialize."""
|
||||
assert scheduler.vram_budget_mb == 32768
|
||||
assert scheduler.gpu_state.total_vram_mb == 32768
|
||||
assert len(scheduler.job_queue) == 0
|
||||
|
||||
def test_submit_job(self, scheduler):
|
||||
"""Submit a job."""
|
||||
job = scheduler.submit_job(
|
||||
job_id="test-1",
|
||||
project="playground",
|
||||
model_name="llama3_8b",
|
||||
priority=Priority.INTERACTIVE,
|
||||
)
|
||||
|
||||
assert job.job_id == "test-1"
|
||||
assert job.status == "queued"
|
||||
assert len(scheduler.job_queue) == 1
|
||||
|
||||
def test_submit_unknown_model(self, scheduler):
|
||||
"""Submit with unknown model should raise."""
|
||||
with pytest.raises(ValueError, match="Unknown model"):
|
||||
scheduler.submit_job(
|
||||
job_id="test-1",
|
||||
project="playground",
|
||||
model_name="nonexistent",
|
||||
)
|
||||
|
||||
def test_priority_ordering(self, scheduler):
|
||||
"""Jobs should be ordered by priority."""
|
||||
scheduler.submit_job("batch-1", "harvester", "llama3_8b", Priority.BATCH)
|
||||
scheduler.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)
|
||||
scheduler.submit_job("int-1", "playground", "llama3_8b", Priority.INTERACTIVE)
|
||||
|
||||
# RT should be first
|
||||
assert scheduler.job_queue[0].job_id == "rt-1"
|
||||
assert scheduler.job_queue[1].job_id == "int-1"
|
||||
assert scheduler.job_queue[2].job_id == "batch-1"
|
||||
|
||||
def test_get_next_job(self, scheduler):
|
||||
"""Get next job should return highest priority."""
|
||||
scheduler.submit_job("batch-1", "harvester", "llama3_8b", Priority.BATCH)
|
||||
scheduler.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)
|
||||
|
||||
next_job = scheduler.get_next_job()
|
||||
assert next_job.job_id == "rt-1"
|
||||
|
||||
def test_start_job(self, scheduler):
|
||||
"""Start a job."""
|
||||
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
|
||||
success = scheduler.start_job(job)
|
||||
|
||||
assert success
|
||||
assert job.status == "loading"
|
||||
assert job.started_at is not None
|
||||
assert scheduler.gpu_state.used_vram_mb == 8192 # llama3_8b VRAM
|
||||
|
||||
def test_complete_job(self, scheduler):
|
||||
"""Complete a job."""
|
||||
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
|
||||
scheduler.start_job(job)
|
||||
scheduler.complete_job(job)
|
||||
|
||||
assert job.status == "completed"
|
||||
assert job.completed_at is not None
|
||||
assert scheduler.gpu_state.used_vram_mb == 0
|
||||
assert len(scheduler.job_queue) == 0
|
||||
assert len(scheduler.completed_jobs) == 1
|
||||
|
||||
def test_complete_job_with_error(self, scheduler):
|
||||
"""Complete a job with error."""
|
||||
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
|
||||
scheduler.start_job(job)
|
||||
scheduler.complete_job(job, error="CUDA out of memory")
|
||||
|
||||
assert job.status == "failed"
|
||||
assert job.error == "CUDA out of memory"
|
||||
|
||||
def test_vram_tracking(self, scheduler):
|
||||
"""VRAM should be tracked correctly."""
|
||||
# Submit two small jobs
|
||||
job1 = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
|
||||
job2 = scheduler.submit_job("test-2", "playground", "llama3_8b", Priority.INTERACTIVE)
|
||||
|
||||
# Start first
|
||||
scheduler.start_job(job1)
|
||||
assert scheduler.gpu_state.used_vram_mb == 8192
|
||||
|
||||
# Start second (should work, still have room)
|
||||
scheduler.start_job(job2)
|
||||
assert scheduler.gpu_state.used_vram_mb == 16384
|
||||
|
||||
# Complete first
|
||||
scheduler.complete_job(job1)
|
||||
assert scheduler.gpu_state.used_vram_mb == 8192
|
||||
|
||||
def test_cpu_fallback(self, scheduler):
|
||||
"""CPU fallback when VRAM full."""
|
||||
# Fill VRAM with two 16GB models (32GB total = our budget)
|
||||
job1 = scheduler.submit_job("big-1", "lpm", "mimo_v2_pro", Priority.REALTIME)
|
||||
scheduler.start_job(job1)
|
||||
assert scheduler.gpu_state.used_vram_mb == 16384
|
||||
|
||||
# Start another 16GB model (should work, exactly fills VRAM)
|
||||
job2 = scheduler.submit_job("big-2", "playground", "mimo_v2_pro", Priority.INTERACTIVE)
|
||||
scheduler.start_job(job2)
|
||||
assert scheduler.gpu_state.used_vram_mb == 32768 # Full
|
||||
|
||||
# Now try a third model - should get CPU fallback
|
||||
job3 = scheduler.submit_job("big-3", "harvester", "mimo_v2_pro", Priority.BATCH)
|
||||
next_job = scheduler.get_next_job()
|
||||
|
||||
# Should get job3 with CPU fallback since VRAM is full
|
||||
assert next_job.job_id == "big-3"
|
||||
assert next_job.use_cpu_fallback
|
||||
|
||||
def test_get_status(self, scheduler):
|
||||
"""Get scheduler status."""
|
||||
scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
|
||||
scheduler.submit_job("test-2", "harvester", "llama3_8b", Priority.BATCH)
|
||||
|
||||
status = scheduler.get_status()
|
||||
|
||||
assert status["gpu"]["total_vram_mb"] == 32768
|
||||
assert status["queue"]["pending"] == 2
|
||||
assert status["queue"]["by_priority"]["INTERACTIVE"] == 1
|
||||
assert status["queue"]["by_priority"]["BATCH"] == 1
|
||||
|
||||
def test_register_model(self, scheduler):
|
||||
"""Register a custom model."""
|
||||
custom = ModelSpec(name="Custom Model", vram_mb=4096)
|
||||
scheduler.register_model("custom_model", custom)
|
||||
|
||||
assert "custom_model" in MODEL_REGISTRY
|
||||
|
||||
job = scheduler.submit_job("test-1", "playground", "custom_model")
|
||||
assert job.model.vram_mb == 4096
|
||||
|
||||
|
||||
class TestCrossProjectScenarios:
|
||||
"""Test cross-project scenarios from the issue."""
|
||||
|
||||
def test_video_forge_batch_plus_lpm_live(self, scheduler):
|
||||
"""
|
||||
Video Forge batch + LPM live.
|
||||
LPM should get priority, batch should queue.
|
||||
"""
|
||||
# Video Forge batch job
|
||||
vf_job = scheduler.submit_job(
|
||||
"vf-batch-1", "video_forge", "sd_xl", Priority.BATCH
|
||||
)
|
||||
|
||||
# LPM live job (higher priority)
|
||||
lpm_job = scheduler.submit_job(
|
||||
"lpm-live-1", "lpm", "lpm_video", Priority.REALTIME
|
||||
)
|
||||
|
||||
# Next job should be LPM
|
||||
next_job = scheduler.get_next_job()
|
||||
assert next_job.job_id == "lpm-live-1"
|
||||
assert next_job.priority == Priority.REALTIME
|
||||
|
||||
def test_three_video_forge_jobs(self, scheduler):
|
||||
"""Three Video Forge jobs should queue sequentially."""
|
||||
jobs = []
|
||||
for i in range(3):
|
||||
job = scheduler.submit_job(
|
||||
f"vf-{i}", "video_forge", "sd_xl", Priority.BATCH
|
||||
)
|
||||
jobs.append(job)
|
||||
|
||||
# Start first
|
||||
scheduler.start_job(jobs[0])
|
||||
assert scheduler.gpu_state.used_vram_mb == 8192
|
||||
|
||||
# Second should queue (VRAM occupied)
|
||||
next_job = scheduler.get_next_job()
|
||||
assert next_job.job_id == "vf-1"
|
||||
|
||||
def test_night_harvester_plus_playground(self, scheduler):
|
||||
"""Night harvester runs on idle cycles."""
|
||||
harvester = scheduler.submit_job(
|
||||
"harvest-1", "harvester", "llama3_8b", Priority.BATCH
|
||||
)
|
||||
playground = scheduler.submit_job(
|
||||
"play-1", "playground", "sdxl_turbo", Priority.INTERACTIVE
|
||||
)
|
||||
|
||||
# Playground should get priority
|
||||
next_job = scheduler.get_next_job()
|
||||
assert next_job.job_id == "play-1"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
313
tools/crisis_detection.py
Normal file
313
tools/crisis_detection.py
Normal file
@@ -0,0 +1,313 @@
|
||||
"""Crisis Detection — multilingual suicide/self-harm pattern detection.
|
||||
|
||||
Detects crisis signals in user messages across multiple languages.
|
||||
Returns a CrisisResult with severity, matched patterns, and language.
|
||||
|
||||
This module is designed to be called before the agent processes a message.
|
||||
If a crisis is detected, the system should:
|
||||
1. Ask "Are you safe right now?"
|
||||
2. Provide crisis resources (988 for US, local equivalents)
|
||||
3. Stay present — do not disconnect or deflect
|
||||
|
||||
Languages supported: English, Spanish, French, German, Portuguese, Chinese.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CrisisSeverity(Enum):
|
||||
NONE = 0
|
||||
LOW = 1 # Vague distress, check in
|
||||
MEDIUM = 2 # Explicit ideation, needs resources
|
||||
HIGH = 3 # Plan or intent, immediate action
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisResult:
|
||||
detected: bool
|
||||
severity: CrisisSeverity
|
||||
language: str
|
||||
matched_patterns: List[str]
|
||||
confidence: float
|
||||
|
||||
|
||||
# --- Multilingual Crisis Patterns ---
|
||||
|
||||
# Each language has: explicit terms, ideation phrases, method references
|
||||
# Severity increases with specificity
|
||||
|
||||
CRISIS_PATTERNS: Dict[str, Dict[str, List[Tuple[str, CrisisSeverity]]]] = {
|
||||
"en": {
|
||||
"explicit": [
|
||||
(r'\bsuicide\b', CrisisSeverity.HIGH),
|
||||
(r'\bkill\s*(?:my)?self\b', CrisisSeverity.HIGH),
|
||||
(r'\bend\s*(?:my)?\s*life\b', CrisisSeverity.HIGH),
|
||||
(r'\bnot\s+wanna\s+(?:be\s+)?alive\b', CrisisSeverity.HIGH),
|
||||
(r'\bgoing\s+to\s+(?:kill|end)\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
"ideation": [
|
||||
(r'\bwant\s+to\s+die\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bwanna\s+die\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bwish\s+(?:I\s+)?(?:was|were)\s+dead\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bno\s+reason\s+to\s+live\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bbetter\s+off\s+(?:if\s+)?(?:I\s+)?(?:was|were)\s+dead\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bcan\'?t\s+(?:go\s+on|take\s+(?:it|this)\s+anymore)\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bwhat\'?s\s+the\s+point\b', CrisisSeverity.LOW),
|
||||
],
|
||||
"methods": [
|
||||
(r'\boverdose\b', CrisisSeverity.HIGH),
|
||||
(r'\bhanging\b', CrisisSeverity.HIGH),
|
||||
(r'\bjump(?:ing)?\s+(?:off|from)\b', CrisisSeverity.HIGH),
|
||||
(r'\bslit(?:ting)?\s+(?:my\s+)?wrists?\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
},
|
||||
"es": {
|
||||
"explicit": [
|
||||
(r'\bsuicidio\b', CrisisSeverity.HIGH),
|
||||
(r'\bsuicida\b', CrisisSeverity.HIGH),
|
||||
(r'\bmatarme\b', CrisisSeverity.HIGH),
|
||||
(r'\bquitarme\s+la\s+vida\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
"ideation": [
|
||||
(r'\bquiero\s+morir\b', CrisisSeverity.HIGH),
|
||||
(r'\bno\s+quiero\s+vivir\b', CrisisSeverity.HIGH),
|
||||
(r'\bquiero\s+desaparecer\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bno\s+tengo\s+razones?\s+para\s+vivir\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bno\s+puedo\s+m[áa]s\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bpara\s+qu[ée]\s+vivir\b', CrisisSeverity.LOW),
|
||||
],
|
||||
"methods": [
|
||||
(r'\bsobredosis\b', CrisisSeverity.HIGH),
|
||||
(r'\bahorcarme\b', CrisisSeverity.HIGH),
|
||||
(r'\b(?:cortarme|cortar)\s+(?:las?\s+)?(?:muñecas?|venas?)\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
},
|
||||
"fr": {
|
||||
"explicit": [
|
||||
(r'\bsuicide\b', CrisisSeverity.HIGH),
|
||||
(r'\bsuicidaire\b', CrisisSeverity.HIGH),
|
||||
(r'\bme\s+tuer\b', CrisisSeverity.HIGH),
|
||||
(r'\bmettre\s+fin\s+[àa]\s+mes?\s+jours?\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
"ideation": [
|
||||
(r'\b(?:je\s+)?(?:veux|voudrais)\s+mourir\b', CrisisSeverity.HIGH),
|
||||
(r'\benvie\s+de\s+mourir\b', CrisisSeverity.HIGH),
|
||||
(r'\b(?:je\s+)?(?:ne\s+)?(?:veux|veut)\s+(?:plus\s+)?vivre\b', CrisisSeverity.HIGH),
|
||||
(r'\b(?:je\s+n[\'e]?\s*ai\s+)?(?:plus\s+)?(?:de\s+)?raison[s]?\s+(?:de|pour)\s+vivre\b', CrisisSeverity.MEDIUM),
|
||||
(r'\b(?:je\s+)?(?:n[\'e]?\s*en\s+)?peux\s+plus\b', CrisisSeverity.MEDIUM),
|
||||
],
|
||||
"methods": [
|
||||
(r'\bsurdose\b', CrisisSeverity.HIGH),
|
||||
(r'\bse\s+pendre\b', CrisisSeverity.HIGH),
|
||||
(r'\bse\s+(?:ouvrir|couper)\s+(?:les?\s+)?(?:veines?|poignets?)\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
},
|
||||
"de": {
|
||||
"explicit": [
|
||||
(r'\bSelbstmord\b', CrisisSeverity.HIGH),
|
||||
(r'\bSuizid\b', CrisisSeverity.HIGH),
|
||||
(r'\bmich\s+(?:um)?bringen\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
"ideation": [
|
||||
(r'\b(?:ich\s+)?(?:will|möchte)\s+sterben\b', CrisisSeverity.HIGH),
|
||||
(r'\bnicht\s+(?:mehr\s+)?leben\s+(?:wollen)?\b', CrisisSeverity.HIGH),
|
||||
(r'\b(?:keinen|kein)\s+Grund\s+(?:mehr\s+)?(?:zum\s+)?(?:zu\s+)?leben\b', CrisisSeverity.MEDIUM),
|
||||
(r'\b(?:ich\s+)?(?:kann|will)\s+nicht\s+mehr\b', CrisisSeverity.MEDIUM),
|
||||
],
|
||||
"methods": [
|
||||
(r'\bÜberdosis\b', CrisisSeverity.HIGH),
|
||||
(r'\berhängen\b', CrisisSeverity.HIGH),
|
||||
(r'\b(?:Handgelenke|Pulsadern)\s+(?:auf|aufschneiden)\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
},
|
||||
"pt": {
|
||||
"explicit": [
|
||||
(r'\bsuicídio\b', CrisisSeverity.HIGH),
|
||||
(r'\bme\s+matar\b', CrisisSeverity.HIGH),
|
||||
(r'\btirar\s+(?:minha|a)\s+vida\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
"ideation": [
|
||||
(r'\b(?:eu\s+)?quero\s+morrer\b', CrisisSeverity.HIGH),
|
||||
(r'\bnão\s+quero\s+(?:mais\s+)?viver\b', CrisisSeverity.HIGH),
|
||||
(r'\bnão\s+(?:tenho\s+)?(?:mais\s+)?razão(?:es)?\s+pra\s+viver\b', CrisisSeverity.MEDIUM),
|
||||
(r'\bnão\s+(?:eu\s+)?aguento\s+mais\b', CrisisSeverity.MEDIUM),
|
||||
],
|
||||
"methods": [
|
||||
(r'\b(?:overdose|superdosagem)\b', CrisisSeverity.HIGH),
|
||||
(r'\b(?:se\s+)?enforcar\b', CrisisSeverity.HIGH),
|
||||
(r'\bcortar\s+(?:os?\s+)?pulsos?\b', CrisisSeverity.HIGH),
|
||||
],
|
||||
},
|
||||
"zh": {
|
||||
"explicit": [
|
||||
(r'自杀', CrisisSeverity.HIGH),
|
||||
(r'自殺', CrisisSeverity.HIGH),
|
||||
(r'轻生', CrisisSeverity.HIGH),
|
||||
],
|
||||
"ideation": [
|
||||
(r'想死', CrisisSeverity.HIGH),
|
||||
(r'不想活', CrisisSeverity.HIGH),
|
||||
(r'不想活了', CrisisSeverity.HIGH),
|
||||
(r'活着没意思', CrisisSeverity.MEDIUM),
|
||||
(r'活不下去', CrisisSeverity.MEDIUM),
|
||||
(r'死了算了', CrisisSeverity.HIGH),
|
||||
(r'为什么要活着', CrisisSeverity.LOW),
|
||||
],
|
||||
"methods": [
|
||||
(r'跳楼', CrisisSeverity.HIGH),
|
||||
(r'上吊', CrisisSeverity.HIGH),
|
||||
(r'割腕', CrisisSeverity.HIGH),
|
||||
(r'吃药', CrisisSeverity.MEDIUM), # Context-dependent
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
# Crisis resources by language/region
|
||||
CRISIS_RESOURCES: Dict[str, Dict[str, str]] = {
|
||||
"en": {
|
||||
"hotline": "988 Suicide and Crisis Lifeline",
|
||||
"number": "988",
|
||||
"text": "Text HOME to 741741",
|
||||
"web": "https://988lifeline.org",
|
||||
},
|
||||
"es": {
|
||||
"hotline": "Línea Nacional de Prevención del Suicidio",
|
||||
"number": "988",
|
||||
"text": "Envía HOLA al 741741",
|
||||
"web": "https://988lifeline.org (opción en español)",
|
||||
},
|
||||
"fr": {
|
||||
"hotline": "SOS Amitié",
|
||||
"number": "09 72 39 40 50",
|
||||
"web": "https://www.sos-amitie.com",
|
||||
},
|
||||
"de": {
|
||||
"hotline": "Telefonseelsorge",
|
||||
"number": "0800 111 0 111",
|
||||
"web": "https://www.telefonseelsorge.de",
|
||||
},
|
||||
"pt": {
|
||||
"hotline": "Centro de Valorização da Vida (CVV)",
|
||||
"number": "188",
|
||||
"web": "https://www.cvv.org.br",
|
||||
},
|
||||
"zh": {
|
||||
"hotline": "北京心理危机研究与干预中心",
|
||||
"number": "010-82951332",
|
||||
"web": "https://www.crisis.org.cn",
|
||||
},
|
||||
}
|
||||
|
||||
# Fallback resource
|
||||
DEFAULT_RESOURCE = CRISIS_RESOURCES["en"]
|
||||
|
||||
|
||||
def detect_language_simple(text: str) -> str:
|
||||
"""Simple language detection based on character sets and common words.
|
||||
|
||||
Returns ISO 639-1 language code. Defaults to 'en'.
|
||||
"""
|
||||
# Chinese characters
|
||||
if re.search(r'[\u4e00-\u9fff]', text):
|
||||
return "zh"
|
||||
|
||||
# Check for language-specific words/patterns
|
||||
text_lower = text.lower()
|
||||
|
||||
# Spanish indicators
|
||||
es_words = {'quiero', 'morir', 'vivir', 'puedo', 'más', 'para', 'qué', 'razones', 'tengo', 'soy', 'estoy', 'pensando', 'desaparecer', 'suicidio', 'vida'}
|
||||
if len(es_words & set(text_lower.split())) >= 2:
|
||||
return "es"
|
||||
|
||||
# French indicators
|
||||
fr_words = {'je', 'veux', 'mourir', 'plus', 'vie', 'vivre', 'envie', 'peux', 'raison'}
|
||||
if len(fr_words & set(text_lower.split())) >= 2:
|
||||
return "fr"
|
||||
|
||||
# German indicators
|
||||
de_words = {'ich', 'will', 'sterben', 'nicht', 'mehr', 'leben', 'grund', 'selbstmord'}
|
||||
if len(de_words & set(text_lower.split())) >= 2:
|
||||
return "de"
|
||||
|
||||
# Portuguese indicators
|
||||
pt_words = {'quero', 'morrer', 'viver', 'mais', 'aguento', 'razão', 'pra', 'vida'}
|
||||
if len(pt_words & set(text_lower.split())) >= 2:
|
||||
return "pt"
|
||||
|
||||
return "en"
|
||||
|
||||
|
||||
def detect_crisis(text: str) -> CrisisResult:
|
||||
"""Detect crisis signals in user message.
|
||||
|
||||
Args:
|
||||
text: The user's message to analyze.
|
||||
|
||||
Returns:
|
||||
CrisisResult with detection status, severity, language, and matched patterns.
|
||||
"""
|
||||
if not text or not text.strip():
|
||||
return CrisisResult(False, CrisisSeverity.NONE, "en", [], 0.0)
|
||||
|
||||
language = detect_language_simple(text)
|
||||
patterns = CRISIS_PATTERNS.get(language, CRISIS_PATTERNS["en"])
|
||||
|
||||
matched = []
|
||||
max_severity = CrisisSeverity.NONE
|
||||
|
||||
for category, pattern_list in patterns.items():
|
||||
for regex, severity in pattern_list:
|
||||
if re.search(regex, text, re.IGNORECASE):
|
||||
matched.append(f"{language}:{category}:{regex}")
|
||||
if severity.value > max_severity.value:
|
||||
max_severity = severity
|
||||
|
||||
detected = len(matched) > 0
|
||||
|
||||
# Calculate confidence based on number and severity of matches
|
||||
confidence = 0.0
|
||||
if detected:
|
||||
base = 0.5 + len(matched) * 0.15
|
||||
severity_bonus = max_severity.value * 0.1
|
||||
confidence = min(0.99, base + severity_bonus)
|
||||
|
||||
return CrisisResult(
|
||||
detected=detected,
|
||||
severity=max_severity,
|
||||
language=language,
|
||||
matched_patterns=matched,
|
||||
confidence=confidence,
|
||||
)
|
||||
|
||||
|
||||
def get_crisis_resources(language: str = "en") -> Dict[str, str]:
|
||||
"""Get crisis resources for a language/region."""
|
||||
return CRISIS_RESOURCES.get(language, DEFAULT_RESOURCE)
|
||||
|
||||
|
||||
def format_crisis_response(result: CrisisResult) -> str:
|
||||
"""Format a crisis response message with appropriate resources."""
|
||||
resources = get_crisis_resources(result.language)
|
||||
|
||||
lines = [
|
||||
"I want you to know that what you're feeling matters, and you're not alone.",
|
||||
"",
|
||||
f"If you're in immediate danger, please call {resources['hotline']}: {resources['number']}",
|
||||
]
|
||||
|
||||
if "text" in resources:
|
||||
lines.append(f"Or text: {resources['text']}")
|
||||
|
||||
lines.extend([
|
||||
f"Web: {resources['web']}",
|
||||
"",
|
||||
"I'm here with you. Are you safe right now?",
|
||||
])
|
||||
|
||||
return "\n".join(lines)
|
||||
428
tools/gpu_scheduler.py
Normal file
428
tools/gpu_scheduler.py
Normal file
@@ -0,0 +1,428 @@
|
||||
"""
|
||||
GPU Inference Scheduler — Multi-Model Resource Management
|
||||
|
||||
Queue-based model loading with priority lanes and VRAM budget tracking.
|
||||
Prevents GPU OOM crashes when multiple projects compete for VRAM.
|
||||
|
||||
Priority lanes:
|
||||
1. real-time (LPM) — highest priority, interactive
|
||||
2. interactive (playground) — user-facing, medium priority
|
||||
3. batch (harvester) — background, lowest priority
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
import logging
|
||||
from enum import IntEnum
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
from dataclasses import dataclass, field, asdict
|
||||
|
||||
logger = logging.getLogger("hermes.gpu_scheduler")
|
||||
|
||||
|
||||
class Priority(IntEnum):
|
||||
"""Job priority levels. Lower value = higher priority."""
|
||||
REALTIME = 1 # LPM, live video, interactive sessions
|
||||
INTERACTIVE = 2 # Playground, chat, user-facing
|
||||
BATCH = 3 # Harvester, overnight jobs, background
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModelSpec:
|
||||
"""Specification for a model and its VRAM requirements."""
|
||||
name: str
|
||||
vram_mb: int # VRAM required in MB
|
||||
loader: str = "ollama" # How to load: ollama, vllm, llama_cpp, custom
|
||||
model_id: str = "" # Model identifier (e.g., "llama3:70b")
|
||||
cacheable: bool = True # Can be cached between jobs
|
||||
cpu_fallback: bool = True # Can fall back to CPU if GPU busy
|
||||
estimated_batch_ms: int = 1000 # Estimated time per batch
|
||||
|
||||
|
||||
@dataclass
|
||||
class InferenceJob:
|
||||
"""A job requesting GPU inference."""
|
||||
job_id: str
|
||||
project: str # "video_forge", "lpm", "playground", "harvester"
|
||||
model: ModelSpec
|
||||
priority: Priority
|
||||
batch_size: int = 1
|
||||
created_at: float = field(default_factory=time.time)
|
||||
started_at: Optional[float] = None
|
||||
completed_at: Optional[float] = None
|
||||
status: str = "queued" # queued, loading, running, completed, failed
|
||||
error: Optional[str] = None
|
||||
use_cpu_fallback: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class GPUState:
|
||||
"""Current GPU state."""
|
||||
total_vram_mb: int = 0
|
||||
used_vram_mb: int = 0
|
||||
loaded_models: List[str] = field(default_factory=list)
|
||||
active_job: Optional[str] = None
|
||||
|
||||
@property
|
||||
def available_vram_mb(self) -> int:
|
||||
return self.total_vram_mb - self.used_vram_mb
|
||||
|
||||
def can_fit(self, model: ModelSpec) -> bool:
|
||||
return self.available_vram_mb >= model.vram_mb
|
||||
|
||||
|
||||
# Known models and their VRAM requirements
|
||||
MODEL_REGISTRY: Dict[str, ModelSpec] = {
|
||||
# Video Forge models
|
||||
"sd_xl": ModelSpec(name="Stable Diffusion XL", vram_mb=8192, loader="comfyui", model_id="sd_xl"),
|
||||
"heartmula": ModelSpec(name="HeartMuLa", vram_mb=4096, loader="custom", model_id="heartmula"),
|
||||
"wan2.1": ModelSpec(name="Wan2.1", vram_mb=12288, loader="custom", model_id="wan2.1"),
|
||||
|
||||
# LPM models
|
||||
"lpm_video": ModelSpec(name="LPM Video Gen", vram_mb=16384, loader="custom", model_id="lpm_video"),
|
||||
"lpm_a2a": ModelSpec(name="LPM A2A", vram_mb=8192, loader="custom", model_id="lpm_a2a"),
|
||||
|
||||
# Local inference (hermes)
|
||||
"llama3_70b": ModelSpec(name="Llama 3 70B", vram_mb=40960, loader="ollama", model_id="llama3:70b"),
|
||||
"llama3_8b": ModelSpec(name="Llama 3 8B", vram_mb=8192, loader="ollama", model_id="llama3:8b"),
|
||||
"mimo_v2_pro": ModelSpec(name="MiMo v2 Pro", vram_mb=16384, loader="ollama", model_id="xiaomi/mimo-v2-pro"),
|
||||
|
||||
# Playground
|
||||
"sdxl_turbo": ModelSpec(name="SDXL Turbo", vram_mb=6144, loader="comfyui", model_id="sdxl_turbo"),
|
||||
}
|
||||
|
||||
# Default VRAM budget (can be overridden)
|
||||
DEFAULT_VRAM_MB = 49152 # 48GB (e.g., L40S, A6000)
|
||||
|
||||
|
||||
class InferenceScheduler:
|
||||
"""
|
||||
GPU Inference Scheduler.
|
||||
|
||||
Manages a queue of inference jobs with priority scheduling,
|
||||
VRAM budget tracking, and CPU fallback.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
vram_budget_mb: int = DEFAULT_VRAM_MB,
|
||||
queue_db: str = "~/.hermes/gpu_scheduler.db",
|
||||
):
|
||||
self.vram_budget_mb = vram_budget_mb
|
||||
self.queue_db = Path(queue_db).expanduser()
|
||||
self.queue_db.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# State
|
||||
self.gpu_state = GPUState(total_vram_mb=vram_budget_mb)
|
||||
self.job_queue: List[InferenceJob] = []
|
||||
self.completed_jobs: List[InferenceJob] = []
|
||||
self._lock = threading.Lock()
|
||||
self._running = False
|
||||
self._worker_thread: Optional[threading.Thread] = None
|
||||
|
||||
# Load persisted state
|
||||
self._load_state()
|
||||
|
||||
logger.info(
|
||||
"GPU Scheduler initialized: %dMB VRAM budget",
|
||||
vram_budget_mb,
|
||||
)
|
||||
|
||||
def _load_state(self):
|
||||
"""Load state from SQLite."""
|
||||
import sqlite3
|
||||
conn = sqlite3.connect(str(self.queue_db))
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
job_id TEXT PRIMARY KEY,
|
||||
project TEXT,
|
||||
model_name TEXT,
|
||||
priority INTEGER,
|
||||
batch_size INTEGER,
|
||||
created_at REAL,
|
||||
started_at REAL,
|
||||
completed_at REAL,
|
||||
status TEXT,
|
||||
error TEXT,
|
||||
use_cpu_fallback INTEGER
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
# Load pending jobs
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM jobs WHERE status IN ('queued', 'loading', 'running')"
|
||||
).fetchall()
|
||||
|
||||
for row in rows:
|
||||
model_name = row[2]
|
||||
model = MODEL_REGISTRY.get(model_name, ModelSpec(name=model_name, vram_mb=8192))
|
||||
job = InferenceJob(
|
||||
job_id=row[0],
|
||||
project=row[1],
|
||||
model=model,
|
||||
priority=Priority(row[3]),
|
||||
batch_size=row[4],
|
||||
created_at=row[5],
|
||||
started_at=row[6],
|
||||
completed_at=row[7],
|
||||
status=row[8],
|
||||
error=row[9],
|
||||
use_cpu_fallback=bool(row[10]),
|
||||
)
|
||||
self.job_queue.append(job)
|
||||
|
||||
conn.close()
|
||||
logger.info("Loaded %d pending jobs", len(self.job_queue))
|
||||
|
||||
def _save_job(self, job: InferenceJob):
|
||||
"""Persist job to SQLite."""
|
||||
import sqlite3
|
||||
conn = sqlite3.connect(str(self.queue_db))
|
||||
conn.execute("""
|
||||
INSERT OR REPLACE INTO jobs
|
||||
(job_id, project, model_name, priority, batch_size, created_at,
|
||||
started_at, completed_at, status, error, use_cpu_fallback)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
job.job_id,
|
||||
job.project,
|
||||
job.model.name,
|
||||
job.priority.value,
|
||||
job.batch_size,
|
||||
job.created_at,
|
||||
job.started_at,
|
||||
job.completed_at,
|
||||
job.status,
|
||||
job.error,
|
||||
int(job.use_cpu_fallback),
|
||||
))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def submit_job(
|
||||
self,
|
||||
job_id: str,
|
||||
project: str,
|
||||
model_name: str,
|
||||
priority: Priority = Priority.BATCH,
|
||||
batch_size: int = 1,
|
||||
) -> InferenceJob:
|
||||
"""
|
||||
Submit an inference job to the queue.
|
||||
|
||||
Args:
|
||||
job_id: Unique job identifier
|
||||
project: Project name (video_forge, lpm, playground, harvester)
|
||||
model_name: Model name from MODEL_REGISTRY
|
||||
priority: Job priority
|
||||
batch_size: Number of items to process
|
||||
|
||||
Returns:
|
||||
The created InferenceJob
|
||||
"""
|
||||
model = MODEL_REGISTRY.get(model_name)
|
||||
if not model:
|
||||
raise ValueError(f"Unknown model: {model_name}. Registered: {list(MODEL_REGISTRY.keys())}")
|
||||
|
||||
job = InferenceJob(
|
||||
job_id=job_id,
|
||||
project=project,
|
||||
model=model,
|
||||
priority=priority,
|
||||
batch_size=batch_size,
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
# Insert in priority order
|
||||
inserted = False
|
||||
for i, existing in enumerate(self.job_queue):
|
||||
if job.priority < existing.priority:
|
||||
self.job_queue.insert(i, job)
|
||||
inserted = True
|
||||
break
|
||||
if not inserted:
|
||||
self.job_queue.append(job)
|
||||
|
||||
self._save_job(job)
|
||||
|
||||
logger.info(
|
||||
"Job submitted: %s (project=%s, model=%s, priority=%s)",
|
||||
job_id, project, model_name, priority.name,
|
||||
)
|
||||
|
||||
return job
|
||||
|
||||
def get_next_job(self) -> Optional[InferenceJob]:
|
||||
"""Get the next job to process based on priority and VRAM availability."""
|
||||
with self._lock:
|
||||
for job in self.job_queue:
|
||||
if job.status != "queued":
|
||||
continue
|
||||
|
||||
# Check if model fits in VRAM
|
||||
if self.gpu_state.can_fit(job.model):
|
||||
return job
|
||||
|
||||
# Check CPU fallback
|
||||
if job.model.cpu_fallback:
|
||||
job.use_cpu_fallback = True
|
||||
return job
|
||||
|
||||
return None
|
||||
|
||||
def start_job(self, job: InferenceJob) -> bool:
|
||||
"""
|
||||
Mark a job as started and load its model.
|
||||
|
||||
Returns True if successful, False if insufficient VRAM.
|
||||
"""
|
||||
with self._lock:
|
||||
if not job.use_cpu_fallback:
|
||||
if not self.gpu_state.can_fit(job.model):
|
||||
logger.warning(
|
||||
"Insufficient VRAM for %s: need %dMB, have %dMB",
|
||||
job.model.name,
|
||||
job.model.vram_mb,
|
||||
self.gpu_state.available_vram_mb,
|
||||
)
|
||||
return False
|
||||
|
||||
# Reserve VRAM
|
||||
self.gpu_state.used_vram_mb += job.model.vram_mb
|
||||
if job.model.name not in self.gpu_state.loaded_models:
|
||||
self.gpu_state.loaded_models.append(job.model.name)
|
||||
|
||||
job.status = "loading"
|
||||
job.started_at = time.time()
|
||||
self.gpu_state.active_job = job.job_id
|
||||
self._save_job(job)
|
||||
|
||||
logger.info(
|
||||
"Job started: %s (model=%s, cpu_fallback=%s, vram_used=%dMB)",
|
||||
job.job_id,
|
||||
job.model.name,
|
||||
job.use_cpu_fallback,
|
||||
self.gpu_state.used_vram_mb,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def complete_job(self, job: InferenceJob, error: str = None):
|
||||
"""Mark a job as completed and release its VRAM."""
|
||||
with self._lock:
|
||||
job.completed_at = time.time()
|
||||
job.status = "completed" if not error else "failed"
|
||||
job.error = error
|
||||
|
||||
if not job.use_cpu_fallback:
|
||||
# Release VRAM
|
||||
self.gpu_state.used_vram_mb = max(
|
||||
0,
|
||||
self.gpu_state.used_vram_mb - job.model.vram_mb,
|
||||
)
|
||||
|
||||
if self.gpu_state.active_job == job.job_id:
|
||||
self.gpu_state.active_job = None
|
||||
|
||||
# Move to completed
|
||||
self.job_queue.remove(job)
|
||||
self.completed_jobs.append(job)
|
||||
self._save_job(job)
|
||||
|
||||
duration = (job.completed_at - job.started_at) * 1000 if job.started_at else 0
|
||||
logger.info(
|
||||
"Job completed: %s (status=%s, duration=%.0fms)",
|
||||
job.job_id,
|
||||
job.status,
|
||||
duration,
|
||||
)
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""Get scheduler status."""
|
||||
with self._lock:
|
||||
return {
|
||||
"gpu": {
|
||||
"total_vram_mb": self.gpu_state.total_vram_mb,
|
||||
"used_vram_mb": self.gpu_state.used_vram_mb,
|
||||
"available_vram_mb": self.gpu_state.available_vram_mb,
|
||||
"utilization_pct": round(
|
||||
self.gpu_state.used_vram_mb / self.gpu_state.total_vram_mb * 100, 1
|
||||
),
|
||||
"loaded_models": self.gpu_state.loaded_models,
|
||||
"active_job": self.gpu_state.active_job,
|
||||
},
|
||||
"queue": {
|
||||
"pending": len([j for j in self.job_queue if j.status == "queued"]),
|
||||
"loading": len([j for j in self.job_queue if j.status == "loading"]),
|
||||
"running": len([j for j in self.job_queue if j.status == "running"]),
|
||||
"by_priority": {
|
||||
p.name: len([j for j in self.job_queue if j.priority == p and j.status == "queued"])
|
||||
for p in Priority
|
||||
},
|
||||
},
|
||||
"completed": {
|
||||
"total": len(self.completed_jobs),
|
||||
"success": len([j for j in self.completed_jobs if j.status == "completed"]),
|
||||
"failed": len([j for j in self.completed_jobs if j.status == "failed"]),
|
||||
},
|
||||
}
|
||||
|
||||
def register_model(self, name: str, spec: ModelSpec):
|
||||
"""Register a new model."""
|
||||
MODEL_REGISTRY[name] = spec
|
||||
logger.info("Registered model: %s (%dMB VRAM)", name, spec.vram_mb)
|
||||
|
||||
def clear_completed(self):
|
||||
"""Clear completed jobs from memory (keep in DB)."""
|
||||
with self._lock:
|
||||
self.completed_jobs.clear()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CLI Interface
|
||||
# ============================================================================
|
||||
|
||||
def main():
|
||||
"""CLI entry point for testing."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="GPU Inference Scheduler")
|
||||
parser.add_argument("action", choices=["status", "submit", "list", "clear"])
|
||||
parser.add_argument("--job-id", help="Job ID for submit")
|
||||
parser.add_argument("--project", help="Project name")
|
||||
parser.add_argument("--model", help="Model name")
|
||||
parser.add_argument("--priority", choices=["realtime", "interactive", "batch"], default="batch")
|
||||
parser.add_argument("--vram", type=int, default=DEFAULT_VRAM_MB, help="VRAM budget in MB")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
scheduler = InferenceScheduler(vram_budget_mb=args.vram)
|
||||
|
||||
if args.action == "status":
|
||||
status = scheduler.get_status()
|
||||
print(json.dumps(status, indent=2))
|
||||
|
||||
elif args.action == "submit":
|
||||
if not all([args.job_id, args.project, args.model]):
|
||||
print("Error: --job-id, --project, and --model required for submit")
|
||||
return
|
||||
|
||||
priority = Priority[args.priority.upper()]
|
||||
job = scheduler.submit_job(args.job_id, args.project, args.model, priority)
|
||||
print(f"Submitted: {job.job_id}")
|
||||
|
||||
elif args.action == "list":
|
||||
print(f"Pending jobs: {len(scheduler.job_queue)}")
|
||||
for job in scheduler.job_queue:
|
||||
print(f" {job.job_id}: {job.project}/{job.model.name} [{job.priority.name}] {job.status}")
|
||||
|
||||
elif args.action == "clear":
|
||||
scheduler.clear_completed()
|
||||
print("Cleared completed jobs from memory")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user