Compare commits
1 Commits
claude/iss
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e530565d3c |
@@ -1,940 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Resurrection Pool — Fleet Health Polling, Dead-Agent Detection, Auto-Revive
|
||||
|
||||
Monitors all wizard gateways in the fleet, detects downed agents, and
|
||||
applies configurable revive policies per mission cell:
|
||||
|
||||
auto — restart the agent immediately on death detection
|
||||
ask — send a Telegram / Nostr approval request; revive when approved
|
||||
never — record the death, create a Gitea issue, do nothing else
|
||||
|
||||
HOW IT WORKS
|
||||
============
|
||||
1. Load a JSON config listing wizard gateways and mission cells.
|
||||
2. Poll each gateway on a configurable interval (default: 60s).
|
||||
- TCP-probe the WebSocket port (can the gateway hear the world?)
|
||||
- Check for a running agent process via pgrep
|
||||
- Read the heartbeat file for last-think timestamp
|
||||
3. If an agent fails all three checks for longer than dead_threshold:
|
||||
a. Mark it dead in the pool state.
|
||||
b. Look up the mission it belongs to and read the revive_policy.
|
||||
c. Execute the policy: auto-restart, request human approval, or no-op.
|
||||
4. On auto-revive: spawn the configured revive_cmd, wait for the heartbeat
|
||||
to return healthy, and log the recovery.
|
||||
5. On ask-revive: post an approval request to Telegram / Nostr and wait
|
||||
up to approval_timeout seconds for a "+1" or "revive" reply.
|
||||
6. Create / close Gitea issues to track fleet health events.
|
||||
|
||||
CONFIGURATION
|
||||
=============
|
||||
Config file path is resolved in this order:
|
||||
1. --config CLI flag
|
||||
2. RESURRECTION_CONFIG env var
|
||||
3. ~/.nexus/resurrection_pool.json
|
||||
4. /etc/nexus/resurrection_pool.json
|
||||
|
||||
Minimal config skeleton (all fields optional, sane defaults everywhere):
|
||||
|
||||
{
|
||||
"poll_interval": 60,
|
||||
"dead_threshold": 300,
|
||||
"approval_timeout": 300,
|
||||
"gateways": [
|
||||
{
|
||||
"name": "timmy",
|
||||
"host": "localhost",
|
||||
"port": 8765,
|
||||
"heartbeat_path": "~/.nexus/heartbeat.json",
|
||||
"process_pattern": "nexus_think",
|
||||
"revive_cmd": ["python", "/root/wizards/timmy/nexus_think.py"]
|
||||
}
|
||||
],
|
||||
"missions": {
|
||||
"some-mission-uuid": {
|
||||
"label": "Portal Sprint",
|
||||
"agents": ["timmy"],
|
||||
"revive_policy": "ask"
|
||||
}
|
||||
},
|
||||
"telegram": {
|
||||
"bot_token": "...",
|
||||
"chat_id": "-1001234567890"
|
||||
},
|
||||
"nostr": {
|
||||
"relay_url": "wss://relay.damus.io",
|
||||
"recipient_pubkey": "..."
|
||||
},
|
||||
"gitea": {
|
||||
"url": "https://forge.example.com",
|
||||
"token": "...",
|
||||
"repo": "Timmy_Foundation/the-nexus"
|
||||
}
|
||||
}
|
||||
|
||||
USAGE
|
||||
=====
|
||||
# One-shot health check (good for cron / smoke tests)
|
||||
python bin/resurrection_pool.py --once
|
||||
|
||||
# Continuous daemon (default)
|
||||
python bin/resurrection_pool.py
|
||||
|
||||
# Custom config + dry-run (no side-effects)
|
||||
python bin/resurrection_pool.py --config /etc/nexus/pool.json --dry-run
|
||||
|
||||
# Print current pool state as JSON
|
||||
python bin/resurrection_pool.py --status
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
=================
|
||||
Pure stdlib. No pip installs required on the monitoring host.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
# Module-wide logging: timestamped, fixed-width level column so multi-agent
# sweep output lines up when tailing the daemon log.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("nexus.resurrection_pool")
|
||||
|
||||
# ── Constants ────────────────────────────────────────────────────────

DEFAULT_POLL_INTERVAL = 60      # seconds between full fleet sweeps
DEFAULT_DEAD_THRESHOLD = 300    # seconds with no heartbeat = dead
DEFAULT_APPROVAL_TIMEOUT = 300  # seconds to wait for human approval

# Fallback config locations, tried in order after --config / RESURRECTION_CONFIG.
CONFIG_SEARCH_PATHS = [
    Path.home() / ".nexus" / "resurrection_pool.json",
    Path("/etc/nexus/resurrection_pool.json"),
]

# Gitea conventions used to create and re-find pool-tracking issues.
POOL_LABEL = "resurrection-pool"
POOL_TITLE_PREFIX = "[pool]"
|
||||
|
||||
|
||||
# ── Enums & simple types ─────────────────────────────────────────────
|
||||
|
||||
class RevivePolicy(str, Enum):
    """Per-mission policy for what to do when an agent is detected dead."""
    AUTO = "auto"      # restart immediately, no human needed
    ASK = "ask"        # send approval request, wait for +1
    NEVER = "never"    # log + Gitea issue, do not restart
|
||||
|
||||
|
||||
class AgentState(str, Enum):
    """Lifecycle state of one monitored agent, derived from the three probes."""
    HEALTHY = "healthy"    # all checks passing
    DEGRADED = "degraded"  # some checks failing but not yet dead
    DEAD = "dead"          # all checks failing
    REVIVING = "reviving"  # a revive was launched; awaiting next sweep
    UNKNOWN = "unknown"    # not yet assessed
|
||||
|
||||
|
||||
# ── Configuration dataclasses ────────────────────────────────────────
|
||||
|
||||
@dataclass
class GatewayConfig:
    """Static description of one wizard gateway to monitor.

    Mirrors a single entry of the config file's "gateways" list; any
    field left out of the JSON falls back to the defaults below.
    """
    name: str
    host: str = "localhost"
    port: int = 8765
    heartbeat_path: str = "~/.nexus/heartbeat.json"
    process_pattern: str = "nexus_think"
    revive_cmd: List[str] = field(default_factory=list)

    @property
    def resolved_heartbeat_path(self) -> Path:
        """Heartbeat file location with a leading ``~`` expanded."""
        raw = Path(self.heartbeat_path)
        return raw.expanduser()
|
||||
|
||||
|
||||
@dataclass
class MissionConfig:
    """One mission cell: a human label, its member agents, and a revive policy."""
    mission_id: str
    label: str = ""
    agents: List[str] = field(default_factory=list)
    revive_policy: RevivePolicy = RevivePolicy.ASK

    @classmethod
    def from_dict(cls, mission_id: str, data: dict) -> "MissionConfig":
        """Build a MissionConfig from one entry of the config's "missions" map.

        An unrecognized revive_policy value is logged and coerced to ASK
        rather than aborting the whole config load.
        """
        raw_policy = data.get("revive_policy", RevivePolicy.ASK)
        try:
            parsed = RevivePolicy(raw_policy)
        except ValueError:
            logger.warning("Unknown revive_policy %r for mission %s; defaulting to 'ask'",
                           raw_policy, mission_id)
            parsed = RevivePolicy.ASK
        return cls(
            mission_id=mission_id,
            label=data.get("label", ""),
            agents=data.get("agents", []),
            revive_policy=parsed,
        )
|
||||
|
||||
|
||||
@dataclass
class PoolConfig:
    """Full resurrection pool configuration.

    Values come from a JSON config file (see module docstring). Telegram
    and Gitea credentials additionally fall back to environment variables.
    """
    poll_interval: int = DEFAULT_POLL_INTERVAL        # seconds between sweeps
    dead_threshold: int = DEFAULT_DEAD_THRESHOLD      # seconds dead before acting
    approval_timeout: int = DEFAULT_APPROVAL_TIMEOUT  # seconds to wait for a human
    gateways: List[GatewayConfig] = field(default_factory=list)
    missions: List[MissionConfig] = field(default_factory=list)
    telegram_bot_token: str = ""
    telegram_chat_id: str = ""
    nostr_relay_url: str = ""
    nostr_recipient_pubkey: str = ""
    gitea_url: str = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
    gitea_token: str = os.environ.get("GITEA_TOKEN", "")
    gitea_repo: str = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")

    @classmethod
    def from_dict(cls, data: dict) -> "PoolConfig":
        """Build a PoolConfig from parsed JSON.

        Section dicts ("telegram", "nostr", "gitea") are optional; missing
        keys fall back to env vars and then to hard defaults.

        Raises:
            KeyError: if a gateway entry lacks the required "name" key
                (caught by ``load()``).
        """
        tg = data.get("telegram", {})
        ns = data.get("nostr", {})
        git = data.get("gitea", {})

        gateways = [
            GatewayConfig(
                name=gw["name"],  # required; KeyError here is handled by load()
                host=gw.get("host", "localhost"),
                port=gw.get("port", 8765),
                heartbeat_path=gw.get("heartbeat_path", "~/.nexus/heartbeat.json"),
                process_pattern=gw.get("process_pattern", "nexus_think"),
                revive_cmd=gw.get("revive_cmd", []),
            )
            for gw in data.get("gateways", [])
        ]

        missions = [
            MissionConfig.from_dict(mid, mdata)
            for mid, mdata in data.get("missions", {}).items()
        ]

        return cls(
            poll_interval=data.get("poll_interval", DEFAULT_POLL_INTERVAL),
            dead_threshold=data.get("dead_threshold", DEFAULT_DEAD_THRESHOLD),
            approval_timeout=data.get("approval_timeout", DEFAULT_APPROVAL_TIMEOUT),
            gateways=gateways,
            missions=missions,
            telegram_bot_token=tg.get("bot_token", os.environ.get("TELEGRAM_BOT_TOKEN", "")),
            telegram_chat_id=tg.get("chat_id", os.environ.get("TELEGRAM_CHAT_ID", "")),
            nostr_relay_url=ns.get("relay_url", ""),
            nostr_recipient_pubkey=ns.get("recipient_pubkey", ""),
            gitea_url=git.get("url", os.environ.get("GITEA_URL",
                                                    "https://forge.alexanderwhitestone.com")),
            gitea_token=git.get("token", os.environ.get("GITEA_TOKEN", "")),
            gitea_repo=git.get("repo", os.environ.get("NEXUS_REPO",
                                                      "Timmy_Foundation/the-nexus")),
        )

    @classmethod
    def load(cls, path: Optional[Path] = None) -> "PoolConfig":
        """Load config from file, falling back through search paths.

        Precedence matches the module docstring: an explicit --config path
        first, then $RESURRECTION_CONFIG, then the standard locations.
        An unparsable candidate is logged and the next one is tried.
        """
        candidates: List[Path] = []
        if path:
            candidates.append(path)

        # BUGFIX: the env var used to be inserted at position 0, which let
        # $RESURRECTION_CONFIG silently override an explicit --config path,
        # contradicting the documented resolution order.
        env_path = os.environ.get("RESURRECTION_CONFIG")
        if env_path:
            candidates.append(Path(env_path))

        candidates += CONFIG_SEARCH_PATHS

        for candidate in candidates:
            if candidate and candidate.exists():
                try:
                    data = json.loads(candidate.read_text())
                    logger.info("Loaded config from %s", candidate)
                    return cls.from_dict(data)
                except (json.JSONDecodeError, KeyError) as e:
                    logger.warning("Failed to parse %s: %s", candidate, e)

        logger.info("No config file found; using defaults (no gateways configured)")
        return cls()
|
||||
|
||||
|
||||
# ── Per-agent health state ───────────────────────────────────────────
|
||||
|
||||
@dataclass
class AgentHealth:
    """Mutable per-agent health record, carried across poll sweeps."""
    name: str
    state: AgentState = AgentState.UNKNOWN
    gateway_alive: bool = False
    process_alive: bool = False
    heartbeat_age: Optional[float] = None          # seconds since last heartbeat
    last_checked: float = field(default_factory=time.time)
    first_dead_at: Optional[float] = None          # when death was first detected
    revive_pending: bool = False                   # approval request outstanding
    revive_attempts: int = 0

    @property
    def consecutive_dead_seconds(self) -> float:
        """Seconds since death was first observed; 0.0 while not dead."""
        started = self.first_dead_at
        return 0.0 if started is None else time.time() - started

    def to_dict(self) -> dict:
        """JSON-serializable snapshot of this record (used by --status)."""
        return {
            "name": self.name,
            "state": self.state.value,
            "gateway_alive": self.gateway_alive,
            "process_alive": self.process_alive,
            "heartbeat_age": self.heartbeat_age,
            "last_checked": self.last_checked,
            "first_dead_at": self.first_dead_at,
            "revive_pending": self.revive_pending,
            "revive_attempts": self.revive_attempts,
        }
|
||||
|
||||
|
||||
# ── Health probes ────────────────────────────────────────────────────
|
||||
|
||||
def probe_gateway(host: str, port: int, timeout: int = 5) -> Tuple[bool, str]:
    """TCP probe a wizard gateway. Returns (alive, message).

    Uses connect_ex so a refused/filtered port is reported as data rather
    than an exception. The socket is managed with a context manager so it
    is closed even if settimeout/connect raises (the original leaked the
    fd on that path).
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            errno = sock.connect_ex((host, port))
        if errno == 0:
            return True, f"TCP:{host}:{port} reachable"
        return False, f"TCP:{host}:{port} refused (errno={errno})"
    except Exception as e:
        # DNS failure, invalid host, etc. — report instead of crashing a sweep.
        return False, f"TCP probe failed: {e}"
|
||||
|
||||
|
||||
def probe_process(pattern: str) -> Tuple[bool, str]:
    """pgrep-based process check. Returns (alive, message)."""
    try:
        completed = subprocess.run(
            ["pgrep", "-f", pattern],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except FileNotFoundError:
        # Host has no pgrep: pass the check rather than flag every agent dead.
        return True, "pgrep unavailable; skipping process check"
    except Exception as e:
        return False, f"Process probe failed: {e}"

    if completed.returncode == 0:
        own_pid = str(os.getpid())
        # Exclude our own PID: the pattern may match this monitor's cmdline.
        pids = [
            line.strip()
            for line in completed.stdout.strip().split("\n")
            if line.strip() and line.strip() != own_pid
        ]
        if pids:
            return True, f"Running (PID: {', '.join(pids)})"
    return False, f"No process matching {pattern!r}"
|
||||
|
||||
|
||||
def probe_heartbeat(path: Path, dead_threshold: int) -> Tuple[bool, Optional[float], str]:
    """Read a heartbeat file. Returns (alive, age_seconds, message).

    The file is expected to be a JSON object with a numeric "timestamp"
    plus optional "cycle" and "model" fields. Any malformed content is
    reported as dead instead of raising — previously a non-numeric
    timestamp (or a non-object JSON document) raised an uncaught
    TypeError/AttributeError that would kill the whole sweep.
    """
    if not path.exists():
        return False, None, f"No heartbeat file at {path}"
    try:
        data = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError) as e:
        return False, None, f"Heartbeat file unreadable: {e}"

    if not isinstance(data, dict):
        return False, None, f"Heartbeat file malformed: expected object, got {type(data).__name__}"

    ts = data.get("timestamp", 0)
    # bool is an int subclass; a true/false timestamp is still garbage.
    if isinstance(ts, bool) or not isinstance(ts, (int, float)):
        return False, None, f"Heartbeat timestamp invalid: {ts!r}"

    age = time.time() - ts
    cycle = data.get("cycle", "?")
    model = data.get("model", "unknown")

    if age > dead_threshold:
        return False, age, (
            f"Stale heartbeat: {int(age)}s ago (threshold {dead_threshold}s), "
            f"cycle={cycle}, model={model}"
        )
    return True, age, f"Alive: cycle={cycle}, {int(age)}s ago, model={model}"
|
||||
|
||||
|
||||
def assess_agent(gw: GatewayConfig, dead_threshold: int) -> AgentHealth:
    """Run all probes for one gateway and return an AgentHealth snapshot.

    State mapping: all three checks pass -> HEALTHY, none pass -> DEAD,
    anything in between -> DEGRADED.
    """
    snapshot = AgentHealth(name=gw.name, last_checked=time.time())

    gw_ok, gw_msg = probe_gateway(gw.host, gw.port)
    proc_ok, proc_msg = probe_process(gw.process_pattern)
    hb_ok, hb_age, hb_msg = probe_heartbeat(gw.resolved_heartbeat_path, dead_threshold)

    snapshot.gateway_alive = gw_ok
    snapshot.process_alive = proc_ok
    snapshot.heartbeat_age = hb_age

    logger.debug("[%s] gateway=%s process=%s heartbeat=%s",
                 gw.name, gw_ok, proc_ok, hb_ok)
    logger.debug("[%s] gw: %s | proc: %s | hb: %s",
                 gw.name, gw_msg, proc_msg, hb_msg)

    passed = sum([gw_ok, proc_ok, hb_ok])
    if passed == 3:
        snapshot.state = AgentState.HEALTHY
    elif passed == 0:
        snapshot.state = AgentState.DEAD
    else:
        snapshot.state = AgentState.DEGRADED

    return snapshot
|
||||
|
||||
|
||||
# ── Notification channels ────────────────────────────────────────────
|
||||
|
||||
def _http(method: str, url: str, data: Optional[dict] = None,
          headers: Optional[dict] = None) -> Optional[Any]:
    """Minimal HTTP helper. Returns parsed JSON or None on failure.

    Args:
        method: HTTP verb ("GET", "POST", "PATCH", ...).
        url: Full URL to request.
        data: Optional dict serialized as a JSON request body.
        headers: Optional extra headers (e.g. Authorization).

    Returns:
        Parsed JSON on success ({} for an empty body), None on any error.
    """
    # `is not None` (not truthiness) so an explicit empty payload ({})
    # is still serialized and sent instead of being silently dropped.
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=body, method=method)
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    if headers:
        for k, v in headers.items():
            req.add_header(k, v)
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read().decode()
            return json.loads(raw) if raw.strip() else {}
    except urllib.error.HTTPError as e:
        logger.warning("HTTP %d %s: %s", e.code, url, e.read().decode()[:200])
    except Exception as e:
        logger.warning("HTTP error %s: %s", url, e)
    return None
|
||||
|
||||
|
||||
def send_telegram_approval_request(
    bot_token: str,
    chat_id: str,
    agent_name: str,
    mission_label: str,
    dead_seconds: float,
    dry_run: bool = False,
) -> bool:
    """
    Post a Telegram message asking for revive approval.

    The operator should reply with "revive <agent_name>" or "+1" to approve.
    Returns True if the message was sent successfully.
    """
    if dry_run:
        logger.info("[DRY-RUN] Would send Telegram approval request for %s", agent_name)
        return True

    if not (bot_token and chat_id):
        logger.warning("Telegram not configured; cannot send approval request for %s", agent_name)
        return False

    text = (
        f"🔴 *Resurrection Pool Alert*\n\n"
        f"Agent *{agent_name}* has been dead for {int(dead_seconds)}s.\n"
        f"Mission: _{mission_label or 'unassigned'}_\n\n"
        f"Reply `revive {agent_name}` to authorize restart.\n"
        f"Reply `skip {agent_name}` to suppress for this cycle."
    )
    payload = {
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown",
    }
    endpoint = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    response = _http("POST", endpoint, payload)

    if response and response.get("ok"):
        message_id = response.get("result", {}).get("message_id")
        logger.info("Telegram approval request sent for %s (message_id=%s)",
                    agent_name, message_id)
        return True

    logger.warning("Telegram send failed for %s: %s", agent_name, response)
    return False
|
||||
|
||||
|
||||
def poll_telegram_for_approval(
    bot_token: str,
    agent_name: str,
    timeout_seconds: int,
    dry_run: bool = False,
) -> bool:
    """
    Poll Telegram getUpdates for a revive approval message.

    Looks for messages containing "revive <agent_name>" (case-insensitive).
    Returns True if approved within timeout, False otherwise.
    """
    if dry_run:
        logger.info("[DRY-RUN] Simulating Telegram approval for %s (auto-approve)", agent_name)
        return True

    if not bot_token:
        return False

    lowered = agent_name.lower()
    approve_phrase = f"revive {lowered}"
    skip_phrase = f"skip {lowered}"
    deadline = time.time() + timeout_seconds
    offset = None

    while time.time() < deadline:
        params = "?timeout=30&limit=10"
        if offset is not None:
            params += f"&offset={offset}"
        url = f"https://api.telegram.org/bot{bot_token}/getUpdates{params}"

        batch = _http("GET", url)
        if not batch or not batch.get("ok"):
            time.sleep(5)
            continue

        for update in batch.get("result", []):
            # Advance the offset so processed updates are acknowledged.
            offset = update["update_id"] + 1
            msg_text = (
                update.get("message", {}).get("text", "")
                or update.get("channel_post", {}).get("text", "")
            ).lower().strip()

            if approve_phrase in msg_text or msg_text == "+1":
                logger.info("Telegram approval received for %s", agent_name)
                return True
            if skip_phrase in msg_text:
                logger.info("Telegram skip received for %s; aborting revive", agent_name)
                return False

        time.sleep(5)

    logger.info("Approval timeout for %s after %ds", agent_name, timeout_seconds)
    return False
|
||||
|
||||
|
||||
def send_nostr_approval_request(
    relay_url: str,
    recipient_pubkey: str,
    agent_name: str,
    mission_label: str,
    dead_seconds: float,
    dry_run: bool = False,
) -> bool:
    """
    Post a Nostr direct-message (kind 4) approval request.

    Falls back gracefully if websockets is not installed — logs a warning
    instead of crashing the pool. Returns True if sent, False otherwise.
    """
    if dry_run:
        logger.info("[DRY-RUN] Would send Nostr approval request for %s", agent_name)
        return True

    if not relay_url or not recipient_pubkey:
        logger.debug("Nostr not configured; skipping DM for %s", agent_name)
        return False

    content = (
        f"[Resurrection Pool] Agent '{agent_name}' is dead ({int(dead_seconds)}s). "
        f"Mission: {mission_label or 'unassigned'}. "
        f"Reply 'revive {agent_name}' to authorize restart."
    )

    try:
        import asyncio

        async def _send():
            # Imported lazily inside the coroutine so a missing optional
            # dependency surfaces as ImportError out of asyncio.run(),
            # caught by the outer handler below.
            import websockets  # type: ignore
            # NOTE(review): this event is unsigned (empty pubkey/id/sig) —
            # most relays reject unsigned events; signing appears to be
            # expected to happen elsewhere. Confirm before relying on it.
            event = {
                "kind": 4,
                "pubkey": "",  # filled by relay/identity in prod
                "created_at": int(time.time()),
                "tags": [["p", recipient_pubkey]],
                "content": content,
                "id": "",
                "sig": "",
            }
            try:
                async with websockets.connect(relay_url, open_timeout=10) as ws:
                    await ws.send(json.dumps(["EVENT", event]))
                    logger.info("Nostr DM sent for %s via %s", agent_name, relay_url)
                    return True
            except Exception as e:
                # Connection/send failures are reported, never raised.
                logger.warning("Nostr send failed for %s: %s", agent_name, e)
                return False

        return asyncio.run(_send())
    except ImportError:
        logger.warning("websockets library not installed; Nostr notifications disabled")
        return False
    except Exception as e:
        logger.warning("Nostr notification error for %s: %s", agent_name, e)
        return False
|
||||
|
||||
|
||||
# ── Revive actions ───────────────────────────────────────────────────
|
||||
|
||||
def revive_agent(gw: GatewayConfig, dry_run: bool = False) -> bool:
    """
    Attempt to restart an agent using its configured revive_cmd.

    Returns True if the command was launched without error.
    Does NOT wait for the agent to become healthy — the next poll cycle
    will detect recovery via the heartbeat file.
    """
    if not gw.revive_cmd:
        logger.warning("[%s] No revive_cmd configured; cannot restart", gw.name)
        return False

    if dry_run:
        logger.info("[DRY-RUN] Would exec: %s", gw.revive_cmd)
        return True

    try:
        logger.info("[%s] Launching: %s", gw.name, gw.revive_cmd)
        child = subprocess.Popen(
            gw.revive_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,  # detach from our process group
        )
    except FileNotFoundError as e:
        logger.error("[%s] Revive cmd not found: %s", gw.name, e)
    except PermissionError as e:
        logger.error("[%s] Revive cmd permission denied: %s", gw.name, e)
    except Exception as e:
        logger.error("[%s] Revive failed: %s", gw.name, e)
    else:
        logger.info("[%s] Revive process started (PID %d)", gw.name, child.pid)
        return True
    return False
|
||||
|
||||
|
||||
# ── Gitea integration ────────────────────────────────────────────────
|
||||
|
||||
def _gitea(method: str, path: str, cfg: PoolConfig, data: Optional[dict] = None) -> Any:
    """Gitea API helper: prefixes /api/v1 and attaches the token, if any."""
    base = cfg.gitea_url.rstrip('/')
    url = f"{base}/api/v1{path}"
    auth_headers = {}
    if cfg.gitea_token:
        auth_headers["Authorization"] = f"token {cfg.gitea_token}"
    return _http(method, url, data=data, headers=auth_headers)
|
||||
|
||||
|
||||
def find_open_pool_issue(cfg: PoolConfig, agent_name: str) -> Optional[dict]:
    """Return the first open pool-tracking issue for this agent, or None."""
    listing = _gitea(
        "GET",
        f"/repos/{cfg.gitea_repo}/issues"
        f"?state=open&type=issues&limit=50",
        cfg,
    )
    # API errors come back as None; guard against non-list payloads too.
    if not listing or not isinstance(listing, list):
        return None
    wanted_prefix = f"{POOL_TITLE_PREFIX} {agent_name}:"
    return next(
        (issue for issue in listing
         if issue.get("title", "").startswith(wanted_prefix)),
        None,
    )
|
||||
|
||||
|
||||
def open_death_issue(cfg: PoolConfig, health: AgentHealth, mission_label: str) -> None:
    """Create a Gitea issue recording an agent death (no-op without a token)."""
    if not cfg.gitea_token:
        logger.debug("No Gitea token; skipping issue creation for %s", health.name)
        return

    dead_for = int(health.consecutive_dead_seconds)
    if health.heartbeat_age is not None:
        hb_text = f"{int(health.heartbeat_age)}s"
    else:
        hb_text = "N/A"
    stamp = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())

    title = f"{POOL_TITLE_PREFIX} {health.name}: agent dead ({dead_for}s)"
    body = (
        f"## 🔴 Agent Death Detected — {health.name}\n\n"
        f"**Mission:** {mission_label or 'unassigned'} \n"
        f"**Dead for:** {dead_for}s \n"
        f"**Gateway alive:** {health.gateway_alive} \n"
        f"**Process alive:** {health.process_alive} \n"
        f"**Heartbeat age:** {hb_text} \n\n"
        f"*Logged by Resurrection Pool at {stamp}*"
    )
    created = _gitea("POST", f"/repos/{cfg.gitea_repo}/issues", cfg,
                     data={"title": title, "body": body})
    if created and created.get("number"):
        logger.info("Opened death issue #%d for %s", created["number"], health.name)
|
||||
|
||||
|
||||
def close_death_issue(cfg: PoolConfig, agent_name: str, revived: bool) -> None:
    """Close an existing death issue when the agent recovers."""
    issue = find_open_pool_issue(cfg, agent_name)
    if not issue:
        return

    num = issue["number"]
    verb = "revived and healthy" if revived else "recovered (external)"
    stamp = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())
    comment = (f"## 🟢 Agent {agent_name} — {verb}\n\n"
               f"*Resurrection Pool confirmed recovery at {stamp}*")

    # Leave a recovery comment, then flip the issue closed.
    _gitea("POST", f"/repos/{cfg.gitea_repo}/issues/{num}/comments", cfg,
           data={"body": comment})
    _gitea("PATCH", f"/repos/{cfg.gitea_repo}/issues/{num}", cfg,
           data={"state": "closed"})
    logger.info("Closed death issue #%d for %s", num, agent_name)
|
||||
|
||||
|
||||
# ── Policy engine ────────────────────────────────────────────────────
|
||||
|
||||
def apply_revive_policy(
    gw: GatewayConfig,
    health: AgentHealth,
    mission: Optional[MissionConfig],
    cfg: PoolConfig,
    dry_run: bool = False,
) -> bool:
    """
    Execute the revive policy for a dead agent.

    Returns True if a revive was attempted, False otherwise.
    """
    # Agents with no mission assignment default to the safest policy: ask.
    policy = mission.revive_policy if mission else RevivePolicy.ASK
    mission_label = mission.label if mission else ""

    logger.info("[%s] Applying revive policy=%s (dead %ds)",
                gw.name, policy.value, int(health.consecutive_dead_seconds))

    if policy == RevivePolicy.NEVER:
        # Record the death for humans; deliberately no restart.
        logger.info("[%s] Policy=never — recording death, no restart", gw.name)
        open_death_issue(cfg, health, mission_label)
        return False

    if policy == RevivePolicy.AUTO:
        logger.info("[%s] Policy=auto — restarting immediately", gw.name)
        ok = revive_agent(gw, dry_run=dry_run)
        if ok:
            logger.info("[%s] Auto-revive launched", gw.name)
        else:
            # Could not even launch the restart; leave a paper trail.
            logger.warning("[%s] Auto-revive failed — opening Gitea issue", gw.name)
            open_death_issue(cfg, health, mission_label)
        return ok

    # policy == RevivePolicy.ASK
    if health.revive_pending:
        # A previous sweep already asked; don't spam the channels.
        logger.debug("[%s] Approval already pending; waiting", gw.name)
        return False

    # Send approval requests over all configured channels
    health.revive_pending = True

    tg_sent = send_telegram_approval_request(
        cfg.telegram_bot_token, cfg.telegram_chat_id,
        gw.name, mission_label, health.consecutive_dead_seconds,
        dry_run=dry_run,
    )
    # Nostr send result is intentionally unused: only Telegram is polled
    # for the reply below.
    send_nostr_approval_request(
        cfg.nostr_relay_url, cfg.nostr_recipient_pubkey,
        gw.name, mission_label, health.consecutive_dead_seconds,
        dry_run=dry_run,
    )

    if not tg_sent and not cfg.nostr_relay_url:
        logger.warning(
            "[%s] No approval channel available (configure telegram or nostr). "
            "Falling back to auto-revive.", gw.name
        )
        health.revive_pending = False
        return revive_agent(gw, dry_run=dry_run)

    # Poll for approval
    # NOTE(review): this blocks the current sweep for up to approval_timeout
    # seconds, and only Telegram is polled — a Nostr-only configuration can
    # never actually approve here. Confirm whether that is intended.
    approved = poll_telegram_for_approval(
        cfg.telegram_bot_token, gw.name,
        cfg.approval_timeout, dry_run=dry_run,
    )
    health.revive_pending = False

    if approved:
        logger.info("[%s] Approval received — launching revive", gw.name)
        ok = revive_agent(gw, dry_run=dry_run)
        if not ok:
            open_death_issue(cfg, health, mission_label)
        return ok

    logger.info("[%s] Revive not approved — recording death", gw.name)
    open_death_issue(cfg, health, mission_label)
    return False
|
||||
|
||||
|
||||
# ── Main pool loop ───────────────────────────────────────────────────
|
||||
|
||||
class ResurrectionPool:
    """
    Fleet-wide health monitor and revive coordinator.

    Maintains an in-memory health table keyed by agent name.
    Drives one poll-sweep per interval across all configured gateways.
    """

    def __init__(self, cfg: PoolConfig, dry_run: bool = False):
        self.cfg = cfg
        self.dry_run = dry_run  # suppresses restarts and outbound alerts
        self._health: Dict[str, AgentHealth] = {}  # agent name -> merged snapshot
        self._running = False  # loop flag; flipped off by the signal handler

    # ── Agent-to-mission lookup ──────────────────────────────────────

    def _mission_for(self, agent_name: str) -> Optional[MissionConfig]:
        """Return the first mission that lists this agent."""
        # An agent listed in several missions gets the first match's policy.
        for m in self.cfg.missions:
            if agent_name in m.agents:
                return m
        return None

    # ── Gateway lookup ───────────────────────────────────────────────

    def _gateway_for(self, agent_name: str) -> Optional[GatewayConfig]:
        # Linear scan; fleets are small enough that no index is needed.
        for gw in self.cfg.gateways:
            if gw.name == agent_name:
                return gw
        return None

    # ── Health state tracking ────────────────────────────────────────

    def _update_health(self, agent_name: str, new: AgentHealth) -> AgentHealth:
        """
        Merge a fresh health snapshot into the pool's running state.

        - Preserves first_dead_at timestamp so dead_seconds accumulates.
        - Resets first_dead_at when agent returns healthy.
        - Tracks revive_attempts across cycles.
        """
        prev = self._health.get(agent_name)

        if new.state == AgentState.HEALTHY:
            # Recovery: wipe all death bookkeeping.
            new.first_dead_at = None
            new.revive_attempts = 0
            new.revive_pending = False
            if prev and prev.state == AgentState.DEAD:
                logger.info("[%s] Agent recovered!", agent_name)
                # revived=True only if the pool itself attempted a restart.
                close_death_issue(self.cfg, agent_name, revived=prev.revive_attempts > 0)
        else:
            if prev:
                # Carry approval / attempt state forward across sweeps.
                new.revive_pending = prev.revive_pending
                new.revive_attempts = prev.revive_attempts
                if prev.first_dead_at is not None:
                    new.first_dead_at = prev.first_dead_at
                elif new.state in (AgentState.DEAD, AgentState.DEGRADED):
                    # First unhealthy observation: start the dead clock.
                    # NOTE(review): DEGRADED also starts the clock, so an agent
                    # that later flips to DEAD may already exceed the threshold
                    # on its first DEAD sweep — confirm this is intended.
                    new.first_dead_at = new.last_checked
            else:
                # No prior record at all: start the clock now.
                new.first_dead_at = new.last_checked

        self._health[agent_name] = new
        return new

    # ── Single sweep ─────────────────────────────────────────────────

    def sweep(self) -> Dict[str, AgentHealth]:
        """Poll all gateways once and apply revive policies where needed."""
        if not self.cfg.gateways:
            logger.debug("No gateways configured; nothing to sweep")
            return {}

        for gw in self.cfg.gateways:
            raw = assess_agent(gw, self.cfg.dead_threshold)
            health = self._update_health(gw.name, raw)
            icon = {"healthy": "✅", "degraded": "⚠️", "dead": "🔴",
                    "reviving": "🔄", "unknown": "❓"}.get(health.state.value, "?")
            logger.info("%s [%s] state=%s dead_for=%ds",
                        icon, gw.name, health.state.value,
                        int(health.consecutive_dead_seconds))

            # Only act once death has persisted past the threshold and no
            # approval request is already outstanding from an earlier sweep.
            if health.state == AgentState.DEAD:
                if health.consecutive_dead_seconds >= self.cfg.dead_threshold:
                    if not health.revive_pending:
                        mission = self._mission_for(gw.name)
                        launched = apply_revive_policy(
                            gw, health, mission, self.cfg, dry_run=self.dry_run
                        )
                        if launched:
                            health.revive_attempts += 1
                            # Mark REVIVING so the next sweep re-assesses
                            # instead of immediately retrying.
                            health.state = AgentState.REVIVING

        # Shallow copy so callers can't mutate the internal table.
        return dict(self._health)

    def status(self) -> List[dict]:
        """Return current pool state as a list of dicts (for --status flag)."""
        return [h.to_dict() for h in self._health.values()]

    # ── Run loop ─────────────────────────────────────────────────────

    def run(self) -> None:
        """Blocking poll loop. Handles SIGTERM/SIGINT cleanly."""
        self._running = True

        def _stop(signum, frame):
            logger.info("Signal %d received; stopping pool", signum)
            self._running = False

        signal.signal(signal.SIGTERM, _stop)
        signal.signal(signal.SIGINT, _stop)

        logger.info(
            "Resurrection Pool starting — %d gateway(s), poll_interval=%ds, dead_threshold=%ds",
            len(self.cfg.gateways), self.cfg.poll_interval, self.cfg.dead_threshold,
        )

        while self._running:
            self.sweep()
            # Sleep in 1s ticks so we respond to signals promptly
            for _ in range(self.cfg.poll_interval):
                if not self._running:
                    break
                time.sleep(1)

        logger.info("Resurrection Pool stopped")

    def run_once(self) -> bool:
        """Single sweep; returns True if all agents are healthy."""
        # Vacuously True when no gateways are configured (sweep returns {}).
        results = self.sweep()
        return all(h.state == AgentState.HEALTHY for h in results.values())
|
||||
|
||||
|
||||
# ── CLI ──────────────────────────────────────────────────────────────
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for the resurrection-pool daemon."""
    parser = argparse.ArgumentParser(
        description="Resurrection Pool — fleet health polling and auto-revive",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    add = parser.add_argument
    add("--config", metavar="PATH",
        help="Path to resurrection_pool.json config file")
    add("--once", action="store_true",
        help="Run one sweep and exit (for cron / smoke tests)")
    add("--status", action="store_true",
        help="Print current pool state as JSON and exit")
    add("--dry-run", action="store_true",
        help="Run all checks but do not restart agents or alert channels")
    add("--poll-interval", type=int, default=None,
        help="Override poll interval (seconds)")
    add("--dead-threshold", type=int, default=None,
        help="Override dead threshold (seconds)")
    add("-v", "--verbose", action="store_true",
        help="Enable DEBUG logging")
    return parser
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: parse args, build the pool, dispatch to the right mode."""
    args = build_parser().parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    config_path = Path(args.config) if args.config else None
    cfg = PoolConfig.load(config_path)

    # CLI flags win over config-file values.
    for attr in ("poll_interval", "dead_threshold"):
        override = getattr(args, attr)
        if override is not None:
            setattr(cfg, attr, override)

    pool = ResurrectionPool(cfg, dry_run=args.dry_run)

    if args.status:
        # One sweep first so the printed state reflects the live fleet.
        pool.sweep()
        print(json.dumps(pool.status(), indent=2))
        return

    if args.once:
        sys.exit(0 if pool.run_once() else 1)

    pool.run()
|
||||
|
||||
|
||||
# Allow running this module directly as the pool daemon / CLI.
if __name__ == "__main__":
    main()
|
||||
1
lazarus/__init__.py
Normal file
1
lazarus/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Lazarus Pit — Agent resurrection pool and mission cell isolation."""
|
||||
BIN
lazarus/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
lazarus/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
BIN
lazarus/__pycache__/cell.cpython-312.pyc
Normal file
BIN
lazarus/__pycache__/cell.cpython-312.pyc
Normal file
Binary file not shown.
BIN
lazarus/__pycache__/cli.cpython-312.pyc
Normal file
BIN
lazarus/__pycache__/cli.cpython-312.pyc
Normal file
Binary file not shown.
BIN
lazarus/__pycache__/harness.cpython-312.pyc
Normal file
BIN
lazarus/__pycache__/harness.cpython-312.pyc
Normal file
Binary file not shown.
BIN
lazarus/__pycache__/isolation.cpython-312.pyc
Normal file
BIN
lazarus/__pycache__/isolation.cpython-312.pyc
Normal file
Binary file not shown.
BIN
lazarus/__pycache__/roster.cpython-312.pyc
Normal file
BIN
lazarus/__pycache__/roster.cpython-312.pyc
Normal file
Binary file not shown.
145
lazarus/cell.py
Normal file
145
lazarus/cell.py
Normal file
@@ -0,0 +1,145 @@
|
||||
"""Mission cell lifecycle — create, load, save, destroy."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
CellState = Literal["pending", "active", "archived", "destroyed"]
|
||||
|
||||
|
||||
@dataclass
class MissionCell:
    """An ephemeral project workspace: one mission, one directory tree."""

    id: str           # mission identifier (typically a UUID string)
    name: str         # human-readable mission name
    repo_url: str     # repository the mission works against
    created_at: str   # ISO-8601 creation timestamp
    state: CellState  # lifecycle state: pending/active/archived/destroyed
    agents: list[str]  # names of agents attached to this cell
    cell_path: str    # root directory of the cell on disk
    home_path: str    # agent home directory (may be empty)

    def to_dict(self) -> dict:
        """Serialize to a JSON-safe dict; contained lists are copied."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> MissionCell:
        """Rebuild a MissionCell from a dict produced by to_dict().

        `agents` and `home_path` are optional in the input; all other
        keys are required (missing ones raise KeyError).
        """
        return cls(
            id=data["id"],
            name=data["name"],
            repo_url=data["repo_url"],
            created_at=data["created_at"],
            state=data["state"],
            agents=list(data.get("agents", [])),
            cell_path=data["cell_path"],
            home_path=data.get("home_path", ""),
        )
|
||||
|
||||
|
||||
def create_cell(
    mission_id: str,
    name: str,
    repo_url: str,
    base_path: Path,
    home_path: str = "",
) -> MissionCell:
    """Build the on-disk layout for a new mission cell and return its record.

    Creates <base_path>/<mission_id> with workspace/, logs/ and snapshots/
    subdirectories, writes an empty roster.json placeholder, and persists
    cell.json via save_cell(). Idempotent w.r.t. existing directories.
    """
    cell_dir = base_path / mission_id
    cell_dir.mkdir(parents=True, exist_ok=True)

    # Standard subdirectories every cell carries.
    for sub in ("workspace", "logs", "snapshots"):
        (cell_dir / sub).mkdir(exist_ok=True)

    cell = MissionCell(
        id=mission_id,
        name=name,
        repo_url=repo_url,
        created_at=datetime.now(timezone.utc).isoformat(),
        state="pending",
        agents=[],
        cell_path=str(cell_dir),
        home_path=home_path,
    )

    # Empty roster placeholder so later readers never hit a missing file.
    _atomic_write(cell_dir / "roster.json", {"agents": []})

    # Persist the cell manifest (cell.json).
    save_cell(cell)

    log.info("Created mission cell %s at %s", mission_id, cell_dir)
    return cell
|
||||
|
||||
|
||||
def destroy_cell(cell: MissionCell, cleanup_dirs: bool = False) -> None:
    """Flag *cell* as destroyed, persist that state, and optionally
    delete its directory tree from disk."""
    cell.state = "destroyed"
    save_cell(cell)

    if cleanup_dirs:
        import shutil

        target = Path(cell.cell_path)
        if target.exists():
            shutil.rmtree(target)
            log.info("Removed cell directory %s", target)

    log.info("Destroyed mission cell %s", cell.id)
|
||||
|
||||
|
||||
def load_cell(cell_path: Path) -> MissionCell:
    """Read cell.json under *cell_path* and rebuild the MissionCell record."""
    manifest = Path(cell_path) / "cell.json"
    payload = json.loads(manifest.read_text(encoding="utf-8"))
    return MissionCell.from_dict(payload)
|
||||
|
||||
|
||||
def save_cell(cell: MissionCell) -> None:
    """Atomically persist *cell* to <cell_path>/cell.json, creating the
    cell directory if needed."""
    root = Path(cell.cell_path)
    root.mkdir(parents=True, exist_ok=True)
    destination = root / "cell.json"
    _atomic_write(destination, cell.to_dict())
    log.debug("Saved cell %s to %s", cell.id, destination)
|
||||
|
||||
|
||||
# ─── helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def _atomic_write(path: Path, data: dict) -> None:
|
||||
"""Write JSON atomically via a temp file + rename."""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
|
||||
try:
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
||||
json.dump(data, fh, indent=2)
|
||||
os.replace(tmp, path)
|
||||
except Exception:
|
||||
try:
|
||||
os.unlink(tmp)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
162
lazarus/cli.py
Normal file
162
lazarus/cli.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""Lazarus CLI — invite agents, manage mission cells."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import platform
|
||||
import sys
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from lazarus.cell import MissionCell, create_cell, load_cell
|
||||
from lazarus.isolation import setup_level1_isolation
|
||||
from lazarus.roster import DEFAULT_REGISTRY_PATH, MissionRoster
|
||||
|
||||
|
||||
def _default_base_path() -> Path:
|
||||
if platform.system() == "Linux":
|
||||
return Path("/var/missions")
|
||||
return Path.home() / ".nexus" / "missions"
|
||||
|
||||
|
||||
# ─── command handlers ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def cmd_invite(args: argparse.Namespace) -> int:
    """Handler for `lazarus invite`: ensure the mission cell exists, apply
    isolation, register the mission, and add the agent to its roster.
    Prints a JSON summary and returns 0."""
    base = Path(args.base_path) if args.base_path else _default_base_path()
    mission_id: str = args.mission or str(uuid.uuid4())
    registry = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry)

    # Reuse an existing cell when its manifest is already on disk.
    cell_dir = base / mission_id
    if (cell_dir / "cell.json").exists():
        cell = load_cell(cell_dir)
    else:
        cell = create_cell(
            mission_id=mission_id,
            name=args.name or mission_id,
            repo_url=args.repo,
            base_path=base,
        )

    # Level 1 isolation before any agent touches the workspace.
    setup_level1_isolation(cell)

    roster.add_mission(cell)
    entry = roster.invite_agent(mission_id, args.agent, role=args.role)
    roster.save()

    print(json.dumps(
        {
            "mission_id": mission_id,
            "agent": args.agent,
            "role": entry.role,
            "status": entry.status,
            "cell_path": cell.cell_path,
            "repo_url": cell.repo_url,
            "invited_at": entry.invited_at,
        },
        indent=2,
    ))
    return 0
|
||||
|
||||
|
||||
def cmd_list(args: argparse.Namespace) -> int:
    """Handler for `lazarus list`: print missions (optionally state-filtered)
    as a JSON array and return 0."""
    registry = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry)
    print(json.dumps(roster.list_missions(state=args.state), indent=2))
    return 0
|
||||
|
||||
|
||||
def cmd_status(args: argparse.Namespace) -> int:
    """Handler for `lazarus status`: print one mission's record plus its
    agent roster as JSON. Returns 1 when the mission is unknown."""
    registry = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry)

    mission = roster.get_mission(args.mission_id)
    if mission is None:
        print(f"Mission {args.mission_id!r} not found.", file=sys.stderr)
        return 1

    entries = roster.list_agents(args.mission_id)
    report = dict(mission)
    report["agent_count"] = len(entries)
    report["agents"] = [entry.to_dict() for entry in entries]
    print(json.dumps(report, indent=2))
    return 0
|
||||
|
||||
|
||||
def cmd_depart(args: argparse.Namespace) -> int:
    """Handler for `lazarus depart`: flag an agent as departed and persist
    the roster. Returns 1 when the mission or agent is unknown."""
    registry = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry)
    try:
        entry = roster.depart_agent(args.mission, args.agent)
    except KeyError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    roster.save()
    print(json.dumps(entry.to_dict(), indent=2))
    return 0
|
||||
|
||||
|
||||
# ─── argument parsing ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="lazarus",
|
||||
description="Lazarus Pit — agent invitation and mission cell management",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--registry",
|
||||
default=None,
|
||||
help="Path to mission roster JSON (default: ~/.nexus/mission-roster.json)",
|
||||
)
|
||||
|
||||
sub = parser.add_subparsers(dest="command", required=True)
|
||||
|
||||
# invite
|
||||
p_invite = sub.add_parser("invite", help="Invite an agent to a mission")
|
||||
p_invite.add_argument("agent", help="Agent name to invite")
|
||||
p_invite.add_argument("--mission", default=None, help="Mission ID (UUID generated if omitted)")
|
||||
p_invite.add_argument("--repo", required=True, help="Repository URL for the mission")
|
||||
p_invite.add_argument("--base-path", default=None, help="Base directory for mission cells")
|
||||
p_invite.add_argument("--role", default="contributor", help="Agent role (default: contributor)")
|
||||
p_invite.add_argument("--name", default=None, help="Human-readable mission name")
|
||||
|
||||
# list
|
||||
p_list = sub.add_parser("list", help="List missions")
|
||||
p_list.add_argument("--state", default=None, help="Filter by state (pending/active/archived/destroyed)")
|
||||
|
||||
# status
|
||||
p_status = sub.add_parser("status", help="Show mission status")
|
||||
p_status.add_argument("mission_id", help="Mission ID")
|
||||
|
||||
# depart
|
||||
p_depart = sub.add_parser("depart", help="Mark agent as departed")
|
||||
p_depart.add_argument("agent", help="Agent name")
|
||||
p_depart.add_argument("--mission", required=True, help="Mission ID")
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
    """Parse *argv* and route to the matching subcommand handler.

    Returns the handler's exit code (0 on success).
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    handlers = {
        "invite": cmd_invite,
        "list": cmd_list,
        "status": cmd_status,
        "depart": cmd_depart,
    }

    # Subparsers are required, so the lookup normally always succeeds;
    # the help fallback is defensive only.
    try:
        handler = handlers[args.command]
    except KeyError:
        parser.print_help()
        return 1
    return handler(args)
|
||||
|
||||
|
||||
# Module is directly executable; exit with the handler's return code.
if __name__ == "__main__":
    sys.exit(main())
|
||||
165
lazarus/harness.py
Normal file
165
lazarus/harness.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Hermes harness mission-cell mode — wraps agent lifecycle in a mission cell."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Awaitable
|
||||
|
||||
try:
|
||||
import websockets # type: ignore[import]
|
||||
except ImportError: # pragma: no cover
|
||||
websockets = None # type: ignore[assignment]
|
||||
|
||||
from lazarus.cell import MissionCell, load_cell
|
||||
from lazarus.isolation import setup_level1_isolation
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_HERMES_WS_URL = "ws://localhost:8000/ws"
|
||||
|
||||
|
||||
class MissionCellHarness:
    """
    Wraps agent execution in a mission cell with Hermes WS telemetry.

    Lifecycle: start() loads the cell and emits ``agent_joined``; stop()
    emits ``agent_departed`` and closes the WebSocket. Telemetry is
    best-effort — every Hermes failure degrades to a log line, never an
    exception to the caller.

    Usage::

        harness = MissionCellHarness("myagent", "/var/missions/<uuid>")
        await harness.run_with_harness(my_coroutine())
    """

    def __init__(
        self,
        agent_name: str,
        cell_path: str | Path,
        hermes_ws_url: str = DEFAULT_HERMES_WS_URL,
        home_path: str | None = None,
    ) -> None:
        self.agent_name = agent_name
        self.cell_path = Path(cell_path)
        self.hermes_ws_url = hermes_ws_url
        self.home_path = home_path

        # Cell is loaded lazily in start(), not here.
        self._cell: MissionCell | None = None
        self._ws: Any = None  # live websockets connection, or None
        self._ws_connected = False  # tracks send-ability; cleared on any WS error

    # ─── public API ───────────────────────────────────────────────────────────

    async def start(self) -> None:
        """Load the cell, apply Level 1 isolation, connect to Hermes, and
        emit the agent_joined telemetry event."""
        self._cell = load_cell(self.cell_path)
        setup_level1_isolation(self._cell)

        # Connect before emitting so the joined event can actually be sent.
        await self._connect_hermes()
        await self._emit_mission_event("agent_joined")
        log.info(
            "MissionCellHarness started for agent=%s mission=%s",
            self.agent_name,
            self._cell.id,
        )

    async def stop(self) -> None:
        """Emit agent_departed telemetry and close the Hermes connection."""
        await self._emit_mission_event("agent_departed")

        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception as exc:
                # Shutdown path: a failed close is not worth surfacing.
                log.debug("WS close error (ignored): %s", exc)
            self._ws_connected = False

        log.info(
            "MissionCellHarness stopped for agent=%s", self.agent_name
        )

    def get_env(self) -> dict[str, str]:
        """Return environment variables for the mission cell context.

        MISSION_ID is empty when start() has not run yet (cell unloaded).
        HOME falls back to the process environment, then the OS home dir.
        """
        cell = self._cell
        env: dict[str, str] = {}

        # MISSION_CELL_PATH is set either way; only MISSION_ID depends
        # on whether the cell has been loaded.
        if cell:
            env["MISSION_CELL_PATH"] = str(self.cell_path)
            env["MISSION_ID"] = cell.id
        else:
            env["MISSION_CELL_PATH"] = str(self.cell_path)
            env["MISSION_ID"] = ""

        env["AGENT_NAME"] = self.agent_name

        if self.home_path:
            env["HOME"] = self.home_path
        else:
            env["HOME"] = os.environ.get("HOME", str(Path.home()))

        return env

    def write_mission_log(self, message: str) -> None:
        """Append a timestamped entry to <cell>/logs/mission.log.

        Silently no-ops (with a warning) before start() loads the cell.
        """
        if self._cell is None:
            log.warning("Cannot write mission log: cell not loaded")
            return

        logs_dir = Path(self._cell.cell_path) / "logs"
        logs_dir.mkdir(parents=True, exist_ok=True)

        log_file = logs_dir / "mission.log"
        timestamp = datetime.now(timezone.utc).isoformat()
        line = f"[{timestamp}] [{self.agent_name}] {message}\n"

        with open(log_file, "a", encoding="utf-8") as fh:
            fh.write(line)

    async def run_with_harness(self, coro: Awaitable[Any]) -> Any:
        """Start the harness, await *coro*, and always stop the harness.

        Returns the coroutine's result; stop() runs even when it raises.
        """
        await self.start()
        try:
            return await coro
        finally:
            await self.stop()

    # ─── internals ────────────────────────────────────────────────────────────

    async def _connect_hermes(self) -> None:
        """Attempt the Hermes WebSocket connection; degrade gracefully.

        Any failure (including websockets being uninstalled — hence the
        local import) leaves _ws_connected False rather than raising.
        """
        try:
            import websockets  # type: ignore[import]

            self._ws = await websockets.connect(self.hermes_ws_url)
            self._ws_connected = True
            log.info("Connected to Hermes at %s", self.hermes_ws_url)
        except Exception as exc:
            log.warning("Could not connect to Hermes WS: %s", exc)
            self._ws_connected = False

    async def _emit_mission_event(self, event: str) -> None:
        """Send a mission_event message to Hermes (best-effort).

        A send failure marks the connection dead so later emits skip
        straight to the not-connected branch.
        """
        cell = self._cell
        mission_id = cell.id if cell else ""
        payload = {
            "type": "mission_event",
            "event": event,
            "mission_id": mission_id,
            "agent_name": self.agent_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        if self._ws_connected and self._ws is not None:
            try:
                await self._ws.send(json.dumps(payload))
                log.debug("Emitted %s for agent=%s", event, self.agent_name)
            except Exception as exc:
                log.warning("Failed to emit %s telemetry: %s", event, exc)
                self._ws_connected = False
        else:
            log.debug(
                "Hermes not connected; %s event not sent (agent=%s)",
                event,
                self.agent_name,
            )
|
||||
170
lazarus/isolation.py
Normal file
170
lazarus/isolation.py
Normal file
@@ -0,0 +1,170 @@
|
||||
"""Level 1 isolation — directory setup and AppArmor profile generation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import platform
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from lazarus.cell import MissionCell
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_APPARMOR_TEMPLATE = """\
|
||||
#include <tunables/global>
|
||||
|
||||
profile nexus-mission-{cell_id} flags=(attach_disconnected,mediate_deleted) {{
|
||||
#include <abstractions/base>
|
||||
|
||||
# Allow read access to agent home
|
||||
"{home_path}/" r,
|
||||
"{home_path}/**" r,
|
||||
|
||||
# Allow read/write access to cell workspace
|
||||
"{workspace_path}/" rw,
|
||||
"{workspace_path}/**" rwkl,
|
||||
|
||||
# Allow read/write access to cell logs
|
||||
"{logs_path}/" rw,
|
||||
"{logs_path}/**" rwkl,
|
||||
|
||||
# Deny writes to home path
|
||||
deny "{home_path}/**" w,
|
||||
|
||||
# Deny access to other agent home dirs
|
||||
deny "/root/wizards/**" rw,
|
||||
|
||||
# Deny network access by default
|
||||
deny network,
|
||||
|
||||
# Basic system access
|
||||
/bin/** ix,
|
||||
/usr/bin/** ix,
|
||||
/lib/** r,
|
||||
/usr/lib/** r,
|
||||
/proc/self/** r,
|
||||
}}
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
class IsolationManifest:
    """Result of setting up Level 1 isolation for a mission cell."""

    # Mission/cell identifier the isolation was applied to.
    cell_id: str
    # Read-write workspace directory created inside the cell.
    workspace_path: Path
    # Read-write logs directory created inside the cell.
    logs_path: Path
    # Where the generated AppArmor profile file was written, or None when
    # none was produced (the cell had no home_path configured).
    apparmor_profile_path: Path | None
    # platform.system() value observed at setup time (e.g. "Linux").
    platform: str
|
||||
|
||||
|
||||
def setup_level1_isolation(cell: MissionCell) -> IsolationManifest:
    """
    Apply Level 1 isolation to a mission cell.

    Ensures workspace/ and logs/ exist with owner-only (0o700) permissions
    and, when the cell has a home_path, writes an AppArmor profile file
    next to them (the file is written on every platform; loading it into
    the kernel is a separate step).

    Returns an IsolationManifest describing what was set up.
    """
    root = Path(cell.cell_path)
    workspace = root / "workspace"
    logs = root / "logs"

    for directory in (workspace, logs):
        directory.mkdir(parents=True, exist_ok=True)

    # Owner-only access; non-fatal on filesystems that reject chmod.
    try:
        for directory in (workspace, logs):
            directory.chmod(0o700)
    except OSError as exc:
        log.warning("Could not set directory permissions: %s", exc)

    profile: Path | None = None
    current_platform = platform.system()

    # A profile only makes sense when we know the agent's home directory.
    if cell.home_path:
        profile = write_apparmor_profile(
            cell, root / f"nexus-mission-{cell.id}.apparmor"
        )

    log.info(
        "Level 1 isolation set up for cell %s (platform=%s)", cell.id, current_platform
    )
    return IsolationManifest(
        cell_id=cell.id,
        workspace_path=workspace,
        logs_path=logs,
        apparmor_profile_path=profile,
        platform=current_platform,
    )
|
||||
|
||||
|
||||
def write_apparmor_profile(cell: MissionCell, output_path: Path) -> Path:
    """
    Render the AppArmor profile template for *cell* and write it to
    *output_path*.

    The file is written on every platform; nothing is loaded into the
    kernel here. Returns the path that was written.
    """
    root = Path(cell.cell_path)
    rendered = _APPARMOR_TEMPLATE.format(
        cell_id=cell.id,
        # Fall back to the current user's home when the cell has none.
        home_path=cell.home_path or str(Path.home()),
        workspace_path=str(root / "workspace"),
        logs_path=str(root / "logs"),
    )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(rendered, encoding="utf-8")
    log.info("Wrote AppArmor profile to %s", output_path)
    return output_path
|
||||
|
||||
|
||||
def load_apparmor_profile(profile_path: Path) -> bool:
    """
    Load an AppArmor profile into the kernel via apparmor_parser.

    Degrades to a no-op returning False on non-Linux hosts, when
    apparmor_parser is missing, on timeout, on any parser failure, or on
    unexpected errors. Returns True only when the parser succeeds.
    """
    if platform.system() != "Linux":
        log.debug(
            "AppArmor load skipped: not on Linux (platform=%s)", platform.system()
        )
        return False

    try:
        proc = subprocess.run(
            ["apparmor_parser", "-r", str(profile_path)],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except FileNotFoundError:
        log.debug("apparmor_parser not found; skipping profile load")
        return False
    except subprocess.TimeoutExpired:
        log.warning("apparmor_parser timed out loading %s", profile_path)
        return False
    except Exception as exc:
        # Boundary catch-all: profile loading must never crash setup.
        log.warning("Unexpected error loading AppArmor profile: %s", exc)
        return False

    if proc.returncode == 0:
        log.info("Loaded AppArmor profile from %s", profile_path)
        return True

    log.warning(
        "apparmor_parser failed (rc=%d): %s",
        proc.returncode,
        proc.stderr.strip(),
    )
    return False
|
||||
184
lazarus/roster.py
Normal file
184
lazarus/roster.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Mission roster registry — tracks all missions and agents."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
from lazarus.cell import MissionCell
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_REGISTRY_PATH = Path.home() / ".nexus" / "mission-roster.json"
|
||||
|
||||
EntryStatus = Literal["invited", "active", "departed"]
|
||||
|
||||
|
||||
@dataclass
class RosterEntry:
    """One agent's participation record within a single mission."""

    mission_id: str          # mission this record belongs to
    agent_name: str          # agent being tracked
    invited_at: str          # ISO-8601 timestamp of the invitation
    accepted_at: str | None  # ISO-8601 acceptance time; None until accepted
    role: str                # e.g. "contributor"
    status: EntryStatus      # invited / active / departed

    def to_dict(self) -> dict:
        """Serialize to a JSON-safe dict (keys in field order)."""
        keys = ("mission_id", "agent_name", "invited_at",
                "accepted_at", "role", "status")
        return {key: getattr(self, key) for key in keys}

    @classmethod
    def from_dict(cls, data: dict) -> RosterEntry:
        """Rebuild an entry from a dict; accepted_at is optional and
        role defaults to "contributor"."""
        return cls(
            mission_id=data["mission_id"],
            agent_name=data["agent_name"],
            invited_at=data["invited_at"],
            accepted_at=data.get("accepted_at"),
            role=data.get("role", "contributor"),
            status=data["status"],
        )
|
||||
|
||||
|
||||
class MissionRoster:
|
||||
"""Registry of all missions and their agent rosters."""
|
||||
|
||||
def __init__(self, registry_path: Path = DEFAULT_REGISTRY_PATH) -> None:
|
||||
self.registry_path = Path(registry_path)
|
||||
self._data: dict = {}
|
||||
self._load()
|
||||
|
||||
# ─── internal I/O ─────────────────────────────────────────────────────────
|
||||
|
||||
def _load(self) -> None:
|
||||
if self.registry_path.exists():
|
||||
with open(self.registry_path, "r", encoding="utf-8") as fh:
|
||||
self._data = json.load(fh)
|
||||
else:
|
||||
self._data = {"schema_version": "1", "missions": {}}
|
||||
|
||||
def save(self) -> None:
|
||||
"""Atomically persist the registry to disk."""
|
||||
self.registry_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
fd, tmp = tempfile.mkstemp(dir=self.registry_path.parent, suffix=".tmp")
|
||||
try:
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
||||
json.dump(self._data, fh, indent=2)
|
||||
os.replace(tmp, self.registry_path)
|
||||
except Exception:
|
||||
try:
|
||||
os.unlink(tmp)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
log.debug("Saved mission roster to %s", self.registry_path)
|
||||
|
||||
# ─── missions ─────────────────────────────────────────────────────────────
|
||||
|
||||
def add_mission(self, cell: MissionCell) -> None:
|
||||
"""Register a new mission from a MissionCell."""
|
||||
missions: dict = self._data.setdefault("missions", {})
|
||||
if cell.id not in missions:
|
||||
missions[cell.id] = {
|
||||
"id": cell.id,
|
||||
"name": cell.name,
|
||||
"repo_url": cell.repo_url,
|
||||
"created_at": cell.created_at,
|
||||
"state": cell.state,
|
||||
"cell_path": cell.cell_path,
|
||||
"agents": [],
|
||||
}
|
||||
log.info("Registered mission %s in roster", cell.id)
|
||||
else:
|
||||
log.debug("Mission %s already in roster, skipping add", cell.id)
|
||||
|
||||
def get_mission(self, mission_id: str) -> dict | None:
|
||||
return self._data.get("missions", {}).get(mission_id)
|
||||
|
||||
def list_missions(self, state: str | None = None) -> list[dict]:
|
||||
missions = list(self._data.get("missions", {}).values())
|
||||
if state is not None:
|
||||
missions = [m for m in missions if m.get("state") == state]
|
||||
return missions
|
||||
|
||||
# ─── agents ───────────────────────────────────────────────────────────────
|
||||
|
||||
def invite_agent(
|
||||
self,
|
||||
mission_id: str,
|
||||
agent_name: str,
|
||||
role: str = "contributor",
|
||||
) -> RosterEntry:
|
||||
"""Invite an agent to a mission; returns the new RosterEntry."""
|
||||
mission = self._require_mission(mission_id)
|
||||
agents: list[dict] = mission.setdefault("agents", [])
|
||||
|
||||
# Check for existing entry
|
||||
for entry_dict in agents:
|
||||
if entry_dict["agent_name"] == agent_name:
|
||||
log.warning(
|
||||
"Agent %s already in mission %s roster", agent_name, mission_id
|
||||
)
|
||||
return RosterEntry.from_dict(entry_dict)
|
||||
|
||||
entry = RosterEntry(
|
||||
mission_id=mission_id,
|
||||
agent_name=agent_name,
|
||||
invited_at=datetime.now(timezone.utc).isoformat(),
|
||||
accepted_at=None,
|
||||
role=role,
|
||||
status="invited",
|
||||
)
|
||||
agents.append(entry.to_dict())
|
||||
log.info("Invited %s to mission %s as %s", agent_name, mission_id, role)
|
||||
return entry
|
||||
|
||||
def accept_invitation(self, mission_id: str, agent_name: str) -> RosterEntry:
|
||||
"""Mark an invitation as accepted; status becomes 'active'."""
|
||||
entry_dict = self._require_agent_entry(mission_id, agent_name)
|
||||
entry_dict["status"] = "active"
|
||||
entry_dict["accepted_at"] = datetime.now(timezone.utc).isoformat()
|
||||
log.info("Agent %s accepted invitation to mission %s", agent_name, mission_id)
|
||||
return RosterEntry.from_dict(entry_dict)
|
||||
|
||||
def depart_agent(self, mission_id: str, agent_name: str) -> RosterEntry:
|
||||
"""Mark an agent as departed from a mission."""
|
||||
entry_dict = self._require_agent_entry(mission_id, agent_name)
|
||||
entry_dict["status"] = "departed"
|
||||
log.info("Agent %s departed mission %s", agent_name, mission_id)
|
||||
return RosterEntry.from_dict(entry_dict)
|
||||
|
||||
def list_agents(self, mission_id: str) -> list[RosterEntry]:
|
||||
"""Return all RosterEntry objects for a mission."""
|
||||
mission = self._require_mission(mission_id)
|
||||
return [RosterEntry.from_dict(e) for e in mission.get("agents", [])]
|
||||
|
||||
# ─── helpers ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _require_mission(self, mission_id: str) -> dict:
|
||||
missions = self._data.get("missions", {})
|
||||
if mission_id not in missions:
|
||||
raise KeyError(f"Mission {mission_id!r} not found in roster")
|
||||
return missions[mission_id]
|
||||
|
||||
def _require_agent_entry(self, mission_id: str, agent_name: str) -> dict:
    """Return the raw roster record for *agent_name*; KeyError when missing."""
    mission = self._require_mission(mission_id)
    # First matching record wins, mirroring insertion order.
    found = next(
        (rec for rec in mission.get("agents", []) if rec["agent_name"] == agent_name),
        None,
    )
    if found is None:
        raise KeyError(
            f"Agent {agent_name!r} not found in mission {mission_id!r} roster"
        )
    return found
|
||||
110
tests/test_lazarus_cell.py
Normal file
110
tests/test_lazarus_cell.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""Tests for lazarus.cell — MissionCell lifecycle."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from lazarus.cell import MissionCell, create_cell, destroy_cell, load_cell, save_cell
|
||||
|
||||
|
||||
def test_create_cell_creates_directories(tmp_path: Path) -> None:
    """workspace/, logs/, and snapshots/ subdirs are created under the cell dir."""
    # Return value is irrelevant here — the original bound it to an unused
    # local; we only inspect the on-disk layout.
    create_cell(
        mission_id="test-mission-1",
        name="Test Mission",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path,
    )

    cell_dir = tmp_path / "test-mission-1"
    assert cell_dir.is_dir()
    for sub in ("workspace", "logs", "snapshots"):
        assert (cell_dir / sub).is_dir()
|
||||
|
||||
|
||||
def test_create_cell_writes_cell_json(tmp_path: Path) -> None:
    """cell.json is created and contains valid mission metadata."""
    # The returned cell object was unused in the original; drop the binding.
    create_cell(
        mission_id="test-mission-2",
        name="Another Mission",
        repo_url="https://github.com/test/another",
        base_path=tmp_path,
    )

    cell_json_path = tmp_path / "test-mission-2" / "cell.json"
    assert cell_json_path.exists()

    with open(cell_json_path) as fh:
        data = json.load(fh)

    assert data["id"] == "test-mission-2"
    assert data["name"] == "Another Mission"
    assert data["repo_url"] == "https://github.com/test/another"
    assert data["state"] == "pending"
    assert isinstance(data["agents"], list)
|
||||
|
||||
|
||||
def test_save_and_load_cell(tmp_path: Path) -> None:
    """Round-trip: save a cell, load it back, fields match."""
    original = create_cell(
        mission_id="roundtrip-mission",
        name="Roundtrip",
        repo_url="https://github.com/test/roundtrip",
        base_path=tmp_path,
    )
    # Mutate, persist, then read a fresh copy from disk.
    original.state = "active"
    original.agents.append("agentA")
    save_cell(original)

    restored = load_cell(tmp_path / "roundtrip-mission")
    assert restored.id == "roundtrip-mission"
    assert restored.state == "active"
    assert "agentA" in restored.agents
    assert restored.repo_url == "https://github.com/test/roundtrip"
|
||||
|
||||
|
||||
def test_cell_state_transitions(tmp_path: Path) -> None:
    """State changes are persisted correctly through save/load."""
    fresh = create_cell(
        mission_id="state-mission",
        name="State Test",
        repo_url="https://github.com/test/state",
        base_path=tmp_path,
    )
    assert fresh.state == "pending"

    # pending → active, persisted and re-read.
    fresh.state = "active"
    save_cell(fresh)
    first_reload = load_cell(tmp_path / "state-mission")
    assert first_reload.state == "active"

    # active → archived, persisted again.
    first_reload.state = "archived"
    save_cell(first_reload)
    second_reload = load_cell(tmp_path / "state-mission")
    assert second_reload.state == "archived"
|
||||
|
||||
|
||||
def test_destroy_cell_marks_destroyed(tmp_path: Path) -> None:
    """destroy_cell sets state to 'destroyed' and persists it."""
    doomed = create_cell(
        mission_id="destroy-mission",
        name="Destroy Test",
        repo_url="https://github.com/test/destroy",
        base_path=tmp_path,
    )

    # Keep the directory tree so the persisted state can be re-read.
    destroy_cell(doomed, cleanup_dirs=False)
    assert doomed.state == "destroyed"

    persisted = load_cell(tmp_path / "destroy-mission")
    assert persisted.state == "destroyed"
|
||||
136
tests/test_lazarus_cli.py
Normal file
136
tests/test_lazarus_cli.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Tests for lazarus.cli — CLI commands."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from lazarus.cli import main
|
||||
from lazarus.roster import MissionRoster
|
||||
|
||||
|
||||
def _run(argv: list[str], tmp_path: Path, capsys: pytest.CaptureFixture) -> tuple:
    """Run the CLI against a throwaway registry and decode its JSON stdout.

    --registry is prepended before the subcommand so argparse parses it as a
    global option. Returns (decoded_json, exit_code).
    """
    registry_file = str(tmp_path / "roster.json")
    exit_code = main(["--registry", registry_file, *argv])
    captured = capsys.readouterr().out
    return json.loads(captured), exit_code
|
||||
|
||||
|
||||
# Patch setup_level1_isolation globally for all CLI tests to avoid permission issues
|
||||
@pytest.fixture(autouse=True)
def no_isolation():
    """Stub setup_level1_isolation for every CLI test (avoids permission issues)."""
    with patch("lazarus.cli.setup_level1_isolation"):
        yield
|
||||
|
||||
|
||||
def test_invite_creates_mission_cell(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """invite creates a mission cell directory with cell.json."""
    missions_base = str(tmp_path / "missions")
    result, exit_code = _run(
        [
            "invite", "agentAlpha",
            "--repo", "https://github.com/test/repo",
            "--base-path", missions_base,
        ],
        tmp_path,
        capsys,
    )
    assert exit_code == 0
    # The CLI reports the generated id; the cell metadata must exist on disk.
    assert (tmp_path / "missions" / result["mission_id"] / "cell.json").exists()
|
||||
|
||||
|
||||
def test_invite_adds_agent_to_roster(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """After invite, the agent appears in the roster with status 'invited'."""
    missions_base = str(tmp_path / "missions")
    registry_file = str(tmp_path / "roster.json")

    exit_code = main([
        "--registry", registry_file,
        "invite", "agentBeta",
        "--repo", "https://github.com/test/repo",
        "--base-path", missions_base,
    ])
    invite_payload = json.loads(capsys.readouterr().out)
    assert exit_code == 0

    # Re-open the registry from disk and look for the invited agent.
    roster = MissionRoster(Path(registry_file))
    names = [entry.agent_name for entry in roster.list_agents(invite_payload["mission_id"])]
    assert "agentBeta" in names
|
||||
|
||||
|
||||
def test_list_shows_missions(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """list command shows missions that were previously created via invite."""
    missions_base = str(tmp_path / "missions")
    registry_file = str(tmp_path / "roster.json")

    # Seed the registry with one mission, discarding the invite's stdout.
    main(["--registry", registry_file, "invite", "agentGamma",
          "--repo", "https://github.com/test/repo", "--base-path", missions_base])
    capsys.readouterr()

    exit_code = main(["--registry", registry_file, "list"])
    missions = json.loads(capsys.readouterr().out)

    assert exit_code == 0
    assert isinstance(missions, list)
    assert len(missions) >= 1
|
||||
|
||||
|
||||
def test_status_shows_agents(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """status command shows agent count for a mission."""
    missions_base = str(tmp_path / "missions")
    registry_file = str(tmp_path / "roster.json")

    main(["--registry", registry_file, "invite", "agentDelta",
          "--repo", "https://github.com/test/repo", "--base-path", missions_base])
    invited = json.loads(capsys.readouterr().out)

    exit_code = main(["--registry", registry_file, "status", invited["mission_id"]])
    status = json.loads(capsys.readouterr().out)

    assert exit_code == 0
    assert status["agent_count"] >= 1
    assert "agentDelta" in [entry["agent_name"] for entry in status["agents"]]
|
||||
|
||||
|
||||
def test_depart_updates_agent_status(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """depart command marks an agent's status as 'departed'."""
    missions_base = str(tmp_path / "missions")
    registry_file = str(tmp_path / "roster.json")

    main(["--registry", registry_file, "invite", "agentEpsilon",
          "--repo", "https://github.com/test/repo", "--base-path", missions_base])
    invited = json.loads(capsys.readouterr().out)
    mission_id = invited["mission_id"]

    exit_code = main(["--registry", registry_file, "depart", "agentEpsilon",
                      "--mission", mission_id])
    departed = json.loads(capsys.readouterr().out)

    assert exit_code == 0
    assert departed["status"] == "departed"

    # The change must also be visible when the registry is re-read from disk.
    roster = MissionRoster(Path(registry_file))
    epsilon = next(
        entry for entry in roster.list_agents(mission_id)
        if entry.agent_name == "agentEpsilon"
    )
    assert epsilon.status == "departed"
|
||||
141
tests/test_lazarus_harness.py
Normal file
141
tests/test_lazarus_harness.py
Normal file
@@ -0,0 +1,141 @@
|
||||
"""Tests for lazarus.harness — MissionCellHarness."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from lazarus.cell import create_cell, save_cell
|
||||
from lazarus.harness import MissionCellHarness
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
def _make_cell_and_harness(
    tmp_path: Path,
    agent_name: str = "testAgent",
) -> tuple[object, MissionCellHarness]:
    """Create a throwaway mission cell plus a harness wired to it."""
    mission_cell = create_cell(
        mission_id="harness-test-001",
        name="Harness Test",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
    )
    cell_harness = MissionCellHarness(
        agent_name=agent_name,
        cell_path=Path(mission_cell.cell_path),
        hermes_ws_url="ws://localhost:8000/ws",
    )
    return mission_cell, cell_harness
|
||||
|
||||
|
||||
def test_harness_get_env_includes_cell_path(tmp_path: Path) -> None:
    """get_env() returns a dict that includes MISSION_CELL_PATH and AGENT_NAME."""
    cell, harness = _make_cell_and_harness(tmp_path)

    # Hydrate the harness manually so get_env works without calling start().
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))

    env = harness.get_env()
    assert env["MISSION_CELL_PATH"] == str(Path(cell.cell_path))
    assert env["AGENT_NAME"] == "testAgent"
    for required_key in ("MISSION_CELL_PATH", "MISSION_ID", "HOME"):
        assert required_key in env
|
||||
|
||||
|
||||
def test_harness_get_env_uses_custom_home(tmp_path: Path) -> None:
    """When home_path is provided, HOME in env equals that path."""
    cell, _ = _make_cell_and_harness(tmp_path)
    wizard_home = str(tmp_path / "wizard_home")
    custom_harness = MissionCellHarness(
        agent_name="agentX",
        cell_path=Path(cell.cell_path),
        home_path=wizard_home,
    )

    # Hydrate manually instead of going through start().
    from lazarus.cell import load_cell
    custom_harness._cell = load_cell(Path(cell.cell_path))

    assert custom_harness.get_env()["HOME"] == wizard_home
|
||||
|
||||
|
||||
def test_harness_write_mission_log(tmp_path: Path) -> None:
    """write_mission_log creates a log file in the cell's logs/ directory."""
    cell, harness = _make_cell_and_harness(tmp_path)

    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))

    harness.write_mission_log("Hello from test")

    log_file = Path(cell.cell_path) / "logs" / "mission.log"
    assert log_file.exists()
    written = log_file.read_text()
    # Both the message and the writing agent's name must be recorded.
    assert "Hello from test" in written
    assert "testAgent" in written
|
||||
|
||||
|
||||
async def test_harness_start_emits_telemetry(tmp_path: Path) -> None:
    """start() emits an agent_joined mission_event via WebSocket."""
    cell, harness = _make_cell_and_harness(tmp_path)

    sent_messages = []

    async def capture_send(payload: str) -> None:
        # Decode each outbound frame so assertions can inspect it as a dict.
        sent_messages.append(json.loads(payload))

    mock_ws = AsyncMock()
    mock_ws.close = AsyncMock()
    mock_ws.send = AsyncMock(side_effect=capture_send)

    # Patch both isolation entry points and the websockets module so start()
    # neither touches the OS nor opens a real connection.
    with patch("lazarus.isolation.setup_level1_isolation"), \
         patch("lazarus.harness.setup_level1_isolation"), \
         patch("lazarus.harness.websockets") as mock_websockets:
        mock_websockets.connect = AsyncMock(return_value=mock_ws)
        harness._ws_connected = False

        await harness.start()

    assert any(
        m.get("event") == "agent_joined" and m.get("type") == "mission_event"
        for m in sent_messages
    ), f"Expected agent_joined event; got: {sent_messages}"
|
||||
|
||||
|
||||
async def test_harness_stop_emits_departure_telemetry(tmp_path: Path) -> None:
    """stop() emits an agent_departed mission_event via WebSocket."""
    cell, harness = _make_cell_and_harness(tmp_path)

    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))

    sent_messages = []

    async def capture_send(payload: str) -> None:
        sent_messages.append(json.loads(payload))

    mock_ws = AsyncMock()
    mock_ws.close = AsyncMock()
    mock_ws.send = AsyncMock(side_effect=capture_send)

    # Pretend the harness already holds a live connection.
    harness._ws = mock_ws
    harness._ws_connected = True

    await harness.stop()

    assert any(
        m.get("event") == "agent_departed"
        for m in sent_messages
    ), f"Expected agent_departed event; got: {sent_messages}"
|
||||
107
tests/test_lazarus_isolation.py
Normal file
107
tests/test_lazarus_isolation.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Tests for lazarus.isolation — Level 1 isolation setup."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from lazarus.cell import create_cell
|
||||
from lazarus.isolation import (
|
||||
IsolationManifest,
|
||||
load_apparmor_profile,
|
||||
setup_level1_isolation,
|
||||
write_apparmor_profile,
|
||||
)
|
||||
|
||||
|
||||
def _make_cell(tmp_path: Path, home_path: str = "") -> object:
    """Create a throwaway cell for isolation tests; home_path is optional."""
    return create_cell(
        mission_id="iso-test-001",
        name="Isolation Test",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
        home_path=home_path,
    )
|
||||
|
||||
|
||||
def test_setup_isolation_creates_workspace(tmp_path: Path) -> None:
    """setup_level1_isolation creates workspace and logs dirs."""
    manifest = setup_level1_isolation(_make_cell(tmp_path))

    assert isinstance(manifest, IsolationManifest)
    assert manifest.workspace_path.is_dir()
    assert manifest.logs_path.is_dir()
    assert manifest.cell_id == "iso-test-001"
|
||||
|
||||
|
||||
def test_write_apparmor_profile_creates_file(tmp_path: Path) -> None:
    """write_apparmor_profile writes a file at the given output path."""
    cell = _make_cell(tmp_path, home_path=str(tmp_path / "home" / "agentX"))
    target = tmp_path / "test.apparmor"

    written_path = write_apparmor_profile(cell, target)

    assert written_path == target
    assert target.exists()
    # The profile must be non-empty.
    assert target.stat().st_size > 0
|
||||
|
||||
|
||||
def test_apparmor_profile_content(tmp_path: Path) -> None:
    """Profile file contains correct cell ID, workspace, logs, and home paths."""
    agent_home = str(tmp_path / "home" / "agentX")
    cell = _make_cell(tmp_path, home_path=agent_home)
    profile_file = tmp_path / "test.apparmor"

    write_apparmor_profile(cell, profile_file)
    text = profile_file.read_text()

    for needle in (cell.id, "nexus-mission-", "workspace", "logs", agent_home):
        assert needle in text
    # The profile must deny write access to the agent's home.
    assert "deny" in text
|
||||
|
||||
|
||||
def test_load_apparmor_profile_noop_on_no_apparmor(tmp_path: Path) -> None:
    """load_apparmor_profile returns False gracefully when apparmor_parser is absent."""
    profile = tmp_path / "dummy.apparmor"
    profile.write_text("# fake profile\n")

    # Pretend we're on Linux but apparmor_parser is missing from PATH
    # (subprocess.run raises FileNotFoundError).
    with patch("lazarus.isolation.platform.system", return_value="Linux"), patch(
        "lazarus.isolation.subprocess.run",
        side_effect=FileNotFoundError("apparmor_parser not found"),
    ):
        outcome = load_apparmor_profile(profile)

    assert outcome is False
|
||||
|
||||
|
||||
def test_load_apparmor_profile_noop_on_non_linux(tmp_path: Path) -> None:
    """load_apparmor_profile returns False on non-Linux platforms."""
    profile = tmp_path / "dummy.apparmor"
    profile.write_text("# fake profile\n")

    # Report macOS; loading must short-circuit to False.
    with patch("lazarus.isolation.platform.system", return_value="Darwin"):
        outcome = load_apparmor_profile(profile)

    assert outcome is False
|
||||
|
||||
|
||||
def test_setup_isolation_writes_profile_when_home_path_set(tmp_path: Path) -> None:
    """When home_path is set, setup_level1_isolation writes an AppArmor profile."""
    cell = _make_cell(tmp_path, home_path=str(tmp_path / "home" / "agentZ"))

    manifest = setup_level1_isolation(cell)

    assert manifest.apparmor_profile_path is not None
    assert manifest.apparmor_profile_path.exists()
|
||||
122
tests/test_lazarus_roster.py
Normal file
122
tests/test_lazarus_roster.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Tests for lazarus.roster — MissionRoster registry."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from lazarus.cell import create_cell
|
||||
from lazarus.roster import MissionRoster
|
||||
|
||||
|
||||
def _make_roster(tmp_path: Path) -> MissionRoster:
    """Create a roster backed by a throwaway registry file under tmp_path."""
    return MissionRoster(registry_path=tmp_path / "roster.json")
|
||||
|
||||
|
||||
def _make_cell(tmp_path: Path, mission_id: str = "m-001") -> object:
    """Create a throwaway cell for roster tests; mission_id is overridable."""
    return create_cell(
        mission_id=mission_id,
        name="Test Mission",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
    )
|
||||
|
||||
|
||||
def test_add_mission_and_invite(tmp_path: Path) -> None:
    """Can add a mission and invite an agent to it."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)

    invited = roster.invite_agent(cell.id, "agentA", role="contributor")

    assert invited.agent_name == "agentA"
    assert invited.status == "invited"
    assert invited.role == "contributor"
    # accepted_at is only stamped on acceptance.
    assert invited.accepted_at is None
|
||||
|
||||
|
||||
def test_accept_invitation(tmp_path: Path) -> None:
    """Accepting an invitation changes status from 'invited' to 'active'."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    roster.invite_agent(cell.id, "agentB")

    accepted = roster.accept_invitation(cell.id, "agentB")

    assert accepted.status == "active"
    assert accepted.accepted_at is not None
|
||||
|
||||
|
||||
def test_depart_agent(tmp_path: Path) -> None:
    """Departing changes status to 'departed'."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)

    # Full lifecycle: invited → active → departed.
    roster.invite_agent(cell.id, "agentC")
    roster.accept_invitation(cell.id, "agentC")
    departed = roster.depart_agent(cell.id, "agentC")

    assert departed.status == "departed"
|
||||
|
||||
|
||||
def test_list_missions_filtered_by_state(tmp_path: Path) -> None:
    """list_missions(state=...) filters correctly."""
    roster = _make_roster(tmp_path)
    for mission_id in ("m-001", "m-002"):
        roster.add_mission(_make_cell(tmp_path, mission_id))

    # Fresh cells default to 'pending'; nothing is 'active' yet.
    assert len(roster.list_missions(state="pending")) == 2
    assert len(roster.list_missions(state="active")) == 0
|
||||
|
||||
|
||||
def test_roster_persistence(tmp_path: Path) -> None:
    """Save and reload the roster file; data survives the round-trip."""
    registry_file = tmp_path / "roster.json"
    writer = MissionRoster(registry_path=registry_file)
    cell = _make_cell(tmp_path)

    writer.add_mission(cell)
    writer.invite_agent(cell.id, "agentD", role="lead")
    writer.save()

    # A brand-new roster instance reads everything back from disk.
    reader = MissionRoster(registry_path=registry_file)
    mission = reader.get_mission(cell.id)
    assert mission is not None
    assert mission["id"] == cell.id

    entries = reader.list_agents(cell.id)
    assert len(entries) == 1
    only = entries[0]
    assert (only.agent_name, only.role, only.status) == ("agentD", "lead", "invited")
|
||||
|
||||
|
||||
def test_list_agents(tmp_path: Path) -> None:
    """list_agents returns all agents for a given mission."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    for invitee in ("alice", "bob"):
        roster.invite_agent(cell.id, invitee)

    assert {entry.agent_name for entry in roster.list_agents(cell.id)} == {"alice", "bob"}
|
||||
@@ -1,659 +0,0 @@
|
||||
"""Tests for the Resurrection Pool — health polling, dead-agent detection, auto-revive.
|
||||
|
||||
Validates:
|
||||
- GatewayConfig / PoolConfig loading from dicts and files
|
||||
- RevivePolicy enum parsing (including unknown values)
|
||||
- probe_gateway / probe_process / probe_heartbeat functions
|
||||
- assess_agent combining all three probes into AgentHealth
|
||||
- AgentHealth state: HEALTHY / DEGRADED / DEAD
|
||||
- ResurrectionPool._update_health — dead_at tracking and recovery
|
||||
- ResurrectionPool.sweep — policy routing (auto/ask/never)
|
||||
- apply_revive_policy — auto-restart, never, ask-with-no-channel fallback
|
||||
- Telegram / Nostr dry-run paths
|
||||
- Gitea issue helpers (mocked)
|
||||
- CLI argument parsing
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Module loading (same pattern as test_nexus_watchdog.py) ──────────
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_rp_spec = importlib.util.spec_from_file_location(
|
||||
"resurrection_pool_test",
|
||||
PROJECT_ROOT / "bin" / "resurrection_pool.py",
|
||||
)
|
||||
_rp = importlib.util.module_from_spec(_rp_spec)
|
||||
sys.modules["resurrection_pool_test"] = _rp
|
||||
_rp_spec.loader.exec_module(_rp)
|
||||
|
||||
# Convenience aliases
|
||||
AgentHealth = _rp.AgentHealth
|
||||
AgentState = _rp.AgentState
|
||||
GatewayConfig = _rp.GatewayConfig
|
||||
MissionConfig = _rp.MissionConfig
|
||||
PoolConfig = _rp.PoolConfig
|
||||
ResurrectionPool = _rp.ResurrectionPool
|
||||
RevivePolicy = _rp.RevivePolicy
|
||||
apply_revive_policy = _rp.apply_revive_policy
|
||||
assess_agent = _rp.assess_agent
|
||||
build_parser = _rp.build_parser
|
||||
probe_gateway = _rp.probe_gateway
|
||||
probe_heartbeat = _rp.probe_heartbeat
|
||||
probe_process = _rp.probe_process
|
||||
revive_agent = _rp.revive_agent
|
||||
send_telegram_approval_request = _rp.send_telegram_approval_request
|
||||
send_nostr_approval_request = _rp.send_nostr_approval_request
|
||||
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
def _make_gw(name="timmy", host="localhost", port=8765,
             heartbeat_path=None, process_pattern="nexus_think",
             revive_cmd=None, tmp_path=None):
    """Build a GatewayConfig with test-friendly defaults.

    When heartbeat_path is omitted: use a file under tmp_path if one was
    given, otherwise fall back to the production default location.
    """
    if heartbeat_path is None:
        heartbeat_path = (
            str(tmp_path / "heartbeat.json")
            if tmp_path is not None
            else "~/.nexus/heartbeat.json"
        )
    return GatewayConfig(
        name=name,
        host=host,
        port=port,
        heartbeat_path=heartbeat_path,
        process_pattern=process_pattern,
        revive_cmd=revive_cmd or [],
    )
|
||||
|
||||
|
||||
def _write_heartbeat(path: Path, age_seconds: float = 10.0):
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
data = {
|
||||
"pid": os.getpid(),
|
||||
"timestamp": time.time() - age_seconds,
|
||||
"cycle": 7,
|
||||
"model": "timmy:v0.1",
|
||||
"status": "thinking",
|
||||
}
|
||||
path.write_text(json.dumps(data))
|
||||
|
||||
|
||||
def _make_cfg(**kwargs) -> PoolConfig:
    """Thin alias kept so tests read uniformly; forwards straight to PoolConfig."""
    return PoolConfig(**kwargs)
|
||||
|
||||
|
||||
# ── RevivePolicy tests ───────────────────────────────────────────────
|
||||
|
||||
class TestRevivePolicy:
    """RevivePolicy parsing, including the ASK fallback for unknown values."""

    def test_valid_values(self):
        for raw, expected in (
            ("auto", RevivePolicy.AUTO),
            ("ask", RevivePolicy.ASK),
            ("never", RevivePolicy.NEVER),
        ):
            assert RevivePolicy(raw) == expected

    def test_mission_from_dict_unknown_policy(self):
        # An unrecognized policy string degrades to ASK rather than raising.
        mission = MissionConfig.from_dict("m1", {"revive_policy": "bogus", "agents": ["a"]})
        assert mission.revive_policy == RevivePolicy.ASK

    def test_mission_from_dict_valid_policy(self):
        mission = MissionConfig.from_dict("m1", {"revive_policy": "auto", "agents": ["x"]})
        assert mission.revive_policy == RevivePolicy.AUTO

    def test_mission_agents_default_empty(self):
        # A mission dict with no "agents" key yields an empty roster.
        assert MissionConfig.from_dict("m2", {}).agents == []
|
||||
|
||||
|
||||
# ── PoolConfig loading tests ─────────────────────────────────────────
|
||||
|
||||
class TestPoolConfig:
    """PoolConfig construction from defaults, dicts, files, and env vars."""

    def test_defaults(self):
        cfg = PoolConfig()
        assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL
        assert cfg.dead_threshold == _rp.DEFAULT_DEAD_THRESHOLD
        assert cfg.gateways == []
        assert cfg.missions == []

    def test_from_dict_basic(self):
        raw = {
            "poll_interval": 30,
            "dead_threshold": 120,
            "gateways": [{"name": "allegro", "port": 9000}],
            "missions": {
                "m1": {"label": "Sprint", "agents": ["allegro"], "revive_policy": "auto"},
            },
        }
        cfg = PoolConfig.from_dict(raw)
        assert (cfg.poll_interval, cfg.dead_threshold) == (30, 120)
        assert len(cfg.gateways) == 1
        gateway = cfg.gateways[0]
        assert (gateway.name, gateway.port) == ("allegro", 9000)
        assert len(cfg.missions) == 1
        assert cfg.missions[0].revive_policy == RevivePolicy.AUTO

    def test_from_dict_telegram(self):
        cfg = PoolConfig.from_dict({"telegram": {"bot_token": "tok123", "chat_id": "-999"}})
        assert cfg.telegram_bot_token == "tok123"
        assert cfg.telegram_chat_id == "-999"

    def test_from_dict_nostr(self):
        cfg = PoolConfig.from_dict(
            {"nostr": {"relay_url": "wss://relay.damus.io", "recipient_pubkey": "abc"}}
        )
        assert cfg.nostr_relay_url == "wss://relay.damus.io"
        assert cfg.nostr_recipient_pubkey == "abc"

    def test_load_from_file(self, tmp_path):
        config_path = tmp_path / "pool.json"
        config_path.write_text(json.dumps({"poll_interval": 42}))
        assert PoolConfig.load(config_path).poll_interval == 42

    def test_load_missing_file_returns_defaults(self, tmp_path):
        cfg = PoolConfig.load(tmp_path / "nonexistent.json")
        assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL

    def test_load_corrupt_file_falls_through(self, tmp_path):
        corrupt = tmp_path / "bad.json"
        corrupt.write_text("not json {{")
        # Unparseable config must fall back to defaults, not raise.
        assert PoolConfig.load(corrupt).poll_interval == _rp.DEFAULT_POLL_INTERVAL

    def test_load_env_var(self, tmp_path, monkeypatch):
        config_path = tmp_path / "env_pool.json"
        config_path.write_text(json.dumps({"dead_threshold": 99}))
        # With no explicit path, load() resolves RESURRECTION_CONFIG.
        monkeypatch.setenv("RESURRECTION_CONFIG", str(config_path))
        assert PoolConfig.load().dead_threshold == 99
|
||||
|
||||
|
||||
# ── probe_gateway tests ──────────────────────────────────────────────
|
||||
|
||||
class TestProbeGateway:
    # probe_gateway returns a (alive: bool, message: str) tuple after a plain
    # TCP connect attempt. NOTE(review): these tests hit the real network
    # stack on localhost — left byte-identical; the accept-thread/socket
    # lifecycle ordering below is deliberate (srv is closed by the thread).

    def test_refused_port(self):
        # Port 1 should always be refused on localhost
        alive, msg = probe_gateway("localhost", 1, timeout=2)
        assert not alive
        assert "refused" in msg.lower() or "failed" in msg.lower()

    def test_open_port(self):
        # Open a real server socket to confirm TCP probe works
        import threading
        import socket as _socket
        srv = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
        srv.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
        srv.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        srv.listen(1)
        port = srv.getsockname()[1]

        def accept_one():
            # Accept a single connection, then close; the listening socket is
            # always closed here so the test never leaks it.
            try:
                conn, _ = srv.accept()
                conn.close()
            except Exception:
                pass
            finally:
                srv.close()

        t = threading.Thread(target=accept_one, daemon=True)
        t.start()

        alive, msg = probe_gateway("127.0.0.1", port, timeout=3)
        t.join(timeout=2)
        assert alive
        assert "reachable" in msg.lower()

    def test_invalid_host(self):
        # The reserved .invalid TLD guarantees DNS resolution failure.
        alive, msg = probe_gateway("this.host.does.not.exist.invalid", 8765, timeout=2)
        assert not alive
|
||||
|
||||
|
||||
# ── probe_process tests ──────────────────────────────────────────────
|
||||
|
||||
class TestProbeProcess:
    """probe_process wraps pgrep and returns (alive, message)."""

    def test_self_is_filtered(self):
        # A pattern matching no process must report not-alive with a message.
        alive, msg = probe_process("definitely-no-process-xyz-12345")
        assert not alive
        assert isinstance(msg, str)

    def test_running_process(self):
        # Whether pgrep finds "pytest" depends on the environment; only the
        # return types are checked — the call must not crash either way.
        alive, msg = probe_process("pytest")
        assert isinstance(alive, bool)
        assert isinstance(msg, str)

    @patch("subprocess.run", side_effect=FileNotFoundError)
    def test_pgrep_missing(self, _mock):
        alive, msg = probe_process("anything")
        # Graceful fallback: a missing pgrep must not raise false alarms.
        assert alive
        assert "pgrep" in msg.lower()
|
||||
|
||||
|
||||
# ── probe_heartbeat tests ────────────────────────────────────────────
|
||||
|
||||
class TestProbeHeartbeat:
    """probe_heartbeat returns (alive, age_seconds_or_None, message)."""

    def test_missing_file(self, tmp_path):
        alive, age, msg = probe_heartbeat(tmp_path / "no.json", dead_threshold=300)
        assert not alive
        assert age is None
        assert "no heartbeat" in msg.lower()

    def test_fresh_heartbeat(self, tmp_path):
        hb_file = tmp_path / "heartbeat.json"
        _write_heartbeat(hb_file, age_seconds=10)
        alive, age, msg = probe_heartbeat(hb_file, dead_threshold=300)
        assert alive
        assert age is not None and age < 20
        assert "alive" in msg.lower()

    def test_stale_heartbeat(self, tmp_path):
        hb_file = tmp_path / "heartbeat.json"
        _write_heartbeat(hb_file, age_seconds=600)
        alive, age, msg = probe_heartbeat(hb_file, dead_threshold=300)
        assert not alive
        assert age > 500
        assert "stale" in msg.lower()

    def test_corrupt_json(self, tmp_path):
        hb_file = tmp_path / "heartbeat.json"
        hb_file.write_text("{ broken json")
        alive, age, msg = probe_heartbeat(hb_file, dead_threshold=300)
        assert not alive
        assert "unreadable" in msg.lower()

    def test_missing_timestamp_treated_as_stale(self, tmp_path):
        hb_file = tmp_path / "heartbeat.json"
        hb_file.write_text(json.dumps({"cycle": 1}))  # no timestamp key
        # timestamp defaults to 0 → ancient → stale.
        alive, age, msg = probe_heartbeat(hb_file, dead_threshold=300)
        assert not alive
|
||||
|
||||
|
||||
# ── assess_agent tests ────────────────────────────────────────────────
|
||||
|
||||
class TestAssessAgent:
    """assess_agent folds the three probes into one AgentState verdict."""

    def _gateway(self, tmp_path, port=1):
        # Thin wrapper so every test builds its gateway the same way.
        return _make_gw(tmp_path=tmp_path, port=port)

    # Note: stacked @patch decorators inject mocks bottom-up, so the
    # heartbeat mock arrives first in each signature below.

    @patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_process", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 5.0, "ok"))
    def test_all_healthy(self, _mock_hb, _mock_proc, _mock_gw, tmp_path):
        verdict = assess_agent(self._gateway(tmp_path), dead_threshold=300)
        assert verdict.state == AgentState.HEALTHY

    @patch("resurrection_pool_test.probe_gateway", return_value=(False, "refused"))
    @patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(False, None, "missing"))
    def test_all_dead(self, _mock_hb, _mock_proc, _mock_gw, tmp_path):
        verdict = assess_agent(self._gateway(tmp_path), dead_threshold=300)
        assert verdict.state == AgentState.DEAD

    @patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
    @patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
    @patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 10.0, "ok"))
    def test_partially_failing_is_degraded(self, _mock_hb, _mock_proc, _mock_gw, tmp_path):
        # One failing probe out of three: degraded, not dead.
        verdict = assess_agent(self._gateway(tmp_path), dead_threshold=300)
        assert verdict.state == AgentState.DEGRADED
|
||||
|
||||
|
||||
# ── AgentHealth state machine tests ──────────────────────────────────
|
||||
|
||||
class TestAgentHealthTracking:
    """State-machine behavior of ResurrectionPool._update_health.

    Verifies that first_dead_at is stamped once, survives repeated dead
    cycles, resets on recovery, and that revive_attempts carries over.
    """

    def _pool(self):
        """Build a dry-run pool with default config for isolated tests."""
        cfg = _make_cfg()
        return ResurrectionPool(cfg, dry_run=True)

    def test_first_dead_at_set_on_first_failure(self):
        # The first observed death must stamp first_dead_at so the
        # dead_threshold timer has a starting point.
        pool = self._pool()
        dead = AgentHealth(name="timmy", state=AgentState.DEAD,
                           last_checked=time.time())
        result = pool._update_health("timmy", dead)
        assert result.first_dead_at is not None

    def test_first_dead_at_preserved_across_cycles(self):
        # A second dead report without its own first_dead_at must keep the
        # earlier timestamp rather than restarting the clock.
        pool = self._pool()
        t0 = time.time() - 100
        dead1 = AgentHealth(name="timmy", state=AgentState.DEAD,
                            last_checked=t0, first_dead_at=t0)
        pool._update_health("timmy", dead1)

        dead2 = AgentHealth(name="timmy", state=AgentState.DEAD,
                            last_checked=time.time())
        result = pool._update_health("timmy", dead2)
        assert result.first_dead_at == t0  # preserved

    def test_recovery_resets_first_dead_at(self):
        # A healthy report wipes the death clock and the attempt counter.
        pool = self._pool()
        dead = AgentHealth(name="timmy", state=AgentState.DEAD,
                           last_checked=time.time())
        pool._update_health("timmy", dead)

        healthy = AgentHealth(name="timmy", state=AgentState.HEALTHY,
                              last_checked=time.time())
        result = pool._update_health("timmy", healthy)
        assert result.first_dead_at is None
        assert result.revive_attempts == 0

    def test_revive_attempts_increments(self):
        # Seed a prior record with 2 attempts; a fresh dead report must
        # inherit that count. (The original bound the prior record to an
        # unused local `prev` — removed.)
        pool = self._pool()
        h = AgentHealth(name="timmy", state=AgentState.DEAD,
                        last_checked=time.time())
        pool._health["timmy"] = h
        h.revive_attempts = 2

        new = AgentHealth(name="timmy", state=AgentState.DEAD,
                          last_checked=time.time())
        result = pool._update_health("timmy", new)
        assert result.revive_attempts == 2  # preserved from prior record
|
||||
|
||||
|
||||
# ── mission_for / gateway_for tests ─────────────────────────────────
|
||||
|
||||
class TestPoolLookups:
    """Agent-name → mission and agent-name → gateway resolution."""

    def test_mission_for_known_agent(self):
        cfg = PoolConfig.from_dict(
            {"missions": {"m1": {"agents": ["timmy"], "revive_policy": "auto"}}}
        )
        pool = ResurrectionPool(cfg, dry_run=True)
        mission = pool._mission_for("timmy")
        assert mission is not None
        assert mission.mission_id == "m1"

    def test_mission_for_unknown_agent(self):
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        assert pool._mission_for("ghost") is None

    def test_gateway_for(self):
        cfg = PoolConfig.from_dict(
            {"gateways": [{"name": "allegro", "port": 9000}]}
        )
        pool = ResurrectionPool(cfg, dry_run=True)
        gateway = pool._gateway_for("allegro")
        assert gateway is not None
        assert gateway.port == 9000

    def test_gateway_for_missing(self):
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        assert pool._gateway_for("nobody") is None
|
||||
|
||||
|
||||
# ── apply_revive_policy tests ────────────────────────────────────────
|
||||
|
||||
class TestApplyRevivePolicy:
    """Policy dispatch for a dead agent: never / auto / ask paths."""

    @staticmethod
    def _make_dead_health(name="timmy", dead_seconds=400):
        # An agent that has been dead long enough to clear any threshold.
        return AgentHealth(name=name, state=AgentState.DEAD,
                           first_dead_at=time.time() - dead_seconds,
                           last_checked=time.time())

    def test_never_policy_no_revive(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.NEVER)
        cfg = _make_cfg()
        health = self._make_dead_health()

        # "never" only records the death via a Gitea issue — no revive.
        with patch.object(_rp, "open_death_issue") as issue_mock:
            outcome = apply_revive_policy(gateway, health, mission, cfg, dry_run=True)
        assert not outcome
        issue_mock.assert_called_once()

    def test_auto_policy_launches_revive(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "revive"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
        cfg = _make_cfg()
        health = self._make_dead_health()

        with patch.object(_rp, "revive_agent", return_value=True) as revive_mock:
            outcome = apply_revive_policy(gateway, health, mission, cfg, dry_run=True)
        assert outcome
        revive_mock.assert_called_once_with(gateway, dry_run=True)

    def test_auto_policy_opens_issue_on_revive_failure(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
        cfg = _make_cfg()
        health = self._make_dead_health()

        # A failed auto-revive must be escalated to an issue.
        with patch.object(_rp, "revive_agent", return_value=False), \
             patch.object(_rp, "open_death_issue") as issue_mock:
            outcome = apply_revive_policy(gateway, health, mission, cfg, dry_run=True)
        assert not outcome
        issue_mock.assert_called_once()

    def test_ask_policy_dry_run_approves(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        cfg = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
        health = self._make_dead_health()

        # In dry-run mode the approval round-trip auto-succeeds, so the
        # revive is executed immediately.
        with patch.object(_rp, "revive_agent", return_value=True) as revive_mock:
            outcome = apply_revive_policy(gateway, health, mission, cfg, dry_run=True)
        assert outcome
        revive_mock.assert_called_once()

    def test_ask_policy_no_channel_falls_back_to_auto(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        cfg = _make_cfg()  # neither Telegram nor Nostr configured
        health = self._make_dead_health()

        # With no approval channel available, "ask" degrades to "auto".
        with patch.object(_rp, "revive_agent", return_value=True) as revive_mock:
            outcome = apply_revive_policy(gateway, health, mission, cfg, dry_run=True)
        assert outcome
        revive_mock.assert_called_once()

    def test_skip_if_revive_pending(self, tmp_path):
        gateway = _make_gw(tmp_path=tmp_path)
        mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
        cfg = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
        health = self._make_dead_health()
        health.revive_pending = True  # approval already outstanding

        # A pending approval must suppress any further revive attempt.
        with patch.object(_rp, "revive_agent") as revive_mock:
            outcome = apply_revive_policy(gateway, health, mission, cfg, dry_run=False)
        assert not outcome
        revive_mock.assert_not_called()
|
||||
|
||||
|
||||
# ── revive_agent tests ───────────────────────────────────────────────
|
||||
|
||||
class TestReviveAgent:
    """Direct revive_agent invocation: empty cmd, dry-run, success, failure."""

    def test_no_revive_cmd(self, tmp_path):
        # Nothing configured to run — revival cannot succeed.
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=[])
        assert not revive_agent(gateway)

    def test_dry_run(self, tmp_path):
        # Dry-run reports success without spawning anything.
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
        assert revive_agent(gateway, dry_run=True)

    def test_launches_process(self, tmp_path):
        # A trivially succeeding command counts as a successful revive.
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["true"])
        assert revive_agent(gateway, dry_run=False)

    def test_bad_command(self, tmp_path):
        # A nonexistent binary must be reported as failure, not raised.
        gateway = _make_gw(tmp_path=tmp_path, revive_cmd=["/does/not/exist/xyz"])
        assert not revive_agent(gateway, dry_run=False)
|
||||
|
||||
|
||||
# ── Notification channel tests ────────────────────────────────────────
|
||||
|
||||
class TestNotifications:
    """Approval-request channels: Telegram and Nostr, dry-run vs unconfigured."""

    def test_telegram_dry_run(self):
        sent = send_telegram_approval_request(
            "token", "chat", "timmy", "Sprint", 400, dry_run=True
        )
        assert sent

    def test_telegram_no_token(self):
        # Missing credentials → request cannot be sent.
        sent = send_telegram_approval_request(
            "", "", "timmy", "Sprint", 400, dry_run=False
        )
        assert not sent

    def test_nostr_dry_run(self):
        sent = send_nostr_approval_request(
            "wss://relay.damus.io", "pubkey123",
            "timmy", "Sprint", 400, dry_run=True
        )
        assert sent

    def test_nostr_no_config(self):
        # No relay / pubkey configured → request cannot be sent.
        sent = send_nostr_approval_request(
            "", "",
            "timmy", "Sprint", 400, dry_run=False
        )
        assert not sent
|
||||
|
||||
|
||||
# ── ResurrectionPool.sweep integration tests ─────────────────────────
|
||||
|
||||
class TestPoolSweep:
    """Integration-level checks of ResurrectionPool.sweep().

    Verifies policy dispatch happens exactly when an agent has been DEAD
    past the configured dead_threshold, and never otherwise.
    """

    def test_sweep_empty_gateways(self):
        """With no gateways configured, a sweep returns an empty mapping."""
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        result = pool.sweep()
        assert result == {}

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_healthy_agent_no_policy_triggered(self, mock_assess):
        """A healthy agent must never reach apply_revive_policy."""
        # NOTE(review): the original also built `gw = GatewayConfig(name="timmy")`
        # and never used it — the pool reads gateways from its config; removed.
        cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
        pool = ResurrectionPool(cfg, dry_run=True)

        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.HEALTHY,
            gateway_alive=True, process_alive=True, heartbeat_age=5.0,
        )

        with patch.object(_rp, "apply_revive_policy") as mock_policy:
            pool.sweep()
            mock_policy.assert_not_called()

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_dead_agent_triggers_policy(self, mock_assess):
        """An agent dead longer than dead_threshold triggers the policy once."""
        cfg = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 0,  # treat any death as immediate trigger
        })
        pool = ResurrectionPool(cfg, dry_run=True)

        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.DEAD,
            first_dead_at=time.time() - 999,
            last_checked=time.time(),
        )

        with patch.object(_rp, "apply_revive_policy", return_value=False) as mock_policy:
            pool.sweep()
            mock_policy.assert_called_once()

    @patch("resurrection_pool_test.assess_agent")
    def test_sweep_degraded_does_not_trigger_policy(self, mock_assess):
        """DEGRADED (and below-threshold) agents are observed but not revived."""
        cfg = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 10,
        })
        pool = ResurrectionPool(cfg, dry_run=True)

        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.DEGRADED,
            first_dead_at=time.time() - 5,  # only 5s, below threshold
            last_checked=time.time(),
        )

        with patch.object(_rp, "apply_revive_policy") as mock_policy:
            pool.sweep()
            mock_policy.assert_not_called()
|
||||
|
||||
|
||||
# ── run_once / status tests ───────────────────────────────────────────
|
||||
|
||||
class TestRunOnce:
    """run_once() exit status and status() reporting."""

    def test_run_once_empty_returns_true(self):
        # No gateways → nothing can be unhealthy → success.
        pool = ResurrectionPool(PoolConfig(), dry_run=True)
        assert pool.run_once()

    @patch("resurrection_pool_test.assess_agent")
    def test_run_once_dead_agent_returns_false(self, mock_assess):
        cfg = PoolConfig.from_dict({
            "gateways": [{"name": "timmy"}],
            "dead_threshold": 0,
        })
        pool = ResurrectionPool(cfg, dry_run=True)
        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.DEAD,
            first_dead_at=time.time() - 9999,
            last_checked=time.time(),
        )
        # With the revive policy failing, the single cycle reports failure.
        with patch.object(_rp, "apply_revive_policy", return_value=False):
            assert not pool.run_once()

    @patch("resurrection_pool_test.assess_agent")
    def test_status_returns_list(self, mock_assess):
        cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
        pool = ResurrectionPool(cfg, dry_run=True)
        mock_assess.return_value = AgentHealth(
            name="timmy", state=AgentState.HEALTHY,
            gateway_alive=True, process_alive=True, heartbeat_age=3.0,
        )
        pool.sweep()
        report = pool.status()
        assert isinstance(report, list)
        assert len(report) == 1
        entry = report[0]
        assert entry["name"] == "timmy"
        assert entry["state"] == "healthy"
|
||||
|
||||
|
||||
# ── CLI tests ─────────────────────────────────────────────────────────
|
||||
|
||||
class TestCLI:
    """Argument parsing: defaults and each override flag."""

    def test_default_args(self):
        parsed = build_parser().parse_args([])
        # All mode flags default off; config path is resolved elsewhere.
        assert not parsed.once
        assert not parsed.dry_run
        assert not parsed.status
        assert parsed.config is None

    def test_once_flag(self):
        assert build_parser().parse_args(["--once"]).once

    def test_dry_run_flag(self):
        assert build_parser().parse_args(["--dry-run"]).dry_run

    def test_poll_interval_override(self):
        parsed = build_parser().parse_args(["--poll-interval", "15"])
        assert parsed.poll_interval == 15

    def test_dead_threshold_override(self):
        parsed = build_parser().parse_args(["--dead-threshold", "60"])
        assert parsed.dead_threshold == 60

    def test_config_path(self, tmp_path):
        cfg_file = tmp_path / "pool.json"
        cfg_file.write_text("{}")
        parsed = build_parser().parse_args(["--config", str(cfg_file)])
        assert parsed.config == str(cfg_file)
|
||||
Reference in New Issue
Block a user