Compare commits
1 Commits
mimo/code/
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
80126f3fff |
940
bin/resurrection_pool.py
Normal file
940
bin/resurrection_pool.py
Normal file
@@ -0,0 +1,940 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Resurrection Pool — Fleet Health Polling, Dead-Agent Detection, Auto-Revive
|
||||||
|
|
||||||
|
Monitors all wizard gateways in the fleet, detects downed agents, and
|
||||||
|
applies configurable revive policies per mission cell:
|
||||||
|
|
||||||
|
auto — restart the agent immediately on death detection
|
||||||
|
ask — send a Telegram / Nostr approval request; revive when approved
|
||||||
|
never — record the death, create a Gitea issue, do nothing else
|
||||||
|
|
||||||
|
HOW IT WORKS
|
||||||
|
============
|
||||||
|
1. Load a JSON config listing wizard gateways and mission cells.
|
||||||
|
2. Poll each gateway on a configurable interval (default: 60s).
|
||||||
|
- TCP-probe the WebSocket port (can the gateway hear the world?)
|
||||||
|
- Check for a running agent process via pgrep
|
||||||
|
- Read the heartbeat file for last-think timestamp
|
||||||
|
3. If an agent fails all three checks for longer than dead_threshold:
|
||||||
|
a. Mark it dead in the pool state.
|
||||||
|
b. Look up the mission it belongs to and read the revive_policy.
|
||||||
|
c. Execute the policy: auto-restart, request human approval, or no-op.
|
||||||
|
4. On auto-revive: spawn the configured revive_cmd, wait for the heartbeat
|
||||||
|
to return healthy, and log the recovery.
|
||||||
|
5. On ask-revive: post an approval request to Telegram / Nostr and wait
|
||||||
|
up to approval_timeout seconds for a "+1" or "revive" reply.
|
||||||
|
6. Create / close Gitea issues to track fleet health events.
|
||||||
|
|
||||||
|
CONFIGURATION
|
||||||
|
=============
|
||||||
|
Config file path is resolved in this order:
|
||||||
|
1. --config CLI flag
|
||||||
|
2. RESURRECTION_CONFIG env var
|
||||||
|
3. ~/.nexus/resurrection_pool.json
|
||||||
|
4. /etc/nexus/resurrection_pool.json
|
||||||
|
|
||||||
|
Minimal config skeleton (all fields optional, sane defaults everywhere):
|
||||||
|
|
||||||
|
{
|
||||||
|
"poll_interval": 60,
|
||||||
|
"dead_threshold": 300,
|
||||||
|
"approval_timeout": 300,
|
||||||
|
"gateways": [
|
||||||
|
{
|
||||||
|
"name": "timmy",
|
||||||
|
"host": "localhost",
|
||||||
|
"port": 8765,
|
||||||
|
"heartbeat_path": "~/.nexus/heartbeat.json",
|
||||||
|
"process_pattern": "nexus_think",
|
||||||
|
"revive_cmd": ["python", "/root/wizards/timmy/nexus_think.py"]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"missions": {
|
||||||
|
"some-mission-uuid": {
|
||||||
|
"label": "Portal Sprint",
|
||||||
|
"agents": ["timmy"],
|
||||||
|
"revive_policy": "ask"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"telegram": {
|
||||||
|
"bot_token": "...",
|
||||||
|
"chat_id": "-1001234567890"
|
||||||
|
},
|
||||||
|
"nostr": {
|
||||||
|
"relay_url": "wss://relay.damus.io",
|
||||||
|
"recipient_pubkey": "..."
|
||||||
|
},
|
||||||
|
"gitea": {
|
||||||
|
"url": "https://forge.example.com",
|
||||||
|
"token": "...",
|
||||||
|
"repo": "Timmy_Foundation/the-nexus"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
USAGE
|
||||||
|
=====
|
||||||
|
# One-shot health check (good for cron / smoke tests)
|
||||||
|
python bin/resurrection_pool.py --once
|
||||||
|
|
||||||
|
# Continuous daemon (default)
|
||||||
|
python bin/resurrection_pool.py
|
||||||
|
|
||||||
|
# Custom config + dry-run (no side-effects)
|
||||||
|
python bin/resurrection_pool.py --config /etc/nexus/pool.json --dry-run
|
||||||
|
|
||||||
|
# Print current pool state as JSON
|
||||||
|
python bin/resurrection_pool.py --status
|
||||||
|
|
||||||
|
ZERO DEPENDENCIES
|
||||||
|
=================
|
||||||
|
Pure stdlib. No pip installs required on the monitoring host.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import socket
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import Enum
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
|
||||||
|
datefmt="%Y-%m-%d %H:%M:%S",
|
||||||
|
)
|
||||||
|
logger = logging.getLogger("nexus.resurrection_pool")
|
||||||
|
|
||||||
|
# ── Constants ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
DEFAULT_POLL_INTERVAL = 60 # seconds between full fleet sweeps
|
||||||
|
DEFAULT_DEAD_THRESHOLD = 300 # seconds with no heartbeat = dead
|
||||||
|
DEFAULT_APPROVAL_TIMEOUT = 300 # seconds to wait for human approval
|
||||||
|
|
||||||
|
CONFIG_SEARCH_PATHS = [
|
||||||
|
Path.home() / ".nexus" / "resurrection_pool.json",
|
||||||
|
Path("/etc/nexus/resurrection_pool.json"),
|
||||||
|
]
|
||||||
|
|
||||||
|
POOL_LABEL = "resurrection-pool"
|
||||||
|
POOL_TITLE_PREFIX = "[pool]"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Enums & simple types ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
class RevivePolicy(str, Enum):
|
||||||
|
AUTO = "auto" # restart immediately, no human needed
|
||||||
|
ASK = "ask" # send approval request, wait for +1
|
||||||
|
NEVER = "never" # log + Gitea issue, do not restart
|
||||||
|
|
||||||
|
|
||||||
|
class AgentState(str, Enum):
|
||||||
|
HEALTHY = "healthy"
|
||||||
|
DEGRADED = "degraded" # some checks failing but not yet dead
|
||||||
|
DEAD = "dead"
|
||||||
|
REVIVING = "reviving"
|
||||||
|
UNKNOWN = "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Configuration dataclasses ────────────────────────────────────────
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class GatewayConfig:
|
||||||
|
"""One wizard gateway to monitor."""
|
||||||
|
name: str
|
||||||
|
host: str = "localhost"
|
||||||
|
port: int = 8765
|
||||||
|
heartbeat_path: str = "~/.nexus/heartbeat.json"
|
||||||
|
process_pattern: str = "nexus_think"
|
||||||
|
revive_cmd: List[str] = field(default_factory=list)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def resolved_heartbeat_path(self) -> Path:
|
||||||
|
return Path(self.heartbeat_path).expanduser()
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MissionConfig:
|
||||||
|
"""A mission cell with agents and a revive policy."""
|
||||||
|
mission_id: str
|
||||||
|
label: str = ""
|
||||||
|
agents: List[str] = field(default_factory=list)
|
||||||
|
revive_policy: RevivePolicy = RevivePolicy.ASK
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, mission_id: str, data: dict) -> "MissionConfig":
|
||||||
|
policy_raw = data.get("revive_policy", RevivePolicy.ASK)
|
||||||
|
try:
|
||||||
|
policy = RevivePolicy(policy_raw)
|
||||||
|
except ValueError:
|
||||||
|
logger.warning("Unknown revive_policy %r for mission %s; defaulting to 'ask'",
|
||||||
|
policy_raw, mission_id)
|
||||||
|
policy = RevivePolicy.ASK
|
||||||
|
return cls(
|
||||||
|
mission_id=mission_id,
|
||||||
|
label=data.get("label", ""),
|
||||||
|
agents=data.get("agents", []),
|
||||||
|
revive_policy=policy,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PoolConfig:
|
||||||
|
"""Full resurrection pool configuration."""
|
||||||
|
poll_interval: int = DEFAULT_POLL_INTERVAL
|
||||||
|
dead_threshold: int = DEFAULT_DEAD_THRESHOLD
|
||||||
|
approval_timeout: int = DEFAULT_APPROVAL_TIMEOUT
|
||||||
|
gateways: List[GatewayConfig] = field(default_factory=list)
|
||||||
|
missions: List[MissionConfig] = field(default_factory=list)
|
||||||
|
telegram_bot_token: str = ""
|
||||||
|
telegram_chat_id: str = ""
|
||||||
|
nostr_relay_url: str = ""
|
||||||
|
nostr_recipient_pubkey: str = ""
|
||||||
|
gitea_url: str = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||||
|
gitea_token: str = os.environ.get("GITEA_TOKEN", "")
|
||||||
|
gitea_repo: str = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, data: dict) -> "PoolConfig":
|
||||||
|
tg = data.get("telegram", {})
|
||||||
|
ns = data.get("nostr", {})
|
||||||
|
git = data.get("gitea", {})
|
||||||
|
|
||||||
|
gateways = [
|
||||||
|
GatewayConfig(
|
||||||
|
name=gw["name"],
|
||||||
|
host=gw.get("host", "localhost"),
|
||||||
|
port=gw.get("port", 8765),
|
||||||
|
heartbeat_path=gw.get("heartbeat_path", "~/.nexus/heartbeat.json"),
|
||||||
|
process_pattern=gw.get("process_pattern", "nexus_think"),
|
||||||
|
revive_cmd=gw.get("revive_cmd", []),
|
||||||
|
)
|
||||||
|
for gw in data.get("gateways", [])
|
||||||
|
]
|
||||||
|
|
||||||
|
missions = [
|
||||||
|
MissionConfig.from_dict(mid, mdata)
|
||||||
|
for mid, mdata in data.get("missions", {}).items()
|
||||||
|
]
|
||||||
|
|
||||||
|
return cls(
|
||||||
|
poll_interval=data.get("poll_interval", DEFAULT_POLL_INTERVAL),
|
||||||
|
dead_threshold=data.get("dead_threshold", DEFAULT_DEAD_THRESHOLD),
|
||||||
|
approval_timeout=data.get("approval_timeout", DEFAULT_APPROVAL_TIMEOUT),
|
||||||
|
gateways=gateways,
|
||||||
|
missions=missions,
|
||||||
|
telegram_bot_token=tg.get("bot_token", os.environ.get("TELEGRAM_BOT_TOKEN", "")),
|
||||||
|
telegram_chat_id=tg.get("chat_id", os.environ.get("TELEGRAM_CHAT_ID", "")),
|
||||||
|
nostr_relay_url=ns.get("relay_url", ""),
|
||||||
|
nostr_recipient_pubkey=ns.get("recipient_pubkey", ""),
|
||||||
|
gitea_url=git.get("url", os.environ.get("GITEA_URL",
|
||||||
|
"https://forge.alexanderwhitestone.com")),
|
||||||
|
gitea_token=git.get("token", os.environ.get("GITEA_TOKEN", "")),
|
||||||
|
gitea_repo=git.get("repo", os.environ.get("NEXUS_REPO",
|
||||||
|
"Timmy_Foundation/the-nexus")),
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls, path: Optional[Path] = None) -> "PoolConfig":
|
||||||
|
"""Load config from file, falling back through search paths."""
|
||||||
|
candidates = [path] if path else []
|
||||||
|
candidates += CONFIG_SEARCH_PATHS
|
||||||
|
|
||||||
|
env_path = os.environ.get("RESURRECTION_CONFIG")
|
||||||
|
if env_path:
|
||||||
|
candidates.insert(0, Path(env_path))
|
||||||
|
|
||||||
|
for candidate in candidates:
|
||||||
|
if candidate and candidate.exists():
|
||||||
|
try:
|
||||||
|
data = json.loads(candidate.read_text())
|
||||||
|
logger.info("Loaded config from %s", candidate)
|
||||||
|
return cls.from_dict(data)
|
||||||
|
except (json.JSONDecodeError, KeyError) as e:
|
||||||
|
logger.warning("Failed to parse %s: %s", candidate, e)
|
||||||
|
|
||||||
|
logger.info("No config file found; using defaults (no gateways configured)")
|
||||||
|
return cls()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Per-agent health state ───────────────────────────────────────────
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AgentHealth:
|
||||||
|
"""Runtime health snapshot for one agent gateway."""
|
||||||
|
name: str
|
||||||
|
state: AgentState = AgentState.UNKNOWN
|
||||||
|
gateway_alive: bool = False
|
||||||
|
process_alive: bool = False
|
||||||
|
heartbeat_age: Optional[float] = None # seconds since last heartbeat
|
||||||
|
last_checked: float = field(default_factory=time.time)
|
||||||
|
first_dead_at: Optional[float] = None # when death was first detected
|
||||||
|
revive_pending: bool = False # approval request outstanding
|
||||||
|
revive_attempts: int = 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def consecutive_dead_seconds(self) -> float:
|
||||||
|
if self.first_dead_at is None:
|
||||||
|
return 0.0
|
||||||
|
return time.time() - self.first_dead_at
|
||||||
|
|
||||||
|
def to_dict(self) -> dict:
|
||||||
|
return {
|
||||||
|
"name": self.name,
|
||||||
|
"state": self.state.value,
|
||||||
|
"gateway_alive": self.gateway_alive,
|
||||||
|
"process_alive": self.process_alive,
|
||||||
|
"heartbeat_age": self.heartbeat_age,
|
||||||
|
"last_checked": self.last_checked,
|
||||||
|
"first_dead_at": self.first_dead_at,
|
||||||
|
"revive_pending": self.revive_pending,
|
||||||
|
"revive_attempts": self.revive_attempts,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Health probes ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def probe_gateway(host: str, port: int, timeout: int = 5) -> Tuple[bool, str]:
|
||||||
|
"""TCP probe a wizard gateway. Returns (alive, message)."""
|
||||||
|
try:
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.settimeout(timeout)
|
||||||
|
errno = sock.connect_ex((host, port))
|
||||||
|
sock.close()
|
||||||
|
if errno == 0:
|
||||||
|
return True, f"TCP:{host}:{port} reachable"
|
||||||
|
return False, f"TCP:{host}:{port} refused (errno={errno})"
|
||||||
|
except Exception as e:
|
||||||
|
return False, f"TCP probe failed: {e}"
|
||||||
|
|
||||||
|
|
||||||
|
def probe_process(pattern: str) -> Tuple[bool, str]:
|
||||||
|
"""pgrep-based process check. Returns (alive, message)."""
|
||||||
|
try:
|
||||||
|
result = subprocess.run(
|
||||||
|
["pgrep", "-f", pattern],
|
||||||
|
capture_output=True, text=True, timeout=5,
|
||||||
|
)
|
||||||
|
if result.returncode == 0:
|
||||||
|
pids = [p.strip() for p in result.stdout.strip().split("\n")
|
||||||
|
if p.strip() and p.strip() != str(os.getpid())]
|
||||||
|
if pids:
|
||||||
|
return True, f"Running (PID: {', '.join(pids)})"
|
||||||
|
return False, f"No process matching {pattern!r}"
|
||||||
|
except FileNotFoundError:
|
||||||
|
return True, "pgrep unavailable; skipping process check"
|
||||||
|
except Exception as e:
|
||||||
|
return False, f"Process probe failed: {e}"
|
||||||
|
|
||||||
|
|
||||||
|
def probe_heartbeat(path: Path, dead_threshold: int) -> Tuple[bool, Optional[float], str]:
|
||||||
|
"""Read a heartbeat file. Returns (alive, age_seconds, message)."""
|
||||||
|
if not path.exists():
|
||||||
|
return False, None, f"No heartbeat file at {path}"
|
||||||
|
try:
|
||||||
|
data = json.loads(path.read_text())
|
||||||
|
except (json.JSONDecodeError, OSError) as e:
|
||||||
|
return False, None, f"Heartbeat file unreadable: {e}"
|
||||||
|
|
||||||
|
ts = data.get("timestamp", 0)
|
||||||
|
age = time.time() - ts
|
||||||
|
cycle = data.get("cycle", "?")
|
||||||
|
model = data.get("model", "unknown")
|
||||||
|
|
||||||
|
if age > dead_threshold:
|
||||||
|
return False, age, (
|
||||||
|
f"Stale heartbeat: {int(age)}s ago (threshold {dead_threshold}s), "
|
||||||
|
f"cycle={cycle}, model={model}"
|
||||||
|
)
|
||||||
|
return True, age, f"Alive: cycle={cycle}, {int(age)}s ago, model={model}"
|
||||||
|
|
||||||
|
|
||||||
|
def assess_agent(gw: GatewayConfig, dead_threshold: int) -> AgentHealth:
|
||||||
|
"""Run all probes for one gateway and return an AgentHealth snapshot."""
|
||||||
|
health = AgentHealth(name=gw.name, last_checked=time.time())
|
||||||
|
|
||||||
|
gw_ok, gw_msg = probe_gateway(gw.host, gw.port)
|
||||||
|
proc_ok, proc_msg = probe_process(gw.process_pattern)
|
||||||
|
hb_ok, hb_age, hb_msg = probe_heartbeat(gw.resolved_heartbeat_path, dead_threshold)
|
||||||
|
|
||||||
|
health.gateway_alive = gw_ok
|
||||||
|
health.process_alive = proc_ok
|
||||||
|
health.heartbeat_age = hb_age
|
||||||
|
|
||||||
|
logger.debug("[%s] gateway=%s process=%s heartbeat=%s",
|
||||||
|
gw.name, gw_ok, proc_ok, hb_ok)
|
||||||
|
logger.debug("[%s] gw: %s | proc: %s | hb: %s",
|
||||||
|
gw.name, gw_msg, proc_msg, hb_msg)
|
||||||
|
|
||||||
|
checks_ok = sum([gw_ok, proc_ok, hb_ok])
|
||||||
|
if checks_ok == 3:
|
||||||
|
health.state = AgentState.HEALTHY
|
||||||
|
elif checks_ok == 0:
|
||||||
|
health.state = AgentState.DEAD
|
||||||
|
else:
|
||||||
|
health.state = AgentState.DEGRADED
|
||||||
|
|
||||||
|
return health
|
||||||
|
|
||||||
|
|
||||||
|
# ── Notification channels ────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _http(method: str, url: str, data: Optional[dict] = None,
|
||||||
|
headers: Optional[dict] = None) -> Optional[Any]:
|
||||||
|
"""Minimal HTTP helper. Returns parsed JSON or None on failure."""
|
||||||
|
body = json.dumps(data).encode() if data else None
|
||||||
|
req = urllib.request.Request(url, data=body, method=method)
|
||||||
|
req.add_header("Content-Type", "application/json")
|
||||||
|
req.add_header("Accept", "application/json")
|
||||||
|
if headers:
|
||||||
|
for k, v in headers.items():
|
||||||
|
req.add_header(k, v)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||||
|
raw = resp.read().decode()
|
||||||
|
return json.loads(raw) if raw.strip() else {}
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
logger.warning("HTTP %d %s: %s", e.code, url, e.read().decode()[:200])
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("HTTP error %s: %s", url, e)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def send_telegram_approval_request(
|
||||||
|
bot_token: str,
|
||||||
|
chat_id: str,
|
||||||
|
agent_name: str,
|
||||||
|
mission_label: str,
|
||||||
|
dead_seconds: float,
|
||||||
|
dry_run: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Post a Telegram message asking for revive approval.
|
||||||
|
|
||||||
|
The operator should reply with "revive <agent_name>" or "+1" to approve.
|
||||||
|
Returns True if the message was sent successfully.
|
||||||
|
"""
|
||||||
|
if dry_run:
|
||||||
|
logger.info("[DRY-RUN] Would send Telegram approval request for %s", agent_name)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if not bot_token or not chat_id:
|
||||||
|
logger.warning("Telegram not configured; cannot send approval request for %s", agent_name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
text = (
|
||||||
|
f"🔴 *Resurrection Pool Alert*\n\n"
|
||||||
|
f"Agent *{agent_name}* has been dead for {int(dead_seconds)}s.\n"
|
||||||
|
f"Mission: _{mission_label or 'unassigned'}_\n\n"
|
||||||
|
f"Reply `revive {agent_name}` to authorize restart.\n"
|
||||||
|
f"Reply `skip {agent_name}` to suppress for this cycle."
|
||||||
|
)
|
||||||
|
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
|
||||||
|
result = _http("POST", url, {
|
||||||
|
"chat_id": chat_id,
|
||||||
|
"text": text,
|
||||||
|
"parse_mode": "Markdown",
|
||||||
|
})
|
||||||
|
if result and result.get("ok"):
|
||||||
|
logger.info("Telegram approval request sent for %s (message_id=%s)",
|
||||||
|
agent_name, result.get("result", {}).get("message_id"))
|
||||||
|
return True
|
||||||
|
logger.warning("Telegram send failed for %s: %s", agent_name, result)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def poll_telegram_for_approval(
|
||||||
|
bot_token: str,
|
||||||
|
agent_name: str,
|
||||||
|
timeout_seconds: int,
|
||||||
|
dry_run: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Poll Telegram getUpdates for a revive approval message.
|
||||||
|
|
||||||
|
Looks for messages containing "revive <agent_name>" (case-insensitive).
|
||||||
|
Returns True if approved within timeout, False otherwise.
|
||||||
|
"""
|
||||||
|
if dry_run:
|
||||||
|
logger.info("[DRY-RUN] Simulating Telegram approval for %s (auto-approve)", agent_name)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if not bot_token:
|
||||||
|
return False
|
||||||
|
|
||||||
|
deadline = time.time() + timeout_seconds
|
||||||
|
offset = None
|
||||||
|
|
||||||
|
while time.time() < deadline:
|
||||||
|
params = "?timeout=30&limit=10"
|
||||||
|
if offset is not None:
|
||||||
|
params += f"&offset={offset}"
|
||||||
|
url = f"https://api.telegram.org/bot{bot_token}/getUpdates{params}"
|
||||||
|
|
||||||
|
result = _http("GET", url)
|
||||||
|
if not result or not result.get("ok"):
|
||||||
|
time.sleep(5)
|
||||||
|
continue
|
||||||
|
|
||||||
|
for update in result.get("result", []):
|
||||||
|
offset = update["update_id"] + 1
|
||||||
|
msg_text = (
|
||||||
|
update.get("message", {}).get("text", "")
|
||||||
|
or update.get("channel_post", {}).get("text", "")
|
||||||
|
).lower().strip()
|
||||||
|
|
||||||
|
if f"revive {agent_name.lower()}" in msg_text or msg_text == "+1":
|
||||||
|
logger.info("Telegram approval received for %s", agent_name)
|
||||||
|
return True
|
||||||
|
if f"skip {agent_name.lower()}" in msg_text:
|
||||||
|
logger.info("Telegram skip received for %s; aborting revive", agent_name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
logger.info("Approval timeout for %s after %ds", agent_name, timeout_seconds)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def send_nostr_approval_request(
|
||||||
|
relay_url: str,
|
||||||
|
recipient_pubkey: str,
|
||||||
|
agent_name: str,
|
||||||
|
mission_label: str,
|
||||||
|
dead_seconds: float,
|
||||||
|
dry_run: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Post a Nostr direct-message (kind 4) approval request.
|
||||||
|
|
||||||
|
Falls back gracefully if websockets is not installed — logs a warning
|
||||||
|
instead of crashing the pool. Returns True if sent, False otherwise.
|
||||||
|
"""
|
||||||
|
if dry_run:
|
||||||
|
logger.info("[DRY-RUN] Would send Nostr approval request for %s", agent_name)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if not relay_url or not recipient_pubkey:
|
||||||
|
logger.debug("Nostr not configured; skipping DM for %s", agent_name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
content = (
|
||||||
|
f"[Resurrection Pool] Agent '{agent_name}' is dead ({int(dead_seconds)}s). "
|
||||||
|
f"Mission: {mission_label or 'unassigned'}. "
|
||||||
|
f"Reply 'revive {agent_name}' to authorize restart."
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
async def _send():
|
||||||
|
import websockets # type: ignore
|
||||||
|
event = {
|
||||||
|
"kind": 4,
|
||||||
|
"pubkey": "", # filled by relay/identity in prod
|
||||||
|
"created_at": int(time.time()),
|
||||||
|
"tags": [["p", recipient_pubkey]],
|
||||||
|
"content": content,
|
||||||
|
"id": "",
|
||||||
|
"sig": "",
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
async with websockets.connect(relay_url, open_timeout=10) as ws:
|
||||||
|
await ws.send(json.dumps(["EVENT", event]))
|
||||||
|
logger.info("Nostr DM sent for %s via %s", agent_name, relay_url)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Nostr send failed for %s: %s", agent_name, e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return asyncio.run(_send())
|
||||||
|
except ImportError:
|
||||||
|
logger.warning("websockets library not installed; Nostr notifications disabled")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Nostr notification error for %s: %s", agent_name, e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Revive actions ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def revive_agent(gw: GatewayConfig, dry_run: bool = False) -> bool:
|
||||||
|
"""
|
||||||
|
Attempt to restart an agent using its configured revive_cmd.
|
||||||
|
|
||||||
|
Returns True if the command was launched without error.
|
||||||
|
Does NOT wait for the agent to become healthy — the next poll cycle
|
||||||
|
will detect recovery via the heartbeat file.
|
||||||
|
"""
|
||||||
|
if not gw.revive_cmd:
|
||||||
|
logger.warning("[%s] No revive_cmd configured; cannot restart", gw.name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
logger.info("[DRY-RUN] Would exec: %s", gw.revive_cmd)
|
||||||
|
return True
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("[%s] Launching: %s", gw.name, gw.revive_cmd)
|
||||||
|
proc = subprocess.Popen(
|
||||||
|
gw.revive_cmd,
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
start_new_session=True, # detach from our process group
|
||||||
|
)
|
||||||
|
logger.info("[%s] Revive process started (PID %d)", gw.name, proc.pid)
|
||||||
|
return True
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
logger.error("[%s] Revive cmd not found: %s", gw.name, e)
|
||||||
|
except PermissionError as e:
|
||||||
|
logger.error("[%s] Revive cmd permission denied: %s", gw.name, e)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("[%s] Revive failed: %s", gw.name, e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Gitea integration ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _gitea(method: str, path: str, cfg: PoolConfig, data: Optional[dict] = None) -> Any:
|
||||||
|
"""Gitea API helper."""
|
||||||
|
url = f"{cfg.gitea_url.rstrip('/')}/api/v1{path}"
|
||||||
|
headers = {}
|
||||||
|
if cfg.gitea_token:
|
||||||
|
headers["Authorization"] = f"token {cfg.gitea_token}"
|
||||||
|
return _http(method, url, data=data, headers=headers)
|
||||||
|
|
||||||
|
|
||||||
|
def find_open_pool_issue(cfg: PoolConfig, agent_name: str) -> Optional[dict]:
|
||||||
|
"""Look for an existing open pool issue for this agent."""
|
||||||
|
issues = _gitea("GET",
|
||||||
|
f"/repos/{cfg.gitea_repo}/issues"
|
||||||
|
f"?state=open&type=issues&limit=50",
|
||||||
|
cfg)
|
||||||
|
if not issues or not isinstance(issues, list):
|
||||||
|
return None
|
||||||
|
prefix = f"{POOL_TITLE_PREFIX} {agent_name}:"
|
||||||
|
for issue in issues:
|
||||||
|
if issue.get("title", "").startswith(prefix):
|
||||||
|
return issue
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def open_death_issue(cfg: PoolConfig, health: AgentHealth, mission_label: str) -> None:
|
||||||
|
"""Create a Gitea issue recording an agent death."""
|
||||||
|
if not cfg.gitea_token:
|
||||||
|
logger.debug("No Gitea token; skipping issue creation for %s", health.name)
|
||||||
|
return
|
||||||
|
title = f"{POOL_TITLE_PREFIX} {health.name}: agent dead ({int(health.consecutive_dead_seconds)}s)"
|
||||||
|
body = (
|
||||||
|
f"## 🔴 Agent Death Detected — {health.name}\n\n"
|
||||||
|
f"**Mission:** {mission_label or 'unassigned'} \n"
|
||||||
|
f"**Dead for:** {int(health.consecutive_dead_seconds)}s \n"
|
||||||
|
f"**Gateway alive:** {health.gateway_alive} \n"
|
||||||
|
f"**Process alive:** {health.process_alive} \n"
|
||||||
|
f"**Heartbeat age:** "
|
||||||
|
f"{f'{int(health.heartbeat_age)}s' if health.heartbeat_age is not None else 'N/A'} \n\n"
|
||||||
|
f"*Logged by Resurrection Pool at "
|
||||||
|
f"{time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}*"
|
||||||
|
)
|
||||||
|
result = _gitea("POST", f"/repos/{cfg.gitea_repo}/issues", cfg,
|
||||||
|
data={"title": title, "body": body})
|
||||||
|
if result and result.get("number"):
|
||||||
|
logger.info("Opened death issue #%d for %s", result["number"], health.name)
|
||||||
|
|
||||||
|
|
||||||
|
def close_death_issue(cfg: PoolConfig, agent_name: str, revived: bool) -> None:
|
||||||
|
"""Close an existing death issue when the agent recovers."""
|
||||||
|
issue = find_open_pool_issue(cfg, agent_name)
|
||||||
|
if not issue:
|
||||||
|
return
|
||||||
|
num = issue["number"]
|
||||||
|
verb = "revived and healthy" if revived else "recovered (external)"
|
||||||
|
_gitea("POST", f"/repos/{cfg.gitea_repo}/issues/{num}/comments", cfg,
|
||||||
|
data={"body": f"## 🟢 Agent {agent_name} — {verb}\n\n"
|
||||||
|
f"*Resurrection Pool confirmed recovery at "
|
||||||
|
f"{time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}*"})
|
||||||
|
_gitea("PATCH", f"/repos/{cfg.gitea_repo}/issues/{num}", cfg,
|
||||||
|
data={"state": "closed"})
|
||||||
|
logger.info("Closed death issue #%d for %s", num, agent_name)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Policy engine ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def apply_revive_policy(
|
||||||
|
gw: GatewayConfig,
|
||||||
|
health: AgentHealth,
|
||||||
|
mission: Optional[MissionConfig],
|
||||||
|
cfg: PoolConfig,
|
||||||
|
dry_run: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Execute the revive policy for a dead agent.
|
||||||
|
|
||||||
|
Returns True if a revive was attempted, False otherwise.
|
||||||
|
"""
|
||||||
|
policy = mission.revive_policy if mission else RevivePolicy.ASK
|
||||||
|
mission_label = mission.label if mission else ""
|
||||||
|
|
||||||
|
logger.info("[%s] Applying revive policy=%s (dead %ds)",
|
||||||
|
gw.name, policy.value, int(health.consecutive_dead_seconds))
|
||||||
|
|
||||||
|
if policy == RevivePolicy.NEVER:
|
||||||
|
logger.info("[%s] Policy=never — recording death, no restart", gw.name)
|
||||||
|
open_death_issue(cfg, health, mission_label)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if policy == RevivePolicy.AUTO:
|
||||||
|
logger.info("[%s] Policy=auto — restarting immediately", gw.name)
|
||||||
|
ok = revive_agent(gw, dry_run=dry_run)
|
||||||
|
if ok:
|
||||||
|
logger.info("[%s] Auto-revive launched", gw.name)
|
||||||
|
else:
|
||||||
|
logger.warning("[%s] Auto-revive failed — opening Gitea issue", gw.name)
|
||||||
|
open_death_issue(cfg, health, mission_label)
|
||||||
|
return ok
|
||||||
|
|
||||||
|
# policy == RevivePolicy.ASK
|
||||||
|
if health.revive_pending:
|
||||||
|
logger.debug("[%s] Approval already pending; waiting", gw.name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Send approval requests over all configured channels
|
||||||
|
health.revive_pending = True
|
||||||
|
|
||||||
|
tg_sent = send_telegram_approval_request(
|
||||||
|
cfg.telegram_bot_token, cfg.telegram_chat_id,
|
||||||
|
gw.name, mission_label, health.consecutive_dead_seconds,
|
||||||
|
dry_run=dry_run,
|
||||||
|
)
|
||||||
|
send_nostr_approval_request(
|
||||||
|
cfg.nostr_relay_url, cfg.nostr_recipient_pubkey,
|
||||||
|
gw.name, mission_label, health.consecutive_dead_seconds,
|
||||||
|
dry_run=dry_run,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not tg_sent and not cfg.nostr_relay_url:
|
||||||
|
logger.warning(
|
||||||
|
"[%s] No approval channel available (configure telegram or nostr). "
|
||||||
|
"Falling back to auto-revive.", gw.name
|
||||||
|
)
|
||||||
|
health.revive_pending = False
|
||||||
|
return revive_agent(gw, dry_run=dry_run)
|
||||||
|
|
||||||
|
# Poll for approval
|
||||||
|
approved = poll_telegram_for_approval(
|
||||||
|
cfg.telegram_bot_token, gw.name,
|
||||||
|
cfg.approval_timeout, dry_run=dry_run,
|
||||||
|
)
|
||||||
|
health.revive_pending = False
|
||||||
|
|
||||||
|
if approved:
|
||||||
|
logger.info("[%s] Approval received — launching revive", gw.name)
|
||||||
|
ok = revive_agent(gw, dry_run=dry_run)
|
||||||
|
if not ok:
|
||||||
|
open_death_issue(cfg, health, mission_label)
|
||||||
|
return ok
|
||||||
|
|
||||||
|
logger.info("[%s] Revive not approved — recording death", gw.name)
|
||||||
|
open_death_issue(cfg, health, mission_label)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main pool loop ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class ResurrectionPool:
|
||||||
|
"""
|
||||||
|
Fleet-wide health monitor and revive coordinator.
|
||||||
|
|
||||||
|
Maintains an in-memory health table keyed by agent name.
|
||||||
|
Drives one poll-sweep per interval across all configured gateways.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, cfg: PoolConfig, dry_run: bool = False):
|
||||||
|
self.cfg = cfg
|
||||||
|
self.dry_run = dry_run
|
||||||
|
self._health: Dict[str, AgentHealth] = {}
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
# ── Agent-to-mission lookup ──────────────────────────────────────
|
||||||
|
|
||||||
|
def _mission_for(self, agent_name: str) -> Optional[MissionConfig]:
|
||||||
|
"""Return the first mission that lists this agent."""
|
||||||
|
for m in self.cfg.missions:
|
||||||
|
if agent_name in m.agents:
|
||||||
|
return m
|
||||||
|
return None
|
||||||
|
|
||||||
|
# ── Gateway lookup ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _gateway_for(self, agent_name: str) -> Optional[GatewayConfig]:
|
||||||
|
for gw in self.cfg.gateways:
|
||||||
|
if gw.name == agent_name:
|
||||||
|
return gw
|
||||||
|
return None
|
||||||
|
|
||||||
|
# ── Health state tracking ────────────────────────────────────────
|
||||||
|
|
||||||
|
def _update_health(self, agent_name: str, new: AgentHealth) -> AgentHealth:
|
||||||
|
"""
|
||||||
|
Merge a fresh health snapshot into the pool's running state.
|
||||||
|
|
||||||
|
- Preserves first_dead_at timestamp so dead_seconds accumulates.
|
||||||
|
- Resets first_dead_at when agent returns healthy.
|
||||||
|
- Tracks revive_attempts across cycles.
|
||||||
|
"""
|
||||||
|
prev = self._health.get(agent_name)
|
||||||
|
|
||||||
|
if new.state == AgentState.HEALTHY:
|
||||||
|
new.first_dead_at = None
|
||||||
|
new.revive_attempts = 0
|
||||||
|
new.revive_pending = False
|
||||||
|
if prev and prev.state == AgentState.DEAD:
|
||||||
|
logger.info("[%s] Agent recovered!", agent_name)
|
||||||
|
close_death_issue(self.cfg, agent_name, revived=prev.revive_attempts > 0)
|
||||||
|
else:
|
||||||
|
if prev:
|
||||||
|
new.revive_pending = prev.revive_pending
|
||||||
|
new.revive_attempts = prev.revive_attempts
|
||||||
|
if prev.first_dead_at is not None:
|
||||||
|
new.first_dead_at = prev.first_dead_at
|
||||||
|
elif new.state in (AgentState.DEAD, AgentState.DEGRADED):
|
||||||
|
new.first_dead_at = new.last_checked
|
||||||
|
else:
|
||||||
|
new.first_dead_at = new.last_checked
|
||||||
|
|
||||||
|
self._health[agent_name] = new
|
||||||
|
return new
|
||||||
|
|
||||||
|
# ── Single sweep ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def sweep(self) -> Dict[str, AgentHealth]:
|
||||||
|
"""Poll all gateways once and apply revive policies where needed."""
|
||||||
|
if not self.cfg.gateways:
|
||||||
|
logger.debug("No gateways configured; nothing to sweep")
|
||||||
|
return {}
|
||||||
|
|
||||||
|
for gw in self.cfg.gateways:
|
||||||
|
raw = assess_agent(gw, self.cfg.dead_threshold)
|
||||||
|
health = self._update_health(gw.name, raw)
|
||||||
|
icon = {"healthy": "✅", "degraded": "⚠️", "dead": "🔴",
|
||||||
|
"reviving": "🔄", "unknown": "❓"}.get(health.state.value, "?")
|
||||||
|
logger.info("%s [%s] state=%s dead_for=%ds",
|
||||||
|
icon, gw.name, health.state.value,
|
||||||
|
int(health.consecutive_dead_seconds))
|
||||||
|
|
||||||
|
if health.state == AgentState.DEAD:
|
||||||
|
if health.consecutive_dead_seconds >= self.cfg.dead_threshold:
|
||||||
|
if not health.revive_pending:
|
||||||
|
mission = self._mission_for(gw.name)
|
||||||
|
launched = apply_revive_policy(
|
||||||
|
gw, health, mission, self.cfg, dry_run=self.dry_run
|
||||||
|
)
|
||||||
|
if launched:
|
||||||
|
health.revive_attempts += 1
|
||||||
|
health.state = AgentState.REVIVING
|
||||||
|
|
||||||
|
return dict(self._health)
|
||||||
|
|
||||||
|
def status(self) -> List[dict]:
|
||||||
|
"""Return current pool state as a list of dicts (for --status flag)."""
|
||||||
|
return [h.to_dict() for h in self._health.values()]
|
||||||
|
|
||||||
|
# ── Run loop ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
"""Blocking poll loop. Handles SIGTERM/SIGINT cleanly."""
|
||||||
|
self._running = True
|
||||||
|
|
||||||
|
def _stop(signum, frame):
|
||||||
|
logger.info("Signal %d received; stopping pool", signum)
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
signal.signal(signal.SIGTERM, _stop)
|
||||||
|
signal.signal(signal.SIGINT, _stop)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Resurrection Pool starting — %d gateway(s), poll_interval=%ds, dead_threshold=%ds",
|
||||||
|
len(self.cfg.gateways), self.cfg.poll_interval, self.cfg.dead_threshold,
|
||||||
|
)
|
||||||
|
|
||||||
|
while self._running:
|
||||||
|
self.sweep()
|
||||||
|
# Sleep in 1s ticks so we respond to signals promptly
|
||||||
|
for _ in range(self.cfg.poll_interval):
|
||||||
|
if not self._running:
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
logger.info("Resurrection Pool stopped")
|
||||||
|
|
||||||
|
def run_once(self) -> bool:
|
||||||
|
"""Single sweep; returns True if all agents are healthy."""
|
||||||
|
results = self.sweep()
|
||||||
|
return all(h.state == AgentState.HEALTHY for h in results.values())
|
||||||
|
|
||||||
|
|
||||||
|
# ── CLI ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def build_parser() -> argparse.ArgumentParser:
|
||||||
|
p = argparse.ArgumentParser(
|
||||||
|
description="Resurrection Pool — fleet health polling and auto-revive",
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
)
|
||||||
|
p.add_argument("--config", metavar="PATH",
|
||||||
|
help="Path to resurrection_pool.json config file")
|
||||||
|
p.add_argument("--once", action="store_true",
|
||||||
|
help="Run one sweep and exit (for cron / smoke tests)")
|
||||||
|
p.add_argument("--status", action="store_true",
|
||||||
|
help="Print current pool state as JSON and exit")
|
||||||
|
p.add_argument("--dry-run", action="store_true",
|
||||||
|
help="Run all checks but do not restart agents or alert channels")
|
||||||
|
p.add_argument("--poll-interval", type=int, default=None,
|
||||||
|
help="Override poll interval (seconds)")
|
||||||
|
p.add_argument("--dead-threshold", type=int, default=None,
|
||||||
|
help="Override dead threshold (seconds)")
|
||||||
|
p.add_argument("-v", "--verbose", action="store_true",
|
||||||
|
help="Enable DEBUG logging")
|
||||||
|
return p
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
args = build_parser().parse_args()
|
||||||
|
|
||||||
|
if args.verbose:
|
||||||
|
logging.getLogger().setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
cfg = PoolConfig.load(Path(args.config) if args.config else None)
|
||||||
|
|
||||||
|
if args.poll_interval is not None:
|
||||||
|
cfg.poll_interval = args.poll_interval
|
||||||
|
if args.dead_threshold is not None:
|
||||||
|
cfg.dead_threshold = args.dead_threshold
|
||||||
|
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=args.dry_run)
|
||||||
|
|
||||||
|
if args.status:
|
||||||
|
# Print last-known state (empty if never run)
|
||||||
|
pool.sweep()
|
||||||
|
print(json.dumps(pool.status(), indent=2))
|
||||||
|
return
|
||||||
|
|
||||||
|
if args.once:
|
||||||
|
healthy = pool.run_once()
|
||||||
|
sys.exit(0 if healthy else 1)
|
||||||
|
|
||||||
|
pool.run()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
659
tests/test_resurrection_pool.py
Normal file
659
tests/test_resurrection_pool.py
Normal file
@@ -0,0 +1,659 @@
|
|||||||
|
"""Tests for the Resurrection Pool — health polling, dead-agent detection, auto-revive.
|
||||||
|
|
||||||
|
Validates:
|
||||||
|
- GatewayConfig / PoolConfig loading from dicts and files
|
||||||
|
- RevivePolicy enum parsing (including unknown values)
|
||||||
|
- probe_gateway / probe_process / probe_heartbeat functions
|
||||||
|
- assess_agent combining all three probes into AgentHealth
|
||||||
|
- AgentHealth state: HEALTHY / DEGRADED / DEAD
|
||||||
|
- ResurrectionPool._update_health — dead_at tracking and recovery
|
||||||
|
- ResurrectionPool.sweep — policy routing (auto/ask/never)
|
||||||
|
- apply_revive_policy — auto-restart, never, ask-with-no-channel fallback
|
||||||
|
- Telegram / Nostr dry-run paths
|
||||||
|
- Gitea issue helpers (mocked)
|
||||||
|
- CLI argument parsing
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import importlib.util
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch, call
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# ── Module loading (same pattern as test_nexus_watchdog.py) ──────────
|
||||||
|
|
||||||
|
PROJECT_ROOT = Path(__file__).parent.parent
|
||||||
|
|
||||||
|
_rp_spec = importlib.util.spec_from_file_location(
|
||||||
|
"resurrection_pool_test",
|
||||||
|
PROJECT_ROOT / "bin" / "resurrection_pool.py",
|
||||||
|
)
|
||||||
|
_rp = importlib.util.module_from_spec(_rp_spec)
|
||||||
|
sys.modules["resurrection_pool_test"] = _rp
|
||||||
|
_rp_spec.loader.exec_module(_rp)
|
||||||
|
|
||||||
|
# Convenience aliases
|
||||||
|
AgentHealth = _rp.AgentHealth
|
||||||
|
AgentState = _rp.AgentState
|
||||||
|
GatewayConfig = _rp.GatewayConfig
|
||||||
|
MissionConfig = _rp.MissionConfig
|
||||||
|
PoolConfig = _rp.PoolConfig
|
||||||
|
ResurrectionPool = _rp.ResurrectionPool
|
||||||
|
RevivePolicy = _rp.RevivePolicy
|
||||||
|
apply_revive_policy = _rp.apply_revive_policy
|
||||||
|
assess_agent = _rp.assess_agent
|
||||||
|
build_parser = _rp.build_parser
|
||||||
|
probe_gateway = _rp.probe_gateway
|
||||||
|
probe_heartbeat = _rp.probe_heartbeat
|
||||||
|
probe_process = _rp.probe_process
|
||||||
|
revive_agent = _rp.revive_agent
|
||||||
|
send_telegram_approval_request = _rp.send_telegram_approval_request
|
||||||
|
send_nostr_approval_request = _rp.send_nostr_approval_request
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_gw(name="timmy", host="localhost", port=8765,
|
||||||
|
heartbeat_path=None, process_pattern="nexus_think",
|
||||||
|
revive_cmd=None, tmp_path=None):
|
||||||
|
if heartbeat_path is None and tmp_path is not None:
|
||||||
|
heartbeat_path = str(tmp_path / "heartbeat.json")
|
||||||
|
elif heartbeat_path is None:
|
||||||
|
heartbeat_path = "~/.nexus/heartbeat.json"
|
||||||
|
return GatewayConfig(
|
||||||
|
name=name,
|
||||||
|
host=host,
|
||||||
|
port=port,
|
||||||
|
heartbeat_path=heartbeat_path,
|
||||||
|
process_pattern=process_pattern,
|
||||||
|
revive_cmd=revive_cmd or [],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_heartbeat(path: Path, age_seconds: float = 10.0):
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
data = {
|
||||||
|
"pid": os.getpid(),
|
||||||
|
"timestamp": time.time() - age_seconds,
|
||||||
|
"cycle": 7,
|
||||||
|
"model": "timmy:v0.1",
|
||||||
|
"status": "thinking",
|
||||||
|
}
|
||||||
|
path.write_text(json.dumps(data))
|
||||||
|
|
||||||
|
|
||||||
|
def _make_cfg(**kwargs) -> PoolConfig:
|
||||||
|
return PoolConfig(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# ── RevivePolicy tests ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestRevivePolicy:
|
||||||
|
def test_valid_values(self):
|
||||||
|
assert RevivePolicy("auto") == RevivePolicy.AUTO
|
||||||
|
assert RevivePolicy("ask") == RevivePolicy.ASK
|
||||||
|
assert RevivePolicy("never") == RevivePolicy.NEVER
|
||||||
|
|
||||||
|
def test_mission_from_dict_unknown_policy(self):
|
||||||
|
m = MissionConfig.from_dict("m1", {"revive_policy": "bogus", "agents": ["a"]})
|
||||||
|
assert m.revive_policy == RevivePolicy.ASK
|
||||||
|
|
||||||
|
def test_mission_from_dict_valid_policy(self):
|
||||||
|
m = MissionConfig.from_dict("m1", {"revive_policy": "auto", "agents": ["x"]})
|
||||||
|
assert m.revive_policy == RevivePolicy.AUTO
|
||||||
|
|
||||||
|
def test_mission_agents_default_empty(self):
|
||||||
|
m = MissionConfig.from_dict("m2", {})
|
||||||
|
assert m.agents == []
|
||||||
|
|
||||||
|
|
||||||
|
# ── PoolConfig loading tests ─────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestPoolConfig:
|
||||||
|
def test_defaults(self):
|
||||||
|
cfg = PoolConfig()
|
||||||
|
assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL
|
||||||
|
assert cfg.dead_threshold == _rp.DEFAULT_DEAD_THRESHOLD
|
||||||
|
assert cfg.gateways == []
|
||||||
|
assert cfg.missions == []
|
||||||
|
|
||||||
|
def test_from_dict_basic(self):
|
||||||
|
data = {
|
||||||
|
"poll_interval": 30,
|
||||||
|
"dead_threshold": 120,
|
||||||
|
"gateways": [{"name": "allegro", "port": 9000}],
|
||||||
|
"missions": {
|
||||||
|
"m1": {"label": "Sprint", "agents": ["allegro"], "revive_policy": "auto"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cfg = PoolConfig.from_dict(data)
|
||||||
|
assert cfg.poll_interval == 30
|
||||||
|
assert cfg.dead_threshold == 120
|
||||||
|
assert len(cfg.gateways) == 1
|
||||||
|
assert cfg.gateways[0].name == "allegro"
|
||||||
|
assert cfg.gateways[0].port == 9000
|
||||||
|
assert len(cfg.missions) == 1
|
||||||
|
assert cfg.missions[0].revive_policy == RevivePolicy.AUTO
|
||||||
|
|
||||||
|
def test_from_dict_telegram(self):
|
||||||
|
data = {"telegram": {"bot_token": "tok123", "chat_id": "-999"}}
|
||||||
|
cfg = PoolConfig.from_dict(data)
|
||||||
|
assert cfg.telegram_bot_token == "tok123"
|
||||||
|
assert cfg.telegram_chat_id == "-999"
|
||||||
|
|
||||||
|
def test_from_dict_nostr(self):
|
||||||
|
data = {"nostr": {"relay_url": "wss://relay.damus.io", "recipient_pubkey": "abc"}}
|
||||||
|
cfg = PoolConfig.from_dict(data)
|
||||||
|
assert cfg.nostr_relay_url == "wss://relay.damus.io"
|
||||||
|
assert cfg.nostr_recipient_pubkey == "abc"
|
||||||
|
|
||||||
|
def test_load_from_file(self, tmp_path):
|
||||||
|
config_file = tmp_path / "pool.json"
|
||||||
|
config_file.write_text(json.dumps({"poll_interval": 42}))
|
||||||
|
cfg = PoolConfig.load(config_file)
|
||||||
|
assert cfg.poll_interval == 42
|
||||||
|
|
||||||
|
def test_load_missing_file_returns_defaults(self, tmp_path):
|
||||||
|
cfg = PoolConfig.load(tmp_path / "nonexistent.json")
|
||||||
|
assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL
|
||||||
|
|
||||||
|
def test_load_corrupt_file_falls_through(self, tmp_path):
|
||||||
|
bad = tmp_path / "bad.json"
|
||||||
|
bad.write_text("not json {{")
|
||||||
|
cfg = PoolConfig.load(bad)
|
||||||
|
assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL
|
||||||
|
|
||||||
|
def test_load_env_var(self, tmp_path, monkeypatch):
|
||||||
|
config_file = tmp_path / "env_pool.json"
|
||||||
|
config_file.write_text(json.dumps({"dead_threshold": 99}))
|
||||||
|
monkeypatch.setenv("RESURRECTION_CONFIG", str(config_file))
|
||||||
|
cfg = PoolConfig.load()
|
||||||
|
assert cfg.dead_threshold == 99
|
||||||
|
|
||||||
|
|
||||||
|
# ── probe_gateway tests ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestProbeGateway:
|
||||||
|
def test_refused_port(self):
|
||||||
|
# Port 1 should always be refused on localhost
|
||||||
|
alive, msg = probe_gateway("localhost", 1, timeout=2)
|
||||||
|
assert not alive
|
||||||
|
assert "refused" in msg.lower() or "failed" in msg.lower()
|
||||||
|
|
||||||
|
def test_open_port(self):
|
||||||
|
# Open a real server socket to confirm TCP probe works
|
||||||
|
import threading
|
||||||
|
import socket as _socket
|
||||||
|
srv = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
|
||||||
|
srv.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
|
||||||
|
srv.bind(("127.0.0.1", 0))
|
||||||
|
srv.listen(1)
|
||||||
|
port = srv.getsockname()[1]
|
||||||
|
|
||||||
|
def accept_one():
|
||||||
|
try:
|
||||||
|
conn, _ = srv.accept()
|
||||||
|
conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
srv.close()
|
||||||
|
|
||||||
|
t = threading.Thread(target=accept_one, daemon=True)
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
alive, msg = probe_gateway("127.0.0.1", port, timeout=3)
|
||||||
|
t.join(timeout=2)
|
||||||
|
assert alive
|
||||||
|
assert "reachable" in msg.lower()
|
||||||
|
|
||||||
|
def test_invalid_host(self):
|
||||||
|
alive, msg = probe_gateway("this.host.does.not.exist.invalid", 8765, timeout=2)
|
||||||
|
assert not alive
|
||||||
|
|
||||||
|
|
||||||
|
# ── probe_process tests ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestProbeProcess:
|
||||||
|
def test_self_is_filtered(self):
|
||||||
|
# The test runner process matches "python" — but watchdog filters own PID.
|
||||||
|
# We just check it returns a tuple without error.
|
||||||
|
alive, msg = probe_process("definitely-no-process-xyz-12345")
|
||||||
|
assert not alive
|
||||||
|
assert isinstance(msg, str)
|
||||||
|
|
||||||
|
def test_running_process(self):
|
||||||
|
# pgrep for the current python process name should always find something
|
||||||
|
alive, msg = probe_process("pytest")
|
||||||
|
# May or may not find it depending on env; just check it doesn't crash
|
||||||
|
assert isinstance(alive, bool)
|
||||||
|
assert isinstance(msg, str)
|
||||||
|
|
||||||
|
@patch("subprocess.run", side_effect=FileNotFoundError)
|
||||||
|
def test_pgrep_missing(self, _mock):
|
||||||
|
alive, msg = probe_process("anything")
|
||||||
|
assert alive # graceful fallback: don't raise false alarms
|
||||||
|
assert "pgrep" in msg.lower()
|
||||||
|
|
||||||
|
|
||||||
|
# ── probe_heartbeat tests ────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestProbeHeartbeat:
|
||||||
|
def test_missing_file(self, tmp_path):
|
||||||
|
alive, age, msg = probe_heartbeat(tmp_path / "no.json", dead_threshold=300)
|
||||||
|
assert not alive
|
||||||
|
assert age is None
|
||||||
|
assert "no heartbeat" in msg.lower()
|
||||||
|
|
||||||
|
def test_fresh_heartbeat(self, tmp_path):
|
||||||
|
hb = tmp_path / "heartbeat.json"
|
||||||
|
_write_heartbeat(hb, age_seconds=10)
|
||||||
|
alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
|
||||||
|
assert alive
|
||||||
|
assert age is not None and age < 20
|
||||||
|
assert "alive" in msg.lower()
|
||||||
|
|
||||||
|
def test_stale_heartbeat(self, tmp_path):
|
||||||
|
hb = tmp_path / "heartbeat.json"
|
||||||
|
_write_heartbeat(hb, age_seconds=600)
|
||||||
|
alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
|
||||||
|
assert not alive
|
||||||
|
assert age > 500
|
||||||
|
assert "stale" in msg.lower()
|
||||||
|
|
||||||
|
def test_corrupt_json(self, tmp_path):
|
||||||
|
hb = tmp_path / "heartbeat.json"
|
||||||
|
hb.write_text("{ broken json")
|
||||||
|
alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
|
||||||
|
assert not alive
|
||||||
|
assert "unreadable" in msg.lower()
|
||||||
|
|
||||||
|
def test_missing_timestamp_treated_as_stale(self, tmp_path):
|
||||||
|
hb = tmp_path / "heartbeat.json"
|
||||||
|
hb.write_text(json.dumps({"cycle": 1})) # no timestamp key
|
||||||
|
alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
|
||||||
|
# timestamp defaults to 0 → very old → stale
|
||||||
|
assert not alive
|
||||||
|
|
||||||
|
|
||||||
|
# ── assess_agent tests ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestAssessAgent:
|
||||||
|
def _gw(self, tmp_path, port=1):
|
||||||
|
return _make_gw(tmp_path=tmp_path, port=port)
|
||||||
|
|
||||||
|
@patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
|
||||||
|
@patch("resurrection_pool_test.probe_process", return_value=(True, "ok"))
|
||||||
|
@patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 5.0, "ok"))
|
||||||
|
def test_all_healthy(self, _hb, _proc, _gw, tmp_path):
|
||||||
|
gw = self._gw(tmp_path)
|
||||||
|
h = assess_agent(gw, dead_threshold=300)
|
||||||
|
assert h.state == AgentState.HEALTHY
|
||||||
|
|
||||||
|
@patch("resurrection_pool_test.probe_gateway", return_value=(False, "refused"))
|
||||||
|
@patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
|
||||||
|
@patch("resurrection_pool_test.probe_heartbeat", return_value=(False, None, "missing"))
|
||||||
|
def test_all_dead(self, _hb, _proc, _gw, tmp_path):
|
||||||
|
gw = self._gw(tmp_path)
|
||||||
|
h = assess_agent(gw, dead_threshold=300)
|
||||||
|
assert h.state == AgentState.DEAD
|
||||||
|
|
||||||
|
@patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
|
||||||
|
@patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
|
||||||
|
@patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 10.0, "ok"))
|
||||||
|
def test_partially_failing_is_degraded(self, _hb, _proc, _gw, tmp_path):
|
||||||
|
gw = self._gw(tmp_path)
|
||||||
|
h = assess_agent(gw, dead_threshold=300)
|
||||||
|
assert h.state == AgentState.DEGRADED
|
||||||
|
|
||||||
|
|
||||||
|
# ── AgentHealth state machine tests ──────────────────────────────────
|
||||||
|
|
||||||
|
class TestAgentHealthTracking:
|
||||||
|
def _pool(self):
|
||||||
|
cfg = _make_cfg()
|
||||||
|
return ResurrectionPool(cfg, dry_run=True)
|
||||||
|
|
||||||
|
def test_first_dead_at_set_on_first_failure(self):
|
||||||
|
pool = self._pool()
|
||||||
|
dead = AgentHealth(name="timmy", state=AgentState.DEAD,
|
||||||
|
last_checked=time.time())
|
||||||
|
result = pool._update_health("timmy", dead)
|
||||||
|
assert result.first_dead_at is not None
|
||||||
|
|
||||||
|
def test_first_dead_at_preserved_across_cycles(self):
|
||||||
|
pool = self._pool()
|
||||||
|
t0 = time.time() - 100
|
||||||
|
dead1 = AgentHealth(name="timmy", state=AgentState.DEAD,
|
||||||
|
last_checked=t0, first_dead_at=t0)
|
||||||
|
pool._update_health("timmy", dead1)
|
||||||
|
|
||||||
|
dead2 = AgentHealth(name="timmy", state=AgentState.DEAD,
|
||||||
|
last_checked=time.time())
|
||||||
|
result = pool._update_health("timmy", dead2)
|
||||||
|
assert result.first_dead_at == t0 # preserved
|
||||||
|
|
||||||
|
def test_recovery_resets_first_dead_at(self):
|
||||||
|
pool = self._pool()
|
||||||
|
dead = AgentHealth(name="timmy", state=AgentState.DEAD,
|
||||||
|
last_checked=time.time())
|
||||||
|
pool._update_health("timmy", dead)
|
||||||
|
|
||||||
|
healthy = AgentHealth(name="timmy", state=AgentState.HEALTHY,
|
||||||
|
last_checked=time.time())
|
||||||
|
result = pool._update_health("timmy", healthy)
|
||||||
|
assert result.first_dead_at is None
|
||||||
|
assert result.revive_attempts == 0
|
||||||
|
|
||||||
|
def test_revive_attempts_increments(self):
|
||||||
|
pool = self._pool()
|
||||||
|
h = AgentHealth(name="timmy", state=AgentState.DEAD,
|
||||||
|
last_checked=time.time())
|
||||||
|
pool._health["timmy"] = h
|
||||||
|
h.revive_attempts = 2
|
||||||
|
prev = pool._health["timmy"]
|
||||||
|
|
||||||
|
new = AgentHealth(name="timmy", state=AgentState.DEAD,
|
||||||
|
last_checked=time.time())
|
||||||
|
result = pool._update_health("timmy", new)
|
||||||
|
assert result.revive_attempts == 2 # preserved from prev
|
||||||
|
|
||||||
|
|
||||||
|
# ── mission_for / gateway_for tests ─────────────────────────────────
|
||||||
|
|
||||||
|
class TestPoolLookups:
|
||||||
|
def test_mission_for_known_agent(self):
|
||||||
|
cfg = PoolConfig.from_dict({
|
||||||
|
"missions": {"m1": {"agents": ["timmy"], "revive_policy": "auto"}},
|
||||||
|
})
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=True)
|
||||||
|
m = pool._mission_for("timmy")
|
||||||
|
assert m is not None
|
||||||
|
assert m.mission_id == "m1"
|
||||||
|
|
||||||
|
def test_mission_for_unknown_agent(self):
|
||||||
|
cfg = PoolConfig()
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=True)
|
||||||
|
assert pool._mission_for("ghost") is None
|
||||||
|
|
||||||
|
def test_gateway_for(self):
|
||||||
|
cfg = PoolConfig.from_dict({
|
||||||
|
"gateways": [{"name": "allegro", "port": 9000}],
|
||||||
|
})
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=True)
|
||||||
|
gw = pool._gateway_for("allegro")
|
||||||
|
assert gw is not None
|
||||||
|
assert gw.port == 9000
|
||||||
|
|
||||||
|
def test_gateway_for_missing(self):
|
||||||
|
pool = ResurrectionPool(PoolConfig(), dry_run=True)
|
||||||
|
assert pool._gateway_for("nobody") is None
|
||||||
|
|
||||||
|
|
||||||
|
# ── apply_revive_policy tests ────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestApplyRevivePolicy:
|
||||||
|
def _dead_health(self, name="timmy", dead_seconds=400):
|
||||||
|
h = AgentHealth(name=name, state=AgentState.DEAD,
|
||||||
|
first_dead_at=time.time() - dead_seconds,
|
||||||
|
last_checked=time.time())
|
||||||
|
return h
|
||||||
|
|
||||||
|
def test_never_policy_no_revive(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path)
|
||||||
|
mission = MissionConfig("m1", revive_policy=RevivePolicy.NEVER)
|
||||||
|
cfg = _make_cfg()
|
||||||
|
health = self._dead_health()
|
||||||
|
|
||||||
|
with patch.object(_rp, "open_death_issue") as mock_issue:
|
||||||
|
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
|
||||||
|
assert not result
|
||||||
|
mock_issue.assert_called_once()
|
||||||
|
|
||||||
|
def test_auto_policy_launches_revive(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "revive"])
|
||||||
|
mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
|
||||||
|
cfg = _make_cfg()
|
||||||
|
health = self._dead_health()
|
||||||
|
|
||||||
|
with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
|
||||||
|
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
|
||||||
|
assert result
|
||||||
|
mock_revive.assert_called_once_with(gw, dry_run=True)
|
||||||
|
|
||||||
|
def test_auto_policy_opens_issue_on_revive_failure(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path)
|
||||||
|
mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
|
||||||
|
cfg = _make_cfg()
|
||||||
|
health = self._dead_health()
|
||||||
|
|
||||||
|
with patch.object(_rp, "revive_agent", return_value=False), \
|
||||||
|
patch.object(_rp, "open_death_issue") as mock_issue:
|
||||||
|
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
|
||||||
|
assert not result
|
||||||
|
mock_issue.assert_called_once()
|
||||||
|
|
||||||
|
def test_ask_policy_dry_run_approves(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
|
||||||
|
mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
|
||||||
|
cfg = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
|
||||||
|
health = self._dead_health()
|
||||||
|
|
||||||
|
# dry_run → send_telegram returns True, poll returns True
|
||||||
|
with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
|
||||||
|
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
|
||||||
|
assert result
|
||||||
|
mock_revive.assert_called_once()
|
||||||
|
|
||||||
|
def test_ask_policy_no_channel_falls_back_to_auto(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
|
||||||
|
mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
|
||||||
|
cfg = _make_cfg() # no telegram, no nostr
|
||||||
|
health = self._dead_health()
|
||||||
|
|
||||||
|
with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
|
||||||
|
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
|
||||||
|
assert result
|
||||||
|
mock_revive.assert_called_once()
|
||||||
|
|
||||||
|
def test_skip_if_revive_pending(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path)
|
||||||
|
mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
|
||||||
|
cfg = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
|
||||||
|
health = self._dead_health()
|
||||||
|
health.revive_pending = True # approval already outstanding
|
||||||
|
|
||||||
|
with patch.object(_rp, "revive_agent") as mock_revive:
|
||||||
|
result = apply_revive_policy(gw, health, mission, cfg, dry_run=False)
|
||||||
|
assert not result
|
||||||
|
mock_revive.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
# ── revive_agent tests ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestReviveAgent:
|
||||||
|
def test_no_revive_cmd(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path, revive_cmd=[])
|
||||||
|
assert not revive_agent(gw)
|
||||||
|
|
||||||
|
def test_dry_run(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
|
||||||
|
assert revive_agent(gw, dry_run=True)
|
||||||
|
|
||||||
|
def test_launches_process(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["true"])
|
||||||
|
result = revive_agent(gw, dry_run=False)
|
||||||
|
assert result
|
||||||
|
|
||||||
|
def test_bad_command(self, tmp_path):
|
||||||
|
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["/does/not/exist/xyz"])
|
||||||
|
result = revive_agent(gw, dry_run=False)
|
||||||
|
assert not result
|
||||||
|
|
||||||
|
|
||||||
|
# ── Notification channel tests ────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNotifications:
|
||||||
|
def test_telegram_dry_run(self):
|
||||||
|
assert send_telegram_approval_request(
|
||||||
|
"token", "chat", "timmy", "Sprint", 400, dry_run=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_telegram_no_token(self):
|
||||||
|
assert not send_telegram_approval_request(
|
||||||
|
"", "", "timmy", "Sprint", 400, dry_run=False
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_nostr_dry_run(self):
|
||||||
|
result = send_nostr_approval_request(
|
||||||
|
"wss://relay.damus.io", "pubkey123",
|
||||||
|
"timmy", "Sprint", 400, dry_run=True
|
||||||
|
)
|
||||||
|
assert result
|
||||||
|
|
||||||
|
def test_nostr_no_config(self):
|
||||||
|
result = send_nostr_approval_request(
|
||||||
|
"", "",
|
||||||
|
"timmy", "Sprint", 400, dry_run=False
|
||||||
|
)
|
||||||
|
assert not result
|
||||||
|
|
||||||
|
|
||||||
|
# ── ResurrectionPool.sweep integration tests ─────────────────────────
|
||||||
|
|
||||||
|
class TestPoolSweep:
|
||||||
|
def test_sweep_empty_gateways(self):
|
||||||
|
pool = ResurrectionPool(PoolConfig(), dry_run=True)
|
||||||
|
result = pool.sweep()
|
||||||
|
assert result == {}
|
||||||
|
|
||||||
|
@patch("resurrection_pool_test.assess_agent")
|
||||||
|
def test_sweep_healthy_agent_no_policy_triggered(self, mock_assess):
|
||||||
|
gw = GatewayConfig(name="timmy")
|
||||||
|
cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=True)
|
||||||
|
|
||||||
|
mock_assess.return_value = AgentHealth(
|
||||||
|
name="timmy", state=AgentState.HEALTHY,
|
||||||
|
gateway_alive=True, process_alive=True, heartbeat_age=5.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(_rp, "apply_revive_policy") as mock_policy:
|
||||||
|
pool.sweep()
|
||||||
|
mock_policy.assert_not_called()
|
||||||
|
|
||||||
|
@patch("resurrection_pool_test.assess_agent")
|
||||||
|
def test_sweep_dead_agent_triggers_policy(self, mock_assess):
|
||||||
|
cfg = PoolConfig.from_dict({
|
||||||
|
"gateways": [{"name": "timmy"}],
|
||||||
|
"dead_threshold": 0, # treat any death as immediate trigger
|
||||||
|
})
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=True)
|
||||||
|
|
||||||
|
mock_assess.return_value = AgentHealth(
|
||||||
|
name="timmy", state=AgentState.DEAD,
|
||||||
|
first_dead_at=time.time() - 999,
|
||||||
|
last_checked=time.time(),
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(_rp, "apply_revive_policy", return_value=False) as mock_policy:
|
||||||
|
pool.sweep()
|
||||||
|
mock_policy.assert_called_once()
|
||||||
|
|
||||||
|
@patch("resurrection_pool_test.assess_agent")
|
||||||
|
def test_sweep_degraded_does_not_trigger_policy(self, mock_assess):
|
||||||
|
cfg = PoolConfig.from_dict({
|
||||||
|
"gateways": [{"name": "timmy"}],
|
||||||
|
"dead_threshold": 10,
|
||||||
|
})
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=True)
|
||||||
|
|
||||||
|
mock_assess.return_value = AgentHealth(
|
||||||
|
name="timmy", state=AgentState.DEGRADED,
|
||||||
|
first_dead_at=time.time() - 5, # only 5s, below threshold
|
||||||
|
last_checked=time.time(),
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(_rp, "apply_revive_policy") as mock_policy:
|
||||||
|
pool.sweep()
|
||||||
|
mock_policy.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
# ── run_once / status tests ───────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestRunOnce:
|
||||||
|
def test_run_once_empty_returns_true(self):
|
||||||
|
pool = ResurrectionPool(PoolConfig(), dry_run=True)
|
||||||
|
assert pool.run_once()
|
||||||
|
|
||||||
|
@patch("resurrection_pool_test.assess_agent")
|
||||||
|
def test_run_once_dead_agent_returns_false(self, mock_assess):
|
||||||
|
cfg = PoolConfig.from_dict({
|
||||||
|
"gateways": [{"name": "timmy"}],
|
||||||
|
"dead_threshold": 0,
|
||||||
|
})
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=True)
|
||||||
|
mock_assess.return_value = AgentHealth(
|
||||||
|
name="timmy", state=AgentState.DEAD,
|
||||||
|
first_dead_at=time.time() - 9999,
|
||||||
|
last_checked=time.time(),
|
||||||
|
)
|
||||||
|
with patch.object(_rp, "apply_revive_policy", return_value=False):
|
||||||
|
result = pool.run_once()
|
||||||
|
assert not result
|
||||||
|
|
||||||
|
@patch("resurrection_pool_test.assess_agent")
|
||||||
|
def test_status_returns_list(self, mock_assess):
|
||||||
|
cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
|
||||||
|
pool = ResurrectionPool(cfg, dry_run=True)
|
||||||
|
mock_assess.return_value = AgentHealth(
|
||||||
|
name="timmy", state=AgentState.HEALTHY,
|
||||||
|
gateway_alive=True, process_alive=True, heartbeat_age=3.0,
|
||||||
|
)
|
||||||
|
pool.sweep()
|
||||||
|
status = pool.status()
|
||||||
|
assert isinstance(status, list)
|
||||||
|
assert len(status) == 1
|
||||||
|
assert status[0]["name"] == "timmy"
|
||||||
|
assert status[0]["state"] == "healthy"
|
||||||
|
|
||||||
|
|
||||||
|
# ── CLI tests ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestCLI:
|
||||||
|
def test_default_args(self):
|
||||||
|
p = build_parser()
|
||||||
|
args = p.parse_args([])
|
||||||
|
assert not args.once
|
||||||
|
assert not args.dry_run
|
||||||
|
assert not args.status
|
||||||
|
assert args.config is None
|
||||||
|
|
||||||
|
def test_once_flag(self):
|
||||||
|
args = build_parser().parse_args(["--once"])
|
||||||
|
assert args.once
|
||||||
|
|
||||||
|
def test_dry_run_flag(self):
|
||||||
|
args = build_parser().parse_args(["--dry-run"])
|
||||||
|
assert args.dry_run
|
||||||
|
|
||||||
|
def test_poll_interval_override(self):
|
||||||
|
args = build_parser().parse_args(["--poll-interval", "15"])
|
||||||
|
assert args.poll_interval == 15
|
||||||
|
|
||||||
|
def test_dead_threshold_override(self):
|
||||||
|
args = build_parser().parse_args(["--dead-threshold", "60"])
|
||||||
|
assert args.dead_threshold == 60
|
||||||
|
|
||||||
|
def test_config_path(self, tmp_path):
|
||||||
|
cfg_file = tmp_path / "pool.json"
|
||||||
|
cfg_file.write_text("{}")
|
||||||
|
args = build_parser().parse_args(["--config", str(cfg_file)])
|
||||||
|
assert args.config == str(cfg_file)
|
||||||
Reference in New Issue
Block a user