[claude] Hermes health monitor — system resources + model management (#1073) #1133
@@ -366,6 +366,16 @@ class Settings(BaseSettings):
|
||||
# Default timeout for git operations.
|
||||
hands_git_timeout: int = 60
|
||||
|
||||
# ── Hermes Health Monitor ─────────────────────────────────────────
|
||||
# Enable the Hermes system health monitor (memory, disk, Ollama, processes, network).
|
||||
hermes_enabled: bool = True
|
||||
# How often Hermes runs a full health cycle (seconds). Default: 5 minutes.
|
||||
hermes_interval_seconds: int = 300
|
||||
# Alert threshold: free memory below this triggers model unloading / alert (GB).
|
||||
hermes_memory_free_min_gb: float = 4.0
|
||||
# Alert threshold: free disk below this triggers cleanup / alert (GB).
|
||||
hermes_disk_free_min_gb: float = 10.0
|
||||
|
||||
# ── Error Logging ─────────────────────────────────────────────────
|
||||
error_log_enabled: bool = True
|
||||
error_log_dir: str = "logs"
|
||||
|
||||
@@ -33,6 +33,7 @@ from dashboard.routes.calm import router as calm_router
|
||||
from dashboard.routes.chat_api import router as chat_api_router
|
||||
from dashboard.routes.chat_api_v1 import router as chat_api_v1_router
|
||||
from dashboard.routes.daily_run import router as daily_run_router
|
||||
from dashboard.routes.hermes import router as hermes_router
|
||||
from dashboard.routes.db_explorer import router as db_explorer_router
|
||||
from dashboard.routes.discord import router as discord_router
|
||||
from dashboard.routes.experiments import router as experiments_router
|
||||
@@ -180,6 +181,33 @@ async def _thinking_scheduler() -> None:
|
||||
await asyncio.sleep(settings.thinking_interval_seconds)
|
||||
|
||||
|
||||
async def _hermes_scheduler() -> None:
    """Background task: periodic Hermes system health cycle.

    Runs the full monitor (memory, disk, Ollama, processes, network) on a
    fixed interval. Issues the monitor can fix are auto-resolved; the rest
    produce a warning log (push alerts are handled inside the monitor).
    """
    from infrastructure.hermes.monitor import hermes_monitor

    # Stagger start-up so this doesn't pile onto the other schedulers.
    await asyncio.sleep(20)

    while True:
        try:
            if settings.hermes_enabled:
                cycle_report = await hermes_monitor.run_cycle()
                if cycle_report.has_issues:
                    logger.warning(
                        "Hermes health issues detected — overall: %s",
                        cycle_report.overall.value,
                    )
        except asyncio.CancelledError:
            # Cooperate with task cancellation at shutdown.
            raise
        except Exception as exc:
            # Never let one bad cycle kill the scheduler loop.
            logger.error("Hermes scheduler error: %s", exc)

        await asyncio.sleep(settings.hermes_interval_seconds)
|
||||
|
||||
|
||||
async def _loop_qa_scheduler() -> None:
|
||||
"""Background task: run capability self-tests on a separate timer.
|
||||
|
||||
@@ -381,6 +409,7 @@ def _startup_background_tasks() -> list[asyncio.Task]:
|
||||
asyncio.create_task(_loop_qa_scheduler()),
|
||||
asyncio.create_task(_presence_watcher()),
|
||||
asyncio.create_task(_start_chat_integrations_background()),
|
||||
asyncio.create_task(_hermes_scheduler()),
|
||||
]
|
||||
try:
|
||||
from timmy.paperclip import start_paperclip_poller
|
||||
@@ -638,6 +667,7 @@ app.include_router(world_router)
|
||||
app.include_router(matrix_router)
|
||||
app.include_router(tower_router)
|
||||
app.include_router(daily_run_router)
|
||||
app.include_router(hermes_router)
|
||||
app.include_router(quests_router)
|
||||
app.include_router(scorecards_router)
|
||||
app.include_router(sovereignty_metrics_router)
|
||||
|
||||
45
src/dashboard/routes/hermes.py
Normal file
45
src/dashboard/routes/hermes.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""Hermes health monitor routes.
|
||||
|
||||
Exposes the Hermes health monitor via REST API so the dashboard
|
||||
and external tools can query system status and trigger checks.
|
||||
|
||||
Refs: #1073
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter
|
||||
|
||||
from infrastructure.hermes.monitor import hermes_monitor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/hermes", tags=["hermes"])
|
||||
|
||||
|
||||
@router.get("/status")
async def hermes_status():
    """Serve the cached report from the latest Hermes background cycle.

    Read-only: no checks are executed here. POST /hermes/check forces an
    immediate cycle instead.
    """
    latest = hermes_monitor.last_report
    if latest is not None:
        return latest.to_dict()
    return {
        "status": "no_data",
        "message": "No health report yet — first cycle pending",
        "seconds_since_last_run": hermes_monitor.seconds_since_last_run,
    }
|
||||
|
||||
|
||||
@router.post("/check")
async def hermes_check():
    """Run one full Hermes cycle right now and return its report.

    Blocking: waits for every monitor to complete (roughly five seconds),
    so prefer GET /hermes/status for routine polling.
    """
    fresh_report = await hermes_monitor.run_cycle()
    return fresh_report.to_dict()
|
||||
9
src/infrastructure/hermes/__init__.py
Normal file
9
src/infrastructure/hermes/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
"""Hermes health monitor — system resources + model management.
|
||||
|
||||
Monitors the local machine (Hermes/M3 Max) for memory pressure, disk usage,
|
||||
Ollama model health, zombie processes, and network connectivity.
|
||||
"""
|
||||
|
||||
from infrastructure.hermes.monitor import HermesMonitor, HealthLevel, HealthReport, hermes_monitor
|
||||
|
||||
__all__ = ["HermesMonitor", "HealthLevel", "HealthReport", "hermes_monitor"]
|
||||
668
src/infrastructure/hermes/monitor.py
Normal file
668
src/infrastructure/hermes/monitor.py
Normal file
@@ -0,0 +1,668 @@
|
||||
"""Hermes health monitor — system resources + model management.
|
||||
|
||||
Monitors the local machine (Hermes/M3 Max) and keeps it running smoothly.
|
||||
Runs every 5 minutes, auto-resolves issues where possible, alerts when
|
||||
human intervention is needed.
|
||||
|
||||
Monitors:
|
||||
1. Memory pressure — unified memory, alert if <4GB free, unload models
|
||||
2. Disk usage — alert if <10GB free, clean temp files
|
||||
3. Ollama status — verify reachable, restart if crashed, manage loaded models
|
||||
4. Process health — detect zombie processes
|
||||
5. Network — verify Gitea connectivity
|
||||
|
||||
Refs: #1073
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
import urllib.request
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HealthLevel(str, Enum):
    """Severity classification attached to each health-check outcome.

    Inherits ``str`` so the values serialize directly into JSON payloads.
    """

    # Declaration order is informational only; aggregation logic compares
    # by membership, not by rank.
    OK = "ok"
    WARNING = "warning"
    CRITICAL = "critical"
    UNKNOWN = "unknown"
|
||||
|
||||
|
||||
@dataclass
class CheckResult:
    """Outcome of one individual health probe.

    Attributes:
        name: Short check identifier (e.g. "memory", "disk").
        level: Severity of the finding.
        message: Human-readable one-liner for logs and the dashboard.
        details: Structured extras (metrics, counts) for the API payload.
        auto_resolved: True when the monitor fixed the issue itself.
        needs_human: True when manual intervention is required.
    """

    name: str
    level: HealthLevel
    message: str
    details: dict[str, Any] = field(default_factory=dict)
    auto_resolved: bool = False
    needs_human: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-friendly mapping for the API layer."""
        payload: dict[str, Any] = {
            "name": self.name,
            "level": self.level.value,
            "message": self.message,
            "details": self.details,
            "auto_resolved": self.auto_resolved,
            "needs_human": self.needs_human,
        }
        return payload
|
||||
|
||||
|
||||
@dataclass
class HealthReport:
    """Aggregated outcome of one full monitor cycle.

    Attributes:
        timestamp: ISO-8601 UTC time the cycle completed.
        checks: Individual results, one per probe.
        overall: Worst severity across all checks.
    """

    timestamp: str
    checks: list[CheckResult]
    overall: HealthLevel

    @property
    def has_issues(self) -> bool:
        """True when at least one check ended in a non-OK state."""
        return any(entry.level != HealthLevel.OK for entry in self.checks)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the report (and every contained check) for the API."""
        return {
            "timestamp": self.timestamp,
            "overall": self.overall.value,
            "has_issues": self.has_issues,
            "checks": [entry.to_dict() for entry in self.checks],
        }
|
||||
|
||||
|
||||
class HermesMonitor:
|
||||
"""System health monitor for Hermes (local M3 Max machine).
|
||||
|
||||
All blocking I/O (subprocess, HTTP) is wrapped in asyncio.to_thread()
|
||||
so it never blocks the event loop. Results are cached so the dashboard
|
||||
can read the last report without triggering a new cycle.
|
||||
"""
|
||||
|
||||
OLLAMA_REQUEST_TIMEOUT = 5
|
||||
NETWORK_REQUEST_TIMEOUT = 5
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._last_report: HealthReport | None = None
|
||||
self._last_run_ts: float = 0.0
|
||||
|
||||
@property
|
||||
def last_report(self) -> HealthReport | None:
|
||||
"""Most recent health report, or None if no cycle has run yet."""
|
||||
return self._last_report
|
||||
|
||||
@property
|
||||
def seconds_since_last_run(self) -> float:
|
||||
if self._last_run_ts == 0.0:
|
||||
return float("inf")
|
||||
return time.monotonic() - self._last_run_ts
|
||||
|
||||
async def run_cycle(self) -> HealthReport:
|
||||
"""Run a full health check cycle and return the report."""
|
||||
self._last_run_ts = time.monotonic()
|
||||
logger.info("Hermes health cycle starting")
|
||||
|
||||
check_fns = [
|
||||
self._check_memory(),
|
||||
self._check_disk(),
|
||||
self._check_ollama(),
|
||||
self._check_processes(),
|
||||
self._check_network(),
|
||||
]
|
||||
|
||||
raw_results = await asyncio.gather(*check_fns, return_exceptions=True)
|
||||
|
||||
checks: list[CheckResult] = []
|
||||
for i, r in enumerate(raw_results):
|
||||
if isinstance(r, Exception):
|
||||
name = ["memory", "disk", "ollama", "processes", "network"][i]
|
||||
logger.warning("Hermes check '%s' raised: %s", name, r)
|
||||
checks.append(
|
||||
CheckResult(
|
||||
name=name,
|
||||
level=HealthLevel.UNKNOWN,
|
||||
message=f"Check error: {r}",
|
||||
)
|
||||
)
|
||||
else:
|
||||
checks.append(r)
|
||||
|
||||
# Compute overall level
|
||||
levels = {c.level for c in checks}
|
||||
if HealthLevel.CRITICAL in levels:
|
||||
overall = HealthLevel.CRITICAL
|
||||
elif HealthLevel.WARNING in levels:
|
||||
overall = HealthLevel.WARNING
|
||||
elif HealthLevel.UNKNOWN in levels:
|
||||
overall = HealthLevel.UNKNOWN
|
||||
else:
|
||||
overall = HealthLevel.OK
|
||||
|
||||
report = HealthReport(
|
||||
timestamp=datetime.now(UTC).isoformat(),
|
||||
checks=checks,
|
||||
overall=overall,
|
||||
)
|
||||
self._last_report = report
|
||||
|
||||
await self._handle_alerts(report)
|
||||
|
||||
logger.info("Hermes health cycle complete — overall: %s", overall.value)
|
||||
return report
|
||||
|
||||
# ── Memory ───────────────────────────────────────────────────────────────
|
||||
|
||||
async def _check_memory(self) -> CheckResult:
|
||||
"""Check unified memory usage (macOS vm_stat)."""
|
||||
memory_free_min_gb = getattr(settings, "hermes_memory_free_min_gb", 4.0)
|
||||
try:
|
||||
info = await asyncio.to_thread(self._get_memory_info)
|
||||
free_gb = info.get("free_gb", 0.0)
|
||||
total_gb = info.get("total_gb", 0.0)
|
||||
details: dict[str, Any] = {
|
||||
"free_gb": round(free_gb, 2),
|
||||
"total_gb": round(total_gb, 2),
|
||||
}
|
||||
|
||||
if free_gb < memory_free_min_gb:
|
||||
# Attempt auto-remediation: unload Ollama models
|
||||
unloaded = await self._unload_ollama_models()
|
||||
if unloaded:
|
||||
return CheckResult(
|
||||
name="memory",
|
||||
level=HealthLevel.WARNING,
|
||||
message=(
|
||||
f"Low memory ({free_gb:.1f}GB free) — "
|
||||
f"unloaded {unloaded} Ollama model(s)"
|
||||
),
|
||||
details={**details, "models_unloaded": unloaded},
|
||||
auto_resolved=True,
|
||||
)
|
||||
return CheckResult(
|
||||
name="memory",
|
||||
level=HealthLevel.CRITICAL,
|
||||
message=(
|
||||
f"Critical: only {free_gb:.1f}GB free "
|
||||
f"(threshold: {memory_free_min_gb}GB)"
|
||||
),
|
||||
details=details,
|
||||
needs_human=True,
|
||||
)
|
||||
|
||||
return CheckResult(
|
||||
name="memory",
|
||||
level=HealthLevel.OK,
|
||||
message=f"Memory OK — {free_gb:.1f}GB free of {total_gb:.1f}GB",
|
||||
details=details,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Memory check failed: %s", exc)
|
||||
return CheckResult(
|
||||
name="memory",
|
||||
level=HealthLevel.UNKNOWN,
|
||||
message=f"Memory check unavailable: {exc}",
|
||||
)
|
||||
|
||||
def _get_memory_info(self) -> dict[str, float]:
|
||||
"""Get memory stats via macOS sysctl + vm_stat.
|
||||
|
||||
Falls back gracefully on non-macOS systems.
|
||||
"""
|
||||
gb = 1024**3
|
||||
total_bytes = 0.0
|
||||
free_bytes = 0.0
|
||||
|
||||
# Total memory via sysctl
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["sysctl", "-n", "hw.memsize"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=3,
|
||||
)
|
||||
total_bytes = float(result.stdout.strip())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Free + inactive pages via vm_stat (macOS)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["vm_stat"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=3,
|
||||
)
|
||||
page_size = 16384 # 16 KB default on Apple Silicon
|
||||
for line in result.stdout.splitlines():
|
||||
if "page size of" in line:
|
||||
parts = line.split()
|
||||
for i, part in enumerate(parts):
|
||||
if part == "of" and i + 1 < len(parts):
|
||||
try:
|
||||
page_size = int(parts[i + 1])
|
||||
except ValueError:
|
||||
pass
|
||||
elif "Pages free:" in line:
|
||||
pages = int(line.split(":")[1].strip().rstrip("."))
|
||||
free_bytes += pages * page_size
|
||||
elif "Pages inactive:" in line:
|
||||
pages = int(line.split(":")[1].strip().rstrip("."))
|
||||
free_bytes += pages * page_size
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"total_gb": total_bytes / gb if total_bytes else 0.0,
|
||||
"free_gb": free_bytes / gb if free_bytes else 0.0,
|
||||
}
|
||||
|
||||
# ── Disk ─────────────────────────────────────────────────────────────────
|
||||
|
||||
async def _check_disk(self) -> CheckResult:
|
||||
"""Check disk usage via shutil.disk_usage."""
|
||||
disk_free_min_gb = getattr(settings, "hermes_disk_free_min_gb", 10.0)
|
||||
try:
|
||||
usage = await asyncio.to_thread(shutil.disk_usage, "/")
|
||||
free_gb = usage.free / (1024**3)
|
||||
total_gb = usage.total / (1024**3)
|
||||
used_pct = (usage.used / usage.total) * 100
|
||||
|
||||
details: dict[str, Any] = {
|
||||
"free_gb": round(free_gb, 2),
|
||||
"total_gb": round(total_gb, 2),
|
||||
"used_pct": round(used_pct, 1),
|
||||
}
|
||||
|
||||
if free_gb < disk_free_min_gb:
|
||||
cleaned_gb = await self._cleanup_temp_files()
|
||||
if cleaned_gb > 0.01:
|
||||
return CheckResult(
|
||||
name="disk",
|
||||
level=HealthLevel.WARNING,
|
||||
message=(
|
||||
f"Low disk ({free_gb:.1f}GB free) — "
|
||||
f"cleaned {cleaned_gb:.2f}GB from /tmp"
|
||||
),
|
||||
details={**details, "cleaned_gb": round(cleaned_gb, 2)},
|
||||
auto_resolved=True,
|
||||
)
|
||||
return CheckResult(
|
||||
name="disk",
|
||||
level=HealthLevel.CRITICAL,
|
||||
message=(
|
||||
f"Critical: only {free_gb:.1f}GB free "
|
||||
f"(threshold: {disk_free_min_gb}GB)"
|
||||
),
|
||||
details=details,
|
||||
needs_human=True,
|
||||
)
|
||||
|
||||
return CheckResult(
|
||||
name="disk",
|
||||
level=HealthLevel.OK,
|
||||
message=f"Disk OK — {free_gb:.1f}GB free ({used_pct:.0f}% used)",
|
||||
details=details,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Disk check failed: %s", exc)
|
||||
return CheckResult(
|
||||
name="disk",
|
||||
level=HealthLevel.UNKNOWN,
|
||||
message=f"Disk check unavailable: {exc}",
|
||||
)
|
||||
|
||||
async def _cleanup_temp_files(self) -> float:
|
||||
"""Remove /tmp files older than 24 hours. Returns GB freed."""
|
||||
return await asyncio.to_thread(self._cleanup_temp_files_sync)
|
||||
|
||||
def _cleanup_temp_files_sync(self) -> float:
|
||||
"""Synchronous /tmp cleanup — only touches files older than 24 hours."""
|
||||
from pathlib import Path
|
||||
|
||||
freed_bytes = 0
|
||||
cutoff = time.time() - 86400 # 24 hours ago
|
||||
|
||||
try:
|
||||
tmp = Path("/tmp")
|
||||
for item in tmp.iterdir():
|
||||
try:
|
||||
stat = item.stat()
|
||||
if stat.st_mtime >= cutoff:
|
||||
continue
|
||||
if item.is_file():
|
||||
freed_bytes += stat.st_size
|
||||
item.unlink(missing_ok=True)
|
||||
elif item.is_dir():
|
||||
dir_size = sum(
|
||||
f.stat().st_size
|
||||
for f in item.rglob("*")
|
||||
if f.is_file()
|
||||
)
|
||||
freed_bytes += dir_size
|
||||
shutil.rmtree(str(item), ignore_errors=True)
|
||||
except (PermissionError, OSError):
|
||||
pass # Skip files we can't touch
|
||||
except Exception as exc:
|
||||
logger.warning("Temp cleanup error: %s", exc)
|
||||
|
||||
freed_gb = freed_bytes / (1024**3)
|
||||
if freed_gb > 0.001:
|
||||
logger.info("Hermes disk cleanup: freed %.2fGB from /tmp", freed_gb)
|
||||
return freed_gb
|
||||
|
||||
# ── Ollama ───────────────────────────────────────────────────────────────
|
||||
|
||||
async def _check_ollama(self) -> CheckResult:
|
||||
"""Check Ollama status and loaded models."""
|
||||
try:
|
||||
status = await asyncio.to_thread(self._get_ollama_status)
|
||||
|
||||
if not status.get("reachable"):
|
||||
restarted = await self._restart_ollama()
|
||||
if restarted:
|
||||
return CheckResult(
|
||||
name="ollama",
|
||||
level=HealthLevel.WARNING,
|
||||
message="Ollama was unreachable — restart initiated",
|
||||
details={"restart_attempted": True},
|
||||
auto_resolved=True,
|
||||
)
|
||||
return CheckResult(
|
||||
name="ollama",
|
||||
level=HealthLevel.CRITICAL,
|
||||
message="Ollama unreachable and restart failed",
|
||||
details={"reachable": False},
|
||||
needs_human=True,
|
||||
)
|
||||
|
||||
models = status.get("models", [])
|
||||
loaded = status.get("loaded_models", [])
|
||||
return CheckResult(
|
||||
name="ollama",
|
||||
level=HealthLevel.OK,
|
||||
message=(
|
||||
f"Ollama OK — {len(models)} model(s) available, "
|
||||
f"{len(loaded)} loaded"
|
||||
),
|
||||
details={
|
||||
"reachable": True,
|
||||
"model_count": len(models),
|
||||
"loaded_count": len(loaded),
|
||||
"loaded_models": [m.get("name", "") for m in loaded],
|
||||
},
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Ollama check failed: %s", exc)
|
||||
return CheckResult(
|
||||
name="ollama",
|
||||
level=HealthLevel.UNKNOWN,
|
||||
message=f"Ollama check failed: {exc}",
|
||||
)
|
||||
|
||||
def _get_ollama_status(self) -> dict[str, Any]:
|
||||
"""Synchronous Ollama status — checks /api/tags and /api/ps."""
|
||||
url = settings.normalized_ollama_url
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/tags",
|
||||
method="GET",
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
|
||||
data = json.loads(resp.read().decode())
|
||||
models = data.get("models", [])
|
||||
except Exception:
|
||||
return {"reachable": False, "models": [], "loaded_models": []}
|
||||
|
||||
# /api/ps lists currently loaded (in-memory) models — Ollama >=0.2
|
||||
loaded: list[dict] = []
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/ps",
|
||||
method="GET",
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
|
||||
ps_data = json.loads(resp.read().decode())
|
||||
loaded = ps_data.get("models", [])
|
||||
except Exception:
|
||||
pass # /api/ps absent on older Ollama — non-fatal
|
||||
|
||||
return {"reachable": True, "models": models, "loaded_models": loaded}
|
||||
|
||||
async def _unload_ollama_models(self) -> int:
|
||||
"""Unload in-memory Ollama models to free unified memory.
|
||||
|
||||
Uses the keep_alive=0 trick: POSTing to /api/generate with
|
||||
keep_alive=0 causes Ollama to immediately evict the model.
|
||||
Returns the number of models successfully unloaded.
|
||||
"""
|
||||
return await asyncio.to_thread(self._unload_ollama_models_sync)
|
||||
|
||||
def _unload_ollama_models_sync(self) -> int:
|
||||
"""Synchronous model unload implementation."""
|
||||
url = settings.normalized_ollama_url
|
||||
unloaded = 0
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/ps",
|
||||
method="GET",
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
|
||||
ps_data = json.loads(resp.read().decode())
|
||||
loaded = ps_data.get("models", [])
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
for model in loaded:
|
||||
name = model.get("name", "")
|
||||
if not name:
|
||||
continue
|
||||
try:
|
||||
payload = json.dumps({"model": name, "keep_alive": 0}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/generate",
|
||||
data=payload,
|
||||
method="POST",
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=10) as _:
|
||||
pass
|
||||
logger.info("Hermes: unloaded Ollama model %s", name)
|
||||
unloaded += 1
|
||||
except Exception as exc:
|
||||
logger.warning("Hermes: failed to unload model %s: %s", name, exc)
|
||||
|
||||
return unloaded
|
||||
|
||||
async def _restart_ollama(self) -> bool:
|
||||
"""Attempt to restart the Ollama service via launchctl or brew."""
|
||||
return await asyncio.to_thread(self._restart_ollama_sync)
|
||||
|
||||
def _restart_ollama_sync(self) -> bool:
|
||||
"""Try launchctl first, then brew services."""
|
||||
# macOS launchctl (installed via official Ollama installer)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["launchctl", "stop", "com.ollama.ollama"],
|
||||
capture_output=True,
|
||||
timeout=10,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
time.sleep(2)
|
||||
subprocess.run(
|
||||
["launchctl", "start", "com.ollama.ollama"],
|
||||
capture_output=True,
|
||||
timeout=10,
|
||||
)
|
||||
logger.info("Hermes: Ollama restarted via launchctl")
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Homebrew fallback
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["brew", "services", "restart", "ollama"],
|
||||
capture_output=True,
|
||||
timeout=20,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
logger.info("Hermes: Ollama restarted via brew services")
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
logger.warning("Hermes: Ollama restart failed — manual intervention needed")
|
||||
return False
|
||||
|
||||
# ── Processes ────────────────────────────────────────────────────────────
|
||||
|
||||
async def _check_processes(self) -> CheckResult:
|
||||
"""Check for zombie processes via ps aux."""
|
||||
try:
|
||||
result = await asyncio.to_thread(self._get_zombie_processes)
|
||||
zombies = result.get("zombies", [])
|
||||
|
||||
if zombies:
|
||||
return CheckResult(
|
||||
name="processes",
|
||||
level=HealthLevel.WARNING,
|
||||
message=f"Found {len(zombies)} zombie process(es)",
|
||||
details={"zombies": zombies[:5]},
|
||||
needs_human=len(zombies) > 3,
|
||||
)
|
||||
|
||||
return CheckResult(
|
||||
name="processes",
|
||||
level=HealthLevel.OK,
|
||||
message="Processes OK — no zombies detected",
|
||||
details={"zombie_count": 0},
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Process check failed: %s", exc)
|
||||
return CheckResult(
|
||||
name="processes",
|
||||
level=HealthLevel.UNKNOWN,
|
||||
message=f"Process check unavailable: {exc}",
|
||||
)
|
||||
|
||||
def _get_zombie_processes(self) -> dict[str, Any]:
|
||||
"""Detect zombie processes (state 'Z') via ps aux."""
|
||||
result = subprocess.run(
|
||||
["ps", "aux"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
zombies = []
|
||||
for line in result.stdout.splitlines()[1:]: # Skip header row
|
||||
parts = line.split(None, 10)
|
||||
if len(parts) >= 8 and parts[7] == "Z":
|
||||
zombies.append(
|
||||
{
|
||||
"pid": parts[1],
|
||||
"command": parts[10][:80] if len(parts) > 10 else "",
|
||||
}
|
||||
)
|
||||
return {"zombies": zombies}
|
||||
|
||||
# ── Network ──────────────────────────────────────────────────────────────
|
||||
|
||||
async def _check_network(self) -> CheckResult:
|
||||
"""Check Gitea connectivity."""
|
||||
try:
|
||||
result = await asyncio.to_thread(self._check_gitea_connectivity)
|
||||
reachable = result.get("reachable", False)
|
||||
latency_ms = result.get("latency_ms", -1.0)
|
||||
|
||||
if not reachable:
|
||||
return CheckResult(
|
||||
name="network",
|
||||
level=HealthLevel.WARNING,
|
||||
message=f"Gitea unreachable: {result.get('error', 'unknown')}",
|
||||
details=result,
|
||||
needs_human=True,
|
||||
)
|
||||
|
||||
return CheckResult(
|
||||
name="network",
|
||||
level=HealthLevel.OK,
|
||||
message=f"Network OK — Gitea reachable ({latency_ms:.0f}ms)",
|
||||
details=result,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Network check failed: %s", exc)
|
||||
return CheckResult(
|
||||
name="network",
|
||||
level=HealthLevel.UNKNOWN,
|
||||
message=f"Network check unavailable: {exc}",
|
||||
)
|
||||
|
||||
def _check_gitea_connectivity(self) -> dict[str, Any]:
|
||||
"""Synchronous Gitea reachability check."""
|
||||
url = settings.gitea_url
|
||||
start = time.monotonic()
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/v1/version",
|
||||
method="GET",
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=self.NETWORK_REQUEST_TIMEOUT) as resp:
|
||||
latency_ms = (time.monotonic() - start) * 1000
|
||||
return {
|
||||
"reachable": resp.status == 200,
|
||||
"latency_ms": round(latency_ms, 1),
|
||||
"url": url,
|
||||
}
|
||||
except Exception as exc:
|
||||
return {
|
||||
"reachable": False,
|
||||
"error": str(exc),
|
||||
"url": url,
|
||||
"latency_ms": -1.0,
|
||||
}
|
||||
|
||||
# ── Alerts ───────────────────────────────────────────────────────────────
|
||||
|
||||
async def _handle_alerts(self, report: HealthReport) -> None:
|
||||
"""Send push notifications for issues that need attention."""
|
||||
try:
|
||||
from infrastructure.notifications.push import notifier
|
||||
except Exception:
|
||||
return
|
||||
|
||||
for check in report.checks:
|
||||
if check.level == HealthLevel.CRITICAL or check.needs_human:
|
||||
notifier.notify(
|
||||
title=f"Hermes Alert: {check.name}",
|
||||
message=check.message,
|
||||
category="system",
|
||||
native=check.level == HealthLevel.CRITICAL,
|
||||
)
|
||||
elif check.level == HealthLevel.WARNING and check.auto_resolved:
|
||||
notifier.notify(
|
||||
title=f"Hermes: {check.name} auto-fixed",
|
||||
message=check.message,
|
||||
category="system",
|
||||
)
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
hermes_monitor = HermesMonitor()
|
||||
452
tests/unit/test_hermes_monitor.py
Normal file
452
tests/unit/test_hermes_monitor.py
Normal file
@@ -0,0 +1,452 @@
|
||||
"""Unit tests for the Hermes health monitor.
|
||||
|
||||
Tests all five checks (memory, disk, Ollama, processes, network) using mocks
|
||||
so no real subprocesses or network calls are made.
|
||||
|
||||
Refs: #1073
|
||||
"""
|
||||
|
||||
import json
|
||||
from io import BytesIO
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.hermes.monitor import CheckResult, HealthLevel, HealthReport, HermesMonitor
|
||||
|
||||
|
||||
@pytest.fixture()
def monitor():
    """Fresh HermesMonitor per test — no cached report, no run timestamp."""
    return HermesMonitor()
|
||||
|
||||
|
||||
# ── Unit helpers ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class _FakeHTTPResponse:
|
||||
"""Minimal urllib response stub."""
|
||||
|
||||
def __init__(self, body: bytes, status: int = 200):
|
||||
self._body = body
|
||||
self.status = status
|
||||
|
||||
def read(self) -> bytes:
|
||||
return self._body
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *_):
|
||||
pass
|
||||
|
||||
|
||||
# ── Memory check ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_get_memory_info_parses_vm_stat(monitor):
|
||||
vm_stat_output = (
|
||||
"Mach Virtual Memory Statistics: (page size of 16384 bytes)\n"
|
||||
"Pages free: 12800.\n"
|
||||
"Pages active: 50000.\n"
|
||||
"Pages inactive: 25600.\n"
|
||||
"Pages speculative: 1000.\n"
|
||||
)
|
||||
with (
|
||||
patch("subprocess.run") as mock_run,
|
||||
):
|
||||
# First call: sysctl hw.memsize (total)
|
||||
sysctl_result = MagicMock()
|
||||
sysctl_result.stdout = "68719476736\n" # 64 GB
|
||||
# Second call: vm_stat
|
||||
vmstat_result = MagicMock()
|
||||
vmstat_result.stdout = vm_stat_output
|
||||
mock_run.side_effect = [sysctl_result, vmstat_result]
|
||||
|
||||
info = monitor._get_memory_info()
|
||||
|
||||
assert info["total_gb"] == pytest.approx(64.0, abs=0.1)
|
||||
# pages free (12800) + inactive (25600) = 38400 * 16384 bytes = 629145600 bytes ≈ 0.586 GB
|
||||
expected_free_gb = (38400 * 16384) / (1024**3)
|
||||
assert info["free_gb"] == pytest.approx(expected_free_gb, abs=0.001)
|
||||
|
||||
|
||||
def test_get_memory_info_handles_subprocess_failure(monitor):
|
||||
with patch("subprocess.run", side_effect=OSError("no sysctl")):
|
||||
info = monitor._get_memory_info()
|
||||
assert info["total_gb"] == 0.0
|
||||
assert info["free_gb"] == 0.0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_memory_ok(monitor):
|
||||
with patch.object(monitor, "_get_memory_info", return_value={"free_gb": 20.0, "total_gb": 64.0}):
|
||||
result = await monitor._check_memory()
|
||||
|
||||
assert result.name == "memory"
|
||||
assert result.level == HealthLevel.OK
|
||||
assert "20.0GB" in result.message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_memory_low_triggers_unload(monitor):
|
||||
with (
|
||||
patch.object(monitor, "_get_memory_info", return_value={"free_gb": 2.0, "total_gb": 64.0}),
|
||||
patch.object(monitor, "_unload_ollama_models", return_value=2),
|
||||
):
|
||||
result = await monitor._check_memory()
|
||||
|
||||
assert result.level == HealthLevel.WARNING
|
||||
assert result.auto_resolved is True
|
||||
assert "unloaded 2" in result.message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_memory_critical_no_models_to_unload(monitor):
|
||||
with (
|
||||
patch.object(monitor, "_get_memory_info", return_value={"free_gb": 1.0, "total_gb": 64.0}),
|
||||
patch.object(monitor, "_unload_ollama_models", return_value=0),
|
||||
):
|
||||
result = await monitor._check_memory()
|
||||
|
||||
assert result.level == HealthLevel.CRITICAL
|
||||
assert result.needs_human is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_memory_exception_returns_unknown(monitor):
|
||||
with patch.object(monitor, "_get_memory_info", side_effect=RuntimeError("boom")):
|
||||
result = await monitor._check_memory()
|
||||
|
||||
assert result.level == HealthLevel.UNKNOWN
|
||||
|
||||
|
||||
# ── Disk check ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_disk_ok(monitor):
|
||||
usage = MagicMock()
|
||||
usage.free = 100 * (1024**3) # 100 GB
|
||||
usage.total = 500 * (1024**3) # 500 GB
|
||||
usage.used = 400 * (1024**3)
|
||||
|
||||
with patch("shutil.disk_usage", return_value=usage):
|
||||
result = await monitor._check_disk()
|
||||
|
||||
assert result.level == HealthLevel.OK
|
||||
assert "100.0GB free" in result.message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_check_disk_low_triggers_cleanup(monitor):
    """Low disk triggers temp-file cleanup and a WARNING marked auto-resolved."""
    gib = 1024 ** 3
    du = MagicMock()
    du.free = 5 * gib  # below the free-space threshold
    du.total = 500 * gib
    du.used = 495 * gib

    with patch("shutil.disk_usage", return_value=du):
        with patch.object(monitor, "_cleanup_temp_files", return_value=2.5):
            outcome = await monitor._check_disk()

    assert outcome.level == HealthLevel.WARNING
    assert outcome.auto_resolved is True
    assert "cleaned 2.50GB" in outcome.message


@pytest.mark.asyncio
async def test_check_disk_critical_when_cleanup_fails(monitor):
    """If cleanup recovers nothing while disk is low, escalate to CRITICAL."""
    gib = 1024 ** 3
    du = MagicMock()
    du.free = 5 * gib
    du.total = 500 * gib
    du.used = 495 * gib

    with patch("shutil.disk_usage", return_value=du):
        with patch.object(monitor, "_cleanup_temp_files", return_value=0.0):
            outcome = await monitor._check_disk()

    assert outcome.level == HealthLevel.CRITICAL
    assert outcome.needs_human is True


# ── Ollama check ──────────────────────────────────────────────────────────────


def test_get_ollama_status_reachable(monitor):
    """Two successful HTTP responses yield installed and loaded model lists."""
    tags_payload = {"models": [{"name": "qwen3:30b"}, {"name": "llama3.1:8b"}]}
    ps_payload = {"models": [{"name": "qwen3:30b", "size": 1000}]}
    fake_responses = [
        _FakeHTTPResponse(json.dumps(tags_payload).encode()),
        _FakeHTTPResponse(json.dumps(ps_payload).encode()),
    ]

    with patch("urllib.request.urlopen", side_effect=fake_responses):
        status = monitor._get_ollama_status()

    assert status["reachable"] is True
    assert len(status["models"]) == 2
    assert len(status["loaded_models"]) == 1


def test_get_ollama_status_unreachable(monitor):
    """A connection failure yields reachable=False and empty model lists."""
    refused = OSError("connection refused")
    with patch("urllib.request.urlopen", side_effect=refused):
        status = monitor._get_ollama_status()

    assert status["reachable"] is False
    assert status["models"] == []
    assert status["loaded_models"] == []


@pytest.mark.asyncio
async def test_check_ollama_ok(monitor):
    """A reachable Ollama reports OK and exposes reachability in details."""
    fake_status = {
        "reachable": True,
        "models": [{"name": "qwen3:30b"}],
        "loaded_models": [],
    }
    with patch.object(monitor, "_get_ollama_status", return_value=fake_status):
        outcome = await monitor._check_ollama()

    assert outcome.level == HealthLevel.OK
    assert outcome.details["reachable"] is True


@pytest.mark.asyncio
async def test_check_ollama_unreachable_restart_success(monitor):
    """A successful restart of an unreachable Ollama yields WARNING, auto-resolved."""
    down = {"reachable": False, "models": [], "loaded_models": []}
    with patch.object(monitor, "_get_ollama_status", return_value=down):
        with patch.object(monitor, "_restart_ollama", return_value=True):
            outcome = await monitor._check_ollama()

    assert outcome.level == HealthLevel.WARNING
    assert outcome.auto_resolved is True


@pytest.mark.asyncio
async def test_check_ollama_unreachable_restart_fails(monitor):
    """A failed restart of an unreachable Ollama escalates to CRITICAL."""
    down = {"reachable": False, "models": [], "loaded_models": []}
    with patch.object(monitor, "_get_ollama_status", return_value=down):
        with patch.object(monitor, "_restart_ollama", return_value=False):
            outcome = await monitor._check_ollama()

    assert outcome.level == HealthLevel.CRITICAL
    assert outcome.needs_human is True


# ── Process check ─────────────────────────────────────────────────────────────


def test_get_zombie_processes_none(monitor):
    """ps output containing only live processes yields an empty zombie list."""
    ps_text = (
        "USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n"
        "alex 123 0.1 0.2 100 200 s0 S 1:00 0:01 python\n"
        "alex 456 0.0 0.1 50 100 s0 S 1:01 0:00 bash\n"
    )
    fake_run = MagicMock(stdout=ps_text)
    with patch("subprocess.run", return_value=fake_run):
        info = monitor._get_zombie_processes()

    assert info["zombies"] == []


def test_get_zombie_processes_found(monitor):
    """A process in Z state is picked out of the ps output with its pid."""
    ps_text = (
        "USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n"
        "alex 123 0.1 0.2 100 200 s0 S 1:00 0:01 python\n"
        "alex 789 0.0 0.0 0 0 s0 Z 1:02 0:00 defunct\n"
    )
    fake_run = MagicMock(stdout=ps_text)
    with patch("subprocess.run", return_value=fake_run):
        info = monitor._get_zombie_processes()

    assert len(info["zombies"]) == 1
    assert info["zombies"][0]["pid"] == "789"


@pytest.mark.asyncio
async def test_check_processes_no_zombies(monitor):
    """No zombie processes reports OK."""
    none_found = {"zombies": []}
    with patch.object(monitor, "_get_zombie_processes", return_value=none_found):
        outcome = await monitor._check_processes()

    assert outcome.level == HealthLevel.OK


@pytest.mark.asyncio
async def test_check_processes_zombies_warning(monitor):
    """A couple of zombies is WARNING but stays below the human-escalation threshold."""
    two_zombies = [
        {"pid": "100", "command": "defunct"},
        {"pid": "101", "command": "defunct"},
    ]
    with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": two_zombies}):
        outcome = await monitor._check_processes()

    assert outcome.level == HealthLevel.WARNING
    # Only 2 zombies; escalation to a human requires more than 3.
    assert outcome.needs_human is False


@pytest.mark.asyncio
async def test_check_processes_many_zombies_needs_human(monitor):
    """Five zombies crosses the threshold and requests human attention."""
    many = [{"pid": str(n), "command": "defunct"} for n in range(5)]
    with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": many}):
        outcome = await monitor._check_processes()

    assert outcome.needs_human is True


# ── Network check ─────────────────────────────────────────────────────────────


def test_check_gitea_connectivity_ok(monitor):
    """A 200 response marks Gitea reachable with a non-negative latency."""
    payload = json.dumps({"version": "1.22.0"}).encode()
    fake_response = _FakeHTTPResponse(payload, status=200)
    with patch("urllib.request.urlopen", return_value=fake_response):
        info = monitor._check_gitea_connectivity()

    assert info["reachable"] is True
    assert info["latency_ms"] >= 0


def test_check_gitea_connectivity_unreachable(monitor):
    """A connection failure marks Gitea unreachable and records the error."""
    refused = OSError("refused")
    with patch("urllib.request.urlopen", side_effect=refused):
        info = monitor._check_gitea_connectivity()

    assert info["reachable"] is False
    assert "error" in info


@pytest.mark.asyncio
async def test_check_network_ok(monitor):
    """A reachable Gitea reports OK with an explanatory message."""
    conn = {"reachable": True, "latency_ms": 5.0, "url": "http://localhost:3000"}
    with patch.object(monitor, "_check_gitea_connectivity", return_value=conn):
        outcome = await monitor._check_network()

    assert outcome.level == HealthLevel.OK
    assert "Gitea reachable" in outcome.message


@pytest.mark.asyncio
async def test_check_network_unreachable(monitor):
    """An unreachable Gitea reports WARNING and requests human attention."""
    conn = {"reachable": False, "error": "refused", "url": "http://localhost:3000"}
    with patch.object(monitor, "_check_gitea_connectivity", return_value=conn):
        outcome = await monitor._check_network()

    assert outcome.level == HealthLevel.WARNING
    assert outcome.needs_human is True


# ── Full cycle ────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_run_cycle_all_ok(monitor):
    """When every check passes, the report is OK, issue-free, and cached on the monitor."""
    ok_result = CheckResult(name="test", level=HealthLevel.OK, message="ok")

    async def _ok_check():
        return ok_result

    with patch.object(monitor, "_check_memory", _ok_check), \
         patch.object(monitor, "_check_disk", _ok_check), \
         patch.object(monitor, "_check_ollama", _ok_check), \
         patch.object(monitor, "_check_processes", _ok_check), \
         patch.object(monitor, "_check_network", _ok_check), \
         patch.object(monitor, "_handle_alerts"):
        report = await monitor.run_cycle()

    assert report.overall == HealthLevel.OK
    assert not report.has_issues
    # The cycle should also store the latest report on the monitor itself.
    assert monitor.last_report is report


@pytest.mark.asyncio
async def test_run_cycle_sets_overall_to_worst(monitor):
    """The overall level is the worst level among the individual checks."""

    async def _ok():
        return CheckResult(name="ok", level=HealthLevel.OK, message="ok")

    async def _critical():
        return CheckResult(name="critical", level=HealthLevel.CRITICAL, message="bad")

    with patch.object(monitor, "_check_memory", _ok), \
         patch.object(monitor, "_check_disk", _critical), \
         patch.object(monitor, "_check_ollama", _ok), \
         patch.object(monitor, "_check_processes", _ok), \
         patch.object(monitor, "_check_network", _ok), \
         patch.object(monitor, "_handle_alerts"):
        report = await monitor.run_cycle()

    assert report.overall == HealthLevel.CRITICAL
    assert report.has_issues is True


@pytest.mark.asyncio
async def test_run_cycle_exception_becomes_unknown(monitor):
    """A check that raises is recorded as UNKNOWN instead of aborting the cycle."""

    async def _ok():
        return CheckResult(name="ok", level=HealthLevel.OK, message="ok")

    async def _boom():
        raise RuntimeError("unexpected error")

    with patch.object(monitor, "_check_memory", _ok), \
         patch.object(monitor, "_check_disk", _ok), \
         patch.object(monitor, "_check_ollama", _boom), \
         patch.object(monitor, "_check_processes", _ok), \
         patch.object(monitor, "_check_network", _ok), \
         patch.object(monitor, "_handle_alerts"):
        report = await monitor.run_cycle()

    seen_levels = {check.level for check in report.checks}
    assert HealthLevel.UNKNOWN in seen_levels


# ── to_dict serialisation ────────────────────────────────────────────────────


def test_check_result_to_dict():
    """to_dict preserves name, lower-case level value, auto-resolved flag, and details."""
    check = CheckResult(
        name="memory",
        level=HealthLevel.WARNING,
        message="low",
        details={"free_gb": 3.5},
        auto_resolved=True,
    )

    serialised = check.to_dict()

    assert serialised["name"] == "memory"
    assert serialised["level"] == "warning"
    assert serialised["auto_resolved"] is True
    assert serialised["details"]["free_gb"] == 3.5


def test_health_report_to_dict():
    """to_dict serialises the overall level, the issue flag, and the checks list."""
    report = HealthReport(
        timestamp="2026-01-01T00:00:00+00:00",
        checks=[CheckResult(name="disk", level=HealthLevel.OK, message="ok")],
        overall=HealthLevel.OK,
    )

    serialised = report.to_dict()

    assert serialised["overall"] == "ok"
    assert serialised["has_issues"] is False
    assert len(serialised["checks"]) == 1