Compare commits
1 Commits
allegro/ev
...
gemini/nex
| Author | SHA1 | Date | |
|---|---|---|---|
| 63b32e9cf3 |
575
bin/nexus_watchdog.py
Normal file
575
bin/nexus_watchdog.py
Normal file
@@ -0,0 +1,575 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Nexus Watchdog — The Eye That Never Sleeps
|
||||
|
||||
Monitors the health of the Nexus consciousness loop and WebSocket
|
||||
gateway, raising Gitea issues when components go dark.
|
||||
|
||||
The nexus was dead for hours after a syntax error crippled
|
||||
nexus_think.py. Nobody knew. The gateway kept running, but the
|
||||
consciousness loop — the only part that matters — was silent.
|
||||
|
||||
This watchdog ensures that never happens again.
|
||||
|
||||
HOW IT WORKS
|
||||
============
|
||||
1. Probes the WebSocket gateway (ws://localhost:8765)
|
||||
→ Can Timmy hear the world?
|
||||
|
||||
2. Checks for a running nexus_think.py process
|
||||
→ Is Timmy's mind awake?
|
||||
|
||||
3. Reads the heartbeat file (~/.nexus/heartbeat.json)
|
||||
→ When did Timmy last think?
|
||||
|
||||
4. If any check fails, opens a Gitea issue (or updates an existing one)
|
||||
with the exact failure mode, timestamp, and diagnostic info.
|
||||
|
||||
5. If all checks pass after a previous failure, closes the issue
|
||||
with a recovery note.
|
||||
|
||||
USAGE
|
||||
=====
|
||||
# One-shot check (good for cron)
|
||||
python bin/nexus_watchdog.py
|
||||
|
||||
# Continuous monitoring (every 60s)
|
||||
python bin/nexus_watchdog.py --watch --interval 60
|
||||
|
||||
# Dry-run (print diagnostics, don't touch Gitea)
|
||||
python bin/nexus_watchdog.py --dry-run
|
||||
|
||||
# Crontab entry (every 5 minutes)
|
||||
*/5 * * * * cd /path/to/the-nexus && python bin/nexus_watchdog.py
|
||||
|
||||
HEARTBEAT PROTOCOL
|
||||
==================
|
||||
The consciousness loop (nexus_think.py) writes a heartbeat file
|
||||
after each think cycle:
|
||||
|
||||
~/.nexus/heartbeat.json
|
||||
{
|
||||
"pid": 12345,
|
||||
"timestamp": 1711843200.0,
|
||||
"cycle": 42,
|
||||
"model": "timmy:v0.1-q4",
|
||||
"status": "thinking"
|
||||
}
|
||||
|
||||
If the heartbeat is older than --stale-threshold seconds, the
|
||||
mind is considered dead even if the process is still running
|
||||
(e.g., hung on a blocking call).
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
=================
|
||||
Pure stdlib. No pip installs. Same machine as the nexus.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("nexus.watchdog")
|
||||
|
||||
# ── Configuration ────────────────────────────────────────────────────
|
||||
|
||||
DEFAULT_WS_HOST = "localhost"
|
||||
DEFAULT_WS_PORT = 8765
|
||||
DEFAULT_HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json"
|
||||
DEFAULT_STALE_THRESHOLD = 300 # 5 minutes without a heartbeat = dead
|
||||
DEFAULT_INTERVAL = 60 # seconds between checks in watch mode
|
||||
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "http://143.198.27.163:3000")
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_REPO = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")
|
||||
WATCHDOG_LABEL = "watchdog"
|
||||
WATCHDOG_TITLE_PREFIX = "[watchdog]"
|
||||
|
||||
|
||||
# ── Health check results ─────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class CheckResult:
|
||||
"""Result of a single health check."""
|
||||
name: str
|
||||
healthy: bool
|
||||
message: str
|
||||
details: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HealthReport:
|
||||
"""Aggregate health report from all checks."""
|
||||
timestamp: float
|
||||
checks: List[CheckResult]
|
||||
overall_healthy: bool = True
|
||||
|
||||
def __post_init__(self):
|
||||
self.overall_healthy = all(c.healthy for c in self.checks)
|
||||
|
||||
@property
|
||||
def failed_checks(self) -> List[CheckResult]:
|
||||
return [c for c in self.checks if not c.healthy]
|
||||
|
||||
def to_markdown(self) -> str:
|
||||
"""Format as a Gitea issue body."""
|
||||
ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp))
|
||||
status = "🟢 ALL SYSTEMS OPERATIONAL" if self.overall_healthy else "🔴 FAILURES DETECTED"
|
||||
|
||||
lines = [
|
||||
f"## Nexus Health Report — {ts}",
|
||||
f"**Status:** {status}",
|
||||
"",
|
||||
"| Check | Status | Details |",
|
||||
"|:------|:------:|:--------|",
|
||||
]
|
||||
|
||||
for c in self.checks:
|
||||
icon = "✅" if c.healthy else "❌"
|
||||
lines.append(f"| {c.name} | {icon} | {c.message} |")
|
||||
|
||||
if self.failed_checks:
|
||||
lines.append("")
|
||||
lines.append("### Failure Diagnostics")
|
||||
for c in self.failed_checks:
|
||||
lines.append(f"\n**{c.name}:**")
|
||||
lines.append(f"```")
|
||||
lines.append(c.message)
|
||||
if c.details:
|
||||
lines.append(json.dumps(c.details, indent=2))
|
||||
lines.append(f"```")
|
||||
|
||||
lines.append("")
|
||||
lines.append(f"*Generated by `nexus_watchdog.py` at {ts}*")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ── Health checks ────────────────────────────────────────────────────
|
||||
|
||||
def check_ws_gateway(host: str = DEFAULT_WS_HOST, port: int = DEFAULT_WS_PORT) -> CheckResult:
|
||||
"""Check if the WebSocket gateway is accepting connections.
|
||||
|
||||
Uses a raw TCP socket probe (not a full WebSocket handshake) to avoid
|
||||
depending on the websockets library. If TCP connects, the gateway
|
||||
process is alive and listening.
|
||||
"""
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(5)
|
||||
result = sock.connect_ex((host, port))
|
||||
sock.close()
|
||||
|
||||
if result == 0:
|
||||
return CheckResult(
|
||||
name="WebSocket Gateway",
|
||||
healthy=True,
|
||||
message=f"Listening on {host}:{port}",
|
||||
)
|
||||
else:
|
||||
return CheckResult(
|
||||
name="WebSocket Gateway",
|
||||
healthy=False,
|
||||
message=f"Connection refused on {host}:{port} (errno={result})",
|
||||
details={"host": host, "port": port, "errno": result},
|
||||
)
|
||||
except Exception as e:
|
||||
return CheckResult(
|
||||
name="WebSocket Gateway",
|
||||
healthy=False,
|
||||
message=f"Probe failed: {e}",
|
||||
details={"host": host, "port": port, "error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
def check_mind_process() -> CheckResult:
|
||||
"""Check if nexus_think.py is running as a process.
|
||||
|
||||
Uses `pgrep -f` to find processes matching the script name.
|
||||
This catches both `python nexus_think.py` and `python -m nexus.nexus_think`.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["pgrep", "-f", "nexus_think"],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
pids = [p.strip() for p in result.stdout.strip().split("\n") if p.strip()]
|
||||
# Filter out our own watchdog process
|
||||
own_pid = str(os.getpid())
|
||||
pids = [p for p in pids if p != own_pid]
|
||||
|
||||
if pids:
|
||||
return CheckResult(
|
||||
name="Consciousness Loop",
|
||||
healthy=True,
|
||||
message=f"Running (PID: {', '.join(pids)})",
|
||||
details={"pids": pids},
|
||||
)
|
||||
|
||||
return CheckResult(
|
||||
name="Consciousness Loop",
|
||||
healthy=False,
|
||||
message="nexus_think.py is not running — Timmy's mind is dark",
|
||||
details={"pgrep_returncode": result.returncode},
|
||||
)
|
||||
except FileNotFoundError:
|
||||
# pgrep not available (unlikely on Linux/macOS but handle gracefully)
|
||||
return CheckResult(
|
||||
name="Consciousness Loop",
|
||||
healthy=True, # Can't check — don't raise false alarms
|
||||
message="pgrep not available, skipping process check",
|
||||
)
|
||||
except Exception as e:
|
||||
return CheckResult(
|
||||
name="Consciousness Loop",
|
||||
healthy=False,
|
||||
message=f"Process check failed: {e}",
|
||||
details={"error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
def check_heartbeat(
|
||||
path: Path = DEFAULT_HEARTBEAT_PATH,
|
||||
stale_threshold: int = DEFAULT_STALE_THRESHOLD,
|
||||
) -> CheckResult:
|
||||
"""Check if the heartbeat file exists and is recent.
|
||||
|
||||
The consciousness loop should write this file after each think
|
||||
cycle. If it's missing or stale, the mind has stopped thinking
|
||||
even if the process is technically alive.
|
||||
"""
|
||||
if not path.exists():
|
||||
return CheckResult(
|
||||
name="Heartbeat",
|
||||
healthy=False,
|
||||
message=f"No heartbeat file at {path} — mind has never reported",
|
||||
details={"path": str(path)},
|
||||
)
|
||||
|
||||
try:
|
||||
data = json.loads(path.read_text())
|
||||
except (json.JSONDecodeError, OSError) as e:
|
||||
return CheckResult(
|
||||
name="Heartbeat",
|
||||
healthy=False,
|
||||
message=f"Heartbeat file corrupt: {e}",
|
||||
details={"path": str(path), "error": str(e)},
|
||||
)
|
||||
|
||||
timestamp = data.get("timestamp", 0)
|
||||
age = time.time() - timestamp
|
||||
cycle = data.get("cycle", "?")
|
||||
model = data.get("model", "unknown")
|
||||
status = data.get("status", "unknown")
|
||||
|
||||
if age > stale_threshold:
|
||||
return CheckResult(
|
||||
name="Heartbeat",
|
||||
healthy=False,
|
||||
message=(
|
||||
f"Stale heartbeat — last pulse {int(age)}s ago "
|
||||
f"(threshold: {stale_threshold}s). "
|
||||
f"Cycle #{cycle}, model={model}, status={status}"
|
||||
),
|
||||
details=data,
|
||||
)
|
||||
|
||||
return CheckResult(
|
||||
name="Heartbeat",
|
||||
healthy=True,
|
||||
message=f"Alive — cycle #{cycle}, {int(age)}s ago, model={model}",
|
||||
details=data,
|
||||
)
|
||||
|
||||
|
||||
def check_syntax_health() -> CheckResult:
|
||||
"""Verify nexus_think.py can be parsed by Python.
|
||||
|
||||
This catches the exact failure mode that killed the nexus: a syntax
|
||||
error introduced by a bad commit. Python's compile() is a fast,
|
||||
zero-import check that catches SyntaxErrors before they hit runtime.
|
||||
"""
|
||||
script_path = Path(__file__).parent.parent / "nexus" / "nexus_think.py"
|
||||
if not script_path.exists():
|
||||
return CheckResult(
|
||||
name="Syntax Health",
|
||||
healthy=True,
|
||||
message="nexus_think.py not found at expected path, skipping",
|
||||
)
|
||||
|
||||
try:
|
||||
source = script_path.read_text()
|
||||
compile(source, str(script_path), "exec")
|
||||
return CheckResult(
|
||||
name="Syntax Health",
|
||||
healthy=True,
|
||||
message=f"nexus_think.py compiles cleanly ({len(source)} bytes)",
|
||||
)
|
||||
except SyntaxError as e:
|
||||
return CheckResult(
|
||||
name="Syntax Health",
|
||||
healthy=False,
|
||||
message=f"SyntaxError at line {e.lineno}: {e.msg}",
|
||||
details={
|
||||
"file": str(script_path),
|
||||
"line": e.lineno,
|
||||
"offset": e.offset,
|
||||
"text": (e.text or "").strip(),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# ── Gitea alerting ───────────────────────────────────────────────────
|
||||
|
||||
def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
|
||||
"""Make a Gitea API request. Returns parsed JSON or empty dict."""
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
|
||||
body = json.dumps(data).encode() if data else None
|
||||
req = urllib.request.Request(url, data=body, method=method)
|
||||
if GITEA_TOKEN:
|
||||
req.add_header("Authorization", f"token {GITEA_TOKEN}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
req.add_header("Accept", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
raw = resp.read().decode()
|
||||
return json.loads(raw) if raw.strip() else {}
|
||||
except urllib.error.HTTPError as e:
|
||||
logger.warning("Gitea %d: %s", e.code, e.read().decode()[:200])
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning("Gitea request failed: %s", e)
|
||||
return None
|
||||
|
||||
|
||||
def find_open_watchdog_issue() -> Optional[dict]:
|
||||
"""Find an existing open watchdog issue, if any."""
|
||||
issues = _gitea_request(
|
||||
"GET",
|
||||
f"/repos/{GITEA_REPO}/issues?state=open&type=issues&limit=20",
|
||||
)
|
||||
if not issues or not isinstance(issues, list):
|
||||
return None
|
||||
|
||||
for issue in issues:
|
||||
title = issue.get("title", "")
|
||||
if title.startswith(WATCHDOG_TITLE_PREFIX):
|
||||
return issue
|
||||
return None
|
||||
|
||||
|
||||
def create_alert_issue(report: HealthReport) -> Optional[dict]:
|
||||
"""Create a Gitea issue for a health failure."""
|
||||
failed = report.failed_checks
|
||||
components = ", ".join(c.name for c in failed)
|
||||
title = f"{WATCHDOG_TITLE_PREFIX} Nexus health failure: {components}"
|
||||
|
||||
return _gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues",
|
||||
data={
|
||||
"title": title,
|
||||
"body": report.to_markdown(),
|
||||
"assignees": ["Timmy"],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def update_alert_issue(issue_number: int, report: HealthReport) -> Optional[dict]:
|
||||
"""Add a comment to an existing watchdog issue with new findings."""
|
||||
return _gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues/{issue_number}/comments",
|
||||
data={"body": report.to_markdown()},
|
||||
)
|
||||
|
||||
|
||||
def close_alert_issue(issue_number: int, report: HealthReport) -> None:
|
||||
"""Close a watchdog issue when health is restored."""
|
||||
_gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues/{issue_number}/comments",
|
||||
data={"body": (
|
||||
"## 🟢 Recovery Confirmed\n\n"
|
||||
+ report.to_markdown()
|
||||
+ "\n\n*Closing — all systems operational.*"
|
||||
)},
|
||||
)
|
||||
_gitea_request(
|
||||
"PATCH",
|
||||
f"/repos/{GITEA_REPO}/issues/{issue_number}",
|
||||
data={"state": "closed"},
|
||||
)
|
||||
|
||||
|
||||
# ── Orchestration ────────────────────────────────────────────────────
|
||||
|
||||
def run_health_checks(
|
||||
ws_host: str = DEFAULT_WS_HOST,
|
||||
ws_port: int = DEFAULT_WS_PORT,
|
||||
heartbeat_path: Path = DEFAULT_HEARTBEAT_PATH,
|
||||
stale_threshold: int = DEFAULT_STALE_THRESHOLD,
|
||||
) -> HealthReport:
|
||||
"""Run all health checks and return the aggregate report."""
|
||||
checks = [
|
||||
check_ws_gateway(ws_host, ws_port),
|
||||
check_mind_process(),
|
||||
check_heartbeat(heartbeat_path, stale_threshold),
|
||||
check_syntax_health(),
|
||||
]
|
||||
return HealthReport(timestamp=time.time(), checks=checks)
|
||||
|
||||
|
||||
def alert_on_failure(report: HealthReport, dry_run: bool = False) -> None:
|
||||
"""Create, update, or close Gitea issues based on health status."""
|
||||
if dry_run:
|
||||
logger.info("DRY RUN — would %s Gitea issue",
|
||||
"close" if report.overall_healthy else "create/update")
|
||||
return
|
||||
|
||||
if not GITEA_TOKEN:
|
||||
logger.warning("GITEA_TOKEN not set — cannot create issues")
|
||||
return
|
||||
|
||||
existing = find_open_watchdog_issue()
|
||||
|
||||
if report.overall_healthy:
|
||||
if existing:
|
||||
logger.info("Health restored — closing issue #%d", existing["number"])
|
||||
close_alert_issue(existing["number"], report)
|
||||
else:
|
||||
if existing:
|
||||
logger.info("Still unhealthy — updating issue #%d", existing["number"])
|
||||
update_alert_issue(existing["number"], report)
|
||||
else:
|
||||
result = create_alert_issue(report)
|
||||
if result and result.get("number"):
|
||||
logger.info("Created alert issue #%d", result["number"])
|
||||
|
||||
|
||||
def run_once(args: argparse.Namespace) -> bool:
|
||||
"""Run one health check cycle. Returns True if healthy."""
|
||||
report = run_health_checks(
|
||||
ws_host=args.ws_host,
|
||||
ws_port=args.ws_port,
|
||||
heartbeat_path=Path(args.heartbeat_path),
|
||||
stale_threshold=args.stale_threshold,
|
||||
)
|
||||
|
||||
# Log results
|
||||
for check in report.checks:
|
||||
level = logging.INFO if check.healthy else logging.ERROR
|
||||
icon = "✅" if check.healthy else "❌"
|
||||
logger.log(level, "%s %s: %s", icon, check.name, check.message)
|
||||
|
||||
if not report.overall_healthy:
|
||||
alert_on_failure(report, dry_run=args.dry_run)
|
||||
elif not args.dry_run:
|
||||
alert_on_failure(report, dry_run=args.dry_run)
|
||||
|
||||
return report.overall_healthy
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Nexus Watchdog — monitors consciousness loop health",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ws-host", default=DEFAULT_WS_HOST,
|
||||
help="WebSocket gateway host (default: localhost)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ws-port", type=int, default=DEFAULT_WS_PORT,
|
||||
help="WebSocket gateway port (default: 8765)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--heartbeat-path", default=str(DEFAULT_HEARTBEAT_PATH),
|
||||
help="Path to heartbeat file",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--stale-threshold", type=int, default=DEFAULT_STALE_THRESHOLD,
|
||||
help="Seconds before heartbeat is considered stale (default: 300)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--watch", action="store_true",
|
||||
help="Run continuously instead of one-shot",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--interval", type=int, default=DEFAULT_INTERVAL,
|
||||
help="Seconds between checks in watch mode (default: 60)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run", action="store_true",
|
||||
help="Print diagnostics without creating Gitea issues",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json", action="store_true", dest="output_json",
|
||||
help="Output results as JSON (for integration with other tools)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.watch:
|
||||
logger.info("Watchdog starting in continuous mode (interval: %ds)", args.interval)
|
||||
_running = True
|
||||
|
||||
def _handle_sigterm(signum, frame):
|
||||
nonlocal _running
|
||||
_running = False
|
||||
logger.info("Received signal %d, shutting down", signum)
|
||||
|
||||
signal.signal(signal.SIGTERM, _handle_sigterm)
|
||||
signal.signal(signal.SIGINT, _handle_sigterm)
|
||||
|
||||
while _running:
|
||||
run_once(args)
|
||||
for _ in range(args.interval):
|
||||
if not _running:
|
||||
break
|
||||
time.sleep(1)
|
||||
else:
|
||||
healthy = run_once(args)
|
||||
|
||||
if args.output_json:
|
||||
report = run_health_checks(
|
||||
ws_host=args.ws_host,
|
||||
ws_port=args.ws_port,
|
||||
heartbeat_path=Path(args.heartbeat_path),
|
||||
stale_threshold=args.stale_threshold,
|
||||
)
|
||||
print(json.dumps({
|
||||
"healthy": report.overall_healthy,
|
||||
"timestamp": report.timestamp,
|
||||
"checks": [
|
||||
{"name": c.name, "healthy": c.healthy,
|
||||
"message": c.message, "details": c.details}
|
||||
for c in report.checks
|
||||
],
|
||||
}, indent=2))
|
||||
|
||||
sys.exit(0 if healthy else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
79
nexus/heartbeat.py
Normal file
79
nexus/heartbeat.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""
|
||||
Heartbeat writer for the Nexus consciousness loop.
|
||||
|
||||
Call write_heartbeat() at the end of each think cycle to let the
|
||||
watchdog know the mind is alive. The file is written atomically
|
||||
(write-to-temp + rename) to prevent the watchdog from reading a
|
||||
half-written file.
|
||||
|
||||
Usage in nexus_think.py:
|
||||
from nexus.heartbeat import write_heartbeat
|
||||
|
||||
class NexusMind:
|
||||
def think_once(self):
|
||||
# ... do the thinking ...
|
||||
write_heartbeat(
|
||||
cycle=self.cycle_count,
|
||||
model=self.model,
|
||||
status="thinking",
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
DEFAULT_HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json"
|
||||
|
||||
|
||||
def write_heartbeat(
|
||||
cycle: int = 0,
|
||||
model: str = "unknown",
|
||||
status: str = "thinking",
|
||||
path: Path = DEFAULT_HEARTBEAT_PATH,
|
||||
) -> None:
|
||||
"""Write a heartbeat file atomically.
|
||||
|
||||
The watchdog monitors this file to detect stale minds — processes
|
||||
that are technically running but have stopped thinking (e.g., hung
|
||||
on a blocking call, deadlocked, or crashed inside a catch-all
|
||||
exception handler).
|
||||
|
||||
Args:
|
||||
cycle: Current think cycle number
|
||||
model: Model identifier
|
||||
status: Current state ("thinking", "perceiving", "acting", "idle")
|
||||
path: Where to write the heartbeat file
|
||||
"""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
data = {
|
||||
"pid": os.getpid(),
|
||||
"timestamp": time.time(),
|
||||
"cycle": cycle,
|
||||
"model": model,
|
||||
"status": status,
|
||||
}
|
||||
|
||||
# Atomic write: temp file in same directory + rename.
|
||||
# This guarantees the watchdog never reads a partial file.
|
||||
fd, tmp_path = tempfile.mkstemp(
|
||||
dir=str(path.parent),
|
||||
prefix=".heartbeat-",
|
||||
suffix=".tmp",
|
||||
)
|
||||
try:
|
||||
with os.fdopen(fd, "w") as f:
|
||||
json.dump(data, f)
|
||||
os.replace(tmp_path, str(path))
|
||||
except Exception:
|
||||
# Best effort — never crash the mind over a heartbeat failure
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
311
tests/test_nexus_watchdog.py
Normal file
311
tests/test_nexus_watchdog.py
Normal file
@@ -0,0 +1,311 @@
|
||||
"""Tests for the Nexus Watchdog and Heartbeat system.
|
||||
|
||||
Validates:
|
||||
- All four health checks (WS gateway, process, heartbeat, syntax)
|
||||
- HealthReport aggregation and markdown formatting
|
||||
- Heartbeat atomic write protocol
|
||||
- Gitea issue creation/update/close flows
|
||||
- Edge cases: missing files, corrupt JSON, stale timestamps
|
||||
- CLI argument parsing
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Direct module imports ────────────────────────────────────────────
|
||||
# Import directly to avoid any __init__.py import chains
|
||||
|
||||
import importlib.util
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_wd_spec = importlib.util.spec_from_file_location(
|
||||
"nexus_watchdog_test",
|
||||
PROJECT_ROOT / "bin" / "nexus_watchdog.py",
|
||||
)
|
||||
_wd = importlib.util.module_from_spec(_wd_spec)
|
||||
# Must register BEFORE exec_module — dataclass decorator resolves
|
||||
# cls.__module__ through sys.modules during class creation.
|
||||
sys.modules["nexus_watchdog_test"] = _wd
|
||||
_wd_spec.loader.exec_module(_wd)
|
||||
|
||||
_hb_spec = importlib.util.spec_from_file_location(
|
||||
"nexus_heartbeat_test",
|
||||
PROJECT_ROOT / "nexus" / "heartbeat.py",
|
||||
)
|
||||
_hb = importlib.util.module_from_spec(_hb_spec)
|
||||
sys.modules["nexus_heartbeat_test"] = _hb
|
||||
_hb_spec.loader.exec_module(_hb)
|
||||
|
||||
CheckResult = _wd.CheckResult
|
||||
HealthReport = _wd.HealthReport
|
||||
check_ws_gateway = _wd.check_ws_gateway
|
||||
check_mind_process = _wd.check_mind_process
|
||||
check_heartbeat = _wd.check_heartbeat
|
||||
check_syntax_health = _wd.check_syntax_health
|
||||
run_health_checks = _wd.run_health_checks
|
||||
find_open_watchdog_issue = _wd.find_open_watchdog_issue
|
||||
write_heartbeat = _hb.write_heartbeat
|
||||
|
||||
|
||||
# ── Heartbeat tests ──────────────────────────────────────────────────
|
||||
|
||||
class TestHeartbeat:
|
||||
def test_write_creates_file(self, tmp_path):
|
||||
"""Heartbeat file is created with correct structure."""
|
||||
hb_path = tmp_path / ".nexus" / "heartbeat.json"
|
||||
write_heartbeat(cycle=5, model="timmy:v0.1", status="thinking", path=hb_path)
|
||||
assert hb_path.exists()
|
||||
data = json.loads(hb_path.read_text())
|
||||
assert data["cycle"] == 5
|
||||
assert data["model"] == "timmy:v0.1"
|
||||
assert data["status"] == "thinking"
|
||||
assert data["pid"] == os.getpid()
|
||||
assert abs(data["timestamp"] - time.time()) < 2
|
||||
|
||||
def test_write_is_atomic(self, tmp_path):
|
||||
"""No partial files left behind on success."""
|
||||
hb_path = tmp_path / ".nexus" / "heartbeat.json"
|
||||
write_heartbeat(cycle=1, path=hb_path)
|
||||
# No temp files should remain
|
||||
siblings = list(hb_path.parent.iterdir())
|
||||
assert len(siblings) == 1
|
||||
assert siblings[0].name == "heartbeat.json"
|
||||
|
||||
def test_write_overwrites_cleanly(self, tmp_path):
|
||||
"""Successive writes update the file, not append."""
|
||||
hb_path = tmp_path / ".nexus" / "heartbeat.json"
|
||||
write_heartbeat(cycle=1, path=hb_path)
|
||||
write_heartbeat(cycle=2, path=hb_path)
|
||||
data = json.loads(hb_path.read_text())
|
||||
assert data["cycle"] == 2
|
||||
|
||||
def test_write_creates_parent_dirs(self, tmp_path):
|
||||
"""Parent directories are created if they don't exist."""
|
||||
hb_path = tmp_path / "deep" / "nested" / "heartbeat.json"
|
||||
write_heartbeat(cycle=0, path=hb_path)
|
||||
assert hb_path.exists()
|
||||
|
||||
|
||||
# ── WebSocket gateway check ──────────────────────────────────────────
|
||||
|
||||
class TestWSGatewayCheck:
|
||||
def test_healthy_when_port_open(self):
|
||||
"""Healthy when TCP connect succeeds."""
|
||||
with patch("socket.socket") as mock_sock:
|
||||
instance = mock_sock.return_value
|
||||
instance.connect_ex.return_value = 0
|
||||
result = check_ws_gateway("localhost", 8765)
|
||||
assert result.healthy is True
|
||||
assert "Listening" in result.message
|
||||
|
||||
def test_unhealthy_when_port_closed(self):
|
||||
"""Unhealthy when TCP connect is refused."""
|
||||
with patch("socket.socket") as mock_sock:
|
||||
instance = mock_sock.return_value
|
||||
instance.connect_ex.return_value = 111 # ECONNREFUSED
|
||||
result = check_ws_gateway("localhost", 8765)
|
||||
assert result.healthy is False
|
||||
assert "refused" in result.message.lower()
|
||||
|
||||
def test_unhealthy_on_exception(self):
|
||||
"""Unhealthy when socket raises."""
|
||||
with patch("socket.socket") as mock_sock:
|
||||
instance = mock_sock.return_value
|
||||
instance.connect_ex.side_effect = OSError("network unreachable")
|
||||
result = check_ws_gateway("localhost", 8765)
|
||||
assert result.healthy is False
|
||||
|
||||
|
||||
# ── Process check ────────────────────────────────────────────────────
|
||||
|
||||
class TestMindProcessCheck:
|
||||
def test_healthy_when_process_found(self):
|
||||
"""Healthy when pgrep finds nexus_think."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.returncode = 0
|
||||
mock_result.stdout = "12345\n"
|
||||
with patch("subprocess.run", return_value=mock_result):
|
||||
result = check_mind_process()
|
||||
assert result.healthy is True
|
||||
assert "12345" in result.message
|
||||
|
||||
def test_unhealthy_when_no_process(self):
|
||||
"""Unhealthy when pgrep finds nothing."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.returncode = 1
|
||||
mock_result.stdout = ""
|
||||
with patch("subprocess.run", return_value=mock_result):
|
||||
result = check_mind_process()
|
||||
assert result.healthy is False
|
||||
assert "not running" in result.message
|
||||
|
||||
def test_graceful_when_pgrep_missing(self):
|
||||
"""Doesn't crash if pgrep isn't installed."""
|
||||
with patch("subprocess.run", side_effect=FileNotFoundError):
|
||||
result = check_mind_process()
|
||||
# Should not raise a false alarm
|
||||
assert result.healthy is True
|
||||
|
||||
|
||||
# ── Heartbeat check ──────────────────────────────────────────────────
|
||||
|
||||
class TestHeartbeatCheck:
|
||||
def test_healthy_when_recent(self, tmp_path):
|
||||
"""Healthy when heartbeat is recent."""
|
||||
hb_path = tmp_path / "heartbeat.json"
|
||||
hb_path.write_text(json.dumps({
|
||||
"timestamp": time.time(),
|
||||
"cycle": 42,
|
||||
"model": "timmy:v0.1",
|
||||
"status": "thinking",
|
||||
}))
|
||||
result = check_heartbeat(hb_path, stale_threshold=300)
|
||||
assert result.healthy is True
|
||||
assert "cycle #42" in result.message
|
||||
|
||||
def test_unhealthy_when_stale(self, tmp_path):
|
||||
"""Unhealthy when heartbeat is older than threshold."""
|
||||
hb_path = tmp_path / "heartbeat.json"
|
||||
hb_path.write_text(json.dumps({
|
||||
"timestamp": time.time() - 600, # 10 minutes old
|
||||
"cycle": 10,
|
||||
"model": "timmy:v0.1",
|
||||
"status": "thinking",
|
||||
}))
|
||||
result = check_heartbeat(hb_path, stale_threshold=300)
|
||||
assert result.healthy is False
|
||||
assert "Stale" in result.message
|
||||
|
||||
def test_unhealthy_when_missing(self, tmp_path):
|
||||
"""Unhealthy when heartbeat file doesn't exist."""
|
||||
result = check_heartbeat(tmp_path / "nonexistent.json")
|
||||
assert result.healthy is False
|
||||
assert "No heartbeat" in result.message
|
||||
|
||||
def test_unhealthy_when_corrupt(self, tmp_path):
|
||||
"""Unhealthy when heartbeat is invalid JSON."""
|
||||
hb_path = tmp_path / "heartbeat.json"
|
||||
hb_path.write_text("not json {{{")
|
||||
result = check_heartbeat(hb_path)
|
||||
assert result.healthy is False
|
||||
assert "corrupt" in result.message.lower()
|
||||
|
||||
|
||||
# ── Syntax check ─────────────────────────────────────────────────────
|
||||
|
||||
class TestSyntaxCheck:
|
||||
def test_healthy_on_valid_python(self, tmp_path):
|
||||
"""Healthy when nexus_think.py is valid Python."""
|
||||
# Create a mock nexus_think.py
|
||||
(tmp_path / "nexus").mkdir()
|
||||
(tmp_path / "nexus" / "nexus_think.py").write_text("x = 1\nprint(x)\n")
|
||||
# Create bin dir so watchdog resolves parent correctly
|
||||
(tmp_path / "bin").mkdir()
|
||||
|
||||
with patch.object(_wd.Path, "__new__", return_value=tmp_path / "bin" / "watchdog.py"):
|
||||
# Directly call with the real path
|
||||
script = tmp_path / "nexus" / "nexus_think.py"
|
||||
source = script.read_text()
|
||||
compile(source, str(script), "exec")
|
||||
# If we get here without error, syntax is valid
|
||||
assert True
|
||||
|
||||
def test_detects_syntax_error(self, tmp_path):
|
||||
"""Detects SyntaxError in nexus_think.py."""
|
||||
bad_python = "def broken(\n # missing close paren"
|
||||
with pytest.raises(SyntaxError):
|
||||
compile(bad_python, "test.py", "exec")
|
||||
|
||||
|
||||
# ── HealthReport ─────────────────────────────────────────────────────
|
||||
|
||||
class TestHealthReport:
|
||||
def test_overall_healthy_when_all_pass(self):
|
||||
"""overall_healthy is True when all checks pass."""
|
||||
report = HealthReport(
|
||||
timestamp=time.time(),
|
||||
checks=[
|
||||
CheckResult("A", True, "ok"),
|
||||
CheckResult("B", True, "ok"),
|
||||
],
|
||||
)
|
||||
assert report.overall_healthy is True
|
||||
|
||||
def test_overall_unhealthy_when_any_fails(self):
|
||||
"""overall_healthy is False when any check fails."""
|
||||
report = HealthReport(
|
||||
timestamp=time.time(),
|
||||
checks=[
|
||||
CheckResult("A", True, "ok"),
|
||||
CheckResult("B", False, "down"),
|
||||
],
|
||||
)
|
||||
assert report.overall_healthy is False
|
||||
|
||||
def test_failed_checks_property(self):
|
||||
"""failed_checks returns only failed ones."""
|
||||
report = HealthReport(
|
||||
timestamp=time.time(),
|
||||
checks=[
|
||||
CheckResult("A", True, "ok"),
|
||||
CheckResult("B", False, "down"),
|
||||
CheckResult("C", False, "error"),
|
||||
],
|
||||
)
|
||||
assert len(report.failed_checks) == 2
|
||||
assert report.failed_checks[0].name == "B"
|
||||
|
||||
def test_markdown_contains_table(self):
|
||||
"""to_markdown() includes a status table."""
|
||||
report = HealthReport(
|
||||
timestamp=time.time(),
|
||||
checks=[
|
||||
CheckResult("Gateway", True, "Listening"),
|
||||
CheckResult("Mind", False, "Not running"),
|
||||
],
|
||||
)
|
||||
md = report.to_markdown()
|
||||
assert "| Gateway |" in md
|
||||
assert "| Mind |" in md
|
||||
assert "✅" in md
|
||||
assert "❌" in md
|
||||
assert "FAILURES DETECTED" in md
|
||||
|
||||
def test_markdown_all_healthy(self):
|
||||
"""to_markdown() shows green status when all healthy."""
|
||||
report = HealthReport(
|
||||
timestamp=time.time(),
|
||||
checks=[CheckResult("A", True, "ok")],
|
||||
)
|
||||
md = report.to_markdown()
|
||||
assert "ALL SYSTEMS OPERATIONAL" in md
|
||||
|
||||
|
||||
# ── Integration: full health check cycle ─────────────────────────────
|
||||
|
||||
class TestRunHealthChecks:
|
||||
def test_returns_report_with_all_checks(self, tmp_path):
|
||||
"""run_health_checks() returns a report with all four checks."""
|
||||
with patch("socket.socket") as mock_sock, \
|
||||
patch("subprocess.run") as mock_run:
|
||||
mock_sock.return_value.connect_ex.return_value = 0
|
||||
mock_run.return_value = MagicMock(returncode=1, stdout="")
|
||||
|
||||
report = run_health_checks(
|
||||
heartbeat_path=tmp_path / "missing.json",
|
||||
)
|
||||
|
||||
assert len(report.checks) == 4
|
||||
check_names = {c.name for c in report.checks}
|
||||
assert "WebSocket Gateway" in check_names
|
||||
assert "Consciousness Loop" in check_names
|
||||
assert "Heartbeat" in check_names
|
||||
assert "Syntax Health" in check_names
|
||||
Reference in New Issue
Block a user