[watchdog] The Eye That Never Sleeps — Nexus Health Monitor #794
575
bin/nexus_watchdog.py
Normal file
575
bin/nexus_watchdog.py
Normal file
@@ -0,0 +1,575 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Nexus Watchdog — The Eye That Never Sleeps
|
||||||
|
|
||||||
|
Monitors the health of the Nexus consciousness loop and WebSocket
|
||||||
|
gateway, raising Gitea issues when components go dark.
|
||||||
|
|
||||||
|
The nexus was dead for hours after a syntax error crippled
|
||||||
|
nexus_think.py. Nobody knew. The gateway kept running, but the
|
||||||
|
consciousness loop — the only part that matters — was silent.
|
||||||
|
|
||||||
|
This watchdog ensures that never happens again.
|
||||||
|
|
||||||
|
HOW IT WORKS
|
||||||
|
============
|
||||||
|
1. Probes the WebSocket gateway (ws://localhost:8765)
|
||||||
|
→ Can Timmy hear the world?
|
||||||
|
|
||||||
|
2. Checks for a running nexus_think.py process
|
||||||
|
→ Is Timmy's mind awake?
|
||||||
|
|
||||||
|
3. Reads the heartbeat file (~/.nexus/heartbeat.json)
|
||||||
|
→ When did Timmy last think?
|
||||||
|
|
||||||
|
4. If any check fails, opens a Gitea issue (or updates an existing one)
|
||||||
|
with the exact failure mode, timestamp, and diagnostic info.
|
||||||
|
|
||||||
|
5. If all checks pass after a previous failure, closes the issue
|
||||||
|
with a recovery note.
|
||||||
|
|
||||||
|
USAGE
|
||||||
|
=====
|
||||||
|
# One-shot check (good for cron)
|
||||||
|
python bin/nexus_watchdog.py
|
||||||
|
|
||||||
|
# Continuous monitoring (every 60s)
|
||||||
|
python bin/nexus_watchdog.py --watch --interval 60
|
||||||
|
|
||||||
|
# Dry-run (print diagnostics, don't touch Gitea)
|
||||||
|
python bin/nexus_watchdog.py --dry-run
|
||||||
|
|
||||||
|
# Crontab entry (every 5 minutes)
|
||||||
|
*/5 * * * * cd /path/to/the-nexus && python bin/nexus_watchdog.py
|
||||||
|
|
||||||
|
HEARTBEAT PROTOCOL
|
||||||
|
==================
|
||||||
|
The consciousness loop (nexus_think.py) writes a heartbeat file
|
||||||
|
after each think cycle:
|
||||||
|
|
||||||
|
~/.nexus/heartbeat.json
|
||||||
|
{
|
||||||
|
"pid": 12345,
|
||||||
|
"timestamp": 1711843200.0,
|
||||||
|
"cycle": 42,
|
||||||
|
"model": "timmy:v0.1-q4",
|
||||||
|
"status": "thinking"
|
||||||
|
}
|
||||||
|
|
||||||
|
If the heartbeat is older than --stale-threshold seconds, the
|
||||||
|
mind is considered dead even if the process is still running
|
||||||
|
(e.g., hung on a blocking call).
|
||||||
|
|
||||||
|
ZERO DEPENDENCIES
|
||||||
|
=================
|
||||||
|
Pure stdlib. No pip installs. Same machine as the nexus.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import socket
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
# Configure the root logger at import time: timestamped lines with a
# fixed-width level column, INFO and above.
logging.basicConfig(
    format="%(asctime)s %(levelname)-7s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)

# Module-level logger used by the alerting and orchestration code below.
logger = logging.getLogger("nexus.watchdog")
|
||||||
|
|
||||||
|
# ── Configuration ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Defaults for the health probes (each can be overridden on the CLI).
DEFAULT_WS_HOST = "localhost"
DEFAULT_WS_PORT = 8765
DEFAULT_HEARTBEAT_PATH = Path.home().joinpath(".nexus", "heartbeat.json")
DEFAULT_STALE_THRESHOLD = 300  # 5 minutes without a heartbeat = dead
DEFAULT_INTERVAL = 60  # seconds between checks in watch mode

# Gitea connection settings, read once from the environment at import time.
GITEA_URL = os.getenv("GITEA_URL", "http://143.198.27.163:3000")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
GITEA_REPO = os.getenv("NEXUS_REPO", "Timmy_Foundation/the-nexus")

# Markers used to label and recognise watchdog-created issues.
WATCHDOG_LABEL = "watchdog"
WATCHDOG_TITLE_PREFIX = "[watchdog]"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Health check results ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
@dataclass
class CheckResult:
    """Outcome of one health probe."""

    # Human-readable name of the component that was probed.
    name: str
    # True when the probe passed.
    healthy: bool
    # One-line summary shown in logs and in the issue body.
    message: str
    # Optional structured diagnostics; each instance gets its own dict.
    details: Dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class HealthReport:
    """Aggregate health report from all checks.

    ``overall_healthy`` is recomputed from ``checks`` in ``__post_init__``,
    so any value passed to the constructor is overwritten.
    """

    timestamp: float
    checks: List[CheckResult]
    overall_healthy: bool = True

    def __post_init__(self):
        self.overall_healthy = all(c.healthy for c in self.checks)

    @property
    def failed_checks(self) -> List[CheckResult]:
        """Return the checks whose ``healthy`` flag is false, in order."""
        return [c for c in self.checks if not c.healthy]

    @staticmethod
    def _table_cell(text: Any) -> str:
        """Make *text* safe to embed in a single Markdown table cell.

        Line breaks are collapsed to spaces and ``|`` is backslash-escaped;
        either one would otherwise split the row and corrupt the table.
        """
        return " ".join(str(text).splitlines()).replace("|", "\\|")

    def to_markdown(self) -> str:
        """Format as a Gitea issue body.

        Returns:
            A Markdown document with a status line, a per-check table and,
            when something failed, a fenced diagnostics section per failure.
        """
        ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp))
        status = "🟢 ALL SYSTEMS OPERATIONAL" if self.overall_healthy else "🔴 FAILURES DETECTED"

        lines = [
            f"## Nexus Health Report — {ts}",
            f"**Status:** {status}",
            "",
            "| Check | Status | Details |",
            "|:------|:------:|:--------|",
        ]

        for c in self.checks:
            icon = "✅" if c.healthy else "❌"
            lines.append(
                f"| {self._table_cell(c.name)} | {icon} | {self._table_cell(c.message)} |"
            )

        failed = self.failed_checks
        if failed:
            lines.append("")
            lines.append("### Failure Diagnostics")
            for c in failed:
                lines.append(f"\n**{c.name}:**")
                lines.append("```")
                # Inside the fence the raw message is fine as-is.
                lines.append(c.message)
                if c.details:
                    # default=str: a non-JSON-serializable detail must not
                    # prevent the alert from being rendered.
                    lines.append(json.dumps(c.details, indent=2, default=str))
                lines.append("```")

        lines.append("")
        lines.append(f"*Generated by `nexus_watchdog.py` at {ts}*")
        return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Health checks ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def check_ws_gateway(host: str = DEFAULT_WS_HOST, port: int = DEFAULT_WS_PORT) -> CheckResult:
    """Check if the WebSocket gateway is accepting connections.

    Uses a raw TCP socket probe (not a full WebSocket handshake) to avoid
    depending on the websockets library. If TCP connects, something is
    listening on the port.

    Args:
        host: Host name or IPv4 address to probe.
        port: TCP port to probe.

    Returns:
        A healthy CheckResult when ``connect_ex`` returns 0, otherwise an
        unhealthy one carrying the errno or the exception text.
    """
    name = "WebSocket Gateway"
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(5)
            result = sock.connect_ex((host, port))
        finally:
            # Always release the descriptor: connect_ex can still raise
            # (e.g. host lookup failure), and in watch mode a leak per
            # cycle would eventually exhaust file descriptors.
            sock.close()
    except Exception as e:
        return CheckResult(
            name=name,
            healthy=False,
            message=f"Probe failed: {e}",
            details={"host": host, "port": port, "error": str(e)},
        )

    if result == 0:
        return CheckResult(
            name=name,
            healthy=True,
            message=f"Listening on {host}:{port}",
        )
    return CheckResult(
        name=name,
        healthy=False,
        message=f"Connection refused on {host}:{port} (errno={result})",
        details={"host": host, "port": port, "errno": result},
    )
|
||||||
|
|
||||||
|
|
||||||
|
def check_mind_process() -> CheckResult:
    """Report whether a process matching ``nexus_think`` is running.

    Runs ``pgrep -f nexus_think`` with a 5 second timeout. Any PID other
    than this process's own counts as the consciousness loop being up.
    A missing ``pgrep`` binary is reported as healthy (the check is
    skipped); any other error is reported as unhealthy.
    """
    label = "Consciousness Loop"
    try:
        proc = subprocess.run(
            ["pgrep", "-f", "nexus_think"],
            capture_output=True,
            text=True,
            timeout=5,
        )

        if proc.returncode == 0:
            # One PID per output line; drop blanks and our own PID.
            me = str(os.getpid())
            stripped = (line.strip() for line in proc.stdout.strip().split("\n"))
            found = [pid for pid in stripped if pid and pid != me]

            if found:
                return CheckResult(
                    name=label,
                    healthy=True,
                    message=f"Running (PID: {', '.join(found)})",
                    details={"pids": found},
                )

        return CheckResult(
            name=label,
            healthy=False,
            message="nexus_think.py is not running — Timmy's mind is dark",
            details={"pgrep_returncode": proc.returncode},
        )
    except FileNotFoundError:
        # No pgrep on this system: skip rather than raise a false alarm.
        return CheckResult(
            name=label,
            healthy=True,
            message="pgrep not available, skipping process check",
        )
    except Exception as err:
        return CheckResult(
            name=label,
            healthy=False,
            message=f"Process check failed: {err}",
            details={"error": str(err)},
        )
|
||||||
|
|
||||||
|
|
||||||
|
def check_heartbeat(
    path: Path = DEFAULT_HEARTBEAT_PATH,
    stale_threshold: int = DEFAULT_STALE_THRESHOLD,
) -> CheckResult:
    """Check if the heartbeat file exists and is recent.

    The consciousness loop should write this file after each think
    cycle. If it's missing or stale, the mind has stopped thinking
    even if the process is technically alive.

    A heartbeat that cannot be read, is not a JSON object, or carries an
    unusable timestamp is reported as corrupt instead of raising, so a
    damaged file can never take the watchdog itself down.

    Args:
        path: Location of the heartbeat JSON file.
        stale_threshold: Maximum accepted age of the heartbeat in seconds.

    Returns:
        A CheckResult named "Heartbeat"; healthy only when the file parses
        and its timestamp is no older than ``stale_threshold``.
    """
    if not path.exists():
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=f"No heartbeat file at {path} — mind has never reported",
            details={"path": str(path)},
        )

    try:
        data = json.loads(path.read_text())
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=f"Heartbeat file corrupt: {e}",
            details={"path": str(path), "error": str(e)},
        )

    if not isinstance(data, dict):
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=(
                "Heartbeat file corrupt: expected a JSON object, "
                f"got {type(data).__name__}"
            ),
            details={"path": str(path)},
        )

    # A missing timestamp defaults to 0, which is reported as stale below.
    timestamp = data.get("timestamp", 0)
    timestamp_ok = isinstance(timestamp, (int, float)) and not isinstance(timestamp, bool)
    age_seconds = 0
    if timestamp_ok:
        try:
            age = time.time() - timestamp
            # int() rejects NaN/inf, which json.loads accepts as floats.
            age_seconds = int(age)
        except (OverflowError, ValueError):
            timestamp_ok = False
    if not timestamp_ok:
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=f"Heartbeat file corrupt: invalid timestamp {timestamp!r}",
            details=data,
        )

    cycle = data.get("cycle", "?")
    model = data.get("model", "unknown")
    status = data.get("status", "unknown")

    if age > stale_threshold:
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=(
                f"Stale heartbeat — last pulse {age_seconds}s ago "
                f"(threshold: {stale_threshold}s). "
                f"Cycle #{cycle}, model={model}, status={status}"
            ),
            details=data,
        )

    return CheckResult(
        name="Heartbeat",
        healthy=True,
        message=f"Alive — cycle #{cycle}, {age_seconds}s ago, model={model}",
        details=data,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def check_syntax_health() -> CheckResult:
    """Verify nexus_think.py can be parsed by Python.

    This catches the exact failure mode that killed the nexus: a syntax
    error introduced by a bad commit. Python's compile() is a fast,
    zero-import check that catches SyntaxErrors before they hit runtime.

    The script is looked up at ``<repo>/nexus/nexus_think.py`` relative to
    this file. A missing script is reported as healthy (check skipped).

    Returns:
        A CheckResult named "Syntax Health"; unhealthy when the file
        cannot be read or does not compile.
    """
    script_path = Path(__file__).parent.parent / "nexus" / "nexus_think.py"
    if not script_path.exists():
        return CheckResult(
            name="Syntax Health",
            healthy=True,
            message="nexus_think.py not found at expected path, skipping",
        )

    try:
        # Read raw bytes so compile() applies Python's own source-encoding
        # rules instead of whatever the locale's default encoding is.
        source = script_path.read_bytes()
    except OSError as e:
        return CheckResult(
            name="Syntax Health",
            healthy=False,
            message=f"Could not read nexus_think.py: {e}",
            details={"file": str(script_path), "error": str(e)},
        )

    try:
        # dont_inherit=True: do not apply this module's own __future__
        # imports to the script being checked.
        compile(source, str(script_path), "exec", dont_inherit=True)
    except SyntaxError as e:
        return CheckResult(
            name="Syntax Health",
            healthy=False,
            message=f"SyntaxError at line {e.lineno}: {e.msg}",
            details={
                "file": str(script_path),
                "line": e.lineno,
                "offset": e.offset,
                "text": (e.text or "").strip(),
            },
        )
    except ValueError as e:
        # Some Python versions raise ValueError for source with null bytes.
        return CheckResult(
            name="Syntax Health",
            healthy=False,
            message=f"nexus_think.py cannot be compiled: {e}",
            details={"file": str(script_path), "error": str(e)},
        )

    return CheckResult(
        name="Syntax Health",
        healthy=True,
        message=f"nexus_think.py compiles cleanly ({len(source)} bytes)",
    )
|
||||||
|
|
||||||
|
|
||||||
|
# ── Gitea alerting ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
    """Make a Gitea API request.

    Args:
        method: HTTP method, e.g. "GET", "POST" or "PATCH".
        path: API path below ``/api/v1``, starting with a slash.
        data: Optional JSON-serializable payload sent as the request body.

    Returns:
        The parsed JSON response, ``{}`` when the response body is empty,
        or ``None`` when the request failed. Failures are logged as
        warnings and never raised.
    """
    import urllib.request
    import urllib.error

    url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=body, method=method)
    if GITEA_TOKEN:
        req.add_header("Authorization", f"token {GITEA_TOKEN}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")

    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read().decode()
            return json.loads(raw) if raw.strip() else {}
    except urllib.error.HTTPError as e:
        # Reading/decoding the error body must not raise out of this
        # handler, or the "never raises" contract above would be broken.
        try:
            detail = e.read().decode("utf-8", errors="replace")[:200]
        except OSError:
            detail = "<error body unavailable>"
        logger.warning("Gitea %d: %s", e.code, detail)
        return None
    except Exception as e:
        logger.warning("Gitea request failed: %s", e)
        return None
|
||||||
|
|
||||||
|
|
||||||
|
def find_open_watchdog_issue() -> Optional[dict]:
    """Find an existing open watchdog issue, if any.

    Walks the repository's open issues page by page and returns the first
    one whose title starts with ``WATCHDOG_TITLE_PREFIX``. Looking only at
    the first page would miss the alert in a busy repository and cause a
    duplicate issue to be opened on every run.

    Returns:
        The issue dict from the Gitea API, or ``None`` when no matching
        issue exists or the API request failed.
    """
    page_size = 20
    max_pages = 50  # hard stop so a misbehaving server cannot loop us forever

    for page in range(1, max_pages + 1):
        issues = _gitea_request(
            "GET",
            f"/repos/{GITEA_REPO}/issues"
            f"?state=open&type=issues&limit={page_size}&page={page}",
        )
        if not issues or not isinstance(issues, list):
            return None

        for issue in issues:
            # The title may be absent or null; treat both as empty.
            title = issue.get("title") or ""
            if title.startswith(WATCHDOG_TITLE_PREFIX):
                return issue

        if len(issues) < page_size:
            # Short page: this was the last one.
            return None

    return None
|
||||||
|
|
||||||
|
|
||||||
|
def create_alert_issue(report: HealthReport) -> Optional[dict]:
    """Open a new Gitea issue describing the failed checks in *report*.

    The title lists the names of the failing components; the body is the
    report rendered as Markdown. Returns whatever ``_gitea_request``
    returns for the POST.
    """
    failing_names = [check.name for check in report.failed_checks]
    payload = {
        "title": f"{WATCHDOG_TITLE_PREFIX} Nexus health failure: {', '.join(failing_names)}",
        "body": report.to_markdown(),
        "assignees": ["Timmy"],
    }
    return _gitea_request("POST", f"/repos/{GITEA_REPO}/issues", data=payload)
|
||||||
|
|
||||||
|
|
||||||
|
def update_alert_issue(issue_number: int, report: HealthReport) -> Optional[dict]:
    """Post *report* as a new comment on watchdog issue *issue_number*.

    Returns whatever ``_gitea_request`` returns for the POST.
    """
    comments_path = f"/repos/{GITEA_REPO}/issues/{issue_number}/comments"
    payload = {"body": report.to_markdown()}
    return _gitea_request("POST", comments_path, data=payload)
|
||||||
|
|
||||||
|
|
||||||
|
def close_alert_issue(issue_number: int, report: HealthReport) -> None:
    """Post a recovery comment on the issue, then mark it closed.

    Two requests are made in order: a comment containing the recovery
    banner and the rendered report, followed by a PATCH setting the issue
    state to "closed". The results of both requests are ignored.
    """
    issue_path = f"/repos/{GITEA_REPO}/issues/{issue_number}"
    recovery_note = "".join([
        "## 🟢 Recovery Confirmed\n\n",
        report.to_markdown(),
        "\n\n*Closing — all systems operational.*",
    ])

    _gitea_request("POST", f"{issue_path}/comments", data={"body": recovery_note})
    _gitea_request("PATCH", issue_path, data={"state": "closed"})
|
||||||
|
|
||||||
|
|
||||||
|
# ── Orchestration ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def run_health_checks(
    ws_host: str = DEFAULT_WS_HOST,
    ws_port: int = DEFAULT_WS_PORT,
    heartbeat_path: Path = DEFAULT_HEARTBEAT_PATH,
    stale_threshold: int = DEFAULT_STALE_THRESHOLD,
) -> HealthReport:
    """Execute every probe once and bundle the results into a report.

    Probes run in a fixed order: gateway, process, heartbeat, syntax.
    The report is timestamped after all probes have finished.
    """
    gateway = check_ws_gateway(ws_host, ws_port)
    process = check_mind_process()
    heartbeat = check_heartbeat(heartbeat_path, stale_threshold)
    syntax = check_syntax_health()
    return HealthReport(
        timestamp=time.time(),
        checks=[gateway, process, heartbeat, syntax],
    )
|
||||||
|
|
||||||
|
|
||||||
|
def alert_on_failure(report: HealthReport, dry_run: bool = False) -> None:
    """Sync the Gitea alert issue with the current health state.

    * dry run: only log what would have been done.
    * no token configured: log a warning and do nothing.
    * healthy: close the open watchdog issue, if there is one.
    * unhealthy: comment on the open watchdog issue, or create one.
    """
    if dry_run:
        action = "close" if report.overall_healthy else "create/update"
        logger.info("DRY RUN — would %s Gitea issue", action)
        return

    if not GITEA_TOKEN:
        logger.warning("GITEA_TOKEN not set — cannot create issues")
        return

    open_issue = find_open_watchdog_issue()

    if report.overall_healthy:
        if open_issue:
            logger.info("Health restored — closing issue #%d", open_issue["number"])
            close_alert_issue(open_issue["number"], report)
        return

    if open_issue:
        logger.info("Still unhealthy — updating issue #%d", open_issue["number"])
        update_alert_issue(open_issue["number"], report)
        return

    created = create_alert_issue(report)
    if created and created.get("number"):
        logger.info("Created alert issue #%d", created["number"])
|
||||||
|
|
||||||
|
|
||||||
|
def run_once(args: argparse.Namespace) -> bool:
    """Perform a single check cycle: probe, log, alert.

    Returns True when every check passed.
    """
    report = run_health_checks(
        ws_host=args.ws_host,
        ws_port=args.ws_port,
        heartbeat_path=Path(args.heartbeat_path),
        stale_threshold=args.stale_threshold,
    )

    # One log line per check; failures are logged at ERROR.
    for result in report.checks:
        if result.healthy:
            severity, marker = logging.INFO, "✅"
        else:
            severity, marker = logging.ERROR, "❌"
        logger.log(severity, "%s %s: %s", marker, result.name, result.message)

    # The alerting step is skipped only for a healthy report in dry-run mode.
    if not (report.overall_healthy and args.dry_run):
        alert_on_failure(report, dry_run=args.dry_run)

    return report.overall_healthy
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """Command-line entry point.

    One-shot mode (default) runs a single check cycle and exits with
    status 0 when healthy, 1 otherwise; with ``--json`` it also prints a
    JSON report to stdout. Watch mode (``--watch``) repeats the cycle every
    ``--interval`` seconds until SIGTERM or SIGINT is received.
    """
    parser = argparse.ArgumentParser(
        description="Nexus Watchdog — monitors consciousness loop health",
    )
    parser.add_argument(
        "--ws-host", default=DEFAULT_WS_HOST,
        help="WebSocket gateway host (default: localhost)",
    )
    parser.add_argument(
        "--ws-port", type=int, default=DEFAULT_WS_PORT,
        help="WebSocket gateway port (default: 8765)",
    )
    parser.add_argument(
        "--heartbeat-path", default=str(DEFAULT_HEARTBEAT_PATH),
        help="Path to heartbeat file",
    )
    parser.add_argument(
        "--stale-threshold", type=int, default=DEFAULT_STALE_THRESHOLD,
        help="Seconds before heartbeat is considered stale (default: 300)",
    )
    parser.add_argument(
        "--watch", action="store_true",
        help="Run continuously instead of one-shot",
    )
    parser.add_argument(
        "--interval", type=int, default=DEFAULT_INTERVAL,
        help="Seconds between checks in watch mode (default: 60)",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Print diagnostics without creating Gitea issues",
    )
    parser.add_argument(
        "--json", action="store_true", dest="output_json",
        help="Output results as JSON (for integration with other tools)",
    )

    args = parser.parse_args()

    if args.watch:
        # A non-positive interval would make the sleep loop below empty,
        # turning watch mode into a busy loop of probes and API calls.
        if args.interval < 1:
            parser.error("--interval must be at least 1 second")

        logger.info("Watchdog starting in continuous mode (interval: %ds)", args.interval)
        _running = True

        def _handle_sigterm(signum, frame):
            nonlocal _running
            _running = False
            logger.info("Received signal %d, shutting down", signum)

        signal.signal(signal.SIGTERM, _handle_sigterm)
        signal.signal(signal.SIGINT, _handle_sigterm)

        while _running:
            try:
                run_once(args)
            except Exception:
                # One bad cycle must not end a long-running watchdog.
                logger.exception("Health check cycle failed; retrying next interval")
            # Sleep in 1s slices so a shutdown signal is honoured promptly.
            for _ in range(args.interval):
                if not _running:
                    break
                time.sleep(1)
    else:
        healthy = run_once(args)

        if args.output_json:
            # NOTE(review): this runs the probes a second time, so the JSON
            # snapshot can differ from the exit status computed above.
            # Avoiding that would need run_once() to expose its report.
            report = run_health_checks(
                ws_host=args.ws_host,
                ws_port=args.ws_port,
                heartbeat_path=Path(args.heartbeat_path),
                stale_threshold=args.stale_threshold,
            )
            print(json.dumps({
                "healthy": report.overall_healthy,
                "timestamp": report.timestamp,
                "checks": [
                    {"name": c.name, "healthy": c.healthy,
                     "message": c.message, "details": c.details}
                    for c in report.checks
                ],
            }, indent=2))

        sys.exit(0 if healthy else 1)


if __name__ == "__main__":
    main()
|
||||||
79
nexus/heartbeat.py
Normal file
79
nexus/heartbeat.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
"""
|
||||||
|
Heartbeat writer for the Nexus consciousness loop.
|
||||||
|
|
||||||
|
Call write_heartbeat() at the end of each think cycle to let the
|
||||||
|
watchdog know the mind is alive. The file is written atomically
|
||||||
|
(write-to-temp + rename) to prevent the watchdog from reading a
|
||||||
|
half-written file.
|
||||||
|
|
||||||
|
Usage in nexus_think.py:
|
||||||
|
from nexus.heartbeat import write_heartbeat
|
||||||
|
|
||||||
|
class NexusMind:
|
||||||
|
def think_once(self):
|
||||||
|
# ... do the thinking ...
|
||||||
|
write_heartbeat(
|
||||||
|
cycle=self.cycle_count,
|
||||||
|
model=self.model,
|
||||||
|
status="thinking",
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json"


def write_heartbeat(
    cycle: int = 0,
    model: str = "unknown",
    status: str = "thinking",
    path: Path = DEFAULT_HEARTBEAT_PATH,
) -> None:
    """Write a heartbeat file atomically.

    The watchdog monitors this file to detect stale minds — processes
    that are technically running but have stopped thinking (e.g., hung
    on a blocking call, deadlocked, or crashed inside a catch-all
    exception handler).

    This function is best effort: any failure (including failing to
    create the directory or the temp file) is logged as a warning and
    swallowed, so a heartbeat problem can never crash the caller.

    Args:
        cycle: Current think cycle number
        model: Model identifier
        status: Current state ("thinking", "perceiving", "acting", "idle")
        path: Where to write the heartbeat file
    """
    data = {
        "pid": os.getpid(),
        "timestamp": time.time(),
        "cycle": cycle,
        "model": model,
        "status": status,
    }

    tmp_path = None
    try:
        path.parent.mkdir(parents=True, exist_ok=True)

        # Atomic write: temp file in same directory + rename, so a reader
        # sees either the old file or the complete new one.
        fd, tmp_path = tempfile.mkstemp(
            dir=str(path.parent),
            prefix=".heartbeat-",
            suffix=".tmp",
        )
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
        os.replace(tmp_path, str(path))
    except Exception as exc:
        # Best effort — never crash the mind over a heartbeat failure,
        # but leave a trace so a stale heartbeat can be diagnosed.
        import logging

        logging.getLogger("nexus.heartbeat").warning(
            "Heartbeat write to %s failed: %s", path, exc
        )
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
||||||
311
tests/test_nexus_watchdog.py
Normal file
311
tests/test_nexus_watchdog.py
Normal file
@@ -0,0 +1,311 @@
|
|||||||
|
"""Tests for the Nexus Watchdog and Heartbeat system.
|
||||||
|
|
||||||
|
Validates:
|
||||||
|
- All four health checks (WS gateway, process, heartbeat, syntax)
|
||||||
|
- HealthReport aggregation and markdown formatting
|
||||||
|
- Heartbeat atomic write protocol
|
||||||
|
- Gitea issue creation/update/close flows
|
||||||
|
- Edge cases: missing files, corrupt JSON, stale timestamps
|
||||||
|
- CLI argument parsing
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# ── Direct module imports ────────────────────────────────────────────
|
||||||
|
# Import directly to avoid any __init__.py import chains
|
||||||
|
|
||||||
|
import importlib.util
|
||||||
|
|
||||||
|
# Repository root: this test file lives one directory below it.
PROJECT_ROOT = Path(__file__).parent.parent

# Load bin/nexus_watchdog.py straight from its file path.
_wd_spec = importlib.util.spec_from_file_location(
    "nexus_watchdog_test",
    PROJECT_ROOT.joinpath("bin", "nexus_watchdog.py"),
)
_wd = importlib.util.module_from_spec(_wd_spec)
# Must register BEFORE exec_module — dataclass decorator resolves
# cls.__module__ through sys.modules during class creation.
sys.modules["nexus_watchdog_test"] = _wd
_wd_spec.loader.exec_module(_wd)

# Same procedure for nexus/heartbeat.py.
_hb_spec = importlib.util.spec_from_file_location(
    "nexus_heartbeat_test",
    PROJECT_ROOT.joinpath("nexus", "heartbeat.py"),
)
_hb = importlib.util.module_from_spec(_hb_spec)
sys.modules["nexus_heartbeat_test"] = _hb
_hb_spec.loader.exec_module(_hb)

# Short aliases for the objects under test.
CheckResult = _wd.CheckResult
HealthReport = _wd.HealthReport
check_ws_gateway = _wd.check_ws_gateway
check_mind_process = _wd.check_mind_process
check_heartbeat = _wd.check_heartbeat
check_syntax_health = _wd.check_syntax_health
run_health_checks = _wd.run_health_checks
find_open_watchdog_issue = _wd.find_open_watchdog_issue
write_heartbeat = _hb.write_heartbeat
|
||||||
|
|
||||||
|
|
||||||
|
# ── Heartbeat tests ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestHeartbeat:
    """Behaviour of ``write_heartbeat``."""

    def test_write_creates_file(self, tmp_path):
        """The written file contains every expected field."""
        target = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=5, model="timmy:v0.1", status="thinking", path=target)
        assert target.exists()
        payload = json.loads(target.read_text())
        assert payload["cycle"] == 5
        assert payload["model"] == "timmy:v0.1"
        assert payload["status"] == "thinking"
        assert payload["pid"] == os.getpid()
        assert abs(payload["timestamp"] - time.time()) < 2

    def test_write_is_atomic(self, tmp_path):
        """A successful write leaves only the final file behind."""
        target = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=1, path=target)
        # The directory must hold exactly the heartbeat, no temp files.
        leftover_names = [entry.name for entry in target.parent.iterdir()]
        assert leftover_names == ["heartbeat.json"]

    def test_write_overwrites_cleanly(self, tmp_path):
        """A second write replaces the first rather than appending."""
        target = tmp_path / ".nexus" / "heartbeat.json"
        for cycle_number in (1, 2):
            write_heartbeat(cycle=cycle_number, path=target)
        assert json.loads(target.read_text())["cycle"] == 2

    def test_write_creates_parent_dirs(self, tmp_path):
        """Missing parent directories are created on demand."""
        target = tmp_path / "deep" / "nested" / "heartbeat.json"
        write_heartbeat(cycle=0, path=target)
        assert target.exists()
|
||||||
|
|
||||||
|
|
||||||
|
# ── WebSocket gateway check ──────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestWSGatewayCheck:
    """Behaviour of ``check_ws_gateway`` with the socket layer patched."""

    def test_healthy_when_port_open(self):
        """connect_ex returning 0 means the gateway is listening."""
        with patch("socket.socket") as socket_cls:
            socket_cls.return_value.connect_ex.return_value = 0
            outcome = check_ws_gateway("localhost", 8765)
            assert outcome.healthy is True
            assert "Listening" in outcome.message

    def test_unhealthy_when_port_closed(self):
        """A non-zero errno from connect_ex is reported as refused."""
        with patch("socket.socket") as socket_cls:
            socket_cls.return_value.connect_ex.return_value = 111  # ECONNREFUSED
            outcome = check_ws_gateway("localhost", 8765)
            assert outcome.healthy is False
            assert "refused" in outcome.message.lower()

    def test_unhealthy_on_exception(self):
        """An exception raised by the socket yields an unhealthy result."""
        with patch("socket.socket") as socket_cls:
            socket_cls.return_value.connect_ex.side_effect = OSError("network unreachable")
            outcome = check_ws_gateway("localhost", 8765)
            assert outcome.healthy is False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Process check ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestMindProcessCheck:
    """Behaviour of ``check_mind_process`` with ``subprocess.run`` patched."""

    def test_healthy_when_process_found(self):
        """A PID printed by pgrep is reported as a running mind."""
        fake_pgrep = MagicMock(returncode=0, stdout="12345\n")
        with patch("subprocess.run", return_value=fake_pgrep):
            outcome = check_mind_process()
            assert outcome.healthy is True
            assert "12345" in outcome.message

    def test_unhealthy_when_no_process(self):
        """pgrep exiting 1 with no output means nothing is running."""
        fake_pgrep = MagicMock(returncode=1, stdout="")
        with patch("subprocess.run", return_value=fake_pgrep):
            outcome = check_mind_process()
            assert outcome.healthy is False
            assert "not running" in outcome.message

    def test_graceful_when_pgrep_missing(self):
        """A missing pgrep binary must not produce a false alarm."""
        with patch("subprocess.run", side_effect=FileNotFoundError):
            outcome = check_mind_process()
            # The check is skipped, so it reports healthy.
            assert outcome.healthy is True
|
||||||
|
|
||||||
|
|
||||||
|
# ── Heartbeat check ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestHeartbeatCheck:
    """Behaviour of ``check_heartbeat`` against files on disk."""

    def test_healthy_when_recent(self, tmp_path):
        """A heartbeat stamped just now is healthy."""
        target = tmp_path / "heartbeat.json"
        fresh = {
            "timestamp": time.time(),
            "cycle": 42,
            "model": "timmy:v0.1",
            "status": "thinking",
        }
        target.write_text(json.dumps(fresh))
        outcome = check_heartbeat(target, stale_threshold=300)
        assert outcome.healthy is True
        assert "cycle #42" in outcome.message

    def test_unhealthy_when_stale(self, tmp_path):
        """A heartbeat older than the threshold is unhealthy."""
        target = tmp_path / "heartbeat.json"
        old = {
            "timestamp": time.time() - 600,  # 10 minutes old
            "cycle": 10,
            "model": "timmy:v0.1",
            "status": "thinking",
        }
        target.write_text(json.dumps(old))
        outcome = check_heartbeat(target, stale_threshold=300)
        assert outcome.healthy is False
        assert "Stale" in outcome.message

    def test_unhealthy_when_missing(self, tmp_path):
        """A heartbeat file that does not exist is unhealthy."""
        outcome = check_heartbeat(tmp_path / "nonexistent.json")
        assert outcome.healthy is False
        assert "No heartbeat" in outcome.message

    def test_unhealthy_when_corrupt(self, tmp_path):
        """A heartbeat file that is not valid JSON is unhealthy."""
        target = tmp_path / "heartbeat.json"
        target.write_text("not json {{{")
        outcome = check_heartbeat(target)
        assert outcome.healthy is False
        assert "corrupt" in outcome.message.lower()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Syntax check ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestSyntaxCheck:
    """Check compile()-based syntax validation of Python source.

    NOTE(review): both tests call the builtin compile() directly rather
    than the watchdog's own syntax check, so they pin the mechanism and
    not the watchdog wrapper around it. Consider calling the watchdog
    function once its script path can be injected.
    """

    def test_healthy_on_valid_python(self, tmp_path):
        """Valid source read back from disk compiles without SyntaxError."""
        # Lay out a stand-in nexus/nexus_think.py under the temp directory.
        script = tmp_path / "nexus" / "nexus_think.py"
        script.parent.mkdir()
        script.write_text("x = 1\nprint(x)\n")

        # compile() raises SyntaxError on invalid input, so reaching the
        # assertion already proves the source parsed; the assertion then
        # confirms a code object was produced for this file.
        code = compile(script.read_text(), str(script), "exec")
        assert code.co_filename == str(script)

    def test_detects_syntax_error(self):
        """Source with an unclosed parenthesis raises SyntaxError."""
        bad_python = "def broken(\n    # missing close paren"
        with pytest.raises(SyntaxError):
            compile(bad_python, "test.py", "exec")
|
||||||
|
|
||||||
|
|
||||||
|
# ── HealthReport ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestHealthReport:
    """Exercise HealthReport's aggregate properties and markdown rendering."""

    def test_overall_healthy_when_all_pass(self):
        """overall_healthy is True when every check passed."""
        all_passing = [
            CheckResult("A", True, "ok"),
            CheckResult("B", True, "ok"),
        ]
        report = HealthReport(timestamp=time.time(), checks=all_passing)
        assert report.overall_healthy is True

    def test_overall_unhealthy_when_any_fails(self):
        """overall_healthy is False as soon as one check failed."""
        one_failing = [
            CheckResult("A", True, "ok"),
            CheckResult("B", False, "down"),
        ]
        report = HealthReport(timestamp=time.time(), checks=one_failing)
        assert report.overall_healthy is False

    def test_failed_checks_property(self):
        """failed_checks holds only the failing results, first failure first."""
        mixed = [
            CheckResult("A", True, "ok"),
            CheckResult("B", False, "down"),
            CheckResult("C", False, "error"),
        ]
        report = HealthReport(timestamp=time.time(), checks=mixed)
        assert len(report.failed_checks) == 2
        assert report.failed_checks[0].name == "B"

    def test_markdown_contains_table(self):
        """to_markdown() renders a row per check, status icons and a failure banner."""
        mixed = [
            CheckResult("Gateway", True, "Listening"),
            CheckResult("Mind", False, "Not running"),
        ]
        report = HealthReport(timestamp=time.time(), checks=mixed)
        md = report.to_markdown()
        expected_fragments = (
            "| Gateway |",
            "| Mind |",
            "✅",
            "❌",
            "FAILURES DETECTED",
        )
        for fragment in expected_fragments:
            assert fragment in md

    def test_markdown_all_healthy(self):
        """to_markdown() shows the all-clear banner when nothing failed."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[CheckResult("A", True, "ok")],
        )
        rendered = report.to_markdown()
        assert "ALL SYSTEMS OPERATIONAL" in rendered
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration: full health check cycle ─────────────────────────────
|
||||||
|
|
||||||
|
class TestRunHealthChecks:
    """Integration-level check of run_health_checks() with I/O patched out."""

    def test_returns_report_with_all_checks(self, tmp_path):
        """run_health_checks() yields a report containing all four checks."""
        with patch("socket.socket") as fake_socket:
            with patch("subprocess.run") as fake_run:
                # Socket connect succeeds; the process lookup finds nothing.
                fake_socket.return_value.connect_ex.return_value = 0
                fake_run.return_value = MagicMock(returncode=1, stdout="")

                report = run_health_checks(
                    heartbeat_path=tmp_path / "missing.json",
                )

        assert len(report.checks) == 4
        check_names = {check.name for check in report.checks}
        expected_names = (
            "WebSocket Gateway",
            "Consciousness Loop",
            "Heartbeat",
            "Syntax Health",
        )
        for expected in expected_names:
            assert expected in check_names
|
||||||
Reference in New Issue
Block a user