diff --git a/src/config.py b/src/config.py index f0c922cc..9921b0e2 100644 --- a/src/config.py +++ b/src/config.py @@ -366,6 +366,16 @@ class Settings(BaseSettings): # Default timeout for git operations. hands_git_timeout: int = 60 + # ── Hermes Health Monitor ───────────────────────────────────────── + # Enable the Hermes system health monitor (memory, disk, Ollama, processes, network). + hermes_enabled: bool = True + # How often Hermes runs a full health cycle (seconds). Default: 5 minutes. + hermes_interval_seconds: int = 300 + # Alert threshold: free memory below this triggers model unloading / alert (GB). + hermes_memory_free_min_gb: float = 4.0 + # Alert threshold: free disk below this triggers cleanup / alert (GB). + hermes_disk_free_min_gb: float = 10.0 + # ── Error Logging ───────────────────────────────────────────────── error_log_enabled: bool = True error_log_dir: str = "logs" diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 25a78204..3454780b 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -33,6 +33,7 @@ from dashboard.routes.calm import router as calm_router from dashboard.routes.chat_api import router as chat_api_router from dashboard.routes.chat_api_v1 import router as chat_api_v1_router from dashboard.routes.daily_run import router as daily_run_router +from dashboard.routes.hermes import router as hermes_router from dashboard.routes.db_explorer import router as db_explorer_router from dashboard.routes.discord import router as discord_router from dashboard.routes.experiments import router as experiments_router @@ -180,6 +181,34 @@ async def _thinking_scheduler() -> None: await asyncio.sleep(settings.thinking_interval_seconds) +async def _hermes_scheduler() -> None: + """Background task: Hermes system health monitor. + + Runs every settings.hermes_interval_seconds (default: 5 minutes). + Checks memory, disk, Ollama, processes, and network. + Auto-resolves what it can; fires push notifications when human help is needed. + """ + from infrastructure.hermes.monitor import hermes_monitor + + await asyncio.sleep(20) # Stagger after other schedulers + + while True: + try: + if settings.hermes_enabled: + report = await hermes_monitor.run_cycle() + if report.has_issues: + logger.warning( + "Hermes health issues detected — overall: %s", + report.overall.value, + ) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.error("Hermes scheduler error: %s", exc) + + await asyncio.sleep(settings.hermes_interval_seconds) + + async def _loop_qa_scheduler() -> None: """Background task: run capability self-tests on a separate timer. @@ -381,6 +409,7 @@ def _startup_background_tasks() -> list[asyncio.Task]: asyncio.create_task(_loop_qa_scheduler()), asyncio.create_task(_presence_watcher()), asyncio.create_task(_start_chat_integrations_background()), + asyncio.create_task(_hermes_scheduler()), ] try: from timmy.paperclip import start_paperclip_poller @@ -638,6 +667,7 @@ app.include_router(world_router) app.include_router(matrix_router) app.include_router(tower_router) app.include_router(daily_run_router) +app.include_router(hermes_router) app.include_router(quests_router) app.include_router(scorecards_router) app.include_router(sovereignty_metrics_router) diff --git a/src/dashboard/routes/hermes.py b/src/dashboard/routes/hermes.py new file mode 100644 index 00000000..c5ec61cf --- /dev/null +++ b/src/dashboard/routes/hermes.py @@ -0,0 +1,52 @@ +"""Hermes health monitor routes. + +Exposes the Hermes health monitor via REST API so the dashboard +and external tools can query system status and trigger checks.
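+
+Example usage (a sketch, assuming the dashboard serves on localhost:8000):
+
+    curl http://localhost:8000/hermes/status
+    curl -X POST http://localhost:8000/hermes/check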
+
+Refs: #1073
+"""
+
+import logging
+
+from fastapi import APIRouter
+
+from infrastructure.hermes.monitor import hermes_monitor
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/hermes", tags=["hermes"])
+
+
+@router.get("/status")
+async def hermes_status():
+    """Return the most recent Hermes health report.
+
+    Returns the cached result from the last background cycle — does not
+    trigger a new check. Use POST /hermes/check to run an immediate check.
+    """
+    report = hermes_monitor.last_report
+    if report is None:
+        # float("inf") is not valid JSON; send null before the first cycle
+        seconds = hermes_monitor.seconds_since_last_run
+        return {
+            "status": "no_data",
+            "message": "No health report yet — first cycle pending",
+            "seconds_since_last_run": None if seconds == float("inf") else seconds,
+        }
+    return report.to_dict()
+
+
+@router.post("/check")
+async def hermes_check():
+    """Trigger an immediate Hermes health check cycle.
+
+    Runs all checks and returns the full report. Use sparingly: the
+    request does not return until every check finishes (~5 seconds).
+    """
+    report = await hermes_monitor.run_cycle()
+    return report.to_dict()
diff --git a/src/infrastructure/hermes/__init__.py b/src/infrastructure/hermes/__init__.py
new file mode 100644
index 00000000..b698c7d5
--- /dev/null
+++ b/src/infrastructure/hermes/__init__.py
@@ -0,0 +1,9 @@
+"""Hermes health monitor — system resources + model management.
+
+Monitors the local machine (Hermes/M3 Max) for memory pressure, disk usage,
+Ollama model health, zombie processes, and network connectivity.
+"""
+
+from infrastructure.hermes.monitor import HermesMonitor, HealthLevel, HealthReport, hermes_monitor
+
+__all__ = ["HermesMonitor", "HealthLevel", "HealthReport", "hermes_monitor"]
diff --git a/src/infrastructure/hermes/monitor.py b/src/infrastructure/hermes/monitor.py
new file mode 100644
index 00000000..7619ea68
--- /dev/null
+++ b/src/infrastructure/hermes/monitor.py
@@ -0,0 +1,680 @@
+"""Hermes health monitor — system resources + model management.
+
+Monitors the local machine (Hermes/M3 Max) and keeps it running smoothly.
+Runs every 5 minutes by default (hermes_interval_seconds), auto-resolves
+issues where possible, and alerts when human intervention is needed.
+
+Monitors:
+    1. Memory pressure — unified memory, alert if <4GB free, unload models
+    2. Disk usage — alert if <10GB free, clean temp files
+    3. Ollama status — verify reachable, restart if crashed, manage loaded models
+    4. Process health — detect zombie processes
+    5. 
Network — verify Gitea connectivity + +Refs: #1073 +""" + +import asyncio +import json +import logging +import shutil +import subprocess +import time +import urllib.request +from dataclasses import dataclass, field +from datetime import UTC, datetime +from enum import Enum +from typing import Any + +from config import settings + +logger = logging.getLogger(__name__) + + +class HealthLevel(str, Enum): + """Severity level for a health check result.""" + + OK = "ok" + WARNING = "warning" + CRITICAL = "critical" + UNKNOWN = "unknown" + + +@dataclass +class CheckResult: + """Result of a single health check.""" + + name: str + level: HealthLevel + message: str + details: dict[str, Any] = field(default_factory=dict) + auto_resolved: bool = False + needs_human: bool = False + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "level": self.level.value, + "message": self.message, + "details": self.details, + "auto_resolved": self.auto_resolved, + "needs_human": self.needs_human, + } + + +@dataclass +class HealthReport: + """Full health report from a single monitor cycle.""" + + timestamp: str + checks: list[CheckResult] + overall: HealthLevel + + @property + def has_issues(self) -> bool: + return any(c.level != HealthLevel.OK for c in self.checks) + + def to_dict(self) -> dict[str, Any]: + return { + "timestamp": self.timestamp, + "overall": self.overall.value, + "has_issues": self.has_issues, + "checks": [c.to_dict() for c in self.checks], + } + + +class HermesMonitor: + """System health monitor for Hermes (local M3 Max machine). + + All blocking I/O (subprocess, HTTP) is wrapped in asyncio.to_thread() + so it never blocks the event loop. Results are cached so the dashboard + can read the last report without triggering a new cycle. + """ + + OLLAMA_REQUEST_TIMEOUT = 5 + NETWORK_REQUEST_TIMEOUT = 5 + + def __init__(self) -> None: + self._last_report: HealthReport | None = None + self._last_run_ts: float = 0.0 + + @property + def last_report(self) -> HealthReport | None: + """Most recent health report, or None if no cycle has run yet.""" + return self._last_report + + @property + def seconds_since_last_run(self) -> float: + if self._last_run_ts == 0.0: + return float("inf") + return time.monotonic() - self._last_run_ts + + async def run_cycle(self) -> HealthReport: + """Run a full health check cycle and return the report.""" + self._last_run_ts = time.monotonic() + logger.info("Hermes health cycle starting") + + check_fns = [ + self._check_memory(), + self._check_disk(), + self._check_ollama(), + self._check_processes(), + self._check_network(), + ] + + raw_results = await asyncio.gather(*check_fns, return_exceptions=True) + + checks: list[CheckResult] = [] + for i, r in enumerate(raw_results): + if isinstance(r, Exception): + name = ["memory", "disk", "ollama", "processes", "network"][i] + logger.warning("Hermes check '%s' raised: %s", name, r) + checks.append( + CheckResult( + name=name, + level=HealthLevel.UNKNOWN, + message=f"Check error: {r}", + ) + ) + else: + checks.append(r) + + # Compute overall level + levels = {c.level for c in checks} + if HealthLevel.CRITICAL in levels: + overall = HealthLevel.CRITICAL + elif HealthLevel.WARNING in levels: + overall = HealthLevel.WARNING + elif HealthLevel.UNKNOWN in levels: + overall = HealthLevel.UNKNOWN + else: + overall = HealthLevel.OK + + report = HealthReport( + timestamp=datetime.now(UTC).isoformat(), + checks=checks, + overall=overall, + ) + self._last_report = report + + await self._handle_alerts(report) + + 
logger.info("Hermes health cycle complete — overall: %s", overall.value) + return report + + # ── Memory ─────────────────────────────────────────────────────────────── + + async def _check_memory(self) -> CheckResult: + """Check unified memory usage (macOS vm_stat).""" + memory_free_min_gb = getattr(settings, "hermes_memory_free_min_gb", 4.0) + try: + info = await asyncio.to_thread(self._get_memory_info) + free_gb = info.get("free_gb", 0.0) + total_gb = info.get("total_gb", 0.0) + details: dict[str, Any] = { + "free_gb": round(free_gb, 2), + "total_gb": round(total_gb, 2), + } + + if free_gb < memory_free_min_gb: + # Attempt auto-remediation: unload Ollama models + unloaded = await self._unload_ollama_models() + if unloaded: + return CheckResult( + name="memory", + level=HealthLevel.WARNING, + message=( + f"Low memory ({free_gb:.1f}GB free) — " + f"unloaded {unloaded} Ollama model(s)" + ), + details={**details, "models_unloaded": unloaded}, + auto_resolved=True, + ) + return CheckResult( + name="memory", + level=HealthLevel.CRITICAL, + message=( + f"Critical: only {free_gb:.1f}GB free " + f"(threshold: {memory_free_min_gb}GB)" + ), + details=details, + needs_human=True, + ) + + return CheckResult( + name="memory", + level=HealthLevel.OK, + message=f"Memory OK — {free_gb:.1f}GB free of {total_gb:.1f}GB", + details=details, + ) + except Exception as exc: + logger.warning("Memory check failed: %s", exc) + return CheckResult( + name="memory", + level=HealthLevel.UNKNOWN, + message=f"Memory check unavailable: {exc}", + ) + + def _get_memory_info(self) -> dict[str, float]: + """Get memory stats via macOS sysctl + vm_stat. + + Falls back gracefully on non-macOS systems. + """ + gb = 1024**3 + total_bytes = 0.0 + free_bytes = 0.0 + + # Total memory via sysctl + try: + result = subprocess.run( + ["sysctl", "-n", "hw.memsize"], + capture_output=True, + text=True, + timeout=3, + ) + total_bytes = float(result.stdout.strip()) + except Exception: + pass + + # Free + inactive pages via vm_stat (macOS) + try: + result = subprocess.run( + ["vm_stat"], + capture_output=True, + text=True, + timeout=3, + ) + page_size = 16384 # 16 KB default on Apple Silicon + for line in result.stdout.splitlines(): + if "page size of" in line: + parts = line.split() + for i, part in enumerate(parts): + if part == "of" and i + 1 < len(parts): + try: + page_size = int(parts[i + 1]) + except ValueError: + pass + elif "Pages free:" in line: + pages = int(line.split(":")[1].strip().rstrip(".")) + free_bytes += pages * page_size + elif "Pages inactive:" in line: + pages = int(line.split(":")[1].strip().rstrip(".")) + free_bytes += pages * page_size + except Exception: + pass + + return { + "total_gb": total_bytes / gb if total_bytes else 0.0, + "free_gb": free_bytes / gb if free_bytes else 0.0, + } + + # ── Disk ───────────────────────────────────────────────────────────────── + + async def _check_disk(self) -> CheckResult: + """Check disk usage via shutil.disk_usage.""" + disk_free_min_gb = getattr(settings, "hermes_disk_free_min_gb", 10.0) + try: + usage = await asyncio.to_thread(shutil.disk_usage, "/") + free_gb = usage.free / (1024**3) + total_gb = usage.total / (1024**3) + used_pct = (usage.used / usage.total) * 100 + + details: dict[str, Any] = { + "free_gb": round(free_gb, 2), + "total_gb": round(total_gb, 2), + "used_pct": round(used_pct, 1), + } + + if free_gb < disk_free_min_gb: + cleaned_gb = await self._cleanup_temp_files() + if cleaned_gb > 0.01: + return CheckResult( + name="disk", + level=HealthLevel.WARNING, + 
message=( + f"Low disk ({free_gb:.1f}GB free) — " + f"cleaned {cleaned_gb:.2f}GB from /tmp" + ), + details={**details, "cleaned_gb": round(cleaned_gb, 2)}, + auto_resolved=True, + ) + return CheckResult( + name="disk", + level=HealthLevel.CRITICAL, + message=( + f"Critical: only {free_gb:.1f}GB free " + f"(threshold: {disk_free_min_gb}GB)" + ), + details=details, + needs_human=True, + ) + + return CheckResult( + name="disk", + level=HealthLevel.OK, + message=f"Disk OK — {free_gb:.1f}GB free ({used_pct:.0f}% used)", + details=details, + ) + except Exception as exc: + logger.warning("Disk check failed: %s", exc) + return CheckResult( + name="disk", + level=HealthLevel.UNKNOWN, + message=f"Disk check unavailable: {exc}", + ) + + async def _cleanup_temp_files(self) -> float: + """Remove /tmp files older than 24 hours. Returns GB freed.""" + return await asyncio.to_thread(self._cleanup_temp_files_sync) + + def _cleanup_temp_files_sync(self) -> float: + """Synchronous /tmp cleanup — only touches files older than 24 hours.""" + from pathlib import Path + + freed_bytes = 0 + cutoff = time.time() - 86400 # 24 hours ago + + try: + tmp = Path("/tmp") + for item in tmp.iterdir(): + try: + stat = item.stat() + if stat.st_mtime >= cutoff: + continue + if item.is_file(): + freed_bytes += stat.st_size + item.unlink(missing_ok=True) + elif item.is_dir(): + dir_size = sum( + f.stat().st_size + for f in item.rglob("*") + if f.is_file() + ) + freed_bytes += dir_size + shutil.rmtree(str(item), ignore_errors=True) + except (PermissionError, OSError): + pass # Skip files we can't touch + except Exception as exc: + logger.warning("Temp cleanup error: %s", exc) + + freed_gb = freed_bytes / (1024**3) + if freed_gb > 0.001: + logger.info("Hermes disk cleanup: freed %.2fGB from /tmp", freed_gb) + return freed_gb + + # ── Ollama ─────────────────────────────────────────────────────────────── + + async def _check_ollama(self) -> CheckResult: + """Check Ollama status and loaded models.""" + try: + status = await asyncio.to_thread(self._get_ollama_status) + + if not status.get("reachable"): + restarted = await self._restart_ollama() + if restarted: + return CheckResult( + name="ollama", + level=HealthLevel.WARNING, + message="Ollama was unreachable — restart initiated", + details={"restart_attempted": True}, + auto_resolved=True, + ) + return CheckResult( + name="ollama", + level=HealthLevel.CRITICAL, + message="Ollama unreachable and restart failed", + details={"reachable": False}, + needs_human=True, + ) + + models = status.get("models", []) + loaded = status.get("loaded_models", []) + return CheckResult( + name="ollama", + level=HealthLevel.OK, + message=( + f"Ollama OK — {len(models)} model(s) available, " + f"{len(loaded)} loaded" + ), + details={ + "reachable": True, + "model_count": len(models), + "loaded_count": len(loaded), + "loaded_models": [m.get("name", "") for m in loaded], + }, + ) + except Exception as exc: + logger.warning("Ollama check failed: %s", exc) + return CheckResult( + name="ollama", + level=HealthLevel.UNKNOWN, + message=f"Ollama check failed: {exc}", + ) + + def _get_ollama_status(self) -> dict[str, Any]: + """Synchronous Ollama status — checks /api/tags and /api/ps.""" + url = settings.normalized_ollama_url + + try: + req = urllib.request.Request( + f"{url}/api/tags", + method="GET", + headers={"Accept": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp: + data = json.loads(resp.read().decode()) + models = data.get("models", []) + except 
Exception: + return {"reachable": False, "models": [], "loaded_models": []} + + # /api/ps lists currently loaded (in-memory) models — Ollama >=0.2 + loaded: list[dict] = [] + try: + req = urllib.request.Request( + f"{url}/api/ps", + method="GET", + headers={"Accept": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp: + ps_data = json.loads(resp.read().decode()) + loaded = ps_data.get("models", []) + except Exception: + pass # /api/ps absent on older Ollama — non-fatal + + return {"reachable": True, "models": models, "loaded_models": loaded} + + async def _unload_ollama_models(self) -> int: + """Unload in-memory Ollama models to free unified memory. + + Uses the keep_alive=0 trick: POSTing to /api/generate with + keep_alive=0 causes Ollama to immediately evict the model. + Returns the number of models successfully unloaded. + """ + return await asyncio.to_thread(self._unload_ollama_models_sync) + + def _unload_ollama_models_sync(self) -> int: + """Synchronous model unload implementation.""" + url = settings.normalized_ollama_url + unloaded = 0 + + try: + req = urllib.request.Request( + f"{url}/api/ps", + method="GET", + headers={"Accept": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp: + ps_data = json.loads(resp.read().decode()) + loaded = ps_data.get("models", []) + except Exception: + return 0 + + for model in loaded: + name = model.get("name", "") + if not name: + continue + try: + payload = json.dumps({"model": name, "keep_alive": 0}).encode() + req = urllib.request.Request( + f"{url}/api/generate", + data=payload, + method="POST", + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=10) as _: + pass + logger.info("Hermes: unloaded Ollama model %s", name) + unloaded += 1 + except Exception as exc: + logger.warning("Hermes: failed to unload model %s: %s", name, exc) + + return unloaded + + async def _restart_ollama(self) -> bool: + """Attempt to restart the Ollama service via launchctl or brew.""" + return await asyncio.to_thread(self._restart_ollama_sync) + + def _restart_ollama_sync(self) -> bool: + """Try launchctl first, then brew services.""" + # macOS launchctl (installed via official Ollama installer) + try: + result = subprocess.run( + ["launchctl", "stop", "com.ollama.ollama"], + capture_output=True, + timeout=10, + ) + if result.returncode == 0: + time.sleep(2) + subprocess.run( + ["launchctl", "start", "com.ollama.ollama"], + capture_output=True, + timeout=10, + ) + logger.info("Hermes: Ollama restarted via launchctl") + return True + except Exception: + pass + + # Homebrew fallback + try: + result = subprocess.run( + ["brew", "services", "restart", "ollama"], + capture_output=True, + timeout=20, + ) + if result.returncode == 0: + logger.info("Hermes: Ollama restarted via brew services") + return True + except Exception: + pass + + logger.warning("Hermes: Ollama restart failed — manual intervention needed") + return False + + # ── Processes ──────────────────────────────────────────────────────────── + + async def _check_processes(self) -> CheckResult: + """Check for zombie processes via ps aux.""" + try: + result = await asyncio.to_thread(self._get_zombie_processes) + zombies = result.get("zombies", []) + + if zombies: + return CheckResult( + name="processes", + level=HealthLevel.WARNING, + message=f"Found {len(zombies)} zombie process(es)", + details={"zombies": zombies[:5]}, + needs_human=len(zombies) > 3, + ) + + return 
CheckResult(
+                name="processes",
+                level=HealthLevel.OK,
+                message="Processes OK — no zombies detected",
+                details={"zombie_count": 0},
+            )
+        except Exception as exc:
+            logger.warning("Process check failed: %s", exc)
+            return CheckResult(
+                name="processes",
+                level=HealthLevel.UNKNOWN,
+                message=f"Process check unavailable: {exc}",
+            )
+
+    def _get_zombie_processes(self) -> dict[str, Any]:
+        """Detect zombie processes (state 'Z') via ps aux."""
+        result = subprocess.run(
+            ["ps", "aux"],
+            capture_output=True,
+            text=True,
+            timeout=5,
+        )
+        zombies = []
+        for line in result.stdout.splitlines()[1:]:  # Skip header row
+            parts = line.split(None, 10)
+            # STAT can carry modifiers ("Z+", "Zs"), so match the leading letter
+            if len(parts) >= 8 and parts[7].startswith("Z"):
+                zombies.append(
+                    {
+                        "pid": parts[1],
+                        "command": parts[10][:80] if len(parts) > 10 else "",
+                    }
+                )
+        return {"zombies": zombies}
+
+    # ── Network ──────────────────────────────────────────────────────────────
+
+    async def _check_network(self) -> CheckResult:
+        """Check Gitea connectivity."""
+        try:
+            result = await asyncio.to_thread(self._check_gitea_connectivity)
+            reachable = result.get("reachable", False)
+            latency_ms = result.get("latency_ms", -1.0)
+
+            if not reachable:
+                return CheckResult(
+                    name="network",
+                    level=HealthLevel.WARNING,
+                    message=f"Gitea unreachable: {result.get('error', 'unknown')}",
+                    details=result,
+                    needs_human=True,
+                )
+
+            return CheckResult(
+                name="network",
+                level=HealthLevel.OK,
+                message=f"Network OK — Gitea reachable ({latency_ms:.0f}ms)",
+                details=result,
+            )
+        except Exception as exc:
+            logger.warning("Network check failed: %s", exc)
+            return CheckResult(
+                name="network",
+                level=HealthLevel.UNKNOWN,
+                message=f"Network check unavailable: {exc}",
+            )
+
+    def _check_gitea_connectivity(self) -> dict[str, Any]:
+        """Synchronous Gitea reachability check."""
+        url = settings.gitea_url
+        start = time.monotonic()
+        try:
+            req = urllib.request.Request(
+                f"{url}/api/v1/version",
+                method="GET",
+                headers={"Accept": "application/json"},
+            )
+            with urllib.request.urlopen(req, timeout=self.NETWORK_REQUEST_TIMEOUT) as resp:
+                latency_ms = (time.monotonic() - start) * 1000
+                return {
+                    "reachable": resp.status == 200,
+                    "latency_ms": round(latency_ms, 1),
+                    "url": url,
+                }
+        except Exception as exc:
+            return {
+                "reachable": False,
+                "error": str(exc),
+                "url": url,
+                "latency_ms": -1.0,
+            }
+
+    # ── Alerts ───────────────────────────────────────────────────────────────
+
+    async def _handle_alerts(self, report: HealthReport) -> None:
+        """Send push notifications for issues that need attention."""
+        try:
+            from infrastructure.notifications.push import notifier
+        except Exception:
+            return
+
+        for check in report.checks:
+            if check.level == HealthLevel.CRITICAL or check.needs_human:
+                notifier.notify(
+                    title=f"Hermes Alert: {check.name}",
+                    message=check.message,
+                    category="system",
+                    native=check.level == HealthLevel.CRITICAL,
+                )
+            elif check.level == HealthLevel.WARNING and check.auto_resolved:
+                notifier.notify(
+                    title=f"Hermes: {check.name} auto-fixed",
+                    message=check.message,
+                    category="system",
+                )
+
+
+# Module-level singleton
+hermes_monitor = HermesMonitor()
diff --git a/tests/unit/test_hermes_monitor.py b/tests/unit/test_hermes_monitor.py
new file mode 100644
index 00000000..c4e0e23f
--- /dev/null
+++ b/tests/unit/test_hermes_monitor.py
@@ -0,0 +1,454 @@
+"""Unit tests for the Hermes health monitor.
+
+Tests all five checks (memory, disk, Ollama, processes, network) using mocks
+so no real subprocesses or network calls are made.
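+
+Run with: pytest tests/unit/test_hermes_monitor.py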
+ +Refs: #1073 +""" + +import json +from io import BytesIO +from unittest.mock import MagicMock, patch + +import pytest + +from infrastructure.hermes.monitor import CheckResult, HealthLevel, HealthReport, HermesMonitor + + +@pytest.fixture() +def monitor(): + return HermesMonitor() + + +# ── Unit helpers ────────────────────────────────────────────────────────────── + + +class _FakeHTTPResponse: + """Minimal urllib response stub.""" + + def __init__(self, body: bytes, status: int = 200): + self._body = body + self.status = status + + def read(self) -> bytes: + return self._body + + def __enter__(self): + return self + + def __exit__(self, *_): + pass + + +# ── Memory check ────────────────────────────────────────────────────────────── + + +def test_get_memory_info_parses_vm_stat(monitor): + vm_stat_output = ( + "Mach Virtual Memory Statistics: (page size of 16384 bytes)\n" + "Pages free: 12800.\n" + "Pages active: 50000.\n" + "Pages inactive: 25600.\n" + "Pages speculative: 1000.\n" + ) + with ( + patch("subprocess.run") as mock_run, + ): + # First call: sysctl hw.memsize (total) + sysctl_result = MagicMock() + sysctl_result.stdout = "68719476736\n" # 64 GB + # Second call: vm_stat + vmstat_result = MagicMock() + vmstat_result.stdout = vm_stat_output + mock_run.side_effect = [sysctl_result, vmstat_result] + + info = monitor._get_memory_info() + + assert info["total_gb"] == pytest.approx(64.0, abs=0.1) + # pages free (12800) + inactive (25600) = 38400 * 16384 bytes = 629145600 bytes ≈ 0.586 GB + expected_free_gb = (38400 * 16384) / (1024**3) + assert info["free_gb"] == pytest.approx(expected_free_gb, abs=0.001) + + +def test_get_memory_info_handles_subprocess_failure(monitor): + with patch("subprocess.run", side_effect=OSError("no sysctl")): + info = monitor._get_memory_info() + assert info["total_gb"] == 0.0 + assert info["free_gb"] == 0.0 + + +@pytest.mark.asyncio +async def test_check_memory_ok(monitor): + with patch.object(monitor, "_get_memory_info", return_value={"free_gb": 20.0, "total_gb": 64.0}): + result = await monitor._check_memory() + + assert result.name == "memory" + assert result.level == HealthLevel.OK + assert "20.0GB" in result.message + + +@pytest.mark.asyncio +async def test_check_memory_low_triggers_unload(monitor): + with ( + patch.object(monitor, "_get_memory_info", return_value={"free_gb": 2.0, "total_gb": 64.0}), + patch.object(monitor, "_unload_ollama_models", return_value=2), + ): + result = await monitor._check_memory() + + assert result.level == HealthLevel.WARNING + assert result.auto_resolved is True + assert "unloaded 2" in result.message + + +@pytest.mark.asyncio +async def test_check_memory_critical_no_models_to_unload(monitor): + with ( + patch.object(monitor, "_get_memory_info", return_value={"free_gb": 1.0, "total_gb": 64.0}), + patch.object(monitor, "_unload_ollama_models", return_value=0), + ): + result = await monitor._check_memory() + + assert result.level == HealthLevel.CRITICAL + assert result.needs_human is True + + +@pytest.mark.asyncio +async def test_check_memory_exception_returns_unknown(monitor): + with patch.object(monitor, "_get_memory_info", side_effect=RuntimeError("boom")): + result = await monitor._check_memory() + + assert result.level == HealthLevel.UNKNOWN + + +# ── Disk check ──────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_check_disk_ok(monitor): + usage = MagicMock() + usage.free = 100 * (1024**3) # 100 GB + usage.total = 500 * (1024**3) # 500 GB + usage.used = 400 * 
(1024**3) + + with patch("shutil.disk_usage", return_value=usage): + result = await monitor._check_disk() + + assert result.level == HealthLevel.OK + assert "100.0GB free" in result.message + + +@pytest.mark.asyncio +async def test_check_disk_low_triggers_cleanup(monitor): + usage = MagicMock() + usage.free = 5 * (1024**3) # 5 GB — below threshold + usage.total = 500 * (1024**3) + usage.used = 495 * (1024**3) + + with ( + patch("shutil.disk_usage", return_value=usage), + patch.object(monitor, "_cleanup_temp_files", return_value=2.5), + ): + result = await monitor._check_disk() + + assert result.level == HealthLevel.WARNING + assert result.auto_resolved is True + assert "cleaned 2.50GB" in result.message + + +@pytest.mark.asyncio +async def test_check_disk_critical_when_cleanup_fails(monitor): + usage = MagicMock() + usage.free = 5 * (1024**3) + usage.total = 500 * (1024**3) + usage.used = 495 * (1024**3) + + with ( + patch("shutil.disk_usage", return_value=usage), + patch.object(monitor, "_cleanup_temp_files", return_value=0.0), + ): + result = await monitor._check_disk() + + assert result.level == HealthLevel.CRITICAL + assert result.needs_human is True + + +# ── Ollama check ────────────────────────────────────────────────────────────── + + +def test_get_ollama_status_reachable(monitor): + tags_body = json.dumps({ + "models": [{"name": "qwen3:30b"}, {"name": "llama3.1:8b"}] + }).encode() + ps_body = json.dumps({ + "models": [{"name": "qwen3:30b", "size": 1000}] + }).encode() + + responses = [ + _FakeHTTPResponse(tags_body), + _FakeHTTPResponse(ps_body), + ] + + with patch("urllib.request.urlopen", side_effect=responses): + status = monitor._get_ollama_status() + + assert status["reachable"] is True + assert len(status["models"]) == 2 + assert len(status["loaded_models"]) == 1 + + +def test_get_ollama_status_unreachable(monitor): + with patch("urllib.request.urlopen", side_effect=OSError("connection refused")): + status = monitor._get_ollama_status() + + assert status["reachable"] is False + assert status["models"] == [] + assert status["loaded_models"] == [] + + +@pytest.mark.asyncio +async def test_check_ollama_ok(monitor): + status = { + "reachable": True, + "models": [{"name": "qwen3:30b"}], + "loaded_models": [], + } + with patch.object(monitor, "_get_ollama_status", return_value=status): + result = await monitor._check_ollama() + + assert result.level == HealthLevel.OK + assert result.details["reachable"] is True + + +@pytest.mark.asyncio +async def test_check_ollama_unreachable_restart_success(monitor): + status = {"reachable": False, "models": [], "loaded_models": []} + with ( + patch.object(monitor, "_get_ollama_status", return_value=status), + patch.object(monitor, "_restart_ollama", return_value=True), + ): + result = await monitor._check_ollama() + + assert result.level == HealthLevel.WARNING + assert result.auto_resolved is True + + +@pytest.mark.asyncio +async def test_check_ollama_unreachable_restart_fails(monitor): + status = {"reachable": False, "models": [], "loaded_models": []} + with ( + patch.object(monitor, "_get_ollama_status", return_value=status), + patch.object(monitor, "_restart_ollama", return_value=False), + ): + result = await monitor._check_ollama() + + assert result.level == HealthLevel.CRITICAL + assert result.needs_human is True + + +# ── Process check ───────────────────────────────────────────────────────────── + + +def test_get_zombie_processes_none(monitor): + ps_output = ( + "USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n" + "alex 123 0.1 
0.2 100 200 s0 S 1:00 0:01 python\n" + "alex 456 0.0 0.1 50 100 s0 S 1:01 0:00 bash\n" + ) + result = MagicMock() + result.stdout = ps_output + with patch("subprocess.run", return_value=result): + info = monitor._get_zombie_processes() + + assert info["zombies"] == [] + + +def test_get_zombie_processes_found(monitor): + ps_output = ( + "USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n" + "alex 123 0.1 0.2 100 200 s0 S 1:00 0:01 python\n" + "alex 789 0.0 0.0 0 0 s0 Z 1:02 0:00 defunct\n" + ) + result = MagicMock() + result.stdout = ps_output + with patch("subprocess.run", return_value=result): + info = monitor._get_zombie_processes() + + assert len(info["zombies"]) == 1 + assert info["zombies"][0]["pid"] == "789" + + +@pytest.mark.asyncio +async def test_check_processes_no_zombies(monitor): + with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": []}): + result = await monitor._check_processes() + + assert result.level == HealthLevel.OK + + +@pytest.mark.asyncio +async def test_check_processes_zombies_warning(monitor): + zombies = [{"pid": "100", "command": "defunct"}, {"pid": "101", "command": "defunct"}] + with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": zombies}): + result = await monitor._check_processes() + + assert result.level == HealthLevel.WARNING + assert result.needs_human is False # Only 2, threshold is >3 + + +@pytest.mark.asyncio +async def test_check_processes_many_zombies_needs_human(monitor): + zombies = [{"pid": str(i), "command": "defunct"} for i in range(5)] + with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": zombies}): + result = await monitor._check_processes() + + assert result.needs_human is True + + +# ── Network check ───────────────────────────────────────────────────────────── + + +def test_check_gitea_connectivity_ok(monitor): + body = json.dumps({"version": "1.22.0"}).encode() + with patch("urllib.request.urlopen", return_value=_FakeHTTPResponse(body, status=200)): + info = monitor._check_gitea_connectivity() + + assert info["reachable"] is True + assert info["latency_ms"] >= 0 + + +def test_check_gitea_connectivity_unreachable(monitor): + with patch("urllib.request.urlopen", side_effect=OSError("refused")): + info = monitor._check_gitea_connectivity() + + assert info["reachable"] is False + assert "error" in info + + +@pytest.mark.asyncio +async def test_check_network_ok(monitor): + with patch.object( + monitor, + "_check_gitea_connectivity", + return_value={"reachable": True, "latency_ms": 5.0, "url": "http://localhost:3000"}, + ): + result = await monitor._check_network() + + assert result.level == HealthLevel.OK + assert "Gitea reachable" in result.message + + +@pytest.mark.asyncio +async def test_check_network_unreachable(monitor): + with patch.object( + monitor, + "_check_gitea_connectivity", + return_value={"reachable": False, "error": "refused", "url": "http://localhost:3000"}, + ): + result = await monitor._check_network() + + assert result.level == HealthLevel.WARNING + assert result.needs_human is True + + +# ── Full cycle ──────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_run_cycle_all_ok(monitor): + ok_result = CheckResult(name="test", level=HealthLevel.OK, message="ok") + + async def _ok_check(): + return ok_result + + with ( + patch.object(monitor, "_check_memory", _ok_check), + patch.object(monitor, "_check_disk", _ok_check), + patch.object(monitor, "_check_ollama", _ok_check), + patch.object(monitor, 
"_check_processes", _ok_check), + patch.object(monitor, "_check_network", _ok_check), + patch.object(monitor, "_handle_alerts"), + ): + report = await monitor.run_cycle() + + assert report.overall == HealthLevel.OK + assert not report.has_issues + assert monitor.last_report is report + + +@pytest.mark.asyncio +async def test_run_cycle_sets_overall_to_worst(monitor): + async def _ok(): + return CheckResult(name="ok", level=HealthLevel.OK, message="ok") + + async def _critical(): + return CheckResult(name="critical", level=HealthLevel.CRITICAL, message="bad") + + with ( + patch.object(monitor, "_check_memory", _ok), + patch.object(monitor, "_check_disk", _critical), + patch.object(monitor, "_check_ollama", _ok), + patch.object(monitor, "_check_processes", _ok), + patch.object(monitor, "_check_network", _ok), + patch.object(monitor, "_handle_alerts"), + ): + report = await monitor.run_cycle() + + assert report.overall == HealthLevel.CRITICAL + assert report.has_issues is True + + +@pytest.mark.asyncio +async def test_run_cycle_exception_becomes_unknown(monitor): + async def _ok(): + return CheckResult(name="ok", level=HealthLevel.OK, message="ok") + + async def _boom(): + raise RuntimeError("unexpected error") + + with ( + patch.object(monitor, "_check_memory", _ok), + patch.object(monitor, "_check_disk", _ok), + patch.object(monitor, "_check_ollama", _boom), + patch.object(monitor, "_check_processes", _ok), + patch.object(monitor, "_check_network", _ok), + patch.object(monitor, "_handle_alerts"), + ): + report = await monitor.run_cycle() + + levels = {c.level for c in report.checks} + assert HealthLevel.UNKNOWN in levels + + +# ── to_dict serialisation ──────────────────────────────────────────────────── + + +def test_check_result_to_dict(): + c = CheckResult( + name="memory", + level=HealthLevel.WARNING, + message="low", + details={"free_gb": 3.5}, + auto_resolved=True, + ) + d = c.to_dict() + assert d["name"] == "memory" + assert d["level"] == "warning" + assert d["auto_resolved"] is True + assert d["details"]["free_gb"] == 3.5 + + +def test_health_report_to_dict(): + checks = [ + CheckResult(name="disk", level=HealthLevel.OK, message="ok"), + ] + report = HealthReport( + timestamp="2026-01-01T00:00:00+00:00", + checks=checks, + overall=HealthLevel.OK, + ) + d = report.to_dict() + assert d["overall"] == "ok" + assert d["has_issues"] is False + assert len(d["checks"]) == 1