[claude] Hermes health monitor — system resources + model management (#1073) #1133

Merged
Rockachopa merged 1 commits from claude/issue-1073 into main 2026-03-23 18:36:07 +00:00
6 changed files with 1214 additions and 0 deletions

View File

@@ -366,6 +366,16 @@ class Settings(BaseSettings):
# Default timeout for git operations.
hands_git_timeout: int = 60
# ── Hermes Health Monitor ─────────────────────────────────────────
# Enable the Hermes system health monitor (memory, disk, Ollama, processes, network).
hermes_enabled: bool = True
# How often Hermes runs a full health cycle (seconds). Default: 5 minutes.
hermes_interval_seconds: int = 300
# Alert threshold: free memory below this triggers model unloading / alert (GB).
hermes_memory_free_min_gb: float = 4.0
# Alert threshold: free disk below this triggers cleanup / alert (GB).
hermes_disk_free_min_gb: float = 10.0
# ── Error Logging ─────────────────────────────────────────────────
error_log_enabled: bool = True
error_log_dir: str = "logs"

View File

@@ -33,6 +33,7 @@ from dashboard.routes.calm import router as calm_router
from dashboard.routes.chat_api import router as chat_api_router
from dashboard.routes.chat_api_v1 import router as chat_api_v1_router
from dashboard.routes.daily_run import router as daily_run_router
from dashboard.routes.hermes import router as hermes_router
from dashboard.routes.db_explorer import router as db_explorer_router
from dashboard.routes.discord import router as discord_router
from dashboard.routes.experiments import router as experiments_router
@@ -180,6 +181,33 @@ async def _thinking_scheduler() -> None:
await asyncio.sleep(settings.thinking_interval_seconds)
async def _hermes_scheduler() -> None:
"""Background task: Hermes system health monitor, runs every 5 minutes.
Checks memory, disk, Ollama, processes, and network.
Auto-resolves what it can; fires push notifications when human help is needed.
"""
from infrastructure.hermes.monitor import hermes_monitor
await asyncio.sleep(20) # Stagger after other schedulers
while True:
try:
if settings.hermes_enabled:
report = await hermes_monitor.run_cycle()
if report.has_issues:
logger.warning(
"Hermes health issues detected — overall: %s",
report.overall.value,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Hermes scheduler error: %s", exc)
await asyncio.sleep(settings.hermes_interval_seconds)
async def _loop_qa_scheduler() -> None:
"""Background task: run capability self-tests on a separate timer.
@@ -381,6 +409,7 @@ def _startup_background_tasks() -> list[asyncio.Task]:
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
asyncio.create_task(_hermes_scheduler()),
]
try:
from timmy.paperclip import start_paperclip_poller
@@ -638,6 +667,7 @@ app.include_router(world_router)
app.include_router(matrix_router)
app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(hermes_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)

View File

@@ -0,0 +1,45 @@
"""Hermes health monitor routes.
Exposes the Hermes health monitor via REST API so the dashboard
and external tools can query system status and trigger checks.
Refs: #1073
"""
import logging
from fastapi import APIRouter
from infrastructure.hermes.monitor import hermes_monitor
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/hermes", tags=["hermes"])
@router.get("/status")
async def hermes_status():
"""Return the most recent Hermes health report.
Returns the cached result from the last background cycle — does not
trigger a new check. Use POST /hermes/check to run an immediate check.
"""
report = hermes_monitor.last_report
if report is None:
return {
"status": "no_data",
"message": "No health report yet — first cycle pending",
"seconds_since_last_run": hermes_monitor.seconds_since_last_run,
}
return report.to_dict()
@router.post("/check")
async def hermes_check():
"""Trigger an immediate Hermes health check cycle.
Runs all monitors synchronously and returns the full report.
Use sparingly — this blocks until all checks complete (~5 seconds).
"""
report = await hermes_monitor.run_cycle()
return report.to_dict()

View File

@@ -0,0 +1,9 @@
"""Hermes health monitor — system resources + model management.
Monitors the local machine (Hermes/M3 Max) for memory pressure, disk usage,
Ollama model health, zombie processes, and network connectivity.
"""
from infrastructure.hermes.monitor import HermesMonitor, HealthLevel, HealthReport, hermes_monitor
__all__ = ["HermesMonitor", "HealthLevel", "HealthReport", "hermes_monitor"]

View File

@@ -0,0 +1,668 @@
"""Hermes health monitor — system resources + model management.
Monitors the local machine (Hermes/M3 Max) and keeps it running smoothly.
Runs every 5 minutes, auto-resolves issues where possible, alerts when
human intervention is needed.
Monitors:
1. Memory pressure — unified memory, alert if <4GB free, unload models
2. Disk usage — alert if <10GB free, clean temp files
3. Ollama status — verify reachable, restart if crashed, manage loaded models
4. Process health — detect zombie processes
5. Network — verify Gitea connectivity
Refs: #1073
"""
import asyncio
import json
import logging
import shutil
import subprocess
import time
import urllib.request
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
class HealthLevel(str, Enum):
"""Severity level for a health check result."""
OK = "ok"
WARNING = "warning"
CRITICAL = "critical"
UNKNOWN = "unknown"
@dataclass
class CheckResult:
"""Result of a single health check."""
name: str
level: HealthLevel
message: str
details: dict[str, Any] = field(default_factory=dict)
auto_resolved: bool = False
needs_human: bool = False
def to_dict(self) -> dict[str, Any]:
return {
"name": self.name,
"level": self.level.value,
"message": self.message,
"details": self.details,
"auto_resolved": self.auto_resolved,
"needs_human": self.needs_human,
}
@dataclass
class HealthReport:
"""Full health report from a single monitor cycle."""
timestamp: str
checks: list[CheckResult]
overall: HealthLevel
@property
def has_issues(self) -> bool:
return any(c.level != HealthLevel.OK for c in self.checks)
def to_dict(self) -> dict[str, Any]:
return {
"timestamp": self.timestamp,
"overall": self.overall.value,
"has_issues": self.has_issues,
"checks": [c.to_dict() for c in self.checks],
}
class HermesMonitor:
"""System health monitor for Hermes (local M3 Max machine).
All blocking I/O (subprocess, HTTP) is wrapped in asyncio.to_thread()
so it never blocks the event loop. Results are cached so the dashboard
can read the last report without triggering a new cycle.
"""
OLLAMA_REQUEST_TIMEOUT = 5
NETWORK_REQUEST_TIMEOUT = 5
def __init__(self) -> None:
self._last_report: HealthReport | None = None
self._last_run_ts: float = 0.0
@property
def last_report(self) -> HealthReport | None:
"""Most recent health report, or None if no cycle has run yet."""
return self._last_report
@property
def seconds_since_last_run(self) -> float:
if self._last_run_ts == 0.0:
return float("inf")
return time.monotonic() - self._last_run_ts
async def run_cycle(self) -> HealthReport:
"""Run a full health check cycle and return the report."""
self._last_run_ts = time.monotonic()
logger.info("Hermes health cycle starting")
check_fns = [
self._check_memory(),
self._check_disk(),
self._check_ollama(),
self._check_processes(),
self._check_network(),
]
raw_results = await asyncio.gather(*check_fns, return_exceptions=True)
checks: list[CheckResult] = []
for i, r in enumerate(raw_results):
if isinstance(r, Exception):
name = ["memory", "disk", "ollama", "processes", "network"][i]
logger.warning("Hermes check '%s' raised: %s", name, r)
checks.append(
CheckResult(
name=name,
level=HealthLevel.UNKNOWN,
message=f"Check error: {r}",
)
)
else:
checks.append(r)
# Compute overall level
levels = {c.level for c in checks}
if HealthLevel.CRITICAL in levels:
overall = HealthLevel.CRITICAL
elif HealthLevel.WARNING in levels:
overall = HealthLevel.WARNING
elif HealthLevel.UNKNOWN in levels:
overall = HealthLevel.UNKNOWN
else:
overall = HealthLevel.OK
report = HealthReport(
timestamp=datetime.now(UTC).isoformat(),
checks=checks,
overall=overall,
)
self._last_report = report
await self._handle_alerts(report)
logger.info("Hermes health cycle complete — overall: %s", overall.value)
return report
# ── Memory ───────────────────────────────────────────────────────────────
async def _check_memory(self) -> CheckResult:
"""Check unified memory usage (macOS vm_stat)."""
memory_free_min_gb = getattr(settings, "hermes_memory_free_min_gb", 4.0)
try:
info = await asyncio.to_thread(self._get_memory_info)
free_gb = info.get("free_gb", 0.0)
total_gb = info.get("total_gb", 0.0)
details: dict[str, Any] = {
"free_gb": round(free_gb, 2),
"total_gb": round(total_gb, 2),
}
if free_gb < memory_free_min_gb:
# Attempt auto-remediation: unload Ollama models
unloaded = await self._unload_ollama_models()
if unloaded:
return CheckResult(
name="memory",
level=HealthLevel.WARNING,
message=(
f"Low memory ({free_gb:.1f}GB free) — "
f"unloaded {unloaded} Ollama model(s)"
),
details={**details, "models_unloaded": unloaded},
auto_resolved=True,
)
return CheckResult(
name="memory",
level=HealthLevel.CRITICAL,
message=(
f"Critical: only {free_gb:.1f}GB free "
f"(threshold: {memory_free_min_gb}GB)"
),
details=details,
needs_human=True,
)
return CheckResult(
name="memory",
level=HealthLevel.OK,
message=f"Memory OK — {free_gb:.1f}GB free of {total_gb:.1f}GB",
details=details,
)
except Exception as exc:
logger.warning("Memory check failed: %s", exc)
return CheckResult(
name="memory",
level=HealthLevel.UNKNOWN,
message=f"Memory check unavailable: {exc}",
)
def _get_memory_info(self) -> dict[str, float]:
"""Get memory stats via macOS sysctl + vm_stat.
Falls back gracefully on non-macOS systems.
"""
gb = 1024**3
total_bytes = 0.0
free_bytes = 0.0
# Total memory via sysctl
try:
result = subprocess.run(
["sysctl", "-n", "hw.memsize"],
capture_output=True,
text=True,
timeout=3,
)
total_bytes = float(result.stdout.strip())
except Exception:
pass
# Free + inactive pages via vm_stat (macOS)
try:
result = subprocess.run(
["vm_stat"],
capture_output=True,
text=True,
timeout=3,
)
page_size = 16384 # 16 KB default on Apple Silicon
for line in result.stdout.splitlines():
if "page size of" in line:
parts = line.split()
for i, part in enumerate(parts):
if part == "of" and i + 1 < len(parts):
try:
page_size = int(parts[i + 1])
except ValueError:
pass
elif "Pages free:" in line:
pages = int(line.split(":")[1].strip().rstrip("."))
free_bytes += pages * page_size
elif "Pages inactive:" in line:
pages = int(line.split(":")[1].strip().rstrip("."))
free_bytes += pages * page_size
except Exception:
pass
return {
"total_gb": total_bytes / gb if total_bytes else 0.0,
"free_gb": free_bytes / gb if free_bytes else 0.0,
}
# ── Disk ─────────────────────────────────────────────────────────────────
async def _check_disk(self) -> CheckResult:
"""Check disk usage via shutil.disk_usage."""
disk_free_min_gb = getattr(settings, "hermes_disk_free_min_gb", 10.0)
try:
usage = await asyncio.to_thread(shutil.disk_usage, "/")
free_gb = usage.free / (1024**3)
total_gb = usage.total / (1024**3)
used_pct = (usage.used / usage.total) * 100
details: dict[str, Any] = {
"free_gb": round(free_gb, 2),
"total_gb": round(total_gb, 2),
"used_pct": round(used_pct, 1),
}
if free_gb < disk_free_min_gb:
cleaned_gb = await self._cleanup_temp_files()
if cleaned_gb > 0.01:
return CheckResult(
name="disk",
level=HealthLevel.WARNING,
message=(
f"Low disk ({free_gb:.1f}GB free) — "
f"cleaned {cleaned_gb:.2f}GB from /tmp"
),
details={**details, "cleaned_gb": round(cleaned_gb, 2)},
auto_resolved=True,
)
return CheckResult(
name="disk",
level=HealthLevel.CRITICAL,
message=(
f"Critical: only {free_gb:.1f}GB free "
f"(threshold: {disk_free_min_gb}GB)"
),
details=details,
needs_human=True,
)
return CheckResult(
name="disk",
level=HealthLevel.OK,
message=f"Disk OK — {free_gb:.1f}GB free ({used_pct:.0f}% used)",
details=details,
)
except Exception as exc:
logger.warning("Disk check failed: %s", exc)
return CheckResult(
name="disk",
level=HealthLevel.UNKNOWN,
message=f"Disk check unavailable: {exc}",
)
async def _cleanup_temp_files(self) -> float:
"""Remove /tmp files older than 24 hours. Returns GB freed."""
return await asyncio.to_thread(self._cleanup_temp_files_sync)
def _cleanup_temp_files_sync(self) -> float:
"""Synchronous /tmp cleanup — only touches files older than 24 hours."""
from pathlib import Path
freed_bytes = 0
cutoff = time.time() - 86400 # 24 hours ago
try:
tmp = Path("/tmp")
for item in tmp.iterdir():
try:
stat = item.stat()
if stat.st_mtime >= cutoff:
continue
if item.is_file():
freed_bytes += stat.st_size
item.unlink(missing_ok=True)
elif item.is_dir():
dir_size = sum(
f.stat().st_size
for f in item.rglob("*")
if f.is_file()
)
freed_bytes += dir_size
shutil.rmtree(str(item), ignore_errors=True)
except (PermissionError, OSError):
pass # Skip files we can't touch
except Exception as exc:
logger.warning("Temp cleanup error: %s", exc)
freed_gb = freed_bytes / (1024**3)
if freed_gb > 0.001:
logger.info("Hermes disk cleanup: freed %.2fGB from /tmp", freed_gb)
return freed_gb
# ── Ollama ───────────────────────────────────────────────────────────────
async def _check_ollama(self) -> CheckResult:
"""Check Ollama status and loaded models."""
try:
status = await asyncio.to_thread(self._get_ollama_status)
if not status.get("reachable"):
restarted = await self._restart_ollama()
if restarted:
return CheckResult(
name="ollama",
level=HealthLevel.WARNING,
message="Ollama was unreachable — restart initiated",
details={"restart_attempted": True},
auto_resolved=True,
)
return CheckResult(
name="ollama",
level=HealthLevel.CRITICAL,
message="Ollama unreachable and restart failed",
details={"reachable": False},
needs_human=True,
)
models = status.get("models", [])
loaded = status.get("loaded_models", [])
return CheckResult(
name="ollama",
level=HealthLevel.OK,
message=(
f"Ollama OK — {len(models)} model(s) available, "
f"{len(loaded)} loaded"
),
details={
"reachable": True,
"model_count": len(models),
"loaded_count": len(loaded),
"loaded_models": [m.get("name", "") for m in loaded],
},
)
except Exception as exc:
logger.warning("Ollama check failed: %s", exc)
return CheckResult(
name="ollama",
level=HealthLevel.UNKNOWN,
message=f"Ollama check failed: {exc}",
)
def _get_ollama_status(self) -> dict[str, Any]:
"""Synchronous Ollama status — checks /api/tags and /api/ps."""
url = settings.normalized_ollama_url
try:
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
data = json.loads(resp.read().decode())
models = data.get("models", [])
except Exception:
return {"reachable": False, "models": [], "loaded_models": []}
# /api/ps lists currently loaded (in-memory) models — Ollama >=0.2
loaded: list[dict] = []
try:
req = urllib.request.Request(
f"{url}/api/ps",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
ps_data = json.loads(resp.read().decode())
loaded = ps_data.get("models", [])
except Exception:
pass # /api/ps absent on older Ollama — non-fatal
return {"reachable": True, "models": models, "loaded_models": loaded}
async def _unload_ollama_models(self) -> int:
"""Unload in-memory Ollama models to free unified memory.
Uses the keep_alive=0 trick: POSTing to /api/generate with
keep_alive=0 causes Ollama to immediately evict the model.
Returns the number of models successfully unloaded.
"""
return await asyncio.to_thread(self._unload_ollama_models_sync)
def _unload_ollama_models_sync(self) -> int:
"""Synchronous model unload implementation."""
url = settings.normalized_ollama_url
unloaded = 0
try:
req = urllib.request.Request(
f"{url}/api/ps",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
ps_data = json.loads(resp.read().decode())
loaded = ps_data.get("models", [])
except Exception:
return 0
for model in loaded:
name = model.get("name", "")
if not name:
continue
try:
payload = json.dumps({"model": name, "keep_alive": 0}).encode()
req = urllib.request.Request(
f"{url}/api/generate",
data=payload,
method="POST",
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=10) as _:
pass
logger.info("Hermes: unloaded Ollama model %s", name)
unloaded += 1
except Exception as exc:
logger.warning("Hermes: failed to unload model %s: %s", name, exc)
return unloaded
async def _restart_ollama(self) -> bool:
"""Attempt to restart the Ollama service via launchctl or brew."""
return await asyncio.to_thread(self._restart_ollama_sync)
def _restart_ollama_sync(self) -> bool:
"""Try launchctl first, then brew services."""
# macOS launchctl (installed via official Ollama installer)
try:
result = subprocess.run(
["launchctl", "stop", "com.ollama.ollama"],
capture_output=True,
timeout=10,
)
if result.returncode == 0:
time.sleep(2)
subprocess.run(
["launchctl", "start", "com.ollama.ollama"],
capture_output=True,
timeout=10,
)
logger.info("Hermes: Ollama restarted via launchctl")
return True
except Exception:
pass
# Homebrew fallback
try:
result = subprocess.run(
["brew", "services", "restart", "ollama"],
capture_output=True,
timeout=20,
)
if result.returncode == 0:
logger.info("Hermes: Ollama restarted via brew services")
return True
except Exception:
pass
logger.warning("Hermes: Ollama restart failed — manual intervention needed")
return False
# ── Processes ────────────────────────────────────────────────────────────
async def _check_processes(self) -> CheckResult:
"""Check for zombie processes via ps aux."""
try:
result = await asyncio.to_thread(self._get_zombie_processes)
zombies = result.get("zombies", [])
if zombies:
return CheckResult(
name="processes",
level=HealthLevel.WARNING,
message=f"Found {len(zombies)} zombie process(es)",
details={"zombies": zombies[:5]},
needs_human=len(zombies) > 3,
)
return CheckResult(
name="processes",
level=HealthLevel.OK,
message="Processes OK — no zombies detected",
details={"zombie_count": 0},
)
except Exception as exc:
logger.warning("Process check failed: %s", exc)
return CheckResult(
name="processes",
level=HealthLevel.UNKNOWN,
message=f"Process check unavailable: {exc}",
)
def _get_zombie_processes(self) -> dict[str, Any]:
"""Detect zombie processes (state 'Z') via ps aux."""
result = subprocess.run(
["ps", "aux"],
capture_output=True,
text=True,
timeout=5,
)
zombies = []
for line in result.stdout.splitlines()[1:]: # Skip header row
parts = line.split(None, 10)
if len(parts) >= 8 and parts[7] == "Z":
zombies.append(
{
"pid": parts[1],
"command": parts[10][:80] if len(parts) > 10 else "",
}
)
return {"zombies": zombies}
# ── Network ──────────────────────────────────────────────────────────────
async def _check_network(self) -> CheckResult:
"""Check Gitea connectivity."""
try:
result = await asyncio.to_thread(self._check_gitea_connectivity)
reachable = result.get("reachable", False)
latency_ms = result.get("latency_ms", -1.0)
if not reachable:
return CheckResult(
name="network",
level=HealthLevel.WARNING,
message=f"Gitea unreachable: {result.get('error', 'unknown')}",
details=result,
needs_human=True,
)
return CheckResult(
name="network",
level=HealthLevel.OK,
message=f"Network OK — Gitea reachable ({latency_ms:.0f}ms)",
details=result,
)
except Exception as exc:
logger.warning("Network check failed: %s", exc)
return CheckResult(
name="network",
level=HealthLevel.UNKNOWN,
message=f"Network check unavailable: {exc}",
)
def _check_gitea_connectivity(self) -> dict[str, Any]:
"""Synchronous Gitea reachability check."""
url = settings.gitea_url
start = time.monotonic()
try:
req = urllib.request.Request(
f"{url}/api/v1/version",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=self.NETWORK_REQUEST_TIMEOUT) as resp:
latency_ms = (time.monotonic() - start) * 1000
return {
"reachable": resp.status == 200,
"latency_ms": round(latency_ms, 1),
"url": url,
}
except Exception as exc:
return {
"reachable": False,
"error": str(exc),
"url": url,
"latency_ms": -1.0,
}
# ── Alerts ───────────────────────────────────────────────────────────────
async def _handle_alerts(self, report: HealthReport) -> None:
"""Send push notifications for issues that need attention."""
try:
from infrastructure.notifications.push import notifier
except Exception:
return
for check in report.checks:
if check.level == HealthLevel.CRITICAL or check.needs_human:
notifier.notify(
title=f"Hermes Alert: {check.name}",
message=check.message,
category="system",
native=check.level == HealthLevel.CRITICAL,
)
elif check.level == HealthLevel.WARNING and check.auto_resolved:
notifier.notify(
title=f"Hermes: {check.name} auto-fixed",
message=check.message,
category="system",
)
# Module-level singleton
hermes_monitor = HermesMonitor()

View File

@@ -0,0 +1,452 @@
"""Unit tests for the Hermes health monitor.
Tests all five checks (memory, disk, Ollama, processes, network) using mocks
so no real subprocesses or network calls are made.
Refs: #1073
"""
import json
from io import BytesIO
from unittest.mock import MagicMock, patch
import pytest
from infrastructure.hermes.monitor import CheckResult, HealthLevel, HealthReport, HermesMonitor
@pytest.fixture()
def monitor():
return HermesMonitor()
# ── Unit helpers ──────────────────────────────────────────────────────────────
class _FakeHTTPResponse:
"""Minimal urllib response stub."""
def __init__(self, body: bytes, status: int = 200):
self._body = body
self.status = status
def read(self) -> bytes:
return self._body
def __enter__(self):
return self
def __exit__(self, *_):
pass
# ── Memory check ──────────────────────────────────────────────────────────────
def test_get_memory_info_parses_vm_stat(monitor):
vm_stat_output = (
"Mach Virtual Memory Statistics: (page size of 16384 bytes)\n"
"Pages free: 12800.\n"
"Pages active: 50000.\n"
"Pages inactive: 25600.\n"
"Pages speculative: 1000.\n"
)
with (
patch("subprocess.run") as mock_run,
):
# First call: sysctl hw.memsize (total)
sysctl_result = MagicMock()
sysctl_result.stdout = "68719476736\n" # 64 GB
# Second call: vm_stat
vmstat_result = MagicMock()
vmstat_result.stdout = vm_stat_output
mock_run.side_effect = [sysctl_result, vmstat_result]
info = monitor._get_memory_info()
assert info["total_gb"] == pytest.approx(64.0, abs=0.1)
# pages free (12800) + inactive (25600) = 38400 * 16384 bytes = 629145600 bytes ≈ 0.586 GB
expected_free_gb = (38400 * 16384) / (1024**3)
assert info["free_gb"] == pytest.approx(expected_free_gb, abs=0.001)
def test_get_memory_info_handles_subprocess_failure(monitor):
with patch("subprocess.run", side_effect=OSError("no sysctl")):
info = monitor._get_memory_info()
assert info["total_gb"] == 0.0
assert info["free_gb"] == 0.0
@pytest.mark.asyncio
async def test_check_memory_ok(monitor):
with patch.object(monitor, "_get_memory_info", return_value={"free_gb": 20.0, "total_gb": 64.0}):
result = await monitor._check_memory()
assert result.name == "memory"
assert result.level == HealthLevel.OK
assert "20.0GB" in result.message
@pytest.mark.asyncio
async def test_check_memory_low_triggers_unload(monitor):
with (
patch.object(monitor, "_get_memory_info", return_value={"free_gb": 2.0, "total_gb": 64.0}),
patch.object(monitor, "_unload_ollama_models", return_value=2),
):
result = await monitor._check_memory()
assert result.level == HealthLevel.WARNING
assert result.auto_resolved is True
assert "unloaded 2" in result.message
@pytest.mark.asyncio
async def test_check_memory_critical_no_models_to_unload(monitor):
with (
patch.object(monitor, "_get_memory_info", return_value={"free_gb": 1.0, "total_gb": 64.0}),
patch.object(monitor, "_unload_ollama_models", return_value=0),
):
result = await monitor._check_memory()
assert result.level == HealthLevel.CRITICAL
assert result.needs_human is True
@pytest.mark.asyncio
async def test_check_memory_exception_returns_unknown(monitor):
with patch.object(monitor, "_get_memory_info", side_effect=RuntimeError("boom")):
result = await monitor._check_memory()
assert result.level == HealthLevel.UNKNOWN
# ── Disk check ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_check_disk_ok(monitor):
usage = MagicMock()
usage.free = 100 * (1024**3) # 100 GB
usage.total = 500 * (1024**3) # 500 GB
usage.used = 400 * (1024**3)
with patch("shutil.disk_usage", return_value=usage):
result = await monitor._check_disk()
assert result.level == HealthLevel.OK
assert "100.0GB free" in result.message
@pytest.mark.asyncio
async def test_check_disk_low_triggers_cleanup(monitor):
usage = MagicMock()
usage.free = 5 * (1024**3) # 5 GB — below threshold
usage.total = 500 * (1024**3)
usage.used = 495 * (1024**3)
with (
patch("shutil.disk_usage", return_value=usage),
patch.object(monitor, "_cleanup_temp_files", return_value=2.5),
):
result = await monitor._check_disk()
assert result.level == HealthLevel.WARNING
assert result.auto_resolved is True
assert "cleaned 2.50GB" in result.message
@pytest.mark.asyncio
async def test_check_disk_critical_when_cleanup_fails(monitor):
usage = MagicMock()
usage.free = 5 * (1024**3)
usage.total = 500 * (1024**3)
usage.used = 495 * (1024**3)
with (
patch("shutil.disk_usage", return_value=usage),
patch.object(monitor, "_cleanup_temp_files", return_value=0.0),
):
result = await monitor._check_disk()
assert result.level == HealthLevel.CRITICAL
assert result.needs_human is True
# ── Ollama check ──────────────────────────────────────────────────────────────
def test_get_ollama_status_reachable(monitor):
tags_body = json.dumps({
"models": [{"name": "qwen3:30b"}, {"name": "llama3.1:8b"}]
}).encode()
ps_body = json.dumps({
"models": [{"name": "qwen3:30b", "size": 1000}]
}).encode()
responses = [
_FakeHTTPResponse(tags_body),
_FakeHTTPResponse(ps_body),
]
with patch("urllib.request.urlopen", side_effect=responses):
status = monitor._get_ollama_status()
assert status["reachable"] is True
assert len(status["models"]) == 2
assert len(status["loaded_models"]) == 1
def test_get_ollama_status_unreachable(monitor):
with patch("urllib.request.urlopen", side_effect=OSError("connection refused")):
status = monitor._get_ollama_status()
assert status["reachable"] is False
assert status["models"] == []
assert status["loaded_models"] == []
@pytest.mark.asyncio
async def test_check_ollama_ok(monitor):
status = {
"reachable": True,
"models": [{"name": "qwen3:30b"}],
"loaded_models": [],
}
with patch.object(monitor, "_get_ollama_status", return_value=status):
result = await monitor._check_ollama()
assert result.level == HealthLevel.OK
assert result.details["reachable"] is True
@pytest.mark.asyncio
async def test_check_ollama_unreachable_restart_success(monitor):
status = {"reachable": False, "models": [], "loaded_models": []}
with (
patch.object(monitor, "_get_ollama_status", return_value=status),
patch.object(monitor, "_restart_ollama", return_value=True),
):
result = await monitor._check_ollama()
assert result.level == HealthLevel.WARNING
assert result.auto_resolved is True
@pytest.mark.asyncio
async def test_check_ollama_unreachable_restart_fails(monitor):
status = {"reachable": False, "models": [], "loaded_models": []}
with (
patch.object(monitor, "_get_ollama_status", return_value=status),
patch.object(monitor, "_restart_ollama", return_value=False),
):
result = await monitor._check_ollama()
assert result.level == HealthLevel.CRITICAL
assert result.needs_human is True
# ── Process check ─────────────────────────────────────────────────────────────
def test_get_zombie_processes_none(monitor):
ps_output = (
"USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n"
"alex 123 0.1 0.2 100 200 s0 S 1:00 0:01 python\n"
"alex 456 0.0 0.1 50 100 s0 S 1:01 0:00 bash\n"
)
result = MagicMock()
result.stdout = ps_output
with patch("subprocess.run", return_value=result):
info = monitor._get_zombie_processes()
assert info["zombies"] == []
def test_get_zombie_processes_found(monitor):
ps_output = (
"USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n"
"alex 123 0.1 0.2 100 200 s0 S 1:00 0:01 python\n"
"alex 789 0.0 0.0 0 0 s0 Z 1:02 0:00 defunct\n"
)
result = MagicMock()
result.stdout = ps_output
with patch("subprocess.run", return_value=result):
info = monitor._get_zombie_processes()
assert len(info["zombies"]) == 1
assert info["zombies"][0]["pid"] == "789"
@pytest.mark.asyncio
async def test_check_processes_no_zombies(monitor):
with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": []}):
result = await monitor._check_processes()
assert result.level == HealthLevel.OK
@pytest.mark.asyncio
async def test_check_processes_zombies_warning(monitor):
zombies = [{"pid": "100", "command": "defunct"}, {"pid": "101", "command": "defunct"}]
with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": zombies}):
result = await monitor._check_processes()
assert result.level == HealthLevel.WARNING
assert result.needs_human is False # Only 2, threshold is >3
@pytest.mark.asyncio
async def test_check_processes_many_zombies_needs_human(monitor):
zombies = [{"pid": str(i), "command": "defunct"} for i in range(5)]
with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": zombies}):
result = await monitor._check_processes()
assert result.needs_human is True
# ── Network check ─────────────────────────────────────────────────────────────
def test_check_gitea_connectivity_ok(monitor):
body = json.dumps({"version": "1.22.0"}).encode()
with patch("urllib.request.urlopen", return_value=_FakeHTTPResponse(body, status=200)):
info = monitor._check_gitea_connectivity()
assert info["reachable"] is True
assert info["latency_ms"] >= 0
def test_check_gitea_connectivity_unreachable(monitor):
with patch("urllib.request.urlopen", side_effect=OSError("refused")):
info = monitor._check_gitea_connectivity()
assert info["reachable"] is False
assert "error" in info
@pytest.mark.asyncio
async def test_check_network_ok(monitor):
with patch.object(
monitor,
"_check_gitea_connectivity",
return_value={"reachable": True, "latency_ms": 5.0, "url": "http://localhost:3000"},
):
result = await monitor._check_network()
assert result.level == HealthLevel.OK
assert "Gitea reachable" in result.message
@pytest.mark.asyncio
async def test_check_network_unreachable(monitor):
with patch.object(
monitor,
"_check_gitea_connectivity",
return_value={"reachable": False, "error": "refused", "url": "http://localhost:3000"},
):
result = await monitor._check_network()
assert result.level == HealthLevel.WARNING
assert result.needs_human is True
# ── Full cycle ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_run_cycle_all_ok(monitor):
ok_result = CheckResult(name="test", level=HealthLevel.OK, message="ok")
async def _ok_check():
return ok_result
with (
patch.object(monitor, "_check_memory", _ok_check),
patch.object(monitor, "_check_disk", _ok_check),
patch.object(monitor, "_check_ollama", _ok_check),
patch.object(monitor, "_check_processes", _ok_check),
patch.object(monitor, "_check_network", _ok_check),
patch.object(monitor, "_handle_alerts"),
):
report = await monitor.run_cycle()
assert report.overall == HealthLevel.OK
assert not report.has_issues
assert monitor.last_report is report
@pytest.mark.asyncio
async def test_run_cycle_sets_overall_to_worst(monitor):
async def _ok():
return CheckResult(name="ok", level=HealthLevel.OK, message="ok")
async def _critical():
return CheckResult(name="critical", level=HealthLevel.CRITICAL, message="bad")
with (
patch.object(monitor, "_check_memory", _ok),
patch.object(monitor, "_check_disk", _critical),
patch.object(monitor, "_check_ollama", _ok),
patch.object(monitor, "_check_processes", _ok),
patch.object(monitor, "_check_network", _ok),
patch.object(monitor, "_handle_alerts"),
):
report = await monitor.run_cycle()
assert report.overall == HealthLevel.CRITICAL
assert report.has_issues is True
@pytest.mark.asyncio
async def test_run_cycle_exception_becomes_unknown(monitor):
async def _ok():
return CheckResult(name="ok", level=HealthLevel.OK, message="ok")
async def _boom():
raise RuntimeError("unexpected error")
with (
patch.object(monitor, "_check_memory", _ok),
patch.object(monitor, "_check_disk", _ok),
patch.object(monitor, "_check_ollama", _boom),
patch.object(monitor, "_check_processes", _ok),
patch.object(monitor, "_check_network", _ok),
patch.object(monitor, "_handle_alerts"),
):
report = await monitor.run_cycle()
levels = {c.level for c in report.checks}
assert HealthLevel.UNKNOWN in levels
# ── to_dict serialisation ────────────────────────────────────────────────────
def test_check_result_to_dict():
c = CheckResult(
name="memory",
level=HealthLevel.WARNING,
message="low",
details={"free_gb": 3.5},
auto_resolved=True,
)
d = c.to_dict()
assert d["name"] == "memory"
assert d["level"] == "warning"
assert d["auto_resolved"] is True
assert d["details"]["free_gb"] == 3.5
def test_health_report_to_dict():
checks = [
CheckResult(name="disk", level=HealthLevel.OK, message="ok"),
]
report = HealthReport(
timestamp="2026-01-01T00:00:00+00:00",
checks=checks,
overall=HealthLevel.OK,
)
d = report.to_dict()
assert d["overall"] == "ok"
assert d["has_issues"] is False
assert len(d["checks"]) == 1