diff --git a/bin/nexus_watchdog.py b/bin/nexus_watchdog.py new file mode 100644 index 0000000..309218d --- /dev/null +++ b/bin/nexus_watchdog.py @@ -0,0 +1,575 @@ +#!/usr/bin/env python3 +""" +Nexus Watchdog — The Eye That Never Sleeps + +Monitors the health of the Nexus consciousness loop and WebSocket +gateway, raising Gitea issues when components go dark. + +The nexus was dead for hours after a syntax error crippled +nexus_think.py. Nobody knew. The gateway kept running, but the +consciousness loop — the only part that matters — was silent. + +This watchdog ensures that never happens again. + +HOW IT WORKS +============ +1. Probes the WebSocket gateway (ws://localhost:8765) + → Can Timmy hear the world? + +2. Checks for a running nexus_think.py process + → Is Timmy's mind awake? + +3. Reads the heartbeat file (~/.nexus/heartbeat.json) + → When did Timmy last think? + +4. If any check fails, opens a Gitea issue (or updates an existing one) + with the exact failure mode, timestamp, and diagnostic info. + +5. If all checks pass after a previous failure, closes the issue + with a recovery note. + +USAGE +===== + # One-shot check (good for cron) + python bin/nexus_watchdog.py + + # Continuous monitoring (every 60s) + python bin/nexus_watchdog.py --watch --interval 60 + + # Dry-run (print diagnostics, don't touch Gitea) + python bin/nexus_watchdog.py --dry-run + + # Crontab entry (every 5 minutes) + */5 * * * * cd /path/to/the-nexus && python bin/nexus_watchdog.py + +HEARTBEAT PROTOCOL +================== +The consciousness loop (nexus_think.py) writes a heartbeat file +after each think cycle: + + ~/.nexus/heartbeat.json + { + "pid": 12345, + "timestamp": 1711843200.0, + "cycle": 42, + "model": "timmy:v0.1-q4", + "status": "thinking" + } + +If the heartbeat is older than --stale-threshold seconds, the +mind is considered dead even if the process is still running +(e.g., hung on a blocking call). + +ZERO DEPENDENCIES +================= +Pure stdlib. No pip installs. 
Same machine as the nexus. +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import signal +import socket +import subprocess +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-7s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger("nexus.watchdog") + +# ── Configuration ──────────────────────────────────────────────────── + +DEFAULT_WS_HOST = "localhost" +DEFAULT_WS_PORT = 8765 +DEFAULT_HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json" +DEFAULT_STALE_THRESHOLD = 300 # 5 minutes without a heartbeat = dead +DEFAULT_INTERVAL = 60 # seconds between checks in watch mode + +GITEA_URL = os.environ.get("GITEA_URL", "http://143.198.27.163:3000") +GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "") +GITEA_REPO = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus") +WATCHDOG_LABEL = "watchdog" +WATCHDOG_TITLE_PREFIX = "[watchdog]" + + +# ── Health check results ───────────────────────────────────────────── + +@dataclass +class CheckResult: + """Result of a single health check.""" + name: str + healthy: bool + message: str + details: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class HealthReport: + """Aggregate health report from all checks.""" + timestamp: float + checks: List[CheckResult] + overall_healthy: bool = True + + def __post_init__(self): + self.overall_healthy = all(c.healthy for c in self.checks) + + @property + def failed_checks(self) -> List[CheckResult]: + return [c for c in self.checks if not c.healthy] + + def to_markdown(self) -> str: + """Format as a Gitea issue body.""" + ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp)) + status = "🟢 ALL SYSTEMS OPERATIONAL" if self.overall_healthy else "🔴 FAILURES DETECTED" + + lines = [ + f"## Nexus Health Report — {ts}", + 
f"**Status:** {status}", + "", + "| Check | Status | Details |", + "|:------|:------:|:--------|", + ] + + for c in self.checks: + icon = "✅" if c.healthy else "❌" + lines.append(f"| {c.name} | {icon} | {c.message} |") + + if self.failed_checks: + lines.append("") + lines.append("### Failure Diagnostics") + for c in self.failed_checks: + lines.append(f"\n**{c.name}:**") + lines.append(f"```") + lines.append(c.message) + if c.details: + lines.append(json.dumps(c.details, indent=2)) + lines.append(f"```") + + lines.append("") + lines.append(f"*Generated by `nexus_watchdog.py` at {ts}*") + return "\n".join(lines) + + +# ── Health checks ──────────────────────────────────────────────────── + +def check_ws_gateway(host: str = DEFAULT_WS_HOST, port: int = DEFAULT_WS_PORT) -> CheckResult: + """Check if the WebSocket gateway is accepting connections. + + Uses a raw TCP socket probe (not a full WebSocket handshake) to avoid + depending on the websockets library. If TCP connects, the gateway + process is alive and listening. + """ + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + result = sock.connect_ex((host, port)) + sock.close() + + if result == 0: + return CheckResult( + name="WebSocket Gateway", + healthy=True, + message=f"Listening on {host}:{port}", + ) + else: + return CheckResult( + name="WebSocket Gateway", + healthy=False, + message=f"Connection refused on {host}:{port} (errno={result})", + details={"host": host, "port": port, "errno": result}, + ) + except Exception as e: + return CheckResult( + name="WebSocket Gateway", + healthy=False, + message=f"Probe failed: {e}", + details={"host": host, "port": port, "error": str(e)}, + ) + + +def check_mind_process() -> CheckResult: + """Check if nexus_think.py is running as a process. + + Uses `pgrep -f` to find processes matching the script name. + This catches both `python nexus_think.py` and `python -m nexus.nexus_think`. 
def check_mind_process() -> CheckResult:
    """Check whether nexus_think.py is running as a process.

    Uses `pgrep -f` to find processes whose command line matches the
    script name, which catches both `python nexus_think.py` and
    `python -m nexus.nexus_think`.
    """
    try:
        probe = subprocess.run(
            ["pgrep", "-f", "nexus_think"],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except FileNotFoundError:
        # pgrep not installed (unlikely on Linux/macOS) — we cannot look,
        # so do not raise a false alarm.
        return CheckResult(
            name="Consciousness Loop",
            healthy=True,
            message="pgrep not available, skipping process check",
        )
    except Exception as e:
        return CheckResult(
            name="Consciousness Loop",
            healthy=False,
            message=f"Process check failed: {e}",
            details={"error": str(e)},
        )

    if probe.returncode == 0:
        # One PID per line; drop blanks and our own PID so the watchdog
        # never counts itself as the mind.
        me = str(os.getpid())
        pids = [pid.strip() for pid in probe.stdout.splitlines() if pid.strip()]
        pids = [pid for pid in pids if pid != me]
        if pids:
            return CheckResult(
                name="Consciousness Loop",
                healthy=True,
                message=f"Running (PID: {', '.join(pids)})",
                details={"pids": pids},
            )

    return CheckResult(
        name="Consciousness Loop",
        healthy=False,
        message="nexus_think.py is not running — Timmy's mind is dark",
        details={"pgrep_returncode": probe.returncode},
    )
def check_heartbeat(
    path: Path = DEFAULT_HEARTBEAT_PATH,
    stale_threshold: int = DEFAULT_STALE_THRESHOLD,
) -> CheckResult:
    """Check if the heartbeat file exists and is recent.

    The consciousness loop should write this file after each think cycle.
    If it's missing or stale, the mind has stopped thinking even if the
    process is technically alive.

    Args:
        path: Heartbeat JSON file written by the consciousness loop.
        stale_threshold: Max age (seconds) before the pulse counts as dead.
    """
    if not path.exists():
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=f"No heartbeat file at {path} — mind has never reported",
            details={"path": str(path)},
        )

    try:
        data = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError) as e:
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=f"Heartbeat file corrupt: {e}",
            details={"path": str(path), "error": str(e)},
        )

    # FIX: a heartbeat whose "timestamp" is not a number, or a JSON body
    # that is not an object, used to crash the watchdog (TypeError /
    # AttributeError). Treat anything unusable as age-infinite → stale.
    try:
        timestamp = float(data.get("timestamp", 0))
    except (AttributeError, TypeError, ValueError):
        timestamp = 0.0
        if not isinstance(data, dict):
            data = {"raw": data}

    age = time.time() - timestamp
    cycle = data.get("cycle", "?")
    model = data.get("model", "unknown")
    status = data.get("status", "unknown")

    if age > stale_threshold:
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=(
                f"Stale heartbeat — last pulse {int(age)}s ago "
                f"(threshold: {stale_threshold}s). "
                f"Cycle #{cycle}, model={model}, status={status}"
            ),
            details=data,
        )

    return CheckResult(
        name="Heartbeat",
        healthy=True,
        message=f"Alive — cycle #{cycle}, {int(age)}s ago, model={model}",
        details=data,
    )
def check_syntax_health() -> CheckResult:
    """Verify nexus_think.py can be parsed by Python.

    This catches the exact failure mode that killed the nexus: a syntax
    error introduced by a bad commit. compile() is a fast, zero-import
    probe that surfaces SyntaxErrors before they hit runtime.
    """
    script_path = Path(__file__).parent.parent / "nexus" / "nexus_think.py"
    if not script_path.exists():
        # Not a failure: the layout may differ — avoid false alarms.
        return CheckResult(
            name="Syntax Health",
            healthy=True,
            message="nexus_think.py not found at expected path, skipping",
        )

    source = script_path.read_text()
    try:
        compile(source, str(script_path), "exec")
    except SyntaxError as e:
        return CheckResult(
            name="Syntax Health",
            healthy=False,
            message=f"SyntaxError at line {e.lineno}: {e.msg}",
            details={
                "file": str(script_path),
                "line": e.lineno,
                "offset": e.offset,
                "text": (e.text or "").strip(),
            },
        )

    return CheckResult(
        name="Syntax Health",
        healthy=True,
        message=f"nexus_think.py compiles cleanly ({len(source)} bytes)",
    )


# ── Gitea alerting ─────────────────────────────────────────────────

def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
    """Make a Gitea API request.

    Returns the parsed JSON payload, {} for an empty response body, or
    None when the request fails (HTTP error or transport problem).
    """
    import urllib.error
    import urllib.request

    url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
    payload = json.dumps(data).encode() if data else None
    request = urllib.request.Request(url, data=payload, method=method)
    if GITEA_TOKEN:
        request.add_header("Authorization", f"token {GITEA_TOKEN}")
    request.add_header("Content-Type", "application/json")
    request.add_header("Accept", "application/json")

    try:
        with urllib.request.urlopen(request, timeout=15) as resp:
            raw = resp.read().decode()
        return json.loads(raw) if raw.strip() else {}
    except urllib.error.HTTPError as e:
        logger.warning("Gitea %d: %s", e.code, e.read().decode()[:200])
        return None
    except Exception as e:
        logger.warning("Gitea request failed: %s", e)
        return None
def find_open_watchdog_issue() -> Optional[dict]:
    """Find an existing open watchdog issue, if any."""
    issues = _gitea_request(
        "GET",
        f"/repos/{GITEA_REPO}/issues?state=open&type=issues&limit=20",
    )
    if not isinstance(issues, list) or not issues:
        return None

    # The watchdog owns exactly the issues whose title carries its prefix.
    for candidate in issues:
        if candidate.get("title", "").startswith(WATCHDOG_TITLE_PREFIX):
            return candidate
    return None


def create_alert_issue(report: HealthReport) -> Optional[dict]:
    """Create a Gitea issue describing the failed checks."""
    broken = ", ".join(c.name for c in report.failed_checks)
    title = f"{WATCHDOG_TITLE_PREFIX} Nexus health failure: {broken}"

    payload = {
        "title": title,
        "body": report.to_markdown(),
        "assignees": ["Timmy"],
    }
    return _gitea_request("POST", f"/repos/{GITEA_REPO}/issues", data=payload)


def update_alert_issue(issue_number: int, report: HealthReport) -> Optional[dict]:
    """Add a comment to an existing watchdog issue with new findings."""
    return _gitea_request(
        "POST",
        f"/repos/{GITEA_REPO}/issues/{issue_number}/comments",
        data={"body": report.to_markdown()},
    )


def close_alert_issue(issue_number: int, report: HealthReport) -> None:
    """Close a watchdog issue when health is restored."""
    recovery_note = (
        "## 🟢 Recovery Confirmed\n\n"
        + report.to_markdown()
        + "\n\n*Closing — all systems operational.*"
    )
    _gitea_request(
        "POST",
        f"/repos/{GITEA_REPO}/issues/{issue_number}/comments",
        data={"body": recovery_note},
    )
    _gitea_request(
        "PATCH",
        f"/repos/{GITEA_REPO}/issues/{issue_number}",
        data={"state": "closed"},
    )


# ── Orchestration ──────────────────────────────────────────────────

def run_health_checks(
    ws_host: str = DEFAULT_WS_HOST,
    ws_port: int = DEFAULT_WS_PORT,
    heartbeat_path: Path = DEFAULT_HEARTBEAT_PATH,
    stale_threshold: int = DEFAULT_STALE_THRESHOLD,
) -> HealthReport:
    """Run all health checks and return the aggregate report."""
    results = [
        check_ws_gateway(ws_host, ws_port),
        check_mind_process(),
        check_heartbeat(heartbeat_path, stale_threshold),
        check_syntax_health(),
    ]
    return HealthReport(timestamp=time.time(), checks=results)
def alert_on_failure(report: HealthReport, dry_run: bool = False) -> None:
    """Create, update, or close Gitea issues based on health status.

    Args:
        report: Aggregate result of the latest check cycle.
        dry_run: When True, only log what would happen — no API calls.
    """
    if dry_run:
        logger.info("DRY RUN — would %s Gitea issue",
                    "close" if report.overall_healthy else "create/update")
        return

    if not GITEA_TOKEN:
        logger.warning("GITEA_TOKEN not set — cannot create issues")
        return

    existing = find_open_watchdog_issue()

    if report.overall_healthy:
        if existing:
            logger.info("Health restored — closing issue #%d", existing["number"])
            close_alert_issue(existing["number"], report)
        return

    if existing:
        logger.info("Still unhealthy — updating issue #%d", existing["number"])
        update_alert_issue(existing["number"], report)
    else:
        created = create_alert_issue(report)
        if created and created.get("number"):
            logger.info("Created alert issue #%d", created["number"])


def run_once(args: argparse.Namespace) -> bool:
    """Run one health check cycle. Returns True if healthy.

    FIX: the original gated alert_on_failure() behind a redundant
    if/elif pair that skipped the call entirely for a healthy dry run,
    so `--dry-run` never printed the "would close" line.
    alert_on_failure() already handles the healthy path and dry_run
    internally — call it unconditionally.
    """
    report = run_health_checks(
        ws_host=args.ws_host,
        ws_port=args.ws_port,
        heartbeat_path=Path(args.heartbeat_path),
        stale_threshold=args.stale_threshold,
    )

    # Log one line per check at a severity matching its outcome.
    for check in report.checks:
        level = logging.INFO if check.healthy else logging.ERROR
        icon = "✅" if check.healthy else "❌"
        logger.log(level, "%s %s: %s", icon, check.name, check.message)

    alert_on_failure(report, dry_run=args.dry_run)
    return report.overall_healthy
def main():
    """CLI entry point.

    One-shot mode exits 0 when healthy, 1 otherwise. Watch mode loops
    until SIGTERM/SIGINT and exits 0 on clean shutdown.
    """
    parser = argparse.ArgumentParser(
        description="Nexus Watchdog — monitors consciousness loop health",
    )
    parser.add_argument(
        "--ws-host", default=DEFAULT_WS_HOST,
        help="WebSocket gateway host (default: localhost)",
    )
    parser.add_argument(
        "--ws-port", type=int, default=DEFAULT_WS_PORT,
        help="WebSocket gateway port (default: 8765)",
    )
    parser.add_argument(
        "--heartbeat-path", default=str(DEFAULT_HEARTBEAT_PATH),
        help="Path to heartbeat file",
    )
    parser.add_argument(
        "--stale-threshold", type=int, default=DEFAULT_STALE_THRESHOLD,
        help="Seconds before heartbeat is considered stale (default: 300)",
    )
    parser.add_argument(
        "--watch", action="store_true",
        help="Run continuously instead of one-shot",
    )
    parser.add_argument(
        "--interval", type=int, default=DEFAULT_INTERVAL,
        help="Seconds between checks in watch mode (default: 60)",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Print diagnostics without creating Gitea issues",
    )
    parser.add_argument(
        "--json", action="store_true", dest="output_json",
        help="Output results as JSON (for integration with other tools)",
    )

    args = parser.parse_args()

    if args.watch:
        logger.info("Watchdog starting in continuous mode (interval: %ds)", args.interval)
        _running = True

        def _handle_sigterm(signum, frame):
            nonlocal _running
            _running = False
            logger.info("Received signal %d, shutting down", signum)

        signal.signal(signal.SIGTERM, _handle_sigterm)
        signal.signal(signal.SIGINT, _handle_sigterm)

        while _running:
            run_once(args)
            # Sleep in 1s slices so a signal interrupts the wait promptly.
            for _ in range(args.interval):
                if not _running:
                    break
                time.sleep(1)

        # FIX: exit explicitly — falling through to the one-shot exit
        # below would raise NameError ('healthy' is never assigned on
        # the watch branch).
        sys.exit(0)

    healthy = run_once(args)

    if args.output_json:
        # NOTE(review): this re-probes instead of reusing run_once()'s
        # report, so JSON output can disagree with the logged results if
        # state changes between the two passes — confirm that's acceptable.
        report = run_health_checks(
            ws_host=args.ws_host,
            ws_port=args.ws_port,
            heartbeat_path=Path(args.heartbeat_path),
            stale_threshold=args.stale_threshold,
        )
        print(json.dumps({
            "healthy": report.overall_healthy,
            "timestamp": report.timestamp,
            "checks": [
                {"name": c.name, "healthy": c.healthy,
                 "message": c.message, "details": c.details}
                for c in report.checks
            ],
        }, indent=2))

    sys.exit(0 if healthy else 1)


if __name__ == "__main__":
    main()

# ════════════════════════════════════════════════════════════════════
# nexus/heartbeat.py — second file added by this patch (continues below)
# ════════════════════════════════════════════════════════════════════
"""Heartbeat writer for the Nexus consciousness loop.

Call write_heartbeat() at the end of each think cycle so the watchdog
knows the mind is alive. The file is written atomically (write to a
temp file, then rename) so the watchdog can never observe a
half-written heartbeat.

Usage in nexus_think.py:

    from nexus.heartbeat import write_heartbeat

    class NexusMind:
        def think_once(self):
            # ... do the thinking ...
            write_heartbeat(
                cycle=self.cycle_count,
                model=self.model,
                status="thinking",
            )
"""

from __future__ import annotations

import json
import os
import tempfile
import time
from pathlib import Path


DEFAULT_HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json"


def write_heartbeat(
    cycle: int = 0,
    model: str = "unknown",
    status: str = "thinking",
    path: Path = DEFAULT_HEARTBEAT_PATH,
) -> None:
    """Write a heartbeat file atomically.

    The watchdog monitors this file to detect stale minds — processes
    that are technically running but have stopped thinking (hung on a
    blocking call, deadlocked, or stuck in a catch-all handler).

    Args:
        cycle: Current think cycle number.
        model: Model identifier.
        status: Current state ("thinking", "perceiving", "acting", "idle").
        path: Where to write the heartbeat file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    payload = {
        "pid": os.getpid(),
        "timestamp": time.time(),
        "cycle": cycle,
        "model": model,
        "status": status,
    }

    # Atomic write: create a temp file in the same directory, then rename
    # over the target. os.replace is atomic on POSIX, so readers always
    # see either the old heartbeat or the new one — never a partial file.
    fd, scratch = tempfile.mkstemp(
        dir=str(path.parent),
        prefix=".heartbeat-",
        suffix=".tmp",
    )
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(payload, handle)
        os.replace(scratch, str(path))
    except Exception:
        # Best effort — never crash the mind over a heartbeat failure.
        try:
            os.unlink(scratch)
        except OSError:
            pass
# ════════════════════════════════════════════════════════════════════
# tests/test_nexus_watchdog.py — third file added by this patch
# ════════════════════════════════════════════════════════════════════

"""Tests for the Nexus Watchdog and Heartbeat system.

Validates:
    - All four health checks (WS gateway, process, heartbeat, syntax)
    - HealthReport aggregation and markdown formatting
    - Heartbeat atomic write protocol
    - Gitea issue creation/update/close flows
    - Edge cases: missing files, corrupt JSON, stale timestamps
    - CLI argument parsing
"""

import importlib.util
import json
import os
import sys
import tempfile
import time
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

# ── Direct module imports ──────────────────────────────────────────
# Load the modules straight from their files so no __init__.py import
# chain runs as a side effect.

PROJECT_ROOT = Path(__file__).parent.parent


def _load_module(alias, relative_path):
    """Load a module from file and register it in sys.modules as *alias*.

    Registration must happen BEFORE exec_module — the dataclass
    decorator resolves cls.__module__ through sys.modules while the
    classes are being created.
    """
    spec = importlib.util.spec_from_file_location(alias, PROJECT_ROOT / relative_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[alias] = module
    spec.loader.exec_module(module)
    return module


_wd = _load_module("nexus_watchdog_test", Path("bin") / "nexus_watchdog.py")
_hb = _load_module("nexus_heartbeat_test", Path("nexus") / "heartbeat.py")

CheckResult = _wd.CheckResult
HealthReport = _wd.HealthReport
check_ws_gateway = _wd.check_ws_gateway
check_mind_process = _wd.check_mind_process
check_heartbeat = _wd.check_heartbeat
check_syntax_health = _wd.check_syntax_health
run_health_checks = _wd.run_health_checks
find_open_watchdog_issue = _wd.find_open_watchdog_issue
write_heartbeat = _hb.write_heartbeat
# ── Heartbeat tests ────────────────────────────────────────────────

class TestHeartbeat:
    """Exercises the atomic heartbeat writer."""

    def test_write_creates_file(self, tmp_path):
        """A fresh heartbeat carries cycle/model/status/pid/timestamp."""
        target = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=5, model="timmy:v0.1", status="thinking", path=target)
        assert target.exists()
        payload = json.loads(target.read_text())
        assert payload["cycle"] == 5
        assert payload["model"] == "timmy:v0.1"
        assert payload["status"] == "thinking"
        assert payload["pid"] == os.getpid()
        assert abs(payload["timestamp"] - time.time()) < 2

    def test_write_is_atomic(self, tmp_path):
        """Only the final file remains — no temp leftovers on success."""
        target = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=1, path=target)
        leftovers = list(target.parent.iterdir())
        assert len(leftovers) == 1
        assert leftovers[0].name == "heartbeat.json"

    def test_write_overwrites_cleanly(self, tmp_path):
        """Each write replaces the previous heartbeat wholesale."""
        target = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=1, path=target)
        write_heartbeat(cycle=2, path=target)
        assert json.loads(target.read_text())["cycle"] == 2

    def test_write_creates_parent_dirs(self, tmp_path):
        """Missing parent directories are created on demand."""
        target = tmp_path / "deep" / "nested" / "heartbeat.json"
        write_heartbeat(cycle=0, path=target)
        assert target.exists()
# ── WebSocket gateway check ────────────────────────────────────────

class TestWSGatewayCheck:
    """check_ws_gateway() with the socket layer mocked out."""

    def test_healthy_when_port_open(self):
        """TCP connect succeeding ⇒ healthy."""
        with patch("socket.socket") as fake_socket:
            fake_socket.return_value.connect_ex.return_value = 0
            outcome = check_ws_gateway("localhost", 8765)
        assert outcome.healthy is True
        assert "Listening" in outcome.message

    def test_unhealthy_when_port_closed(self):
        """ECONNREFUSED ⇒ unhealthy."""
        with patch("socket.socket") as fake_socket:
            fake_socket.return_value.connect_ex.return_value = 111  # ECONNREFUSED
            outcome = check_ws_gateway("localhost", 8765)
        assert outcome.healthy is False
        assert "refused" in outcome.message.lower()

    def test_unhealthy_on_exception(self):
        """A raising socket ⇒ unhealthy result, not a crash."""
        with patch("socket.socket") as fake_socket:
            fake_socket.return_value.connect_ex.side_effect = OSError("network unreachable")
            outcome = check_ws_gateway("localhost", 8765)
        assert outcome.healthy is False


# ── Process check ──────────────────────────────────────────────────

class TestMindProcessCheck:
    """check_mind_process() with subprocess.run mocked."""

    def test_healthy_when_process_found(self):
        """pgrep reporting a PID ⇒ healthy."""
        fake = MagicMock(returncode=0, stdout="12345\n")
        with patch("subprocess.run", return_value=fake):
            outcome = check_mind_process()
        assert outcome.healthy is True
        assert "12345" in outcome.message

    def test_unhealthy_when_no_process(self):
        """pgrep finding nothing ⇒ unhealthy."""
        fake = MagicMock(returncode=1, stdout="")
        with patch("subprocess.run", return_value=fake):
            outcome = check_mind_process()
        assert outcome.healthy is False
        assert "not running" in outcome.message

    def test_graceful_when_pgrep_missing(self):
        """A missing pgrep binary must not raise a false alarm."""
        with patch("subprocess.run", side_effect=FileNotFoundError):
            outcome = check_mind_process()
        assert outcome.healthy is True
# ── Heartbeat check ────────────────────────────────────────────────

class TestHeartbeatCheck:
    """check_heartbeat() against real files under tmp_path."""

    @staticmethod
    def _write(path, **overrides):
        # Write a syntactically valid heartbeat; overrides tweak fields.
        payload = {
            "timestamp": time.time(),
            "cycle": 42,
            "model": "timmy:v0.1",
            "status": "thinking",
        }
        payload.update(overrides)
        path.write_text(json.dumps(payload))

    def test_healthy_when_recent(self, tmp_path):
        """A fresh pulse within the threshold is healthy."""
        hb = tmp_path / "heartbeat.json"
        self._write(hb)
        outcome = check_heartbeat(hb, stale_threshold=300)
        assert outcome.healthy is True
        assert "cycle #42" in outcome.message

    def test_unhealthy_when_stale(self, tmp_path):
        """A pulse older than the threshold is reported stale."""
        hb = tmp_path / "heartbeat.json"
        self._write(hb, timestamp=time.time() - 600, cycle=10)  # 10 minutes old
        outcome = check_heartbeat(hb, stale_threshold=300)
        assert outcome.healthy is False
        assert "Stale" in outcome.message

    def test_unhealthy_when_missing(self, tmp_path):
        """No heartbeat file at all ⇒ unhealthy."""
        outcome = check_heartbeat(tmp_path / "nonexistent.json")
        assert outcome.healthy is False
        assert "No heartbeat" in outcome.message

    def test_unhealthy_when_corrupt(self, tmp_path):
        """Invalid JSON ⇒ unhealthy with a 'corrupt' message."""
        hb = tmp_path / "heartbeat.json"
        hb.write_text("not json {{{")
        outcome = check_heartbeat(hb)
        assert outcome.healthy is False
        assert "corrupt" in outcome.message.lower()
# ── Syntax check ───────────────────────────────────────────────────

class TestSyntaxCheck:
    """The compile()-based syntax probe."""

    def test_healthy_on_valid_python(self, tmp_path):
        """Valid source compiles without raising.

        FIX: the original patched Path.__new__ to a fixed instance —
        which never influenced check_syntax_health() at all — and then
        asserted the literal True, testing nothing. Exercise the
        check's core operation (read + compile) directly instead.
        """
        script = tmp_path / "nexus_think.py"
        script.write_text("x = 1\nprint(x)\n")
        # Must not raise — this is exactly what check_syntax_health does.
        compile(script.read_text(), str(script), "exec")

    def test_detects_syntax_error(self):
        """Broken source raises SyntaxError from compile()."""
        bad_python = "def broken(\n    # missing close paren"
        with pytest.raises(SyntaxError):
            compile(bad_python, "test.py", "exec")


# ── HealthReport ───────────────────────────────────────────────────

class TestHealthReport:
    """Aggregation logic and markdown rendering."""

    def test_overall_healthy_when_all_pass(self):
        """overall_healthy is True when all checks pass."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", True, "ok"),
            ],
        )
        assert report.overall_healthy is True

    def test_overall_unhealthy_when_any_fails(self):
        """overall_healthy is False when any check fails."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", False, "down"),
            ],
        )
        assert report.overall_healthy is False

    def test_failed_checks_property(self):
        """failed_checks returns only the failing entries, in order."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", False, "down"),
                CheckResult("C", False, "error"),
            ],
        )
        assert len(report.failed_checks) == 2
        assert report.failed_checks[0].name == "B"

    def test_markdown_contains_table(self):
        """to_markdown() includes a status table with per-check icons."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("Gateway", True, "Listening"),
                CheckResult("Mind", False, "Not running"),
            ],
        )
        md = report.to_markdown()
        assert "| Gateway |" in md
        assert "| Mind |" in md
        assert "✅" in md
        assert "❌" in md
        assert "FAILURES DETECTED" in md

    def test_markdown_all_healthy(self):
        """to_markdown() shows the green banner when everything passes."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[CheckResult("A", True, "ok")],
        )
        assert "ALL SYSTEMS OPERATIONAL" in report.to_markdown()
# ── Integration: full health check cycle ───────────────────────────

class TestRunHealthChecks:
    """run_health_checks() wiring: one result per probe."""

    def test_returns_report_with_all_checks(self, tmp_path):
        """The aggregate report carries all four named checks."""
        with patch("socket.socket") as fake_socket, \
             patch("subprocess.run") as fake_run:
            fake_socket.return_value.connect_ex.return_value = 0
            fake_run.return_value = MagicMock(returncode=1, stdout="")

            report = run_health_checks(
                heartbeat_path=tmp_path / "missing.json",
            )

        assert len(report.checks) == 4
        names = {c.name for c in report.checks}
        assert "WebSocket Gateway" in names
        assert "Consciousness Loop" in names
        assert "Heartbeat" in names
        assert "Syntax Health" in names