[watchdog] The Eye That Never Sleeps — Nexus Health Monitor (#794)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Co-authored-by: Google AI Agent <gemini@hermes.local> Co-committed-by: Google AI Agent <gemini@hermes.local>
This commit was merged in pull request #794.
This commit is contained in:
575
bin/nexus_watchdog.py
Normal file
575
bin/nexus_watchdog.py
Normal file
@@ -0,0 +1,575 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Nexus Watchdog — The Eye That Never Sleeps
|
||||
|
||||
Monitors the health of the Nexus consciousness loop and WebSocket
|
||||
gateway, raising Gitea issues when components go dark.
|
||||
|
||||
The nexus was dead for hours after a syntax error crippled
|
||||
nexus_think.py. Nobody knew. The gateway kept running, but the
|
||||
consciousness loop — the only part that matters — was silent.
|
||||
|
||||
This watchdog ensures that never happens again.
|
||||
|
||||
HOW IT WORKS
|
||||
============
|
||||
1. Probes the WebSocket gateway (ws://localhost:8765)
|
||||
→ Can Timmy hear the world?
|
||||
|
||||
2. Checks for a running nexus_think.py process
|
||||
→ Is Timmy's mind awake?
|
||||
|
||||
3. Reads the heartbeat file (~/.nexus/heartbeat.json)
|
||||
→ When did Timmy last think?
|
||||
|
||||
4. If any check fails, opens a Gitea issue (or updates an existing one)
|
||||
with the exact failure mode, timestamp, and diagnostic info.
|
||||
|
||||
5. If all checks pass after a previous failure, closes the issue
|
||||
with a recovery note.
|
||||
|
||||
USAGE
|
||||
=====
|
||||
# One-shot check (good for cron)
|
||||
python bin/nexus_watchdog.py
|
||||
|
||||
# Continuous monitoring (every 60s)
|
||||
python bin/nexus_watchdog.py --watch --interval 60
|
||||
|
||||
# Dry-run (print diagnostics, don't touch Gitea)
|
||||
python bin/nexus_watchdog.py --dry-run
|
||||
|
||||
# Crontab entry (every 5 minutes)
|
||||
*/5 * * * * cd /path/to/the-nexus && python bin/nexus_watchdog.py
|
||||
|
||||
HEARTBEAT PROTOCOL
|
||||
==================
|
||||
The consciousness loop (nexus_think.py) writes a heartbeat file
|
||||
after each think cycle:
|
||||
|
||||
~/.nexus/heartbeat.json
|
||||
{
|
||||
"pid": 12345,
|
||||
"timestamp": 1711843200.0,
|
||||
"cycle": 42,
|
||||
"model": "timmy:v0.1-q4",
|
||||
"status": "thinking"
|
||||
}
|
||||
|
||||
If the heartbeat is older than --stale-threshold seconds, the
|
||||
mind is considered dead even if the process is still running
|
||||
(e.g., hung on a blocking call).
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
=================
|
||||
Pure stdlib. No pip installs. Same machine as the nexus.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("nexus.watchdog")
|
||||
|
||||
# ── Configuration ────────────────────────────────────────────────────
|
||||
|
||||
DEFAULT_WS_HOST = "localhost"
|
||||
DEFAULT_WS_PORT = 8765
|
||||
DEFAULT_HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json"
|
||||
DEFAULT_STALE_THRESHOLD = 300 # 5 minutes without a heartbeat = dead
|
||||
DEFAULT_INTERVAL = 60 # seconds between checks in watch mode
|
||||
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "http://143.198.27.163:3000")
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_REPO = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")
|
||||
WATCHDOG_LABEL = "watchdog"
|
||||
WATCHDOG_TITLE_PREFIX = "[watchdog]"
|
||||
|
||||
|
||||
# ── Health check results ─────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class CheckResult:
|
||||
"""Result of a single health check."""
|
||||
name: str
|
||||
healthy: bool
|
||||
message: str
|
||||
details: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HealthReport:
|
||||
"""Aggregate health report from all checks."""
|
||||
timestamp: float
|
||||
checks: List[CheckResult]
|
||||
overall_healthy: bool = True
|
||||
|
||||
def __post_init__(self):
|
||||
self.overall_healthy = all(c.healthy for c in self.checks)
|
||||
|
||||
@property
|
||||
def failed_checks(self) -> List[CheckResult]:
|
||||
return [c for c in self.checks if not c.healthy]
|
||||
|
||||
def to_markdown(self) -> str:
|
||||
"""Format as a Gitea issue body."""
|
||||
ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp))
|
||||
status = "🟢 ALL SYSTEMS OPERATIONAL" if self.overall_healthy else "🔴 FAILURES DETECTED"
|
||||
|
||||
lines = [
|
||||
f"## Nexus Health Report — {ts}",
|
||||
f"**Status:** {status}",
|
||||
"",
|
||||
"| Check | Status | Details |",
|
||||
"|:------|:------:|:--------|",
|
||||
]
|
||||
|
||||
for c in self.checks:
|
||||
icon = "✅" if c.healthy else "❌"
|
||||
lines.append(f"| {c.name} | {icon} | {c.message} |")
|
||||
|
||||
if self.failed_checks:
|
||||
lines.append("")
|
||||
lines.append("### Failure Diagnostics")
|
||||
for c in self.failed_checks:
|
||||
lines.append(f"\n**{c.name}:**")
|
||||
lines.append(f"```")
|
||||
lines.append(c.message)
|
||||
if c.details:
|
||||
lines.append(json.dumps(c.details, indent=2))
|
||||
lines.append(f"```")
|
||||
|
||||
lines.append("")
|
||||
lines.append(f"*Generated by `nexus_watchdog.py` at {ts}*")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ── Health checks ────────────────────────────────────────────────────
|
||||
|
||||
def check_ws_gateway(host: str = DEFAULT_WS_HOST, port: int = DEFAULT_WS_PORT) -> CheckResult:
|
||||
"""Check if the WebSocket gateway is accepting connections.
|
||||
|
||||
Uses a raw TCP socket probe (not a full WebSocket handshake) to avoid
|
||||
depending on the websockets library. If TCP connects, the gateway
|
||||
process is alive and listening.
|
||||
"""
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(5)
|
||||
result = sock.connect_ex((host, port))
|
||||
sock.close()
|
||||
|
||||
if result == 0:
|
||||
return CheckResult(
|
||||
name="WebSocket Gateway",
|
||||
healthy=True,
|
||||
message=f"Listening on {host}:{port}",
|
||||
)
|
||||
else:
|
||||
return CheckResult(
|
||||
name="WebSocket Gateway",
|
||||
healthy=False,
|
||||
message=f"Connection refused on {host}:{port} (errno={result})",
|
||||
details={"host": host, "port": port, "errno": result},
|
||||
)
|
||||
except Exception as e:
|
||||
return CheckResult(
|
||||
name="WebSocket Gateway",
|
||||
healthy=False,
|
||||
message=f"Probe failed: {e}",
|
||||
details={"host": host, "port": port, "error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
def check_mind_process() -> CheckResult:
|
||||
"""Check if nexus_think.py is running as a process.
|
||||
|
||||
Uses `pgrep -f` to find processes matching the script name.
|
||||
This catches both `python nexus_think.py` and `python -m nexus.nexus_think`.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["pgrep", "-f", "nexus_think"],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
pids = [p.strip() for p in result.stdout.strip().split("\n") if p.strip()]
|
||||
# Filter out our own watchdog process
|
||||
own_pid = str(os.getpid())
|
||||
pids = [p for p in pids if p != own_pid]
|
||||
|
||||
if pids:
|
||||
return CheckResult(
|
||||
name="Consciousness Loop",
|
||||
healthy=True,
|
||||
message=f"Running (PID: {', '.join(pids)})",
|
||||
details={"pids": pids},
|
||||
)
|
||||
|
||||
return CheckResult(
|
||||
name="Consciousness Loop",
|
||||
healthy=False,
|
||||
message="nexus_think.py is not running — Timmy's mind is dark",
|
||||
details={"pgrep_returncode": result.returncode},
|
||||
)
|
||||
except FileNotFoundError:
|
||||
# pgrep not available (unlikely on Linux/macOS but handle gracefully)
|
||||
return CheckResult(
|
||||
name="Consciousness Loop",
|
||||
healthy=True, # Can't check — don't raise false alarms
|
||||
message="pgrep not available, skipping process check",
|
||||
)
|
||||
except Exception as e:
|
||||
return CheckResult(
|
||||
name="Consciousness Loop",
|
||||
healthy=False,
|
||||
message=f"Process check failed: {e}",
|
||||
details={"error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
def check_heartbeat(
|
||||
path: Path = DEFAULT_HEARTBEAT_PATH,
|
||||
stale_threshold: int = DEFAULT_STALE_THRESHOLD,
|
||||
) -> CheckResult:
|
||||
"""Check if the heartbeat file exists and is recent.
|
||||
|
||||
The consciousness loop should write this file after each think
|
||||
cycle. If it's missing or stale, the mind has stopped thinking
|
||||
even if the process is technically alive.
|
||||
"""
|
||||
if not path.exists():
|
||||
return CheckResult(
|
||||
name="Heartbeat",
|
||||
healthy=False,
|
||||
message=f"No heartbeat file at {path} — mind has never reported",
|
||||
details={"path": str(path)},
|
||||
)
|
||||
|
||||
try:
|
||||
data = json.loads(path.read_text())
|
||||
except (json.JSONDecodeError, OSError) as e:
|
||||
return CheckResult(
|
||||
name="Heartbeat",
|
||||
healthy=False,
|
||||
message=f"Heartbeat file corrupt: {e}",
|
||||
details={"path": str(path), "error": str(e)},
|
||||
)
|
||||
|
||||
timestamp = data.get("timestamp", 0)
|
||||
age = time.time() - timestamp
|
||||
cycle = data.get("cycle", "?")
|
||||
model = data.get("model", "unknown")
|
||||
status = data.get("status", "unknown")
|
||||
|
||||
if age > stale_threshold:
|
||||
return CheckResult(
|
||||
name="Heartbeat",
|
||||
healthy=False,
|
||||
message=(
|
||||
f"Stale heartbeat — last pulse {int(age)}s ago "
|
||||
f"(threshold: {stale_threshold}s). "
|
||||
f"Cycle #{cycle}, model={model}, status={status}"
|
||||
),
|
||||
details=data,
|
||||
)
|
||||
|
||||
return CheckResult(
|
||||
name="Heartbeat",
|
||||
healthy=True,
|
||||
message=f"Alive — cycle #{cycle}, {int(age)}s ago, model={model}",
|
||||
details=data,
|
||||
)
|
||||
|
||||
|
||||
def check_syntax_health() -> CheckResult:
|
||||
"""Verify nexus_think.py can be parsed by Python.
|
||||
|
||||
This catches the exact failure mode that killed the nexus: a syntax
|
||||
error introduced by a bad commit. Python's compile() is a fast,
|
||||
zero-import check that catches SyntaxErrors before they hit runtime.
|
||||
"""
|
||||
script_path = Path(__file__).parent.parent / "nexus" / "nexus_think.py"
|
||||
if not script_path.exists():
|
||||
return CheckResult(
|
||||
name="Syntax Health",
|
||||
healthy=True,
|
||||
message="nexus_think.py not found at expected path, skipping",
|
||||
)
|
||||
|
||||
try:
|
||||
source = script_path.read_text()
|
||||
compile(source, str(script_path), "exec")
|
||||
return CheckResult(
|
||||
name="Syntax Health",
|
||||
healthy=True,
|
||||
message=f"nexus_think.py compiles cleanly ({len(source)} bytes)",
|
||||
)
|
||||
except SyntaxError as e:
|
||||
return CheckResult(
|
||||
name="Syntax Health",
|
||||
healthy=False,
|
||||
message=f"SyntaxError at line {e.lineno}: {e.msg}",
|
||||
details={
|
||||
"file": str(script_path),
|
||||
"line": e.lineno,
|
||||
"offset": e.offset,
|
||||
"text": (e.text or "").strip(),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# ── Gitea alerting ───────────────────────────────────────────────────
|
||||
|
||||
def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
|
||||
"""Make a Gitea API request. Returns parsed JSON or empty dict."""
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
|
||||
body = json.dumps(data).encode() if data else None
|
||||
req = urllib.request.Request(url, data=body, method=method)
|
||||
if GITEA_TOKEN:
|
||||
req.add_header("Authorization", f"token {GITEA_TOKEN}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
req.add_header("Accept", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
raw = resp.read().decode()
|
||||
return json.loads(raw) if raw.strip() else {}
|
||||
except urllib.error.HTTPError as e:
|
||||
logger.warning("Gitea %d: %s", e.code, e.read().decode()[:200])
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning("Gitea request failed: %s", e)
|
||||
return None
|
||||
|
||||
|
||||
def find_open_watchdog_issue() -> Optional[dict]:
|
||||
"""Find an existing open watchdog issue, if any."""
|
||||
issues = _gitea_request(
|
||||
"GET",
|
||||
f"/repos/{GITEA_REPO}/issues?state=open&type=issues&limit=20",
|
||||
)
|
||||
if not issues or not isinstance(issues, list):
|
||||
return None
|
||||
|
||||
for issue in issues:
|
||||
title = issue.get("title", "")
|
||||
if title.startswith(WATCHDOG_TITLE_PREFIX):
|
||||
return issue
|
||||
return None
|
||||
|
||||
|
||||
def create_alert_issue(report: HealthReport) -> Optional[dict]:
|
||||
"""Create a Gitea issue for a health failure."""
|
||||
failed = report.failed_checks
|
||||
components = ", ".join(c.name for c in failed)
|
||||
title = f"{WATCHDOG_TITLE_PREFIX} Nexus health failure: {components}"
|
||||
|
||||
return _gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues",
|
||||
data={
|
||||
"title": title,
|
||||
"body": report.to_markdown(),
|
||||
"assignees": ["Timmy"],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def update_alert_issue(issue_number: int, report: HealthReport) -> Optional[dict]:
|
||||
"""Add a comment to an existing watchdog issue with new findings."""
|
||||
return _gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues/{issue_number}/comments",
|
||||
data={"body": report.to_markdown()},
|
||||
)
|
||||
|
||||
|
||||
def close_alert_issue(issue_number: int, report: HealthReport) -> None:
|
||||
"""Close a watchdog issue when health is restored."""
|
||||
_gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues/{issue_number}/comments",
|
||||
data={"body": (
|
||||
"## 🟢 Recovery Confirmed\n\n"
|
||||
+ report.to_markdown()
|
||||
+ "\n\n*Closing — all systems operational.*"
|
||||
)},
|
||||
)
|
||||
_gitea_request(
|
||||
"PATCH",
|
||||
f"/repos/{GITEA_REPO}/issues/{issue_number}",
|
||||
data={"state": "closed"},
|
||||
)
|
||||
|
||||
|
||||
# ── Orchestration ────────────────────────────────────────────────────
|
||||
|
||||
def run_health_checks(
|
||||
ws_host: str = DEFAULT_WS_HOST,
|
||||
ws_port: int = DEFAULT_WS_PORT,
|
||||
heartbeat_path: Path = DEFAULT_HEARTBEAT_PATH,
|
||||
stale_threshold: int = DEFAULT_STALE_THRESHOLD,
|
||||
) -> HealthReport:
|
||||
"""Run all health checks and return the aggregate report."""
|
||||
checks = [
|
||||
check_ws_gateway(ws_host, ws_port),
|
||||
check_mind_process(),
|
||||
check_heartbeat(heartbeat_path, stale_threshold),
|
||||
check_syntax_health(),
|
||||
]
|
||||
return HealthReport(timestamp=time.time(), checks=checks)
|
||||
|
||||
|
||||
def alert_on_failure(report: HealthReport, dry_run: bool = False) -> None:
|
||||
"""Create, update, or close Gitea issues based on health status."""
|
||||
if dry_run:
|
||||
logger.info("DRY RUN — would %s Gitea issue",
|
||||
"close" if report.overall_healthy else "create/update")
|
||||
return
|
||||
|
||||
if not GITEA_TOKEN:
|
||||
logger.warning("GITEA_TOKEN not set — cannot create issues")
|
||||
return
|
||||
|
||||
existing = find_open_watchdog_issue()
|
||||
|
||||
if report.overall_healthy:
|
||||
if existing:
|
||||
logger.info("Health restored — closing issue #%d", existing["number"])
|
||||
close_alert_issue(existing["number"], report)
|
||||
else:
|
||||
if existing:
|
||||
logger.info("Still unhealthy — updating issue #%d", existing["number"])
|
||||
update_alert_issue(existing["number"], report)
|
||||
else:
|
||||
result = create_alert_issue(report)
|
||||
if result and result.get("number"):
|
||||
logger.info("Created alert issue #%d", result["number"])
|
||||
|
||||
|
||||
def run_once(args: argparse.Namespace) -> bool:
|
||||
"""Run one health check cycle. Returns True if healthy."""
|
||||
report = run_health_checks(
|
||||
ws_host=args.ws_host,
|
||||
ws_port=args.ws_port,
|
||||
heartbeat_path=Path(args.heartbeat_path),
|
||||
stale_threshold=args.stale_threshold,
|
||||
)
|
||||
|
||||
# Log results
|
||||
for check in report.checks:
|
||||
level = logging.INFO if check.healthy else logging.ERROR
|
||||
icon = "✅" if check.healthy else "❌"
|
||||
logger.log(level, "%s %s: %s", icon, check.name, check.message)
|
||||
|
||||
if not report.overall_healthy:
|
||||
alert_on_failure(report, dry_run=args.dry_run)
|
||||
elif not args.dry_run:
|
||||
alert_on_failure(report, dry_run=args.dry_run)
|
||||
|
||||
return report.overall_healthy
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Nexus Watchdog — monitors consciousness loop health",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ws-host", default=DEFAULT_WS_HOST,
|
||||
help="WebSocket gateway host (default: localhost)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ws-port", type=int, default=DEFAULT_WS_PORT,
|
||||
help="WebSocket gateway port (default: 8765)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--heartbeat-path", default=str(DEFAULT_HEARTBEAT_PATH),
|
||||
help="Path to heartbeat file",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--stale-threshold", type=int, default=DEFAULT_STALE_THRESHOLD,
|
||||
help="Seconds before heartbeat is considered stale (default: 300)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--watch", action="store_true",
|
||||
help="Run continuously instead of one-shot",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--interval", type=int, default=DEFAULT_INTERVAL,
|
||||
help="Seconds between checks in watch mode (default: 60)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run", action="store_true",
|
||||
help="Print diagnostics without creating Gitea issues",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json", action="store_true", dest="output_json",
|
||||
help="Output results as JSON (for integration with other tools)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.watch:
|
||||
logger.info("Watchdog starting in continuous mode (interval: %ds)", args.interval)
|
||||
_running = True
|
||||
|
||||
def _handle_sigterm(signum, frame):
|
||||
nonlocal _running
|
||||
_running = False
|
||||
logger.info("Received signal %d, shutting down", signum)
|
||||
|
||||
signal.signal(signal.SIGTERM, _handle_sigterm)
|
||||
signal.signal(signal.SIGINT, _handle_sigterm)
|
||||
|
||||
while _running:
|
||||
run_once(args)
|
||||
for _ in range(args.interval):
|
||||
if not _running:
|
||||
break
|
||||
time.sleep(1)
|
||||
else:
|
||||
healthy = run_once(args)
|
||||
|
||||
if args.output_json:
|
||||
report = run_health_checks(
|
||||
ws_host=args.ws_host,
|
||||
ws_port=args.ws_port,
|
||||
heartbeat_path=Path(args.heartbeat_path),
|
||||
stale_threshold=args.stale_threshold,
|
||||
)
|
||||
print(json.dumps({
|
||||
"healthy": report.overall_healthy,
|
||||
"timestamp": report.timestamp,
|
||||
"checks": [
|
||||
{"name": c.name, "healthy": c.healthy,
|
||||
"message": c.message, "details": c.details}
|
||||
for c in report.checks
|
||||
],
|
||||
}, indent=2))
|
||||
|
||||
sys.exit(0 if healthy else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user