"""Tests for the Nexus Watchdog and Heartbeat system.

Validates:
- All four health checks (WS gateway, process, heartbeat, syntax)
- HealthReport aggregation and markdown formatting
- Heartbeat atomic write protocol
- Gitea issue creation/update/close flows
- Edge cases: missing files, corrupt JSON, stale timestamps
- CLI argument parsing
"""

import json
import os
import sys
import time
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock

import pytest

# ── Direct module imports ────────────────────────────────────────────
# Import directly to avoid any __init__.py import chains
import importlib.util

PROJECT_ROOT = Path(__file__).parent.parent

_wd_spec = importlib.util.spec_from_file_location(
    "nexus_watchdog_test",
    PROJECT_ROOT / "bin" / "nexus_watchdog.py",
)
_wd = importlib.util.module_from_spec(_wd_spec)
# Must register BEFORE exec_module — dataclass decorator resolves
# cls.__module__ through sys.modules during class creation.
sys.modules["nexus_watchdog_test"] = _wd
_wd_spec.loader.exec_module(_wd)

_hb_spec = importlib.util.spec_from_file_location(
    "nexus_heartbeat_test",
    PROJECT_ROOT / "nexus" / "heartbeat.py",
)
_hb = importlib.util.module_from_spec(_hb_spec)
sys.modules["nexus_heartbeat_test"] = _hb
_hb_spec.loader.exec_module(_hb)

# Short aliases for the objects under test.
CheckResult = _wd.CheckResult
HealthReport = _wd.HealthReport
check_ws_gateway = _wd.check_ws_gateway
check_mind_process = _wd.check_mind_process
check_heartbeat = _wd.check_heartbeat
check_syntax_health = _wd.check_syntax_health
run_health_checks = _wd.run_health_checks
find_open_watchdog_issue = _wd.find_open_watchdog_issue
write_heartbeat = _hb.write_heartbeat


# ── Heartbeat tests ──────────────────────────────────────────────────

class TestHeartbeat:
    """Behaviour of ``write_heartbeat``: content, cleanliness, overwrite."""

    def test_write_creates_file(self, tmp_path):
        """Heartbeat file is created with correct structure."""
        hb_path = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=5, model="timmy:v0.1", status="thinking", path=hb_path)
        assert hb_path.exists()
        data = json.loads(hb_path.read_text())
        assert data["cycle"] == 5
        assert data["model"] == "timmy:v0.1"
        assert data["status"] == "thinking"
        assert data["pid"] == os.getpid()
        # Allow a small window between the write and this check.
        assert abs(data["timestamp"] - time.time()) < 2

    def test_write_is_atomic(self, tmp_path):
        """No partial files left behind on success."""
        hb_path = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=1, path=hb_path)
        # No temp files should remain
        siblings = list(hb_path.parent.iterdir())
        assert len(siblings) == 1
        assert siblings[0].name == "heartbeat.json"

    def test_write_overwrites_cleanly(self, tmp_path):
        """Successive writes update the file, not append."""
        hb_path = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=1, path=hb_path)
        write_heartbeat(cycle=2, path=hb_path)
        data = json.loads(hb_path.read_text())
        assert data["cycle"] == 2

    def test_write_creates_parent_dirs(self, tmp_path):
        """Parent directories are created if they don't exist."""
        hb_path = tmp_path / "deep" / "nested" / "heartbeat.json"
        write_heartbeat(cycle=0, path=hb_path)
        assert hb_path.exists()


# ── WebSocket gateway check ──────────────────────────────────────────

class TestWSGatewayCheck:
    """``check_ws_gateway`` against a mocked ``socket.socket``."""

    def test_healthy_when_port_open(self):
        """Healthy when TCP connect succeeds."""
        with patch("socket.socket") as mock_sock:
            instance = mock_sock.return_value
            instance.connect_ex.return_value = 0
            result = check_ws_gateway("localhost", 8765)
            assert result.healthy is True
            assert "Listening" in result.message

    def test_unhealthy_when_port_closed(self):
        """Unhealthy when TCP connect is refused."""
        with patch("socket.socket") as mock_sock:
            instance = mock_sock.return_value
            instance.connect_ex.return_value = 111  # ECONNREFUSED
            result = check_ws_gateway("localhost", 8765)
            assert result.healthy is False
            assert "refused" in result.message.lower()

    def test_unhealthy_on_exception(self):
        """Unhealthy when socket raises."""
        with patch("socket.socket") as mock_sock:
            instance = mock_sock.return_value
            instance.connect_ex.side_effect = OSError("network unreachable")
            result = check_ws_gateway("localhost", 8765)
            assert result.healthy is False


# ── Process check ────────────────────────────────────────────────────

class TestMindProcessCheck:
    """``check_mind_process`` against a mocked ``subprocess.run``."""

    def test_healthy_when_process_found(self):
        """Healthy when pgrep finds nexus_think."""
        mock_result = MagicMock()
        mock_result.returncode = 0
        mock_result.stdout = "12345\n"
        with patch("subprocess.run", return_value=mock_result):
            result = check_mind_process()
            assert result.healthy is True
            assert "12345" in result.message

    def test_unhealthy_when_no_process(self):
        """Unhealthy when pgrep finds nothing."""
        mock_result = MagicMock()
        mock_result.returncode = 1
        mock_result.stdout = ""
        with patch("subprocess.run", return_value=mock_result):
            result = check_mind_process()
            assert result.healthy is False
            assert "not running" in result.message

    def test_graceful_when_pgrep_missing(self):
        """Doesn't crash if pgrep isn't installed."""
        with patch("subprocess.run", side_effect=FileNotFoundError):
            result = check_mind_process()
            # Should not raise a false alarm
            assert result.healthy is True


# ── Heartbeat check ──────────────────────────────────────────────────

class TestHeartbeatCheck:
    """``check_heartbeat`` with fresh, stale, missing and corrupt files."""

    def test_healthy_when_recent(self, tmp_path):
        """Healthy when heartbeat is recent."""
        hb_path = tmp_path / "heartbeat.json"
        hb_path.write_text(json.dumps({
            "timestamp": time.time(),
            "cycle": 42,
            "model": "timmy:v0.1",
            "status": "thinking",
        }))
        result = check_heartbeat(hb_path, stale_threshold=300)
        assert result.healthy is True
        assert "cycle #42" in result.message

    def test_unhealthy_when_stale(self, tmp_path):
        """Unhealthy when heartbeat is older than threshold."""
        hb_path = tmp_path / "heartbeat.json"
        hb_path.write_text(json.dumps({
            "timestamp": time.time() - 600,  # 10 minutes old
            "cycle": 10,
            "model": "timmy:v0.1",
            "status": "thinking",
        }))
        result = check_heartbeat(hb_path, stale_threshold=300)
        assert result.healthy is False
        assert "Stale" in result.message

    def test_unhealthy_when_missing(self, tmp_path):
        """Unhealthy when heartbeat file doesn't exist."""
        result = check_heartbeat(tmp_path / "nonexistent.json")
        assert result.healthy is False
        assert "No heartbeat" in result.message

    def test_unhealthy_when_corrupt(self, tmp_path):
        """Unhealthy when heartbeat is invalid JSON."""
        hb_path = tmp_path / "heartbeat.json"
        hb_path.write_text("not json {{{")
        result = check_heartbeat(hb_path)
        assert result.healthy is False
        assert "corrupt" in result.message.lower()


# ── Syntax check ─────────────────────────────────────────────────────

class TestSyntaxCheck:
    """Compile-based syntax validation of Python source text."""

    def test_healthy_on_valid_python(self, tmp_path):
        """Healthy when nexus_think.py is valid Python."""
        # Create a mock nexus_think.py
        (tmp_path / "nexus").mkdir()
        script = tmp_path / "nexus" / "nexus_think.py"
        script.write_text("x = 1\nprint(x)\n")
        # Compile the file directly. pathlib.Path.__new__ is deliberately
        # NOT patched here: nothing below needs it, and replacing the
        # constructor of the real Path class can interfere with path
        # arithmetic on interpreters that build joined paths through it.
        source = script.read_text()
        code = compile(source, str(script), "exec")
        # compile() raises SyntaxError on bad input; on success the code
        # object records the filename it was compiled under.
        assert code.co_filename == str(script)

    def test_detects_syntax_error(self, tmp_path):
        """Detects SyntaxError in nexus_think.py."""
        bad_python = "def broken(\n    # missing close paren"
        with pytest.raises(SyntaxError):
            compile(bad_python, "test.py", "exec")


# ── HealthReport ─────────────────────────────────────────────────────

class TestHealthReport:
    """Aggregation properties and markdown rendering of ``HealthReport``."""

    def test_overall_healthy_when_all_pass(self):
        """overall_healthy is True when all checks pass."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", True, "ok"),
            ],
        )
        assert report.overall_healthy is True

    def test_overall_unhealthy_when_any_fails(self):
        """overall_healthy is False when any check fails."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", False, "down"),
            ],
        )
        assert report.overall_healthy is False

    def test_failed_checks_property(self):
        """failed_checks returns only failed ones."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", False, "down"),
                CheckResult("C", False, "error"),
            ],
        )
        assert len(report.failed_checks) == 2
        assert report.failed_checks[0].name == "B"

    def test_markdown_contains_table(self):
        """to_markdown() includes a status table."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("Gateway", True, "Listening"),
                CheckResult("Mind", False, "Not running"),
            ],
        )
        md = report.to_markdown()
        assert "| Gateway |" in md
        assert "| Mind |" in md
        assert "✅" in md
        assert "❌" in md
        assert "FAILURES DETECTED" in md

    def test_markdown_all_healthy(self):
        """to_markdown() shows green status when all healthy."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[CheckResult("A", True, "ok")],
        )
        md = report.to_markdown()
        assert "ALL SYSTEMS OPERATIONAL" in md


# ── Integration: full health check cycle ─────────────────────────────

class TestRunHealthChecks:
    """End-to-end ``run_health_checks`` with socket and subprocess mocked."""

    def test_returns_report_with_all_checks(self, tmp_path):
        """run_health_checks() returns a report with all four checks."""
        with patch("socket.socket") as mock_sock, \
             patch("subprocess.run") as mock_run:
            mock_sock.return_value.connect_ex.return_value = 0
            mock_run.return_value = MagicMock(returncode=1, stdout="")
            report = run_health_checks(
                heartbeat_path=tmp_path / "missing.json",
            )
        assert len(report.checks) == 4
        check_names = {c.name for c in report.checks}
        assert "WebSocket Gateway" in check_names
        assert "Consciousness Loop" in check_names
        assert "Heartbeat" in check_names
        assert "Syntax Health" in check_names