Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
f83e103d27 feat: M6-P0 Foundation — cell spec, lazarus-pit daemon, health endpoint, issue template
Implements all P0 deliverables for the Lazarus Pit epic.

- docs/lazarus-pit/mission-cell-spec.md: Canonical `/var/missions/<uuid>/`
  directory layout, file schemas (cell.json, events.jsonl, heartbeat,
  daemon state), lifecycle state machine, and isolation guarantees.

- bin/lazarus_pit.py: Daemon skeleton with config loading, cell discovery,
  per-agent heartbeat polling, stale detection, resurrection stub (P3 placeholder),
  graceful signal handling, PID file management, and CLI subcommands
  (--status, --list-cells).

- config/lazarus-pit.toml: Documented config file with all tunable knobs:
  cells root, heartbeat thresholds, revive policy, gateway coordinates,
  Gitea notification settings.

- server.py: Added HTTP health heartbeat endpoint on NEXUS_HEALTH_PORT (default
  8766). GET /health returns JSON with status, uptime, connected_clients, ws_port,
  and timestamp. Consumed by lazarus-pit to verify gateway liveness.
  Runs in a background daemon thread — zero impact on existing WS logic.

- .gitea/ISSUE_TEMPLATE/mission-proposal.md: Gitea issue template for
  proposing new mission cells (agents, scope, isolation requirements,
  cell config, success criteria).

- tests/test_lazarus_pit.py: 25 unit tests covering config, cell discovery,
  heartbeat reading, health polling, resurrection stub, and CLI.

- tests/test_gateway_health.py: 6 unit tests covering health endpoint
  schema, 404 for unknown paths, uptime, and client count reflection.

All 31 new tests pass. Pre-existing test failures unchanged.

Fixes #879
Refs #878

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 14:16:45 -04:00
24 changed files with 1144 additions and 1443 deletions

View File

@@ -0,0 +1,62 @@
---
name: Mission Proposal
about: Propose a new Mission Cell — a temporary, isolated collaboration space for agents and humans
title: "[MISSION] "
labels: mission-proposal
---
## Mission Summary
<!-- One-sentence description of the mission objective -->
## Objective
<!-- What is the outcome we are driving toward? What does "done" look like? -->
## Agents Invited
<!-- List agents (or humans) that should be invited into this cell.
Format: @agent-name — role (developer / reviewer / observer / coordinator) -->
- @ —
## Scope & Deliverables
<!-- What artifacts will this mission produce? PRs, docs, deployed services, etc. -->
- [ ]
- [ ]
## Isolation Requirements
- [ ] No wizard home-directory access needed
- [ ] Read-only access to the following homes: (list or "none")
- [ ] External network access required: yes / no
## Cell Configuration
<!-- Leave blank to use defaults from config/lazarus-pit.toml -->
| Setting | Value |
|--------------------------|-------|
| Max duration | |
| Checkpoint interval | |
| Auto-archive on close | yes / no |
| Max revive attempts | |
## Related Issues / Context
- Epic: #
- Depends on: #
- Blocked by: #
## Success Criteria
<!-- How will we know this mission succeeded? Be specific. -->
1.
2.
## Notes
<!-- Anything else the lazarus-pit daemon or cell participants should know -->

419
bin/lazarus_pit.py Normal file
View File

@@ -0,0 +1,419 @@
#!/usr/bin/env python3
"""
lazarus-pit — Agent resurrection pool daemon.
Monitors active mission cells, heartbeats agents, detects downed agents,
and revives them back into their mission cells.
Usage:
python bin/lazarus_pit.py [--config path/to/lazarus-pit.toml]
python bin/lazarus_pit.py --status
python bin/lazarus_pit.py --list-cells
Config: config/lazarus-pit.toml
Architecture: docs/lazarus-pit/mission-cell-spec.md
Epic: #878 P0 Foundation: #879
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import signal
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Optional TOML support (stdlib tomllib in 3.11+, tomli on older)
# ---------------------------------------------------------------------------
try:
import tomllib # Python 3.11+
except ImportError:
try:
import tomli as tomllib # type: ignore[no-reuse-def]
except ImportError:
tomllib = None # Config loading will fall back to defaults
# Repo root inferred from this file's location (bin/ -> project root).
PROJECT_ROOT = Path(__file__).parent.parent
DEFAULT_CONFIG_PATH = PROJECT_ROOT / "config" / "lazarus-pit.toml"
DEFAULT_CELLS_ROOT = Path("/var/missions")
# Fallback tunables used when config/lazarus-pit.toml is absent; mirrored in DaemonConfig.
DEFAULT_HEARTBEAT_STALE = 60  # seconds
DEFAULT_RESURRECT_AFTER = 120  # seconds
DEFAULT_MAX_REVIVES = 3
DEFAULT_POLL_INTERVAL = 15  # seconds
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class AgentHealth:
    """Rolling health record for one agent inside a mission cell."""

    name: str
    # True while the most recent heartbeat is within the stale threshold.
    healthy: bool = True
    # Unix timestamp copied from the agent's last heartbeat file; 0.0 = never seen.
    last_seen: float = 0.0
    # Auto-revive attempts made so far (compared against max_revive_attempts).
    revive_count: int = 0
    # Free-form status string from the heartbeat payload; "unknown" until first read.
    last_status: str = "unknown"
@dataclass
class CellState:
    """In-memory view of one mission cell discovered under the cells root."""

    # UUID from cell.json, falling back to the directory name.
    uuid: str
    # Cell directory (e.g. /var/missions/<uuid>).
    path: Path
    # Per-agent health keyed by agent name, populated from the manifest.
    agents: dict[str, AgentHealth] = field(default_factory=dict)
    # Manifest status; only "active"/"frozen" cells are discovered.
    status: str = "active"
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
@dataclass
class DaemonConfig:
    """All daemon tunables; mirrors the sections of config/lazarus-pit.toml."""

    # [cells]
    cells_root: Path = DEFAULT_CELLS_ROOT
    heartbeat_stale_threshold: int = DEFAULT_HEARTBEAT_STALE  # seconds
    resurrect_after: int = DEFAULT_RESURRECT_AFTER  # seconds of staleness before revive
    max_revive_attempts: int = DEFAULT_MAX_REVIVES
    poll_interval: int = DEFAULT_POLL_INTERVAL  # seconds between poll ticks
    # [daemon]
    log_level: str = "INFO"
    log_file: str = "-"  # "-" means stdout
    pid_file: str = "/var/run/lazarus-pit.pid"
    # [gitea]
    gitea_url: str = "https://forge.alexanderwhitestone.com"
    gitea_repo: str = "Timmy_Foundation/the-nexus"
    gitea_token: str = ""  # overridden by GITEA_TOKEN env var in load_config
    # [notifications]
    open_issue_on_death: bool = True
    close_issue_on_revive: bool = True
def load_config(config_path: Path) -> DaemonConfig:
    """Build a DaemonConfig from *config_path*, keeping defaults for missing keys.

    Falls back to pure defaults when the file is absent or no TOML parser is
    importable. GITEA_TOKEN in the environment always wins over the file.
    """
    cfg = DaemonConfig()
    if not config_path.exists():
        return cfg
    if tomllib is None:
        logging.warning(
            "TOML parser not available (install tomli for Python < 3.11). Using defaults."
        )
        return cfg
    with open(config_path, "rb") as fh:
        parsed = tomllib.load(fh)
    cells = parsed.get("cells", {})
    daemon = parsed.get("daemon", {})
    gitea = parsed.get("gitea", {})
    notifications = parsed.get("notifications", {})
    # These TOML keys share names with the dataclass attributes, so they can
    # be copied generically; missing keys keep the dataclass defaults.
    for section, key in (
        (cells, "heartbeat_stale_threshold"),
        (cells, "resurrect_after"),
        (cells, "max_revive_attempts"),
        (cells, "poll_interval"),
        (daemon, "log_level"),
        (daemon, "log_file"),
        (daemon, "pid_file"),
        (notifications, "open_issue_on_death"),
        (notifications, "close_issue_on_revive"),
    ):
        setattr(cfg, key, section.get(key, getattr(cfg, key)))
    # Keys whose attribute name differs from the TOML key are mapped by hand.
    cfg.cells_root = Path(cells.get("root", str(cfg.cells_root)))
    cfg.gitea_url = gitea.get("url", cfg.gitea_url)
    cfg.gitea_repo = gitea.get("repo", cfg.gitea_repo)
    # Environment token takes precedence over anything committed to the file.
    cfg.gitea_token = os.environ.get("GITEA_TOKEN", gitea.get("token", ""))
    return cfg
# ---------------------------------------------------------------------------
# Heartbeat reader
# ---------------------------------------------------------------------------
def read_agent_heartbeat(cell_path: Path, agent_name: str) -> Optional[dict]:
    """Return the parsed heartbeat JSON for *agent_name*, or None.

    None is returned when the heartbeat file is missing, unreadable,
    or not valid JSON — the caller treats all three as "no heartbeat".
    """
    heartbeat_file = cell_path.joinpath(".lazarus", "heartbeats", agent_name + ".json")
    if not heartbeat_file.exists():
        return None
    try:
        raw = heartbeat_file.read_text()
    except OSError:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return None
# ---------------------------------------------------------------------------
# Cell discovery
# ---------------------------------------------------------------------------
def discover_cells(cells_root: Path) -> list[CellState]:
    """Scan *cells_root* for mission cells whose manifest marks them active or frozen.

    Directories without a readable, valid cell.json are skipped silently.
    """
    found: list[CellState] = []
    if not cells_root.exists():
        return found
    for candidate in cells_root.iterdir():
        manifest_file = candidate / "cell.json"
        if not candidate.is_dir() or not manifest_file.exists():
            continue
        try:
            data = json.loads(manifest_file.read_text())
        except (json.JSONDecodeError, OSError):
            continue
        if data.get("status") in ("active", "frozen"):
            state = CellState(
                uuid=data.get("uuid", candidate.name),
                path=candidate,
                status=data.get("status", "active"),
            )
            # Seed one health record per agent listed in the manifest.
            for member in data.get("agents", []):
                agent = member.get("name", "unknown")
                state.agents[agent] = AgentHealth(name=agent)
            found.append(state)
    return found
# ---------------------------------------------------------------------------
# Health poll
# ---------------------------------------------------------------------------
def poll_cell(cell: CellState, cfg: DaemonConfig) -> list[str]:
    """Refresh each agent's health from its heartbeat file.

    Mutates the AgentHealth records in place and returns one warning
    string per agent whose heartbeat is missing or stale.
    """
    checked_at = time.time()
    messages: list[str] = []
    for name, record in cell.agents.items():
        payload = read_agent_heartbeat(cell.path, name)
        if payload is None:
            record.healthy = False
            messages.append(f"[{cell.uuid}] {name}: no heartbeat file found")
            continue
        stamp = payload.get("timestamp", 0)
        record.last_seen = stamp
        record.last_status = payload.get("status", "unknown")
        age = checked_at - stamp
        record.healthy = age <= cfg.heartbeat_stale_threshold
        if not record.healthy:
            messages.append(
                f"[{cell.uuid}] {name}: heartbeat stale ({age:.0f}s old, "
                f"threshold {cfg.heartbeat_stale_threshold}s)"
            )
    return messages
# ---------------------------------------------------------------------------
# Resurrection (stub — P3 will implement fully)
# ---------------------------------------------------------------------------
def attempt_revive(cell: CellState, agent_name: str, cfg: DaemonConfig) -> bool:
    """Stub resurrection hook — full logic lands in P3 (#882).

    Bumps the revive counter and logs intent; once the configured attempt
    budget is exhausted it logs an escalation instead. Always returns False
    for now, since nothing is actually restarted yet.
    """
    record = cell.agents.get(agent_name)
    if record is None:
        return False
    if record.revive_count >= cfg.max_revive_attempts:
        logging.error(
            "[%s] %s: max revive attempts (%d) reached — escalating to human",
            cell.uuid,
            agent_name,
            cfg.max_revive_attempts,
        )
        return False
    record.revive_count += 1
    logging.warning(
        "[%s] %s: initiating resurrection attempt %d/%d (P3 stub)",
        cell.uuid,
        agent_name,
        record.revive_count,
        cfg.max_revive_attempts,
    )
    # TODO (P3 #882): exec the agent's harness entrypoint inside the cell
    return False
# ---------------------------------------------------------------------------
# Main daemon loop
# ---------------------------------------------------------------------------
class LazarusPit:
    """Daemon core: poll mission cells on an interval and trigger revives.

    Lifecycle: start() writes a PID file, installs SIGTERM/SIGINT handlers,
    then loops _tick() + sleep until a signal clears the running flag.
    """

    def __init__(self, cfg: DaemonConfig):
        self.cfg = cfg
        self._running = False

    def start(self) -> None:
        """Run the poll loop until stopped by SIGTERM/SIGINT."""
        self._running = True
        self._write_pid()
        self._setup_signals()
        logging.info("Lazarus Pit daemon started (poll interval: %ds)", self.cfg.poll_interval)
        while self._running:
            self._tick()
            # Fix: sleep in <=1s slices so a shutdown signal takes effect
            # promptly. A plain time.sleep(poll_interval) is resumed after
            # the signal handler runs (PEP 475), which delayed shutdown by
            # up to a full poll interval.
            deadline = time.monotonic() + self.cfg.poll_interval
            while self._running:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(1.0, remaining))
        logging.info("Lazarus Pit daemon stopped.")
        self._remove_pid()

    def _tick(self) -> None:
        """One poll pass: discover cells, log stale-agent warnings, maybe revive."""
        cells = discover_cells(self.cfg.cells_root)
        if not cells:
            logging.debug("No active mission cells found in %s", self.cfg.cells_root)
            return
        for cell in cells:
            warnings = poll_cell(cell, self.cfg)
            for msg in warnings:
                logging.warning(msg)
            now = time.time()
            for agent_name, health in cell.agents.items():
                if not health.healthy:
                    # last_seen == 0 means no heartbeat was ever read: treat as
                    # infinitely stale so the revive path can still fire.
                    age = now - health.last_seen if health.last_seen else float("inf")
                    if age > self.cfg.resurrect_after:
                        attempt_revive(cell, agent_name, self.cfg)

    def _write_pid(self) -> None:
        """Best-effort PID file write; failure is logged, not fatal."""
        pid_path = Path(self.cfg.pid_file)
        try:
            pid_path.parent.mkdir(parents=True, exist_ok=True)
            pid_path.write_text(str(os.getpid()))
        except OSError as e:
            logging.warning("Could not write PID file: %s", e)

    def _remove_pid(self) -> None:
        """Remove the PID file if present; ignore filesystem errors."""
        try:
            Path(self.cfg.pid_file).unlink(missing_ok=True)
        except OSError:
            pass

    def _setup_signals(self) -> None:
        """Route SIGTERM and SIGINT to the graceful-shutdown handler."""
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame) -> None:
        """Signal handler: flag the main loop to exit after the current slice."""
        logging.info("Signal %d received — shutting down.", signum)
        self._running = False
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def cmd_status(cfg: DaemonConfig) -> None:
    """Print a per-agent heartbeat summary for every active/frozen cell."""
    active = discover_cells(cfg.cells_root)
    if not active:
        print(f"No active cells in {cfg.cells_root}")
        return
    for cell in active:
        print(f"\nCell {cell.uuid} [{cell.status}]")
        for name in cell.agents:
            beat = read_agent_heartbeat(cell.path, name)
            if beat is None:
                print(f" {name}: no heartbeat")
                continue
            age = time.time() - beat.get("timestamp", 0)
            print(f" {name}: {beat.get('status', '?')} (heartbeat {age:.0f}s ago)")
def cmd_list_cells(cfg: DaemonConfig) -> None:
    """List every mission cell under the configured root, readable or not.

    Bug fix: the status and the mission description were previously
    concatenated with no separator (e.g. "activeShip the fix"); the line
    is now printed as "<uuid>: <status> — <mission>".
    """
    root = cfg.cells_root
    if not root.exists():
        print(f"{root} does not exist")
        return
    found = False
    for entry in sorted(root.iterdir()):
        manifest = entry / "cell.json"
        if not manifest.exists():
            continue
        found = True
        try:
            data = json.loads(manifest.read_text())
        except (json.JSONDecodeError, OSError):
            print(f"{entry.name}: [unreadable manifest]")
            continue
        line = f"{data.get('uuid', entry.name)}: {data.get('status', '?')}"
        mission = data.get("mission", "")
        if mission:
            line += f" — {mission}"
        print(line)
    if not found:
        print(f"No mission cells found in {root}")
def build_arg_parser() -> argparse.ArgumentParser:
    """Construct the CLI parser: daemon by default, --status/--list-cells query modes."""
    parser = argparse.ArgumentParser(
        description="Lazarus Pit — agent resurrection pool daemon",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--config",
        default=str(DEFAULT_CONFIG_PATH),
        help="Path to lazarus-pit.toml (default: config/lazarus-pit.toml)",
    )
    # The two query flags share the same shape, so they are declared in a loop.
    for flag, text in (
        ("--status", "Print current cell/agent health and exit"),
        ("--list-cells", "List all mission cells and exit"),
    ):
        parser.add_argument(flag, action="store_true", help=text)
    return parser
def main() -> None:
    """Entry point: parse CLI args, configure logging, then query or daemonize."""
    args = build_arg_parser().parse_args()
    cfg = load_config(Path(args.config))
    # Logging goes to stdout when log_file is "-", otherwise to the file.
    logging_kwargs = {
        "level": getattr(logging, cfg.log_level.upper(), logging.INFO),
        "format": "%(asctime)s [%(levelname)s] %(message)s",
    }
    if cfg.log_file != "-":
        logging_kwargs["filename"] = cfg.log_file
    logging.basicConfig(**logging_kwargs)
    if args.status:
        cmd_status(cfg)
    elif args.list_cells:
        cmd_list_cells(cfg)
    else:
        LazarusPit(cfg).start()


if __name__ == "__main__":
    main()

52
config/lazarus-pit.toml Normal file
View File

@@ -0,0 +1,52 @@
# lazarus-pit.toml — Daemon configuration for the Lazarus Pit resurrection pool
# See docs/lazarus-pit/mission-cell-spec.md for architecture context.
# Epic: #878 · Phase P0 Foundation: #879
[daemon]
# PID file location
pid_file = "/var/run/lazarus-pit.pid"
# Log file (use "-" for stdout)
log_file = "/var/log/lazarus-pit.log"
# Log level: DEBUG | INFO | WARNING | ERROR
log_level = "INFO"
[cells]
# Root directory for all mission cells
root = "/var/missions"
# Seconds of silence before an agent is considered stale
heartbeat_stale_threshold = 60
# Seconds of stale heartbeat before attempting resurrection
resurrect_after = 120
# Maximum auto-revive attempts per agent per session
max_revive_attempts = 3
# Poll interval in seconds for health checks
poll_interval = 15
[gateway]
# WebSocket gateway to poll for liveness
host = "localhost"
port = 8765
# HTTP health endpoint exposed by the gateway
health_host = "localhost"
health_port = 8766
[gitea]
# Gitea instance for filing resurrection / incident reports
url = "https://forge.alexanderwhitestone.com"
repo = "Timmy_Foundation/the-nexus"
# Token read from environment: GITEA_TOKEN
# token = "" # Do not commit real tokens
[notifications]
# Whether to open a Gitea issue when an agent goes dark
open_issue_on_death = true
# Whether to close the issue when the agent is successfully revived
close_issue_on_revive = true

View File

@@ -0,0 +1,156 @@
# Mission Cell Directory Specification
**Version:** 1.0
**Status:** Canonical
**Epic:** #878 — The Lazarus Pit & Mission Cell Isolation
---
## Overview
A **Mission Cell** is an ephemeral, isolated working directory provisioned for a specific
multi-agent collaboration. Each cell is scoped to a single mission (project, task, or
incident) and is identified by a UUID. No wizard's home directory is ever touched by
another agent's work within a cell.
---
## Root Path
```
/var/missions/<uuid>/
```
`<uuid>` is a version-4 UUID, e.g. `a3f7c901-1234-4b5e-8def-000000000001`.
---
## Directory Layout
```
/var/missions/<uuid>/
├── cell.json # Cell manifest (identity, status, agents, timestamps)
├── workspace/ # Shared working directory — agents write here
│ └── ...
├── logs/
│ ├── events.jsonl # Append-only event stream (invitation, join, leave, etc.)
│ └── <agent-name>.log # Per-agent stdout/stderr capture
├── checkpoints/ # Snapshot archives for P2 checkpoint/restore
│ └── <timestamp>.tar.gz
├── bus/ # Mission message bus (P4 multi-agent teaming)
│ ├── inbox/ # Per-agent inboxes
│ │ └── <agent-name>/
│ └── outbox/
└── .lazarus/ # Daemon metadata (written by lazarus-pit, not agents)
├── heartbeats/
│ └── <agent-name>.json # Last heartbeat per agent
└── state.json # Cell lifecycle state (active, frozen, archived)
```
---
## File Specs
### `cell.json` — Cell Manifest
```json
{
"uuid": "a3f7c901-1234-4b5e-8def-000000000001",
"name": "optional human-readable name",
"mission": "Short description of the mission objective",
"created_at": "2026-04-06T00:00:00Z",
"created_by": "agent-name or username",
"status": "active",
"agents": [
{
"name": "claude",
"role": "developer",
"joined_at": "2026-04-06T00:01:00Z",
"home": "/root/wizards/claude"
}
],
"gitea_issue": 879,
"repo": "Timmy_Foundation/the-nexus"
}
```
**Status values:** `active` | `frozen` | `archived` | `destroyed`
### `logs/events.jsonl` — Event Stream
One JSON object per line, append-only:
```json
{"ts": "2026-04-06T00:00:00Z", "event": "cell_created", "by": "allegro", "uuid": "..."}
{"ts": "2026-04-06T00:01:00Z", "event": "agent_joined", "agent": "claude", "role": "developer"}
{"ts": "2026-04-06T00:02:00Z", "event": "heartbeat", "agent": "claude", "status": "thinking"}
{"ts": "2026-04-06T01:00:00Z", "event": "agent_left", "agent": "claude"}
{"ts": "2026-04-06T01:01:00Z", "event": "cell_archived"}
```
### `.lazarus/heartbeats/<agent-name>.json` — Per-Agent Heartbeat
Written by agents, monitored by the lazarus-pit daemon:
```json
{
"agent": "claude",
"pid": 12345,
"timestamp": 1744000000.0,
"cycle": 42,
"model": "claude-opus-4-6",
"status": "thinking",
"cell_uuid": "a3f7c901-1234-4b5e-8def-000000000001"
}
```
### `.lazarus/state.json` — Daemon State
Written exclusively by `lazarus-pit`, never by agents:
```json
{
"cell_uuid": "a3f7c901-1234-4b5e-8def-000000000001",
"daemon_pid": 99001,
"last_poll": 1744000000.0,
"agent_health": {
"claude": {"healthy": true, "last_seen": 1744000000.0, "revive_count": 0}
}
}
```
---
## Lifecycle
```
provision → active → [frozen] → archived → destroyed
```
| Transition | Trigger | Actor |
|--------------|--------------------------------------------|---------------|
| provision | Invitation accepted / mission created | allegro / CLI |
| active | At least one agent joined | lazarus-pit |
| frozen | Checkpoint requested or idle timeout | lazarus-pit |
| archived | Mission complete, cell preserved | lazarus-pit |
| destroyed | Explicit teardown, no home dirs touched | CLI / daemon |
---
## Isolation Guarantees
- No agent process within a cell may write outside `/var/missions/<uuid>/`.
- Home directories (`/root/wizards/<name>/`) are read-only mounts or simply not present in the cell's working context.
- The cell UUID is injected as `LAZARUS_CELL_UUID` into each agent's environment.
- Destruction of a cell has zero impact on any wizard's home directory.
---
## Related Issues
- #878 — Epic
- #879 — P0 Foundation (this spec)
- #880 — P1 Invitation & Spawning
- #881 — P2 Checkpoint / Restore
- #882 — P3 Resurrection Pool
- #883 — P4 Multi-Agent Teaming

View File

@@ -1 +0,0 @@
"""Lazarus Pit — Agent resurrection pool and mission cell isolation."""

View File

@@ -1,145 +0,0 @@
"""Mission cell lifecycle — create, load, save, destroy."""
from __future__ import annotations
import json
import logging
import os
import tempfile
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
log = logging.getLogger(__name__)
CellState = Literal["pending", "active", "archived", "destroyed"]
@dataclass
class MissionCell:
    """Represents a mission cell — an ephemeral project workspace."""

    id: str
    name: str
    repo_url: str
    created_at: str
    state: CellState
    agents: list[str]
    cell_path: str
    home_path: str

    def to_dict(self) -> dict:
        """Serialize to a plain dict; the agents list is copied, not shared."""
        keys = ("id", "name", "repo_url", "created_at", "state", "agents", "cell_path", "home_path")
        values = (
            self.id,
            self.name,
            self.repo_url,
            self.created_at,
            self.state,
            list(self.agents),
            self.cell_path,
            self.home_path,
        )
        return dict(zip(keys, values))

    @classmethod
    def from_dict(cls, data: dict) -> MissionCell:
        """Rebuild a MissionCell from to_dict() output.

        `agents` and `home_path` are optional in *data*; all other keys
        are required.
        """
        required = {
            k: data[k]
            for k in ("id", "name", "repo_url", "created_at", "state", "cell_path")
        }
        return cls(
            agents=list(data.get("agents", [])),
            home_path=data.get("home_path", ""),
            **required,
        )
def create_cell(
    mission_id: str,
    name: str,
    repo_url: str,
    base_path: Path,
    home_path: str = "",
) -> MissionCell:
    """Provision the on-disk layout for a new mission cell and persist its manifest.

    Creates <base_path>/<mission_id> with workspace/, logs/ and snapshots/
    subdirectories, seeds an empty roster.json, writes cell.json, and
    returns the new MissionCell in the "pending" state.
    """
    cell_path = base_path / mission_id
    # Standard cell skeleton: the root plus its three subdirectories.
    for subdir in ("", "workspace", "logs", "snapshots"):
        (cell_path / subdir).mkdir(parents=True, exist_ok=True)
    cell = MissionCell(
        id=mission_id,
        name=name,
        repo_url=repo_url,
        created_at=datetime.now(timezone.utc).isoformat(),
        state="pending",
        agents=[],
        cell_path=str(cell_path),
        home_path=home_path,
    )
    # Seed an empty roster alongside the manifest.
    _atomic_write(cell_path / "roster.json", {"agents": []})
    save_cell(cell)
    log.info("Created mission cell %s at %s", mission_id, cell_path)
    return cell
def destroy_cell(cell: MissionCell, cleanup_dirs: bool = False) -> None:
    """Flag *cell* as destroyed, persist that state, and optionally delete its tree."""
    cell.state = "destroyed"
    save_cell(cell)
    if cleanup_dirs:
        import shutil

        target = Path(cell.cell_path)
        if target.exists():
            shutil.rmtree(target)
            log.info("Removed cell directory %s", target)
    log.info("Destroyed mission cell %s", cell.id)
def load_cell(cell_path: Path) -> MissionCell:
    """Read cell.json under *cell_path* and hydrate a MissionCell from it."""
    manifest = Path(cell_path) / "cell.json"
    data = json.loads(manifest.read_text(encoding="utf-8"))
    return MissionCell.from_dict(data)
def save_cell(cell: MissionCell) -> None:
    """Atomically persist *cell* to <cell_path>/cell.json, creating dirs as needed."""
    destination = Path(cell.cell_path)
    destination.mkdir(parents=True, exist_ok=True)
    manifest = destination / "cell.json"
    _atomic_write(manifest, cell.to_dict())
    log.debug("Saved cell %s to %s", cell.id, manifest)
# ─── helpers ──────────────────────────────────────────────────────────────────
def _atomic_write(path: Path, data: dict) -> None:
"""Write JSON atomically via a temp file + rename."""
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2)
os.replace(tmp, path)
except Exception:
try:
os.unlink(tmp)
except OSError:
pass
raise

View File

@@ -1,162 +0,0 @@
"""Lazarus CLI — invite agents, manage mission cells."""
from __future__ import annotations
import argparse
import json
import platform
import sys
import uuid
from pathlib import Path
from lazarus.cell import MissionCell, create_cell, load_cell
from lazarus.isolation import setup_level1_isolation
from lazarus.roster import DEFAULT_REGISTRY_PATH, MissionRoster
def _default_base_path() -> Path:
if platform.system() == "Linux":
return Path("/var/missions")
return Path.home() / ".nexus" / "missions"
# ─── command handlers ─────────────────────────────────────────────────────────
def cmd_invite(args: argparse.Namespace) -> int:
    """Ensure a mission cell exists, apply isolation, and invite an agent into it.

    Prints a JSON summary of the invitation and returns 0.
    """
    base_path = Path(args.base_path) if args.base_path else _default_base_path()
    mission_id: str = args.mission or str(uuid.uuid4())
    registry = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry)
    # Reuse an existing cell when its manifest is already on disk.
    cell_dir = base_path / mission_id
    if (cell_dir / "cell.json").exists():
        cell = load_cell(cell_dir)
    else:
        cell = create_cell(
            mission_id=mission_id,
            name=args.name or mission_id,
            repo_url=args.repo,
            base_path=base_path,
        )
    # Level 1 isolation
    setup_level1_isolation(cell)
    # Register mission and invite agent
    roster.add_mission(cell)
    entry = roster.invite_agent(mission_id, args.agent, role=args.role)
    roster.save()
    print(
        json.dumps(
            {
                "mission_id": mission_id,
                "agent": args.agent,
                "role": entry.role,
                "status": entry.status,
                "cell_path": cell.cell_path,
                "repo_url": cell.repo_url,
                "invited_at": entry.invited_at,
            },
            indent=2,
        )
    )
    return 0
def cmd_list(args: argparse.Namespace) -> int:
    """Print missions as JSON, optionally filtered by lifecycle state."""
    registry = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    missions = MissionRoster(registry).list_missions(state=args.state)
    print(json.dumps(missions, indent=2))
    return 0
def cmd_status(args: argparse.Namespace) -> int:
    """Print one mission's record plus its agent roster; return 1 if unknown."""
    registry = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry)
    mission = roster.get_mission(args.mission_id)
    if mission is None:
        print(f"Mission {args.mission_id!r} not found.", file=sys.stderr)
        return 1
    members = roster.list_agents(args.mission_id)
    report = dict(mission)
    report["agent_count"] = len(members)
    report["agents"] = [m.to_dict() for m in members]
    print(json.dumps(report, indent=2))
    return 0
def cmd_depart(args: argparse.Namespace) -> int:
    """Record an agent's departure from a mission; return 1 on unknown mission/agent."""
    registry = Path(args.registry) if args.registry else DEFAULT_REGISTRY_PATH
    roster = MissionRoster(registry)
    try:
        entry = roster.depart_agent(args.mission, args.agent)
    except KeyError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    roster.save()
    print(json.dumps(entry.to_dict(), indent=2))
    return 0
# ─── argument parsing ─────────────────────────────────────────────────────────
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="lazarus",
description="Lazarus Pit — agent invitation and mission cell management",
)
parser.add_argument(
"--registry",
default=None,
help="Path to mission roster JSON (default: ~/.nexus/mission-roster.json)",
)
sub = parser.add_subparsers(dest="command", required=True)
# invite
p_invite = sub.add_parser("invite", help="Invite an agent to a mission")
p_invite.add_argument("agent", help="Agent name to invite")
p_invite.add_argument("--mission", default=None, help="Mission ID (UUID generated if omitted)")
p_invite.add_argument("--repo", required=True, help="Repository URL for the mission")
p_invite.add_argument("--base-path", default=None, help="Base directory for mission cells")
p_invite.add_argument("--role", default="contributor", help="Agent role (default: contributor)")
p_invite.add_argument("--name", default=None, help="Human-readable mission name")
# list
p_list = sub.add_parser("list", help="List missions")
p_list.add_argument("--state", default=None, help="Filter by state (pending/active/archived/destroyed)")
# status
p_status = sub.add_parser("status", help="Show mission status")
p_status.add_argument("mission_id", help="Mission ID")
# depart
p_depart = sub.add_parser("depart", help="Mark agent as departed")
p_depart.add_argument("agent", help="Agent name")
p_depart.add_argument("--mission", required=True, help="Mission ID")
return parser
def main(argv: list[str] | None = None) -> int:
    """Parse *argv* and dispatch to the matching subcommand handler."""
    parser = _build_parser()
    args = parser.parse_args(argv)
    handlers = {
        "invite": cmd_invite,
        "list": cmd_list,
        "status": cmd_status,
        "depart": cmd_depart,
    }
    handler = handlers.get(args.command)
    if handler is None:
        # Unreachable with required=True subparsers; kept as a safety net.
        parser.print_help()
        return 1
    return handler(args)


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -1,165 +0,0 @@
"""Hermes harness mission-cell mode — wraps agent lifecycle in a mission cell."""
from __future__ import annotations
import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Awaitable
try:
import websockets # type: ignore[import]
except ImportError: # pragma: no cover
websockets = None # type: ignore[assignment]
from lazarus.cell import MissionCell, load_cell
from lazarus.isolation import setup_level1_isolation
log = logging.getLogger(__name__)
DEFAULT_HERMES_WS_URL = "ws://localhost:8000/ws"
class MissionCellHarness:
"""
Wraps agent execution in a mission cell with Hermes WS telemetry.
Usage::
harness = MissionCellHarness("myagent", "/var/missions/<uuid>")
await harness.run_with_harness(my_coroutine())
"""
def __init__(
self,
agent_name: str,
cell_path: str | Path,
hermes_ws_url: str = DEFAULT_HERMES_WS_URL,
home_path: str | None = None,
) -> None:
self.agent_name = agent_name
self.cell_path = Path(cell_path)
self.hermes_ws_url = hermes_ws_url
self.home_path = home_path
self._cell: MissionCell | None = None
self._ws: Any = None # websockets connection
self._ws_connected = False
# ─── public API ───────────────────────────────────────────────────────────
async def start(self) -> None:
"""Load cell, set up isolation, emit agent_joined telemetry."""
self._cell = load_cell(self.cell_path)
setup_level1_isolation(self._cell)
await self._connect_hermes()
await self._emit_mission_event("agent_joined")
log.info(
"MissionCellHarness started for agent=%s mission=%s",
self.agent_name,
self._cell.id,
)
async def stop(self) -> None:
"""Emit agent_departed telemetry and save state."""
await self._emit_mission_event("agent_departed")
if self._ws is not None:
try:
await self._ws.close()
except Exception as exc:
log.debug("WS close error (ignored): %s", exc)
self._ws_connected = False
log.info(
"MissionCellHarness stopped for agent=%s", self.agent_name
)
def get_env(self) -> dict[str, str]:
"""Return environment variables for the mission cell context."""
cell = self._cell
env: dict[str, str] = {}
if cell:
env["MISSION_CELL_PATH"] = str(self.cell_path)
env["MISSION_ID"] = cell.id
else:
env["MISSION_CELL_PATH"] = str(self.cell_path)
env["MISSION_ID"] = ""
env["AGENT_NAME"] = self.agent_name
if self.home_path:
env["HOME"] = self.home_path
else:
env["HOME"] = os.environ.get("HOME", str(Path.home()))
return env
def write_mission_log(self, message: str) -> None:
"""Write a timestamped log entry to the cell's logs directory."""
if self._cell is None:
log.warning("Cannot write mission log: cell not loaded")
return
logs_dir = Path(self._cell.cell_path) / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)
log_file = logs_dir / "mission.log"
timestamp = datetime.now(timezone.utc).isoformat()
line = f"[{timestamp}] [{self.agent_name}] {message}\n"
with open(log_file, "a", encoding="utf-8") as fh:
fh.write(line)
async def run_with_harness(self, coro: Awaitable[Any]) -> Any:
"""Start harness, run coro, stop harness. Returns coro result."""
await self.start()
try:
return await coro
finally:
await self.stop()
# ─── internals ────────────────────────────────────────────────────────────
async def _connect_hermes(self) -> None:
    """Attempt WebSocket connection to Hermes; fail gracefully.

    Sets self._ws / self._ws_connected on success; on any failure the
    harness stays usable with telemetry disabled (offline mode).
    """
    try:
        # Imported lazily so a missing `websockets` package is handled by
        # the except below instead of failing at module import time.
        import websockets  # type: ignore[import]
        self._ws = await websockets.connect(self.hermes_ws_url)
        self._ws_connected = True
        log.info("Connected to Hermes at %s", self.hermes_ws_url)
    except Exception as exc:
        # Broad catch is deliberate: ImportError, refused connection, bad
        # URL — all downgrade to "not connected" rather than raising.
        log.warning("Could not connect to Hermes WS: %s", exc)
        self._ws_connected = False
async def _emit_mission_event(self, event: str) -> None:
    """Send a mission_event message to Hermes."""
    message = json.dumps(
        {
            "type": "mission_event",
            "event": event,
            "mission_id": self._cell.id if self._cell else "",
            "agent_name": self.agent_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
    # Guard clause: silently skip (debug log only) when offline.
    if not self._ws_connected or self._ws is None:
        log.debug(
            "Hermes not connected; %s event not sent (agent=%s)",
            event,
            self.agent_name,
        )
        return
    try:
        await self._ws.send(message)
        log.debug("Emitted %s for agent=%s", event, self.agent_name)
    except Exception as exc:
        # A failed send marks the link down; later emits take the debug path.
        log.warning("Failed to emit %s telemetry: %s", event, exc)
        self._ws_connected = False

View File

@@ -1,170 +0,0 @@
"""Level 1 isolation — directory setup and AppArmor profile generation."""
from __future__ import annotations
import logging
import platform
import subprocess
from dataclasses import dataclass
from pathlib import Path
from lazarus.cell import MissionCell
log = logging.getLogger(__name__)
# AppArmor profile template, rendered per-cell via str.format by
# write_apparmor_profile(). Placeholders: {cell_id}, {home_path},
# {workspace_path}, {logs_path}. Literal braces in the profile body are
# escaped as {{ }} so str.format leaves them intact.
_APPARMOR_TEMPLATE = """\
#include <tunables/global>
profile nexus-mission-{cell_id} flags=(attach_disconnected,mediate_deleted) {{
#include <abstractions/base>
# Allow read access to agent home
"{home_path}/" r,
"{home_path}/**" r,
# Allow read/write access to cell workspace
"{workspace_path}/" rw,
"{workspace_path}/**" rwkl,
# Allow read/write access to cell logs
"{logs_path}/" rw,
"{logs_path}/**" rwkl,
# Deny writes to home path
deny "{home_path}/**" w,
# Deny access to other agent home dirs
deny "/root/wizards/**" rw,
# Deny network access by default
deny network,
# Basic system access
/bin/** ix,
/usr/bin/** ix,
/lib/** r,
/usr/lib/** r,
/proc/self/** r,
}}
"""
@dataclass
class IsolationManifest:
    """Result of setting up Level 1 isolation for a mission cell."""

    # Mission cell identifier this manifest describes.
    cell_id: str
    # Created directories (chmod 0o700 attempted; may be left as-is on failure).
    workspace_path: Path
    logs_path: Path
    # Path to the generated AppArmor profile, or None when none was written
    # (a profile is only generated when the cell has a home_path).
    apparmor_profile_path: Path | None
    # Value of platform.system() at setup time (e.g. "Linux", "Darwin").
    platform: str
def setup_level1_isolation(cell: MissionCell) -> IsolationManifest:
    """
    Set up Level 1 isolation for a mission cell.

    Creates the proper directory structure with permissions and writes
    an AppArmor profile (Linux) or just a profile file (other platforms).
    Returns an IsolationManifest describing what was set up.
    """
    root = Path(cell.cell_path)
    workspace = root / "workspace"
    logs = root / "logs"
    for directory in (workspace, logs):
        directory.mkdir(parents=True, exist_ok=True)
    # Restrict both directories to the owner; a failure (e.g. exotic
    # filesystem) is logged and tolerated rather than fatal.
    try:
        for directory in (workspace, logs):
            directory.chmod(0o700)
    except OSError as exc:
        log.warning("Could not set directory permissions: %s", exc)
    # A profile is only rendered when the cell pins an agent home dir.
    profile: Path | None = None
    if cell.home_path:
        profile = write_apparmor_profile(
            cell, root / f"nexus-mission-{cell.id}.apparmor"
        )
    current_platform = platform.system()
    log.info(
        "Level 1 isolation set up for cell %s (platform=%s)", cell.id, current_platform
    )
    return IsolationManifest(
        cell_id=cell.id,
        workspace_path=workspace,
        logs_path=logs,
        apparmor_profile_path=profile,
        platform=current_platform,
    )
def write_apparmor_profile(cell: MissionCell, output_path: Path) -> Path:
    """
    Write an AppArmor profile for the mission cell to output_path.

    On non-Linux platforms this just writes the file without attempting to load it.
    Returns the path to the written profile.
    """
    root = Path(cell.cell_path)
    # Fall back to the current user's home when the cell pins none.
    rendered = _APPARMOR_TEMPLATE.format(
        cell_id=cell.id,
        home_path=cell.home_path or str(Path.home()),
        workspace_path=str(root / "workspace"),
        logs_path=str(root / "logs"),
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(rendered, encoding="utf-8")
    log.info("Wrote AppArmor profile to %s", output_path)
    return output_path
def load_apparmor_profile(profile_path: Path) -> bool:
    """
    Attempt to load an AppArmor profile via apparmor_parser.

    On non-Linux platforms or when apparmor_parser is absent, this is a no-op
    and returns False gracefully.
    Returns True if the profile was loaded successfully.
    """
    if platform.system() != "Linux":
        log.debug(
            "AppArmor load skipped: not on Linux (platform=%s)", platform.system()
        )
        return False
    try:
        proc = subprocess.run(
            ["apparmor_parser", "-r", str(profile_path)],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except FileNotFoundError:
        # apparmor_parser binary missing — treat as "not loaded", no error.
        log.debug("apparmor_parser not found; skipping profile load")
        return False
    except subprocess.TimeoutExpired:
        log.warning("apparmor_parser timed out loading %s", profile_path)
        return False
    except Exception as exc:
        log.warning("Unexpected error loading AppArmor profile: %s", exc)
        return False
    if proc.returncode == 0:
        log.info("Loaded AppArmor profile from %s", profile_path)
        return True
    log.warning(
        "apparmor_parser failed (rc=%d): %s",
        proc.returncode,
        proc.stderr.strip(),
    )
    return False

View File

@@ -1,184 +0,0 @@
"""Mission roster registry — tracks all missions and agents."""
from __future__ import annotations
import json
import logging
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
from lazarus.cell import MissionCell
log = logging.getLogger(__name__)
# Default on-disk location of the roster registry; overridable per
# MissionRoster instance via the registry_path constructor argument.
DEFAULT_REGISTRY_PATH = Path.home() / ".nexus" / "mission-roster.json"
# Lifecycle states an agent's roster entry can be in for a mission.
EntryStatus = Literal["invited", "active", "departed"]
@dataclass
class RosterEntry:
    """An agent's participation record in a mission."""

    mission_id: str
    agent_name: str
    invited_at: str
    accepted_at: str | None
    role: str
    status: EntryStatus

    def to_dict(self) -> dict:
        """Serialize to a plain JSON-compatible dict."""
        return dict(
            mission_id=self.mission_id,
            agent_name=self.agent_name,
            invited_at=self.invited_at,
            accepted_at=self.accepted_at,
            role=self.role,
            status=self.status,
        )

    @classmethod
    def from_dict(cls, data: dict) -> RosterEntry:
        """Build an entry from a dict; missing role defaults to 'contributor'."""
        return cls(
            data["mission_id"],
            data["agent_name"],
            data["invited_at"],
            data.get("accepted_at"),
            data.get("role", "contributor"),
            data["status"],
        )
class MissionRoster:
    """Registry of all missions and their agent rosters.

    Mutating methods update the in-memory registry only; call save() to
    persist changes to disk.
    """

    def __init__(self, registry_path: Path = DEFAULT_REGISTRY_PATH) -> None:
        self.registry_path = Path(registry_path)
        self._data: dict = {}
        self._load()

    # ─── internal I/O ─────────────────────────────────────────────────────────

    def _load(self) -> None:
        """Read the registry file, or start a fresh empty schema."""
        if not self.registry_path.exists():
            self._data = {"schema_version": "1", "missions": {}}
            return
        with open(self.registry_path, "r", encoding="utf-8") as fh:
            self._data = json.load(fh)

    def save(self) -> None:
        """Atomically persist the registry to disk."""
        self.registry_path.parent.mkdir(parents=True, exist_ok=True)
        # Write a temp file in the same directory, then rename over the
        # target so readers never observe a half-written registry.
        fd, tmp_name = tempfile.mkstemp(dir=self.registry_path.parent, suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as fh:
                json.dump(self._data, fh, indent=2)
            os.replace(tmp_name, self.registry_path)
        except Exception:
            # Best-effort cleanup of the temp file before re-raising.
            try:
                os.unlink(tmp_name)
            except OSError:
                pass
            raise
        log.debug("Saved mission roster to %s", self.registry_path)

    # ─── missions ─────────────────────────────────────────────────────────────

    def add_mission(self, cell: MissionCell) -> None:
        """Register a new mission from a MissionCell."""
        missions: dict = self._data.setdefault("missions", {})
        if cell.id in missions:
            log.debug("Mission %s already in roster, skipping add", cell.id)
            return
        missions[cell.id] = {
            "id": cell.id,
            "name": cell.name,
            "repo_url": cell.repo_url,
            "created_at": cell.created_at,
            "state": cell.state,
            "cell_path": cell.cell_path,
            "agents": [],
        }
        log.info("Registered mission %s in roster", cell.id)

    def get_mission(self, mission_id: str) -> dict | None:
        """Return the raw mission record, or None when unknown."""
        return self._data.get("missions", {}).get(mission_id)

    def list_missions(self, state: str | None = None) -> list[dict]:
        """All mission records, optionally filtered to a single state."""
        missions = self._data.get("missions", {})
        if state is None:
            return list(missions.values())
        return [m for m in missions.values() if m.get("state") == state]

    # ─── agents ───────────────────────────────────────────────────────────────

    def invite_agent(
        self,
        mission_id: str,
        agent_name: str,
        role: str = "contributor",
    ) -> RosterEntry:
        """Invite an agent to a mission; returns the new RosterEntry."""
        mission = self._require_mission(mission_id)
        agents: list[dict] = mission.setdefault("agents", [])
        # Idempotent: a second invite returns the existing entry untouched.
        existing = next(
            (e for e in agents if e["agent_name"] == agent_name), None
        )
        if existing is not None:
            log.warning(
                "Agent %s already in mission %s roster", agent_name, mission_id
            )
            return RosterEntry.from_dict(existing)
        entry = RosterEntry(
            mission_id=mission_id,
            agent_name=agent_name,
            invited_at=datetime.now(timezone.utc).isoformat(),
            accepted_at=None,
            role=role,
            status="invited",
        )
        agents.append(entry.to_dict())
        log.info("Invited %s to mission %s as %s", agent_name, mission_id, role)
        return entry

    def accept_invitation(self, mission_id: str, agent_name: str) -> RosterEntry:
        """Mark an invitation as accepted; status becomes 'active'."""
        record = self._require_agent_entry(mission_id, agent_name)
        record["status"] = "active"
        record["accepted_at"] = datetime.now(timezone.utc).isoformat()
        log.info("Agent %s accepted invitation to mission %s", agent_name, mission_id)
        return RosterEntry.from_dict(record)

    def depart_agent(self, mission_id: str, agent_name: str) -> RosterEntry:
        """Mark an agent as departed from a mission."""
        record = self._require_agent_entry(mission_id, agent_name)
        record["status"] = "departed"
        log.info("Agent %s departed mission %s", agent_name, mission_id)
        return RosterEntry.from_dict(record)

    def list_agents(self, mission_id: str) -> list[RosterEntry]:
        """Return all RosterEntry objects for a mission."""
        mission = self._require_mission(mission_id)
        return [RosterEntry.from_dict(e) for e in mission.get("agents", [])]

    # ─── helpers ──────────────────────────────────────────────────────────────

    def _require_mission(self, mission_id: str) -> dict:
        """Look up a mission record; raise KeyError when unknown."""
        missions = self._data.get("missions", {})
        try:
            return missions[mission_id]
        except KeyError:
            raise KeyError(f"Mission {mission_id!r} not found in roster") from None

    def _require_agent_entry(self, mission_id: str, agent_name: str) -> dict:
        """Look up an agent's record within a mission; raise KeyError when absent."""
        for record in self._require_mission(mission_id).get("agents", []):
            if record["agent_name"] == agent_name:
                return record
        raise KeyError(
            f"Agent {agent_name!r} not found in mission {mission_id!r} roster"
        )

View File

@@ -3,12 +3,21 @@
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
the body (Evennia/Morrowind), and the visualization surface.
Health heartbeat endpoint (added in #879 — M6-P0 Foundation):
GET http://<host>:<HEALTH_PORT>/health
Returns 200 JSON with gateway status and connected-client count.
Used by the lazarus-pit daemon to verify the gateway is alive.
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from typing import Set
import websockets
@@ -17,6 +26,13 @@ import websockets
PORT = 8765
HOST = "0.0.0.0" # Allow external connections if needed
# Health heartbeat endpoint — monitored by lazarus-pit daemon (#879)
HEALTH_PORT = int(os.environ.get("NEXUS_HEALTH_PORT", "8766"))
HEALTH_HOST = os.environ.get("NEXUS_HEALTH_HOST", "127.0.0.1")
# Gateway start time for uptime reporting
_start_time = time.time()
# Logging setup
logging.basicConfig(
level=logging.INFO,
@@ -28,6 +44,53 @@ logger = logging.getLogger("nexus-gateway")
# State
clients: Set[websockets.WebSocketServerProtocol] = set()
# ---------------------------------------------------------------------------
# Health heartbeat HTTP endpoint — consumed by lazarus-pit (#879)
# ---------------------------------------------------------------------------
class _HealthHandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler for the /health liveness endpoint.

    Serves GET /health with a JSON liveness payload (consumed by the
    lazarus-pit daemon); every other path gets an empty 404.
    """

    def do_GET(self):  # noqa: N802
        """Answer GET /health with gateway status JSON; 404 otherwise."""
        if self.path != "/health":
            self.send_response(404)
            self.end_headers()
            return
        payload = {
            "status": "ok",
            "service": "nexus-gateway",
            "uptime_seconds": round(time.time() - _start_time, 1),
            "connected_clients": len(clients),
            "ws_port": PORT,
            "ts": time.time(),
        }
        body = json.dumps(payload).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Suppress default access-log spam; debug-log non-2xx responses only."""
        # BaseHTTPRequestHandler also routes log_error() through here, and
        # those calls can carry any number of args (including just one, e.g.
        # "Unsupported method (%r)"). The previous guard `if args and ...`
        # indexed args[1] and raised IndexError on such single-arg calls, so
        # require at least two args before inspecting the status-code slot.
        # args[1] being the status code assumes the default log_request
        # format '"%s" %s %s' — other call shapes are simply logged.
        if len(args) > 1 and str(args[1]) not in ("200", "204"):
            logger.debug("Health endpoint: " + fmt % args)
def _start_health_server() -> Thread:
    """Start the health HTTP server in a background daemon thread.

    Returns the (already started) daemon thread; it never blocks the
    caller and dies with the process.
    """
    httpd = HTTPServer((HEALTH_HOST, HEALTH_PORT), _HealthHandler)
    worker = Thread(target=httpd.serve_forever, daemon=True)
    worker.start()
    logger.info(
        "Health heartbeat endpoint listening on http://%s:%d/health",
        HEALTH_HOST, HEALTH_PORT,
    )
    return worker
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"""Handles individual client connections and message broadcasting."""
clients.add(websocket)
@@ -80,6 +143,7 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
async def main():
"""Main server loop with graceful shutdown."""
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
_start_health_server()
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()

View File

@@ -0,0 +1,129 @@
"""Tests for the gateway health heartbeat endpoint (#879 — M6-P0 Foundation).
Validates:
- /health returns 200 with correct JSON schema
- /health reports connected_clients count
- Non-/health paths return 404
- Uptime is non-negative
"""
import importlib.util
import json
import sys
import time
from http.server import HTTPServer
from io import BytesIO
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
# ── Load server module directly ──────────────────────────────────────────────
PROJECT_ROOT = Path(__file__).parent.parent
# Patch websockets import so we don't need the package installed to test health
_ws_mock = MagicMock()
sys.modules.setdefault("websockets", _ws_mock)
_srv_spec = importlib.util.spec_from_file_location(
"nexus_server_test",
PROJECT_ROOT / "server.py",
)
_srv = importlib.util.module_from_spec(_srv_spec)
sys.modules["nexus_server_test"] = _srv
_srv_spec.loader.exec_module(_srv)
_HealthHandler = _srv._HealthHandler
# ── Fake request helper ───────────────────────────────────────────────────────
class _FakeRequest:
"""Minimal socket-like object for BaseHTTPRequestHandler testing."""
def __init__(self, raw_bytes: bytes):
self._buf = BytesIO(raw_bytes)
self.sent = BytesIO()
def makefile(self, mode, **kwargs):
if "r" in mode:
return self._buf
return self.sent
def sendall(self, data: bytes):
self.sent.write(data)
def _invoke_handler(path: str) -> tuple[int, dict]:
    """Call the health handler for a GET request and return (status_code, body_dict).

    Builds a bare _HealthHandler via __new__ (bypassing the socket
    machinery), stubs the response-writing methods to capture the status
    code and body, and invokes do_GET directly.
    """
    raw = f"GET {path} HTTP/1.1\r\nHost: localhost\r\n\r\n".encode()
    # (Previously an unused _FakeRequest was built here and its buffer
    # assigned to handler.wfile, only to be overwritten below — removed
    # as dead code.)
    handler = _HealthHandler.__new__(_HealthHandler)
    handler.rfile = BytesIO(raw)
    handler.client_address = ("127.0.0.1", 9999)
    handler.server = MagicMock()
    handler.request_version = "HTTP/1.1"
    handler.command = "GET"
    handler.path = path
    handler.headers = {}
    # Capture the response status; headers are discarded.
    responses: list[tuple] = []
    handler.send_response = lambda code, *a: responses.append(("status", code))
    handler.send_header = lambda k, v: None
    handler.end_headers = lambda: None
    # Capture the body written through wfile.
    body_parts: list[bytes] = []
    handler.wfile = MagicMock()
    handler.wfile.write = lambda b: body_parts.append(b)
    handler.do_GET()
    status = responses[0][1] if responses else None
    body = {}
    if body_parts:
        try:
            body = json.loads(b"".join(body_parts))
        except (json.JSONDecodeError, TypeError):
            # 404 responses write no JSON body; return an empty dict.
            pass
    return status, body
# ── Tests ─────────────────────────────────────────────────────────────────────
class TestHealthEndpoint:
    """Behavioral checks for the /health endpoint handler."""

    def test_health_returns_200(self):
        status, _ = _invoke_handler("/health")
        assert status == 200

    def test_health_body_schema(self):
        _, body = _invoke_handler("/health")
        assert body.get("status") == "ok"
        assert body.get("service") == "nexus-gateway"
        # All numeric/metadata fields must be present.
        for key in ("uptime_seconds", "connected_clients", "ws_port", "ts"):
            assert key in body

    def test_uptime_is_non_negative(self):
        _, body = _invoke_handler("/health")
        assert body["uptime_seconds"] >= 0

    def test_unknown_path_returns_404(self):
        status, _ = _invoke_handler("/notfound")
        assert status == 404

    def test_root_path_returns_404(self):
        status, _ = _invoke_handler("/")
        assert status == 404

    def test_connected_clients_reflects_module_state(self):
        # Temporarily empty the gateway's client set, then restore it.
        saved = _srv.clients.copy()
        try:
            _srv.clients.clear()
            _, body = _invoke_handler("/health")
            assert body["connected_clients"] == 0
        finally:
            _srv.clients.update(saved)

View File

@@ -1,110 +0,0 @@
"""Tests for lazarus.cell — MissionCell lifecycle."""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cell import MissionCell, create_cell, destroy_cell, load_cell, save_cell
def test_create_cell_creates_directories(tmp_path: Path) -> None:
    """workspace/, logs/, and snapshots/ subdirs are created under the cell dir."""
    create_cell(
        mission_id="test-mission-1",
        name="Test Mission",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path,
    )
    cell_dir = tmp_path / "test-mission-1"
    assert cell_dir.is_dir()
    for sub in ("workspace", "logs", "snapshots"):
        assert (cell_dir / sub).is_dir()
def test_create_cell_writes_cell_json(tmp_path: Path) -> None:
    """cell.json is created and contains valid mission metadata."""
    create_cell(
        mission_id="test-mission-2",
        name="Another Mission",
        repo_url="https://github.com/test/another",
        base_path=tmp_path,
    )
    manifest_path = tmp_path / "test-mission-2" / "cell.json"
    assert manifest_path.exists()
    data = json.loads(manifest_path.read_text())
    assert data["id"] == "test-mission-2"
    assert data["name"] == "Another Mission"
    assert data["repo_url"] == "https://github.com/test/another"
    # New cells start in the 'pending' state with an empty agent list slot.
    assert data["state"] == "pending"
    assert isinstance(data["agents"], list)
def test_save_and_load_cell(tmp_path: Path) -> None:
    """Round-trip: save a cell, load it back, fields match."""
    cell = create_cell(
        mission_id="roundtrip-mission",
        name="Roundtrip",
        repo_url="https://github.com/test/roundtrip",
        base_path=tmp_path,
    )
    # Mutate in memory, persist, then reload from disk.
    cell.state = "active"
    cell.agents.append("agentA")
    save_cell(cell)
    restored = load_cell(tmp_path / "roundtrip-mission")
    assert restored.id == "roundtrip-mission"
    assert restored.state == "active"
    assert "agentA" in restored.agents
    assert restored.repo_url == "https://github.com/test/roundtrip"
def test_cell_state_transitions(tmp_path: Path) -> None:
    """State changes are persisted correctly through save/load."""
    cell = create_cell(
        mission_id="state-mission",
        name="State Test",
        repo_url="https://github.com/test/state",
        base_path=tmp_path,
    )
    assert cell.state == "pending"
    # Walk the lifecycle, persisting and reloading at each step.
    for next_state in ("active", "archived"):
        cell.state = next_state
        save_cell(cell)
        cell = load_cell(tmp_path / "state-mission")
        assert cell.state == next_state
def test_destroy_cell_marks_destroyed(tmp_path: Path) -> None:
    """destroy_cell sets state to 'destroyed' and persists it."""
    cell = create_cell(
        mission_id="destroy-mission",
        name="Destroy Test",
        repo_url="https://github.com/test/destroy",
        base_path=tmp_path,
    )
    # cleanup_dirs=False keeps the directory so the state can be reloaded.
    destroy_cell(cell, cleanup_dirs=False)
    assert cell.state == "destroyed"
    assert load_cell(tmp_path / "destroy-mission").state == "destroyed"

View File

@@ -1,136 +0,0 @@
"""Tests for lazarus.cli — CLI commands."""
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cli import main
from lazarus.roster import MissionRoster
def _run(argv: list[str], tmp_path: Path, capsys: pytest.CaptureFixture) -> tuple:
    """Helper: run CLI with a tmp registry and capture stdout as JSON.

    Prepends --registry before the subcommand so argparse sees it as a global arg.
    Returns (parsed_json, return_code).
    """
    registry = str(tmp_path / "roster.json")
    rc = main(["--registry", registry, *argv])
    # readouterr() consumes the capture buffer — callers that need the
    # output must not have read it already.
    out = capsys.readouterr().out
    return json.loads(out), rc
# Patch setup_level1_isolation globally for all CLI tests to avoid permission issues
@pytest.fixture(autouse=True)
def no_isolation():
    """Autouse fixture: stub out isolation setup (chmod/AppArmor) for every CLI test."""
    with patch("lazarus.cli.setup_level1_isolation"):
        yield
def test_invite_creates_mission_cell(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """invite creates a mission cell directory with cell.json."""
    base = str(tmp_path / "missions")
    result, rc = _run(
        [
            "invite", "agentAlpha",
            "--repo", "https://github.com/test/repo",
            "--base-path", base,
        ],
        tmp_path,
        capsys,
    )
    assert rc == 0
    # The CLI reports the generated mission id; the cell must exist on disk.
    mission_id = result["mission_id"]
    cell_json = tmp_path / "missions" / mission_id / "cell.json"
    assert cell_json.exists()
def test_invite_adds_agent_to_roster(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """After invite, the agent appears in the roster with status 'invited'."""
    base = str(tmp_path / "missions")
    registry = str(tmp_path / "roster.json")
    # Call main() directly (not _run) so the registry path can be reused
    # below for an independent MissionRoster read.
    rc = main([
        "--registry", registry,
        "invite", "agentBeta",
        "--repo", "https://github.com/test/repo",
        "--base-path", base,
    ])
    out = capsys.readouterr().out
    result = json.loads(out)
    assert rc == 0
    # Reload the registry from disk to verify the invite was persisted.
    roster = MissionRoster(Path(registry))
    agents = roster.list_agents(result["mission_id"])
    agent_names = [a.agent_name for a in agents]
    assert "agentBeta" in agent_names
def test_list_shows_missions(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """list command shows missions that were previously created via invite."""
    base = str(tmp_path / "missions")
    registry = str(tmp_path / "roster.json")
    # Create a mission first
    main(["--registry", registry, "invite", "agentGamma",
          "--repo", "https://github.com/test/repo", "--base-path", base])
    capsys.readouterr()  # discard invite output
    rc = main(["--registry", registry, "list"])
    out = capsys.readouterr().out
    missions = json.loads(out)
    assert rc == 0
    assert isinstance(missions, list)
    assert len(missions) >= 1
def test_status_shows_agents(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """status command shows agent count for a mission."""
    base = str(tmp_path / "missions")
    registry = str(tmp_path / "roster.json")
    main(["--registry", registry, "invite", "agentDelta",
          "--repo", "https://github.com/test/repo", "--base-path", base])
    # The invite output carries the mission id needed by `status`.
    invite_out = json.loads(capsys.readouterr().out)
    mission_id = invite_out["mission_id"]
    rc = main(["--registry", registry, "status", mission_id])
    out = capsys.readouterr().out
    status = json.loads(out)
    assert rc == 0
    assert status["agent_count"] >= 1
    agent_names = [a["agent_name"] for a in status["agents"]]
    assert "agentDelta" in agent_names
def test_depart_updates_agent_status(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
    """depart command marks an agent's status as 'departed'."""
    base = str(tmp_path / "missions")
    registry = str(tmp_path / "roster.json")
    main(["--registry", registry, "invite", "agentEpsilon",
          "--repo", "https://github.com/test/repo", "--base-path", base])
    invite_out = json.loads(capsys.readouterr().out)
    mission_id = invite_out["mission_id"]
    rc = main(["--registry", registry, "depart", "agentEpsilon",
               "--mission", mission_id])
    out = capsys.readouterr().out
    result = json.loads(out)
    assert rc == 0
    assert result["status"] == "departed"
    # Verify persisted
    roster = MissionRoster(Path(registry))
    agents = roster.list_agents(mission_id)
    epsilon = next(a for a in agents if a.agent_name == "agentEpsilon")
    assert epsilon.status == "departed"

View File

@@ -1,141 +0,0 @@
"""Tests for lazarus.harness — MissionCellHarness."""
from __future__ import annotations
import asyncio
import json
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cell import create_cell, save_cell
from lazarus.harness import MissionCellHarness
pytestmark = pytest.mark.asyncio
def _make_cell_and_harness(
    tmp_path: Path,
    agent_name: str = "testAgent",
) -> tuple[object, MissionCellHarness]:
    """Create a real cell on disk plus a harness pointed at it (not started)."""
    cell = create_cell(
        mission_id="harness-test-001",
        name="Harness Test",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
    )
    harness = MissionCellHarness(
        agent_name=agent_name,
        cell_path=Path(cell.cell_path),
        hermes_ws_url="ws://localhost:8000/ws",
    )
    return cell, harness
def test_harness_get_env_includes_cell_path(tmp_path: Path) -> None:
    """get_env() returns a dict that includes MISSION_CELL_PATH and AGENT_NAME."""
    cell, harness = _make_cell_and_harness(tmp_path)
    # Load cell manually so get_env works without start()
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))
    env = harness.get_env()
    assert "MISSION_CELL_PATH" in env
    assert env["MISSION_CELL_PATH"] == str(Path(cell.cell_path))
    assert env["AGENT_NAME"] == "testAgent"
    assert "MISSION_ID" in env
    assert "HOME" in env
def test_harness_get_env_uses_custom_home(tmp_path: Path) -> None:
    """When home_path is provided, HOME in env equals that path."""
    cell, _ = _make_cell_and_harness(tmp_path)
    custom_home = str(tmp_path / "wizard_home")
    # Build a second harness for the same cell, this time pinning HOME.
    harness = MissionCellHarness(
        agent_name="agentX",
        cell_path=Path(cell.cell_path),
        home_path=custom_home,
    )
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))
    env = harness.get_env()
    assert env["HOME"] == custom_home
def test_harness_write_mission_log(tmp_path: Path) -> None:
    """write_mission_log creates a log file in the cell's logs/ directory."""
    cell, harness = _make_cell_and_harness(tmp_path)
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))
    harness.write_mission_log("Hello from test")
    log_file = Path(cell.cell_path) / "logs" / "mission.log"
    assert log_file.exists()
    content = log_file.read_text()
    # Entries carry both the message and the writing agent's name.
    assert "Hello from test" in content
    assert "testAgent" in content
async def test_harness_start_emits_telemetry(tmp_path: Path) -> None:
    """start() emits an agent_joined mission_event via WebSocket."""
    cell, harness = _make_cell_and_harness(tmp_path)
    mock_ws = AsyncMock()
    mock_ws.send = AsyncMock()
    mock_ws.close = AsyncMock()
    sent_messages = []
    async def capture_send(payload: str) -> None:
        sent_messages.append(json.loads(payload))
    mock_ws.send.side_effect = capture_send
    # NOTE(review): this patches the module attribute
    # lazarus.harness.websockets, but _connect_hermes does a function-local
    # `import websockets`, which resolves via sys.modules and would bypass
    # an attribute patch — confirm the patch actually intercepts connect().
    with patch("lazarus.isolation.setup_level1_isolation"):
        with patch("lazarus.harness.setup_level1_isolation"):
            with patch("lazarus.harness.websockets") as mock_websockets:
                mock_websockets.connect = AsyncMock(return_value=mock_ws)
                harness._ws_connected = False
                await harness.start()
    assert any(
        m.get("event") == "agent_joined" and m.get("type") == "mission_event"
        for m in sent_messages
    ), f"Expected agent_joined event; got: {sent_messages}"
async def test_harness_stop_emits_departure_telemetry(tmp_path: Path) -> None:
    """stop() emits an agent_departed mission_event via WebSocket."""
    cell, harness = _make_cell_and_harness(tmp_path)
    from lazarus.cell import load_cell
    harness._cell = load_cell(Path(cell.cell_path))
    mock_ws = AsyncMock()
    mock_ws.close = AsyncMock()
    sent_messages = []
    async def capture_send(payload: str) -> None:
        sent_messages.append(json.loads(payload))
    mock_ws.send = AsyncMock(side_effect=capture_send)
    # Simulate an already-connected harness so stop() takes the send path.
    harness._ws = mock_ws
    harness._ws_connected = True
    await harness.stop()
    assert any(
        m.get("event") == "agent_departed"
        for m in sent_messages
    ), f"Expected agent_departed event; got: {sent_messages}"

View File

@@ -1,107 +0,0 @@
"""Tests for lazarus.isolation — Level 1 isolation setup."""
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cell import create_cell
from lazarus.isolation import (
IsolationManifest,
load_apparmor_profile,
setup_level1_isolation,
write_apparmor_profile,
)
def _make_cell(tmp_path: Path, home_path: str = "") -> object:
    """Build a throwaway mission cell under tmp_path/cells for isolation tests."""
    return create_cell(
        mission_id="iso-test-001",
        name="Isolation Test",
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
        home_path=home_path,
    )
def test_setup_isolation_creates_workspace(tmp_path: Path) -> None:
    """setup_level1_isolation creates workspace and logs dirs."""
    manifest = setup_level1_isolation(_make_cell(tmp_path))
    assert isinstance(manifest, IsolationManifest)
    assert manifest.workspace_path.is_dir()
    assert manifest.logs_path.is_dir()
    # The manifest echoes the cell id created by _make_cell.
    assert manifest.cell_id == "iso-test-001"
def test_write_apparmor_profile_creates_file(tmp_path: Path) -> None:
    """write_apparmor_profile writes a non-empty file at the given output path."""
    cell = _make_cell(tmp_path, home_path=str(tmp_path / "home" / "agentX"))
    target = tmp_path / "test.apparmor"
    assert write_apparmor_profile(cell, target) == target
    assert target.exists()
    assert target.stat().st_size > 0
def test_apparmor_profile_content(tmp_path: Path) -> None:
    """Profile file contains correct cell ID, workspace, logs, and home paths."""
    home = str(tmp_path / "home" / "agentX")
    cell = _make_cell(tmp_path, home_path=home)
    target = tmp_path / "test.apparmor"
    write_apparmor_profile(cell, target)
    content = target.read_text()
    # "deny" covers the home-write denial rule.
    for needle in (cell.id, "nexus-mission-", "workspace", "logs", home, "deny"):
        assert needle in content
def test_load_apparmor_profile_noop_on_no_apparmor(tmp_path: Path) -> None:
    """load_apparmor_profile returns False gracefully when apparmor_parser is absent."""
    profile_path = tmp_path / "dummy.apparmor"
    profile_path.write_text("# fake profile\n")
    # Simulate apparmor_parser not found (FileNotFoundError from subprocess.run)
    with patch("lazarus.isolation.platform.system", return_value="Linux"), patch(
        "lazarus.isolation.subprocess.run",
        side_effect=FileNotFoundError("apparmor_parser not found"),
    ):
        assert load_apparmor_profile(profile_path) is False
def test_load_apparmor_profile_noop_on_non_linux(tmp_path: Path) -> None:
    """load_apparmor_profile returns False on non-Linux platforms."""
    profile_path = tmp_path / "dummy.apparmor"
    profile_path.write_text("# fake profile\n")
    with patch("lazarus.isolation.platform.system", return_value="Darwin"):
        assert load_apparmor_profile(profile_path) is False
def test_setup_isolation_writes_profile_when_home_path_set(tmp_path: Path) -> None:
    """When home_path is set, setup_level1_isolation writes an AppArmor profile."""
    cell = _make_cell(tmp_path, home_path=str(tmp_path / "home" / "agentZ"))
    manifest = setup_level1_isolation(cell)
    assert manifest.apparmor_profile_path is not None
    assert manifest.apparmor_profile_path.exists()

262
tests/test_lazarus_pit.py Normal file
View File

@@ -0,0 +1,262 @@
"""Tests for the lazarus-pit daemon skeleton.
Validates:
- Config loading (defaults and TOML overrides)
- Cell discovery from /var/missions structure
- Agent heartbeat reading
- Health poll logic (stale vs fresh)
- AgentHealth and CellState dataclasses
- CLI argument parsing
- Resurrection stub behaviour
"""
import importlib.util
import json
import sys
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ── Load lazarus_pit module directly ────────────────────────────────────────
# bin/lazarus_pit.py lives outside an importable package, so it is loaded by
# file path via importlib under a test-only module name.
PROJECT_ROOT = Path(__file__).parent.parent
_lp_spec = importlib.util.spec_from_file_location(
    "lazarus_pit_test",
    PROJECT_ROOT / "bin" / "lazarus_pit.py",
)
_lp = importlib.util.module_from_spec(_lp_spec)
# NOTE(review): registered in sys.modules before exec_module — presumably so
# the module can be resolved by name while its top level runs; confirm.
sys.modules["lazarus_pit_test"] = _lp
_lp_spec.loader.exec_module(_lp)
# Re-export the daemon's pieces under short local names for the tests below.
AgentHealth = _lp.AgentHealth
CellState = _lp.CellState
DaemonConfig = _lp.DaemonConfig
load_config = _lp.load_config
read_agent_heartbeat = _lp.read_agent_heartbeat
discover_cells = _lp.discover_cells
poll_cell = _lp.poll_cell
attempt_revive = _lp.attempt_revive
build_arg_parser = _lp.build_arg_parser
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_cell(tmp_path: Path, uuid: str, status: str = "active", agents=None) -> Path:
"""Create a minimal mission cell structure under tmp_path."""
cell_path = tmp_path / uuid
cell_path.mkdir()
manifest = {
"uuid": uuid,
"status": status,
"mission": "test mission",
"agents": [{"name": a} for a in (agents or ["claude"])],
}
(cell_path / "cell.json").write_text(json.dumps(manifest))
return cell_path
def _write_heartbeat(cell_path: Path, agent: str, age_seconds: float = 0) -> None:
"""Write a heartbeat file for an agent inside the cell."""
hb_dir = cell_path / ".lazarus" / "heartbeats"
hb_dir.mkdir(parents=True, exist_ok=True)
(hb_dir / f"{agent}.json").write_text(json.dumps({
"agent": agent,
"pid": 12345,
"timestamp": time.time() - age_seconds,
"cycle": 1,
"model": "test",
"status": "thinking",
}))
# ── Config ───────────────────────────────────────────────────────────────────
class TestDaemonConfig:
    """Config loading: baked-in defaults and TOML overrides."""

    def test_defaults(self):
        """A bare DaemonConfig carries the documented default knobs."""
        cfg = DaemonConfig()
        assert cfg.poll_interval == 15
        assert cfg.heartbeat_stale_threshold == 60
        assert cfg.max_revive_attempts == 3

    def test_load_missing_file_returns_defaults(self, tmp_path):
        """Pointing at a nonexistent file yields defaults, not an error."""
        cfg = load_config(tmp_path / "nonexistent.toml")
        assert cfg.poll_interval == 15

    def test_load_valid_toml(self, tmp_path):
        """Values in a [cells] table override the defaults."""
        toml_content = b"""
[cells]
root = "/tmp/missions"
poll_interval = 30
heartbeat_stale_threshold = 90
"""
        toml_path = tmp_path / "test.toml"
        toml_path.write_bytes(toml_content)
        # BUG FIX: the assertions used to sit inside this try block, so a real
        # assertion failure was swallowed and reported as a skip. Only the
        # parse step may skip; the assertions must be allowed to fail.
        try:
            cfg = load_config(toml_path)
        except Exception:
            # TOML parser not available — skip, not a test failure
            pytest.skip("TOML parser not available")
        assert cfg.poll_interval == 30
        assert cfg.heartbeat_stale_threshold == 90
        assert str(cfg.cells_root) == "/tmp/missions"
# ── Cell discovery ───────────────────────────────────────────────────────────
class TestDiscoverCells:
    """Cell discovery from the missions root directory."""

    def test_empty_root(self, tmp_path):
        """An existing but empty root yields no cells."""
        assert discover_cells(tmp_path) == []

    def test_missing_root(self, tmp_path):
        """A nonexistent root yields no cells rather than raising."""
        assert discover_cells(tmp_path / "nonexistent") == []

    def test_finds_active_cell(self, tmp_path):
        _make_cell(tmp_path, "aaa-111", status="active")
        found = discover_cells(tmp_path)
        assert len(found) == 1
        assert found[0].uuid == "aaa-111"

    def test_finds_frozen_cell(self, tmp_path):
        _make_cell(tmp_path, "bbb-222", status="frozen")
        assert len(discover_cells(tmp_path)) == 1

    def test_ignores_archived_cell(self, tmp_path):
        _make_cell(tmp_path, "ccc-333", status="archived")
        assert len(discover_cells(tmp_path)) == 0

    def test_ignores_directories_without_manifest(self, tmp_path):
        (tmp_path / "not-a-cell").mkdir()
        assert discover_cells(tmp_path) == []

    def test_ignores_corrupt_manifest(self, tmp_path):
        broken = tmp_path / "bad-cell"
        broken.mkdir()
        (broken / "cell.json").write_text("not json {{")
        assert discover_cells(tmp_path) == []

    def test_agents_populated_from_manifest(self, tmp_path):
        """Agent names from cell.json become keys of the cell's agents map."""
        _make_cell(tmp_path, "ddd-444", agents=["claude", "allegro"])
        found = discover_cells(tmp_path)
        assert set(found[0].agents.keys()) == {"claude", "allegro"}
# ── Heartbeat reading ─────────────────────────────────────────────────────────
class TestReadAgentHeartbeat:
    """Reading per-agent heartbeat files out of a cell."""

    def test_returns_none_for_missing_file(self, tmp_path):
        assert read_agent_heartbeat(tmp_path, "ghost") is None

    def test_returns_data_for_valid_heartbeat(self, tmp_path):
        _write_heartbeat(tmp_path, "claude", age_seconds=0)
        payload = read_agent_heartbeat(tmp_path, "claude")
        assert payload is not None
        assert payload["agent"] == "claude"
        assert "timestamp" in payload

    def test_returns_none_for_corrupt_json(self, tmp_path):
        """Unparseable heartbeat JSON reads as None, not an exception."""
        hb_dir = tmp_path / ".lazarus" / "heartbeats"
        hb_dir.mkdir(parents=True)
        (hb_dir / "bad.json").write_text("{not valid")
        assert read_agent_heartbeat(tmp_path, "bad") is None
# ── Health polling ────────────────────────────────────────────────────────────
class TestPollCell:
    """Stale-vs-fresh health polling over a cell's agents."""

    def _make_cell_state(self, tmp_path, agents=None):
        """Build a CellState with the given (or one default) agent entries."""
        state = CellState(uuid="test-uuid", path=tmp_path)
        for name in agents or ["claude"]:
            state.agents[name] = AgentHealth(name=name)
        return state

    def test_healthy_agent_no_warnings(self, tmp_path):
        state = self._make_cell_state(tmp_path)
        _write_heartbeat(tmp_path, "claude", age_seconds=5)
        problems = poll_cell(state, DaemonConfig(heartbeat_stale_threshold=60))
        assert problems == []
        assert state.agents["claude"].healthy is True

    def test_stale_heartbeat_generates_warning(self, tmp_path):
        # Heartbeat twice as old as the threshold must be flagged stale.
        state = self._make_cell_state(tmp_path)
        _write_heartbeat(tmp_path, "claude", age_seconds=120)
        problems = poll_cell(state, DaemonConfig(heartbeat_stale_threshold=60))
        assert len(problems) == 1
        assert "stale" in problems[0]
        assert state.agents["claude"].healthy is False

    def test_missing_heartbeat_generates_warning(self, tmp_path):
        state = self._make_cell_state(tmp_path)
        problems = poll_cell(state, DaemonConfig(heartbeat_stale_threshold=60))
        assert len(problems) == 1
        assert "no heartbeat" in problems[0]
        assert state.agents["claude"].healthy is False
# ── Resurrection stub ─────────────────────────────────────────────────────────
class TestAttemptRevive:
    """Resurrection stub behaviour: attempts are recorded but never succeed."""

    @staticmethod
    def _cell_with_dead_agent(path, revive_count=0):
        """CellState containing a single unhealthy 'claude' agent."""
        state = CellState(uuid="xyz", path=path)
        state.agents["claude"] = AgentHealth(
            name="claude", healthy=False, revive_count=revive_count
        )
        return state

    def test_stub_returns_false(self, tmp_path):
        state = self._cell_with_dead_agent(tmp_path)
        cfg = DaemonConfig(max_revive_attempts=3)
        assert attempt_revive(state, "claude", cfg) is False

    def test_increments_revive_count(self, tmp_path):
        state = self._cell_with_dead_agent(tmp_path)
        attempt_revive(state, "claude", DaemonConfig(max_revive_attempts=3))
        assert state.agents["claude"].revive_count == 1

    def test_stops_at_max_revive_attempts(self, tmp_path):
        state = self._cell_with_dead_agent(tmp_path, revive_count=3)
        assert attempt_revive(state, "claude", DaemonConfig(max_revive_attempts=3)) is False
        # Count should not increment beyond max
        assert state.agents["claude"].revive_count == 3

    def test_unknown_agent_returns_false(self, tmp_path):
        state = CellState(uuid="xyz", path=tmp_path)
        assert attempt_revive(state, "nobody", DaemonConfig()) is False
# ── CLI ───────────────────────────────────────────────────────────────────────
class TestCLI:
    """Argument parsing for the daemon's CLI entry point."""

    def test_default_config_path(self):
        """With no flags, the config path defaults to lazarus-pit.toml."""
        args = build_arg_parser().parse_args([])
        assert "lazarus-pit.toml" in args.config

    def test_custom_config(self):
        args = build_arg_parser().parse_args(["--config", "/tmp/custom.toml"])
        assert args.config == "/tmp/custom.toml"

    def test_status_flag(self):
        args = build_arg_parser().parse_args(["--status"])
        assert args.status is True

    def test_list_cells_flag(self):
        args = build_arg_parser().parse_args(["--list-cells"])
        assert args.list_cells is True

View File

@@ -1,122 +0,0 @@
"""Tests for lazarus.roster — MissionRoster registry."""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from lazarus.cell import create_cell
from lazarus.roster import MissionRoster
def _make_roster(tmp_path: Path) -> MissionRoster:
    """Return a MissionRoster backed by a registry file under tmp_path."""
    registry = tmp_path / "roster.json"
    return MissionRoster(registry_path=registry)
def _make_cell(tmp_path: Path, mission_id: str = "m-001") -> object:
    """Create a throwaway mission cell rooted under tmp_path."""
    return create_cell(
        name="Test Mission",
        mission_id=mission_id,
        repo_url="https://github.com/test/repo",
        base_path=tmp_path / "cells",
    )
def test_add_mission_and_invite(tmp_path: Path) -> None:
    """Can add a mission and invite an agent to it."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    invited = roster.invite_agent(cell.id, "agentA", role="contributor")
    # A fresh invitation carries the role and has not yet been accepted.
    assert invited.agent_name == "agentA"
    assert invited.role == "contributor"
    assert invited.status == "invited"
    assert invited.accepted_at is None
def test_accept_invitation(tmp_path: Path) -> None:
    """Accepting an invitation changes status from 'invited' to 'active'."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    roster.invite_agent(cell.id, "agentB")
    accepted = roster.accept_invitation(cell.id, "agentB")
    assert accepted.status == "active"
    assert accepted.accepted_at is not None
def test_depart_agent(tmp_path: Path) -> None:
    """Departing changes status to 'departed'."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    # Full lifecycle: invite, accept, then depart.
    roster.invite_agent(cell.id, "agentC")
    roster.accept_invitation(cell.id, "agentC")
    departed = roster.depart_agent(cell.id, "agentC")
    assert departed.status == "departed"
def test_list_missions_filtered_by_state(tmp_path: Path) -> None:
    """list_missions(state=...) filters correctly."""
    roster = _make_roster(tmp_path)
    for mission_id in ("m-001", "m-002"):
        roster.add_mission(_make_cell(tmp_path, mission_id))
    # Both are 'pending' by default
    assert len(roster.list_missions(state="pending")) == 2
    # No active missions yet
    assert len(roster.list_missions(state="active")) == 0
def test_roster_persistence(tmp_path: Path) -> None:
    """Save and reload the roster file; data survives the round-trip."""
    registry_path = tmp_path / "roster.json"
    cell = _make_cell(tmp_path)
    writer = MissionRoster(registry_path=registry_path)
    writer.add_mission(cell)
    writer.invite_agent(cell.id, "agentD", role="lead")
    writer.save()
    # Load fresh from disk
    reader = MissionRoster(registry_path=registry_path)
    mission = reader.get_mission(cell.id)
    assert mission is not None
    assert mission["id"] == cell.id
    members = reader.list_agents(cell.id)
    assert len(members) == 1
    entry = members[0]
    assert entry.agent_name == "agentD"
    assert entry.role == "lead"
    assert entry.status == "invited"
def test_list_agents(tmp_path: Path) -> None:
    """list_agents returns all agents for a given mission."""
    roster = _make_roster(tmp_path)
    cell = _make_cell(tmp_path)
    roster.add_mission(cell)
    for agent in ("alice", "bob"):
        roster.invite_agent(cell.id, agent)
    listed = roster.list_agents(cell.id)
    assert {entry.agent_name for entry in listed} == {"alice", "bob"}