Files
the-nexus/scripts/lazarus_pit.py

230 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""Lazarus Pit daemon skeleton for Mission Cell foundations.
This module defines the Mission Cell filesystem contract and a dry-run daemon
that can initialize cells, scan them for heartbeat freshness, report the
results, and emit a meta-heartbeat for higher-level watchdogs.
Refs: #879
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import sys
import time
from pathlib import Path
from typing import Any
# Parent of the directory that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Load nexus/cron_heartbeat.py straight from its file path (rather than via a
# package import) under the private module name "_lazarus_pit_cron_heartbeat".
_hb_spec = importlib.util.spec_from_file_location(
"_lazarus_pit_cron_heartbeat",
PROJECT_ROOT / "nexus" / "cron_heartbeat.py",
)
_hb = importlib.util.module_from_spec(_hb_spec)
# The module is registered in sys.modules before it is executed.
# NOTE(review): presumably so code inside cron_heartbeat can resolve its own
# module by name while it runs -- confirm against cron_heartbeat.py.
sys.modules["_lazarus_pit_cron_heartbeat"] = _hb
_hb_spec.loader.exec_module(_hb)
# The only name this script uses from the loaded module.
write_cron_heartbeat = _hb.write_cron_heartbeat
# Config file used when --config is not supplied on the command line.
DEFAULT_CONFIG_PATH = PROJECT_ROOT / "config" / "lazarus_pit.json"
# Fallback for the "required_subdirs" config key.
DEFAULT_REQUIRED_SUBDIRS = ["meta", "config", "state", "logs", "artifacts", "worktree"]
def load_config(path: str | Path = DEFAULT_CONFIG_PATH) -> dict[str, Any]:
    """Load the Lazarus Pit configuration, layering file values over defaults.

    Args:
        path: Location of the JSON config file.

    Returns:
        A fresh dict holding every default key, overridden by whatever the
        file supplies. When the file does not exist the defaults are returned
        unchanged. A missing, null, or empty ``required_subdirs`` falls back
        to a copy of ``DEFAULT_REQUIRED_SUBDIRS``.

    Raises:
        json.JSONDecodeError: The file exists but is not valid JSON.
        ValueError: The file parses, but its top-level value is not a JSON
            object.
    """
    config_path = Path(path)
    config: dict[str, Any] = {
        "missions_root": "/var/missions",
        "heartbeat_job": "lazarus_pit",
        "heartbeat_interval_seconds": 60,
        "stale_after_seconds": 180,
        "required_subdirs": list(DEFAULT_REQUIRED_SUBDIRS),
        "heartbeat_file": "state/heartbeat.json",
    }
    if not config_path.exists():
        return config

    # Decode as UTF-8 explicitly so parsing does not depend on the locale of
    # the machine the daemon happens to run on.
    loaded = json.loads(config_path.read_text(encoding="utf-8"))
    if not isinstance(loaded, dict):
        # dict.update() would either fail with an obscure message or silently
        # accept some list shapes; reject anything that is not an object.
        raise ValueError(
            f"{config_path}: expected a JSON object at the top level, "
            f"got {type(loaded).__name__}"
        )
    config.update(loaded)

    if not config.get("required_subdirs"):
        config["required_subdirs"] = list(DEFAULT_REQUIRED_SUBDIRS)
    return config
def build_cell_paths(mission_id: str, root: str | Path) -> dict[str, Path]:
    """Map each Mission Cell location name to its path under ``root/mission_id``.

    The mapping holds the cell directory itself under ``"root"``, followed by
    one entry per standard subdirectory, always in the same order.
    """
    cell_dir = Path(root) / mission_id
    layout: dict[str, Path] = {"root": cell_dir}
    for subdir in ("meta", "config", "state", "logs", "artifacts", "worktree"):
        layout[subdir] = cell_dir / subdir
    return layout
def init_cell(mission_id: str, root: str | Path, now: float | None = None) -> dict[str, Any]:
    """Create the directory scaffold and seed files for one Mission Cell.

    The cell directory and each standard subdirectory are created if absent.
    ``meta/mission.json``, ``config/cell.json`` and ``state/heartbeat.json``
    are then (re)written, and ``logs/daemon.log`` is touched.

    Args:
        mission_id: Name of the cell directory under ``root``.
        root: Missions root directory.
        now: Timestamp to record; the current time is used when ``None``.

    Returns:
        A dict with the ``mission_id``, the cell ``root`` as a string, and
        ``status`` set to ``"bootstrapped"``.
    """
    timestamp = time.time() if now is None else float(now)
    paths = build_cell_paths(mission_id, root)

    # Create every directory unconditionally. Skipping entries by comparing
    # their name with the mission id breaks when a mission is named like one
    # of the subdirectories (e.g. "logs"): that subdirectory would never be
    # created and the file writes below would fail.
    for directory in paths.values():
        directory.mkdir(parents=True, exist_ok=True)

    mission_meta = {
        "mission_id": mission_id,
        "created_at": timestamp,
        "status": "bootstrapped",
    }
    (paths["meta"] / "mission.json").write_text(json.dumps(mission_meta, indent=2) + "\n")

    cell_config = {
        "mission_id": mission_id,
        "worktree": str(paths["worktree"]),
        "artifacts": str(paths["artifacts"]),
        "heartbeat_file": str(paths["state"] / "heartbeat.json"),
    }
    (paths["config"] / "cell.json").write_text(json.dumps(cell_config, indent=2) + "\n")

    heartbeat = {
        "mission_id": mission_id,
        "timestamp": timestamp,
        "status": "bootstrapped",
    }
    (paths["state"] / "heartbeat.json").write_text(json.dumps(heartbeat, indent=2) + "\n")

    # touch() creates the log if needed without truncating an existing one.
    (paths["logs"] / "daemon.log").touch()

    return {
        "mission_id": mission_id,
        "root": str(paths["root"]),
        "status": "bootstrapped",
    }
def _read_json(path: Path) -> dict[str, Any] | None:
    """Read *path* as a JSON object, or return ``None`` when that is impossible.

    ``None`` is returned when the file is missing or cannot be read, is not
    valid UTF-8, is not valid JSON, or holds a JSON value other than an
    object. Callers can therefore rely on getting a dict or ``None``.
    """
    try:
        # Read directly instead of checking exists() first: the file may
        # vanish between the check and the read, and the path may also be a
        # directory or unreadable.
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        return None
    # Valid JSON is not necessarily an object (e.g. "[]" or "42").
    return payload if isinstance(payload, dict) else None
def _heartbeat_age(heartbeat: Any, now: float) -> tuple[float | None, int | None]:
    """Return ``(last_timestamp, age_seconds)`` for a heartbeat payload.

    ``(None, None)`` is returned when the payload is not a dict, has no
    ``timestamp``, or the timestamp cannot be turned into a finite number.
    """
    if not isinstance(heartbeat, dict):
        return None, None
    raw = heartbeat.get("timestamp")
    if raw is None:
        return None, None
    try:
        last_timestamp = float(raw)
        # int() rejects NaN (ValueError) and infinities (OverflowError).
        age_seconds = int(now - last_timestamp)
    except (TypeError, ValueError, OverflowError):
        return None, None
    return last_timestamp, age_seconds


def scan_mission_cells(
    *,
    root: str | Path,
    required_subdirs: list[str],
    heartbeat_relpath: str,
    stale_after_seconds: int,
    now: float | None = None,
) -> list[dict[str, Any]]:
    """Inspect every directory under *root* and classify it as a Mission Cell.

    Args:
        root: Missions root directory. If it is not an existing directory
            the result is an empty list.
        required_subdirs: Names that must exist inside each cell.
        heartbeat_relpath: Heartbeat file path relative to each cell.
        stale_after_seconds: A heartbeat older than this is ``"stale"``.
        now: Reference time; the current time is used when ``None``.

    Returns:
        One dict per subdirectory of *root*, sorted by path, with keys
        ``mission_id``, ``root``, ``status``, ``age_seconds``,
        ``last_timestamp`` and ``missing_paths``. ``status`` is
        ``"incomplete"`` when any required path is missing, otherwise
        ``"uninitialized"`` when no usable heartbeat timestamp exists,
        otherwise ``"stale"`` or ``"healthy"`` depending on heartbeat age.
    """
    missions_root = Path(root)
    timestamp = time.time() if now is None else float(now)
    # is_dir() also covers a root that exists but is a regular file, which
    # iterdir() would otherwise reject with NotADirectoryError.
    if not missions_root.is_dir():
        return []

    stale_threshold = int(stale_after_seconds)
    cells: list[dict[str, Any]] = []
    for entry in sorted(missions_root.iterdir()):
        if not entry.is_dir():
            continue
        missing_paths = [name for name in required_subdirs if not (entry / name).exists()]
        # A malformed heartbeat must not abort the scan of the other cells;
        # it is reported like a cell without a heartbeat timestamp.
        last_timestamp, age_seconds = _heartbeat_age(
            _read_json(entry / heartbeat_relpath), timestamp
        )

        if missing_paths:
            status = "incomplete"
        elif age_seconds is None:
            status = "uninitialized"
        elif age_seconds > stale_threshold:
            status = "stale"
        else:
            status = "healthy"

        cells.append(
            {
                "mission_id": entry.name,
                "root": str(entry),
                "status": status,
                "age_seconds": age_seconds,
                "last_timestamp": last_timestamp,
                "missing_paths": missing_paths,
            }
        )
    return cells
def build_daemon_report(config: dict[str, Any], now: float | None = None) -> dict[str, Any]:
    """Scan all Mission Cells and assemble the daemon's status report.

    Args:
        config: Daemon configuration; the keys ``missions_root``,
            ``required_subdirs``, ``heartbeat_file``, ``stale_after_seconds``,
            ``heartbeat_job`` and ``heartbeat_interval_seconds`` are read.
        now: Reference time handed to the scan.

    Returns:
        A dict echoing ``missions_root``, ``heartbeat_job`` and
        ``heartbeat_interval_seconds``, plus a ``summary`` of per-status
        counts and the full ``cells`` list.
    """
    scanned = scan_mission_cells(
        root=config["missions_root"],
        required_subdirs=list(config["required_subdirs"]),
        heartbeat_relpath=config["heartbeat_file"],
        stale_after_seconds=int(config["stale_after_seconds"]),
        now=now,
    )

    # Tally the four reported statuses in a single pass over the cells.
    tally = dict.fromkeys(("healthy", "stale", "incomplete", "uninitialized"), 0)
    for cell in scanned:
        state = cell["status"]
        if state in tally:
            tally[state] += 1

    report: dict[str, Any] = {}
    report["missions_root"] = config["missions_root"]
    report["heartbeat_job"] = config["heartbeat_job"]
    report["heartbeat_interval_seconds"] = int(config["heartbeat_interval_seconds"])
    report["summary"] = {"total_cells": len(scanned), **tally}
    report["cells"] = scanned
    return report
def write_daemon_heartbeat(config: dict[str, Any], directory: Path | None = None):
    """Record the daemon's own heartbeat through ``write_cron_heartbeat``.

    Args:
        config: Must provide ``heartbeat_job`` and
            ``heartbeat_interval_seconds``.
        directory: Passed through unchanged (including ``None``) as the
            ``directory`` keyword argument.

    Returns:
        Whatever ``write_cron_heartbeat`` returns.
    """
    job_name = config["heartbeat_job"]
    interval = int(config["heartbeat_interval_seconds"])
    return write_cron_heartbeat(job_name, interval_seconds=interval, directory=directory)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: optionally init a cell, then report on all cells.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read the
            process arguments.

    Returns:
        Always ``0``.
    """
    # Flags are registered in this order, which is also the --help order.
    flag_specs = (
        ("--config", {"default": str(DEFAULT_CONFIG_PATH), "help": "Path to lazarus pit config JSON"}),
        ("--root", {"help": "Override missions root directory"}),
        ("--init-cell", {"help": "Initialize a mission cell directory scaffold"}),
        ("--json", {"action": "store_true", "help": "Print daemon report as JSON"}),
        ("--write-heartbeat", {"action": "store_true", "help": "Write lazarus pit daemon heartbeat"}),
        ("--heartbeat-dir", {"help": "Override heartbeat directory for testing or local runs"}),
    )
    cli = argparse.ArgumentParser(description="Lazarus Pit daemon skeleton")
    for flag, spec in flag_specs:
        cli.add_argument(flag, **spec)
    options = cli.parse_args(argv)

    config = load_config(options.config)
    if options.root:
        config["missions_root"] = options.root
    if options.init_cell:
        init_cell(options.init_cell, config["missions_root"])

    # The report is built after any --init-cell so the new cell is included.
    report = build_daemon_report(config)

    if options.write_heartbeat:
        heartbeat_dir = None
        if options.heartbeat_dir:
            heartbeat_dir = Path(options.heartbeat_dir)
        write_daemon_heartbeat(config, directory=heartbeat_dir)

    if options.json:
        output = json.dumps(report, indent=2)
    else:
        output = (
            "Lazarus Pit — cells={total_cells} healthy={healthy} stale={stale} incomplete={incomplete} uninitialized={uninitialized}"
        ).format_map(report["summary"])
    print(output)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())