Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
e530565d3c WIP: Claude Code progress on #880
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-04-06 14:16:00 -04:00
19 changed files with 1443 additions and 1599 deletions

View File

@@ -1,940 +0,0 @@
#!/usr/bin/env python3
"""
Resurrection Pool — Fleet Health Polling, Dead-Agent Detection, Auto-Revive
Monitors all wizard gateways in the fleet, detects downed agents, and
applies configurable revive policies per mission cell:
auto — restart the agent immediately on death detection
ask — send a Telegram / Nostr approval request; revive when approved
never — record the death, create a Gitea issue, do nothing else
HOW IT WORKS
============
1. Load a JSON config listing wizard gateways and mission cells.
2. Poll each gateway on a configurable interval (default: 60s).
- TCP-probe the WebSocket port (can the gateway hear the world?)
- Check for a running agent process via pgrep
- Read the heartbeat file for last-think timestamp
3. If an agent fails all three checks for longer than dead_threshold:
a. Mark it dead in the pool state.
b. Look up the mission it belongs to and read the revive_policy.
c. Execute the policy: auto-restart, request human approval, or no-op.
4. On auto-revive: spawn the configured revive_cmd, wait for the heartbeat
to return healthy, and log the recovery.
5. On ask-revive: post an approval request to Telegram / Nostr and wait
up to approval_timeout seconds for a "+1" or "revive" reply.
6. Create / close Gitea issues to track fleet health events.
CONFIGURATION
=============
Config file path is resolved in this order:
1. --config CLI flag
2. RESURRECTION_CONFIG env var
3. ~/.nexus/resurrection_pool.json
4. /etc/nexus/resurrection_pool.json
Minimal config skeleton (all fields optional, sane defaults everywhere):
{
"poll_interval": 60,
"dead_threshold": 300,
"approval_timeout": 300,
"gateways": [
{
"name": "timmy",
"host": "localhost",
"port": 8765,
"heartbeat_path": "~/.nexus/heartbeat.json",
"process_pattern": "nexus_think",
"revive_cmd": ["python", "/root/wizards/timmy/nexus_think.py"]
}
],
"missions": {
"some-mission-uuid": {
"label": "Portal Sprint",
"agents": ["timmy"],
"revive_policy": "ask"
}
},
"telegram": {
"bot_token": "...",
"chat_id": "-1001234567890"
},
"nostr": {
"relay_url": "wss://relay.damus.io",
"recipient_pubkey": "..."
},
"gitea": {
"url": "https://forge.example.com",
"token": "...",
"repo": "Timmy_Foundation/the-nexus"
}
}
USAGE
=====
# One-shot health check (good for cron / smoke tests)
python bin/resurrection_pool.py --once
# Continuous daemon (default)
python bin/resurrection_pool.py
# Custom config + dry-run (no side-effects)
python bin/resurrection_pool.py --config /etc/nexus/pool.json --dry-run
# Print current pool state as JSON
python bin/resurrection_pool.py --status
ZERO DEPENDENCIES
=================
Pure stdlib. No pip installs required on the monitoring host.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import signal
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Root logging setup: timestamped, level-tagged records for the whole pool.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("nexus.resurrection_pool")
# ── Constants ────────────────────────────────────────────────────────
DEFAULT_POLL_INTERVAL = 60  # seconds between full fleet sweeps
DEFAULT_DEAD_THRESHOLD = 300  # seconds with no heartbeat = dead
DEFAULT_APPROVAL_TIMEOUT = 300  # seconds to wait for human approval
# Fallback config locations, tried after --config / RESURRECTION_CONFIG.
CONFIG_SEARCH_PATHS = [
    Path.home() / ".nexus" / "resurrection_pool.json",
    Path("/etc/nexus/resurrection_pool.json"),
]
POOL_LABEL = "resurrection-pool"  # label for pool-created Gitea issues
POOL_TITLE_PREFIX = "[pool]"  # issue-title prefix used to find/close pool issues
# ── Enums & simple types ─────────────────────────────────────────────
class RevivePolicy(str, Enum):
    """What the pool does when an agent in a mission cell is found dead."""
    AUTO = "auto"    # restart immediately, no human needed
    ASK = "ask"      # send approval request, wait for +1
    NEVER = "never"  # log + Gitea issue, do not restart
class AgentState(str, Enum):
    """Health classification produced by assess_agent / tracked by the pool."""
    HEALTHY = "healthy"    # all three probes passing
    DEGRADED = "degraded"  # some checks failing but not yet dead
    DEAD = "dead"          # all three probes failing
    REVIVING = "reviving"  # a revive was launched; awaiting recovery
    UNKNOWN = "unknown"    # not yet probed
# ── Configuration dataclasses ────────────────────────────────────────
@dataclass
class GatewayConfig:
    """One wizard gateway to monitor.

    Three probes target it: a TCP connect to host:port, a `pgrep -f`
    match on process_pattern, and a freshness check on the heartbeat file.
    """
    name: str
    host: str = "localhost"
    port: int = 8765
    # Heartbeat JSON written by the agent; "~" expanded at read time.
    heartbeat_path: str = "~/.nexus/heartbeat.json"
    # Substring handed to `pgrep -f` to locate the agent process.
    process_pattern: str = "nexus_think"
    # argv list spawned to restart the agent; empty means revive is impossible.
    revive_cmd: List[str] = field(default_factory=list)

    @property
    def resolved_heartbeat_path(self) -> Path:
        """heartbeat_path with the user's home directory expanded."""
        return Path(self.heartbeat_path).expanduser()
@dataclass
class MissionConfig:
    """A mission cell: its agents plus the revive policy applied to them."""
    mission_id: str
    label: str = ""
    agents: List[str] = field(default_factory=list)
    revive_policy: RevivePolicy = RevivePolicy.ASK

    @classmethod
    def from_dict(cls, mission_id: str, data: dict) -> "MissionConfig":
        """Build a MissionConfig from one entry of the config's missions map.

        An unrecognized revive_policy value degrades to ASK with a warning
        rather than aborting config load.
        """
        raw_policy = data.get("revive_policy", RevivePolicy.ASK)
        try:
            policy = RevivePolicy(raw_policy)
        except ValueError:
            logger.warning("Unknown revive_policy %r for mission %s; defaulting to 'ask'",
                           raw_policy, mission_id)
            policy = RevivePolicy.ASK
        label = data.get("label", "")
        agents = data.get("agents", [])
        return cls(mission_id=mission_id, label=label,
                   agents=agents, revive_policy=policy)
@dataclass
class PoolConfig:
    """Full resurrection pool configuration.

    Field defaults give a runnable (if empty) pool; `from_dict` overlays a
    parsed JSON config on those defaults, and `load` resolves which config
    file to read.
    """
    poll_interval: int = DEFAULT_POLL_INTERVAL
    dead_threshold: int = DEFAULT_DEAD_THRESHOLD
    approval_timeout: int = DEFAULT_APPROVAL_TIMEOUT
    gateways: List[GatewayConfig] = field(default_factory=list)
    missions: List[MissionConfig] = field(default_factory=list)
    telegram_bot_token: str = ""
    telegram_chat_id: str = ""
    nostr_relay_url: str = ""
    nostr_recipient_pubkey: str = ""
    # NOTE: these env lookups are evaluated once, at import time.
    gitea_url: str = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
    gitea_token: str = os.environ.get("GITEA_TOKEN", "")
    gitea_repo: str = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")

    @classmethod
    def from_dict(cls, data: dict) -> "PoolConfig":
        """Build a PoolConfig from a parsed JSON dict; missing keys use
        env vars (where applicable) and then the defaults above."""
        tg = data.get("telegram", {})
        ns = data.get("nostr", {})
        git = data.get("gitea", {})
        gateways = [
            GatewayConfig(
                name=gw["name"],
                host=gw.get("host", "localhost"),
                port=gw.get("port", 8765),
                heartbeat_path=gw.get("heartbeat_path", "~/.nexus/heartbeat.json"),
                process_pattern=gw.get("process_pattern", "nexus_think"),
                revive_cmd=gw.get("revive_cmd", []),
            )
            for gw in data.get("gateways", [])
        ]
        missions = [
            MissionConfig.from_dict(mid, mdata)
            for mid, mdata in data.get("missions", {}).items()
        ]
        return cls(
            poll_interval=data.get("poll_interval", DEFAULT_POLL_INTERVAL),
            dead_threshold=data.get("dead_threshold", DEFAULT_DEAD_THRESHOLD),
            approval_timeout=data.get("approval_timeout", DEFAULT_APPROVAL_TIMEOUT),
            gateways=gateways,
            missions=missions,
            telegram_bot_token=tg.get("bot_token", os.environ.get("TELEGRAM_BOT_TOKEN", "")),
            telegram_chat_id=tg.get("chat_id", os.environ.get("TELEGRAM_CHAT_ID", "")),
            nostr_relay_url=ns.get("relay_url", ""),
            nostr_recipient_pubkey=ns.get("recipient_pubkey", ""),
            gitea_url=git.get("url", os.environ.get("GITEA_URL",
                                                    "https://forge.alexanderwhitestone.com")),
            gitea_token=git.get("token", os.environ.get("GITEA_TOKEN", "")),
            gitea_repo=git.get("repo", os.environ.get("NEXUS_REPO",
                                                      "Timmy_Foundation/the-nexus")),
        )

    @classmethod
    def load(cls, path: Optional[Path] = None) -> "PoolConfig":
        """Load config from file, falling back through search paths.

        Resolution order (now matches the module docstring):
        1. explicit `path` (the --config flag)
        2. RESURRECTION_CONFIG env var
        3. CONFIG_SEARCH_PATHS entries (~/.nexus, /etc/nexus)

        BUG FIX: the env var used to be inserted *ahead* of the explicit
        --config path, silently overriding the CLI flag.
        """
        candidates: List[Path] = []
        if path:
            candidates.append(path)
        env_path = os.environ.get("RESURRECTION_CONFIG")
        if env_path:
            candidates.append(Path(env_path))
        candidates += CONFIG_SEARCH_PATHS
        for candidate in candidates:
            if candidate and candidate.exists():
                try:
                    data = json.loads(candidate.read_text())
                    logger.info("Loaded config from %s", candidate)
                    return cls.from_dict(data)
                except (json.JSONDecodeError, KeyError) as e:
                    # Unparseable file: keep falling through the candidates.
                    logger.warning("Failed to parse %s: %s", candidate, e)
        logger.info("No config file found; using defaults (no gateways configured)")
        return cls()
# ── Per-agent health state ───────────────────────────────────────────
@dataclass
class AgentHealth:
    """Runtime health snapshot for one agent gateway."""
    name: str
    state: AgentState = AgentState.UNKNOWN
    gateway_alive: bool = False
    process_alive: bool = False
    heartbeat_age: Optional[float] = None  # seconds since last heartbeat
    last_checked: float = field(default_factory=time.time)
    first_dead_at: Optional[float] = None  # when death was first detected
    revive_pending: bool = False  # approval request outstanding
    revive_attempts: int = 0

    @property
    def consecutive_dead_seconds(self) -> float:
        """Seconds continuously dead so far; 0.0 while the death clock is unset."""
        return 0.0 if self.first_dead_at is None else time.time() - self.first_dead_at

    def to_dict(self) -> dict:
        """JSON-serializable snapshot; the enum state is flattened to its string."""
        snapshot = {attr: getattr(self, attr) for attr in (
            "name", "state", "gateway_alive", "process_alive", "heartbeat_age",
            "last_checked", "first_dead_at", "revive_pending", "revive_attempts",
        )}
        snapshot["state"] = self.state.value
        return snapshot
# ── Health probes ────────────────────────────────────────────────────
def probe_gateway(host: str, port: int, timeout: int = 5) -> Tuple[bool, str]:
    """TCP-probe a wizard gateway port. Returns (alive, message)."""
    try:
        # Context manager guarantees the socket is closed on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            rc = sock.connect_ex((host, port))
        if rc == 0:
            return True, f"TCP:{host}:{port} reachable"
        return False, f"TCP:{host}:{port} refused (errno={rc})"
    except Exception as e:
        # e.g. DNS resolution failure — report as unreachable, never raise.
        return False, f"TCP probe failed: {e}"
def probe_process(pattern: str) -> Tuple[bool, str]:
    """Look for a running agent process via `pgrep -f`. Returns (alive, message)."""
    try:
        result = subprocess.run(
            ["pgrep", "-f", pattern],
            capture_output=True, text=True, timeout=5,
        )
    except FileNotFoundError:
        # Host without pgrep: report alive rather than raising false alarms.
        return True, "pgrep unavailable; skipping process check"
    except Exception as e:  # includes subprocess.TimeoutExpired
        return False, f"Process probe failed: {e}"
    if result.returncode == 0:
        own_pid = str(os.getpid())
        # Filter out our own PID: `pgrep -f` also matches this monitor's argv.
        pids = [tok.strip() for tok in result.stdout.strip().split("\n")
                if tok.strip() and tok.strip() != own_pid]
        if pids:
            return True, f"Running (PID: {', '.join(pids)})"
    return False, f"No process matching {pattern!r}"
def probe_heartbeat(path: Path, dead_threshold: int) -> Tuple[bool, Optional[float], str]:
    """Inspect a heartbeat JSON file. Returns (alive, age_seconds, message).

    age_seconds is None when the file is missing or unreadable; otherwise it
    is seconds since the recorded "timestamp" (missing timestamp => epoch 0,
    which always reads as stale).
    """
    if not path.exists():
        return False, None, f"No heartbeat file at {path}"
    try:
        payload = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError) as exc:
        return False, None, f"Heartbeat file unreadable: {exc}"
    age = time.time() - payload.get("timestamp", 0)
    cycle = payload.get("cycle", "?")
    model = payload.get("model", "unknown")
    if age > dead_threshold:
        stale_msg = (
            f"Stale heartbeat: {int(age)}s ago (threshold {dead_threshold}s), "
            f"cycle={cycle}, model={model}"
        )
        return False, age, stale_msg
    return True, age, f"Alive: cycle={cycle}, {int(age)}s ago, model={model}"
def assess_agent(gw: GatewayConfig, dead_threshold: int) -> AgentHealth:
    """Run all three probes for one gateway and score the result.

    3/3 probes passing -> HEALTHY, 0/3 -> DEAD, anything between -> DEGRADED.
    """
    snapshot = AgentHealth(name=gw.name, last_checked=time.time())
    gw_ok, gw_msg = probe_gateway(gw.host, gw.port)
    proc_ok, proc_msg = probe_process(gw.process_pattern)
    hb_ok, hb_age, hb_msg = probe_heartbeat(gw.resolved_heartbeat_path, dead_threshold)
    snapshot.gateway_alive = gw_ok
    snapshot.process_alive = proc_ok
    snapshot.heartbeat_age = hb_age
    logger.debug("[%s] gateway=%s process=%s heartbeat=%s",
                 gw.name, gw_ok, proc_ok, hb_ok)
    logger.debug("[%s] gw: %s | proc: %s | hb: %s",
                 gw.name, gw_msg, proc_msg, hb_msg)
    passing = sum([gw_ok, proc_ok, hb_ok])
    snapshot.state = (
        AgentState.HEALTHY if passing == 3
        else AgentState.DEAD if passing == 0
        else AgentState.DEGRADED
    )
    return snapshot
# ── Notification channels ────────────────────────────────────────────
def _http(method: str, url: str, data: Optional[dict] = None,
          headers: Optional[dict] = None) -> Optional[Any]:
    """Minimal urllib JSON helper.

    Returns the parsed JSON body, {} for an empty body, or None on any
    HTTP/network/parse failure (which is logged, never raised).
    """
    payload = json.dumps(data).encode() if data else None
    request = urllib.request.Request(url, data=payload, method=method)
    merged = {"Content-Type": "application/json", "Accept": "application/json"}
    merged.update(headers or {})
    for key, value in merged.items():
        request.add_header(key, value)
    try:
        with urllib.request.urlopen(request, timeout=15) as resp:
            raw = resp.read().decode()
        return json.loads(raw) if raw.strip() else {}
    except urllib.error.HTTPError as e:
        logger.warning("HTTP %d %s: %s", e.code, url, e.read().decode()[:200])
    except Exception as e:
        logger.warning("HTTP error %s: %s", url, e)
    return None
def send_telegram_approval_request(
    bot_token: str,
    chat_id: str,
    agent_name: str,
    mission_label: str,
    dead_seconds: float,
    dry_run: bool = False,
) -> bool:
    """Ask the operator, via Telegram, for permission to revive an agent.

    The operator replies "revive <agent_name>" or "+1" to approve.
    Returns True when the message was delivered (always True in dry-run).
    """
    if dry_run:
        logger.info("[DRY-RUN] Would send Telegram approval request for %s", agent_name)
        return True
    if not bot_token or not chat_id:
        logger.warning("Telegram not configured; cannot send approval request for %s", agent_name)
        return False
    text = (
        f"🔴 *Resurrection Pool Alert*\n\n"
        f"Agent *{agent_name}* has been dead for {int(dead_seconds)}s.\n"
        f"Mission: _{mission_label or 'unassigned'}_\n\n"
        f"Reply `revive {agent_name}` to authorize restart.\n"
        f"Reply `skip {agent_name}` to suppress for this cycle."
    )
    payload = {"chat_id": chat_id, "text": text, "parse_mode": "Markdown"}
    reply = _http("POST", f"https://api.telegram.org/bot{bot_token}/sendMessage", payload)
    if reply and reply.get("ok"):
        message_id = reply.get("result", {}).get("message_id")
        logger.info("Telegram approval request sent for %s (message_id=%s)",
                    agent_name, message_id)
        return True
    logger.warning("Telegram send failed for %s: %s", agent_name, reply)
    return False
def poll_telegram_for_approval(
    bot_token: str,
    agent_name: str,
    timeout_seconds: int,
    dry_run: bool = False,
) -> bool:
    """
    Poll Telegram getUpdates for a revive approval message.
    Looks for messages containing "revive <agent_name>" (case-insensitive).
    Returns True if approved within timeout, False otherwise.
    """
    if dry_run:
        logger.info("[DRY-RUN] Simulating Telegram approval for %s (auto-approve)", agent_name)
        return True
    if not bot_token:
        return False
    deadline = time.time() + timeout_seconds
    offset = None  # getUpdates offset; acknowledges updates already processed
    while time.time() < deadline:
        # Long-poll (30s server-side) in small batches of 10 updates.
        params = "?timeout=30&limit=10"
        if offset is not None:
            params += f"&offset={offset}"
        url = f"https://api.telegram.org/bot{bot_token}/getUpdates{params}"
        result = _http("GET", url)
        if not result or not result.get("ok"):
            time.sleep(5)  # transient API failure: back off and retry
            continue
        for update in result.get("result", []):
            # Advance the offset so this update is not redelivered next call.
            offset = update["update_id"] + 1
            # Accept text from either direct messages or channel posts.
            msg_text = (
                update.get("message", {}).get("text", "")
                or update.get("channel_post", {}).get("text", "")
            ).lower().strip()
            # NOTE(review): a bare "+1" approves whichever agent this call is
            # polling for — ambiguous when several approvals are pending.
            if f"revive {agent_name.lower()}" in msg_text or msg_text == "+1":
                logger.info("Telegram approval received for %s", agent_name)
                return True
            if f"skip {agent_name.lower()}" in msg_text:
                logger.info("Telegram skip received for %s; aborting revive", agent_name)
                return False
        time.sleep(5)
    logger.info("Approval timeout for %s after %ds", agent_name, timeout_seconds)
    return False
def send_nostr_approval_request(
    relay_url: str,
    recipient_pubkey: str,
    agent_name: str,
    mission_label: str,
    dead_seconds: float,
    dry_run: bool = False,
) -> bool:
    """
    Post a Nostr direct-message (kind 4) approval request.
    Falls back gracefully if websockets is not installed — logs a warning
    instead of crashing the pool. Returns True if sent, False otherwise.
    """
    if dry_run:
        logger.info("[DRY-RUN] Would send Nostr approval request for %s", agent_name)
        return True
    if not relay_url or not recipient_pubkey:
        logger.debug("Nostr not configured; skipping DM for %s", agent_name)
        return False
    content = (
        f"[Resurrection Pool] Agent '{agent_name}' is dead ({int(dead_seconds)}s). "
        f"Mission: {mission_label or 'unassigned'}. "
        f"Reply 'revive {agent_name}' to authorize restart."
    )
    try:
        import asyncio

        async def _send():
            # Imported lazily so hosts without websockets still run the pool;
            # the ImportError propagates out of asyncio.run to the outer handler.
            import websockets  # type: ignore

            # NOTE(review): event is sent with empty id/sig/pubkey and plaintext
            # content — most relays will reject an unsigned event; confirm the
            # production signing path fills these in.
            event = {
                "kind": 4,
                "pubkey": "",  # filled by relay/identity in prod
                "created_at": int(time.time()),
                "tags": [["p", recipient_pubkey]],
                "content": content,
                "id": "",
                "sig": "",
            }
            try:
                async with websockets.connect(relay_url, open_timeout=10) as ws:
                    await ws.send(json.dumps(["EVENT", event]))
                    logger.info("Nostr DM sent for %s via %s", agent_name, relay_url)
                    return True
            except Exception as e:
                logger.warning("Nostr send failed for %s: %s", agent_name, e)
                return False

        return asyncio.run(_send())
    except ImportError:
        logger.warning("websockets library not installed; Nostr notifications disabled")
        return False
    except Exception as e:
        logger.warning("Nostr notification error for %s: %s", agent_name, e)
        return False
# ── Revive actions ───────────────────────────────────────────────────
def revive_agent(gw: GatewayConfig, dry_run: bool = False) -> bool:
    """Fire-and-forget restart of an agent via its configured revive_cmd.

    Returns True when the process was spawned without error. Recovery itself
    is confirmed later by the heartbeat probe on a subsequent poll cycle.
    """
    if not gw.revive_cmd:
        logger.warning("[%s] No revive_cmd configured; cannot restart", gw.name)
        return False
    if dry_run:
        logger.info("[DRY-RUN] Would exec: %s", gw.revive_cmd)
        return True
    logger.info("[%s] Launching: %s", gw.name, gw.revive_cmd)
    try:
        child = subprocess.Popen(
            gw.revive_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,  # detach so a pool restart can't kill the agent
        )
    except FileNotFoundError as e:
        logger.error("[%s] Revive cmd not found: %s", gw.name, e)
    except PermissionError as e:
        logger.error("[%s] Revive cmd permission denied: %s", gw.name, e)
    except Exception as e:
        logger.error("[%s] Revive failed: %s", gw.name, e)
    else:
        logger.info("[%s] Revive process started (PID %d)", gw.name, child.pid)
        return True
    return False
# ── Gitea integration ────────────────────────────────────────────────
def _gitea(method: str, path: str, cfg: PoolConfig, data: Optional[dict] = None) -> Any:
    """Call the Gitea REST API under /api/v1, adding the token header when set."""
    auth = {"Authorization": f"token {cfg.gitea_token}"} if cfg.gitea_token else {}
    endpoint = f"{cfg.gitea_url.rstrip('/')}/api/v1{path}"
    return _http(method, endpoint, data=data, headers=auth)
def find_open_pool_issue(cfg: PoolConfig, agent_name: str) -> Optional[dict]:
    """Return the first open pool-created issue for this agent, or None."""
    listing = _gitea(
        "GET",
        f"/repos/{cfg.gitea_repo}/issues?state=open&type=issues&limit=50",
        cfg,
    )
    if not listing or not isinstance(listing, list):
        return None
    # Pool issues are identified purely by their title prefix.
    wanted = f"{POOL_TITLE_PREFIX} {agent_name}:"
    return next(
        (issue for issue in listing if issue.get("title", "").startswith(wanted)),
        None,
    )
def open_death_issue(cfg: PoolConfig, health: AgentHealth, mission_label: str) -> None:
    """Create a Gitea issue recording an agent death.

    Silently skipped when no Gitea token is configured. The issue title
    carries POOL_TITLE_PREFIX so close_death_issue can find it later.
    """
    if not cfg.gitea_token:
        logger.debug("No Gitea token; skipping issue creation for %s", health.name)
        return
    # NOTE(review): no dedupe against find_open_pool_issue here — repeated
    # calls across sweeps may open duplicate issues; confirm intended.
    title = f"{POOL_TITLE_PREFIX} {health.name}: agent dead ({int(health.consecutive_dead_seconds)}s)"
    # Trailing " \n" sequences are markdown hard line breaks.
    body = (
        f"## 🔴 Agent Death Detected — {health.name}\n\n"
        f"**Mission:** {mission_label or 'unassigned'} \n"
        f"**Dead for:** {int(health.consecutive_dead_seconds)}s \n"
        f"**Gateway alive:** {health.gateway_alive} \n"
        f"**Process alive:** {health.process_alive} \n"
        f"**Heartbeat age:** "
        f"{f'{int(health.heartbeat_age)}s' if health.heartbeat_age is not None else 'N/A'} \n\n"
        f"*Logged by Resurrection Pool at "
        f"{time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}*"
    )
    result = _gitea("POST", f"/repos/{cfg.gitea_repo}/issues", cfg,
                    data={"title": title, "body": body})
    if result and result.get("number"):
        logger.info("Opened death issue #%d for %s", result["number"], health.name)
def close_death_issue(cfg: PoolConfig, agent_name: str, revived: bool) -> None:
    """Comment on and close the agent's open death issue after recovery.

    `revived` selects the wording: True when the pool itself restarted the
    agent, False when it came back without our help. No-op when no matching
    open issue exists.
    """
    issue = find_open_pool_issue(cfg, agent_name)
    if not issue:
        return
    num = issue["number"]
    verb = "revived and healthy" if revived else "recovered (external)"
    # BUG FIX: heading previously rendered "Agent <name><verb>" with no
    # separating space (e.g. "Agent timmyrevived and healthy").
    _gitea("POST", f"/repos/{cfg.gitea_repo}/issues/{num}/comments", cfg,
           data={"body": f"## 🟢 Agent {agent_name} {verb}\n\n"
                         f"*Resurrection Pool confirmed recovery at "
                         f"{time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}*"})
    _gitea("PATCH", f"/repos/{cfg.gitea_repo}/issues/{num}", cfg,
           data={"state": "closed"})
    logger.info("Closed death issue #%d for %s", num, agent_name)
# ── Policy engine ────────────────────────────────────────────────────
def apply_revive_policy(
    gw: GatewayConfig,
    health: AgentHealth,
    mission: Optional[MissionConfig],
    cfg: PoolConfig,
    dry_run: bool = False,
) -> bool:
    """
    Execute the revive policy for a dead agent.
    Returns True if a revive was attempted, False otherwise.
    """
    # Agents not assigned to any mission fall back to the interactive policy.
    policy = mission.revive_policy if mission else RevivePolicy.ASK
    mission_label = mission.label if mission else ""
    logger.info("[%s] Applying revive policy=%s (dead %ds)",
                gw.name, policy.value, int(health.consecutive_dead_seconds))
    if policy == RevivePolicy.NEVER:
        logger.info("[%s] Policy=never — recording death, no restart", gw.name)
        open_death_issue(cfg, health, mission_label)
        return False
    if policy == RevivePolicy.AUTO:
        logger.info("[%s] Policy=auto — restarting immediately", gw.name)
        ok = revive_agent(gw, dry_run=dry_run)
        if ok:
            logger.info("[%s] Auto-revive launched", gw.name)
        else:
            # Launch failure is tracked as an issue so a human can intervene.
            logger.warning("[%s] Auto-revive failed — opening Gitea issue", gw.name)
            open_death_issue(cfg, health, mission_label)
        return ok
    # policy == RevivePolicy.ASK
    if health.revive_pending:
        logger.debug("[%s] Approval already pending; waiting", gw.name)
        return False
    # Send approval requests over all configured channels
    health.revive_pending = True
    tg_sent = send_telegram_approval_request(
        cfg.telegram_bot_token, cfg.telegram_chat_id,
        gw.name, mission_label, health.consecutive_dead_seconds,
        dry_run=dry_run,
    )
    # Nostr is best-effort broadcast only; approvals are read from Telegram.
    send_nostr_approval_request(
        cfg.nostr_relay_url, cfg.nostr_recipient_pubkey,
        gw.name, mission_label, health.consecutive_dead_seconds,
        dry_run=dry_run,
    )
    if not tg_sent and not cfg.nostr_relay_url:
        # No way to reach a human: degrade ASK to AUTO rather than leaving
        # the agent down indefinitely.
        logger.warning(
            "[%s] No approval channel available (configure telegram or nostr). "
            "Falling back to auto-revive.", gw.name
        )
        health.revive_pending = False
        return revive_agent(gw, dry_run=dry_run)
    # Poll for approval (blocks the sweep for up to approval_timeout seconds)
    approved = poll_telegram_for_approval(
        cfg.telegram_bot_token, gw.name,
        cfg.approval_timeout, dry_run=dry_run,
    )
    health.revive_pending = False
    if approved:
        logger.info("[%s] Approval received — launching revive", gw.name)
        ok = revive_agent(gw, dry_run=dry_run)
        if not ok:
            open_death_issue(cfg, health, mission_label)
        return ok
    logger.info("[%s] Revive not approved — recording death", gw.name)
    open_death_issue(cfg, health, mission_label)
    return False
# ── Main pool loop ───────────────────────────────────────────────────
class ResurrectionPool:
    """
    Fleet-wide health monitor and revive coordinator.

    Maintains an in-memory health table keyed by agent name and drives one
    poll-sweep per interval across all configured gateways.
    """

    def __init__(self, cfg: PoolConfig, dry_run: bool = False):
        self.cfg = cfg
        self.dry_run = dry_run  # when True, no restarts and no notifications
        self._health: Dict[str, AgentHealth] = {}
        self._running = False

    # ── Agent-to-mission lookup ──────────────────────────────────────
    def _mission_for(self, agent_name: str) -> Optional[MissionConfig]:
        """Return the first mission that lists this agent, else None."""
        for m in self.cfg.missions:
            if agent_name in m.agents:
                return m
        return None

    # ── Gateway lookup ───────────────────────────────────────────────
    def _gateway_for(self, agent_name: str) -> Optional[GatewayConfig]:
        """Return the gateway config with this name, else None."""
        for gw in self.cfg.gateways:
            if gw.name == agent_name:
                return gw
        return None

    # ── Health state tracking ────────────────────────────────────────
    def _update_health(self, agent_name: str, new: AgentHealth) -> AgentHealth:
        """
        Merge a fresh health snapshot into the pool's running state.

        - Preserves first_dead_at so consecutive_dead_seconds accumulates.
        - Resets death/revive bookkeeping when the agent returns healthy.
        - Carries revive_pending / revive_attempts across cycles.
        """
        prev = self._health.get(agent_name)
        if new.state == AgentState.HEALTHY:
            new.first_dead_at = None
            new.revive_attempts = 0
            new.revive_pending = False
            if prev and prev.state == AgentState.DEAD:
                logger.info("[%s] Agent recovered!", agent_name)
                close_death_issue(self.cfg, agent_name,
                                  revived=prev.revive_attempts > 0)
        else:
            if prev:
                new.revive_pending = prev.revive_pending
                new.revive_attempts = prev.revive_attempts
                # Keep the original death timestamp; otherwise start it now.
                # (Also collapses the previous duplicated elif/else arms.)
                new.first_dead_at = (prev.first_dead_at
                                     if prev.first_dead_at is not None
                                     else new.last_checked)
            else:
                # BUG FIX: a brand-new unhealthy agent previously got no
                # first_dead_at at all, delaying the death clock (and thus
                # dead_threshold) by one full sweep.
                new.first_dead_at = new.last_checked
        self._health[agent_name] = new
        return new

    # ── Single sweep ─────────────────────────────────────────────────
    def sweep(self) -> Dict[str, AgentHealth]:
        """Poll all gateways once and apply revive policies where needed.

        Returns a snapshot copy of the pool's health table.
        """
        if not self.cfg.gateways:
            logger.debug("No gateways configured; nothing to sweep")
            return {}
        for gw in self.cfg.gateways:
            raw = assess_agent(gw, self.cfg.dead_threshold)
            health = self._update_health(gw.name, raw)
            icon = {"healthy": "", "degraded": "⚠️", "dead": "🔴",
                    "reviving": "🔄", "unknown": ""}.get(health.state.value, "?")
            logger.info("%s [%s] state=%s dead_for=%ds",
                        icon, gw.name, health.state.value,
                        int(health.consecutive_dead_seconds))
            # Act only when the agent has been dead past the threshold and
            # no approval request is already outstanding.
            if health.state == AgentState.DEAD:
                if health.consecutive_dead_seconds >= self.cfg.dead_threshold:
                    if not health.revive_pending:
                        mission = self._mission_for(gw.name)
                        launched = apply_revive_policy(
                            gw, health, mission, self.cfg, dry_run=self.dry_run
                        )
                        if launched:
                            health.revive_attempts += 1
                            health.state = AgentState.REVIVING
        return dict(self._health)

    def status(self) -> List[dict]:
        """Return current pool state as a list of dicts (for --status flag)."""
        return [h.to_dict() for h in self._health.values()]

    # ── Run loop ─────────────────────────────────────────────────────
    def run(self) -> None:
        """Blocking poll loop. Handles SIGTERM/SIGINT cleanly."""
        self._running = True

        def _stop(signum, frame):
            logger.info("Signal %d received; stopping pool", signum)
            self._running = False

        signal.signal(signal.SIGTERM, _stop)
        signal.signal(signal.SIGINT, _stop)
        logger.info(
            "Resurrection Pool starting — %d gateway(s), poll_interval=%ds, dead_threshold=%ds",
            len(self.cfg.gateways), self.cfg.poll_interval, self.cfg.dead_threshold,
        )
        while self._running:
            self.sweep()
            # Sleep in 1s ticks so we respond to signals promptly
            for _ in range(self.cfg.poll_interval):
                if not self._running:
                    break
                time.sleep(1)
        logger.info("Resurrection Pool stopped")

    def run_once(self) -> bool:
        """Single sweep; True if every (possibly zero) agent is healthy."""
        results = self.sweep()
        return all(h.state == AgentState.HEALTHY for h in results.values())
# ── CLI ──────────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser for the pool daemon."""
    parser = argparse.ArgumentParser(
        description="Resurrection Pool — fleet health polling and auto-revive",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--config", metavar="PATH",
                        help="Path to resurrection_pool.json config file")
    # Boolean mode switches.
    for flag, help_text in (
        ("--once", "Run one sweep and exit (for cron / smoke tests)"),
        ("--status", "Print current pool state as JSON and exit"),
        ("--dry-run", "Run all checks but do not restart agents or alert channels"),
    ):
        parser.add_argument(flag, action="store_true", help=help_text)
    # Numeric overrides of config-file values.
    for flag, help_text in (
        ("--poll-interval", "Override poll interval (seconds)"),
        ("--dead-threshold", "Override dead threshold (seconds)"),
    ):
        parser.add_argument(flag, type=int, default=None, help=help_text)
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable DEBUG logging")
    return parser
def main() -> None:
    """CLI entry point: parse args, load config, dispatch to the pool."""
    args = build_parser().parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    cfg = PoolConfig.load(Path(args.config) if args.config else None)
    # CLI overrides beat both the config file and the defaults.
    if args.poll_interval is not None:
        cfg.poll_interval = args.poll_interval
    if args.dead_threshold is not None:
        cfg.dead_threshold = args.dead_threshold
    pool = ResurrectionPool(cfg, dry_run=args.dry_run)
    if args.status:
        # Runs one fresh sweep, then prints the resulting pool state as JSON.
        pool.sweep()
        print(json.dumps(pool.status(), indent=2))
        return
    if args.once:
        # Exit code 0 only when every configured agent is healthy.
        healthy = pool.run_once()
        sys.exit(0 if healthy else 1)
    pool.run()
if __name__ == "__main__":  # script entry point
    main()

1
lazarus/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Lazarus Pit — Agent resurrection pool and mission cell isolation."""

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

145
lazarus/cell.py Normal file
View File

@@ -0,0 +1,145 @@
"""Mission cell lifecycle — create, load, save, destroy."""
from __future__ import annotations
import json
import logging
import os
import tempfile
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
log = logging.getLogger(__name__)
# Lifecycle states a mission cell moves through; "destroyed" is terminal.
CellState = Literal["pending", "active", "archived", "destroyed"]
@dataclass
class MissionCell:
    """Represents a mission cell — an ephemeral project workspace."""
    id: str
    name: str
    repo_url: str
    created_at: str
    state: CellState
    agents: list[str]
    cell_path: str
    home_path: str

    def to_dict(self) -> dict:
        """Serialize to a plain dict (agents defensively copied),
        preserving field declaration order for stable cell.json output."""
        ordered = ("id", "name", "repo_url", "created_at",
                   "state", "agents", "cell_path", "home_path")
        payload = {}
        for attr in ordered:
            value = getattr(self, attr)
            payload[attr] = list(value) if attr == "agents" else value
        return payload

    @classmethod
    def from_dict(cls, data: dict) -> MissionCell:
        """Rebuild from a dict; agents/home_path are optional, the rest
        raise KeyError when missing."""
        required = {key: data[key] for key in
                    ("id", "name", "repo_url", "created_at", "state", "cell_path")}
        return cls(
            agents=list(data.get("agents", [])),
            home_path=data.get("home_path", ""),
            **required,
        )
def create_cell(
    mission_id: str,
    name: str,
    repo_url: str,
    base_path: Path,
    home_path: str = "",
) -> MissionCell:
    """Create a new mission cell directory structure and return a MissionCell.

    Idempotent on directories (exist_ok); writes roster.json and cell.json.
    """
    cell_path = base_path / mission_id
    # Cell root plus its required subdirectories.
    for sub in ("", "workspace", "logs", "snapshots"):
        (cell_path / sub).mkdir(parents=True, exist_ok=True)
    cell = MissionCell(
        id=mission_id,
        name=name,
        repo_url=repo_url,
        created_at=datetime.now(timezone.utc).isoformat(),
        state="pending",
        agents=[],
        cell_path=str(cell_path),
        home_path=home_path,
    )
    # Empty roster placeholder; agents are invited later.
    _atomic_write(cell_path / "roster.json", {"agents": []})
    save_cell(cell)  # persists cell.json
    log.info("Created mission cell %s at %s", mission_id, cell_path)
    return cell
def destroy_cell(cell: MissionCell, cleanup_dirs: bool = False) -> None:
    """Mark a cell destroyed (persisted) and optionally delete its tree."""
    cell.state = "destroyed"
    save_cell(cell)
    if cleanup_dirs:
        import shutil

        root = Path(cell.cell_path)
        if root.exists():
            shutil.rmtree(root)
            log.info("Removed cell directory %s", root)
    log.info("Destroyed mission cell %s", cell.id)
def load_cell(cell_path: Path) -> MissionCell:
    """Load a MissionCell from the cell.json stored under *cell_path*."""
    manifest = Path(cell_path) / "cell.json"
    raw = manifest.read_text(encoding="utf-8")
    return MissionCell.from_dict(json.loads(raw))
def save_cell(cell: MissionCell) -> None:
    """Atomically persist *cell* as cell.json inside its own cell_path."""
    root = Path(cell.cell_path)
    root.mkdir(parents=True, exist_ok=True)
    destination = root / "cell.json"
    _atomic_write(destination, cell.to_dict())
    log.debug("Saved cell %s to %s", cell.id, destination)
# ─── helpers ──────────────────────────────────────────────────────────────────
def _atomic_write(path: Path, data: dict) -> None:
"""Write JSON atomically via a temp file + rename."""
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2)
os.replace(tmp, path)
except Exception:
try:
os.unlink(tmp)
except OSError:
pass
raise

162
lazarus/cli.py Normal file
View File

@@ -0,0 +1,162 @@
"""Lazarus CLI — invite agents, manage mission cells."""
from __future__ import annotations
import argparse
import json
import platform
import sys
import uuid
from pathlib import Path
from lazarus.cell import MissionCell, create_cell, load_cell
from lazarus.isolation import setup_level1_isolation
from lazarus.roster import DEFAULT_REGISTRY_PATH, MissionRoster
def _default_base_path() -> Path:
if platform.system() == "Linux":
return Path("/var/missions")
return Path.home() / ".nexus" / "missions"
# ─── command handlers ─────────────────────────────────────────────────────────
def cmd_invite(args: argparse.Namespace) -> int:
    """Create (or reuse) a mission cell and invite an agent into it.

    Prints a JSON summary of the invitation to stdout and returns 0.
    """
    base_path = Path(args.base_path) if args.base_path else _default_base_path()
    mission_id: str = args.mission or str(uuid.uuid4())
    registry_path = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry_path)

    # Reuse an existing cell for this mission id, otherwise create one.
    cell_dir = base_path / mission_id
    if (cell_dir / "cell.json").exists():
        cell = load_cell(cell_dir)
    else:
        cell = create_cell(
            mission_id=mission_id,
            name=args.name or mission_id,
            repo_url=args.repo,
            base_path=base_path,
        )

    # Apply Level 1 isolation (directories + AppArmor profile).
    setup_level1_isolation(cell)

    # Record the mission and the invitation in the shared roster.
    roster.add_mission(cell)
    entry = roster.invite_agent(mission_id, args.agent, role=args.role)
    roster.save()

    summary = {
        "mission_id": mission_id,
        "agent": args.agent,
        "role": entry.role,
        "status": entry.status,
        "cell_path": cell.cell_path,
        "repo_url": cell.repo_url,
        "invited_at": entry.invited_at,
    }
    print(json.dumps(summary, indent=2))
    return 0
def cmd_list(args: argparse.Namespace) -> int:
    """Print all missions (optionally filtered by --state) as JSON."""
    registry_path = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry_path)
    print(json.dumps(roster.list_missions(state=args.state), indent=2))
    return 0
def cmd_status(args: argparse.Namespace) -> int:
    """Print one mission's metadata plus its agent roster as JSON.

    Returns 1 (with a message on stderr) when the mission is unknown.
    """
    registry_path = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry_path)
    mission = roster.get_mission(args.mission_id)
    if mission is None:
        print(f"Mission {args.mission_id!r} not found.", file=sys.stderr)
        return 1
    members = roster.list_agents(args.mission_id)
    report = dict(mission)
    report["agent_count"] = len(members)
    report["agents"] = [member.to_dict() for member in members]
    print(json.dumps(report, indent=2))
    return 0
def cmd_depart(args: argparse.Namespace) -> int:
    """Mark an agent as departed; prints the updated roster entry as JSON.

    Returns 1 (with a message on stderr) when the mission or agent is unknown.
    """
    registry_path = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry_path)
    try:
        entry = roster.depart_agent(args.mission, args.agent)
    except KeyError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    roster.save()
    print(json.dumps(entry.to_dict(), indent=2))
    return 0
# ─── argument parsing ─────────────────────────────────────────────────────────
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="lazarus",
description="Lazarus Pit — agent invitation and mission cell management",
)
parser.add_argument(
"--registry",
default=None,
help="Path to mission roster JSON (default: ~/.nexus/mission-roster.json)",
)
sub = parser.add_subparsers(dest="command", required=True)
# invite
p_invite = sub.add_parser("invite", help="Invite an agent to a mission")
p_invite.add_argument("agent", help="Agent name to invite")
p_invite.add_argument("--mission", default=None, help="Mission ID (UUID generated if omitted)")
p_invite.add_argument("--repo", required=True, help="Repository URL for the mission")
p_invite.add_argument("--base-path", default=None, help="Base directory for mission cells")
p_invite.add_argument("--role", default="contributor", help="Agent role (default: contributor)")
p_invite.add_argument("--name", default=None, help="Human-readable mission name")
# list
p_list = sub.add_parser("list", help="List missions")
p_list.add_argument("--state", default=None, help="Filter by state (pending/active/archived/destroyed)")
# status
p_status = sub.add_parser("status", help="Show mission status")
p_status.add_argument("mission_id", help="Mission ID")
# depart
p_depart = sub.add_parser("depart", help="Mark agent as departed")
p_depart.add_argument("agent", help="Agent name")
p_depart.add_argument("--mission", required=True, help="Mission ID")
return parser
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse arguments and dispatch to the command handler."""
    parser = _build_parser()
    args = parser.parse_args(argv)
    handlers = {
        "invite": cmd_invite,
        "list": cmd_list,
        "status": cmd_status,
        "depart": cmd_depart,
    }
    try:
        handler = handlers[args.command]
    except KeyError:
        # Unknown subcommand (defensive: argparse normally rejects these).
        parser.print_help()
        return 1
    return handler(args)


if __name__ == "__main__":
    sys.exit(main())

165
lazarus/harness.py Normal file
View File

@@ -0,0 +1,165 @@
"""Hermes harness mission-cell mode — wraps agent lifecycle in a mission cell."""
from __future__ import annotations
import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Awaitable
try:
import websockets # type: ignore[import]
except ImportError: # pragma: no cover
websockets = None # type: ignore[assignment]
from lazarus.cell import MissionCell, load_cell
from lazarus.isolation import setup_level1_isolation
log = logging.getLogger(__name__)
DEFAULT_HERMES_WS_URL = "ws://localhost:8000/ws"


class MissionCellHarness:
    """
    Wraps agent execution in a mission cell with Hermes WS telemetry.

    Usage::

        harness = MissionCellHarness("myagent", "/var/missions/<uuid>")
        await harness.run_with_harness(my_coroutine())
    """

    def __init__(
        self,
        agent_name: str,
        cell_path: str | Path,
        hermes_ws_url: str = DEFAULT_HERMES_WS_URL,
        home_path: str | None = None,
    ) -> None:
        self.agent_name = agent_name
        self.cell_path = Path(cell_path)
        self.hermes_ws_url = hermes_ws_url
        self.home_path = home_path
        self._cell: MissionCell | None = None  # populated by start()
        self._ws: Any = None  # live websockets connection, if any
        self._ws_connected = False

    # ─── public API ───────────────────────────────────────────────────────────
    async def start(self) -> None:
        """Load cell, set up isolation, emit agent_joined telemetry."""
        self._cell = load_cell(self.cell_path)
        setup_level1_isolation(self._cell)
        await self._connect_hermes()
        await self._emit_mission_event("agent_joined")
        log.info(
            "MissionCellHarness started for agent=%s mission=%s",
            self.agent_name,
            self._cell.id,
        )

    async def stop(self) -> None:
        """Emit agent_departed telemetry and close the Hermes connection."""
        await self._emit_mission_event("agent_departed")
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception as exc:
                # Best-effort close; a failed close must not mask shutdown.
                log.debug("WS close error (ignored): %s", exc)
        self._ws_connected = False
        log.info(
            "MissionCellHarness stopped for agent=%s", self.agent_name
        )

    def get_env(self) -> dict[str, str]:
        """Return environment variables for the mission cell context.

        MISSION_ID is the empty string until the cell is loaded (i.e. before
        start()).  HOME falls back to the caller's HOME when no explicit
        home_path was given.
        """
        env: dict[str, str] = {
            "MISSION_CELL_PATH": str(self.cell_path),
            "MISSION_ID": self._cell.id if self._cell else "",
            "AGENT_NAME": self.agent_name,
        }
        env["HOME"] = self.home_path or os.environ.get("HOME", str(Path.home()))
        return env

    def write_mission_log(self, message: str) -> None:
        """Append a timestamped entry to ``<cell>/logs/mission.log``.

        No-op (with a warning) when the cell has not been loaded yet.
        """
        if self._cell is None:
            log.warning("Cannot write mission log: cell not loaded")
            return
        logs_dir = Path(self._cell.cell_path) / "logs"
        logs_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now(timezone.utc).isoformat()
        with open(logs_dir / "mission.log", "a", encoding="utf-8") as fh:
            fh.write(f"[{timestamp}] [{self.agent_name}] {message}\n")

    async def run_with_harness(self, coro: Awaitable[Any]) -> Any:
        """Start harness, run coro, stop harness. Returns coro result."""
        await self.start()
        try:
            return await coro
        finally:
            await self.stop()

    # ─── internals ────────────────────────────────────────────────────────────
    async def _connect_hermes(self) -> None:
        """Attempt WebSocket connection to Hermes; fail gracefully.

        BUGFIX: the previous implementation re-imported ``websockets``
        locally, which bypassed both the module-level ImportError fallback
        (``websockets = None``) and any test patching of
        ``lazarus.harness.websockets``.  Use the module-level name instead.
        """
        if websockets is None:
            log.warning("websockets package not available; Hermes telemetry disabled")
            self._ws_connected = False
            return
        try:
            self._ws = await websockets.connect(self.hermes_ws_url)
            self._ws_connected = True
            log.info("Connected to Hermes at %s", self.hermes_ws_url)
        except Exception as exc:
            log.warning("Could not connect to Hermes WS: %s", exc)
            self._ws_connected = False

    async def _emit_mission_event(self, event: str) -> None:
        """Send a mission_event message to Hermes (debug no-op when disconnected)."""
        payload = {
            "type": "mission_event",
            "event": event,
            "mission_id": self._cell.id if self._cell else "",
            "agent_name": self.agent_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        if self._ws_connected and self._ws is not None:
            try:
                await self._ws.send(json.dumps(payload))
                log.debug("Emitted %s for agent=%s", event, self.agent_name)
            except Exception as exc:
                log.warning("Failed to emit %s telemetry: %s", event, exc)
                self._ws_connected = False
        else:
            log.debug(
                "Hermes not connected; %s event not sent (agent=%s)",
                event,
                self.agent_name,
            )

170
lazarus/isolation.py Normal file
View File

@@ -0,0 +1,170 @@
"""Level 1 isolation — directory setup and AppArmor profile generation."""
from __future__ import annotations
import logging
import platform
import subprocess
from dataclasses import dataclass
from pathlib import Path
from lazarus.cell import MissionCell
log = logging.getLogger(__name__)
# AppArmor profile template for one mission cell.  Rendered by
# write_apparmor_profile() via str.format with cell_id, home_path,
# workspace_path and logs_path.  Literal braces in the profile body are
# doubled ({{ }}) so .format leaves them intact.  The policy: read-only
# agent home, read/write cell workspace and logs, deny writes to home,
# deny other agents' homes and network, allow basic system binaries.
_APPARMOR_TEMPLATE = """\
#include <tunables/global>
profile nexus-mission-{cell_id} flags=(attach_disconnected,mediate_deleted) {{
#include <abstractions/base>
# Allow read access to agent home
"{home_path}/" r,
"{home_path}/**" r,
# Allow read/write access to cell workspace
"{workspace_path}/" rw,
"{workspace_path}/**" rwkl,
# Allow read/write access to cell logs
"{logs_path}/" rw,
"{logs_path}/**" rwkl,
# Deny writes to home path
deny "{home_path}/**" w,
# Deny access to other agent home dirs
deny "/root/wizards/**" rw,
# Deny network access by default
deny network,
# Basic system access
/bin/** ix,
/usr/bin/** ix,
/lib/** r,
/usr/lib/** r,
/proc/self/** r,
}}
"""
@dataclass
class IsolationManifest:
    """Result of setting up Level 1 isolation for a mission cell."""
    # Mission cell identifier the isolation was applied to.
    cell_id: str
    # Directory the agent may read/write during the mission.
    workspace_path: Path
    # Directory holding the cell's log files.
    logs_path: Path
    # Path of the generated AppArmor profile, or None when the cell has no home_path.
    apparmor_profile_path: Path | None
    # Value of platform.system() at setup time (e.g. "Linux", "Darwin").
    platform: str
def setup_level1_isolation(cell: MissionCell) -> IsolationManifest:
    """
    Set up Level 1 isolation for a mission cell.

    Ensures workspace/ and logs/ exist with owner-only permissions and —
    when the cell has a home_path — writes an AppArmor profile next to them.
    Returns an IsolationManifest describing what was set up.
    """
    root = Path(cell.cell_path)
    workspace = root / "workspace"
    logs = root / "logs"
    workspace.mkdir(parents=True, exist_ok=True)
    logs.mkdir(parents=True, exist_ok=True)

    try:
        # Owner-only: keep other agents out of this cell's data.
        workspace.chmod(0o700)
        logs.chmod(0o700)
    except OSError as exc:
        log.warning("Could not set directory permissions: %s", exc)

    current_platform = platform.system()
    profile: Path | None = None
    if cell.home_path:
        profile = write_apparmor_profile(
            cell, root / f"nexus-mission-{cell.id}.apparmor"
        )

    log.info(
        "Level 1 isolation set up for cell %s (platform=%s)", cell.id, current_platform
    )
    return IsolationManifest(
        cell_id=cell.id,
        workspace_path=workspace,
        logs_path=logs,
        apparmor_profile_path=profile,
        platform=current_platform,
    )
def write_apparmor_profile(cell: MissionCell, output_path: Path) -> Path:
    """
    Render the AppArmor profile for *cell* and write it to *output_path*.

    On non-Linux platforms this just writes the file without attempting to
    load it.  Returns the path to the written profile.
    """
    root = Path(cell.cell_path)
    rendered = _APPARMOR_TEMPLATE.format(
        cell_id=cell.id,
        # Fall back to the current user's home when the cell has none.
        home_path=cell.home_path or str(Path.home()),
        workspace_path=str(root / "workspace"),
        logs_path=str(root / "logs"),
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(rendered, encoding="utf-8")
    log.info("Wrote AppArmor profile to %s", output_path)
    return output_path
def load_apparmor_profile(profile_path: Path) -> bool:
    """
    Attempt to load an AppArmor profile via apparmor_parser.

    On non-Linux platforms or when apparmor_parser is absent, this is a no-op
    and returns False gracefully.  Returns True only when apparmor_parser
    exits successfully.
    """
    if platform.system() != "Linux":
        log.debug(
            "AppArmor load skipped: not on Linux (platform=%s)", platform.system()
        )
        return False
    try:
        proc = subprocess.run(
            ["apparmor_parser", "-r", str(profile_path)],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if proc.returncode != 0:
            log.warning(
                "apparmor_parser failed (rc=%d): %s",
                proc.returncode,
                proc.stderr.strip(),
            )
            return False
        log.info("Loaded AppArmor profile from %s", profile_path)
        return True
    except FileNotFoundError:
        log.debug("apparmor_parser not found; skipping profile load")
        return False
    except subprocess.TimeoutExpired:
        log.warning("apparmor_parser timed out loading %s", profile_path)
        return False
    except Exception as exc:
        # Deliberately broad: profile loading is best-effort and must never
        # take down cell setup.
        log.warning("Unexpected error loading AppArmor profile: %s", exc)
        return False

184
lazarus/roster.py Normal file
View File

@@ -0,0 +1,184 @@
"""Mission roster registry — tracks all missions and agents."""
from __future__ import annotations
import json
import logging
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
from lazarus.cell import MissionCell
log = logging.getLogger(__name__)
DEFAULT_REGISTRY_PATH = Path.home() / ".nexus" / "mission-roster.json"
EntryStatus = Literal["invited", "active", "departed"]


@dataclass
class RosterEntry:
    """An agent's participation record in a mission."""

    mission_id: str
    agent_name: str
    invited_at: str
    accepted_at: str | None
    role: str
    status: EntryStatus

    def to_dict(self) -> dict:
        """Serialize this entry to a plain dict for JSON storage."""
        return dict(
            mission_id=self.mission_id,
            agent_name=self.agent_name,
            invited_at=self.invited_at,
            accepted_at=self.accepted_at,
            role=self.role,
            status=self.status,
        )

    @classmethod
    def from_dict(cls, data: dict) -> RosterEntry:
        """Rebuild an entry from its dict form.

        Missing accepted_at defaults to None; missing role defaults to
        "contributor".
        """
        merged = {"accepted_at": None, "role": "contributor", **data}
        return cls(
            mission_id=merged["mission_id"],
            agent_name=merged["agent_name"],
            invited_at=merged["invited_at"],
            accepted_at=merged["accepted_at"],
            role=merged["role"],
            status=merged["status"],
        )
class MissionRoster:
    """Registry of all missions and their agent rosters."""

    def __init__(self, registry_path: Path = DEFAULT_REGISTRY_PATH) -> None:
        self.registry_path = Path(registry_path)
        self._data: dict = {}
        self._load()

    # ─── internal I/O ─────────────────────────────────────────────────────────
    def _load(self) -> None:
        """Read the registry file, or start a fresh empty registry."""
        if not self.registry_path.exists():
            self._data = {"schema_version": "1", "missions": {}}
            return
        with open(self.registry_path, "r", encoding="utf-8") as fh:
            self._data = json.load(fh)

    def save(self) -> None:
        """Atomically persist the registry to disk (temp file + rename)."""
        self.registry_path.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp = tempfile.mkstemp(dir=self.registry_path.parent, suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as fh:
                json.dump(self._data, fh, indent=2)
            os.replace(tmp, self.registry_path)
        except Exception:
            # Never leave a stray temp file behind on failure.
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise
        log.debug("Saved mission roster to %s", self.registry_path)

    # ─── missions ─────────────────────────────────────────────────────────────
    def add_mission(self, cell: MissionCell) -> None:
        """Register a new mission from a MissionCell (no-op if already present)."""
        missions: dict = self._data.setdefault("missions", {})
        if cell.id in missions:
            log.debug("Mission %s already in roster, skipping add", cell.id)
            return
        missions[cell.id] = {
            "id": cell.id,
            "name": cell.name,
            "repo_url": cell.repo_url,
            "created_at": cell.created_at,
            "state": cell.state,
            "cell_path": cell.cell_path,
            "agents": [],
        }
        log.info("Registered mission %s in roster", cell.id)

    def get_mission(self, mission_id: str) -> dict | None:
        """Return the raw mission record, or None when unknown."""
        return self._data.get("missions", {}).get(mission_id)

    def list_missions(self, state: str | None = None) -> list[dict]:
        """Return all missions, optionally filtered by state."""
        everything = self._data.get("missions", {}).values()
        if state is None:
            return list(everything)
        return [m for m in everything if m.get("state") == state]

    # ─── agents ───────────────────────────────────────────────────────────────
    def invite_agent(
        self,
        mission_id: str,
        agent_name: str,
        role: str = "contributor",
    ) -> RosterEntry:
        """Invite an agent to a mission; returns the (possibly pre-existing) entry."""
        mission = self._require_mission(mission_id)
        agents: list[dict] = mission.setdefault("agents", [])
        # Idempotent: an already-rostered agent is returned unchanged.
        for existing in agents:
            if existing["agent_name"] == agent_name:
                log.warning(
                    "Agent %s already in mission %s roster", agent_name, mission_id
                )
                return RosterEntry.from_dict(existing)
        entry = RosterEntry(
            mission_id=mission_id,
            agent_name=agent_name,
            invited_at=datetime.now(timezone.utc).isoformat(),
            accepted_at=None,
            role=role,
            status="invited",
        )
        agents.append(entry.to_dict())
        log.info("Invited %s to mission %s as %s", agent_name, mission_id, role)
        return entry

    def accept_invitation(self, mission_id: str, agent_name: str) -> RosterEntry:
        """Mark an invitation as accepted; status becomes 'active'."""
        record = self._require_agent_entry(mission_id, agent_name)
        record["status"] = "active"
        record["accepted_at"] = datetime.now(timezone.utc).isoformat()
        log.info("Agent %s accepted invitation to mission %s", agent_name, mission_id)
        return RosterEntry.from_dict(record)

    def depart_agent(self, mission_id: str, agent_name: str) -> RosterEntry:
        """Mark an agent as departed from a mission."""
        record = self._require_agent_entry(mission_id, agent_name)
        record["status"] = "departed"
        log.info("Agent %s departed mission %s", agent_name, mission_id)
        return RosterEntry.from_dict(record)

    def list_agents(self, mission_id: str) -> list[RosterEntry]:
        """Return all RosterEntry objects for a mission."""
        mission = self._require_mission(mission_id)
        return [RosterEntry.from_dict(record) for record in mission.get("agents", [])]

    # ─── helpers ──────────────────────────────────────────────────────────────
    def _require_mission(self, mission_id: str) -> dict:
        """Return the mission record or raise KeyError when unknown."""
        missions = self._data.get("missions", {})
        if mission_id not in missions:
            raise KeyError(f"Mission {mission_id!r} not found in roster")
        return missions[mission_id]

    def _require_agent_entry(self, mission_id: str, agent_name: str) -> dict:
        """Return the agent's raw dict within the mission or raise KeyError."""
        for record in self._require_mission(mission_id).get("agents", []):
            if record["agent_name"] == agent_name:
                return record
        raise KeyError(
            f"Agent {agent_name!r} not found in mission {mission_id!r} roster"
        )

110
tests/test_lazarus_cell.py Normal file
View File

@@ -0,0 +1,110 @@
"""Tests for lazarus.cell — MissionCell lifecycle."""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cell import MissionCell, create_cell, destroy_cell, load_cell, save_cell
def test_create_cell_creates_directories(tmp_path: Path) -> None:
    """workspace/, logs/, and snapshots/ subdirs are created under the cell dir."""
    create_cell(
        mission_id="test-mission-1",
        name="Test Mission",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path,
    )
    root = tmp_path / "test-mission-1"
    assert root.is_dir()
    for sub in ("workspace", "logs", "snapshots"):
        assert (root / sub).is_dir()
def test_create_cell_writes_cell_json(tmp_path: Path) -> None:
    """cell.json is created and contains valid mission metadata."""
    create_cell(
        mission_id="test-mission-2",
        name="Another Mission",
        repo_url="https://github.com/test/another",
        base_path=tmp_path,
    )
    cell_json = tmp_path / "test-mission-2" / "cell.json"
    assert cell_json.exists()
    payload = json.loads(cell_json.read_text())
    assert payload["id"] == "test-mission-2"
    assert payload["name"] == "Another Mission"
    assert payload["repo_url"] == "https://github.com/test/another"
    assert payload["state"] == "pending"
    assert isinstance(payload["agents"], list)
def test_save_and_load_cell(tmp_path: Path) -> None:
    """Round-trip: save a cell, load it back, fields match."""
    original = create_cell(
        mission_id="roundtrip-mission",
        name="Roundtrip",
        repo_url="https://github.com/test/roundtrip",
        base_path=tmp_path,
    )
    # Mutate, persist, reload.
    original.state = "active"
    original.agents.append("agentA")
    save_cell(original)

    restored = load_cell(tmp_path / "roundtrip-mission")
    assert restored.id == "roundtrip-mission"
    assert restored.state == "active"
    assert "agentA" in restored.agents
    assert restored.repo_url == "https://github.com/test/roundtrip"
def test_cell_state_transitions(tmp_path: Path) -> None:
    """State changes are persisted correctly through save/load."""
    cell = create_cell(
        mission_id="state-mission",
        name="State Test",
        repo_url="https://github.com/test/state",
        base_path=tmp_path,
    )
    assert cell.state == "pending"

    cell.state = "active"
    save_cell(cell)
    midway = load_cell(tmp_path / "state-mission")
    assert midway.state == "active"

    midway.state = "archived"
    save_cell(midway)
    assert load_cell(tmp_path / "state-mission").state == "archived"
def test_destroy_cell_marks_destroyed(tmp_path: Path) -> None:
    """destroy_cell sets state to 'destroyed' and persists it."""
    cell = create_cell(
        mission_id="destroy-mission",
        name="Destroy Test",
        repo_url="https://github.com/test/destroy",
        base_path=tmp_path,
    )
    destroy_cell(cell, cleanup_dirs=False)
    assert cell.state == "destroyed"
    # The destroyed state must survive a reload from disk.
    assert load_cell(tmp_path / "destroy-mission").state == "destroyed"

136
tests/test_lazarus_cli.py Normal file
View File

@@ -0,0 +1,136 @@
"""Tests for lazarus.cli — CLI commands."""
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cli import main
from lazarus.roster import MissionRoster
def _run(argv: list[str], tmp_path: Path, capsys: pytest.CaptureFixture) -> tuple:
    """Helper: run CLI with a tmp registry and capture stdout as JSON.

    Prepends --registry before the subcommand so argparse sees it as a global arg.
    """
    rc = main(["--registry", str(tmp_path / "roster.json"), *argv])
    captured = capsys.readouterr().out
    return json.loads(captured), rc
# Patch setup_level1_isolation globally for all CLI tests to avoid permission issues
@pytest.fixture(autouse=True)
def no_isolation():
    """Stub setup_level1_isolation for every CLI test (avoids permission issues)."""
    with patch("lazarus.cli.setup_level1_isolation"):
        yield
def test_invite_creates_mission_cell(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """invite creates a mission cell directory with cell.json."""
    missions_dir = tmp_path / "missions"
    summary, rc = _run(
        [
            "invite", "agentAlpha",
            "--repo", "https://github.com/test/repo",
            "--base-path", str(missions_dir),
        ],
        tmp_path,
        capsys,
    )
    assert rc == 0
    assert (missions_dir / summary["mission_id"] / "cell.json").exists()
def test_invite_adds_agent_to_roster(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """After invite, the agent appears in the roster with status 'invited'."""
    registry = tmp_path / "roster.json"
    rc = main([
        "--registry", str(registry),
        "invite", "agentBeta",
        "--repo", "https://github.com/test/repo",
        "--base-path", str(tmp_path / "missions"),
    ])
    summary = json.loads(capsys.readouterr().out)
    assert rc == 0

    roster = MissionRoster(registry)
    names = [entry.agent_name for entry in roster.list_agents(summary["mission_id"])]
    assert "agentBeta" in names
def test_list_shows_missions(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """list command shows missions that were previously created via invite."""
    registry = str(tmp_path / "roster.json")
    # Create a mission first.
    main(["--registry", registry, "invite", "agentGamma",
          "--repo", "https://github.com/test/repo",
          "--base-path", str(tmp_path / "missions")])
    capsys.readouterr()  # discard invite output

    rc = main(["--registry", registry, "list"])
    missions = json.loads(capsys.readouterr().out)
    assert rc == 0
    assert isinstance(missions, list)
    assert len(missions) >= 1
def test_status_shows_agents(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """status command shows agent count for a mission."""
    registry = str(tmp_path / "roster.json")
    main(["--registry", registry, "invite", "agentDelta",
          "--repo", "https://github.com/test/repo",
          "--base-path", str(tmp_path / "missions")])
    mission_id = json.loads(capsys.readouterr().out)["mission_id"]

    rc = main(["--registry", registry, "status", mission_id])
    report = json.loads(capsys.readouterr().out)
    assert rc == 0
    assert report["agent_count"] >= 1
    assert "agentDelta" in [a["agent_name"] for a in report["agents"]]
def test_depart_updates_agent_status(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """depart command marks an agent's status as 'departed'."""
    registry = str(tmp_path / "roster.json")
    main(["--registry", registry, "invite", "agentEpsilon",
          "--repo", "https://github.com/test/repo",
          "--base-path", str(tmp_path / "missions")])
    mission_id = json.loads(capsys.readouterr().out)["mission_id"]

    rc = main(["--registry", registry, "depart", "agentEpsilon",
               "--mission", mission_id])
    departed = json.loads(capsys.readouterr().out)
    assert rc == 0
    assert departed["status"] == "departed"

    # Verify the change was persisted to the registry file.
    roster = MissionRoster(Path(registry))
    entry = next(
        a for a in roster.list_agents(mission_id) if a.agent_name == "agentEpsilon"
    )
    assert entry.status == "departed"

View File

@@ -0,0 +1,141 @@
"""Tests for lazarus.harness — MissionCellHarness."""
from __future__ import annotations
import asyncio
import json
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cell import create_cell, save_cell
from lazarus.harness import MissionCellHarness
pytestmark = pytest.mark.asyncio
def _make_cell_and_harness(
    tmp_path: Path,
    agent_name: str = "testAgent",
) -> tuple[object, MissionCellHarness]:
    """Create a throwaway mission cell plus a harness bound to it."""
    cell = create_cell(
        mission_id="harness-test-001",
        name="Harness Test",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
    )
    harness = MissionCellHarness(
        agent_name=agent_name,
        cell_path=Path(cell.cell_path),
        hermes_ws_url="ws://localhost:8000/ws",
    )
    return cell, harness
def test_harness_get_env_includes_cell_path(tmp_path: Path) -> None:
    """get_env() returns a dict that includes MISSION_CELL_PATH and AGENT_NAME."""
    cell, harness = _make_cell_and_harness(tmp_path)
    # Load cell manually so get_env works without start()
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))

    env = harness.get_env()
    assert env["MISSION_CELL_PATH"] == str(Path(cell.cell_path))
    assert env["AGENT_NAME"] == "testAgent"
    for key in ("MISSION_ID", "HOME"):
        assert key in env
def test_harness_get_env_uses_custom_home(tmp_path: Path) -> None:
    """When home_path is provided, HOME in env equals that path."""
    cell, _ = _make_cell_and_harness(tmp_path)
    wizard_home = str(tmp_path / "wizard_home")
    harness = MissionCellHarness(
        agent_name="agentX",
        cell_path=Path(cell.cell_path),
        home_path=wizard_home,
    )
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))

    assert harness.get_env()["HOME"] == wizard_home
def test_harness_write_mission_log(tmp_path: Path) -> None:
    """write_mission_log creates a log file in the cell's logs/ directory."""
    cell, harness = _make_cell_and_harness(tmp_path)
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))

    harness.write_mission_log("Hello from test")

    mission_log = Path(cell.cell_path) / "logs" / "mission.log"
    assert mission_log.exists()
    text = mission_log.read_text()
    assert "Hello from test" in text
    assert "testAgent" in text
async def test_harness_start_emits_telemetry(tmp_path: Path) -> None:
    """start() emits an agent_joined mission_event via WebSocket."""
    cell, harness = _make_cell_and_harness(tmp_path)

    received: list[dict] = []

    async def record(payload: str) -> None:
        received.append(json.loads(payload))

    fake_ws = AsyncMock()
    fake_ws.close = AsyncMock()
    fake_ws.send = AsyncMock(side_effect=record)

    with patch("lazarus.isolation.setup_level1_isolation"), \
         patch("lazarus.harness.setup_level1_isolation"), \
         patch("lazarus.harness.websockets") as fake_module:
        fake_module.connect = AsyncMock(return_value=fake_ws)
        harness._ws_connected = False
        await harness.start()

    assert any(
        msg.get("event") == "agent_joined" and msg.get("type") == "mission_event"
        for msg in received
    ), f"Expected agent_joined event; got: {received}"
async def test_harness_stop_emits_departure_telemetry(tmp_path: Path) -> None:
    """stop() emits an agent_departed mission_event via WebSocket."""
    cell, harness = _make_cell_and_harness(tmp_path)
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))

    received: list[dict] = []

    async def record(payload: str) -> None:
        received.append(json.loads(payload))

    fake_ws = AsyncMock()
    fake_ws.close = AsyncMock()
    fake_ws.send = AsyncMock(side_effect=record)
    harness._ws = fake_ws
    harness._ws_connected = True

    await harness.stop()

    assert any(
        msg.get("event") == "agent_departed" for msg in received
    ), f"Expected agent_departed event; got: {received}"

View File

@@ -0,0 +1,107 @@
"""Tests for lazarus.isolation — Level 1 isolation setup."""
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cell import create_cell
from lazarus.isolation import (
IsolationManifest,
load_apparmor_profile,
setup_level1_isolation,
write_apparmor_profile,
)
def _make_cell(tmp_path: Path, home_path: str = "") -> object:
    """Create a throwaway mission cell under tmp_path for isolation tests."""
    return create_cell(
        mission_id="iso-test-001",
        name="Isolation Test",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
        home_path=home_path,
    )
def test_setup_isolation_creates_workspace(tmp_path: Path) -> None:
    """setup_level1_isolation creates workspace and logs dirs."""
    cell = _make_cell(tmp_path)
    manifest = setup_level1_isolation(cell)
    assert isinstance(manifest, IsolationManifest)
    assert manifest.cell_id == cell.id
    assert manifest.workspace_path.is_dir()
    assert manifest.logs_path.is_dir()
def test_write_apparmor_profile_creates_file(tmp_path: Path) -> None:
    """write_apparmor_profile writes a file at the given output path."""
    cell = _make_cell(tmp_path, home_path=str(tmp_path / "home" / "agentX"))
    target = tmp_path / "test.apparmor"
    assert write_apparmor_profile(cell, target) == target
    assert target.exists()
    assert target.stat().st_size > 0
def test_apparmor_profile_content(tmp_path: Path) -> None:
    """Profile file contains correct cell ID, workspace, logs, and home paths."""
    agent_home = str(tmp_path / "home" / "agentX")
    cell = _make_cell(tmp_path, home_path=agent_home)
    target = tmp_path / "test.apparmor"
    write_apparmor_profile(cell, target)

    text = target.read_text()
    # "deny" covers the deny-write-to-home rule.
    for expected in (cell.id, "nexus-mission-", "workspace", "logs", agent_home, "deny"):
        assert expected in text
def test_load_apparmor_profile_noop_on_no_apparmor(tmp_path: Path) -> None:
    """load_apparmor_profile returns False gracefully when apparmor_parser is absent."""
    profile_path = tmp_path / "dummy.apparmor"
    profile_path.write_text("# fake profile\n")
    # apparmor_parser missing → subprocess.run raises FileNotFoundError.
    with patch("lazarus.isolation.platform.system", return_value="Linux"), patch(
        "lazarus.isolation.subprocess.run",
        side_effect=FileNotFoundError("apparmor_parser not found"),
    ):
        assert load_apparmor_profile(profile_path) is False
def test_load_apparmor_profile_noop_on_non_linux(tmp_path: Path) -> None:
    """load_apparmor_profile returns False on non-Linux platforms."""
    profile_path = tmp_path / "dummy.apparmor"
    profile_path.write_text("# fake profile\n")
    with patch("lazarus.isolation.platform.system", return_value="Darwin"):
        assert load_apparmor_profile(profile_path) is False
def test_setup_isolation_writes_profile_when_home_path_set(tmp_path: Path) -> None:
    """A cell carrying a home_path gets an AppArmor profile written during setup."""
    cell = _make_cell(tmp_path, home_path=str(tmp_path / "home" / "agentZ"))
    profile_path = setup_level1_isolation(cell).apparmor_profile_path
    assert profile_path is not None
    assert profile_path.exists()

View File

@@ -0,0 +1,122 @@
"""Tests for lazarus.roster — MissionRoster registry."""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cell import create_cell
from lazarus.roster import MissionRoster
def _make_roster(tmp_path: Path) -> MissionRoster:
    """Return a fresh MissionRoster persisted under *tmp_path*."""
    registry = tmp_path / "roster.json"
    return MissionRoster(registry_path=registry)
def _make_cell(tmp_path: Path, mission_id: str = "m-001") -> object:
    """Create a throwaway mission cell for roster tests."""
    return create_cell(
        name="Test Mission",
        mission_id=mission_id,
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
    )
def test_add_mission_and_invite(tmp_path: Path) -> None:
    """Adding a mission then inviting an agent yields an 'invited' entry."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    entry = roster.invite_agent(cell.id, "agentA", role="contributor")
    assert (entry.agent_name, entry.role, entry.status) == (
        "agentA", "contributor", "invited"
    )
    assert entry.accepted_at is None
def test_accept_invitation(tmp_path: Path) -> None:
    """Accepting an invitation flips status from 'invited' to 'active'."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    roster.invite_agent(cell.id, "agentB")
    accepted = roster.accept_invitation(cell.id, "agentB")
    assert accepted.status == "active"
    assert accepted.accepted_at is not None
def test_depart_agent(tmp_path: Path) -> None:
    """Departing an active agent sets its status to 'departed'."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    roster.invite_agent(cell.id, "agentC")
    roster.accept_invitation(cell.id, "agentC")
    assert roster.depart_agent(cell.id, "agentC").status == "departed"
def test_list_missions_filtered_by_state(tmp_path: Path) -> None:
    """list_missions(state=...) filters by mission state."""
    roster = _make_roster(tmp_path)
    for mission_id in ("m-001", "m-002"):
        roster.add_mission(_make_cell(tmp_path, mission_id))
    # Freshly added missions default to 'pending'; none are 'active' yet.
    assert len(roster.list_missions(state="pending")) == 2
    assert len(roster.list_missions(state="active")) == 0
def test_roster_persistence(tmp_path: Path) -> None:
    """Roster contents survive a save/reload round-trip through the registry file."""
    registry_path = tmp_path / "roster.json"
    original = MissionRoster(registry_path=registry_path)
    cell = _make_cell(tmp_path)
    original.add_mission(cell)
    original.invite_agent(cell.id, "agentD", role="lead")
    original.save()
    # Rehydrate from disk and verify both the mission and its agent entry.
    reloaded = MissionRoster(registry_path=registry_path)
    mission = reloaded.get_mission(cell.id)
    assert mission is not None
    assert mission["id"] == cell.id
    agents = reloaded.list_agents(cell.id)
    assert len(agents) == 1
    entry = agents[0]
    assert (entry.agent_name, entry.role, entry.status) == ("agentD", "lead", "invited")
def test_list_agents(tmp_path: Path) -> None:
    """list_agents returns every agent invited to a given mission."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    for agent_name in ("alice", "bob"):
        roster.invite_agent(cell.id, agent_name)
    assert {a.agent_name for a in roster.list_agents(cell.id)} == {"alice", "bob"}

View File

@@ -1,659 +0,0 @@
"""Tests for the Resurrection Pool — health polling, dead-agent detection, auto-revive.
Validates:
- GatewayConfig / PoolConfig loading from dicts and files
- RevivePolicy enum parsing (including unknown values)
- probe_gateway / probe_process / probe_heartbeat functions
- assess_agent combining all three probes into AgentHealth
- AgentHealth state: HEALTHY / DEGRADED / DEAD
- ResurrectionPool._update_health — dead_at tracking and recovery
- ResurrectionPool.sweep — policy routing (auto/ask/never)
- apply_revive_policy — auto-restart, never, ask-with-no-channel fallback
- Telegram / Nostr dry-run paths
- Gitea issue helpers (mocked)
- CLI argument parsing
"""
from __future__ import annotations
import importlib.util
import json
import os
import sys
import tempfile
import time
from pathlib import Path
from unittest.mock import MagicMock, patch, call
import pytest
# ── Module loading (same pattern as test_nexus_watchdog.py) ──────────
# The script lives at bin/resurrection_pool.py, so it is loaded straight
# from its file path via importlib rather than a normal import. Registering
# it in sys.modules under "resurrection_pool_test" lets the tests below use
# patch("resurrection_pool_test....") targets.
PROJECT_ROOT = Path(__file__).parent.parent
_rp_spec = importlib.util.spec_from_file_location(
    "resurrection_pool_test",
    PROJECT_ROOT / "bin" / "resurrection_pool.py",
)
_rp = importlib.util.module_from_spec(_rp_spec)
sys.modules["resurrection_pool_test"] = _rp
_rp_spec.loader.exec_module(_rp)
# Convenience aliases
# Bind the loaded module's public names locally so the tests read naturally;
# patch() calls still target the module itself via _rp / the sys.modules key.
AgentHealth = _rp.AgentHealth
AgentState = _rp.AgentState
GatewayConfig = _rp.GatewayConfig
MissionConfig = _rp.MissionConfig
PoolConfig = _rp.PoolConfig
ResurrectionPool = _rp.ResurrectionPool
RevivePolicy = _rp.RevivePolicy
apply_revive_policy = _rp.apply_revive_policy
assess_agent = _rp.assess_agent
build_parser = _rp.build_parser
probe_gateway = _rp.probe_gateway
probe_heartbeat = _rp.probe_heartbeat
probe_process = _rp.probe_process
revive_agent = _rp.revive_agent
send_telegram_approval_request = _rp.send_telegram_approval_request
send_nostr_approval_request = _rp.send_nostr_approval_request
# ── Helpers ──────────────────────────────────────────────────────────
def _make_gw(name="timmy", host="localhost", port=8765,
             heartbeat_path=None, process_pattern="nexus_think",
             revive_cmd=None, tmp_path=None):
    """Build a GatewayConfig for tests, defaulting the heartbeat path.

    With *tmp_path* given, the heartbeat lives inside it; otherwise the
    conventional ~/.nexus location is used.
    """
    if heartbeat_path is None:
        if tmp_path is not None:
            heartbeat_path = str(tmp_path / "heartbeat.json")
        else:
            heartbeat_path = "~/.nexus/heartbeat.json"
    return GatewayConfig(
        name=name,
        host=host,
        port=port,
        heartbeat_path=heartbeat_path,
        process_pattern=process_pattern,
        revive_cmd=revive_cmd or [],
    )
def _write_heartbeat(path: Path, age_seconds: float = 10.0):
path.parent.mkdir(parents=True, exist_ok=True)
data = {
"pid": os.getpid(),
"timestamp": time.time() - age_seconds,
"cycle": 7,
"model": "timmy:v0.1",
"status": "thinking",
}
path.write_text(json.dumps(data))
def _make_cfg(**kwargs) -> PoolConfig:
    """Shorthand for constructing a PoolConfig with keyword overrides."""
    cfg = PoolConfig(**kwargs)
    return cfg
# ── RevivePolicy tests ───────────────────────────────────────────────
class TestRevivePolicy:
    """RevivePolicy enum values and MissionConfig policy parsing."""

    def test_valid_values(self):
        expected = {
            "auto": RevivePolicy.AUTO,
            "ask": RevivePolicy.ASK,
            "never": RevivePolicy.NEVER,
        }
        for raw, member in expected.items():
            assert RevivePolicy(raw) == member

    def test_mission_from_dict_unknown_policy(self):
        # Unknown policy strings fall back to ASK.
        mission = MissionConfig.from_dict("m1", {"revive_policy": "bogus", "agents": ["a"]})
        assert mission.revive_policy == RevivePolicy.ASK

    def test_mission_from_dict_valid_policy(self):
        mission = MissionConfig.from_dict("m1", {"revive_policy": "auto", "agents": ["x"]})
        assert mission.revive_policy == RevivePolicy.AUTO

    def test_mission_agents_default_empty(self):
        assert MissionConfig.from_dict("m2", {}).agents == []
# ── PoolConfig loading tests ─────────────────────────────────────────
class TestPoolConfig:
    """PoolConfig defaults, dict parsing, and file/env loading."""

    def test_defaults(self):
        cfg = PoolConfig()
        assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL
        assert cfg.dead_threshold == _rp.DEFAULT_DEAD_THRESHOLD
        assert cfg.gateways == []
        assert cfg.missions == []

    def test_from_dict_basic(self):
        cfg = PoolConfig.from_dict({
            "poll_interval": 30,
            "dead_threshold": 120,
            "gateways": [{"name": "allegro", "port": 9000}],
            "missions": {
                "m1": {"label": "Sprint", "agents": ["allegro"], "revive_policy": "auto"},
            },
        })
        assert (cfg.poll_interval, cfg.dead_threshold) == (30, 120)
        assert len(cfg.gateways) == 1
        gateway = cfg.gateways[0]
        assert (gateway.name, gateway.port) == ("allegro", 9000)
        assert len(cfg.missions) == 1
        assert cfg.missions[0].revive_policy == RevivePolicy.AUTO

    def test_from_dict_telegram(self):
        cfg = PoolConfig.from_dict({"telegram": {"bot_token": "tok123", "chat_id": "-999"}})
        assert cfg.telegram_bot_token == "tok123"
        assert cfg.telegram_chat_id == "-999"

    def test_from_dict_nostr(self):
        cfg = PoolConfig.from_dict(
            {"nostr": {"relay_url": "wss://relay.damus.io", "recipient_pubkey": "abc"}}
        )
        assert cfg.nostr_relay_url == "wss://relay.damus.io"
        assert cfg.nostr_recipient_pubkey == "abc"

    def test_load_from_file(self, tmp_path):
        config_file = tmp_path / "pool.json"
        config_file.write_text(json.dumps({"poll_interval": 42}))
        assert PoolConfig.load(config_file).poll_interval == 42

    def test_load_missing_file_returns_defaults(self, tmp_path):
        cfg = PoolConfig.load(tmp_path / "nonexistent.json")
        assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL

    def test_load_corrupt_file_falls_through(self, tmp_path):
        bad = tmp_path / "bad.json"
        bad.write_text("not json {{")
        assert PoolConfig.load(bad).poll_interval == _rp.DEFAULT_POLL_INTERVAL

    def test_load_env_var(self, tmp_path, monkeypatch):
        config_file = tmp_path / "env_pool.json"
        config_file.write_text(json.dumps({"dead_threshold": 99}))
        monkeypatch.setenv("RESURRECTION_CONFIG", str(config_file))
        assert PoolConfig.load().dead_threshold == 99
# ── probe_gateway tests ──────────────────────────────────────────────
class TestProbeGateway:
    """TCP reachability probing: refused port, open port, unresolvable host."""

    def test_refused_port(self):
        # Port 1 should always be refused on localhost
        alive, msg = probe_gateway("localhost", 1, timeout=2)
        assert not alive
        assert "refused" in msg.lower() or "failed" in msg.lower()

    def test_open_port(self):
        # Open a real server socket to confirm TCP probe works
        import threading
        import socket as _socket
        srv = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
        srv.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
        srv.bind(("127.0.0.1", 0))  # port 0 → OS picks a free port
        srv.listen(1)
        port = srv.getsockname()[1]
        def accept_one():
            # Accept a single connection, then close the server either way so
            # the test never leaks a listening socket.
            try:
                conn, _ = srv.accept()
                conn.close()
            except Exception:
                pass
            finally:
                srv.close()
        t = threading.Thread(target=accept_one, daemon=True)
        t.start()
        alive, msg = probe_gateway("127.0.0.1", port, timeout=3)
        t.join(timeout=2)
        assert alive
        assert "reachable" in msg.lower()

    def test_invalid_host(self):
        # ".invalid" is a reserved TLD, so this hostname never resolves.
        alive, msg = probe_gateway("this.host.does.not.exist.invalid", 8765, timeout=2)
        assert not alive
# ── probe_process tests ──────────────────────────────────────────────
class TestProbeProcess:
    """Process-liveness probing via pgrep, including graceful fallback."""

    def test_self_is_filtered(self):
        # The test runner process matches "python" — but watchdog filters own PID.
        # Here we only check the probe returns (False, str) without raising.
        alive, message = probe_process("definitely-no-process-xyz-12345")
        assert not alive
        assert isinstance(message, str)

    def test_running_process(self):
        # Whether pgrep finds "pytest" depends on the environment; just make
        # sure the call completes with the right result types.
        alive, message = probe_process("pytest")
        assert isinstance(alive, bool)
        assert isinstance(message, str)

    @patch("subprocess.run", side_effect=FileNotFoundError)
    def test_pgrep_missing(self, _mock):
        # graceful fallback: a missing pgrep must not raise false death alarms
        alive, message = probe_process("anything")
        assert alive
        assert "pgrep" in message.lower()
# ── probe_heartbeat tests ────────────────────────────────────────────
class TestProbeHeartbeat:
    """Heartbeat-file probing: missing, fresh, stale, corrupt, incomplete."""

    def test_missing_file(self, tmp_path):
        alive, age, msg = probe_heartbeat(tmp_path / "no.json", dead_threshold=300)
        assert not alive
        assert age is None
        assert "no heartbeat" in msg.lower()

    def test_fresh_heartbeat(self, tmp_path):
        hb = tmp_path / "heartbeat.json"
        _write_heartbeat(hb, age_seconds=10)
        alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
        assert alive
        assert age is not None
        assert age < 20
        assert "alive" in msg.lower()

    def test_stale_heartbeat(self, tmp_path):
        hb = tmp_path / "heartbeat.json"
        _write_heartbeat(hb, age_seconds=600)
        alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
        assert not alive
        assert age > 500
        assert "stale" in msg.lower()

    def test_corrupt_json(self, tmp_path):
        hb = tmp_path / "heartbeat.json"
        hb.write_text("{ broken json")
        alive, _age, msg = probe_heartbeat(hb, dead_threshold=300)
        assert not alive
        assert "unreadable" in msg.lower()

    def test_missing_timestamp_treated_as_stale(self, tmp_path):
        hb = tmp_path / "heartbeat.json"
        hb.write_text(json.dumps({"cycle": 1}))  # no timestamp key
        # timestamp defaults to 0 → very old → stale
        alive, _age, _msg = probe_heartbeat(hb, dead_threshold=300)
        assert not alive
# ── assess_agent tests ────────────────────────────────────────────────
class TestAssessAgent:
    """assess_agent combines the three probes into one AgentHealth state.

    The probes are patched on the loaded module; @patch decorators apply
    bottom-up, so mock args arrive as (heartbeat, process, gateway).
    """

    def _gw(self, tmp_path, port=1):
        # Port value is irrelevant here — the probes themselves are mocked.
        return _make_gw(tmp_path=tmp_path, port=port)

    @patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_process", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 5.0, "ok"))
    def test_all_healthy(self, _hb, _proc, _gw, tmp_path):
        # All three probes pass → HEALTHY.
        gw = self._gw(tmp_path)
        h = assess_agent(gw, dead_threshold=300)
        assert h.state == AgentState.HEALTHY

    @patch("resurrection_pool_test.probe_gateway", return_value=(False, "refused"))
    @patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(False, None, "missing"))
    def test_all_dead(self, _hb, _proc, _gw, tmp_path):
        # All three probes fail → DEAD.
        gw = self._gw(tmp_path)
        h = assess_agent(gw, dead_threshold=300)
        assert h.state == AgentState.DEAD

    @patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 10.0, "ok"))
    def test_partially_failing_is_degraded(self, _hb, _proc, _gw, tmp_path):
        # Mixed probe results → DEGRADED, not DEAD.
        gw = self._gw(tmp_path)
        h = assess_agent(gw, dead_threshold=300)
        assert h.state == AgentState.DEGRADED
# ── AgentHealth state machine tests ──────────────────────────────────
class TestAgentHealthTracking:
    """ResurrectionPool._update_health: first_dead_at tracking and recovery."""

    def _pool(self):
        # Fresh dry-run pool with a default (empty) config.
        cfg = _make_cfg()
        return ResurrectionPool(cfg, dry_run=True)

    def test_first_dead_at_set_on_first_failure(self):
        """The first DEAD observation stamps first_dead_at."""
        pool = self._pool()
        dead = AgentHealth(name="timmy", state=AgentState.DEAD,
                           last_checked=time.time())
        result = pool._update_health("timmy", dead)
        assert result.first_dead_at is not None

    def test_first_dead_at_preserved_across_cycles(self):
        """Later DEAD observations keep the original first_dead_at."""
        pool = self._pool()
        t0 = time.time() - 100
        dead1 = AgentHealth(name="timmy", state=AgentState.DEAD,
                            last_checked=t0, first_dead_at=t0)
        pool._update_health("timmy", dead1)
        dead2 = AgentHealth(name="timmy", state=AgentState.DEAD,
                            last_checked=time.time())
        result = pool._update_health("timmy", dead2)
        assert result.first_dead_at == t0  # preserved

    def test_recovery_resets_first_dead_at(self):
        """A HEALTHY observation clears first_dead_at and revive_attempts."""
        pool = self._pool()
        dead = AgentHealth(name="timmy", state=AgentState.DEAD,
                           last_checked=time.time())
        pool._update_health("timmy", dead)
        healthy = AgentHealth(name="timmy", state=AgentState.HEALTHY,
                              last_checked=time.time())
        result = pool._update_health("timmy", healthy)
        assert result.first_dead_at is None
        assert result.revive_attempts == 0

    def test_revive_attempts_increments(self):
        """revive_attempts carries over from the stored previous record."""
        pool = self._pool()
        h = AgentHealth(name="timmy", state=AgentState.DEAD,
                        last_checked=time.time())
        pool._health["timmy"] = h
        h.revive_attempts = 2
        # (Removed an unused `prev = pool._health["timmy"]` dead assignment.)
        new = AgentHealth(name="timmy", state=AgentState.DEAD,
                          last_checked=time.time())
        result = pool._update_health("timmy", new)
        assert result.revive_attempts == 2  # preserved from prev
# ── mission_for / gateway_for tests ─────────────────────────────────
class TestPoolLookups:
    """ResurrectionPool._mission_for / _gateway_for lookups."""

    def test_mission_for_known_agent(self):
        cfg = PoolConfig.from_dict(
            {"missions": {"m1": {"agents": ["timmy"], "revive_policy": "auto"}}}
        )
        pool = ResurrectionPool(cfg, dry_run=True)
        mission = pool._mission_for("timmy")
        assert mission is not None
        assert mission.mission_id == "m1"

    def test_mission_for_unknown_agent(self):
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        assert pool._mission_for("ghost") is None

    def test_gateway_for(self):
        cfg = PoolConfig.from_dict({"gateways": [{"name": "allegro", "port": 9000}]})
        pool = ResurrectionPool(cfg, dry_run=True)
        gateway = pool._gateway_for("allegro")
        assert gateway is not None
        assert gateway.port == 9000

    def test_gateway_for_missing(self):
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        assert pool._gateway_for("nobody") is None
# ── apply_revive_policy tests ────────────────────────────────────────
class TestApplyRevivePolicy:
    """Policy routing: never / auto / ask, plus the pending-approval guard."""

    def _dead_health(self, name="timmy", dead_seconds=400):
        # An agent already dead for *dead_seconds*.
        h = AgentHealth(name=name, state=AgentState.DEAD,
                        first_dead_at=time.time() - dead_seconds,
                        last_checked=time.time())
        return h

    def test_never_policy_no_revive(self, tmp_path):
        """NEVER: no revive happens, but a death issue is opened."""
        gw = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.NEVER)
        cfg = _make_cfg()
        health = self._dead_health()
        with patch.object(_rp, "open_death_issue") as mock_issue:
            result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
            assert not result
            mock_issue.assert_called_once()

    def test_auto_policy_launches_revive(self, tmp_path):
        """AUTO: revive_agent is invoked immediately and its result returned."""
        gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "revive"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
        cfg = _make_cfg()
        health = self._dead_health()
        with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
            result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
            assert result
            mock_revive.assert_called_once_with(gw, dry_run=True)

    def test_auto_policy_opens_issue_on_revive_failure(self, tmp_path):
        """AUTO: a failed revive falls back to opening a death issue."""
        gw = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
        cfg = _make_cfg()
        health = self._dead_health()
        with patch.object(_rp, "revive_agent", return_value=False), \
             patch.object(_rp, "open_death_issue") as mock_issue:
            result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
            assert not result
            mock_issue.assert_called_once()

    def test_ask_policy_dry_run_approves(self, tmp_path):
        """ASK + dry_run: the approval round-trip succeeds, so revive runs."""
        gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        cfg = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
        health = self._dead_health()
        # dry_run → send_telegram returns True, poll returns True
        with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
            result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
            assert result
            mock_revive.assert_called_once()

    def test_ask_policy_no_channel_falls_back_to_auto(self, tmp_path):
        """ASK with no notification channel configured still revives."""
        gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        cfg = _make_cfg()  # no telegram, no nostr
        health = self._dead_health()
        with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
            result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
            assert result
            mock_revive.assert_called_once()

    def test_skip_if_revive_pending(self, tmp_path):
        """An outstanding approval request suppresses any new revive attempt."""
        gw = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        cfg = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
        health = self._dead_health()
        health.revive_pending = True  # approval already outstanding
        with patch.object(_rp, "revive_agent") as mock_revive:
            result = apply_revive_policy(gw, health, mission, cfg, dry_run=False)
            assert not result
            mock_revive.assert_not_called()
# ── revive_agent tests ───────────────────────────────────────────────
class TestReviveAgent:
    """revive_agent: missing command, dry-run, success, and bad binary."""

    def test_no_revive_cmd(self, tmp_path):
        # Nothing configured to run → cannot revive.
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=[])
        assert not revive_agent(gateway)

    def test_dry_run(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        assert revive_agent(gateway, dry_run=True)

    def test_launches_process(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["true"])
        assert revive_agent(gateway, dry_run=False)

    def test_bad_command(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["/does/not/exist/xyz"])
        assert not revive_agent(gateway, dry_run=False)
# ── Notification channel tests ────────────────────────────────────────
class TestNotifications:
    """Telegram / Nostr approval-request helpers: dry-run and unconfigured."""

    def test_telegram_dry_run(self):
        sent = send_telegram_approval_request(
            "token", "chat", "timmy", "Sprint", 400, dry_run=True
        )
        assert sent

    def test_telegram_no_token(self):
        sent = send_telegram_approval_request(
            "", "", "timmy", "Sprint", 400, dry_run=False
        )
        assert not sent

    def test_nostr_dry_run(self):
        assert send_nostr_approval_request(
            "wss://relay.damus.io", "pubkey123",
            "timmy", "Sprint", 400, dry_run=True
        )

    def test_nostr_no_config(self):
        assert not send_nostr_approval_request(
            "", "",
            "timmy", "Sprint", 400, dry_run=False
        )
# ── ResurrectionPool.sweep integration tests ─────────────────────────
class TestPoolSweep:
    """sweep(): probes every gateway and routes dead agents to policy handling."""

    def test_sweep_empty_gateways(self):
        # No gateways configured → nothing to report.
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        result = pool.sweep()
        assert result == {}

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_healthy_agent_no_policy_triggered(self, mock_assess):
        """A healthy agent never reaches apply_revive_policy."""
        # (Removed an unused `gw = GatewayConfig(name="timmy")` dead assignment.)
        cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
        pool = ResurrectionPool(cfg, dry_run=True)
        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.HEALTHY,
            gateway_alive=True, process_alive=True, heartbeat_age=5.0,
        )
        with patch.object(_rp, "apply_revive_policy") as mock_policy:
            pool.sweep()
            mock_policy.assert_not_called()

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_dead_agent_triggers_policy(self, mock_assess):
        """An agent dead past the threshold routes to apply_revive_policy."""
        cfg = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 0,  # treat any death as immediate trigger
        })
        pool = ResurrectionPool(cfg, dry_run=True)
        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.DEAD,
            first_dead_at=time.time() - 999,
            last_checked=time.time(),
        )
        with patch.object(_rp, "apply_revive_policy", return_value=False) as mock_policy:
            pool.sweep()
            mock_policy.assert_called_once()

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_degraded_does_not_trigger_policy(self, mock_assess):
        """DEGRADED below the dead threshold does not trigger any policy."""
        cfg = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 10,
        })
        pool = ResurrectionPool(cfg, dry_run=True)
        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.DEGRADED,
            first_dead_at=time.time() - 5,  # only 5s, below threshold
            last_checked=time.time(),
        )
        with patch.object(_rp, "apply_revive_policy") as mock_policy:
            pool.sweep()
            mock_policy.assert_not_called()
# ── run_once / status tests ───────────────────────────────────────────
class TestRunOnce:
    """ResurrectionPool.run_once return value and status() reporting."""

    def test_run_once_empty_returns_true(self):
        # No gateways configured → trivially healthy.
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        assert pool.run_once()

    @patch("resurrection_pool_test.assess_agent")
    def test_run_once_dead_agent_returns_false(self, mock_assess):
        """A dead agent whose revive fails makes run_once report failure."""
        cfg = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 0,
        })
        pool = ResurrectionPool(cfg, dry_run=True)
        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.DEAD,
            first_dead_at=time.time() - 9999,
            last_checked=time.time(),
        )
        with patch.object(_rp, "apply_revive_policy", return_value=False):
            result = pool.run_once()
            assert not result

    @patch("resurrection_pool_test.assess_agent")
    def test_status_returns_list(self, mock_assess):
        """status() reflects the results of the most recent sweep."""
        cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
        pool = ResurrectionPool(cfg, dry_run=True)
        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.HEALTHY,
            gateway_alive=True, process_alive=True, heartbeat_age=3.0,
        )
        pool.sweep()
        status = pool.status()
        assert isinstance(status, list)
        assert len(status) == 1
        assert status[0]["name"] == "timmy"
        assert status[0]["state"] == "healthy"
# ── CLI tests ─────────────────────────────────────────────────────────
class TestCLI:
    """build_parser argument parsing: defaults, flags, and overrides."""

    def test_default_args(self):
        args = build_parser().parse_args([])
        assert not (args.once or args.dry_run or args.status)
        assert args.config is None

    def test_once_flag(self):
        assert build_parser().parse_args(["--once"]).once

    def test_dry_run_flag(self):
        assert build_parser().parse_args(["--dry-run"]).dry_run

    def test_poll_interval_override(self):
        args = build_parser().parse_args(["--poll-interval", "15"])
        assert args.poll_interval == 15

    def test_dead_threshold_override(self):
        args = build_parser().parse_args(["--dead-threshold", "60"])
        assert args.dead_threshold == 60

    def test_config_path(self, tmp_path):
        cfg_file = tmp_path / "pool.json"
        cfg_file.write_text("{}")
        args = build_parser().parse_args(["--config", str(cfg_file)])
        assert args.config == str(cfg_file)