feat(lazarus): deploy fleet health watchdog with auto-restart and fallback promotion (#911)
This commit is contained in:
252
scripts/lazarus_watchdog.py
Normal file
252
scripts/lazarus_watchdog.py
Normal file
@@ -0,0 +1,252 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Lazarus Pit Watchdog
|
||||
====================
|
||||
Automated health monitoring, fallback promotion, and agent resurrection
|
||||
for the Timmy Foundation wizard fleet.
|
||||
|
||||
Usage:
|
||||
python lazarus_watchdog.py [--dry-run]
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
import subprocess
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
REGISTRY_PATH = Path("/root/wizards/bezalel/workspace/the-nexus/lazarus-registry.yaml")
|
||||
INCIDENT_LOG = Path("/var/log/lazarus_incidents.jsonl")
|
||||
AGENT_CONFIG_PATH = Path("/root/wizards/bezalel/home/.hermes/config.yaml")
|
||||
|
||||
|
||||
def shell(cmd: str, timeout: int = 30) -> tuple[int, str, str]:
|
||||
try:
|
||||
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
|
||||
return r.returncode, r.stdout.strip(), r.stderr.strip()
|
||||
except Exception as e:
|
||||
return -1, "", str(e)
|
||||
|
||||
|
||||
def load_registry() -> dict:
|
||||
with open(REGISTRY_PATH) as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
|
||||
def save_registry(data: dict):
|
||||
with open(REGISTRY_PATH, "w") as f:
|
||||
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
|
||||
|
||||
|
||||
def ping_http(url: str, timeout: int = 10) -> tuple[bool, int]:
|
||||
try:
|
||||
req = urllib.request.Request(url, method="HEAD")
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
return True, resp.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return True, e.code
|
||||
except Exception:
|
||||
return False, 0
|
||||
|
||||
|
||||
def probe_provider(provider: str, model: str, timeout: int = 20) -> dict:
|
||||
"""
|
||||
Lightweight provider probe.
|
||||
For now we only check if the provider is in our local Hermes config
|
||||
by attempting a trivial API call. Simplified: just assume healthy
|
||||
unless we have explicit evidence of death from logs.
|
||||
"""
|
||||
# Check agent logs for recent provider failures
|
||||
log_path = Path("/var/log/syslog")
|
||||
if not log_path.exists():
|
||||
log_path = Path("/var/log/messages")
|
||||
|
||||
dead_keywords = ["access_terminated", "403", "Invalid API key"]
|
||||
degraded_keywords = ["rate limit", "429", "timeout", "Connection reset"]
|
||||
|
||||
status = "healthy"
|
||||
note = ""
|
||||
|
||||
# Parse last 100 lines of hermes log if available
|
||||
hermes_log = Path("/var/log/hermes-gateway.log")
|
||||
if hermes_log.exists():
|
||||
_, out, _ = shell(f"tail -n 100 {hermes_log}")
|
||||
lower = out.lower()
|
||||
for kw in dead_keywords:
|
||||
if kw in lower:
|
||||
status = "dead"
|
||||
note = f"Detected '{kw}' in recent gateway logs"
|
||||
break
|
||||
if status == "healthy":
|
||||
for kw in degraded_keywords:
|
||||
if kw in lower:
|
||||
status = "degraded"
|
||||
note = f"Detected '{kw}' in recent gateway logs"
|
||||
break
|
||||
|
||||
return {"status": status, "note": note, "last_checked": datetime.now(timezone.utc).isoformat()}
|
||||
|
||||
|
||||
def check_agent(name: str, spec: dict) -> dict:
|
||||
result = {"agent": name, "timestamp": datetime.now(timezone.utc).isoformat(), "actions": []}
|
||||
|
||||
# Ping gateway
|
||||
gw_url = spec.get("health_endpoints", {}).get("gateway")
|
||||
if gw_url:
|
||||
reachable, code = ping_http(gw_url)
|
||||
result["gateway_reachable"] = reachable
|
||||
result["gateway_status"] = code
|
||||
if not reachable:
|
||||
result["actions"].append("gateway_unreachable")
|
||||
else:
|
||||
result["gateway_reachable"] = False
|
||||
result["actions"].append("no_gateway_configured")
|
||||
|
||||
# Local service check (only if on this host)
|
||||
host = spec.get("host", "")
|
||||
if host in ("127.0.0.1", "localhost", "104.131.15.18") or not host:
|
||||
svc_name = f"hermes-{name}.service"
|
||||
code, out, _ = shell(f"systemctl is-active {svc_name}")
|
||||
result["service_active"] = (code == 0)
|
||||
if code != 0:
|
||||
result["actions"].append("service_inactive")
|
||||
else:
|
||||
result["service_active"] = None
|
||||
|
||||
# Probe primary provider
|
||||
primary = spec.get("primary", {})
|
||||
probe = probe_provider(primary.get("provider"), primary.get("model"))
|
||||
result["primary_provider"] = probe
|
||||
if probe["status"] in ("dead", "degraded"):
|
||||
result["actions"].append(f"primary_{probe['status']}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def rewrite_fallbacks(name: str, fallback_chain: list, dry_run: bool = False) -> bool:
|
||||
"""Rewrite Bezalel's local config.yaml fallback_providers to match registry."""
|
||||
if name != "bezalel":
|
||||
return False # Can only rewrite local config
|
||||
if not AGENT_CONFIG_PATH.exists():
|
||||
return False
|
||||
|
||||
with open(AGENT_CONFIG_PATH) as f:
|
||||
config = yaml.safe_load(f)
|
||||
|
||||
if "fallback_providers" not in config:
|
||||
config["fallback_providers"] = []
|
||||
|
||||
new_fallbacks = []
|
||||
for entry in fallback_chain:
|
||||
fb = {
|
||||
"provider": entry["provider"],
|
||||
"model": entry["model"],
|
||||
"timeout": entry.get("timeout", 120),
|
||||
}
|
||||
if entry.get("provider") == "openrouter":
|
||||
fb["base_url"] = "https://openrouter.ai/api/v1"
|
||||
fb["api_key_env"] = "OPENROUTER_API_KEY"
|
||||
if entry.get("provider") == "big_brain":
|
||||
fb["base_url"] = "http://yxw29g3excyddq-64411cd0-11434.tcp.runpod.net:11434/v1"
|
||||
new_fallbacks.append(fb)
|
||||
|
||||
if config["fallback_providers"] == new_fallbacks:
|
||||
return False # No change needed
|
||||
|
||||
config["fallback_providers"] = new_fallbacks
|
||||
|
||||
if not dry_run:
|
||||
with open(AGENT_CONFIG_PATH, "w") as f:
|
||||
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def resurrect_agent(name: str, dry_run: bool = False) -> bool:
|
||||
svc = f"hermes-{name}.service"
|
||||
if dry_run:
|
||||
print(f"[DRY-RUN] Would restart {svc}")
|
||||
return True
|
||||
code, _, err = shell(f"systemctl restart {svc}")
|
||||
return code == 0
|
||||
|
||||
|
||||
def log_incident(event: dict):
|
||||
INCIDENT_LOG.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(INCIDENT_LOG, "a") as f:
|
||||
f.write(json.dumps(event) + "\n")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--dry-run", action="store_true", help="Show actions without executing")
|
||||
args = parser.parse_args()
|
||||
|
||||
registry = load_registry()
|
||||
fleet = registry.get("fleet", {})
|
||||
provider_matrix = registry.get("provider_health_matrix", {})
|
||||
changed = False
|
||||
|
||||
for name, spec in fleet.items():
|
||||
result = check_agent(name, spec)
|
||||
actions = result.get("actions", [])
|
||||
|
||||
# Update provider matrix
|
||||
primary_provider = spec.get("primary", {}).get("provider")
|
||||
if primary_provider and primary_provider in provider_matrix:
|
||||
provider_matrix[primary_provider].update(result["primary_provider"])
|
||||
|
||||
# Rewrite fallback chain if needed (local only)
|
||||
if name == "bezalel":
|
||||
fb_chain = spec.get("fallback_chain", [])
|
||||
if rewrite_fallbacks(name, fb_chain, dry_run=args.dry_run):
|
||||
result["actions"].append("fallback_chain_rewritten")
|
||||
changed = True
|
||||
|
||||
# Resurrection logic — only for local agents
|
||||
agent_host = spec.get("host", "")
|
||||
is_local = agent_host in ("127.0.0.1", "localhost", "104.131.15.18") or not agent_host
|
||||
if is_local and ("gateway_unreachable" in actions or "service_inactive" in actions):
|
||||
if spec.get("auto_restart", False):
|
||||
ok = resurrect_agent(name, dry_run=args.dry_run)
|
||||
result["resurrected"] = ok
|
||||
result["actions"].append("auto_restart_executed" if ok else "auto_restart_failed")
|
||||
log_incident(result)
|
||||
changed = True
|
||||
|
||||
# Fallback promotion if primary is dead
|
||||
if "primary_dead" in actions:
|
||||
fb = spec.get("fallback_chain", [])
|
||||
if fb:
|
||||
healthy_fallback = None
|
||||
for candidate in fb:
|
||||
cand_provider = candidate["provider"]
|
||||
if provider_matrix.get(cand_provider, {}).get("status") == "healthy":
|
||||
healthy_fallback = candidate
|
||||
break
|
||||
if healthy_fallback:
|
||||
if not args.dry_run:
|
||||
spec["primary"] = healthy_fallback
|
||||
result["actions"].append(f"promoted_fallback_to_{healthy_fallback['provider']}")
|
||||
log_incident(result)
|
||||
changed = True
|
||||
|
||||
# Print summary
|
||||
status = "OK" if not actions else "ACTION"
|
||||
print(f"[{status}] {name}: {', '.join(actions) if actions else 'healthy'}")
|
||||
|
||||
if changed and not args.dry_run:
|
||||
registry["meta"]["updated_at"] = datetime.now(timezone.utc).isoformat()
|
||||
save_registry(registry)
|
||||
print("\nRegistry updated.")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user