Compare commits
1 Commits
main
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
80126f3fff |
940
bin/resurrection_pool.py
Normal file
940
bin/resurrection_pool.py
Normal file
@@ -0,0 +1,940 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Resurrection Pool — Fleet Health Polling, Dead-Agent Detection, Auto-Revive
|
||||
|
||||
Monitors all wizard gateways in the fleet, detects downed agents, and
|
||||
applies configurable revive policies per mission cell:
|
||||
|
||||
auto — restart the agent immediately on death detection
|
||||
ask — send a Telegram / Nostr approval request; revive when approved
|
||||
never — record the death, create a Gitea issue, do nothing else
|
||||
|
||||
HOW IT WORKS
|
||||
============
|
||||
1. Load a JSON config listing wizard gateways and mission cells.
|
||||
2. Poll each gateway on a configurable interval (default: 60s).
|
||||
- TCP-probe the WebSocket port (can the gateway hear the world?)
|
||||
- Check for a running agent process via pgrep
|
||||
- Read the heartbeat file for last-think timestamp
|
||||
3. If an agent fails all three checks for longer than dead_threshold:
|
||||
a. Mark it dead in the pool state.
|
||||
b. Look up the mission it belongs to and read the revive_policy.
|
||||
c. Execute the policy: auto-restart, request human approval, or no-op.
|
||||
4. On auto-revive: spawn the configured revive_cmd without blocking; a later
poll cycle detects the recovery from the health checks and logs it.
|
||||
5. On ask-revive: post an approval request to Telegram / Nostr and wait
|
||||
up to approval_timeout seconds for a "+1" or "revive" reply.
|
||||
6. Create / close Gitea issues to track fleet health events.
|
||||
|
||||
CONFIGURATION
|
||||
=============
|
||||
Config file path is resolved in this order:
|
||||
1. --config CLI flag
|
||||
2. RESURRECTION_CONFIG env var
|
||||
3. ~/.nexus/resurrection_pool.json
|
||||
4. /etc/nexus/resurrection_pool.json
|
||||
|
||||
Minimal config skeleton (all fields optional, sane defaults everywhere):
|
||||
|
||||
{
|
||||
"poll_interval": 60,
|
||||
"dead_threshold": 300,
|
||||
"approval_timeout": 300,
|
||||
"gateways": [
|
||||
{
|
||||
"name": "timmy",
|
||||
"host": "localhost",
|
||||
"port": 8765,
|
||||
"heartbeat_path": "~/.nexus/heartbeat.json",
|
||||
"process_pattern": "nexus_think",
|
||||
"revive_cmd": ["python", "/root/wizards/timmy/nexus_think.py"]
|
||||
}
|
||||
],
|
||||
"missions": {
|
||||
"some-mission-uuid": {
|
||||
"label": "Portal Sprint",
|
||||
"agents": ["timmy"],
|
||||
"revive_policy": "ask"
|
||||
}
|
||||
},
|
||||
"telegram": {
|
||||
"bot_token": "...",
|
||||
"chat_id": "-1001234567890"
|
||||
},
|
||||
"nostr": {
|
||||
"relay_url": "wss://relay.damus.io",
|
||||
"recipient_pubkey": "..."
|
||||
},
|
||||
"gitea": {
|
||||
"url": "https://forge.example.com",
|
||||
"token": "...",
|
||||
"repo": "Timmy_Foundation/the-nexus"
|
||||
}
|
||||
}
|
||||
|
||||
USAGE
|
||||
=====
|
||||
# One-shot health check (good for cron / smoke tests)
|
||||
python bin/resurrection_pool.py --once
|
||||
|
||||
# Continuous daemon (default)
|
||||
python bin/resurrection_pool.py
|
||||
|
||||
# Custom config + dry-run (no side-effects)
|
||||
python bin/resurrection_pool.py --config /etc/nexus/pool.json --dry-run
|
||||
|
||||
# Print current pool state as JSON
|
||||
python bin/resurrection_pool.py --status
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
=================
|
||||
Pure stdlib. No pip installs required on the monitoring host.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
# Configure logging once at import time: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Module-wide logger shared by every function in this file.
logger = logging.getLogger("nexus.resurrection_pool")

# ── Constants ────────────────────────────────────────────────────────

DEFAULT_POLL_INTERVAL = 60  # seconds between full fleet sweeps
DEFAULT_DEAD_THRESHOLD = 300  # seconds with no heartbeat = dead
DEFAULT_APPROVAL_TIMEOUT = 300  # seconds to wait for human approval

# Default config file locations, listed in the order they are tried.
CONFIG_SEARCH_PATHS = [
    Path.home() / ".nexus" / "resurrection_pool.json",
    Path("/etc/nexus/resurrection_pool.json"),
]

# NOTE(review): POOL_LABEL is not referenced in the visible part of the file.
POOL_LABEL = "resurrection-pool"
# Prefix of every Gitea issue title created by the pool; also used to find them.
POOL_TITLE_PREFIX = "[pool]"
|
||||
|
||||
|
||||
# ── Enums & simple types ─────────────────────────────────────────────
|
||||
|
||||
class RevivePolicy(str, Enum):
    """What the pool does for a mission's agent once it is declared dead."""

    # Relaunch right away; nobody is asked.
    AUTO = "auto"
    # Request operator approval first and revive only on a +1.
    ASK = "ask"
    # Record the death (log + Gitea issue) and leave the agent down.
    NEVER = "never"
|
||||
|
||||
|
||||
class AgentState(str, Enum):
    """Health classification assigned to an agent after a probe round."""

    HEALTHY = "healthy"
    # At least one check fails, but not all of them.
    DEGRADED = "degraded"
    DEAD = "dead"
    REVIVING = "reviving"
    UNKNOWN = "unknown"
|
||||
|
||||
|
||||
# ── Configuration dataclasses ────────────────────────────────────────
|
||||
|
||||
@dataclass
class GatewayConfig:
    """Connection, liveness and restart settings for one monitored gateway."""

    name: str
    host: str = "localhost"
    port: int = 8765
    heartbeat_path: str = "~/.nexus/heartbeat.json"
    process_pattern: str = "nexus_think"
    revive_cmd: List[str] = field(default_factory=list)

    @property
    def resolved_heartbeat_path(self) -> Path:
        """Return ``heartbeat_path`` as a ``Path`` with a leading ``~`` expanded."""
        configured = Path(self.heartbeat_path)
        return configured.expanduser()
|
||||
|
||||
|
||||
@dataclass
class MissionConfig:
    """A mission cell: a set of agent names sharing one revive policy."""

    mission_id: str
    label: str = ""
    agents: List[str] = field(default_factory=list)
    revive_policy: RevivePolicy = RevivePolicy.ASK

    @classmethod
    def from_dict(cls, mission_id: str, data: dict) -> "MissionConfig":
        """Build a mission from its config mapping.

        An unrecognised ``revive_policy`` value is logged and replaced by
        ``RevivePolicy.ASK``; missing keys fall back to the field defaults.
        """
        requested = data.get("revive_policy", RevivePolicy.ASK)
        try:
            resolved = RevivePolicy(requested)
        except ValueError:
            logger.warning("Unknown revive_policy %r for mission %s; defaulting to 'ask'",
                           requested, mission_id)
            resolved = RevivePolicy.ASK

        mission_label = data.get("label", "")
        members = data.get("agents", [])
        return cls(
            mission_id=mission_id,
            label=mission_label,
            agents=members,
            revive_policy=resolved,
        )
|
||||
|
||||
|
||||
@dataclass
class PoolConfig:
    """Full resurrection pool configuration.

    Holds the timing knobs, the monitored gateways, the mission cells and the
    credentials for the Telegram, Nostr and Gitea integrations.  The Gitea
    defaults are read from the environment once, when the class is defined.
    """

    poll_interval: int = DEFAULT_POLL_INTERVAL
    dead_threshold: int = DEFAULT_DEAD_THRESHOLD
    approval_timeout: int = DEFAULT_APPROVAL_TIMEOUT
    gateways: List[GatewayConfig] = field(default_factory=list)
    missions: List[MissionConfig] = field(default_factory=list)
    telegram_bot_token: str = ""
    telegram_chat_id: str = ""
    nostr_relay_url: str = ""
    nostr_recipient_pubkey: str = ""
    gitea_url: str = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
    gitea_token: str = os.environ.get("GITEA_TOKEN", "")
    gitea_repo: str = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")

    @classmethod
    def from_dict(cls, data: dict) -> "PoolConfig":
        """Build a config from a parsed JSON document.

        Values present in ``data`` win; Telegram and Gitea settings that are
        absent fall back to environment variables, then to built-in defaults.

        Raises:
            KeyError: if a gateway entry has no ``"name"``.
        """
        tg = data.get("telegram", {})
        ns = data.get("nostr", {})
        git = data.get("gitea", {})

        gateways = [
            GatewayConfig(
                name=gw["name"],
                host=gw.get("host", "localhost"),
                port=gw.get("port", 8765),
                heartbeat_path=gw.get("heartbeat_path", "~/.nexus/heartbeat.json"),
                process_pattern=gw.get("process_pattern", "nexus_think"),
                revive_cmd=gw.get("revive_cmd", []),
            )
            for gw in data.get("gateways", [])
        ]

        missions = [
            MissionConfig.from_dict(mid, mdata)
            for mid, mdata in data.get("missions", {}).items()
        ]

        return cls(
            poll_interval=data.get("poll_interval", DEFAULT_POLL_INTERVAL),
            dead_threshold=data.get("dead_threshold", DEFAULT_DEAD_THRESHOLD),
            approval_timeout=data.get("approval_timeout", DEFAULT_APPROVAL_TIMEOUT),
            gateways=gateways,
            missions=missions,
            telegram_bot_token=tg.get("bot_token", os.environ.get("TELEGRAM_BOT_TOKEN", "")),
            telegram_chat_id=tg.get("chat_id", os.environ.get("TELEGRAM_CHAT_ID", "")),
            nostr_relay_url=ns.get("relay_url", ""),
            nostr_recipient_pubkey=ns.get("recipient_pubkey", ""),
            gitea_url=git.get("url", os.environ.get("GITEA_URL",
                                                    "https://forge.alexanderwhitestone.com")),
            gitea_token=git.get("token", os.environ.get("GITEA_TOKEN", "")),
            gitea_repo=git.get("repo", os.environ.get("NEXUS_REPO",
                                                      "Timmy_Foundation/the-nexus")),
        )

    @classmethod
    def load(cls, path: Optional[Path] = None) -> "PoolConfig":
        """Load config from the first usable file, else return defaults.

        Candidates are tried in the documented order:

        1. ``path`` (the ``--config`` CLI flag), when given
        2. the ``RESURRECTION_CONFIG`` environment variable
        3. each entry of ``CONFIG_SEARCH_PATHS``

        A candidate that is missing, unreadable or malformed is skipped with
        a warning so the next one can be tried.
        """
        candidates: List[Path] = []
        if path:
            # The explicit path must outrank the environment variable.
            candidates.append(Path(path).expanduser())
        env_path = os.environ.get("RESURRECTION_CONFIG")
        if env_path:
            candidates.append(Path(env_path).expanduser())
        candidates.extend(CONFIG_SEARCH_PATHS)

        for candidate in candidates:
            if not candidate.exists():
                continue
            try:
                data = json.loads(candidate.read_text())
                config = cls.from_dict(data)
            except (json.JSONDecodeError, KeyError, OSError) as e:
                # OSError covers files that exist but cannot be read.
                logger.warning("Failed to parse %s: %s", candidate, e)
                continue
            logger.info("Loaded config from %s", candidate)
            return config

        logger.info("No config file found; using defaults (no gateways configured)")
        return cls()
|
||||
|
||||
|
||||
# ── Per-agent health state ───────────────────────────────────────────
|
||||
|
||||
@dataclass
class AgentHealth:
    """Mutable record of what the probes last reported for one agent."""

    name: str
    state: AgentState = AgentState.UNKNOWN
    gateway_alive: bool = False
    process_alive: bool = False
    heartbeat_age: Optional[float] = None  # seconds since the last heartbeat
    last_checked: float = field(default_factory=time.time)
    first_dead_at: Optional[float] = None  # epoch time of the first failed check
    revive_pending: bool = False  # an approval request is outstanding
    revive_attempts: int = 0

    @property
    def consecutive_dead_seconds(self) -> float:
        """Seconds elapsed since ``first_dead_at``; ``0.0`` when it is unset."""
        since = self.first_dead_at
        return 0.0 if since is None else time.time() - since

    def to_dict(self) -> dict:
        """Return a JSON-friendly copy of every field (state as its string value)."""
        snapshot = {"name": self.name, "state": self.state.value}
        for attr in (
            "gateway_alive",
            "process_alive",
            "heartbeat_age",
            "last_checked",
            "first_dead_at",
            "revive_pending",
            "revive_attempts",
        ):
            snapshot[attr] = getattr(self, attr)
        return snapshot
|
||||
|
||||
|
||||
# ── Health probes ────────────────────────────────────────────────────
|
||||
|
||||
def probe_gateway(host: str, port: int, timeout: int = 5) -> Tuple[bool, str]:
    """TCP-probe a wizard gateway.

    Args:
        host: Hostname or IPv4 address of the gateway.
        port: TCP port the gateway listens on.
        timeout: Connect timeout in seconds.

    Returns:
        ``(alive, message)`` where ``alive`` is True only when the TCP
        connection was accepted.  The function never raises: any failure
        (resolution error, invalid port, ...) is reported in ``message``.
    """
    try:
        # The context manager closes the socket even when connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            err = sock.connect_ex((host, port))
    except Exception as e:  # best-effort probe: report instead of crashing
        return False, f"TCP probe failed: {e}"

    if err == 0:
        return True, f"TCP:{host}:{port} reachable"
    return False, f"TCP:{host}:{port} refused (errno={err})"
|
||||
|
||||
|
||||
def probe_process(pattern: str) -> Tuple[bool, str]:
    """Look for a running process whose command line matches ``pattern``.

    Runs ``pgrep -f`` and ignores this interpreter's own PID.

    Returns:
        ``(alive, message)``.  When ``pgrep`` is not installed the check is
        skipped and reported as alive; any other failure reports not alive.
    """
    try:
        completed = subprocess.run(
            ["pgrep", "-f", pattern],
            capture_output=True, text=True, timeout=5,
        )
        if completed.returncode == 0:
            own_pid = str(os.getpid())
            found = []
            for line in completed.stdout.strip().split("\n"):
                pid = line.strip()
                if pid and pid != own_pid:
                    found.append(pid)
            if found:
                return True, f"Running (PID: {', '.join(found)})"
        return False, f"No process matching {pattern!r}"
    except FileNotFoundError:
        return True, "pgrep unavailable; skipping process check"
    except Exception as exc:
        return False, f"Process probe failed: {exc}"
|
||||
|
||||
|
||||
def probe_heartbeat(path: Path, dead_threshold: int) -> Tuple[bool, Optional[float], str]:
    """Read a heartbeat file and judge its freshness.

    Args:
        path: Location of the JSON heartbeat file.  Its ``timestamp`` field
            is compared against ``time.time()``.
        dead_threshold: Maximum heartbeat age, in seconds, still considered
            alive.

    Returns:
        ``(alive, age_seconds, message)``.  ``age_seconds`` is None when the
        file is missing, unreadable or malformed.  Never raises on bad file
        content.
    """
    if not path.exists():
        return False, None, f"No heartbeat file at {path}"
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and undecodable bytes.
        return False, None, f"Heartbeat file unreadable: {e}"

    # A half-written or foreign file may hold valid JSON of the wrong shape.
    if not isinstance(data, dict):
        return False, None, (
            f"Heartbeat file unreadable: expected a JSON object, "
            f"got {type(data).__name__}"
        )

    ts = data.get("timestamp", 0)
    if isinstance(ts, bool) or not isinstance(ts, (int, float)):
        return False, None, f"Heartbeat file unreadable: invalid timestamp {ts!r}"

    age = time.time() - ts
    cycle = data.get("cycle", "?")
    model = data.get("model", "unknown")

    if age > dead_threshold:
        return False, age, (
            f"Stale heartbeat: {int(age)}s ago (threshold {dead_threshold}s), "
            f"cycle={cycle}, model={model}"
        )
    return True, age, f"Alive: cycle={cycle}, {int(age)}s ago, model={model}"
|
||||
|
||||
|
||||
def assess_agent(gw: GatewayConfig, dead_threshold: int) -> AgentHealth:
    """Probe one gateway three ways and classify the outcome.

    The TCP, process and heartbeat probes all run; the agent is HEALTHY when
    all three pass, DEAD when none pass and DEGRADED otherwise.
    """
    checked_at = time.time()

    gateway_up, gateway_note = probe_gateway(gw.host, gw.port)
    process_up, process_note = probe_process(gw.process_pattern)
    beat_ok, beat_age, beat_note = probe_heartbeat(gw.resolved_heartbeat_path, dead_threshold)

    logger.debug("[%s] gateway=%s process=%s heartbeat=%s",
                 gw.name, gateway_up, process_up, beat_ok)
    logger.debug("[%s] gw: %s | proc: %s | hb: %s",
                 gw.name, gateway_note, process_note, beat_note)

    passed = sum([gateway_up, process_up, beat_ok])
    verdict = {3: AgentState.HEALTHY, 0: AgentState.DEAD}.get(passed, AgentState.DEGRADED)

    return AgentHealth(
        name=gw.name,
        state=verdict,
        gateway_alive=gateway_up,
        process_alive=process_up,
        heartbeat_age=beat_age,
        last_checked=checked_at,
    )
|
||||
|
||||
|
||||
# ── Notification channels ────────────────────────────────────────────
|
||||
|
||||
def _redact_url(url: str) -> str:
    """Mask the bot token embedded in Telegram Bot API URLs for logging."""
    marker = "api.telegram.org/bot"
    head, found, tail = url.partition(marker)
    if not found:
        return url
    _token, slash, rest = tail.partition("/")
    return f"{head}{marker}<redacted>{slash}{rest}"


def _http(method: str, url: str, data: Optional[dict] = None,
          headers: Optional[dict] = None) -> Optional[Any]:
    """Minimal JSON-over-HTTP helper.

    Args:
        method: HTTP verb.
        url: Absolute request URL.
        data: Optional payload, sent as a JSON body when non-empty.
        headers: Extra request headers, added after the JSON defaults.

    Returns:
        The parsed JSON response, ``{}`` for an empty body, or None on any
        failure.  Failures are logged with the URL redacted, because
        Telegram URLs carry the bot token in their path.
    """
    body = json.dumps(data).encode() if data else None
    try:
        # Request() itself raises ValueError on a malformed URL, so it
        # belongs inside the guarded region as well.
        req = urllib.request.Request(url, data=body, method=method)
        req.add_header("Content-Type", "application/json")
        req.add_header("Accept", "application/json")
        if headers:
            for k, v in headers.items():
                req.add_header(k, v)
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read().decode()
        return json.loads(raw) if raw.strip() else {}
    except urllib.error.HTTPError as e:
        try:
            detail = e.read().decode(errors="replace")[:200]
        except Exception:
            detail = ""  # the error body is optional diagnostic detail
        logger.warning("HTTP %d %s: %s", e.code, _redact_url(url), detail)
    except Exception as e:
        logger.warning("HTTP error %s: %s", _redact_url(url), e)
    return None
|
||||
|
||||
|
||||
def send_telegram_approval_request(
    bot_token: str,
    chat_id: str,
    agent_name: str,
    mission_label: str,
    dead_seconds: float,
    dry_run: bool = False,
) -> bool:
    """
    Ask the operators, via a Telegram message, whether to revive an agent.

    The message tells the operator to reply "revive <agent_name>" to
    authorize the restart or "skip <agent_name>" to suppress it.

    Returns True when Telegram accepted the message (always True in dry-run
    mode) and False when Telegram is not configured or the send failed.
    """
    if dry_run:
        logger.info("[DRY-RUN] Would send Telegram approval request for %s", agent_name)
        return True

    if not (bot_token and chat_id):
        logger.warning("Telegram not configured; cannot send approval request for %s", agent_name)
        return False

    text = (
        f"🔴 *Resurrection Pool Alert*\n\n"
        f"Agent *{agent_name}* has been dead for {int(dead_seconds)}s.\n"
        f"Mission: _{mission_label or 'unassigned'}_\n\n"
        f"Reply `revive {agent_name}` to authorize restart.\n"
        f"Reply `skip {agent_name}` to suppress for this cycle."
    )
    payload = {
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown",
    }
    endpoint = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    response = _http("POST", endpoint, payload)

    if not (response and response.get("ok")):
        logger.warning("Telegram send failed for %s: %s", agent_name, response)
        return False

    logger.info("Telegram approval request sent for %s (message_id=%s)",
                agent_name, response.get("result", {}).get("message_id"))
    return True
|
||||
|
||||
|
||||
def poll_telegram_for_approval(
    bot_token: str,
    agent_name: str,
    timeout_seconds: int,
    dry_run: bool = False,
) -> bool:
    """
    Poll Telegram getUpdates for a revive decision.

    A message containing "revive <agent_name>" (case-insensitive) or equal
    to "+1" approves; a message containing "skip <agent_name>" rejects.

    Args:
        bot_token: Telegram bot token; without one the result is False.
        agent_name: Agent the decision is for.
        timeout_seconds: How long to keep polling before giving up.
        dry_run: When True, approve immediately without any network call.

    Returns:
        True if approved within the timeout, False otherwise.

    NOTE(review): the sender's chat is not checked here, so any chat that
    can message the bot can approve — confirm this is intended.
    """
    if dry_run:
        logger.info("[DRY-RUN] Simulating Telegram approval for %s (auto-approve)", agent_name)
        return True

    if not bot_token:
        return False

    base_url = f"https://api.telegram.org/bot{bot_token}/getUpdates"
    wanted = agent_name.lower()
    approve_phrase = f"revive {wanted}"
    skip_phrase = f"skip {wanted}"
    # Long-poll wait sent to Telegram.  It must stay below the 15-second
    # timeout used by _http, otherwise every idle poll ends in a client-side
    # timeout instead of an empty result.
    max_long_poll = 10

    def _confirm(next_offset: Optional[int]) -> None:
        """Acknowledge processed updates so they are not delivered again."""
        if next_offset is not None:
            _http("GET", f"{base_url}?timeout=0&limit=1&offset={next_offset}")

    deadline = time.time() + timeout_seconds
    offset = None

    while time.time() < deadline:
        remaining = deadline - time.time()
        long_poll = max(0, min(max_long_poll, int(remaining)))
        params = f"?timeout={long_poll}&limit=10"
        if offset is not None:
            params += f"&offset={offset}"

        result = _http("GET", f"{base_url}{params}")
        if not result or not result.get("ok"):
            time.sleep(5)
            continue

        for update in result.get("result", []):
            offset = update["update_id"] + 1
            msg_text = (
                (update.get("message") or {}).get("text")
                or (update.get("channel_post") or {}).get("text")
                or ""
            ).lower().strip()

            if approve_phrase in msg_text or msg_text == "+1":
                # Without this the same approval would be returned again by
                # the next call and silently approve an unrelated revive.
                _confirm(offset)
                logger.info("Telegram approval received for %s", agent_name)
                return True
            if skip_phrase in msg_text:
                _confirm(offset)
                logger.info("Telegram skip received for %s; aborting revive", agent_name)
                return False

        time.sleep(5)

    _confirm(offset)
    logger.info("Approval timeout for %s after %ds", agent_name, timeout_seconds)
    return False
|
||||
|
||||
|
||||
def send_nostr_approval_request(
    relay_url: str,
    recipient_pubkey: str,
    agent_name: str,
    mission_label: str,
    dead_seconds: float,
    dry_run: bool = False,
) -> bool:
    """
    Send a Nostr kind-4 (direct message) approval request to the relay.

    The optional ``websockets`` package is imported lazily; when it is
    missing, or anything else goes wrong, a warning is logged and False is
    returned instead of raising.  Returns True once the event was written to
    the relay connection (always True in dry-run mode).

    NOTE(review): the event is sent with empty ``pubkey``, ``id`` and
    ``sig`` fields — confirm the relay accepts that.
    """
    if dry_run:
        logger.info("[DRY-RUN] Would send Nostr approval request for %s", agent_name)
        return True

    if not (relay_url and recipient_pubkey):
        logger.debug("Nostr not configured; skipping DM for %s", agent_name)
        return False

    content = (
        f"[Resurrection Pool] Agent '{agent_name}' is dead ({int(dead_seconds)}s). "
        f"Mission: {mission_label or 'unassigned'}. "
        f"Reply 'revive {agent_name}' to authorize restart."
    )

    try:
        import asyncio

        async def _deliver():
            import websockets  # type: ignore

            event = {
                "kind": 4,
                "pubkey": "",  # filled by relay/identity in prod
                "created_at": int(time.time()),
                "tags": [["p", recipient_pubkey]],
                "content": content,
                "id": "",
                "sig": "",
            }
            frame = ["EVENT", event]
            try:
                async with websockets.connect(relay_url, open_timeout=10) as ws:
                    await ws.send(json.dumps(frame))
                    logger.info("Nostr DM sent for %s via %s", agent_name, relay_url)
                    return True
            except Exception as send_err:
                logger.warning("Nostr send failed for %s: %s", agent_name, send_err)
                return False

        return asyncio.run(_deliver())
    except ImportError:
        logger.warning("websockets library not installed; Nostr notifications disabled")
        return False
    except Exception as err:
        logger.warning("Nostr notification error for %s: %s", agent_name, err)
        return False
|
||||
|
||||
|
||||
# ── Revive actions ───────────────────────────────────────────────────
|
||||
|
||||
def revive_agent(gw: GatewayConfig, dry_run: bool = False) -> bool:
    """
    Launch the gateway's configured ``revive_cmd`` as a detached process.

    Returns True when the command was started (or would have been, in
    dry-run mode) and False when no command is configured or the launch
    failed.  The call does not wait for the agent to come back.
    """
    command = gw.revive_cmd
    if not command:
        logger.warning("[%s] No revive_cmd configured; cannot restart", gw.name)
        return False

    if dry_run:
        logger.info("[DRY-RUN] Would exec: %s", command)
        return True

    logger.info("[%s] Launching: %s", gw.name, command)
    try:
        child = subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,  # run in its own session, apart from ours
        )
    except FileNotFoundError as err:
        logger.error("[%s] Revive cmd not found: %s", gw.name, err)
        return False
    except PermissionError as err:
        logger.error("[%s] Revive cmd permission denied: %s", gw.name, err)
        return False
    except Exception as err:
        logger.error("[%s] Revive failed: %s", gw.name, err)
        return False

    logger.info("[%s] Revive process started (PID %d)", gw.name, child.pid)
    return True
|
||||
|
||||
|
||||
# ── Gitea integration ────────────────────────────────────────────────
|
||||
|
||||
def _gitea(method: str, path: str, cfg: PoolConfig, data: Optional[dict] = None) -> Any:
    """Call the Gitea v1 API at ``path``, adding the token header when one is set."""
    base = cfg.gitea_url.rstrip("/")
    auth = {"Authorization": f"token {cfg.gitea_token}"} if cfg.gitea_token else {}
    return _http(method, f"{base}/api/v1{path}", data=data, headers=auth)
|
||||
|
||||
|
||||
def find_open_pool_issue(cfg: PoolConfig, agent_name: str) -> Optional[dict]:
    """Return the first open issue titled ``"[pool] <agent_name>:..."``, if any.

    Only one page of up to 50 open issues is requested; None is returned
    when the request fails or no title matches.
    """
    endpoint = f"/repos/{cfg.gitea_repo}/issues?state=open&type=issues&limit=50"
    listing = _gitea("GET", endpoint, cfg)
    if not isinstance(listing, list) or not listing:
        return None

    wanted = f"{POOL_TITLE_PREFIX} {agent_name}:"
    return next(
        (entry for entry in listing if entry.get("title", "").startswith(wanted)),
        None,
    )
|
||||
|
||||
|
||||
def open_death_issue(cfg: PoolConfig, health: AgentHealth, mission_label: str) -> None:
    """Create a Gitea issue recording an agent death.

    Does nothing when no Gitea token is configured, or when an open pool
    issue for this agent already exists.  The second check matters because
    a sweep may call this function again on every poll cycle while the
    agent stays dead.

    Args:
        cfg: Pool configuration (Gitea URL, token and repository).
        health: Latest health snapshot of the dead agent.
        mission_label: Human-readable mission name; may be empty.
    """
    if not cfg.gitea_token:
        logger.debug("No Gitea token; skipping issue creation for %s", health.name)
        return

    existing = find_open_pool_issue(cfg, health.name)
    if existing:
        logger.debug("Death issue #%s already open for %s; not opening another",
                     existing.get("number"), health.name)
        return

    title = f"{POOL_TITLE_PREFIX} {health.name}: agent dead ({int(health.consecutive_dead_seconds)}s)"
    body = (
        f"## 🔴 Agent Death Detected — {health.name}\n\n"
        f"**Mission:** {mission_label or 'unassigned'} \n"
        f"**Dead for:** {int(health.consecutive_dead_seconds)}s \n"
        f"**Gateway alive:** {health.gateway_alive} \n"
        f"**Process alive:** {health.process_alive} \n"
        f"**Heartbeat age:** "
        f"{f'{int(health.heartbeat_age)}s' if health.heartbeat_age is not None else 'N/A'} \n\n"
        f"*Logged by Resurrection Pool at "
        f"{time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}*"
    )
    result = _gitea("POST", f"/repos/{cfg.gitea_repo}/issues", cfg,
                    data={"title": title, "body": body})
    if result and result.get("number"):
        logger.info("Opened death issue #%d for %s", result["number"], health.name)
||||
|
||||
|
||||
def close_death_issue(cfg: PoolConfig, agent_name: str, revived: bool) -> None:
    """Comment on and close the agent's open death issue after recovery.

    Does nothing when no Gitea token is configured (the comment and close
    calls need one) or when no open pool issue exists for the agent.

    Args:
        cfg: Pool configuration (Gitea URL, token and repository).
        agent_name: Agent whose issue should be closed.
        revived: True when the pool itself relaunched the agent; only
            changes the wording of the closing comment.
    """
    if not cfg.gitea_token:
        logger.debug("No Gitea token; skipping issue close for %s", agent_name)
        return

    issue = find_open_pool_issue(cfg, agent_name)
    if not issue:
        return
    num = issue["number"]
    verb = "revived and healthy" if revived else "recovered (external)"
    _gitea("POST", f"/repos/{cfg.gitea_repo}/issues/{num}/comments", cfg,
           data={"body": f"## 🟢 Agent {agent_name} — {verb}\n\n"
                         f"*Resurrection Pool confirmed recovery at "
                         f"{time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}*"})
    closed = _gitea("PATCH", f"/repos/{cfg.gitea_repo}/issues/{num}", cfg,
                    data={"state": "closed"})
    if closed is None:
        # The HTTP helper returns None on failure; do not claim success.
        logger.warning("Failed to close death issue #%d for %s", num, agent_name)
        return
    logger.info("Closed death issue #%d for %s", num, agent_name)
|
||||
|
||||
|
||||
# ── Policy engine ────────────────────────────────────────────────────
|
||||
|
||||
def apply_revive_policy(
    gw: GatewayConfig,
    health: AgentHealth,
    mission: Optional[MissionConfig],
    cfg: PoolConfig,
    dry_run: bool = False,
) -> bool:
    """
    Carry out the mission's revive policy for an agent judged dead.

    An agent without a mission is handled with the "ask" policy.  Under
    "ask" the call blocks while polling Telegram for a decision, and falls
    back to an immediate restart when no approval channel is usable.

    Returns True if a restart was launched, False otherwise.
    """
    if mission:
        policy, mission_label = mission.revive_policy, mission.label
    else:
        policy, mission_label = RevivePolicy.ASK, ""

    logger.info("[%s] Applying revive policy=%s (dead %ds)",
                gw.name, policy.value, int(health.consecutive_dead_seconds))

    # -- never: record only ------------------------------------------------
    if policy == RevivePolicy.NEVER:
        logger.info("[%s] Policy=never — recording death, no restart", gw.name)
        open_death_issue(cfg, health, mission_label)
        return False

    # -- auto: restart without asking ---------------------------------------
    if policy == RevivePolicy.AUTO:
        logger.info("[%s] Policy=auto — restarting immediately", gw.name)
        launched = revive_agent(gw, dry_run=dry_run)
        if not launched:
            logger.warning("[%s] Auto-revive failed — opening Gitea issue", gw.name)
            open_death_issue(cfg, health, mission_label)
        else:
            logger.info("[%s] Auto-revive launched", gw.name)
        return launched

    # -- ask: request approval, then wait for it ----------------------------
    if health.revive_pending:
        logger.debug("[%s] Approval already pending; waiting", gw.name)
        return False

    health.revive_pending = True

    # Every configured channel gets the request.
    telegram_ok = send_telegram_approval_request(
        cfg.telegram_bot_token, cfg.telegram_chat_id,
        gw.name, mission_label, health.consecutive_dead_seconds,
        dry_run=dry_run,
    )
    send_nostr_approval_request(
        cfg.nostr_relay_url, cfg.nostr_recipient_pubkey,
        gw.name, mission_label, health.consecutive_dead_seconds,
        dry_run=dry_run,
    )

    nobody_to_ask = not telegram_ok and not cfg.nostr_relay_url
    if nobody_to_ask:
        logger.warning(
            "[%s] No approval channel available (configure telegram or nostr). "
            "Falling back to auto-revive.", gw.name
        )
        health.revive_pending = False
        return revive_agent(gw, dry_run=dry_run)

    # Blocks for up to approval_timeout seconds.
    approved = poll_telegram_for_approval(
        cfg.telegram_bot_token, gw.name,
        cfg.approval_timeout, dry_run=dry_run,
    )
    health.revive_pending = False

    if not approved:
        logger.info("[%s] Revive not approved — recording death", gw.name)
        open_death_issue(cfg, health, mission_label)
        return False

    logger.info("[%s] Approval received — launching revive", gw.name)
    launched = revive_agent(gw, dry_run=dry_run)
    if not launched:
        open_death_issue(cfg, health, mission_label)
    return launched
|
||||
|
||||
|
||||
# ── Main pool loop ───────────────────────────────────────────────────
|
||||
|
||||
class ResurrectionPool:
|
||||
"""
|
||||
Fleet-wide health monitor and revive coordinator.
|
||||
|
||||
Maintains an in-memory health table keyed by agent name.
|
||||
Drives one poll-sweep per interval across all configured gateways.
|
||||
"""
|
||||
|
||||
def __init__(self, cfg: PoolConfig, dry_run: bool = False):
|
||||
self.cfg = cfg
|
||||
self.dry_run = dry_run
|
||||
self._health: Dict[str, AgentHealth] = {}
|
||||
self._running = False
|
||||
|
||||
# ── Agent-to-mission lookup ──────────────────────────────────────
|
||||
|
||||
def _mission_for(self, agent_name: str) -> Optional[MissionConfig]:
|
||||
"""Return the first mission that lists this agent."""
|
||||
for m in self.cfg.missions:
|
||||
if agent_name in m.agents:
|
||||
return m
|
||||
return None
|
||||
|
||||
# ── Gateway lookup ───────────────────────────────────────────────
|
||||
|
||||
def _gateway_for(self, agent_name: str) -> Optional[GatewayConfig]:
|
||||
for gw in self.cfg.gateways:
|
||||
if gw.name == agent_name:
|
||||
return gw
|
||||
return None
|
||||
|
||||
# ── Health state tracking ────────────────────────────────────────
|
||||
|
||||
def _update_health(self, agent_name: str, new: AgentHealth) -> AgentHealth:
|
||||
"""
|
||||
Merge a fresh health snapshot into the pool's running state.
|
||||
|
||||
- Preserves first_dead_at timestamp so dead_seconds accumulates.
|
||||
- Resets first_dead_at when agent returns healthy.
|
||||
- Tracks revive_attempts across cycles.
|
||||
"""
|
||||
prev = self._health.get(agent_name)
|
||||
|
||||
if new.state == AgentState.HEALTHY:
|
||||
new.first_dead_at = None
|
||||
new.revive_attempts = 0
|
||||
new.revive_pending = False
|
||||
if prev and prev.state == AgentState.DEAD:
|
||||
logger.info("[%s] Agent recovered!", agent_name)
|
||||
close_death_issue(self.cfg, agent_name, revived=prev.revive_attempts > 0)
|
||||
else:
|
||||
if prev:
|
||||
new.revive_pending = prev.revive_pending
|
||||
new.revive_attempts = prev.revive_attempts
|
||||
if prev.first_dead_at is not None:
|
||||
new.first_dead_at = prev.first_dead_at
|
||||
elif new.state in (AgentState.DEAD, AgentState.DEGRADED):
|
||||
new.first_dead_at = new.last_checked
|
||||
else:
|
||||
new.first_dead_at = new.last_checked
|
||||
|
||||
self._health[agent_name] = new
|
||||
return new
|
||||
|
||||
# ── Single sweep ─────────────────────────────────────────────────
|
||||
|
||||
def sweep(self) -> Dict[str, AgentHealth]:
|
||||
"""Poll all gateways once and apply revive policies where needed."""
|
||||
if not self.cfg.gateways:
|
||||
logger.debug("No gateways configured; nothing to sweep")
|
||||
return {}
|
||||
|
||||
for gw in self.cfg.gateways:
|
||||
raw = assess_agent(gw, self.cfg.dead_threshold)
|
||||
health = self._update_health(gw.name, raw)
|
||||
icon = {"healthy": "✅", "degraded": "⚠️", "dead": "🔴",
|
||||
"reviving": "🔄", "unknown": "❓"}.get(health.state.value, "?")
|
||||
logger.info("%s [%s] state=%s dead_for=%ds",
|
||||
icon, gw.name, health.state.value,
|
||||
int(health.consecutive_dead_seconds))
|
||||
|
||||
if health.state == AgentState.DEAD:
|
||||
if health.consecutive_dead_seconds >= self.cfg.dead_threshold:
|
||||
if not health.revive_pending:
|
||||
mission = self._mission_for(gw.name)
|
||||
launched = apply_revive_policy(
|
||||
gw, health, mission, self.cfg, dry_run=self.dry_run
|
||||
)
|
||||
if launched:
|
||||
health.revive_attempts += 1
|
||||
health.state = AgentState.REVIVING
|
||||
|
||||
return dict(self._health)
|
||||
|
||||
def status(self) -> List[dict]:
|
||||
"""Return current pool state as a list of dicts (for --status flag)."""
|
||||
return [h.to_dict() for h in self._health.values()]
|
||||
|
||||
# ── Run loop ─────────────────────────────────────────────────────
|
||||
|
||||
def run(self) -> None:
    """Blocking poll loop. Handles SIGTERM/SIGINT cleanly.

    Installs handlers for SIGTERM and SIGINT that clear ``self._running``,
    then alternates between ``sweep()`` and waiting ``cfg.poll_interval``
    seconds until a stop is requested.

    A sweep that raises is logged and the loop carries on: the watchdog
    must outlive a single bad poll, otherwise nothing is left to revive
    the fleet.  The wait is deadline-based, so a fractional
    ``poll_interval`` (e.g. a float from a JSON config) is honoured
    instead of raising ``TypeError`` in ``range()``.
    """
    self._running = True

    def _stop(signum, frame):
        logger.info("Signal %d received; stopping pool", signum)
        self._running = False

    signal.signal(signal.SIGTERM, _stop)
    signal.signal(signal.SIGINT, _stop)

    logger.info(
        "Resurrection Pool starting — %d gateway(s), poll_interval=%ds, dead_threshold=%ds",
        len(self.cfg.gateways), self.cfg.poll_interval, self.cfg.dead_threshold,
    )

    while self._running:
        try:
            self.sweep()
        except Exception:
            # Top-level boundary of the daemon: record the failure and retry
            # on the next cycle rather than taking the whole monitor down.
            logger.exception("Sweep failed; retrying after poll interval")

        # Sleep in ticks of at most 1s so we respond to signals promptly
        deadline = time.monotonic() + self.cfg.poll_interval
        while self._running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))

    logger.info("Resurrection Pool stopped")
|
||||
|
||||
def run_once(self) -> bool:
    """Perform one sweep and report whether every agent came back HEALTHY.

    An empty sweep result (no gateways) counts as healthy.
    """
    for health in self.sweep().values():
        if health.state == AgentState.HEALTHY:
            continue
        return False
    return True
|
||||
|
||||
|
||||
# ── CLI ──────────────────────────────────────────────────────────────
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the Resurrection Pool.

    Options are registered from a table, in the order they should appear
    in ``--help``.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Resurrection Pool — fleet health polling and auto-revive",
    )

    option_table = (
        (("--config",), {
            "metavar": "PATH",
            "help": "Path to resurrection_pool.json config file",
        }),
        (("--once",), {
            "action": "store_true",
            "help": "Run one sweep and exit (for cron / smoke tests)",
        }),
        (("--status",), {
            "action": "store_true",
            "help": "Print current pool state as JSON and exit",
        }),
        (("--dry-run",), {
            "action": "store_true",
            "help": "Run all checks but do not restart agents or alert channels",
        }),
        (("--poll-interval",), {
            "type": int,
            "default": None,
            "help": "Override poll interval (seconds)",
        }),
        (("--dead-threshold",), {
            "type": int,
            "default": None,
            "help": "Override dead threshold (seconds)",
        }),
        (("-v", "--verbose"), {
            "action": "store_true",
            "help": "Enable DEBUG logging",
        }),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    return parser
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: parse arguments, load config, then dispatch a mode.

    Modes, checked in this order:
      --status  probe every gateway once, print the resulting pool state
                as JSON and return.  This path only assesses and records
                health; it never applies a revive policy, so asking for
                status cannot restart agents or send approval requests.
      --once    run a single sweep and exit 0 if every agent is healthy,
                1 otherwise.
      default   run the blocking poll loop.

    ``--poll-interval`` and ``--dead-threshold`` override the values from
    the loaded config when given.
    """
    args = build_parser().parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    cfg = PoolConfig.load(Path(args.config) if args.config else None)

    if args.poll_interval is not None:
        cfg.poll_interval = args.poll_interval
    if args.dead_threshold is not None:
        cfg.dead_threshold = args.dead_threshold

    pool = ResurrectionPool(cfg, dry_run=args.dry_run)

    if args.status:
        # Read-only query: refresh health from live probes without going
        # through sweep(), which would also execute revive policies.
        for gw in cfg.gateways:
            pool._update_health(gw.name, assess_agent(gw, cfg.dead_threshold))
        print(json.dumps(pool.status(), indent=2))
        return

    if args.once:
        healthy = pool.run_once()
        sys.exit(0 if healthy else 1)

    pool.run()


if __name__ == "__main__":
    main()
|
||||
659
tests/test_resurrection_pool.py
Normal file
659
tests/test_resurrection_pool.py
Normal file
@@ -0,0 +1,659 @@
|
||||
"""Tests for the Resurrection Pool — health polling, dead-agent detection, auto-revive.
|
||||
|
||||
Validates:
|
||||
- GatewayConfig / PoolConfig loading from dicts and files
|
||||
- RevivePolicy enum parsing (including unknown values)
|
||||
- probe_gateway / probe_process / probe_heartbeat functions
|
||||
- assess_agent combining all three probes into AgentHealth
|
||||
- AgentHealth state: HEALTHY / DEGRADED / DEAD
|
||||
- ResurrectionPool._update_health — dead_at tracking and recovery
|
||||
- ResurrectionPool.sweep — policy routing (auto/ask/never)
|
||||
- apply_revive_policy — auto-restart, never, ask-with-no-channel fallback
|
||||
- Telegram / Nostr dry-run paths
|
||||
- Gitea issue helpers (mocked)
|
||||
- CLI argument parsing
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Module loading (same pattern as test_nexus_watchdog.py) ──────────
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_rp_spec = importlib.util.spec_from_file_location(
|
||||
"resurrection_pool_test",
|
||||
PROJECT_ROOT / "bin" / "resurrection_pool.py",
|
||||
)
|
||||
_rp = importlib.util.module_from_spec(_rp_spec)
|
||||
sys.modules["resurrection_pool_test"] = _rp
|
||||
_rp_spec.loader.exec_module(_rp)
|
||||
|
||||
# Convenience aliases
|
||||
AgentHealth = _rp.AgentHealth
|
||||
AgentState = _rp.AgentState
|
||||
GatewayConfig = _rp.GatewayConfig
|
||||
MissionConfig = _rp.MissionConfig
|
||||
PoolConfig = _rp.PoolConfig
|
||||
ResurrectionPool = _rp.ResurrectionPool
|
||||
RevivePolicy = _rp.RevivePolicy
|
||||
apply_revive_policy = _rp.apply_revive_policy
|
||||
assess_agent = _rp.assess_agent
|
||||
build_parser = _rp.build_parser
|
||||
probe_gateway = _rp.probe_gateway
|
||||
probe_heartbeat = _rp.probe_heartbeat
|
||||
probe_process = _rp.probe_process
|
||||
revive_agent = _rp.revive_agent
|
||||
send_telegram_approval_request = _rp.send_telegram_approval_request
|
||||
send_nostr_approval_request = _rp.send_nostr_approval_request
|
||||
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
def _make_gw(name="timmy", host="localhost", port=8765,
             heartbeat_path=None, process_pattern="nexus_think",
             revive_cmd=None, tmp_path=None):
    """Build a GatewayConfig for tests.

    When no heartbeat path is given, it lives under *tmp_path* if one is
    supplied and otherwise falls back to ``~/.nexus/heartbeat.json``.
    A missing/empty *revive_cmd* becomes an empty list.
    """
    if heartbeat_path is None:
        if tmp_path is None:
            heartbeat_path = "~/.nexus/heartbeat.json"
        else:
            heartbeat_path = str(tmp_path / "heartbeat.json")

    settings = {
        "name": name,
        "host": host,
        "port": port,
        "heartbeat_path": heartbeat_path,
        "process_pattern": process_pattern,
        "revive_cmd": revive_cmd or [],
    }
    return GatewayConfig(**settings)
|
||||
|
||||
|
||||
def _write_heartbeat(path: Path, age_seconds: float = 10.0):
    """Write a heartbeat JSON file whose timestamp is *age_seconds* in the past.

    Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps({
        "pid": os.getpid(),
        "timestamp": time.time() - age_seconds,
        "cycle": 7,
        "model": "timmy:v0.1",
        "status": "thinking",
    })
    path.write_text(payload)
|
||||
|
||||
|
||||
def _make_cfg(**kwargs) -> PoolConfig:
    """Construct a PoolConfig from keyword overrides (defaults otherwise)."""
    config = PoolConfig(**kwargs)
    return config
|
||||
|
||||
|
||||
# ── RevivePolicy tests ───────────────────────────────────────────────
|
||||
|
||||
class TestRevivePolicy:
    """RevivePolicy parsing, directly and through MissionConfig.from_dict."""

    def test_valid_values(self):
        """Each policy string maps to its enum member."""
        expected = (
            ("auto", RevivePolicy.AUTO),
            ("ask", RevivePolicy.ASK),
            ("never", RevivePolicy.NEVER),
        )
        for raw, member in expected:
            assert RevivePolicy(raw) == member

    def test_mission_from_dict_unknown_policy(self):
        """An unrecognised policy string ends up as ASK."""
        spec = {"revive_policy": "bogus", "agents": ["a"]}
        mission = MissionConfig.from_dict("m1", spec)
        assert mission.revive_policy == RevivePolicy.ASK

    def test_mission_from_dict_valid_policy(self):
        """A recognised policy string is honoured."""
        spec = {"revive_policy": "auto", "agents": ["x"]}
        mission = MissionConfig.from_dict("m1", spec)
        assert mission.revive_policy == RevivePolicy.AUTO

    def test_mission_agents_default_empty(self):
        """A mission without an agents key has no agents."""
        mission = MissionConfig.from_dict("m2", {})
        assert mission.agents == []
|
||||
|
||||
|
||||
# ── PoolConfig loading tests ─────────────────────────────────────────
|
||||
|
||||
class TestPoolConfig:
    """PoolConfig defaults, dict parsing and file/env-var loading."""

    def test_defaults(self):
        """A bare PoolConfig uses module defaults and empty collections."""
        config = PoolConfig()
        assert config.poll_interval == _rp.DEFAULT_POLL_INTERVAL
        assert config.dead_threshold == _rp.DEFAULT_DEAD_THRESHOLD
        assert config.gateways == []
        assert config.missions == []

    def test_from_dict_basic(self):
        """Scalars, gateways and missions are all read from the dict."""
        config = PoolConfig.from_dict({
            "poll_interval": 30,
            "dead_threshold": 120,
            "gateways": [{"name": "allegro", "port": 9000}],
            "missions": {
                "m1": {"label": "Sprint", "agents": ["allegro"], "revive_policy": "auto"},
            },
        })
        assert config.poll_interval == 30
        assert config.dead_threshold == 120

        assert len(config.gateways) == 1
        gateway = config.gateways[0]
        assert gateway.name == "allegro"
        assert gateway.port == 9000

        assert len(config.missions) == 1
        assert config.missions[0].revive_policy == RevivePolicy.AUTO

    def test_from_dict_telegram(self):
        """The nested telegram section fills the telegram_* fields."""
        config = PoolConfig.from_dict(
            {"telegram": {"bot_token": "tok123", "chat_id": "-999"}}
        )
        assert config.telegram_bot_token == "tok123"
        assert config.telegram_chat_id == "-999"

    def test_from_dict_nostr(self):
        """The nested nostr section fills the nostr_* fields."""
        config = PoolConfig.from_dict(
            {"nostr": {"relay_url": "wss://relay.damus.io", "recipient_pubkey": "abc"}}
        )
        assert config.nostr_relay_url == "wss://relay.damus.io"
        assert config.nostr_recipient_pubkey == "abc"

    def test_load_from_file(self, tmp_path):
        """An explicit path is read and parsed."""
        target = tmp_path / "pool.json"
        target.write_text(json.dumps({"poll_interval": 42}))
        assert PoolConfig.load(target).poll_interval == 42

    def test_load_missing_file_returns_defaults(self, tmp_path):
        """A path that does not exist yields the default poll interval."""
        config = PoolConfig.load(tmp_path / "nonexistent.json")
        assert config.poll_interval == _rp.DEFAULT_POLL_INTERVAL

    def test_load_corrupt_file_falls_through(self, tmp_path):
        """Unparseable JSON yields the default poll interval."""
        broken = tmp_path / "bad.json"
        broken.write_text("not json {{")
        config = PoolConfig.load(broken)
        assert config.poll_interval == _rp.DEFAULT_POLL_INTERVAL

    def test_load_env_var(self, tmp_path, monkeypatch):
        """With no explicit path, RESURRECTION_CONFIG points at the file."""
        target = tmp_path / "env_pool.json"
        target.write_text(json.dumps({"dead_threshold": 99}))
        monkeypatch.setenv("RESURRECTION_CONFIG", str(target))
        assert PoolConfig.load().dead_threshold == 99
|
||||
|
||||
|
||||
# ── probe_gateway tests ──────────────────────────────────────────────
|
||||
|
||||
class TestProbeGateway:
    """TCP reachability probe against closed, open and unresolvable targets."""

    def test_refused_port(self):
        """Nothing is expected to listen on localhost port 1."""
        reachable, detail = probe_gateway("localhost", 1, timeout=2)
        assert not reachable
        lowered = detail.lower()
        assert "refused" in lowered or "failed" in lowered

    def test_open_port(self):
        """A real listening socket on an ephemeral port is reported reachable."""
        import threading
        import socket as _socket

        listener = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
        listener.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
        listener.bind(("127.0.0.1", 0))
        listener.listen(1)
        bound_port = listener.getsockname()[1]

        def accept_one():
            # Accept a single connection, then always release the listener.
            try:
                peer, _ = listener.accept()
                peer.close()
            except Exception:
                pass
            finally:
                listener.close()

        worker = threading.Thread(target=accept_one, daemon=True)
        worker.start()

        reachable, detail = probe_gateway("127.0.0.1", bound_port, timeout=3)
        worker.join(timeout=2)

        assert reachable
        assert "reachable" in detail.lower()

    def test_invalid_host(self):
        """A hostname under the reserved .invalid TLD is not reachable."""
        reachable, _detail = probe_gateway(
            "this.host.does.not.exist.invalid", 8765, timeout=2
        )
        assert not reachable
|
||||
|
||||
|
||||
# ── probe_process tests ──────────────────────────────────────────────
|
||||
|
||||
class TestProbeProcess:
    """Process probe: no match, possible match, and missing pgrep binary."""

    def test_self_is_filtered(self):
        """A pattern that matches no process reports not-alive with a message."""
        found, detail = probe_process("definitely-no-process-xyz-12345")
        assert not found
        assert isinstance(detail, str)

    def test_running_process(self):
        """Probing for "pytest" must not crash; the outcome is environment-dependent."""
        found, detail = probe_process("pytest")
        assert isinstance(found, bool)
        assert isinstance(detail, str)

    @patch("subprocess.run", side_effect=FileNotFoundError)
    def test_pgrep_missing(self, _run):
        """Without pgrep the probe reports alive, so no false alarm is raised."""
        found, detail = probe_process("anything")
        assert found
        assert "pgrep" in detail.lower()
|
||||
|
||||
|
||||
# ── probe_heartbeat tests ────────────────────────────────────────────
|
||||
|
||||
class TestProbeHeartbeat:
    """Heartbeat-file probe: missing, fresh, stale, corrupt, no timestamp."""

    def test_missing_file(self, tmp_path):
        """No file means not alive, no age, and a "no heartbeat" message."""
        ok, age, detail = probe_heartbeat(tmp_path / "no.json", dead_threshold=300)
        assert not ok
        assert age is None
        assert "no heartbeat" in detail.lower()

    def test_fresh_heartbeat(self, tmp_path):
        """A 10-second-old heartbeat is alive under a 300s threshold."""
        beat = tmp_path / "heartbeat.json"
        _write_heartbeat(beat, age_seconds=10)
        ok, age, detail = probe_heartbeat(beat, dead_threshold=300)
        assert ok
        assert age is not None and age < 20
        assert "alive" in detail.lower()

    def test_stale_heartbeat(self, tmp_path):
        """A 600-second-old heartbeat is stale under a 300s threshold."""
        beat = tmp_path / "heartbeat.json"
        _write_heartbeat(beat, age_seconds=600)
        ok, age, detail = probe_heartbeat(beat, dead_threshold=300)
        assert not ok
        assert age > 500
        assert "stale" in detail.lower()

    def test_corrupt_json(self, tmp_path):
        """Unparseable content is reported as unreadable."""
        beat = tmp_path / "heartbeat.json"
        beat.write_text("{ broken json")
        ok, _age, detail = probe_heartbeat(beat, dead_threshold=300)
        assert not ok
        assert "unreadable" in detail.lower()

    def test_missing_timestamp_treated_as_stale(self, tmp_path):
        """A heartbeat without a timestamp key is not considered alive."""
        beat = tmp_path / "heartbeat.json"
        beat.write_text(json.dumps({"cycle": 1}))  # no timestamp key
        ok, _age, _detail = probe_heartbeat(beat, dead_threshold=300)
        assert not ok
|
||||
|
||||
|
||||
# ── assess_agent tests ────────────────────────────────────────────────
|
||||
|
||||
class TestAssessAgent:
    """assess_agent folds the three (patched) probes into one AgentState."""

    def _gw(self, tmp_path, port=1):
        """Gateway whose heartbeat file lives under *tmp_path*."""
        return _make_gw(tmp_path=tmp_path, port=port)

    @patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_process", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 5.0, "ok"))
    def test_all_healthy(self, _heartbeat, _process, _gateway, tmp_path):
        """All probes passing yields HEALTHY."""
        verdict = assess_agent(self._gw(tmp_path), dead_threshold=300)
        assert verdict.state == AgentState.HEALTHY

    @patch("resurrection_pool_test.probe_gateway", return_value=(False, "refused"))
    @patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(False, None, "missing"))
    def test_all_dead(self, _heartbeat, _process, _gateway, tmp_path):
        """All probes failing yields DEAD."""
        verdict = assess_agent(self._gw(tmp_path), dead_threshold=300)
        assert verdict.state == AgentState.DEAD

    @patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 10.0, "ok"))
    def test_partially_failing_is_degraded(self, _heartbeat, _process, _gateway, tmp_path):
        """Only the process probe failing yields DEGRADED."""
        verdict = assess_agent(self._gw(tmp_path), dead_threshold=300)
        assert verdict.state == AgentState.DEGRADED
|
||||
|
||||
|
||||
# ── AgentHealth state machine tests ──────────────────────────────────
|
||||
|
||||
class TestAgentHealthTracking:
    """ResurrectionPool._update_health: first_dead_at and revive_attempts carry-over."""

    def _pool(self):
        """Dry-run pool with a default config."""
        cfg = _make_cfg()
        return ResurrectionPool(cfg, dry_run=True)

    def test_first_dead_at_set_on_first_failure(self):
        """The first DEAD observation stamps first_dead_at."""
        pool = self._pool()
        dead = AgentHealth(name="timmy", state=AgentState.DEAD,
                           last_checked=time.time())
        result = pool._update_health("timmy", dead)
        assert result.first_dead_at is not None

    def test_first_dead_at_preserved_across_cycles(self):
        """A later DEAD observation keeps the original first_dead_at."""
        pool = self._pool()
        t0 = time.time() - 100
        dead1 = AgentHealth(name="timmy", state=AgentState.DEAD,
                            last_checked=t0, first_dead_at=t0)
        pool._update_health("timmy", dead1)

        dead2 = AgentHealth(name="timmy", state=AgentState.DEAD,
                            last_checked=time.time())
        result = pool._update_health("timmy", dead2)
        assert result.first_dead_at == t0  # preserved

    def test_recovery_resets_first_dead_at(self):
        """A HEALTHY observation after DEAD clears first_dead_at and attempts."""
        pool = self._pool()
        dead = AgentHealth(name="timmy", state=AgentState.DEAD,
                           last_checked=time.time())
        pool._update_health("timmy", dead)

        healthy = AgentHealth(name="timmy", state=AgentState.HEALTHY,
                              last_checked=time.time())
        result = pool._update_health("timmy", healthy)
        assert result.first_dead_at is None
        assert result.revive_attempts == 0

    def test_revive_attempts_increments(self):
        """revive_attempts on the stored record carries over to the next one.

        Despite the name, this checks preservation across an update, not
        incrementing: the counter itself is bumped by the sweep.
        """
        pool = self._pool()
        previous = AgentHealth(name="timmy", state=AgentState.DEAD,
                               last_checked=time.time())
        previous.revive_attempts = 2
        pool._health["timmy"] = previous

        new = AgentHealth(name="timmy", state=AgentState.DEAD,
                          last_checked=time.time())
        result = pool._update_health("timmy", new)
        assert result.revive_attempts == 2  # preserved from the stored record
|
||||
|
||||
|
||||
# ── mission_for / gateway_for tests ─────────────────────────────────
|
||||
|
||||
class TestPoolLookups:
    """Name-based lookups on the pool: _mission_for and _gateway_for."""

    def test_mission_for_known_agent(self):
        """An agent listed in a mission resolves to that mission."""
        config = PoolConfig.from_dict({
            "missions": {"m1": {"agents": ["timmy"], "revive_policy": "auto"}},
        })
        mission = ResurrectionPool(config, dry_run=True)._mission_for("timmy")
        assert mission is not None
        assert mission.mission_id == "m1"

    def test_mission_for_unknown_agent(self):
        """An agent in no mission resolves to None."""
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        assert pool._mission_for("ghost") is None

    def test_gateway_for(self):
        """A configured gateway is found by name."""
        config = PoolConfig.from_dict({
            "gateways": [{"name": "allegro", "port": 9000}],
        })
        gateway = ResurrectionPool(config, dry_run=True)._gateway_for("allegro")
        assert gateway is not None
        assert gateway.port == 9000

    def test_gateway_for_missing(self):
        """An unknown gateway name resolves to None."""
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        assert pool._gateway_for("nobody") is None
|
||||
|
||||
|
||||
# ── apply_revive_policy tests ────────────────────────────────────────
|
||||
|
||||
class TestApplyRevivePolicy:
    """apply_revive_policy routing for the never / auto / ask policies."""

    def _dead_health(self, name="timmy", dead_seconds=400):
        """AgentHealth in DEAD state that first died *dead_seconds* ago."""
        return AgentHealth(
            name=name,
            state=AgentState.DEAD,
            first_dead_at=time.time() - dead_seconds,
            last_checked=time.time(),
        )

    def test_never_policy_no_revive(self, tmp_path):
        """NEVER: nothing is launched, but a death issue is opened."""
        gateway = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.NEVER)
        config = _make_cfg()
        health = self._dead_health()

        with patch.object(_rp, "open_death_issue") as issue_mock:
            outcome = apply_revive_policy(gateway, health, mission, config, dry_run=True)

        assert not outcome
        issue_mock.assert_called_once()

    def test_auto_policy_launches_revive(self, tmp_path):
        """AUTO: revive_agent is invoked with the gateway and dry_run flag."""
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "revive"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
        config = _make_cfg()
        health = self._dead_health()

        with patch.object(_rp, "revive_agent", return_value=True) as revive_mock:
            outcome = apply_revive_policy(gateway, health, mission, config, dry_run=True)

        assert outcome
        revive_mock.assert_called_once_with(gateway, dry_run=True)

    def test_auto_policy_opens_issue_on_revive_failure(self, tmp_path):
        """AUTO with a failing revive: reports failure and opens an issue."""
        gateway = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
        config = _make_cfg()
        health = self._dead_health()

        with patch.object(_rp, "revive_agent", return_value=False), \
                patch.object(_rp, "open_death_issue") as issue_mock:
            outcome = apply_revive_policy(gateway, health, mission, config, dry_run=True)

        assert not outcome
        issue_mock.assert_called_once()

    def test_ask_policy_dry_run_approves(self, tmp_path):
        """ASK in dry-run with Telegram configured: approval passes, revive runs."""
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        config = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
        health = self._dead_health()

        # dry_run → send_telegram returns True, poll returns True
        with patch.object(_rp, "revive_agent", return_value=True) as revive_mock:
            outcome = apply_revive_policy(gateway, health, mission, config, dry_run=True)

        assert outcome
        revive_mock.assert_called_once()

    def test_ask_policy_no_channel_falls_back_to_auto(self, tmp_path):
        """ASK with neither Telegram nor Nostr configured still revives."""
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        config = _make_cfg()  # no telegram, no nostr
        health = self._dead_health()

        with patch.object(_rp, "revive_agent", return_value=True) as revive_mock:
            outcome = apply_revive_policy(gateway, health, mission, config, dry_run=True)

        assert outcome
        revive_mock.assert_called_once()

    def test_skip_if_revive_pending(self, tmp_path):
        """ASK with an approval already outstanding does not revive again."""
        gateway = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        config = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
        health = self._dead_health()
        health.revive_pending = True  # approval already outstanding

        with patch.object(_rp, "revive_agent") as revive_mock:
            outcome = apply_revive_policy(gateway, health, mission, config, dry_run=False)

        assert not outcome
        revive_mock.assert_not_called()
|
||||
|
||||
|
||||
# ── revive_agent tests ───────────────────────────────────────────────
|
||||
|
||||
class TestReviveAgent:
    """revive_agent outcomes for empty, dry-run, valid and invalid commands."""

    def test_no_revive_cmd(self, tmp_path):
        """An empty revive_cmd cannot be launched."""
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=[])
        assert not revive_agent(gateway)

    def test_dry_run(self, tmp_path):
        """Dry-run reports success."""
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        assert revive_agent(gateway, dry_run=True)

    def test_launches_process(self, tmp_path):
        """A runnable command ("true") is reported as launched."""
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["true"])
        launched = revive_agent(gateway, dry_run=False)
        assert launched

    def test_bad_command(self, tmp_path):
        """A non-existent executable is reported as a failed launch."""
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["/does/not/exist/xyz"])
        launched = revive_agent(gateway, dry_run=False)
        assert not launched
|
||||
|
||||
|
||||
# ── Notification channel tests ────────────────────────────────────────
|
||||
|
||||
class TestNotifications:
    """Approval-request senders: dry-run succeeds, missing settings fail."""

    def test_telegram_dry_run(self):
        """Telegram dry-run reports success."""
        sent = send_telegram_approval_request(
            "token", "chat", "timmy", "Sprint", 400, dry_run=True
        )
        assert sent

    def test_telegram_no_token(self):
        """Telegram with an empty token and chat id reports failure."""
        sent = send_telegram_approval_request(
            "", "", "timmy", "Sprint", 400, dry_run=False
        )
        assert not sent

    def test_nostr_dry_run(self):
        """Nostr dry-run reports success."""
        sent = send_nostr_approval_request(
            "wss://relay.damus.io", "pubkey123",
            "timmy", "Sprint", 400, dry_run=True
        )
        assert sent

    def test_nostr_no_config(self):
        """Nostr with an empty relay and pubkey reports failure."""
        sent = send_nostr_approval_request(
            "", "",
            "timmy", "Sprint", 400, dry_run=False
        )
        assert not sent
|
||||
|
||||
|
||||
# ── ResurrectionPool.sweep integration tests ─────────────────────────
|
||||
|
||||
class TestPoolSweep:
    """ResurrectionPool.sweep: when the revive policy is and is not applied."""

    def test_sweep_empty_gateways(self):
        """With no gateways configured the sweep returns an empty dict."""
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        result = pool.sweep()
        assert result == {}

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_healthy_agent_no_policy_triggered(self, mock_assess):
        """A HEALTHY assessment never reaches apply_revive_policy."""
        cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
        pool = ResurrectionPool(cfg, dry_run=True)

        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.HEALTHY,
            gateway_alive=True, process_alive=True, heartbeat_age=5.0,
        )

        with patch.object(_rp, "apply_revive_policy") as mock_policy:
            pool.sweep()
        mock_policy.assert_not_called()

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_dead_agent_triggers_policy(self, mock_assess):
        """A DEAD assessment past the threshold applies the policy once."""
        cfg = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 0,  # treat any death as immediate trigger
        })
        pool = ResurrectionPool(cfg, dry_run=True)

        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.DEAD,
            first_dead_at=time.time() - 999,
            last_checked=time.time(),
        )

        with patch.object(_rp, "apply_revive_policy", return_value=False) as mock_policy:
            pool.sweep()
        mock_policy.assert_called_once()

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_degraded_does_not_trigger_policy(self, mock_assess):
        """A DEGRADED assessment does not apply the policy."""
        cfg = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 10,
        })
        pool = ResurrectionPool(cfg, dry_run=True)

        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.DEGRADED,
            first_dead_at=time.time() - 5,  # only 5s, below threshold
            last_checked=time.time(),
        )

        with patch.object(_rp, "apply_revive_policy") as mock_policy:
            pool.sweep()
        mock_policy.assert_not_called()
|
||||
|
||||
|
||||
# ── run_once / status tests ───────────────────────────────────────────
|
||||
|
||||
class TestRunOnce:
    """run_once return value and the shape of status()."""

    def test_run_once_empty_returns_true(self):
        """With no gateways there is nothing unhealthy."""
        assert ResurrectionPool(PoolConfig(), dry_run=True).run_once()

    @patch("resurrection_pool_test.assess_agent")
    def test_run_once_dead_agent_returns_false(self, assess_mock):
        """A dead agent whose revive is not launched makes run_once falsy."""
        config = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 0,
        })
        pool = ResurrectionPool(config, dry_run=True)
        assess_mock.return_value = AgentHealth(
            name="timmy",
            state=AgentState.DEAD,
            first_dead_at=time.time() - 9999,
            last_checked=time.time(),
        )

        with patch.object(_rp, "apply_revive_policy", return_value=False):
            all_healthy = pool.run_once()

        assert not all_healthy

    @patch("resurrection_pool_test.assess_agent")
    def test_status_returns_list(self, assess_mock):
        """After a sweep, status() is a list with one dict per agent."""
        config = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
        pool = ResurrectionPool(config, dry_run=True)
        assess_mock.return_value = AgentHealth(
            name="timmy",
            state=AgentState.HEALTHY,
            gateway_alive=True,
            process_alive=True,
            heartbeat_age=3.0,
        )

        pool.sweep()
        report = pool.status()

        assert isinstance(report, list)
        assert len(report) == 1
        entry = report[0]
        assert entry["name"] == "timmy"
        assert entry["state"] == "healthy"
|
||||
|
||||
|
||||
# ── CLI tests ─────────────────────────────────────────────────────────
|
||||
|
||||
class TestCLI:
    """Argument parsing through build_parser."""

    def test_default_args(self):
        """With no arguments all switches are off and no config is set."""
        parsed = build_parser().parse_args([])
        assert not parsed.once
        assert not parsed.dry_run
        assert not parsed.status
        assert parsed.config is None

    def test_once_flag(self):
        """--once sets the once switch."""
        parsed = build_parser().parse_args(["--once"])
        assert parsed.once

    def test_dry_run_flag(self):
        """--dry-run sets the dry_run switch."""
        parsed = build_parser().parse_args(["--dry-run"])
        assert parsed.dry_run

    def test_poll_interval_override(self):
        """--poll-interval is parsed as an int."""
        parsed = build_parser().parse_args(["--poll-interval", "15"])
        assert parsed.poll_interval == 15

    def test_dead_threshold_override(self):
        """--dead-threshold is parsed as an int."""
        parsed = build_parser().parse_args(["--dead-threshold", "60"])
        assert parsed.dead_threshold == 60

    def test_config_path(self, tmp_path):
        """--config keeps the path exactly as the string that was passed."""
        target = tmp_path / "pool.json"
        target.write_text("{}")
        parsed = build_parser().parse_args(["--config", str(target)])
        assert parsed.config == str(target)
|
||||
Reference in New Issue
Block a user