Compare commits


1 Commit

Author SHA1 Message Date
Alexander Whitestone
f83e103d27 feat: M6-P0 Foundation — cell spec, lazarus-pit daemon, health endpoint, issue template
Implements all P0 deliverables for the Lazarus Pit epic.

- docs/lazarus-pit/mission-cell-spec.md: Canonical `/var/missions/<uuid>/`
  directory layout, file schemas (cell.json, events.jsonl, heartbeat,
  daemon state), lifecycle state machine, and isolation guarantees.
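The spec itself is not shown in this diff, but the daemon code implies the on-disk shape. A minimal sketch of one cell, built under a temp root instead of `/var/missions/` (the UUID and agent name are placeholders):

```python
import json
import tempfile
import time
from pathlib import Path

# Build a sample cell under a temp root instead of /var/missions.
root = Path(tempfile.mkdtemp())
cell = root / "3f9a1c2e-0000-0000-0000-000000000000"
(cell / ".lazarus" / "heartbeats").mkdir(parents=True)

# cell.json manifest: uuid, lifecycle status, and the invited agents.
(cell / "cell.json").write_text(json.dumps({
    "uuid": cell.name,
    "status": "active",  # the daemon only polls "active" / "frozen" cells
    "agents": [{"name": "timmy"}],
}))

# One heartbeat file per agent, keyed by agent name.
(cell / ".lazarus" / "heartbeats" / "timmy.json").write_text(json.dumps({
    "timestamp": time.time(),
    "status": "thinking",
}))

print(sorted(p.relative_to(cell).as_posix() for p in cell.rglob("*.json")))
# → ['.lazarus/heartbeats/timmy.json', 'cell.json']
```

This matches the paths `discover_cells()` and `read_agent_heartbeat()` expect; the heartbeat field names (`timestamp`, `status`) are the ones the poller reads.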

- bin/lazarus_pit.py: Daemon skeleton with config loading, cell discovery,
  per-agent heartbeat polling, stale detection, resurrection stub (P3 placeholder),
  graceful signal handling, PID file management, and CLI subcommands
  (--status, --list-cells).
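The core of stale detection is a timestamp-age check against the configured threshold. A self-contained sketch of the same logic (the helper name is illustrative, not from the daemon):

```python
import json
import tempfile
import time
from pathlib import Path

STALE_THRESHOLD = 60  # seconds, mirrors heartbeat_stale_threshold

def heartbeat_is_stale(hb_path: Path, now: float, threshold: float = STALE_THRESHOLD) -> bool:
    """A heartbeat is stale if the file is missing, unreadable, or too old."""
    try:
        hb = json.loads(hb_path.read_text())
    except (OSError, json.JSONDecodeError):
        return True
    return (now - hb.get("timestamp", 0)) > threshold

# Quick check against a heartbeat written 300s in the past.
tmp = Path(tempfile.mkdtemp()) / "timmy.json"
tmp.write_text(json.dumps({"timestamp": time.time() - 300, "status": "thinking"}))
print(heartbeat_is_stale(tmp, time.time()))  # 300s old exceeds the 60s threshold
```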

- config/lazarus-pit.toml: Documented config file with all tunable knobs:
  cells root, heartbeat thresholds, revive policy, gateway coordinates,
  Gitea notification settings.
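The daemon parses this file with stdlib `tomllib` (Python 3.11+) or the `tomli` backport. A sketch of the loading pattern; the exact key names in the snippet are an assumption based on the knobs listed above:

```python
import sys

# tomllib is stdlib from Python 3.11; older interpreters need the tomli backport.
if sys.version_info >= (3, 11):
    import tomllib
else:
    import tomli as tomllib

SAMPLE = """
[cells]
root = "/var/missions"
heartbeat_stale_threshold = 60
poll_interval = 15

[daemon]
log_level = "INFO"
"""

raw = tomllib.loads(SAMPLE)
print(raw["cells"]["heartbeat_stale_threshold"], raw["daemon"]["log_level"])  # 60 INFO
```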

- server.py: Added HTTP health heartbeat endpoint on NEXUS_HEALTH_PORT (default
  8766). GET /health returns JSON with status, uptime, connected_clients, ws_port,
  and timestamp. Consumed by lazarus-pit to verify gateway liveness.
  Runs in a background daemon thread — zero impact on existing WS logic.
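The actual `server.py` handler is not shown here; the following is a stand-alone sketch of the described schema using stdlib `http.server`, plus the client-side check lazarus-pit would perform:

```python
import json
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

START = time.time()

class Health(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/health":
            self.send_error(404)  # unknown paths get a 404, as the tests expect
            return
        body = json.dumps({
            "status": "ok",
            "uptime": time.time() - START,
            "connected_clients": 0,
            "ws_port": 8765,
            "timestamp": time.time(),
        }).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # keep the sketch quiet
        pass

srv = HTTPServer(("127.0.0.1", 0), Health)  # port 0: pick any free port
threading.Thread(target=srv.serve_forever, daemon=True).start()

with urllib.request.urlopen(f"http://127.0.0.1:{srv.server_port}/health") as resp:
    data = json.load(resp)
print(sorted(data))
```

The field names (`status`, `uptime`, `connected_clients`, `ws_port`, `timestamp`) come from the description above; the "ok" status value is an assumption.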

- .gitea/ISSUE_TEMPLATE/mission-proposal.md: Gitea issue template for
  proposing new mission cells (agents, scope, isolation requirements,
  cell config, success criteria).

- tests/test_lazarus_pit.py: 25 unit tests covering config, cell discovery,
  heartbeat reading, health polling, resurrection stub, and CLI.

- tests/test_gateway_health.py: 6 unit tests covering health endpoint
  schema, 404 for unknown paths, uptime, and client count reflection.
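The test files themselves are not in this diff; as a flavor of what a cell-discovery case looks like, here is a minimal sketch with a toy stand-in for `discover_cells()` (the helper name is illustrative):

```python
import json
import tempfile
from pathlib import Path

def discover_active_uuids(root: Path) -> list[str]:
    """Toy stand-in for discover_cells(): uuids whose manifest status is active/frozen."""
    found = []
    for entry in sorted(root.iterdir()):
        manifest = entry / "cell.json"
        if not manifest.exists():
            continue
        data = json.loads(manifest.read_text())
        if data.get("status") in ("active", "frozen"):
            found.append(data.get("uuid", entry.name))
    return found

root = Path(tempfile.mkdtemp())
for uuid, status in [("cell-a", "active"), ("cell-b", "closed"), ("cell-c", "frozen")]:
    d = root / uuid
    d.mkdir()
    (d / "cell.json").write_text(json.dumps({"uuid": uuid, "status": status}))

print(discover_active_uuids(root))  # → ['cell-a', 'cell-c']: closed cells are ignored
```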

All 31 new tests pass. Pre-existing test failures unchanged.

Fixes #879
Refs #878

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 14:16:45 -04:00
9 changed files with 1144 additions and 1599 deletions

.gitea/ISSUE_TEMPLATE/mission-proposal.md (new file)

@@ -0,0 +1,62 @@
---
name: Mission Proposal
about: Propose a new Mission Cell — a temporary, isolated collaboration space for agents and humans
title: "[MISSION] "
labels: mission-proposal
---

## Mission Summary
<!-- One-sentence description of the mission objective -->

## Objective
<!-- What is the outcome we are driving toward? What does "done" look like? -->

## Agents Invited
<!-- List agents (or humans) that should be invited into this cell.
Format: @agent-name — role (developer / reviewer / observer / coordinator) -->
- @ —

## Scope & Deliverables
<!-- What artifacts will this mission produce? PRs, docs, deployed services, etc. -->
- [ ]
- [ ]

## Isolation Requirements
- [ ] No wizard home-directory access needed
- [ ] Read-only access to the following homes: (list or "none")
- [ ] External network access required: yes / no

## Cell Configuration
<!-- Leave blank to use defaults from config/lazarus-pit.toml -->

| Setting | Value |
|--------------------------|-------|
| Max duration | |
| Checkpoint interval | |
| Auto-archive on close | yes / no |
| Max revive attempts | |

## Related Issues / Context
- Epic: #
- Depends on: #
- Blocked by: #

## Success Criteria
<!-- How will we know this mission succeeded? Be specific. -->
1.
2.

## Notes
<!-- Anything else the lazarus-pit daemon or cell participants should know -->

bin/lazarus_pit.py (419 lines, new file)

@@ -0,0 +1,419 @@
#!/usr/bin/env python3
"""
lazarus-pit — Agent resurrection pool daemon.

Monitors active mission cells, heartbeats agents, detects downed agents,
and revives them back into their mission cells.

Usage:
    python bin/lazarus_pit.py [--config path/to/lazarus-pit.toml]
    python bin/lazarus_pit.py --status
    python bin/lazarus_pit.py --list-cells

Config:       config/lazarus-pit.toml
Architecture: docs/lazarus-pit/mission-cell-spec.md
Epic: #878    P0 Foundation: #879
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import signal
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

# ---------------------------------------------------------------------------
# Optional TOML support (stdlib tomllib in 3.11+, tomli on older)
# ---------------------------------------------------------------------------
try:
    import tomllib  # Python 3.11+
except ImportError:
    try:
        import tomli as tomllib  # type: ignore[no-redef]
    except ImportError:
        tomllib = None  # Config loading will fall back to defaults

PROJECT_ROOT = Path(__file__).parent.parent
DEFAULT_CONFIG_PATH = PROJECT_ROOT / "config" / "lazarus-pit.toml"
DEFAULT_CELLS_ROOT = Path("/var/missions")
DEFAULT_HEARTBEAT_STALE = 60   # seconds
DEFAULT_RESURRECT_AFTER = 120  # seconds
DEFAULT_MAX_REVIVES = 3
DEFAULT_POLL_INTERVAL = 15     # seconds
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class AgentHealth:
    name: str
    healthy: bool = True
    last_seen: float = 0.0
    revive_count: int = 0
    last_status: str = "unknown"


@dataclass
class CellState:
    uuid: str
    path: Path
    agents: dict[str, AgentHealth] = field(default_factory=dict)
    status: str = "active"


# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
@dataclass
class DaemonConfig:
    cells_root: Path = DEFAULT_CELLS_ROOT
    heartbeat_stale_threshold: int = DEFAULT_HEARTBEAT_STALE
    resurrect_after: int = DEFAULT_RESURRECT_AFTER
    max_revive_attempts: int = DEFAULT_MAX_REVIVES
    poll_interval: int = DEFAULT_POLL_INTERVAL
    log_level: str = "INFO"
    log_file: str = "-"
    pid_file: str = "/var/run/lazarus-pit.pid"
    gitea_url: str = "https://forge.alexanderwhitestone.com"
    gitea_repo: str = "Timmy_Foundation/the-nexus"
    gitea_token: str = ""
    open_issue_on_death: bool = True
    close_issue_on_revive: bool = True
def load_config(config_path: Path) -> DaemonConfig:
    """Load configuration from TOML file, falling back to defaults."""
    cfg = DaemonConfig()
    if not config_path.exists():
        return cfg
    if tomllib is None:
        logging.warning(
            "TOML parser not available (install tomli for Python < 3.11). "
            "Using defaults."
        )
        return cfg
    with open(config_path, "rb") as f:
        raw = tomllib.load(f)
    cells = raw.get("cells", {})
    daemon = raw.get("daemon", {})
    gitea = raw.get("gitea", {})
    notifications = raw.get("notifications", {})
    cfg.cells_root = Path(cells.get("root", str(cfg.cells_root)))
    cfg.heartbeat_stale_threshold = cells.get(
        "heartbeat_stale_threshold", cfg.heartbeat_stale_threshold
    )
    cfg.resurrect_after = cells.get("resurrect_after", cfg.resurrect_after)
    cfg.max_revive_attempts = cells.get("max_revive_attempts", cfg.max_revive_attempts)
    cfg.poll_interval = cells.get("poll_interval", cfg.poll_interval)
    cfg.log_level = daemon.get("log_level", cfg.log_level)
    cfg.log_file = daemon.get("log_file", cfg.log_file)
    cfg.pid_file = daemon.get("pid_file", cfg.pid_file)
    cfg.gitea_url = gitea.get("url", cfg.gitea_url)
    cfg.gitea_repo = gitea.get("repo", cfg.gitea_repo)
    cfg.gitea_token = os.environ.get("GITEA_TOKEN", gitea.get("token", ""))
    cfg.open_issue_on_death = notifications.get(
        "open_issue_on_death", cfg.open_issue_on_death
    )
    cfg.close_issue_on_revive = notifications.get(
        "close_issue_on_revive", cfg.close_issue_on_revive
    )
    return cfg
# ---------------------------------------------------------------------------
# Heartbeat reader
# ---------------------------------------------------------------------------
def read_agent_heartbeat(cell_path: Path, agent_name: str) -> Optional[dict]:
    """Read the heartbeat file for an agent in a cell. Returns None on failure."""
    hb_path = cell_path / ".lazarus" / "heartbeats" / f"{agent_name}.json"
    if not hb_path.exists():
        return None
    try:
        return json.loads(hb_path.read_text())
    except (json.JSONDecodeError, OSError):
        return None


# ---------------------------------------------------------------------------
# Cell discovery
# ---------------------------------------------------------------------------
def discover_cells(cells_root: Path) -> list[CellState]:
    """Walk cells_root and return all active mission cells."""
    cells: list[CellState] = []
    if not cells_root.exists():
        return cells
    for entry in cells_root.iterdir():
        if not entry.is_dir():
            continue
        manifest_path = entry / "cell.json"
        if not manifest_path.exists():
            continue
        try:
            manifest = json.loads(manifest_path.read_text())
        except (json.JSONDecodeError, OSError):
            continue
        if manifest.get("status") not in ("active", "frozen"):
            continue
        cell = CellState(
            uuid=manifest.get("uuid", entry.name),
            path=entry,
            status=manifest.get("status", "active"),
        )
        for agent_info in manifest.get("agents", []):
            name = agent_info.get("name", "unknown")
            cell.agents[name] = AgentHealth(name=name)
        cells.append(cell)
    return cells
# ---------------------------------------------------------------------------
# Health poll
# ---------------------------------------------------------------------------
def poll_cell(cell: CellState, cfg: DaemonConfig) -> list[str]:
    """
    Poll a cell's agents. Returns a list of warning messages for any
    agents whose heartbeat is stale.
    """
    now = time.time()
    warnings: list[str] = []
    for agent_name, health in cell.agents.items():
        hb = read_agent_heartbeat(cell.path, agent_name)
        if hb is None:
            health.healthy = False
            warnings.append(
                f"[{cell.uuid}] {agent_name}: no heartbeat file found"
            )
            continue
        age = now - hb.get("timestamp", 0)
        health.last_seen = hb.get("timestamp", 0)
        health.last_status = hb.get("status", "unknown")
        if age > cfg.heartbeat_stale_threshold:
            health.healthy = False
            warnings.append(
                f"[{cell.uuid}] {agent_name}: heartbeat stale ({age:.0f}s old, "
                f"threshold {cfg.heartbeat_stale_threshold}s)"
            )
        else:
            health.healthy = True
    return warnings


# ---------------------------------------------------------------------------
# Resurrection (stub — P3 will implement fully)
# ---------------------------------------------------------------------------
def attempt_revive(cell: CellState, agent_name: str, cfg: DaemonConfig) -> bool:
    """
    Attempt to revive a downed agent into its mission cell.

    This is a stub. Full implementation lands in P3 (#882).
    Currently logs the intent and increments the revive counter.
    """
    health = cell.agents.get(agent_name)
    if health is None:
        return False
    if health.revive_count >= cfg.max_revive_attempts:
        logging.error(
            "[%s] %s: max revive attempts (%d) reached — escalating to human",
            cell.uuid, agent_name, cfg.max_revive_attempts,
        )
        return False
    health.revive_count += 1
    logging.warning(
        "[%s] %s: initiating resurrection attempt %d/%d (P3 stub)",
        cell.uuid, agent_name, health.revive_count, cfg.max_revive_attempts,
    )
    # TODO (P3 #882): exec the agent's harness entrypoint inside the cell
    return False
# ---------------------------------------------------------------------------
# Main daemon loop
# ---------------------------------------------------------------------------
class LazarusPit:
    def __init__(self, cfg: DaemonConfig):
        self.cfg = cfg
        self._running = False

    def start(self) -> None:
        self._running = True
        self._write_pid()
        self._setup_signals()
        logging.info("Lazarus Pit daemon started (poll interval: %ds)", self.cfg.poll_interval)
        while self._running:
            self._tick()
            time.sleep(self.cfg.poll_interval)
        logging.info("Lazarus Pit daemon stopped.")
        self._remove_pid()

    def _tick(self) -> None:
        cells = discover_cells(self.cfg.cells_root)
        if not cells:
            logging.debug("No active mission cells found in %s", self.cfg.cells_root)
            return
        for cell in cells:
            warnings = poll_cell(cell, self.cfg)
            for msg in warnings:
                logging.warning(msg)
            now = time.time()
            for agent_name, health in cell.agents.items():
                if not health.healthy:
                    age = now - health.last_seen if health.last_seen else float("inf")
                    if age > self.cfg.resurrect_after:
                        attempt_revive(cell, agent_name, self.cfg)

    def _write_pid(self) -> None:
        pid_path = Path(self.cfg.pid_file)
        try:
            pid_path.parent.mkdir(parents=True, exist_ok=True)
            pid_path.write_text(str(os.getpid()))
        except OSError as e:
            logging.warning("Could not write PID file: %s", e)

    def _remove_pid(self) -> None:
        try:
            Path(self.cfg.pid_file).unlink(missing_ok=True)
        except OSError:
            pass

    def _setup_signals(self) -> None:
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame) -> None:
        logging.info("Signal %d received — shutting down.", signum)
        self._running = False
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def cmd_status(cfg: DaemonConfig) -> None:
    """Print status of all active mission cells."""
    cells = discover_cells(cfg.cells_root)
    if not cells:
        print(f"No active cells in {cfg.cells_root}")
        return
    for cell in cells:
        print(f"\nCell {cell.uuid} [{cell.status}]")
        for agent_name, health in cell.agents.items():
            hb = read_agent_heartbeat(cell.path, agent_name)
            if hb:
                age = time.time() - hb.get("timestamp", 0)
                print(f"  {agent_name}: {hb.get('status', '?')} (heartbeat {age:.0f}s ago)")
            else:
                print(f"  {agent_name}: no heartbeat")


def cmd_list_cells(cfg: DaemonConfig) -> None:
    """List all mission cells (active and otherwise)."""
    root = cfg.cells_root
    if not root.exists():
        print(f"{root} does not exist")
        return
    found = False
    for entry in sorted(root.iterdir()):
        manifest = entry / "cell.json"
        if manifest.exists():
            try:
                data = json.loads(manifest.read_text())
                print(f"{data.get('uuid', entry.name)}: {data.get('status', '?')} {data.get('mission', '')}")
                found = True
            except (json.JSONDecodeError, OSError):
                print(f"{entry.name}: [unreadable manifest]")
                found = True
    if not found:
        print(f"No mission cells found in {root}")


def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Lazarus Pit — agent resurrection pool daemon",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--config",
        default=str(DEFAULT_CONFIG_PATH),
        help="Path to lazarus-pit.toml (default: config/lazarus-pit.toml)",
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Print current cell/agent health and exit",
    )
    parser.add_argument(
        "--list-cells",
        action="store_true",
        help="List all mission cells and exit",
    )
    return parser


def main() -> None:
    args = build_arg_parser().parse_args()
    cfg = load_config(Path(args.config))
    # Configure logging
    log_format = "%(asctime)s [%(levelname)s] %(message)s"
    log_level = getattr(logging, cfg.log_level.upper(), logging.INFO)
    if cfg.log_file == "-":
        logging.basicConfig(level=log_level, format=log_format)
    else:
        logging.basicConfig(level=log_level, format=log_format, filename=cfg.log_file)
    if args.status:
        cmd_status(cfg)
        return
    if args.list_cells:
        cmd_list_cells(cfg)
        return
    pit = LazarusPit(cfg)
    pit.start()


if __name__ == "__main__":
    main()

bin/resurrection_pool.py (deleted file)

@@ -1,940 +0,0 @@
#!/usr/bin/env python3
"""
Resurrection Pool — Fleet Health Polling, Dead-Agent Detection, Auto-Revive

Monitors all wizard gateways in the fleet, detects downed agents, and
applies configurable revive policies per mission cell:

    auto  — restart the agent immediately on death detection
    ask   — send a Telegram / Nostr approval request; revive when approved
    never — record the death, create a Gitea issue, do nothing else

HOW IT WORKS
============
1. Load a JSON config listing wizard gateways and mission cells.
2. Poll each gateway on a configurable interval (default: 60s).
   - TCP-probe the WebSocket port (can the gateway hear the world?)
   - Check for a running agent process via pgrep
   - Read the heartbeat file for last-think timestamp
3. If an agent fails all three checks for longer than dead_threshold:
   a. Mark it dead in the pool state.
   b. Look up the mission it belongs to and read the revive_policy.
   c. Execute the policy: auto-restart, request human approval, or no-op.
4. On auto-revive: spawn the configured revive_cmd, wait for the heartbeat
   to return healthy, and log the recovery.
5. On ask-revive: post an approval request to Telegram / Nostr and wait
   up to approval_timeout seconds for a "+1" or "revive" reply.
6. Create / close Gitea issues to track fleet health events.

CONFIGURATION
=============
Config file path is resolved in this order:
    1. --config CLI flag
    2. RESURRECTION_CONFIG env var
    3. ~/.nexus/resurrection_pool.json
    4. /etc/nexus/resurrection_pool.json

Minimal config skeleton (all fields optional, sane defaults everywhere):

    {
      "poll_interval": 60,
      "dead_threshold": 300,
      "approval_timeout": 300,
      "gateways": [
        {
          "name": "timmy",
          "host": "localhost",
          "port": 8765,
          "heartbeat_path": "~/.nexus/heartbeat.json",
          "process_pattern": "nexus_think",
          "revive_cmd": ["python", "/root/wizards/timmy/nexus_think.py"]
        }
      ],
      "missions": {
        "some-mission-uuid": {
          "label": "Portal Sprint",
          "agents": ["timmy"],
          "revive_policy": "ask"
        }
      },
      "telegram": {
        "bot_token": "...",
        "chat_id": "-1001234567890"
      },
      "nostr": {
        "relay_url": "wss://relay.damus.io",
        "recipient_pubkey": "..."
      },
      "gitea": {
        "url": "https://forge.example.com",
        "token": "...",
        "repo": "Timmy_Foundation/the-nexus"
      }
    }

USAGE
=====
    # One-shot health check (good for cron / smoke tests)
    python bin/resurrection_pool.py --once

    # Continuous daemon (default)
    python bin/resurrection_pool.py

    # Custom config + dry-run (no side-effects)
    python bin/resurrection_pool.py --config /etc/nexus/pool.json --dry-run

    # Print current pool state as JSON
    python bin/resurrection_pool.py --status

ZERO DEPENDENCIES
=================
Pure stdlib. No pip installs required on the monitoring host.
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import signal
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("nexus.resurrection_pool")

# ── Constants ────────────────────────────────────────────────────────
DEFAULT_POLL_INTERVAL = 60      # seconds between full fleet sweeps
DEFAULT_DEAD_THRESHOLD = 300    # seconds with no heartbeat = dead
DEFAULT_APPROVAL_TIMEOUT = 300  # seconds to wait for human approval
CONFIG_SEARCH_PATHS = [
    Path.home() / ".nexus" / "resurrection_pool.json",
    Path("/etc/nexus/resurrection_pool.json"),
]
POOL_LABEL = "resurrection-pool"
POOL_TITLE_PREFIX = "[pool]"
# ── Enums & simple types ─────────────────────────────────────────────
class RevivePolicy(str, Enum):
    AUTO = "auto"    # restart immediately, no human needed
    ASK = "ask"      # send approval request, wait for +1
    NEVER = "never"  # log + Gitea issue, do not restart


class AgentState(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"  # some checks failing but not yet dead
    DEAD = "dead"
    REVIVING = "reviving"
    UNKNOWN = "unknown"


# ── Configuration dataclasses ────────────────────────────────────────
@dataclass
class GatewayConfig:
    """One wizard gateway to monitor."""
    name: str
    host: str = "localhost"
    port: int = 8765
    heartbeat_path: str = "~/.nexus/heartbeat.json"
    process_pattern: str = "nexus_think"
    revive_cmd: List[str] = field(default_factory=list)

    @property
    def resolved_heartbeat_path(self) -> Path:
        return Path(self.heartbeat_path).expanduser()


@dataclass
class MissionConfig:
    """A mission cell with agents and a revive policy."""
    mission_id: str
    label: str = ""
    agents: List[str] = field(default_factory=list)
    revive_policy: RevivePolicy = RevivePolicy.ASK

    @classmethod
    def from_dict(cls, mission_id: str, data: dict) -> "MissionConfig":
        policy_raw = data.get("revive_policy", RevivePolicy.ASK)
        try:
            policy = RevivePolicy(policy_raw)
        except ValueError:
            logger.warning("Unknown revive_policy %r for mission %s; defaulting to 'ask'",
                           policy_raw, mission_id)
            policy = RevivePolicy.ASK
        return cls(
            mission_id=mission_id,
            label=data.get("label", ""),
            agents=data.get("agents", []),
            revive_policy=policy,
        )
@dataclass
class PoolConfig:
    """Full resurrection pool configuration."""
    poll_interval: int = DEFAULT_POLL_INTERVAL
    dead_threshold: int = DEFAULT_DEAD_THRESHOLD
    approval_timeout: int = DEFAULT_APPROVAL_TIMEOUT
    gateways: List[GatewayConfig] = field(default_factory=list)
    missions: List[MissionConfig] = field(default_factory=list)
    telegram_bot_token: str = ""
    telegram_chat_id: str = ""
    nostr_relay_url: str = ""
    nostr_recipient_pubkey: str = ""
    gitea_url: str = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
    gitea_token: str = os.environ.get("GITEA_TOKEN", "")
    gitea_repo: str = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")

    @classmethod
    def from_dict(cls, data: dict) -> "PoolConfig":
        tg = data.get("telegram", {})
        ns = data.get("nostr", {})
        git = data.get("gitea", {})
        gateways = [
            GatewayConfig(
                name=gw["name"],
                host=gw.get("host", "localhost"),
                port=gw.get("port", 8765),
                heartbeat_path=gw.get("heartbeat_path", "~/.nexus/heartbeat.json"),
                process_pattern=gw.get("process_pattern", "nexus_think"),
                revive_cmd=gw.get("revive_cmd", []),
            )
            for gw in data.get("gateways", [])
        ]
        missions = [
            MissionConfig.from_dict(mid, mdata)
            for mid, mdata in data.get("missions", {}).items()
        ]
        return cls(
            poll_interval=data.get("poll_interval", DEFAULT_POLL_INTERVAL),
            dead_threshold=data.get("dead_threshold", DEFAULT_DEAD_THRESHOLD),
            approval_timeout=data.get("approval_timeout", DEFAULT_APPROVAL_TIMEOUT),
            gateways=gateways,
            missions=missions,
            telegram_bot_token=tg.get("bot_token", os.environ.get("TELEGRAM_BOT_TOKEN", "")),
            telegram_chat_id=tg.get("chat_id", os.environ.get("TELEGRAM_CHAT_ID", "")),
            nostr_relay_url=ns.get("relay_url", ""),
            nostr_recipient_pubkey=ns.get("recipient_pubkey", ""),
            gitea_url=git.get("url", os.environ.get("GITEA_URL",
                                                    "https://forge.alexanderwhitestone.com")),
            gitea_token=git.get("token", os.environ.get("GITEA_TOKEN", "")),
            gitea_repo=git.get("repo", os.environ.get("NEXUS_REPO",
                                                      "Timmy_Foundation/the-nexus")),
        )

    @classmethod
    def load(cls, path: Optional[Path] = None) -> "PoolConfig":
        """Load config from file, falling back through search paths."""
        # Resolution order per the module docstring: CLI flag, then env var,
        # then the fixed search paths.
        candidates: List[Path] = []
        if path:
            candidates.append(path)
        env_path = os.environ.get("RESURRECTION_CONFIG")
        if env_path:
            candidates.append(Path(env_path))
        candidates += CONFIG_SEARCH_PATHS
        for candidate in candidates:
            if candidate and candidate.exists():
                try:
                    data = json.loads(candidate.read_text())
                    logger.info("Loaded config from %s", candidate)
                    return cls.from_dict(data)
                except (json.JSONDecodeError, KeyError) as e:
                    logger.warning("Failed to parse %s: %s", candidate, e)
        logger.info("No config file found; using defaults (no gateways configured)")
        return cls()
# ── Per-agent health state ───────────────────────────────────────────
@dataclass
class AgentHealth:
    """Runtime health snapshot for one agent gateway."""
    name: str
    state: AgentState = AgentState.UNKNOWN
    gateway_alive: bool = False
    process_alive: bool = False
    heartbeat_age: Optional[float] = None  # seconds since last heartbeat
    last_checked: float = field(default_factory=time.time)
    first_dead_at: Optional[float] = None  # when death was first detected
    revive_pending: bool = False           # approval request outstanding
    revive_attempts: int = 0

    @property
    def consecutive_dead_seconds(self) -> float:
        if self.first_dead_at is None:
            return 0.0
        return time.time() - self.first_dead_at

    def to_dict(self) -> dict:
        return {
            "name": self.name,
            "state": self.state.value,
            "gateway_alive": self.gateway_alive,
            "process_alive": self.process_alive,
            "heartbeat_age": self.heartbeat_age,
            "last_checked": self.last_checked,
            "first_dead_at": self.first_dead_at,
            "revive_pending": self.revive_pending,
            "revive_attempts": self.revive_attempts,
        }
# ── Health probes ────────────────────────────────────────────────────
def probe_gateway(host: str, port: int, timeout: int = 5) -> Tuple[bool, str]:
    """TCP probe a wizard gateway. Returns (alive, message)."""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        errno = sock.connect_ex((host, port))
        sock.close()
        if errno == 0:
            return True, f"TCP:{host}:{port} reachable"
        return False, f"TCP:{host}:{port} refused (errno={errno})"
    except Exception as e:
        return False, f"TCP probe failed: {e}"


def probe_process(pattern: str) -> Tuple[bool, str]:
    """pgrep-based process check. Returns (alive, message)."""
    try:
        result = subprocess.run(
            ["pgrep", "-f", pattern],
            capture_output=True, text=True, timeout=5,
        )
        if result.returncode == 0:
            pids = [p.strip() for p in result.stdout.strip().split("\n")
                    if p.strip() and p.strip() != str(os.getpid())]
            if pids:
                return True, f"Running (PID: {', '.join(pids)})"
        return False, f"No process matching {pattern!r}"
    except FileNotFoundError:
        return True, "pgrep unavailable; skipping process check"
    except Exception as e:
        return False, f"Process probe failed: {e}"


def probe_heartbeat(path: Path, dead_threshold: int) -> Tuple[bool, Optional[float], str]:
    """Read a heartbeat file. Returns (alive, age_seconds, message)."""
    if not path.exists():
        return False, None, f"No heartbeat file at {path}"
    try:
        data = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError) as e:
        return False, None, f"Heartbeat file unreadable: {e}"
    ts = data.get("timestamp", 0)
    age = time.time() - ts
    cycle = data.get("cycle", "?")
    model = data.get("model", "unknown")
    if age > dead_threshold:
        return False, age, (
            f"Stale heartbeat: {int(age)}s ago (threshold {dead_threshold}s), "
            f"cycle={cycle}, model={model}"
        )
    return True, age, f"Alive: cycle={cycle}, {int(age)}s ago, model={model}"


def assess_agent(gw: GatewayConfig, dead_threshold: int) -> AgentHealth:
    """Run all probes for one gateway and return an AgentHealth snapshot."""
    health = AgentHealth(name=gw.name, last_checked=time.time())
    gw_ok, gw_msg = probe_gateway(gw.host, gw.port)
    proc_ok, proc_msg = probe_process(gw.process_pattern)
    hb_ok, hb_age, hb_msg = probe_heartbeat(gw.resolved_heartbeat_path, dead_threshold)
    health.gateway_alive = gw_ok
    health.process_alive = proc_ok
    health.heartbeat_age = hb_age
    logger.debug("[%s] gateway=%s process=%s heartbeat=%s",
                 gw.name, gw_ok, proc_ok, hb_ok)
    logger.debug("[%s] gw: %s | proc: %s | hb: %s",
                 gw.name, gw_msg, proc_msg, hb_msg)
    checks_ok = sum([gw_ok, proc_ok, hb_ok])
    if checks_ok == 3:
        health.state = AgentState.HEALTHY
    elif checks_ok == 0:
        health.state = AgentState.DEAD
    else:
        health.state = AgentState.DEGRADED
    return health
# ── Notification channels ────────────────────────────────────────────
def _http(method: str, url: str, data: Optional[dict] = None,
          headers: Optional[dict] = None) -> Optional[Any]:
    """Minimal HTTP helper. Returns parsed JSON or None on failure."""
    body = json.dumps(data).encode() if data else None
    req = urllib.request.Request(url, data=body, method=method)
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    if headers:
        for k, v in headers.items():
            req.add_header(k, v)
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read().decode()
            return json.loads(raw) if raw.strip() else {}
    except urllib.error.HTTPError as e:
        logger.warning("HTTP %d %s: %s", e.code, url, e.read().decode()[:200])
    except Exception as e:
        logger.warning("HTTP error %s: %s", url, e)
    return None


def send_telegram_approval_request(
    bot_token: str,
    chat_id: str,
    agent_name: str,
    mission_label: str,
    dead_seconds: float,
    dry_run: bool = False,
) -> bool:
    """
    Post a Telegram message asking for revive approval.

    The operator should reply with "revive <agent_name>" or "+1" to approve.
    Returns True if the message was sent successfully.
    """
    if dry_run:
        logger.info("[DRY-RUN] Would send Telegram approval request for %s", agent_name)
        return True
    if not bot_token or not chat_id:
        logger.warning("Telegram not configured; cannot send approval request for %s", agent_name)
        return False
    text = (
        f"🔴 *Resurrection Pool Alert*\n\n"
        f"Agent *{agent_name}* has been dead for {int(dead_seconds)}s.\n"
        f"Mission: _{mission_label or 'unassigned'}_\n\n"
        f"Reply `revive {agent_name}` to authorize restart.\n"
        f"Reply `skip {agent_name}` to suppress for this cycle."
    )
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    result = _http("POST", url, {
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown",
    })
    if result and result.get("ok"):
        logger.info("Telegram approval request sent for %s (message_id=%s)",
                    agent_name, result.get("result", {}).get("message_id"))
        return True
    logger.warning("Telegram send failed for %s: %s", agent_name, result)
    return False
def poll_telegram_for_approval(
    bot_token: str,
    agent_name: str,
    timeout_seconds: int,
    dry_run: bool = False,
) -> bool:
    """
    Poll Telegram getUpdates for a revive approval message.

    Looks for messages containing "revive <agent_name>" (case-insensitive).
    Returns True if approved within timeout, False otherwise.
    """
    if dry_run:
        logger.info("[DRY-RUN] Simulating Telegram approval for %s (auto-approve)", agent_name)
        return True
    if not bot_token:
        return False
    deadline = time.time() + timeout_seconds
    offset = None
    while time.time() < deadline:
        params = "?timeout=30&limit=10"
        if offset is not None:
            params += f"&offset={offset}"
        url = f"https://api.telegram.org/bot{bot_token}/getUpdates{params}"
        result = _http("GET", url)
        if not result or not result.get("ok"):
            time.sleep(5)
            continue
        for update in result.get("result", []):
            offset = update["update_id"] + 1
            msg_text = (
                update.get("message", {}).get("text", "")
                or update.get("channel_post", {}).get("text", "")
            ).lower().strip()
            if f"revive {agent_name.lower()}" in msg_text or msg_text == "+1":
                logger.info("Telegram approval received for %s", agent_name)
                return True
            if f"skip {agent_name.lower()}" in msg_text:
                logger.info("Telegram skip received for %s; aborting revive", agent_name)
                return False
        time.sleep(5)
    logger.info("Approval timeout for %s after %ds", agent_name, timeout_seconds)
    return False


def send_nostr_approval_request(
    relay_url: str,
    recipient_pubkey: str,
    agent_name: str,
    mission_label: str,
    dead_seconds: float,
    dry_run: bool = False,
) -> bool:
    """
    Post a Nostr direct-message (kind 4) approval request.

    Falls back gracefully if websockets is not installed — logs a warning
    instead of crashing the pool. Returns True if sent, False otherwise.
    """
    if dry_run:
        logger.info("[DRY-RUN] Would send Nostr approval request for %s", agent_name)
        return True
    if not relay_url or not recipient_pubkey:
        logger.debug("Nostr not configured; skipping DM for %s", agent_name)
        return False
    content = (
        f"[Resurrection Pool] Agent '{agent_name}' is dead ({int(dead_seconds)}s). "
        f"Mission: {mission_label or 'unassigned'}. "
        f"Reply 'revive {agent_name}' to authorize restart."
    )
    try:
        import asyncio

        async def _send():
            import websockets  # type: ignore
            event = {
                "kind": 4,
                "pubkey": "",  # filled by relay/identity in prod
                "created_at": int(time.time()),
                "tags": [["p", recipient_pubkey]],
                "content": content,
                "id": "",
                "sig": "",
            }
            try:
                async with websockets.connect(relay_url, open_timeout=10) as ws:
                    await ws.send(json.dumps(["EVENT", event]))
                    logger.info("Nostr DM sent for %s via %s", agent_name, relay_url)
                    return True
            except Exception as e:
                logger.warning("Nostr send failed for %s: %s", agent_name, e)
                return False

        return asyncio.run(_send())
    except ImportError:
        logger.warning("websockets library not installed; Nostr notifications disabled")
        return False
    except Exception as e:
        logger.warning("Nostr notification error for %s: %s", agent_name, e)
        return False
# ── Revive actions ───────────────────────────────────────────────────
def revive_agent(gw: GatewayConfig, dry_run: bool = False) -> bool:
    """
    Attempt to restart an agent using its configured revive_cmd.

    Returns True if the command was launched without error.
    Does NOT wait for the agent to become healthy — the next poll cycle
    will detect recovery via the heartbeat file.
    """
    if not gw.revive_cmd:
        logger.warning("[%s] No revive_cmd configured; cannot restart", gw.name)
        return False
    if dry_run:
        logger.info("[DRY-RUN] Would exec: %s", gw.revive_cmd)
        return True
    try:
        logger.info("[%s] Launching: %s", gw.name, gw.revive_cmd)
        proc = subprocess.Popen(
            gw.revive_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,  # detach from our process group
        )
        logger.info("[%s] Revive process started (PID %d)", gw.name, proc.pid)
        return True
    except FileNotFoundError as e:
        logger.error("[%s] Revive cmd not found: %s", gw.name, e)
    except PermissionError as e:
        logger.error("[%s] Revive cmd permission denied: %s", gw.name, e)
    except Exception as e:
        logger.error("[%s] Revive failed: %s", gw.name, e)
    return False
# ── Gitea integration ────────────────────────────────────────────────
def _gitea(method: str, path: str, cfg: PoolConfig, data: Optional[dict] = None) -> Any:
"""Gitea API helper."""
url = f"{cfg.gitea_url.rstrip('/')}/api/v1{path}"
headers = {}
if cfg.gitea_token:
headers["Authorization"] = f"token {cfg.gitea_token}"
return _http(method, url, data=data, headers=headers)
def find_open_pool_issue(cfg: PoolConfig, agent_name: str) -> Optional[dict]:
"""Look for an existing open pool issue for this agent."""
issues = _gitea("GET",
f"/repos/{cfg.gitea_repo}/issues"
f"?state=open&type=issues&limit=50",
cfg)
if not issues or not isinstance(issues, list):
return None
prefix = f"{POOL_TITLE_PREFIX} {agent_name}:"
for issue in issues:
if issue.get("title", "").startswith(prefix):
return issue
return None
def open_death_issue(cfg: PoolConfig, health: AgentHealth, mission_label: str) -> None:
"""Create a Gitea issue recording an agent death."""
if not cfg.gitea_token:
logger.debug("No Gitea token; skipping issue creation for %s", health.name)
return
title = f"{POOL_TITLE_PREFIX} {health.name}: agent dead ({int(health.consecutive_dead_seconds)}s)"
body = (
f"## 🔴 Agent Death Detected — {health.name}\n\n"
f"**Mission:** {mission_label or 'unassigned'} \n"
f"**Dead for:** {int(health.consecutive_dead_seconds)}s \n"
f"**Gateway alive:** {health.gateway_alive} \n"
f"**Process alive:** {health.process_alive} \n"
f"**Heartbeat age:** "
f"{f'{int(health.heartbeat_age)}s' if health.heartbeat_age is not None else 'N/A'} \n\n"
f"*Logged by Resurrection Pool at "
f"{time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}*"
)
result = _gitea("POST", f"/repos/{cfg.gitea_repo}/issues", cfg,
data={"title": title, "body": body})
if result and result.get("number"):
logger.info("Opened death issue #%d for %s", result["number"], health.name)
def close_death_issue(cfg: PoolConfig, agent_name: str, revived: bool) -> None:
"""Close an existing death issue when the agent recovers."""
issue = find_open_pool_issue(cfg, agent_name)
if not issue:
return
num = issue["number"]
verb = "revived and healthy" if revived else "recovered (external)"
_gitea("POST", f"/repos/{cfg.gitea_repo}/issues/{num}/comments", cfg,
data={"body": f"## 🟢 Agent {agent_name} — {verb}\n\n"
f"*Resurrection Pool confirmed recovery at "
f"{time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}*"})
_gitea("PATCH", f"/repos/{cfg.gitea_repo}/issues/{num}", cfg,
data={"state": "closed"})
logger.info("Closed death issue #%d for %s", num, agent_name)
# ── Policy engine ────────────────────────────────────────────────────
def apply_revive_policy(
gw: GatewayConfig,
health: AgentHealth,
mission: Optional[MissionConfig],
cfg: PoolConfig,
dry_run: bool = False,
) -> bool:
"""
Execute the revive policy for a dead agent.
Returns True if a revive was attempted, False otherwise.
"""
policy = mission.revive_policy if mission else RevivePolicy.ASK
mission_label = mission.label if mission else ""
logger.info("[%s] Applying revive policy=%s (dead %ds)",
gw.name, policy.value, int(health.consecutive_dead_seconds))
if policy == RevivePolicy.NEVER:
logger.info("[%s] Policy=never — recording death, no restart", gw.name)
open_death_issue(cfg, health, mission_label)
return False
if policy == RevivePolicy.AUTO:
logger.info("[%s] Policy=auto — restarting immediately", gw.name)
ok = revive_agent(gw, dry_run=dry_run)
if ok:
logger.info("[%s] Auto-revive launched", gw.name)
else:
logger.warning("[%s] Auto-revive failed — opening Gitea issue", gw.name)
open_death_issue(cfg, health, mission_label)
return ok
# policy == RevivePolicy.ASK
if health.revive_pending:
logger.debug("[%s] Approval already pending; waiting", gw.name)
return False
# Send approval requests over all configured channels
health.revive_pending = True
tg_sent = send_telegram_approval_request(
cfg.telegram_bot_token, cfg.telegram_chat_id,
gw.name, mission_label, health.consecutive_dead_seconds,
dry_run=dry_run,
)
send_nostr_approval_request(
cfg.nostr_relay_url, cfg.nostr_recipient_pubkey,
gw.name, mission_label, health.consecutive_dead_seconds,
dry_run=dry_run,
)
if not tg_sent and not cfg.nostr_relay_url:
logger.warning(
"[%s] No approval channel available (configure telegram or nostr). "
"Falling back to auto-revive.", gw.name
)
health.revive_pending = False
return revive_agent(gw, dry_run=dry_run)
# Poll for approval
approved = poll_telegram_for_approval(
cfg.telegram_bot_token, gw.name,
cfg.approval_timeout, dry_run=dry_run,
)
health.revive_pending = False
if approved:
logger.info("[%s] Approval received — launching revive", gw.name)
ok = revive_agent(gw, dry_run=dry_run)
if not ok:
open_death_issue(cfg, health, mission_label)
return ok
logger.info("[%s] Revive not approved — recording death", gw.name)
open_death_issue(cfg, health, mission_label)
return False
# ── Main pool loop ───────────────────────────────────────────────────
class ResurrectionPool:
"""
Fleet-wide health monitor and revive coordinator.
Maintains an in-memory health table keyed by agent name.
Drives one poll-sweep per interval across all configured gateways.
"""
def __init__(self, cfg: PoolConfig, dry_run: bool = False):
self.cfg = cfg
self.dry_run = dry_run
self._health: Dict[str, AgentHealth] = {}
self._running = False
# ── Agent-to-mission lookup ──────────────────────────────────────
def _mission_for(self, agent_name: str) -> Optional[MissionConfig]:
"""Return the first mission that lists this agent."""
for m in self.cfg.missions:
if agent_name in m.agents:
return m
return None
# ── Gateway lookup ───────────────────────────────────────────────
def _gateway_for(self, agent_name: str) -> Optional[GatewayConfig]:
for gw in self.cfg.gateways:
if gw.name == agent_name:
return gw
return None
# ── Health state tracking ────────────────────────────────────────
def _update_health(self, agent_name: str, new: AgentHealth) -> AgentHealth:
"""
Merge a fresh health snapshot into the pool's running state.
- Preserves first_dead_at timestamp so dead_seconds accumulates.
- Resets first_dead_at when agent returns healthy.
- Tracks revive_attempts across cycles.
"""
prev = self._health.get(agent_name)
if new.state == AgentState.HEALTHY:
new.first_dead_at = None
new.revive_attempts = 0
new.revive_pending = False
if prev and prev.state == AgentState.DEAD:
logger.info("[%s] Agent recovered!", agent_name)
close_death_issue(self.cfg, agent_name, revived=prev.revive_attempts > 0)
else:
if prev:
new.revive_pending = prev.revive_pending
new.revive_attempts = prev.revive_attempts
if prev.first_dead_at is not None:
new.first_dead_at = prev.first_dead_at
elif new.state in (AgentState.DEAD, AgentState.DEGRADED):
new.first_dead_at = new.last_checked
else:
new.first_dead_at = new.last_checked
self._health[agent_name] = new
return new
# ── Single sweep ─────────────────────────────────────────────────
def sweep(self) -> Dict[str, AgentHealth]:
"""Poll all gateways once and apply revive policies where needed."""
if not self.cfg.gateways:
logger.debug("No gateways configured; nothing to sweep")
return {}
for gw in self.cfg.gateways:
raw = assess_agent(gw, self.cfg.dead_threshold)
health = self._update_health(gw.name, raw)
icon = {"healthy": "✅", "degraded": "⚠️", "dead": "🔴",
"reviving": "🔄", "unknown": "❔"}.get(health.state.value, "?")
logger.info("%s [%s] state=%s dead_for=%ds",
icon, gw.name, health.state.value,
int(health.consecutive_dead_seconds))
if health.state == AgentState.DEAD:
if health.consecutive_dead_seconds >= self.cfg.dead_threshold:
if not health.revive_pending:
mission = self._mission_for(gw.name)
launched = apply_revive_policy(
gw, health, mission, self.cfg, dry_run=self.dry_run
)
if launched:
health.revive_attempts += 1
health.state = AgentState.REVIVING
return dict(self._health)
def status(self) -> List[dict]:
"""Return current pool state as a list of dicts (for --status flag)."""
return [h.to_dict() for h in self._health.values()]
# ── Run loop ─────────────────────────────────────────────────────
def run(self) -> None:
"""Blocking poll loop. Handles SIGTERM/SIGINT cleanly."""
self._running = True
def _stop(signum, frame):
logger.info("Signal %d received; stopping pool", signum)
self._running = False
signal.signal(signal.SIGTERM, _stop)
signal.signal(signal.SIGINT, _stop)
logger.info(
"Resurrection Pool starting — %d gateway(s), poll_interval=%ds, dead_threshold=%ds",
len(self.cfg.gateways), self.cfg.poll_interval, self.cfg.dead_threshold,
)
while self._running:
self.sweep()
# Sleep in 1s ticks so we respond to signals promptly
for _ in range(self.cfg.poll_interval):
if not self._running:
break
time.sleep(1)
logger.info("Resurrection Pool stopped")
def run_once(self) -> bool:
"""Single sweep; returns True if all agents are healthy."""
results = self.sweep()
return all(h.state == AgentState.HEALTHY for h in results.values())
# ── CLI ──────────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description="Resurrection Pool — fleet health polling and auto-revive",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
p.add_argument("--config", metavar="PATH",
help="Path to resurrection_pool.json config file")
p.add_argument("--once", action="store_true",
help="Run one sweep and exit (for cron / smoke tests)")
p.add_argument("--status", action="store_true",
help="Print current pool state as JSON and exit")
p.add_argument("--dry-run", action="store_true",
help="Run all checks but do not restart agents or alert channels")
p.add_argument("--poll-interval", type=int, default=None,
help="Override poll interval (seconds)")
p.add_argument("--dead-threshold", type=int, default=None,
help="Override dead threshold (seconds)")
p.add_argument("-v", "--verbose", action="store_true",
help="Enable DEBUG logging")
return p
def main() -> None:
args = build_parser().parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
cfg = PoolConfig.load(Path(args.config) if args.config else None)
if args.poll_interval is not None:
cfg.poll_interval = args.poll_interval
if args.dead_threshold is not None:
cfg.dead_threshold = args.dead_threshold
pool = ResurrectionPool(cfg, dry_run=args.dry_run)
if args.status:
# Run one sweep so the printed state reflects current reality
pool.sweep()
print(json.dumps(pool.status(), indent=2))
return
if args.once:
healthy = pool.run_once()
sys.exit(0 if healthy else 1)
pool.run()
if __name__ == "__main__":
main()

config/lazarus-pit.toml

@@ -0,0 +1,52 @@
# lazarus-pit.toml — Daemon configuration for the Lazarus Pit resurrection pool
# See docs/lazarus-pit/mission-cell-spec.md for architecture context.
# Epic: #878 · Phase P0 Foundation: #879
[daemon]
# PID file location
pid_file = "/var/run/lazarus-pit.pid"
# Log file (use "-" for stdout)
log_file = "/var/log/lazarus-pit.log"
# Log level: DEBUG | INFO | WARNING | ERROR
log_level = "INFO"
[cells]
# Root directory for all mission cells
root = "/var/missions"
# Seconds of silence before an agent is considered stale
heartbeat_stale_threshold = 60
# Seconds of stale heartbeat before attempting resurrection
resurrect_after = 120
# Maximum auto-revive attempts per agent per session
max_revive_attempts = 3
# Poll interval in seconds for health checks
poll_interval = 15
[gateway]
# WebSocket gateway to poll for liveness
host = "localhost"
port = 8765
# HTTP health endpoint exposed by the gateway
health_host = "localhost"
health_port = 8766
[gitea]
# Gitea instance for filing resurrection / incident reports
url = "https://forge.alexanderwhitestone.com"
repo = "Timmy_Foundation/the-nexus"
# Token read from environment: GITEA_TOKEN
# token = "" # Do not commit real tokens
[notifications]
# Whether to open a Gitea issue when an agent goes dark
open_issue_on_death = true
# Whether to close the issue when the agent is successfully revived
close_issue_on_revive = true

docs/lazarus-pit/mission-cell-spec.md

@@ -0,0 +1,156 @@
# Mission Cell Directory Specification
**Version:** 1.0
**Status:** Canonical
**Epic:** #878 — The Lazarus Pit & Mission Cell Isolation
---
## Overview
A **Mission Cell** is an ephemeral, isolated working directory provisioned for a specific
multi-agent collaboration. Each cell is scoped to a single mission (project, task, or
incident) and is identified by a UUID. No wizard's home directory is ever touched by
another agent's work within a cell.
---
## Root Path
```
/var/missions/<uuid>/
```
`<uuid>` is a version-4 UUID, e.g. `a3f7c901-1234-4b5e-8def-000000000001`.
---
## Directory Layout
```
/var/missions/<uuid>/
├── cell.json # Cell manifest (identity, status, agents, timestamps)
├── workspace/ # Shared working directory — agents write here
│ └── ...
├── logs/
│ ├── events.jsonl # Append-only event stream (invitation, join, leave, etc.)
│ └── <agent-name>.log # Per-agent stdout/stderr capture
├── checkpoints/ # Snapshot archives for P2 checkpoint/restore
│ └── <timestamp>.tar.gz
├── bus/ # Mission message bus (P4 multi-agent teaming)
│ ├── inbox/ # Per-agent inboxes
│ │ └── <agent-name>/
│ └── outbox/
└── .lazarus/ # Daemon metadata (written by lazarus-pit, not agents)
├── heartbeats/
│ └── <agent-name>.json # Last heartbeat per agent
└── state.json # Cell lifecycle state (active, frozen, archived)
```
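The layout above can be provisioned in a few lines. This is a sketch, not the daemon's actual provisioning code; `provision_cell` and the `SUBDIRS` list are assumptions derived directly from the tree shown.

```python
# Hedged sketch: creating the directory skeleton for a new mission cell.
import uuid
from pathlib import Path

SUBDIRS = [
    "workspace", "logs", "checkpoints",
    "bus/inbox", "bus/outbox",
    ".lazarus/heartbeats",
]

def provision_cell(root: Path) -> Path:
    """Create /<root>/<uuid4>/ with all spec subdirectories."""
    cell = root / str(uuid.uuid4())
    for sub in SUBDIRS:
        (cell / sub).mkdir(parents=True)
    return cell
```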
---
## File Specs
### `cell.json` — Cell Manifest
```json
{
"uuid": "a3f7c901-1234-4b5e-8def-000000000001",
"name": "optional human-readable name",
"mission": "Short description of the mission objective",
"created_at": "2026-04-06T00:00:00Z",
"created_by": "agent-name or username",
"status": "active",
"agents": [
{
"name": "claude",
"role": "developer",
"joined_at": "2026-04-06T00:01:00Z",
"home": "/root/wizards/claude"
}
],
"gitea_issue": 879,
"repo": "Timmy_Foundation/the-nexus"
}
```
**Status values:** `active` | `frozen` | `archived` | `destroyed`
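A consumer of the manifest can validate it minimally before trusting the cell. The required-key set and status set below come from the example above; the helper name and the choice of which keys are mandatory are assumptions.

```python
# Hedged sketch: reading and minimally validating cell.json.
import json
from pathlib import Path

VALID_STATUSES = {"active", "frozen", "archived", "destroyed"}
REQUIRED_KEYS = {"uuid", "mission", "created_at", "status", "agents"}

def load_manifest(cell_dir: Path) -> dict:
    manifest = json.loads((cell_dir / "cell.json").read_text())
    missing = REQUIRED_KEYS - manifest.keys()
    if missing:
        raise ValueError(f"cell.json missing keys: {sorted(missing)}")
    if manifest["status"] not in VALID_STATUSES:
        raise ValueError(f"unknown status: {manifest['status']!r}")
    return manifest
```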
### `logs/events.jsonl` — Event Stream
One JSON object per line, append-only:
```json
{"ts": "2026-04-06T00:00:00Z", "event": "cell_created", "by": "allegro", "uuid": "..."}
{"ts": "2026-04-06T00:01:00Z", "event": "agent_joined", "agent": "claude", "role": "developer"}
{"ts": "2026-04-06T00:02:00Z", "event": "heartbeat", "agent": "claude", "status": "thinking"}
{"ts": "2026-04-06T01:00:00Z", "event": "agent_left", "agent": "claude"}
{"ts": "2026-04-06T01:01:00Z", "event": "cell_archived"}
```
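Appending to the event stream can be as simple as one JSON line per write. The helper below is a sketch (its name and signature are not part of the spec); writing a whole line in a single append keeps the file valid JSONL even with occasional concurrent writers on a POSIX filesystem.

```python
# Hedged sketch: appending one event to logs/events.jsonl.
import json
import time
from pathlib import Path

def append_event(cell_dir: Path, event: str, **fields) -> None:
    record = {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "event": event,
        **fields,
    }
    log_path = cell_dir / "logs" / "events.jsonl"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")  # one event per line, append-only
```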
### `.lazarus/heartbeats/<agent-name>.json` — Per-Agent Heartbeat
Written by agents, monitored by the lazarus-pit daemon:
```json
{
"agent": "claude",
"pid": 12345,
"timestamp": 1744000000.0,
"cycle": 42,
"model": "claude-opus-4-6",
"status": "thinking",
"cell_uuid": "a3f7c901-1234-4b5e-8def-000000000001"
}
```
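An agent-side writer might look like the sketch below: write to a temp file, then `os.replace`, so the daemon never reads a torn JSON. Only the field names come from the schema above; `write_heartbeat` and its defaults are hypothetical.

```python
# Hedged sketch: an agent writing its heartbeat file atomically.
import json
import os
import time
from pathlib import Path

def write_heartbeat(cell_dir: Path, agent: str, cycle: int,
                    status: str = "thinking", model: str = "unknown") -> None:
    hb_dir = cell_dir / ".lazarus" / "heartbeats"
    hb_dir.mkdir(parents=True, exist_ok=True)
    payload = {
        "agent": agent,
        "pid": os.getpid(),
        "timestamp": time.time(),
        "cycle": cycle,
        "model": model,
        "status": status,
        "cell_uuid": cell_dir.name,
    }
    tmp = hb_dir / f"{agent}.json.tmp"
    tmp.write_text(json.dumps(payload))
    os.replace(tmp, hb_dir / f"{agent}.json")  # atomic rename on POSIX
```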
### `.lazarus/state.json` — Daemon State
Written exclusively by `lazarus-pit`, never by agents:
```json
{
"cell_uuid": "a3f7c901-1234-4b5e-8def-000000000001",
"daemon_pid": 99001,
"last_poll": 1744000000.0,
"agent_health": {
"claude": {"healthy": true, "last_seen": 1744000000.0, "revive_count": 0}
}
}
```
---
## Lifecycle
```
provision → active → [frozen] → archived → destroyed
```
| Transition | Trigger | Actor |
|--------------|--------------------------------------------|---------------|
| provision | Invitation accepted / mission created | allegro / CLI |
| active | At least one agent joined | lazarus-pit |
| frozen | Checkpoint requested or idle timeout | lazarus-pit |
| archived | Mission complete, cell preserved | lazarus-pit |
| destroyed | Explicit teardown, no home dirs touched | CLI / daemon |
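The table above can be enforced as an allowed-transition map. This is a sketch: `provision` is modeled as the initial pseudo-state, and the assumption that a frozen cell may thaw back to `active` is mine, not stated by the spec.

```python
# Hedged sketch: validating lifecycle transitions from the table above.
ALLOWED = {
    "provision": {"active"},
    "active": {"frozen", "archived"},
    "frozen": {"active", "archived"},   # thaw-to-active is an assumption
    "archived": {"destroyed"},
    "destroyed": set(),                 # terminal state
}

def can_transition(current: str, target: str) -> bool:
    return target in ALLOWED.get(current, set())
```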
---
## Isolation Guarantees
- No agent process within a cell may write outside `/var/missions/<uuid>/`.
- Home directories (`/root/wizards/<name>/`) are read-only mounts or simply not present in the cell's working context.
- The cell UUID is injected as `LAZARUS_CELL_UUID` into each agent's environment.
- Destruction of a cell has zero impact on any wizard's home directory.
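For illustration, an agent process could resolve its cell root from the injected variable and refuse writes that escape it. Both helper names are hypothetical; only `LAZARUS_CELL_UUID` and the `/var/missions` root come from the spec.

```python
# Hedged sketch: resolving the cell root and confining writes to it.
import os
from pathlib import Path

def cell_root() -> Path:
    """Locate this process's cell from the injected environment variable."""
    return Path("/var/missions") / os.environ["LAZARUS_CELL_UUID"]

def safe_path(root: Path, relative: str) -> Path:
    """Resolve a path inside the cell; reject anything that escapes it."""
    target = (root / relative).resolve()
    if not target.is_relative_to(root.resolve()):
        raise PermissionError(f"write outside cell: {target}")
    return target
```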
---
## Related Issues
- #878 — Epic
- #879 — P0 Foundation (this spec)
- #880 — P1 Invitation & Spawning
- #881 — P2 Checkpoint / Restore
- #882 — P3 Resurrection Pool
- #883 — P4 Multi-Agent Teaming

server.py

@@ -3,12 +3,21 @@
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
the body (Evennia/Morrowind), and the visualization surface.
Health heartbeat endpoint (added in #879 — M6-P0 Foundation):
GET http://<host>:<HEALTH_PORT>/health
Returns 200 JSON with gateway status and connected-client count.
Used by the lazarus-pit daemon to verify the gateway is alive.
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from typing import Set
import websockets
@@ -17,6 +26,13 @@ import websockets
PORT = 8765
HOST = "0.0.0.0" # Allow external connections if needed
# Health heartbeat endpoint — monitored by lazarus-pit daemon (#879)
HEALTH_PORT = int(os.environ.get("NEXUS_HEALTH_PORT", "8766"))
HEALTH_HOST = os.environ.get("NEXUS_HEALTH_HOST", "127.0.0.1")
# Gateway start time for uptime reporting
_start_time = time.time()
# Logging setup
logging.basicConfig(
level=logging.INFO,
@@ -28,6 +44,53 @@ logger = logging.getLogger("nexus-gateway")
# State
clients: Set[websockets.WebSocketServerProtocol] = set()
# ---------------------------------------------------------------------------
# Health heartbeat HTTP endpoint — consumed by lazarus-pit (#879)
# ---------------------------------------------------------------------------
class _HealthHandler(BaseHTTPRequestHandler):
"""Minimal HTTP handler for the /health liveness endpoint."""
def do_GET(self): # noqa: N802
if self.path != "/health":
self.send_response(404)
self.end_headers()
return
payload = {
"status": "ok",
"service": "nexus-gateway",
"uptime_seconds": round(time.time() - _start_time, 1),
"connected_clients": len(clients),
"ws_port": PORT,
"ts": time.time(),
}
body = json.dumps(payload).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, fmt, *args): # noqa: N802
# Suppress default access-log spam; route non-2xx responses to our
# logger at DEBUG. Guard args length: not every call passes a status.
if len(args) > 1 and str(args[1]) not in ("200", "204"):
logger.debug("Health endpoint: " + fmt % args)
def _start_health_server() -> Thread:
"""Start the health HTTP server in a background daemon thread."""
server = HTTPServer((HEALTH_HOST, HEALTH_PORT), _HealthHandler)
thread = Thread(target=server.serve_forever, daemon=True)
thread.start()
logger.info(
"Health heartbeat endpoint listening on http://%s:%d/health",
HEALTH_HOST, HEALTH_PORT,
)
return thread
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"""Handles individual client connections and message broadcasting."""
clients.add(websocket)
@@ -80,6 +143,7 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
async def main():
"""Main server loop with graceful shutdown."""
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
_start_health_server()
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()

tests/test_gateway_health.py

@@ -0,0 +1,129 @@
"""Tests for the gateway health heartbeat endpoint (#879 — M6-P0 Foundation).
Validates:
- /health returns 200 with correct JSON schema
- /health reports connected_clients count
- Non-/health paths return 404
- Uptime is non-negative
"""
import importlib.util
import json
import sys
import time
from http.server import HTTPServer
from io import BytesIO
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
# ── Load server module directly ──────────────────────────────────────────────
PROJECT_ROOT = Path(__file__).parent.parent
# Patch websockets import so we don't need the package installed to test health
_ws_mock = MagicMock()
sys.modules.setdefault("websockets", _ws_mock)
_srv_spec = importlib.util.spec_from_file_location(
"nexus_server_test",
PROJECT_ROOT / "server.py",
)
_srv = importlib.util.module_from_spec(_srv_spec)
sys.modules["nexus_server_test"] = _srv
_srv_spec.loader.exec_module(_srv)
_HealthHandler = _srv._HealthHandler
# ── Fake request helper ───────────────────────────────────────────────────────
class _FakeRequest:
"""Minimal socket-like object for BaseHTTPRequestHandler testing."""
def __init__(self, raw_bytes: bytes):
self._buf = BytesIO(raw_bytes)
self.sent = BytesIO()
def makefile(self, mode, **kwargs):
if "r" in mode:
return self._buf
return self.sent
def sendall(self, data: bytes):
self.sent.write(data)
def _invoke_handler(path: str) -> tuple[int, dict]:
"""Call the health handler for a GET request and return (status_code, body_dict)."""
raw = f"GET {path} HTTP/1.1\r\nHost: localhost\r\n\r\n".encode()
request = _FakeRequest(raw)
handler = _HealthHandler.__new__(_HealthHandler)
handler.rfile = BytesIO(raw)
handler.wfile = request.sent
handler.client_address = ("127.0.0.1", 9999)
handler.server = MagicMock()
handler.request_version = "HTTP/1.1"
handler.command = "GET"
handler.path = path
handler.headers = {}
# Capture response
responses: list[tuple] = []
handler.send_response = lambda code, *a: responses.append(("status", code))
handler.send_header = lambda k, v: None
handler.end_headers = lambda: None
body_parts: list[bytes] = []
handler.wfile = MagicMock()
handler.wfile.write = lambda b: body_parts.append(b)
handler.do_GET()
status = responses[0][1] if responses else None
body = {}
if body_parts:
try:
body = json.loads(b"".join(body_parts))
except (json.JSONDecodeError, TypeError):
pass
return status, body
# ── Tests ─────────────────────────────────────────────────────────────────────
class TestHealthEndpoint:
def test_health_returns_200(self):
status, _ = _invoke_handler("/health")
assert status == 200
def test_health_body_schema(self):
_, body = _invoke_handler("/health")
assert body.get("status") == "ok"
assert body.get("service") == "nexus-gateway"
assert "uptime_seconds" in body
assert "connected_clients" in body
assert "ws_port" in body
assert "ts" in body
def test_uptime_is_non_negative(self):
_, body = _invoke_handler("/health")
assert body["uptime_seconds"] >= 0
def test_unknown_path_returns_404(self):
status, _ = _invoke_handler("/notfound")
assert status == 404
def test_root_path_returns_404(self):
status, _ = _invoke_handler("/")
assert status == 404
def test_connected_clients_reflects_module_state(self):
original = _srv.clients.copy()
try:
_srv.clients.clear()
_, body = _invoke_handler("/health")
assert body["connected_clients"] == 0
finally:
_srv.clients.update(original)

tests/test_lazarus_pit.py

@@ -0,0 +1,262 @@
"""Tests for the lazarus-pit daemon skeleton.
Validates:
- Config loading (defaults and TOML overrides)
- Cell discovery from /var/missions structure
- Agent heartbeat reading
- Health poll logic (stale vs fresh)
- AgentHealth and CellState dataclasses
- CLI argument parsing
- Resurrection stub behaviour
"""
import importlib.util
import json
import sys
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ── Load lazarus_pit module directly ────────────────────────────────────────
PROJECT_ROOT = Path(__file__).parent.parent
_lp_spec = importlib.util.spec_from_file_location(
"lazarus_pit_test",
PROJECT_ROOT / "bin" / "lazarus_pit.py",
)
_lp = importlib.util.module_from_spec(_lp_spec)
sys.modules["lazarus_pit_test"] = _lp
_lp_spec.loader.exec_module(_lp)
AgentHealth = _lp.AgentHealth
CellState = _lp.CellState
DaemonConfig = _lp.DaemonConfig
load_config = _lp.load_config
read_agent_heartbeat = _lp.read_agent_heartbeat
discover_cells = _lp.discover_cells
poll_cell = _lp.poll_cell
attempt_revive = _lp.attempt_revive
build_arg_parser = _lp.build_arg_parser
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_cell(tmp_path: Path, uuid: str, status: str = "active", agents=None) -> Path:
"""Create a minimal mission cell structure under tmp_path."""
cell_path = tmp_path / uuid
cell_path.mkdir()
manifest = {
"uuid": uuid,
"status": status,
"mission": "test mission",
"agents": [{"name": a} for a in (agents or ["claude"])],
}
(cell_path / "cell.json").write_text(json.dumps(manifest))
return cell_path
def _write_heartbeat(cell_path: Path, agent: str, age_seconds: float = 0) -> None:
"""Write a heartbeat file for an agent inside the cell."""
hb_dir = cell_path / ".lazarus" / "heartbeats"
hb_dir.mkdir(parents=True, exist_ok=True)
(hb_dir / f"{agent}.json").write_text(json.dumps({
"agent": agent,
"pid": 12345,
"timestamp": time.time() - age_seconds,
"cycle": 1,
"model": "test",
"status": "thinking",
}))
# ── Config ───────────────────────────────────────────────────────────────────
class TestDaemonConfig:
def test_defaults(self):
cfg = DaemonConfig()
assert cfg.poll_interval == 15
assert cfg.heartbeat_stale_threshold == 60
assert cfg.max_revive_attempts == 3
def test_load_missing_file_returns_defaults(self, tmp_path):
cfg = load_config(tmp_path / "nonexistent.toml")
assert cfg.poll_interval == 15
def test_load_valid_toml(self, tmp_path):
toml_content = b"""
[cells]
root = "/tmp/missions"
poll_interval = 30
heartbeat_stale_threshold = 90
"""
toml_path = tmp_path / "test.toml"
toml_path.write_bytes(toml_content)
try:
cfg = load_config(toml_path)
except ImportError:
# TOML parser not available — skip, not a test failure
pytest.skip("TOML parser not available")
# Asserts live outside the try so real failures aren't masked by the skip
assert cfg.poll_interval == 30
assert cfg.heartbeat_stale_threshold == 90
assert str(cfg.cells_root) == "/tmp/missions"
# ── Cell discovery ───────────────────────────────────────────────────────────
class TestDiscoverCells:
def test_empty_root(self, tmp_path):
cells = discover_cells(tmp_path)
assert cells == []
def test_missing_root(self, tmp_path):
cells = discover_cells(tmp_path / "nonexistent")
assert cells == []
def test_finds_active_cell(self, tmp_path):
_make_cell(tmp_path, "aaa-111", status="active")
cells = discover_cells(tmp_path)
assert len(cells) == 1
assert cells[0].uuid == "aaa-111"
def test_finds_frozen_cell(self, tmp_path):
_make_cell(tmp_path, "bbb-222", status="frozen")
cells = discover_cells(tmp_path)
assert len(cells) == 1
def test_ignores_archived_cell(self, tmp_path):
_make_cell(tmp_path, "ccc-333", status="archived")
cells = discover_cells(tmp_path)
assert len(cells) == 0
def test_ignores_directories_without_manifest(self, tmp_path):
(tmp_path / "not-a-cell").mkdir()
cells = discover_cells(tmp_path)
assert cells == []
def test_ignores_corrupt_manifest(self, tmp_path):
cell_path = tmp_path / "bad-cell"
cell_path.mkdir()
(cell_path / "cell.json").write_text("not json {{")
cells = discover_cells(tmp_path)
assert cells == []
def test_agents_populated_from_manifest(self, tmp_path):
_make_cell(tmp_path, "ddd-444", agents=["claude", "allegro"])
cells = discover_cells(tmp_path)
assert set(cells[0].agents.keys()) == {"claude", "allegro"}
# ── Heartbeat reading ─────────────────────────────────────────────────────────
class TestReadAgentHeartbeat:
def test_returns_none_for_missing_file(self, tmp_path):
result = read_agent_heartbeat(tmp_path, "ghost")
assert result is None
def test_returns_data_for_valid_heartbeat(self, tmp_path):
_write_heartbeat(tmp_path, "claude", age_seconds=0)
data = read_agent_heartbeat(tmp_path, "claude")
assert data is not None
assert data["agent"] == "claude"
assert "timestamp" in data
def test_returns_none_for_corrupt_json(self, tmp_path):
hb_dir = tmp_path / ".lazarus" / "heartbeats"
hb_dir.mkdir(parents=True)
(hb_dir / "bad.json").write_text("{not valid")
result = read_agent_heartbeat(tmp_path, "bad")
assert result is None
# ── Health polling ────────────────────────────────────────────────────────────
class TestPollCell:
def _make_cell_state(self, tmp_path, agents=None):
agents = agents or ["claude"]
cell = CellState(uuid="test-uuid", path=tmp_path)
for a in agents:
cell.agents[a] = AgentHealth(name=a)
return cell
def test_healthy_agent_no_warnings(self, tmp_path):
cell = self._make_cell_state(tmp_path)
_write_heartbeat(tmp_path, "claude", age_seconds=5)
cfg = DaemonConfig(heartbeat_stale_threshold=60)
warnings = poll_cell(cell, cfg)
assert warnings == []
assert cell.agents["claude"].healthy is True
def test_stale_heartbeat_generates_warning(self, tmp_path):
cell = self._make_cell_state(tmp_path)
_write_heartbeat(tmp_path, "claude", age_seconds=120)
cfg = DaemonConfig(heartbeat_stale_threshold=60)
warnings = poll_cell(cell, cfg)
assert len(warnings) == 1
assert "stale" in warnings[0]
assert cell.agents["claude"].healthy is False
def test_missing_heartbeat_generates_warning(self, tmp_path):
cell = self._make_cell_state(tmp_path)
cfg = DaemonConfig(heartbeat_stale_threshold=60)
warnings = poll_cell(cell, cfg)
assert len(warnings) == 1
assert "no heartbeat" in warnings[0]
assert cell.agents["claude"].healthy is False
# ── Resurrection stub ─────────────────────────────────────────────────────────
class TestAttemptRevive:
def test_stub_returns_false(self, tmp_path):
cell = CellState(uuid="xyz", path=tmp_path)
cell.agents["claude"] = AgentHealth(name="claude", healthy=False)
cfg = DaemonConfig(max_revive_attempts=3)
result = attempt_revive(cell, "claude", cfg)
assert result is False
def test_increments_revive_count(self, tmp_path):
cell = CellState(uuid="xyz", path=tmp_path)
cell.agents["claude"] = AgentHealth(name="claude", healthy=False)
cfg = DaemonConfig(max_revive_attempts=3)
attempt_revive(cell, "claude", cfg)
assert cell.agents["claude"].revive_count == 1
def test_stops_at_max_revive_attempts(self, tmp_path):
cell = CellState(uuid="xyz", path=tmp_path)
cell.agents["claude"] = AgentHealth(name="claude", healthy=False, revive_count=3)
cfg = DaemonConfig(max_revive_attempts=3)
result = attempt_revive(cell, "claude", cfg)
assert result is False
# Count should not increment beyond max
assert cell.agents["claude"].revive_count == 3
def test_unknown_agent_returns_false(self, tmp_path):
cell = CellState(uuid="xyz", path=tmp_path)
cfg = DaemonConfig()
result = attempt_revive(cell, "nobody", cfg)
assert result is False
# ── CLI ───────────────────────────────────────────────────────────────────────
class TestCLI:
def test_default_config_path(self):
parser = build_arg_parser()
args = parser.parse_args([])
assert "lazarus-pit.toml" in args.config
def test_custom_config(self):
parser = build_arg_parser()
args = parser.parse_args(["--config", "/tmp/custom.toml"])
assert args.config == "/tmp/custom.toml"
def test_status_flag(self):
parser = build_arg_parser()
args = parser.parse_args(["--status"])
assert args.status is True
def test_list_cells_flag(self):
parser = build_arg_parser()
args = parser.parse_args(["--list-cells"])
assert args.list_cells is True


@@ -1,659 +0,0 @@
"""Tests for the Resurrection Pool — health polling, dead-agent detection, auto-revive.
Validates:
- GatewayConfig / PoolConfig loading from dicts and files
- RevivePolicy enum parsing (including unknown values)
- probe_gateway / probe_process / probe_heartbeat functions
- assess_agent combining all three probes into AgentHealth
- AgentHealth state: HEALTHY / DEGRADED / DEAD
- ResurrectionPool._update_health — dead_at tracking and recovery
- ResurrectionPool.sweep — policy routing (auto/ask/never)
- apply_revive_policy — auto-restart, never, ask-with-no-channel fallback
- Telegram / Nostr dry-run paths
- Gitea issue helpers (mocked)
- CLI argument parsing
"""
from __future__ import annotations
import importlib.util
import json
import os
import sys
import tempfile
import time
from pathlib import Path
from unittest.mock import MagicMock, patch, call
import pytest
# ── Module loading (same pattern as test_nexus_watchdog.py) ──────────
PROJECT_ROOT = Path(__file__).parent.parent
_rp_spec = importlib.util.spec_from_file_location(
"resurrection_pool_test",
PROJECT_ROOT / "bin" / "resurrection_pool.py",
)
_rp = importlib.util.module_from_spec(_rp_spec)
sys.modules["resurrection_pool_test"] = _rp
_rp_spec.loader.exec_module(_rp)
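# Why importlib instead of a plain `import`: bin/resurrection_pool.py lives
# outside any package (no __init__.py), so it cannot be imported by name.
# The general pattern, sketched with illustrative paths (not from this repo):
#
#     spec = importlib.util.spec_from_file_location("alias", "/path/to/script.py")
#     mod = importlib.util.module_from_spec(spec)
#     sys.modules["alias"] = mod  # register BEFORE exec so @patch("alias.fn") resolves
#     spec.loader.exec_module(mod)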
# Convenience aliases
AgentHealth = _rp.AgentHealth
AgentState = _rp.AgentState
GatewayConfig = _rp.GatewayConfig
MissionConfig = _rp.MissionConfig
PoolConfig = _rp.PoolConfig
ResurrectionPool = _rp.ResurrectionPool
RevivePolicy = _rp.RevivePolicy
apply_revive_policy = _rp.apply_revive_policy
assess_agent = _rp.assess_agent
build_parser = _rp.build_parser
probe_gateway = _rp.probe_gateway
probe_heartbeat = _rp.probe_heartbeat
probe_process = _rp.probe_process
revive_agent = _rp.revive_agent
send_telegram_approval_request = _rp.send_telegram_approval_request
send_nostr_approval_request = _rp.send_nostr_approval_request
# ── Helpers ──────────────────────────────────────────────────────────
def _make_gw(name="timmy", host="localhost", port=8765,
heartbeat_path=None, process_pattern="nexus_think",
revive_cmd=None, tmp_path=None):
if heartbeat_path is None and tmp_path is not None:
heartbeat_path = str(tmp_path / "heartbeat.json")
elif heartbeat_path is None:
heartbeat_path = "~/.nexus/heartbeat.json"
return GatewayConfig(
name=name,
host=host,
port=port,
heartbeat_path=heartbeat_path,
process_pattern=process_pattern,
revive_cmd=revive_cmd or [],
)
def _write_heartbeat(path: Path, age_seconds: float = 10.0):
path.parent.mkdir(parents=True, exist_ok=True)
data = {
"pid": os.getpid(),
"timestamp": time.time() - age_seconds,
"cycle": 7,
"model": "timmy:v0.1",
"status": "thinking",
}
path.write_text(json.dumps(data))
def _make_cfg(**kwargs) -> PoolConfig:
return PoolConfig(**kwargs)
# ── RevivePolicy tests ───────────────────────────────────────────────
class TestRevivePolicy:
def test_valid_values(self):
assert RevivePolicy("auto") == RevivePolicy.AUTO
assert RevivePolicy("ask") == RevivePolicy.ASK
assert RevivePolicy("never") == RevivePolicy.NEVER
def test_mission_from_dict_unknown_policy(self):
m = MissionConfig.from_dict("m1", {"revive_policy": "bogus", "agents": ["a"]})
assert m.revive_policy == RevivePolicy.ASK
def test_mission_from_dict_valid_policy(self):
m = MissionConfig.from_dict("m1", {"revive_policy": "auto", "agents": ["x"]})
assert m.revive_policy == RevivePolicy.AUTO
def test_mission_agents_default_empty(self):
m = MissionConfig.from_dict("m2", {})
assert m.agents == []
# ── PoolConfig loading tests ─────────────────────────────────────────
class TestPoolConfig:
def test_defaults(self):
cfg = PoolConfig()
assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL
assert cfg.dead_threshold == _rp.DEFAULT_DEAD_THRESHOLD
assert cfg.gateways == []
assert cfg.missions == []
def test_from_dict_basic(self):
data = {
"poll_interval": 30,
"dead_threshold": 120,
"gateways": [{"name": "allegro", "port": 9000}],
"missions": {
"m1": {"label": "Sprint", "agents": ["allegro"], "revive_policy": "auto"},
},
}
cfg = PoolConfig.from_dict(data)
assert cfg.poll_interval == 30
assert cfg.dead_threshold == 120
assert len(cfg.gateways) == 1
assert cfg.gateways[0].name == "allegro"
assert cfg.gateways[0].port == 9000
assert len(cfg.missions) == 1
assert cfg.missions[0].revive_policy == RevivePolicy.AUTO
def test_from_dict_telegram(self):
data = {"telegram": {"bot_token": "tok123", "chat_id": "-999"}}
cfg = PoolConfig.from_dict(data)
assert cfg.telegram_bot_token == "tok123"
assert cfg.telegram_chat_id == "-999"
def test_from_dict_nostr(self):
data = {"nostr": {"relay_url": "wss://relay.damus.io", "recipient_pubkey": "abc"}}
cfg = PoolConfig.from_dict(data)
assert cfg.nostr_relay_url == "wss://relay.damus.io"
assert cfg.nostr_recipient_pubkey == "abc"
def test_load_from_file(self, tmp_path):
config_file = tmp_path / "pool.json"
config_file.write_text(json.dumps({"poll_interval": 42}))
cfg = PoolConfig.load(config_file)
assert cfg.poll_interval == 42
def test_load_missing_file_returns_defaults(self, tmp_path):
cfg = PoolConfig.load(tmp_path / "nonexistent.json")
assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL
def test_load_corrupt_file_falls_through(self, tmp_path):
bad = tmp_path / "bad.json"
bad.write_text("not json {{")
cfg = PoolConfig.load(bad)
assert cfg.poll_interval == _rp.DEFAULT_POLL_INTERVAL
def test_load_env_var(self, tmp_path, monkeypatch):
config_file = tmp_path / "env_pool.json"
config_file.write_text(json.dumps({"dead_threshold": 99}))
monkeypatch.setenv("RESURRECTION_CONFIG", str(config_file))
cfg = PoolConfig.load()
assert cfg.dead_threshold == 99
# ── probe_gateway tests ──────────────────────────────────────────────
class TestProbeGateway:
def test_refused_port(self):
# Port 1 is privileged and essentially never has a listener on localhost
alive, msg = probe_gateway("localhost", 1, timeout=2)
assert not alive
assert "refused" in msg.lower() or "failed" in msg.lower()
def test_open_port(self):
# Open a real server socket to confirm TCP probe works
import threading
import socket as _socket
srv = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
srv.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
srv.bind(("127.0.0.1", 0))
srv.listen(1)
port = srv.getsockname()[1]
def accept_one():
try:
conn, _ = srv.accept()
conn.close()
except Exception:
pass
finally:
srv.close()
t = threading.Thread(target=accept_one, daemon=True)
t.start()
alive, msg = probe_gateway("127.0.0.1", port, timeout=3)
t.join(timeout=2)
assert alive
assert "reachable" in msg.lower()
def test_invalid_host(self):
alive, msg = probe_gateway("this.host.does.not.exist.invalid", 8765, timeout=2)
assert not alive
# ── probe_process tests ──────────────────────────────────────────────
class TestProbeProcess:
def test_nonexistent_pattern_not_found(self):
# A pattern that matches no running process should report not-alive
# without raising.
alive, msg = probe_process("definitely-no-process-xyz-12345")
assert not alive
assert isinstance(msg, str)
def test_running_process(self):
# pgrep for "pytest" may or may not match, depending on how the suite was
# launched; just verify the probe returns a well-formed (bool, str) tuple.
alive, msg = probe_process("pytest")
assert isinstance(alive, bool)
assert isinstance(msg, str)
@patch("subprocess.run", side_effect=FileNotFoundError)
def test_pgrep_missing(self, _mock):
alive, msg = probe_process("anything")
assert alive # graceful fallback: don't raise false alarms
assert "pgrep" in msg.lower()
# ── probe_heartbeat tests ────────────────────────────────────────────
class TestProbeHeartbeat:
def test_missing_file(self, tmp_path):
alive, age, msg = probe_heartbeat(tmp_path / "no.json", dead_threshold=300)
assert not alive
assert age is None
assert "no heartbeat" in msg.lower()
def test_fresh_heartbeat(self, tmp_path):
hb = tmp_path / "heartbeat.json"
_write_heartbeat(hb, age_seconds=10)
alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
assert alive
assert age is not None and age < 20
assert "alive" in msg.lower()
def test_stale_heartbeat(self, tmp_path):
hb = tmp_path / "heartbeat.json"
_write_heartbeat(hb, age_seconds=600)
alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
assert not alive
assert age is not None and age > 500
assert "stale" in msg.lower()
def test_corrupt_json(self, tmp_path):
hb = tmp_path / "heartbeat.json"
hb.write_text("{ broken json")
alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
assert not alive
assert "unreadable" in msg.lower()
def test_missing_timestamp_treated_as_stale(self, tmp_path):
hb = tmp_path / "heartbeat.json"
hb.write_text(json.dumps({"cycle": 1})) # no timestamp key
alive, age, msg = probe_heartbeat(hb, dead_threshold=300)
# timestamp defaults to 0 → very old → stale
assert not alive
# ── assess_agent tests ────────────────────────────────────────────────
class TestAssessAgent:
def _gw(self, tmp_path, port=1):
return _make_gw(tmp_path=tmp_path, port=port)
@patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
@patch("resurrection_pool_test.probe_process", return_value=(True, "ok"))
@patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 5.0, "ok"))
def test_all_healthy(self, _hb, _proc, _gw, tmp_path):
gw = self._gw(tmp_path)
h = assess_agent(gw, dead_threshold=300)
assert h.state == AgentState.HEALTHY
@patch("resurrection_pool_test.probe_gateway", return_value=(False, "refused"))
@patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
@patch("resurrection_pool_test.probe_heartbeat", return_value=(False, None, "missing"))
def test_all_dead(self, _hb, _proc, _gw, tmp_path):
gw = self._gw(tmp_path)
h = assess_agent(gw, dead_threshold=300)
assert h.state == AgentState.DEAD
@patch("resurrection_pool_test.probe_gateway", return_value=(True, "ok"))
@patch("resurrection_pool_test.probe_process", return_value=(False, "not found"))
@patch("resurrection_pool_test.probe_heartbeat", return_value=(True, 10.0, "ok"))
def test_partially_failing_is_degraded(self, _hb, _proc, _gw, tmp_path):
gw = self._gw(tmp_path)
h = assess_agent(gw, dead_threshold=300)
assert h.state == AgentState.DEGRADED
# ── AgentHealth state machine tests ──────────────────────────────────
class TestAgentHealthTracking:
def _pool(self):
cfg = _make_cfg()
return ResurrectionPool(cfg, dry_run=True)
def test_first_dead_at_set_on_first_failure(self):
pool = self._pool()
dead = AgentHealth(name="timmy", state=AgentState.DEAD,
last_checked=time.time())
result = pool._update_health("timmy", dead)
assert result.first_dead_at is not None
def test_first_dead_at_preserved_across_cycles(self):
pool = self._pool()
t0 = time.time() - 100
dead1 = AgentHealth(name="timmy", state=AgentState.DEAD,
last_checked=t0, first_dead_at=t0)
pool._update_health("timmy", dead1)
dead2 = AgentHealth(name="timmy", state=AgentState.DEAD,
last_checked=time.time())
result = pool._update_health("timmy", dead2)
assert result.first_dead_at == t0 # preserved
def test_recovery_resets_first_dead_at(self):
pool = self._pool()
dead = AgentHealth(name="timmy", state=AgentState.DEAD,
last_checked=time.time())
pool._update_health("timmy", dead)
healthy = AgentHealth(name="timmy", state=AgentState.HEALTHY,
last_checked=time.time())
result = pool._update_health("timmy", healthy)
assert result.first_dead_at is None
assert result.revive_attempts == 0
def test_revive_attempts_preserved_across_cycles(self):
pool = self._pool()
h = AgentHealth(name="timmy", state=AgentState.DEAD,
last_checked=time.time())
h.revive_attempts = 2
pool._health["timmy"] = h
new = AgentHealth(name="timmy", state=AgentState.DEAD,
last_checked=time.time())
result = pool._update_health("timmy", new)
assert result.revive_attempts == 2  # carried over from the previous record
# ── mission_for / gateway_for tests ─────────────────────────────────
class TestPoolLookups:
def test_mission_for_known_agent(self):
cfg = PoolConfig.from_dict({
"missions": {"m1": {"agents": ["timmy"], "revive_policy": "auto"}},
})
pool = ResurrectionPool(cfg, dry_run=True)
m = pool._mission_for("timmy")
assert m is not None
assert m.mission_id == "m1"
def test_mission_for_unknown_agent(self):
cfg = PoolConfig()
pool = ResurrectionPool(cfg, dry_run=True)
assert pool._mission_for("ghost") is None
def test_gateway_for(self):
cfg = PoolConfig.from_dict({
"gateways": [{"name": "allegro", "port": 9000}],
})
pool = ResurrectionPool(cfg, dry_run=True)
gw = pool._gateway_for("allegro")
assert gw is not None
assert gw.port == 9000
def test_gateway_for_missing(self):
pool = ResurrectionPool(PoolConfig(), dry_run=True)
assert pool._gateway_for("nobody") is None
# ── apply_revive_policy tests ────────────────────────────────────────
class TestApplyRevivePolicy:
def _dead_health(self, name="timmy", dead_seconds=400):
h = AgentHealth(name=name, state=AgentState.DEAD,
first_dead_at=time.time() - dead_seconds,
last_checked=time.time())
return h
def test_never_policy_no_revive(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path)
mission = MissionConfig("m1", revive_policy=RevivePolicy.NEVER)
cfg = _make_cfg()
health = self._dead_health()
with patch.object(_rp, "open_death_issue") as mock_issue:
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
assert not result
mock_issue.assert_called_once()
def test_auto_policy_launches_revive(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "revive"])
mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
cfg = _make_cfg()
health = self._dead_health()
with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
assert result
mock_revive.assert_called_once_with(gw, dry_run=True)
def test_auto_policy_opens_issue_on_revive_failure(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path)
mission = MissionConfig("m1", revive_policy=RevivePolicy.AUTO)
cfg = _make_cfg()
health = self._dead_health()
with patch.object(_rp, "revive_agent", return_value=False), \
patch.object(_rp, "open_death_issue") as mock_issue:
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
assert not result
mock_issue.assert_called_once()
def test_ask_policy_dry_run_approves(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
cfg = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
health = self._dead_health()
# dry_run → send_telegram returns True, poll returns True
with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
assert result
mock_revive.assert_called_once()
def test_ask_policy_no_channel_falls_back_to_auto(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
cfg = _make_cfg() # no telegram, no nostr
health = self._dead_health()
with patch.object(_rp, "revive_agent", return_value=True) as mock_revive:
result = apply_revive_policy(gw, health, mission, cfg, dry_run=True)
assert result
mock_revive.assert_called_once()
def test_skip_if_revive_pending(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path)
mission = MissionConfig("m1", revive_policy=RevivePolicy.ASK)
cfg = _make_cfg(telegram_bot_token="tok", telegram_chat_id="-1")
health = self._dead_health()
health.revive_pending = True # approval already outstanding
with patch.object(_rp, "revive_agent") as mock_revive:
result = apply_revive_policy(gw, health, mission, cfg, dry_run=False)
assert not result
mock_revive.assert_not_called()
# ── revive_agent tests ───────────────────────────────────────────────
class TestReviveAgent:
def test_no_revive_cmd(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path, revive_cmd=[])
assert not revive_agent(gw)
def test_dry_run(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["echo", "ok"])
assert revive_agent(gw, dry_run=True)
def test_launches_process(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["true"])
result = revive_agent(gw, dry_run=False)
assert result
def test_bad_command(self, tmp_path):
gw = _make_gw(tmp_path=tmp_path, revive_cmd=["/does/not/exist/xyz"])
result = revive_agent(gw, dry_run=False)
assert not result
# ── Notification channel tests ────────────────────────────────────────
class TestNotifications:
def test_telegram_dry_run(self):
assert send_telegram_approval_request(
"token", "chat", "timmy", "Sprint", 400, dry_run=True
)
def test_telegram_no_token(self):
assert not send_telegram_approval_request(
"", "", "timmy", "Sprint", 400, dry_run=False
)
def test_nostr_dry_run(self):
result = send_nostr_approval_request(
"wss://relay.damus.io", "pubkey123",
"timmy", "Sprint", 400, dry_run=True
)
assert result
def test_nostr_no_config(self):
result = send_nostr_approval_request(
"", "",
"timmy", "Sprint", 400, dry_run=False
)
assert not result
# ── ResurrectionPool.sweep integration tests ─────────────────────────
class TestPoolSweep:
def test_sweep_empty_gateways(self):
pool = ResurrectionPool(PoolConfig(), dry_run=True)
result = pool.sweep()
assert result == {}
@patch("resurrection_pool_test.assess_agent")
def test_sweep_healthy_agent_no_policy_triggered(self, mock_assess):
cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
pool = ResurrectionPool(cfg, dry_run=True)
mock_assess.return_value = AgentHealth(
name="timmy", state=AgentState.HEALTHY,
gateway_alive=True, process_alive=True, heartbeat_age=5.0,
)
with patch.object(_rp, "apply_revive_policy") as mock_policy:
pool.sweep()
mock_policy.assert_not_called()
@patch("resurrection_pool_test.assess_agent")
def test_sweep_dead_agent_triggers_policy(self, mock_assess):
cfg = PoolConfig.from_dict({
"gateways": [{"name": "timmy"}],
"dead_threshold": 0, # treat any death as immediate trigger
})
pool = ResurrectionPool(cfg, dry_run=True)
mock_assess.return_value = AgentHealth(
name="timmy", state=AgentState.DEAD,
first_dead_at=time.time() - 999,
last_checked=time.time(),
)
with patch.object(_rp, "apply_revive_policy", return_value=False) as mock_policy:
pool.sweep()
mock_policy.assert_called_once()
@patch("resurrection_pool_test.assess_agent")
def test_sweep_degraded_does_not_trigger_policy(self, mock_assess):
cfg = PoolConfig.from_dict({
"gateways": [{"name": "timmy"}],
"dead_threshold": 10,
})
pool = ResurrectionPool(cfg, dry_run=True)
mock_assess.return_value = AgentHealth(
name="timmy", state=AgentState.DEGRADED,
first_dead_at=time.time() - 5, # only 5s, below threshold
last_checked=time.time(),
)
with patch.object(_rp, "apply_revive_policy") as mock_policy:
pool.sweep()
mock_policy.assert_not_called()
# ── run_once / status tests ───────────────────────────────────────────
class TestRunOnce:
def test_run_once_empty_returns_true(self):
pool = ResurrectionPool(PoolConfig(), dry_run=True)
assert pool.run_once()
@patch("resurrection_pool_test.assess_agent")
def test_run_once_dead_agent_returns_false(self, mock_assess):
cfg = PoolConfig.from_dict({
"gateways": [{"name": "timmy"}],
"dead_threshold": 0,
})
pool = ResurrectionPool(cfg, dry_run=True)
mock_assess.return_value = AgentHealth(
name="timmy", state=AgentState.DEAD,
first_dead_at=time.time() - 9999,
last_checked=time.time(),
)
with patch.object(_rp, "apply_revive_policy", return_value=False):
result = pool.run_once()
assert not result
@patch("resurrection_pool_test.assess_agent")
def test_status_returns_list(self, mock_assess):
cfg = PoolConfig.from_dict({"gateways": [{"name": "timmy"}]})
pool = ResurrectionPool(cfg, dry_run=True)
mock_assess.return_value = AgentHealth(
name="timmy", state=AgentState.HEALTHY,
gateway_alive=True, process_alive=True, heartbeat_age=3.0,
)
pool.sweep()
status = pool.status()
assert isinstance(status, list)
assert len(status) == 1
assert status[0]["name"] == "timmy"
assert status[0]["state"] == "healthy"
# ── CLI tests ─────────────────────────────────────────────────────────
class TestCLI:
def test_default_args(self):
p = build_parser()
args = p.parse_args([])
assert not args.once
assert not args.dry_run
assert not args.status
assert args.config is None
def test_once_flag(self):
args = build_parser().parse_args(["--once"])
assert args.once
def test_dry_run_flag(self):
args = build_parser().parse_args(["--dry-run"])
assert args.dry_run
def test_poll_interval_override(self):
args = build_parser().parse_args(["--poll-interval", "15"])
assert args.poll_interval == 15
def test_dead_threshold_override(self):
args = build_parser().parse_args(["--dead-threshold", "60"])
assert args.dead_threshold == 60
def test_config_path(self, tmp_path):
cfg_file = tmp_path / "pool.json"
cfg_file.write_text("{}")
args = build_parser().parse_args(["--config", str(cfg_file)])
assert args.config == str(cfg_file)