Compare commits

..

4 Commits

Author SHA1 Message Date
fe0005974f docs: add Mission Cell foundation spec (#879)
Some checks failed
CI / test (pull_request) Failing after 1m8s
Review Approval Gate / verify-review (pull_request) Successful in 10s
CI / validate (pull_request) Failing after 1m35s
2026-04-15 09:40:26 +00:00
576c24f814 feat: add Lazarus Pit daemon skeleton (#879) 2026-04-15 09:40:25 +00:00
82f04c9675 feat: add Lazarus Pit daemon skeleton (#879) 2026-04-15 09:40:24 +00:00
f60c4c175f wip: add Lazarus Pit foundation tests 2026-04-15 09:40:22 +00:00
13 changed files with 473 additions and 495 deletions

View File

@@ -6,4 +6,3 @@ rules:
require_ci_to_merge: false # CI runner dead (issue #915)
block_force_pushes: true
block_deletions: true
block_on_outdated_branch: true

View File

@@ -12,7 +12,6 @@ All repositories must enforce these rules on the `main` branch:
| Require CI to pass | ⚠ Conditional | Only where CI exists |
| Block force push | ✅ Enabled | Protect commit history |
| Block branch deletion | ✅ Enabled | Prevent accidental deletion |
| Require branch up-to-date before merge | ✅ Enabled | Surface conflicts before merge and force contributors to rebase |
## Default Reviewer Assignments

8
app.js
View File

@@ -714,10 +714,6 @@ async function init() {
camera = new THREE.PerspectiveCamera(65, window.innerWidth / window.innerHeight, 0.1, 1000);
camera.position.copy(playerPos);
// Initialize avatar and LOD systems
if (window.AvatarCustomization) window.AvatarCustomization.init(scene, camera);
if (window.LODSystem) window.LODSystem.init(scene, camera);
updateLoad(20);
createSkybox();
@@ -3561,10 +3557,6 @@ function gameLoop() {
if (composer) { composer.render(); } else { renderer.render(scene, camera); }
// Update avatar and LOD systems
if (window.AvatarCustomization && playerPos) window.AvatarCustomization.update(playerPos);
if (window.LODSystem && playerPos) window.LODSystem.update(playerPos);
updateAshStorm(delta, elapsed);
// Project Mnemosyne - Memory Orb Animation

15
config/lazarus_pit.json Normal file
View File

@@ -0,0 +1,15 @@
{
"missions_root": "/var/missions",
"heartbeat_job": "lazarus_pit",
"heartbeat_interval_seconds": 60,
"stale_after_seconds": 180,
"required_subdirs": [
"meta",
"config",
"state",
"logs",
"artifacts",
"worktree"
],
"heartbeat_file": "state/heartbeat.json"
}

68
docs/mission-cell-spec.md Normal file
View File

@@ -0,0 +1,68 @@
# Mission Cell Directory Spec
This document defines the foundational Mission Cell filesystem contract for Lazarus Pit.
It is a grounded M6 foundation slice, not the full Mission Cell runtime.
Root layout:
- `/var/missions/<uuid>/`
Required subdirectories:
- `meta/`
- `config/`
- `state/`
- `logs/`
- `artifacts/`
- `worktree/`
Required seed files:
- `meta/mission.json`
- `config/cell.json`
- `state/heartbeat.json`
- `logs/daemon.log`
## Intent of each path
- `meta/mission.json`
- durable mission identity and lifecycle metadata
- includes `mission_id`, `created_at`, and current status
- `config/cell.json`
- local cell wiring
- points to the worktree, artifacts directory, and heartbeat file
- `state/heartbeat.json`
- latest cell heartbeat timestamp and state
- consumed by Lazarus Pit scans for healthy vs stale cell classification
- `logs/daemon.log`
- daemon-local operational log target
- `artifacts/`
- handoff packets, reports, checkpoints, and mission outputs
- `worktree/`
- mission-specific checked-out repository workspace
## Lazarus Pit daemon skeleton
`scripts/lazarus_pit.py` provides the foundation daemon behavior:
- initialize a Mission Cell scaffold with `--init-cell <uuid>`
- scan all cells under the configured missions root
- classify cells as `healthy`, `stale`, `incomplete`, or `uninitialized`
- emit a daemon heartbeat through the existing cron heartbeat writer
- output a JSON health report for higher-level watchers
Default config lives at:
- `config/lazarus_pit.json`
## Example bootstrap
```bash
python3 scripts/lazarus_pit.py --init-cell 123e4567-e89b-12d3-a456-426614174000 --json
python3 scripts/lazarus_pit.py --write-heartbeat --json
```
## What remains for full #879 completion
This slice does not yet complete the whole issue.
Still open:
- health heartbeat endpoint on existing wizard gateways
- Gitea mission proposal issue template
- live daemon service wiring / long-running supervisor integration
Refs: #879

View File

@@ -395,8 +395,6 @@
<div id="memory-connections-panel" class="memory-connections-panel" style="display:none;" aria-label="Memory Connections Panel"></div>
<script src="./boot.js"></script>
<script src="./avatar-customization.js"></script>
<script src="./lod-system.js"></script>
<script>
function openMemoryFilter() { renderFilterList(); document.getElementById('memory-filter').style.display = 'flex'; }
function closeMemoryFilter() { document.getElementById('memory-filter').style.display = 'none'; }

View File

@@ -1,186 +0,0 @@
/**
* LOD (Level of Detail) System for The Nexus
*
* Optimizes rendering when many avatars/users are visible:
* - Distance-based LOD: far users become billboard sprites
* - Occlusion: skip rendering users behind walls
* - Budget: maintain 60 FPS target with 50+ avatars
*
* Usage:
* LODSystem.init(scene, camera);
* LODSystem.registerAvatar(avatarMesh, userId);
* LODSystem.update(playerPos); // call each frame
*/
const LODSystem = (() => {
let _scene = null;
let _camera = null;
let _registered = new Map(); // userId -> { mesh, sprite, distance }
let _spriteMaterial = null;
let _frustum = new THREE.Frustum();
let _projScreenMatrix = new THREE.Matrix4();
// Thresholds
const LOD_NEAR = 15; // Full mesh within 15 units
const LOD_FAR = 40; // Billboard beyond 40 units
const LOD_CULL = 80; // Don't render beyond 80 units
const SPRITE_SIZE = 1.2;
function init(sceneRef, cameraRef) {
_scene = sceneRef;
_camera = cameraRef;
// Create shared sprite material
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
// Simple avatar indicator: colored circle
ctx.fillStyle = '#00ffcc';
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2); // head
ctx.fill();
const texture = new THREE.CanvasTexture(canvas);
_spriteMaterial = new THREE.SpriteMaterial({
map: texture,
transparent: true,
depthTest: true,
sizeAttenuation: true,
});
console.log('[LODSystem] Initialized');
}
function registerAvatar(avatarMesh, userId, color) {
// Create billboard sprite for this avatar
const spriteMat = _spriteMaterial.clone();
if (color) {
// Tint sprite to match avatar color
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2);
ctx.fill();
spriteMat.map = new THREE.CanvasTexture(canvas);
spriteMat.map.needsUpdate = true;
}
const sprite = new THREE.Sprite(spriteMat);
sprite.scale.set(SPRITE_SIZE, SPRITE_SIZE, 1);
sprite.visible = false;
_scene.add(sprite);
_registered.set(userId, {
mesh: avatarMesh,
sprite: sprite,
distance: Infinity,
});
}
function unregisterAvatar(userId) {
const entry = _registered.get(userId);
if (entry) {
_scene.remove(entry.sprite);
entry.sprite.material.dispose();
_registered.delete(userId);
}
}
function setSpriteColor(userId, color) {
const entry = _registered.get(userId);
if (!entry) return;
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2);
ctx.fill();
entry.sprite.material.map = new THREE.CanvasTexture(canvas);
entry.sprite.material.map.needsUpdate = true;
}
function update(playerPos) {
if (!_camera) return;
// Update frustum for culling
_projScreenMatrix.multiplyMatrices(
_camera.projectionMatrix,
_camera.matrixWorldInverse
);
_frustum.setFromProjectionMatrix(_projScreenMatrix);
_registered.forEach((entry, userId) => {
if (!entry.mesh) return;
const meshPos = entry.mesh.position;
const distance = playerPos.distanceTo(meshPos);
entry.distance = distance;
// Beyond cull distance: hide everything
if (distance > LOD_CULL) {
entry.mesh.visible = false;
entry.sprite.visible = false;
return;
}
// Check if in camera frustum
const inFrustum = _frustum.containsPoint(meshPos);
if (!inFrustum) {
entry.mesh.visible = false;
entry.sprite.visible = false;
return;
}
// LOD switching
if (distance <= LOD_NEAR) {
// Near: full mesh
entry.mesh.visible = true;
entry.sprite.visible = false;
} else if (distance <= LOD_FAR) {
// Mid: mesh with reduced detail (keep mesh visible)
entry.mesh.visible = true;
entry.sprite.visible = false;
} else {
// Far: billboard sprite
entry.mesh.visible = false;
entry.sprite.visible = true;
entry.sprite.position.copy(meshPos);
entry.sprite.position.y += 1.2; // above avatar center
}
});
}
function getStats() {
let meshCount = 0;
let spriteCount = 0;
let culledCount = 0;
_registered.forEach(entry => {
if (entry.mesh.visible) meshCount++;
else if (entry.sprite.visible) spriteCount++;
else culledCount++;
});
return { total: _registered.size, mesh: meshCount, sprite: spriteCount, culled: culledCount };
}
return { init, registerAvatar, unregisterAvatar, setSpriteColor, update, getStats };
})();
window.LODSystem = LODSystem;

229
scripts/lazarus_pit.py Normal file
View File

@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""Lazarus Pit daemon skeleton for Mission Cell foundations.
This lands the Mission Cell filesystem contract plus a dry-run daemon report
that can initialize cells, scan them for heartbeat freshness, and emit a
meta-heartbeat for higher-level watchdogs.
Refs: #879
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import sys
import time
from pathlib import Path
from typing import Any
PROJECT_ROOT = Path(__file__).resolve().parent.parent
_hb_spec = importlib.util.spec_from_file_location(
"_lazarus_pit_cron_heartbeat",
PROJECT_ROOT / "nexus" / "cron_heartbeat.py",
)
_hb = importlib.util.module_from_spec(_hb_spec)
sys.modules["_lazarus_pit_cron_heartbeat"] = _hb
_hb_spec.loader.exec_module(_hb)
write_cron_heartbeat = _hb.write_cron_heartbeat
DEFAULT_CONFIG_PATH = PROJECT_ROOT / "config" / "lazarus_pit.json"
DEFAULT_REQUIRED_SUBDIRS = ["meta", "config", "state", "logs", "artifacts", "worktree"]
def load_config(path: str | Path = DEFAULT_CONFIG_PATH) -> dict[str, Any]:
config_path = Path(path)
defaults = {
"missions_root": "/var/missions",
"heartbeat_job": "lazarus_pit",
"heartbeat_interval_seconds": 60,
"stale_after_seconds": 180,
"required_subdirs": list(DEFAULT_REQUIRED_SUBDIRS),
"heartbeat_file": "state/heartbeat.json",
}
if not config_path.exists():
return defaults
loaded = json.loads(config_path.read_text())
defaults.update(loaded)
if not defaults.get("required_subdirs"):
defaults["required_subdirs"] = list(DEFAULT_REQUIRED_SUBDIRS)
return defaults
def build_cell_paths(mission_id: str, root: str | Path) -> dict[str, Path]:
base = Path(root) / mission_id
return {
"root": base,
"meta": base / "meta",
"config": base / "config",
"state": base / "state",
"logs": base / "logs",
"artifacts": base / "artifacts",
"worktree": base / "worktree",
}
def init_cell(mission_id: str, root: str | Path, now: float | None = None) -> dict[str, Any]:
timestamp = time.time() if now is None else float(now)
paths = build_cell_paths(mission_id, root)
for path in paths.values():
if path.name != mission_id:
path.mkdir(parents=True, exist_ok=True)
paths["root"].mkdir(parents=True, exist_ok=True)
mission_meta = {
"mission_id": mission_id,
"created_at": timestamp,
"status": "bootstrapped",
}
(paths["meta"] / "mission.json").write_text(json.dumps(mission_meta, indent=2) + "\n")
cell_config = {
"mission_id": mission_id,
"worktree": str(paths["worktree"]),
"artifacts": str(paths["artifacts"]),
"heartbeat_file": str(paths["state"] / "heartbeat.json"),
}
(paths["config"] / "cell.json").write_text(json.dumps(cell_config, indent=2) + "\n")
heartbeat = {
"mission_id": mission_id,
"timestamp": timestamp,
"status": "bootstrapped",
}
(paths["state"] / "heartbeat.json").write_text(json.dumps(heartbeat, indent=2) + "\n")
(paths["logs"] / "daemon.log").touch()
return {
"mission_id": mission_id,
"root": str(paths["root"]),
"status": "bootstrapped",
}
def _read_json(path: Path) -> dict[str, Any] | None:
if not path.exists():
return None
try:
return json.loads(path.read_text())
except json.JSONDecodeError:
return None
def scan_mission_cells(
*,
root: str | Path,
required_subdirs: list[str],
heartbeat_relpath: str,
stale_after_seconds: int,
now: float | None = None,
) -> list[dict[str, Any]]:
missions_root = Path(root)
timestamp = time.time() if now is None else float(now)
if not missions_root.exists():
return []
cells: list[dict[str, Any]] = []
for entry in sorted(missions_root.iterdir()):
if not entry.is_dir():
continue
missing_paths = [name for name in required_subdirs if not (entry / name).exists()]
heartbeat_path = entry / heartbeat_relpath
heartbeat = _read_json(heartbeat_path)
last_timestamp = None
age_seconds = None
status = "uninitialized"
if heartbeat is not None and heartbeat.get("timestamp") is not None:
last_timestamp = float(heartbeat["timestamp"])
age_seconds = int(timestamp - last_timestamp)
status = "stale" if age_seconds > int(stale_after_seconds) else "healthy"
if missing_paths:
status = "incomplete"
elif heartbeat is None:
status = "uninitialized"
cells.append(
{
"mission_id": entry.name,
"root": str(entry),
"status": status,
"age_seconds": age_seconds,
"last_timestamp": last_timestamp,
"missing_paths": missing_paths,
}
)
return cells
def build_daemon_report(config: dict[str, Any], now: float | None = None) -> dict[str, Any]:
cells = scan_mission_cells(
root=config["missions_root"],
required_subdirs=list(config["required_subdirs"]),
heartbeat_relpath=config["heartbeat_file"],
stale_after_seconds=int(config["stale_after_seconds"]),
now=now,
)
summary = {
"total_cells": len(cells),
"healthy": sum(1 for cell in cells if cell["status"] == "healthy"),
"stale": sum(1 for cell in cells if cell["status"] == "stale"),
"incomplete": sum(1 for cell in cells if cell["status"] == "incomplete"),
"uninitialized": sum(1 for cell in cells if cell["status"] == "uninitialized"),
}
return {
"missions_root": config["missions_root"],
"heartbeat_job": config["heartbeat_job"],
"heartbeat_interval_seconds": int(config["heartbeat_interval_seconds"]),
"summary": summary,
"cells": cells,
}
def write_daemon_heartbeat(config: dict[str, Any], directory: Path | None = None):
return write_cron_heartbeat(
config["heartbeat_job"],
interval_seconds=int(config["heartbeat_interval_seconds"]),
directory=directory,
)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Lazarus Pit daemon skeleton")
parser.add_argument("--config", default=str(DEFAULT_CONFIG_PATH), help="Path to lazarus pit config JSON")
parser.add_argument("--root", help="Override missions root directory")
parser.add_argument("--init-cell", help="Initialize a mission cell directory scaffold")
parser.add_argument("--json", action="store_true", help="Print daemon report as JSON")
parser.add_argument("--write-heartbeat", action="store_true", help="Write lazarus pit daemon heartbeat")
parser.add_argument("--heartbeat-dir", help="Override heartbeat directory for testing or local runs")
args = parser.parse_args(argv)
config = load_config(args.config)
if args.root:
config["missions_root"] = args.root
if args.init_cell:
init_cell(args.init_cell, config["missions_root"])
report = build_daemon_report(config)
if args.write_heartbeat:
hb_dir = Path(args.heartbeat_dir) if args.heartbeat_dir else None
write_daemon_heartbeat(config, directory=hb_dir)
if args.json:
print(json.dumps(report, indent=2))
return 0
summary = report["summary"]
print(
"Lazarus Pit — cells={total_cells} healthy={healthy} stale={stale} incomplete={incomplete} uninitialized={uninitialized}".format(
**summary
)
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,10 +1,7 @@
#!/usr/bin/env python3
"""
Review Gate — Poka-yoke for unreviewed merges and zombie PRs.
Checks:
1. PR has at least 1 approving review (no rubber-stamping without approval)
2. PR has actual changes (no zombie PRs with 0 additions/deletions)
Review Gate — Poka-yoke for unreviewed merges.
Fails if the current PR has fewer than 1 approving review.
Usage in Gitea workflow:
- name: Review Approval Gate
@@ -16,6 +13,7 @@ Usage in Gitea workflow:
import os
import sys
import json
import subprocess
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
@@ -35,68 +33,7 @@ def api_call(method, path):
return {"error": e.read().decode(), "status": e.code}
def check_empty_pr(pr_data):
"""Check if PR has no actual changes (zombie PR)."""
additions = pr_data.get("additions", 0)
deletions = pr_data.get("deletions", 0)
changed_files = pr_data.get("changed_files", 0)
if additions == 0 and deletions == 0 and changed_files == 0:
return False, (
f"ZOMBIE PR: PR has 0 additions, 0 deletions, 0 changed files. "
f"This appears to be an empty PR with no actual changes."
)
return True, None
def check_approvals(reviews):
"""Check if PR has at least one approving review."""
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if len(approvals) >= 1:
return True, len(approvals)
return False, 0
def check_rubber_stamp(pr_data, reviews):
"""
Check for rubber-stamping: approving reviews on PRs with trivial changes.
Rubber-stamping indicators:
- Approving reviews exist
- PR has very few changes (< 5 lines total)
- Review comments are empty or generic
"""
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if not approvals:
return True, None # No approvals to check
additions = pr_data.get("additions", 0)
deletions = pr_data.get("deletions", 0)
total_changes = additions + deletions
# Flag if approving a PR with fewer than 5 total changes
if total_changes < 5 and len(approvals) > 0:
# Check if review bodies are substantive
empty_reviews = [
r for r in approvals
if not r.get("body") or len(r.get("body", "").strip()) < 10
]
if empty_reviews:
return False, (
f"SUSPICIOUS: PR has only {total_changes} total changes "
f"but {len(approvals)} approving review(s), "
f"{len(empty_reviews)} with empty/minimal comments. "
f"This may indicate rubber-stamping."
)
return True, None
def main():
errors = []
warnings = []
# Validate environment
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
sys.exit(1)
@@ -107,57 +44,27 @@ def main():
pr_number = PR_NUMBER
if not pr_number:
# Try to infer from Gitea Actions environment
pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
if not pr_number:
print("ERROR: Could not determine PR number")
sys.exit(1)
# Fetch PR data
pr_data = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}")
if isinstance(pr_data, dict) and "error" in pr_data:
print(f"ERROR fetching PR: {pr_data}")
sys.exit(1)
# Fetch reviews
reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
if isinstance(reviews, dict) and "error" in reviews:
print(f"ERROR fetching reviews: {reviews}")
sys.exit(1)
# ── Check 1: Empty PR (zombie PR) ───────────────────────
has_changes, empty_msg = check_empty_pr(pr_data)
if not has_changes:
errors.append(empty_msg)
# ── Check 2: Has approvals ──────────────────────────────
has_approval, approval_count = check_approvals(reviews)
if not has_approval:
errors.append(
f"PR #{pr_number} has no approving reviews. "
f"Merges require at least one approval."
)
# ── Check 3: Rubber-stamping detection ──────────────────
clean, rubber_msg = check_rubber_stamp(pr_data, reviews)
if not clean:
warnings.append(rubber_msg)
# ── Report ──────────────────────────────────────────────
if warnings:
for w in warnings:
print(f"⚠️ WARNING: {w}")
if errors:
for e in errors:
print(f"❌ BLOCKED: {e}")
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if len(approvals) >= 1:
print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
sys.exit(0)
else:
print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
print("Merges are not permitted without at least one approval.")
sys.exit(1)
print(f"✅ OK: PR #{pr_number} has {approval_count} approval(s) "
f"and {pr_data.get('additions', 0)} additions / "
f"{pr_data.get('deletions', 0)} deletions.")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -4,61 +4,48 @@ Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
Correctly uses the Gitea 1.25+ API (not GitHub-style).
"""
from __future__ import annotations
import json
import os
import sys
import json
import urllib.request
from pathlib import Path
import yaml
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
PROJECT_ROOT = Path(__file__).resolve().parent.parent
CONFIG_DIR = PROJECT_ROOT / ".gitea" / "branch-protection"
CONFIG_DIR = ".gitea/branch-protection"
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
url = f"{GITEA_URL}/api/v1{path}"
data = json.dumps(payload).encode() if payload else None
req = urllib.request.Request(
url,
data=data,
method=method,
headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
req = urllib.request.Request(url, data=data, method=method, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def build_branch_protection_payload(branch: str, rules: dict) -> dict:
return {
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.pop("branch", "main")
# Check if protection already exists
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(r.get("branch_name") == branch for r in existing)
payload = {
"branch_name": branch,
"rule_name": branch,
"required_approvals": rules.get("required_approvals", 1),
"block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
"dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
"block_deletions": rules.get("block_deletions", True),
"block_force_push": rules.get("block_force_push", rules.get("block_force_pushes", True)),
"block_force_push": rules.get("block_force_push", True),
"block_admin_merge_override": rules.get("block_admin_merge_override", True),
"enable_status_check": rules.get("require_ci_to_merge", False),
"status_check_contexts": rules.get("status_check_contexts", []),
"block_on_outdated_branch": rules.get("block_on_outdated_branch", False),
}
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.get("branch", "main")
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(rule.get("branch_name") == branch for rule in existing)
payload = build_branch_protection_payload(branch, rules)
try:
if exists:
api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
@@ -66,8 +53,8 @@ def apply_protection(repo: str, rules: dict) -> bool:
api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
print(f"{repo}:{branch} synced")
return True
except Exception as exc:
print(f"{repo}:{branch} failed: {exc}")
except Exception as e:
print(f"{repo}:{branch} failed: {e}")
return False
@@ -75,18 +62,15 @@ def main() -> int:
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
return 1
if not CONFIG_DIR.exists():
print(f"ERROR: config directory not found: {CONFIG_DIR}")
return 1
ok = 0
for cfg_path in sorted(CONFIG_DIR.glob("*.yml")):
repo = cfg_path.stem
with cfg_path.open() as fh:
cfg = yaml.safe_load(fh) or {}
rules = cfg.get("rules", {})
rules.setdefault("branch", cfg.get("branch", "main"))
if apply_protection(repo, rules):
for fname in os.listdir(CONFIG_DIR):
if not fname.endswith(".yml"):
continue
repo = fname[:-4]
with open(os.path.join(CONFIG_DIR, fname)) as f:
cfg = yaml.safe_load(f)
if apply_protection(repo, cfg.get("rules", {})):
ok += 1
print(f"\nSynced {ok} repo(s)")

127
tests/test_lazarus_pit.py Normal file
View File

@@ -0,0 +1,127 @@
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent
_spec = importlib.util.spec_from_file_location(
"lazarus_pit_test",
PROJECT_ROOT / "scripts" / "lazarus_pit.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules["lazarus_pit_test"] = _mod
_spec.loader.exec_module(_mod)
build_cell_paths = _mod.build_cell_paths
build_daemon_report = _mod.build_daemon_report
init_cell = _mod.init_cell
load_config = _mod.load_config
scan_mission_cells = _mod.scan_mission_cells
write_daemon_heartbeat = _mod.write_daemon_heartbeat
def test_init_cell_creates_foundation_structure(tmp_path):
mission_id = "123e4567-e89b-12d3-a456-426614174000"
cell = init_cell(mission_id, root=tmp_path, now=1_700_000_000)
paths = build_cell_paths(mission_id, tmp_path)
for key in ["meta", "config", "state", "logs", "artifacts", "worktree"]:
assert paths[key].is_dir(), f"expected {key} directory to exist"
meta = json.loads((paths["meta"] / "mission.json").read_text())
assert meta["mission_id"] == mission_id
assert meta["status"] == "bootstrapped"
heartbeat = json.loads((paths["state"] / "heartbeat.json").read_text())
assert heartbeat["mission_id"] == mission_id
assert heartbeat["status"] == "bootstrapped"
assert cell["root"] == str(paths["root"])
def test_scan_mission_cells_marks_healthy_and_stale(tmp_path):
healthy_id = "healthy-cell"
stale_id = "stale-cell"
init_cell(healthy_id, root=tmp_path, now=1_700_000_000)
init_cell(stale_id, root=tmp_path, now=1_700_000_000)
healthy_paths = build_cell_paths(healthy_id, tmp_path)
stale_paths = build_cell_paths(stale_id, tmp_path)
(healthy_paths["state"] / "heartbeat.json").write_text(
json.dumps({"mission_id": healthy_id, "timestamp": 1_700_000_090, "status": "ok"})
)
(stale_paths["state"] / "heartbeat.json").write_text(
json.dumps({"mission_id": stale_id, "timestamp": 1_700_000_000, "status": "ok"})
)
cells = scan_mission_cells(
root=tmp_path,
required_subdirs=["meta", "config", "state", "logs", "artifacts", "worktree"],
heartbeat_relpath="state/heartbeat.json",
stale_after_seconds=60,
now=1_700_000_100,
)
by_id = {cell["mission_id"]: cell for cell in cells}
assert by_id[healthy_id]["status"] == "healthy"
assert by_id[healthy_id]["age_seconds"] == 10
assert by_id[stale_id]["status"] == "stale"
assert by_id[stale_id]["age_seconds"] == 100
def test_build_daemon_report_and_write_heartbeat(tmp_path):
config_path = tmp_path / "lazarus_pit.json"
config_path.write_text(
json.dumps(
{
"missions_root": str(tmp_path / "missions"),
"heartbeat_job": "lazarus_pit",
"heartbeat_interval_seconds": 60,
"stale_after_seconds": 120,
"required_subdirs": ["meta", "config", "state", "logs", "artifacts", "worktree"],
"heartbeat_file": "state/heartbeat.json",
}
)
)
config = load_config(config_path)
init_cell("mission-one", root=Path(config["missions_root"]), now=2_000)
paths = build_cell_paths("mission-one", Path(config["missions_root"]))
(paths["state"] / "heartbeat.json").write_text(
json.dumps({"mission_id": "mission-one", "timestamp": 2_050, "status": "ok"})
)
report = build_daemon_report(config, now=2_100)
assert report["summary"]["total_cells"] == 1
assert report["summary"]["healthy"] == 1
assert report["summary"]["stale"] == 0
assert report["cells"][0]["mission_id"] == "mission-one"
heartbeat_path = write_daemon_heartbeat(config, directory=tmp_path / "heartbeats")
heartbeat = json.loads(heartbeat_path.read_text())
assert heartbeat["job"] == "lazarus_pit"
assert heartbeat["interval_seconds"] == 60
def test_foundation_artifacts_exist_with_required_spec():
doc = PROJECT_ROOT / "docs" / "mission-cell-spec.md"
config = PROJECT_ROOT / "config" / "lazarus_pit.json"
assert doc.exists(), "expected mission cell spec doc"
assert config.exists(), "expected lazarus pit config"
content = doc.read_text()
for snippet in [
"/var/missions/<uuid>/",
"meta/mission.json",
"config/cell.json",
"state/heartbeat.json",
"logs/daemon.log",
"artifacts/",
"worktree/",
]:
assert snippet in content

View File

@@ -1,109 +0,0 @@
"""
Tests for scripts/review_gate.py — Anti-rubber-stamping PR validation.
"""
import unittest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from review_gate import check_empty_pr, check_approvals, check_rubber_stamp
class TestCheckEmptyPr(unittest.TestCase):
def test_valid_pr(self):
pr = {"additions": 10, "deletions": 5, "changed_files": 2}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
self.assertIsNone(msg)
def test_empty_pr(self):
pr = {"additions": 0, "deletions": 0, "changed_files": 0}
ok, msg = check_empty_pr(pr)
self.assertFalse(ok)
self.assertIn("ZOMBIE", msg)
def test_additions_only(self):
pr = {"additions": 50, "deletions": 0, "changed_files": 1}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
def test_deletions_only(self):
pr = {"additions": 0, "deletions": 30, "changed_files": 1}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
def test_missing_fields_treated_as_zero(self):
pr = {}
ok, msg = check_empty_pr(pr)
self.assertFalse(ok)
class TestCheckApprovals(unittest.TestCase):
def test_has_approval(self):
reviews = [{"state": "APPROVED"}, {"state": "COMMENT"}]
ok, count = check_approvals(reviews)
self.assertTrue(ok)
self.assertEqual(count, 1)
def test_multiple_approvals(self):
reviews = [{"state": "APPROVED"}, {"state": "APPROVED"}]
ok, count = check_approvals(reviews)
self.assertTrue(ok)
self.assertEqual(count, 2)
def test_no_approvals(self):
reviews = [{"state": "COMMENT"}, {"state": "REQUEST_CHANGES"}]
ok, count = check_approvals(reviews)
self.assertFalse(ok)
self.assertEqual(count, 0)
def test_empty_reviews(self):
ok, count = check_approvals([])
self.assertFalse(ok)
self.assertEqual(count, 0)
class TestCheckRubberStamp(unittest.TestCase):
def test_substantive_pr_no_warning(self):
pr = {"additions": 100, "deletions": 50}
reviews = [{"state": "APPROVED", "body": "Looks good, nice changes"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
self.assertIsNone(msg)
def test_trivial_pr_empty_review_detected(self):
pr = {"additions": 2, "deletions": 0}
reviews = [{"state": "APPROVED", "body": ""}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertFalse(ok)
self.assertIn("SUSPICIOUS", msg)
def test_trivial_pr_short_review_detected(self):
pr = {"additions": 1, "deletions": 1}
reviews = [{"state": "APPROVED", "body": "ok"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertFalse(ok)
self.assertIn("SUSPICIOUS", msg)
def test_trivial_pr_substantive_review_ok(self):
pr = {"additions": 2, "deletions": 0}
reviews = [{"state": "APPROVED", "body": "This small fix is correct. Tested locally."}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
def test_no_approvals_skips_check(self):
pr = {"additions": 0, "deletions": 0}
reviews = [{"state": "COMMENT"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
def test_large_pr_with_empty_review_ok(self):
pr = {"additions": 500, "deletions": 200}
reviews = [{"state": "APPROVED", "body": ""}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok) # Large PR, empty review is less suspicious
if __name__ == "__main__":
unittest.main()

View File

@@ -1,45 +0,0 @@
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import yaml
PROJECT_ROOT = Path(__file__).parent.parent
_spec = importlib.util.spec_from_file_location(
"sync_branch_protection_test",
PROJECT_ROOT / "scripts" / "sync_branch_protection.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules["sync_branch_protection_test"] = _mod
_spec.loader.exec_module(_mod)
build_branch_protection_payload = _mod.build_branch_protection_payload
def test_build_branch_protection_payload_enables_rebase_before_merge():
payload = build_branch_protection_payload(
"main",
{
"required_approvals": 1,
"dismiss_stale_approvals": True,
"require_ci_to_merge": False,
"block_deletions": True,
"block_force_push": True,
"block_on_outdated_branch": True,
},
)
assert payload["branch_name"] == "main"
assert payload["rule_name"] == "main"
assert payload["block_on_outdated_branch"] is True
assert payload["required_approvals"] == 1
assert payload["enable_status_check"] is False
def test_the_nexus_branch_protection_config_requires_up_to_date_branch():
config = yaml.safe_load((PROJECT_ROOT / ".gitea" / "branch-protection" / "the-nexus.yml").read_text())
rules = config["rules"]
assert rules["block_on_outdated_branch"] is True