[M2] Allegro Commit-or-Abort — cycle guard with 10-minute slice rule (Epic #842) #277

Merged
allegro merged 2 commits from allegro/m2-commit-or-abort-845 into main 2026-04-06 16:58:39 +00:00
12 changed files with 1267 additions and 0 deletions

256
allegro/cycle_guard.py Normal file
View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python3
"""Allegro Cycle Guard — Commit-or-Abort discipline for M2, Epic #842.
Every cycle produces a durable artifact or documented abort.
10-minute slice rule with automatic timeout detection.
Cycle-state file provides crash-recovery resume points.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
DEFAULT_STATE = Path("/root/.hermes/allegro-cycle-state.json")
STATE_PATH = Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
# Crash-recovery threshold: if a cycle has been in_progress for longer than
# this many minutes, resume_or_abort() will auto-abort it.
CRASH_RECOVERY_MINUTES = 30
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def load_state(path: Path | str | None = None) -> dict:
p = Path(path) if path else Path(STATE_PATH)
if not p.exists():
return _empty_state()
try:
with open(p, "r") as f:
return json.load(f)
except Exception:
return _empty_state()
def save_state(state: dict, path: Path | str | None = None) -> None:
p = Path(path) if path else Path(STATE_PATH)
p.parent.mkdir(parents=True, exist_ok=True)
state["last_updated"] = _now_iso()
with open(p, "w") as f:
json.dump(state, f, indent=2)
def _empty_state() -> dict:
return {
"cycle_id": None,
"status": "complete",
"target": None,
"details": None,
"slices": [],
"started_at": None,
"completed_at": None,
"aborted_at": None,
"abort_reason": None,
"proof": None,
"version": 1,
"last_updated": _now_iso(),
}
def start_cycle(target: str, details: str = "", path: Path | str | None = None) -> dict:
"""Begin a new cycle, discarding any prior in-progress state."""
state = {
"cycle_id": _now_iso(),
"status": "in_progress",
"target": target,
"details": details,
"slices": [],
"started_at": _now_iso(),
"completed_at": None,
"aborted_at": None,
"abort_reason": None,
"proof": None,
"version": 1,
"last_updated": _now_iso(),
}
save_state(state, path)
return state
def start_slice(name: str, path: Path | str | None = None) -> dict:
"""Start a new work slice inside the current cycle."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot start a slice unless a cycle is in_progress.")
state["slices"].append(
{
"name": name,
"started_at": _now_iso(),
"ended_at": None,
"status": "in_progress",
"artifact": None,
}
)
save_state(state, path)
return state
def end_slice(status: str = "complete", artifact: str | None = None, path: Path | str | None = None) -> dict:
"""Close the current work slice."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot end a slice unless a cycle is in_progress.")
if not state["slices"]:
raise RuntimeError("No active slice to end.")
current = state["slices"][-1]
current["ended_at"] = _now_iso()
current["status"] = status
if artifact is not None:
current["artifact"] = artifact
save_state(state, path)
return state
def _parse_dt(iso_str: str) -> datetime:
return datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
def slice_duration_minutes(path: Path | str | None = None) -> float | None:
"""Return the age of the current slice in minutes, or None if no slice."""
state = load_state(path)
if not state["slices"]:
return None
current = state["slices"][-1]
if current.get("ended_at"):
return None
started = _parse_dt(current["started_at"])
return (datetime.now(timezone.utc) - started).total_seconds() / 60.0
def check_slice_timeout(max_minutes: float = 10.0, path: Path | str | None = None) -> bool:
"""Return True if the current slice has exceeded max_minutes."""
duration = slice_duration_minutes(path)
if duration is None:
return False
return duration > max_minutes
def commit_cycle(proof: dict | None = None, path: Path | str | None = None) -> dict:
"""Mark the cycle as successfully completed with optional proof payload."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot commit a cycle that is not in_progress.")
state["status"] = "complete"
state["completed_at"] = _now_iso()
if proof is not None:
state["proof"] = proof
save_state(state, path)
return state
def abort_cycle(reason: str, path: Path | str | None = None) -> dict:
"""Mark the cycle as aborted, recording the reason."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot abort a cycle that is not in_progress.")
state["status"] = "aborted"
state["aborted_at"] = _now_iso()
state["abort_reason"] = reason
# Close any open slice as aborted
if state["slices"] and not state["slices"][-1].get("ended_at"):
state["slices"][-1]["ended_at"] = _now_iso()
state["slices"][-1]["status"] = "aborted"
save_state(state, path)
return state
def resume_or_abort(path: Path | str | None = None) -> dict:
"""Crash-recovery gate: auto-abort stale in-progress cycles."""
state = load_state(path)
if state.get("status") != "in_progress":
return state
started = state.get("started_at")
if started:
started_dt = _parse_dt(started)
age_minutes = (datetime.now(timezone.utc) - started_dt).total_seconds() / 60.0
if age_minutes > CRASH_RECOVERY_MINUTES:
return abort_cycle(
f"crash recovery — stale cycle detected ({int(age_minutes)}m old)",
path,
)
# Also abort if the current slice has been running too long
if check_slice_timeout(max_minutes=CRASH_RECOVERY_MINUTES, path=path):
return abort_cycle(
"crash recovery — stale slice detected",
path,
)
return state
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Allegro Cycle Guard")
sub = parser.add_subparsers(dest="cmd")
p_resume = sub.add_parser("resume", help="Resume or abort stale cycle")
p_start = sub.add_parser("start", help="Start a new cycle")
p_start.add_argument("target")
p_start.add_argument("--details", default="")
p_slice = sub.add_parser("slice", help="Start a named slice")
p_slice.add_argument("name")
p_end = sub.add_parser("end", help="End current slice")
p_end.add_argument("--status", default="complete")
p_end.add_argument("--artifact", default=None)
p_commit = sub.add_parser("commit", help="Commit the current cycle")
p_commit.add_argument("--proof", default="{}")
p_abort = sub.add_parser("abort", help="Abort the current cycle")
p_abort.add_argument("reason")
p_check = sub.add_parser("check", help="Check slice timeout")
args = parser.parse_args(argv)
if args.cmd == "resume":
state = resume_or_abort()
print(state["status"])
return 0
elif args.cmd == "start":
state = start_cycle(args.target, args.details)
print(f"Cycle started: {state['cycle_id']}")
return 0
elif args.cmd == "slice":
state = start_slice(args.name)
print(f"Slice started: {args.name}")
return 0
elif args.cmd == "end":
artifact = args.artifact
state = end_slice(args.status, artifact)
print("Slice ended")
return 0
elif args.cmd == "commit":
proof = json.loads(args.proof)
state = commit_cycle(proof)
print(f"Cycle committed: {state['cycle_id']}")
return 0
elif args.cmd == "abort":
state = abort_cycle(args.reason)
print(f"Cycle aborted: {args.reason}")
return 0
elif args.cmd == "check":
timed_out = check_slice_timeout()
print("TIMEOUT" if timed_out else "OK")
return 1 if timed_out else 0
else:
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,143 @@
"""100% compliance test for Allegro Commit-or-Abort (M2, Epic #842)."""
import json
import os
import sys
import tempfile
import time
import unittest
from datetime import datetime, timezone, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import cycle_guard as cg
class TestCycleGuard(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
self.state_path = os.path.join(self.tmpdir.name, "cycle_state.json")
cg.STATE_PATH = self.state_path
def tearDown(self):
self.tmpdir.cleanup()
cg.STATE_PATH = cg.DEFAULT_STATE
def test_load_empty_state(self):
state = cg.load_state(self.state_path)
self.assertEqual(state["status"], "complete")
self.assertIsNone(state["cycle_id"])
def test_start_cycle(self):
state = cg.start_cycle("M2: Commit-or-Abort", path=self.state_path)
self.assertEqual(state["status"], "in_progress")
self.assertEqual(state["target"], "M2: Commit-or-Abort")
self.assertIsNotNone(state["cycle_id"])
def test_start_slice_requires_in_progress(self):
with self.assertRaises(RuntimeError):
cg.start_slice("test", path=self.state_path)
def test_slice_lifecycle(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("gather", path=self.state_path)
state = cg.load_state(self.state_path)
self.assertEqual(len(state["slices"]), 1)
self.assertEqual(state["slices"][0]["name"], "gather")
self.assertEqual(state["slices"][0]["status"], "in_progress")
cg.end_slice(status="complete", artifact="artifact.txt", path=self.state_path)
state = cg.load_state(self.state_path)
self.assertEqual(state["slices"][0]["status"], "complete")
self.assertEqual(state["slices"][0]["artifact"], "artifact.txt")
self.assertIsNotNone(state["slices"][0]["ended_at"])
def test_commit_cycle(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
cg.end_slice(path=self.state_path)
proof = {"files": ["a.py"]}
state = cg.commit_cycle(proof=proof, path=self.state_path)
self.assertEqual(state["status"], "complete")
self.assertEqual(state["proof"], proof)
self.assertIsNotNone(state["completed_at"])
def test_commit_without_in_progress_fails(self):
with self.assertRaises(RuntimeError):
cg.commit_cycle(path=self.state_path)
def test_abort_cycle(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
state = cg.abort_cycle("manual abort", path=self.state_path)
self.assertEqual(state["status"], "aborted")
self.assertEqual(state["abort_reason"], "manual abort")
self.assertIsNotNone(state["aborted_at"])
self.assertEqual(state["slices"][-1]["status"], "aborted")
def test_slice_timeout_true(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
# Manually backdate slice start to 11 minutes ago
state = cg.load_state(self.state_path)
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
state["slices"][0]["started_at"] = old
cg.save_state(state, self.state_path)
self.assertTrue(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
def test_slice_timeout_false(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
self.assertFalse(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
def test_resume_or_abort_keeps_fresh_cycle(self):
cg.start_cycle("test", path=self.state_path)
state = cg.resume_or_abort(path=self.state_path)
self.assertEqual(state["status"], "in_progress")
def test_resume_or_abort_aborts_stale_cycle(self):
cg.start_cycle("test", path=self.state_path)
# Backdate start to 31 minutes ago
state = cg.load_state(self.state_path)
old = (datetime.now(timezone.utc) - timedelta(minutes=31)).isoformat()
state["started_at"] = old
cg.save_state(state, self.state_path)
state = cg.resume_or_abort(path=self.state_path)
self.assertEqual(state["status"], "aborted")
self.assertIn("crash recovery", state["abort_reason"])
def test_slice_duration_minutes(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
# Backdate by 5 minutes
state = cg.load_state(self.state_path)
old = (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat()
state["slices"][0]["started_at"] = old
cg.save_state(state, self.state_path)
mins = cg.slice_duration_minutes(path=self.state_path)
self.assertAlmostEqual(mins, 5.0, delta=0.5)
def test_cli_resume_prints_status(self):
cg.start_cycle("test", path=self.state_path)
rc = cg.main(["resume"])
self.assertEqual(rc, 0)
def test_cli_check_timeout(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
state = cg.load_state(self.state_path)
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
state["slices"][0]["started_at"] = old
cg.save_state(state, self.state_path)
rc = cg.main(["check"])
self.assertEqual(rc, 1)
def test_cli_check_ok(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
rc = cg.main(["check"])
self.assertEqual(rc, 0)
if __name__ == "__main__":
unittest.main()

55
lazarus/README.md Normal file
View File

@@ -0,0 +1,55 @@
# Lazarus Pit v2.0 — Cells, Invites, and Teaming
## Quick Start
```bash
# Summon a single agent
python3 -m lazarus.cli summon allegro Timmy_Foundation/timmy-config#262
# Form a team
python3 -m lazarus.cli team allegro+ezra Timmy_Foundation/timmy-config#262
# Invite a guest bot
python3 -m lazarus.cli invite <cell_id> bot:kimi observer
# Check status
python3 -m lazarus.cli status
# Close / destroy
python3 -m lazarus.cli close <cell_id>
python3 -m lazarus.cli destroy <cell_id>
```
## Architecture
```
lazarus/
├── cell.py # Cell model, registry, filesystem packager
├── operator_ctl.py # summon, invite, team, status, close, destroy
├── cli.py # CLI entrypoint
├── backends/
│ ├── base.py # Backend abstraction
│ └── process_backend.py
└── tests/
└── test_cell.py # Isolation + lifecycle tests
```
## Design Invariants
1. **One cell, one project target**
2. **Cells cannot read each other's `HERMES_HOME` by default**
3. **Guests join with least privilege**
4. **Destroyed cells are scrubbed from disk**
5. **All cell state is persisted in SQLite registry**
## Cell Lifecycle
```
proposed -> active -> idle -> closing -> archived -> destroyed
```
## Roles
- `executor` — read/write code, run tools, push commits
- `observer` — read-only access
- `director` — can close cells and publish back to Gitea

5
lazarus/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Lazarus Pit v2.0 — Multi-Agent Resurrection & Teaming Arena."""
from .cell import Cell, CellState, CellRole
from .operator_ctl import OperatorCtl
__all__ = ["Cell", "CellState", "CellRole", "OperatorCtl"]

View File

@@ -0,0 +1,5 @@
"""Cell execution backends for Lazarus Pit."""
from .base import Backend, BackendResult
from .process_backend import ProcessBackend
__all__ = ["Backend", "BackendResult", "ProcessBackend"]

50
lazarus/backends/base.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Backend abstraction for Lazarus Pit cells.
Every backend must implement the same contract so that cells,
teams, and operators do not need to know how a cell is actually running.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass
class BackendResult:
success: bool
stdout: str = ""
stderr: str = ""
pid: Optional[int] = None
message: str = ""
class Backend(ABC):
"""Abstract cell backend."""
name: str = "abstract"
@abstractmethod
def spawn(self, cell_id: str, hermes_home: str, command: list, env: Optional[dict] = None) -> BackendResult:
"""Spawn the cell process/environment."""
...
@abstractmethod
def probe(self, cell_id: str) -> BackendResult:
"""Check if the cell is alive and responsive."""
...
@abstractmethod
def logs(self, cell_id: str, tail: int = 50) -> BackendResult:
"""Return recent logs from the cell."""
...
@abstractmethod
def close(self, cell_id: str) -> BackendResult:
"""Gracefully close the cell."""
...
@abstractmethod
def destroy(self, cell_id: str) -> BackendResult:
"""Forcefully destroy the cell and all runtime state."""
...

View File

@@ -0,0 +1,97 @@
"""
Process backend for Lazarus Pit.
Fastest spawn/teardown. Isolation comes from unique HERMES_HOME
and independent Python process. No containers required.
"""
import os
import signal
import subprocess
from pathlib import Path
from typing import Optional
from .base import Backend, BackendResult
class ProcessBackend(Backend):
name = "process"
def __init__(self, log_dir: Optional[str] = None):
self.log_dir = Path(log_dir or "/tmp/lazarus-logs")
self.log_dir.mkdir(parents=True, exist_ok=True)
self._pids: dict = {}
def _log_path(self, cell_id: str) -> Path:
return self.log_dir / f"{cell_id}.log"
def spawn(self, cell_id: str, hermes_home: str, command: list, env: Optional[dict] = None) -> BackendResult:
log_path = self._log_path(cell_id)
run_env = dict(os.environ)
run_env["HERMES_HOME"] = hermes_home
if env:
run_env.update(env)
try:
with open(log_path, "a") as log_file:
proc = subprocess.Popen(
command,
env=run_env,
stdout=log_file,
stderr=subprocess.STDOUT,
cwd=hermes_home,
)
self._pids[cell_id] = proc.pid
return BackendResult(
success=True,
pid=proc.pid,
message=f"Spawned {cell_id} with pid {proc.pid}",
)
except Exception as e:
return BackendResult(success=False, stderr=str(e), message=f"Spawn failed: {e}")
def probe(self, cell_id: str) -> BackendResult:
pid = self._pids.get(cell_id)
if not pid:
return BackendResult(success=False, message="No known pid for cell")
try:
os.kill(pid, 0)
return BackendResult(success=True, pid=pid, message="Process is alive")
except OSError:
return BackendResult(success=False, pid=pid, message="Process is dead")
def logs(self, cell_id: str, tail: int = 50) -> BackendResult:
log_path = self._log_path(cell_id)
if not log_path.exists():
return BackendResult(success=True, stdout="", message="No logs yet")
try:
lines = log_path.read_text().splitlines()
return BackendResult(
success=True,
stdout="\n".join(lines[-tail:]),
message=f"Returned last {tail} lines",
)
except Exception as e:
return BackendResult(success=False, stderr=str(e), message=f"Log read failed: {e}")
def close(self, cell_id: str) -> BackendResult:
pid = self._pids.get(cell_id)
if not pid:
return BackendResult(success=False, message="No known pid for cell")
try:
os.kill(pid, signal.SIGTERM)
return BackendResult(success=True, pid=pid, message="Sent SIGTERM")
except OSError as e:
return BackendResult(success=False, stderr=str(e), message=f"Close failed: {e}")
def destroy(self, cell_id: str) -> BackendResult:
res = self.close(cell_id)
log_path = self._log_path(cell_id)
if log_path.exists():
log_path.unlink()
self._pids.pop(cell_id, None)
return BackendResult(
success=res.success,
pid=res.pid,
message=f"Destroyed {cell_id}",
)

240
lazarus/cell.py Normal file
View File

@@ -0,0 +1,240 @@
"""
Cell model for Lazarus Pit v2.0.
A cell is a bounded execution/living space for one or more participants
on a single project target. Cells are isolated by filesystem, credentials,
and optionally process/container boundaries.
"""
import json
import shutil
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum, auto
from pathlib import Path
from typing import Dict, List, Optional, Set
class CellState(Enum):
PROPOSED = "proposed"
ACTIVE = "active"
IDLE = "idle"
CLOSING = "closing"
ARCHIVED = "archived"
DESTROYED = "destroyed"
class CellRole(Enum):
EXECUTOR = "executor" # Can read/write code, run tools, push commits
OBSERVER = "observer" # Read-only cell access
DIRECTOR = "director" # Can assign, close, publish back to Gitea
@dataclass
class CellMember:
identity: str # e.g. "agent:allegro", "bot:kimi", "human:Alexander"
role: CellRole
joined_at: str
@dataclass
class Cell:
cell_id: str
project: str # e.g. "Timmy_Foundation/timmy-config#262"
owner: str # e.g. "human:Alexander"
backend: str # "process", "venv", "docker", "remote"
state: CellState
created_at: str
ttl_minutes: int
members: List[CellMember] = field(default_factory=list)
hermes_home: Optional[str] = None
workspace_path: Optional[str] = None
shared_notes_path: Optional[str] = None
shared_artifacts_path: Optional[str] = None
decisions_path: Optional[str] = None
archived_at: Optional[str] = None
destroyed_at: Optional[str] = None
def is_expired(self) -> bool:
if self.state in (CellState.CLOSING, CellState.ARCHIVED, CellState.DESTROYED):
return False
created = datetime.fromisoformat(self.created_at.replace("Z", "+00:00"))
return datetime.now().astimezone() > created + timedelta(minutes=self.ttl_minutes)
def to_dict(self) -> dict:
d = asdict(self)
d["state"] = self.state.value
d["members"] = [
{**asdict(m), "role": m.role.value} for m in self.members
]
return d
@classmethod
def from_dict(cls, d: dict) -> "Cell":
d = dict(d)
d["state"] = CellState(d["state"])
d["members"] = [
CellMember(
identity=m["identity"],
role=CellRole(m["role"]),
joined_at=m["joined_at"],
)
for m in d.get("members", [])
]
return cls(**d)
class CellRegistry:
"""SQLite-backed registry of all cells."""
def __init__(self, db_path: Optional[str] = None):
self.db_path = Path(db_path or "/root/.lazarus_registry.sqlite")
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _init_db(self):
import sqlite3
with sqlite3.connect(str(self.db_path)) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS cells (
cell_id TEXT PRIMARY KEY,
project TEXT NOT NULL,
owner TEXT NOT NULL,
backend TEXT NOT NULL,
state TEXT NOT NULL,
created_at TEXT NOT NULL,
ttl_minutes INTEGER NOT NULL,
members TEXT,
hermes_home TEXT,
workspace_path TEXT,
shared_notes_path TEXT,
shared_artifacts_path TEXT,
decisions_path TEXT,
archived_at TEXT,
destroyed_at TEXT
)
"""
)
def save(self, cell: Cell):
import sqlite3
with sqlite3.connect(str(self.db_path)) as conn:
conn.execute(
"""
INSERT OR REPLACE INTO cells (
cell_id, project, owner, backend, state, created_at, ttl_minutes,
members, hermes_home, workspace_path, shared_notes_path,
shared_artifacts_path, decisions_path, archived_at, destroyed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
cell.cell_id,
cell.project,
cell.owner,
cell.backend,
cell.state.value,
cell.created_at,
cell.ttl_minutes,
json.dumps(cell.to_dict()["members"]),
cell.hermes_home,
cell.workspace_path,
cell.shared_notes_path,
cell.shared_artifacts_path,
cell.decisions_path,
cell.archived_at,
cell.destroyed_at,
),
)
def load(self, cell_id: str) -> Optional[Cell]:
import sqlite3
with sqlite3.connect(str(self.db_path)) as conn:
row = conn.execute(
"SELECT * FROM cells WHERE cell_id = ?", (cell_id,)
).fetchone()
if not row:
return None
keys = [
"cell_id", "project", "owner", "backend", "state", "created_at",
"ttl_minutes", "members", "hermes_home", "workspace_path",
"shared_notes_path", "shared_artifacts_path", "decisions_path",
"archived_at", "destroyed_at",
]
d = dict(zip(keys, row))
d["members"] = json.loads(d["members"])
return Cell.from_dict(d)
def list_active(self) -> List[Cell]:
import sqlite3
with sqlite3.connect(str(self.db_path)) as conn:
rows = conn.execute(
"SELECT * FROM cells WHERE state NOT IN ('destroyed', 'archived')"
).fetchall()
keys = [
"cell_id", "project", "owner", "backend", "state", "created_at",
"ttl_minutes", "members", "hermes_home", "workspace_path",
"shared_notes_path", "shared_artifacts_path", "decisions_path",
"archived_at", "destroyed_at",
]
cells = []
for row in rows:
d = dict(zip(keys, row))
d["members"] = json.loads(d["members"])
cells.append(Cell.from_dict(d))
return cells
def delete(self, cell_id: str):
import sqlite3
with sqlite3.connect(str(self.db_path)) as conn:
conn.execute("DELETE FROM cells WHERE cell_id = ?", (cell_id,))
class CellPackager:
"""Creates and destroys per-cell filesystem isolation."""
ROOT = Path("/tmp/lazarus-cells")
def __init__(self, root: Optional[str] = None):
self.root = Path(root or self.ROOT)
def create(self, cell_id: str, backend: str = "process") -> Cell:
cell_home = self.root / cell_id
workspace = cell_home / "workspace"
shared_notes = cell_home / "shared" / "notes.md"
shared_artifacts = cell_home / "shared" / "artifacts"
decisions = cell_home / "shared" / "decisions.jsonl"
hermes_home = cell_home / ".hermes"
for p in [workspace, shared_artifacts, hermes_home]:
p.mkdir(parents=True, exist_ok=True)
if not shared_notes.exists():
shared_notes.write_text(f"# Cell {cell_id} — Shared Notes\n\n")
if not decisions.exists():
decisions.write_text("")
now = datetime.now().astimezone().isoformat()
return Cell(
cell_id=cell_id,
project="",
owner="",
backend=backend,
state=CellState.PROPOSED,
created_at=now,
ttl_minutes=60,
hermes_home=str(hermes_home),
workspace_path=str(workspace),
shared_notes_path=str(shared_notes),
shared_artifacts_path=str(shared_artifacts),
decisions_path=str(decisions),
)
def destroy(self, cell_id: str) -> bool:
cell_home = self.root / cell_id
if cell_home.exists():
shutil.rmtree(cell_home)
return True
return False

18
lazarus/cli.py Normal file
View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""CLI entrypoint for Lazarus Pit operator control surface."""
import json
import sys
from .operator_ctl import OperatorCtl
def main():
ctl = OperatorCtl()
result = ctl.run_cli(sys.argv[1:])
print(json.dumps(result, indent=2))
sys.exit(0 if result.get("success") else 1)
if __name__ == "__main__":
main()

211
lazarus/operator_ctl.py Normal file
View File

@@ -0,0 +1,211 @@
"""
Operator control surface for Lazarus Pit v2.0.
Provides the command grammar and API for:
summon, invite, team, status, close, destroy
"""
import json
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from .cell import Cell, CellMember, CellPackager, CellRegistry, CellRole, CellState
from .backends.process_backend import ProcessBackend
class OperatorCtl:
"""The operator's command surface for the Lazarus Pit."""
def __init__(self, registry: Optional[CellRegistry] = None, packager: Optional[CellPackager] = None):
self.registry = registry or CellRegistry()
self.packager = packager or CellPackager()
self.backend = ProcessBackend()
# ------------------------------------------------------------------
# Commands
# ------------------------------------------------------------------
def summon(self, agent: str, project: str, owner: str = "human:Alexander", backend: str = "process", ttl_minutes: int = 60) -> dict:
"""Summon a single agent into a fresh resurrection cell."""
cell_id = f"laz-{uuid.uuid4().hex[:8]}"
cell = self.packager.create(cell_id, backend=backend)
cell.project = project
cell.owner = owner
cell.ttl_minutes = ttl_minutes
cell.state = CellState.ACTIVE
cell.members.append(CellMember(
identity=f"agent:{agent}",
role=CellRole.EXECUTOR,
joined_at=datetime.now().astimezone().isoformat(),
))
# Spawn a simple keep-alive process (real agent bootstrap would go here)
res = self.backend.spawn(
cell_id=cell_id,
hermes_home=cell.hermes_home,
command=["python3", "-c", "import time; time.sleep(86400)"],
)
self.registry.save(cell)
return {
"success": res.success,
"cell_id": cell_id,
"project": project,
"agent": agent,
"backend": backend,
"pid": res.pid,
"message": res.message,
}
def invite(self, cell_id: str, guest: str, role: str, invited_by: str = "human:Alexander") -> dict:
"""Invite a guest bot or human into an active cell."""
cell = self.registry.load(cell_id)
if not cell:
return {"success": False, "message": f"Cell {cell_id} not found"}
if cell.state not in (CellState.PROPOSED, CellState.ACTIVE, CellState.IDLE):
return {"success": False, "message": f"Cell {cell_id} is not accepting invites (state={cell.state.value})"}
role_enum = CellRole(role)
cell.members.append(CellMember(
identity=guest,
role=role_enum,
joined_at=datetime.now().astimezone().isoformat(),
))
cell.state = CellState.ACTIVE
self.registry.save(cell)
return {
"success": True,
"cell_id": cell_id,
"guest": guest,
"role": role_enum.value,
"invited_by": invited_by,
"message": f"Invited {guest} as {role_enum.value} to {cell_id}",
}
def team(self, agents: List[str], project: str, owner: str = "human:Alexander", backend: str = "process", ttl_minutes: int = 120) -> dict:
"""Form a multi-agent team in one cell against a project."""
cell_id = f"laz-{uuid.uuid4().hex[:8]}"
cell = self.packager.create(cell_id, backend=backend)
cell.project = project
cell.owner = owner
cell.ttl_minutes = ttl_minutes
cell.state = CellState.ACTIVE
for agent in agents:
cell.members.append(CellMember(
identity=f"agent:{agent}",
role=CellRole.EXECUTOR,
joined_at=datetime.now().astimezone().isoformat(),
))
res = self.backend.spawn(
cell_id=cell_id,
hermes_home=cell.hermes_home,
command=["python3", "-c", "import time; time.sleep(86400)"],
)
self.registry.save(cell)
return {
"success": res.success,
"cell_id": cell_id,
"project": project,
"agents": agents,
"backend": backend,
"pid": res.pid,
"message": res.message,
}
def status(self, cell_id: Optional[str] = None) -> dict:
"""Show status of one cell or all active cells."""
if cell_id:
cell = self.registry.load(cell_id)
if not cell:
return {"success": False, "message": f"Cell {cell_id} not found"}
probe = self.backend.probe(cell_id)
return {
"success": True,
"cell": cell.to_dict(),
"probe": probe.message,
"expired": cell.is_expired(),
}
active = self.registry.list_active()
cells = []
for cell in active:
probe = self.backend.probe(cell.cell_id)
cells.append({
"cell_id": cell.cell_id,
"project": cell.project,
"backend": cell.backend,
"state": cell.state.value,
"members": [m.identity for m in cell.members],
"probe": probe.message,
"expired": cell.is_expired(),
})
return {"success": True, "active_cells": cells}
def close(self, cell_id: str, closed_by: str = "human:Alexander") -> dict:
"""Gracefully close a cell."""
cell = self.registry.load(cell_id)
if not cell:
return {"success": False, "message": f"Cell {cell_id} not found"}
res = self.backend.close(cell_id)
cell.state = CellState.CLOSING
self.registry.save(cell)
return {
"success": res.success,
"cell_id": cell_id,
"message": f"Closed {cell_id} by {closed_by}",
}
def destroy(self, cell_id: str, destroyed_by: str = "human:Alexander") -> dict:
"""Forcefully destroy a cell and all runtime state."""
cell = self.registry.load(cell_id)
if not cell:
return {"success": False, "message": f"Cell {cell_id} not found"}
res = self.backend.destroy(cell_id)
self.packager.destroy(cell_id)
cell.state = CellState.DESTROYED
cell.destroyed_at = datetime.now().astimezone().isoformat()
self.registry.save(cell)
return {
"success": True,
"cell_id": cell_id,
"message": f"Destroyed {cell_id} by {destroyed_by}",
}
# ------------------------------------------------------------------
# CLI helpers
# ------------------------------------------------------------------
def run_cli(self, args: List[str]) -> dict:
if not args:
return self.status()
cmd = args[0]
if cmd == "summon" and len(args) >= 3:
return self.summon(agent=args[1], project=args[2])
if cmd == "invite" and len(args) >= 4:
return self.invite(cell_id=args[1], guest=args[2], role=args[3])
if cmd == "team" and len(args) >= 3:
agents = args[1].split("+")
project = args[2]
return self.team(agents=agents, project=project)
if cmd == "status":
return self.status(cell_id=args[1] if len(args) > 1 else None)
if cmd == "close" and len(args) >= 2:
return self.close(cell_id=args[1])
if cmd == "destroy" and len(args) >= 2:
return self.destroy(cell_id=args[1])
return {
"success": False,
"message": (
"Unknown command. Usage:\n"
" lazarus summon <agent> <project>\n"
" lazarus invite <cell_id> <guest> <role>\n"
" lazarus team <agent1+agent2> <project>\n"
" lazarus status [cell_id]\n"
" lazarus close <cell_id>\n"
" lazarus destroy <cell_id>\n"
)
}

View File

@@ -0,0 +1 @@
"""Tests for Lazarus Pit."""

186
lazarus/tests/test_cell.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""Tests for Lazarus Pit cell isolation and registry."""
import os
import sys
import tempfile
import unittest
from datetime import datetime, timedelta
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lazarus.cell import Cell, CellMember, CellPackager, CellRegistry, CellRole, CellState
from lazarus.operator_ctl import OperatorCtl
from lazarus.backends.process_backend import ProcessBackend
class TestCellPackager(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
self.packager = CellPackager(root=self.tmpdir.name)
def tearDown(self):
self.tmpdir.cleanup()
def test_create_isolated_paths(self):
cell = self.packager.create("laz-test-001", backend="process")
self.assertTrue(os.path.isdir(cell.hermes_home))
self.assertTrue(os.path.isdir(cell.workspace_path))
self.assertTrue(os.path.isfile(cell.shared_notes_path))
self.assertTrue(os.path.isdir(cell.shared_artifacts_path))
self.assertTrue(os.path.isfile(cell.decisions_path))
def test_destroy_cleans_paths(self):
cell = self.packager.create("laz-test-002", backend="process")
self.assertTrue(os.path.isdir(cell.hermes_home))
self.packager.destroy("laz-test-002")
self.assertFalse(os.path.exists(cell.hermes_home))
def test_cells_are_separate(self):
c1 = self.packager.create("laz-a", backend="process")
c2 = self.packager.create("laz-b", backend="process")
self.assertNotEqual(c1.hermes_home, c2.hermes_home)
self.assertNotEqual(c1.workspace_path, c2.workspace_path)
class TestCellRegistry(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
self.registry = CellRegistry(db_path=os.path.join(self.tmpdir.name, "registry.sqlite"))
def tearDown(self):
self.tmpdir.cleanup()
def test_save_and_load(self):
cell = Cell(
cell_id="laz-reg-001",
project="Timmy_Foundation/test#1",
owner="human:Alexander",
backend="process",
state=CellState.ACTIVE,
created_at=datetime.now().astimezone().isoformat(),
ttl_minutes=60,
members=[CellMember("agent:allegro", CellRole.EXECUTOR, datetime.now().astimezone().isoformat())],
)
self.registry.save(cell)
loaded = self.registry.load("laz-reg-001")
self.assertEqual(loaded.cell_id, "laz-reg-001")
self.assertEqual(loaded.members[0].role, CellRole.EXECUTOR)
def test_list_active(self):
for sid, state in [("a", CellState.ACTIVE), ("b", CellState.DESTROYED)]:
cell = Cell(
cell_id=f"laz-{sid}",
project="test",
owner="human:Alexander",
backend="process",
state=state,
created_at=datetime.now().astimezone().isoformat(),
ttl_minutes=60,
)
self.registry.save(cell)
active = self.registry.list_active()
ids = {c.cell_id for c in active}
self.assertIn("laz-a", ids)
self.assertNotIn("laz-b", ids)
def test_ttl_expiry(self):
created = (datetime.now().astimezone() - timedelta(minutes=61)).isoformat()
cell = Cell(
cell_id="laz-exp",
project="test",
owner="human:Alexander",
backend="process",
state=CellState.ACTIVE,
created_at=created,
ttl_minutes=60,
)
self.assertTrue(cell.is_expired())
def test_ttl_not_expired(self):
cell = Cell(
cell_id="laz-ok",
project="test",
owner="human:Alexander",
backend="process",
state=CellState.ACTIVE,
created_at=datetime.now().astimezone().isoformat(),
ttl_minutes=60,
)
self.assertFalse(cell.is_expired())
class TestOperatorCtl(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
self.ctl = OperatorCtl(
registry=CellRegistry(db_path=os.path.join(self.tmpdir.name, "registry.sqlite")),
packager=CellPackager(root=self.tmpdir.name),
)
def tearDown(self):
# Destroy any cells we created
for cell in self.ctl.registry.list_active():
self.ctl.destroy(cell.cell_id)
self.tmpdir.cleanup()
def test_summon_creates_cell(self):
res = self.ctl.summon("allegro", "Timmy_Foundation/test#1")
self.assertTrue(res["success"])
self.assertIn("cell_id", res)
cell = self.ctl.registry.load(res["cell_id"])
self.assertEqual(cell.members[0].identity, "agent:allegro")
self.assertEqual(cell.state, CellState.ACTIVE)
# Clean up
self.ctl.destroy(res["cell_id"])
def test_team_creates_multi_agent_cell(self):
res = self.ctl.team(["allegro", "ezra"], "Timmy_Foundation/test#2")
self.assertTrue(res["success"])
cell = self.ctl.registry.load(res["cell_id"])
self.assertEqual(len(cell.members), 2)
identities = {m.identity for m in cell.members}
self.assertEqual(identities, {"agent:allegro", "agent:ezra"})
self.ctl.destroy(res["cell_id"])
def test_invite_adds_guest(self):
res = self.ctl.summon("allegro", "Timmy_Foundation/test#3")
cell_id = res["cell_id"]
invite = self.ctl.invite(cell_id, "bot:kimi", "observer")
self.assertTrue(invite["success"])
cell = self.ctl.registry.load(cell_id)
roles = {m.identity: m.role for m in cell.members}
self.assertEqual(roles["bot:kimi"], CellRole.OBSERVER)
self.ctl.destroy(cell_id)
def test_status_shows_active_cells(self):
res = self.ctl.summon("allegro", "Timmy_Foundation/test#4")
status = self.ctl.status()
self.assertTrue(status["success"])
ids = {c["cell_id"] for c in status["active_cells"]}
self.assertIn(res["cell_id"], ids)
self.ctl.destroy(res["cell_id"])
def test_close_then_destroy(self):
res = self.ctl.summon("allegro", "Timmy_Foundation/test#5")
cell_id = res["cell_id"]
close_res = self.ctl.close(cell_id)
self.assertTrue(close_res["success"])
destroy_res = self.ctl.destroy(cell_id)
self.assertTrue(destroy_res["success"])
cell = self.ctl.registry.load(cell_id)
self.assertEqual(cell.state, CellState.DESTROYED)
self.assertFalse(os.path.exists(cell.hermes_home))
def test_destroy_scrubs_filesystem(self):
res = self.ctl.summon("allegro", "Timmy_Foundation/test#6")
cell_id = res["cell_id"]
cell = self.ctl.registry.load(cell_id)
hermes = cell.hermes_home
self.assertTrue(os.path.isdir(hermes))
self.ctl.destroy(cell_id)
self.assertFalse(os.path.exists(hermes))
if __name__ == "__main__":
unittest.main()