Compare commits

..

3 Commits

Author SHA1 Message Date
Alexander Whitestone
27925c9a5d wip: fix mission bus test loader
Some checks failed
CI / test (pull_request) Failing after 1m9s
Review Approval Gate / verify-review (pull_request) Successful in 10s
CI / validate (pull_request) Failing after 1m40s
2026-04-15 04:09:32 -04:00
Alexander Whitestone
417f92b7a5 wip: add mission bus module and profiles 2026-04-15 04:08:47 -04:00
Alexander Whitestone
de3f41e819 wip: add mission bus regression tests 2026-04-15 04:06:28 -04:00
14 changed files with 582 additions and 495 deletions

View File

@@ -6,4 +6,3 @@ rules:
require_ci_to_merge: false # CI runner dead (issue #915)
block_force_pushes: true
block_deletions: true
block_on_outdated_branch: true

View File

@@ -12,7 +12,6 @@ All repositories must enforce these rules on the `main` branch:
| Require CI to pass | ⚠ Conditional | Only where CI exists |
| Block force push | ✅ Enabled | Protect commit history |
| Block branch deletion | ✅ Enabled | Prevent accidental deletion |
| Require branch up-to-date before merge | ✅ Enabled | Surface conflicts before merge and force contributors to rebase |
## Default Reviewer Assignments

8
app.js
View File

@@ -714,10 +714,6 @@ async function init() {
camera = new THREE.PerspectiveCamera(65, window.innerWidth / window.innerHeight, 0.1, 1000);
camera.position.copy(playerPos);
// Initialize avatar and LOD systems
if (window.AvatarCustomization) window.AvatarCustomization.init(scene, camera);
if (window.LODSystem) window.LODSystem.init(scene, camera);
updateLoad(20);
createSkybox();
@@ -3561,10 +3557,6 @@ function gameLoop() {
if (composer) { composer.render(); } else { renderer.render(scene, camera); }
// Update avatar and LOD systems
if (window.AvatarCustomization && playerPos) window.AvatarCustomization.update(playerPos);
if (window.LODSystem && playerPos) window.LODSystem.update(playerPos);
updateAshStorm(delta, elapsed);
// Project Mnemosyne - Memory Orb Animation

View File

@@ -0,0 +1,34 @@
{
"roles": {
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
"write": ["publish", "checkpoint", "handoff", "read"],
"read": ["read"],
"audit": ["read", "audit"]
},
"isolation_profiles": [
{
"name": "level1-directory",
"label": "Level 1 — directory workspace",
"level": 1,
"mechanism": "directory_workspace",
"description": "Single mission cell in an isolated workspace directory.",
"supports_resume": true
},
{
"name": "level2-mount-namespace",
"label": "Level 2 — mount namespace",
"level": 2,
"mechanism": "mount_namespace",
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
"supports_resume": true
},
{
"name": "level3-rootless-podman",
"label": "Level 3 — rootless Podman",
"level": 3,
"mechanism": "rootless_podman",
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
"supports_resume": true
}
]
}

31
docs/mission-bus.md Normal file
View File

@@ -0,0 +1,31 @@
# Mission Bus
The Mission Bus grounds the multi-agent teaming epic with a concrete, executable shared module.
## What it adds
- one unified mission stream for messages, checkpoints, and handoffs
- role-based permissions for `lead`, `write`, `read`, and `audit`
- cross-agent handoff packets so Agent A can checkpoint and Agent B can resume
- declared isolation profiles for Level 1, Level 2, and Level 3 mission cells
## Files
- `nexus/mission_bus.py`
- `config/mission_bus_profiles.json`
## Example
```python
from nexus.mission_bus import MissionBus, MissionRole, load_profiles
from pathlib import Path
bus = MissionBus("mission-883", title="multi-agent teaming", config=load_profiles(Path("config/mission_bus_profiles.json")))
bus.register_participant("timmy", MissionRole.LEAD)
bus.register_participant("ezra", MissionRole.WRITE)
checkpoint = bus.create_checkpoint("ezra", summary="checkpoint", state={"branch": "fix/883"})
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume from here")
packet = bus.build_resume_packet(bus.events[-1].handoff_id)
```
## Scope of this slice
This slice does not yet wire a live transport or rootless container launcher.
It codifies the mission bus contract, role permissions, handoff packet, and isolation profile surface so later work can execute against a stable interface.

View File

@@ -395,8 +395,6 @@
<div id="memory-connections-panel" class="memory-connections-panel" style="display:none;" aria-label="Memory Connections Panel"></div>
<script src="./boot.js"></script>
<script src="./avatar-customization.js"></script>
<script src="./lod-system.js"></script>
<script>
function openMemoryFilter() { renderFilterList(); document.getElementById('memory-filter').style.display = 'flex'; }
function closeMemoryFilter() { document.getElementById('memory-filter').style.display = 'none'; }

View File

@@ -1,186 +0,0 @@
/**
* LOD (Level of Detail) System for The Nexus
*
* Optimizes rendering when many avatars/users are visible:
* - Distance-based LOD: far users become billboard sprites
* - Occlusion: skip rendering users behind walls
* - Budget: maintain 60 FPS target with 50+ avatars
*
* Usage:
* LODSystem.init(scene, camera);
* LODSystem.registerAvatar(avatarMesh, userId);
* LODSystem.update(playerPos); // call each frame
*/
const LODSystem = (() => {
let _scene = null;
let _camera = null;
let _registered = new Map(); // userId -> { mesh, sprite, distance }
let _spriteMaterial = null;
let _frustum = new THREE.Frustum();
let _projScreenMatrix = new THREE.Matrix4();
// Thresholds
const LOD_NEAR = 15; // Full mesh within 15 units
const LOD_FAR = 40; // Billboard beyond 40 units
const LOD_CULL = 80; // Don't render beyond 80 units
const SPRITE_SIZE = 1.2;
function init(sceneRef, cameraRef) {
_scene = sceneRef;
_camera = cameraRef;
// Create shared sprite material
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
// Simple avatar indicator: colored circle
ctx.fillStyle = '#00ffcc';
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2); // head
ctx.fill();
const texture = new THREE.CanvasTexture(canvas);
_spriteMaterial = new THREE.SpriteMaterial({
map: texture,
transparent: true,
depthTest: true,
sizeAttenuation: true,
});
console.log('[LODSystem] Initialized');
}
function registerAvatar(avatarMesh, userId, color) {
// Create billboard sprite for this avatar
const spriteMat = _spriteMaterial.clone();
if (color) {
// Tint sprite to match avatar color
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2);
ctx.fill();
spriteMat.map = new THREE.CanvasTexture(canvas);
spriteMat.map.needsUpdate = true;
}
const sprite = new THREE.Sprite(spriteMat);
sprite.scale.set(SPRITE_SIZE, SPRITE_SIZE, 1);
sprite.visible = false;
_scene.add(sprite);
_registered.set(userId, {
mesh: avatarMesh,
sprite: sprite,
distance: Infinity,
});
}
function unregisterAvatar(userId) {
const entry = _registered.get(userId);
if (entry) {
_scene.remove(entry.sprite);
entry.sprite.material.dispose();
_registered.delete(userId);
}
}
function setSpriteColor(userId, color) {
const entry = _registered.get(userId);
if (!entry) return;
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2);
ctx.fill();
entry.sprite.material.map = new THREE.CanvasTexture(canvas);
entry.sprite.material.map.needsUpdate = true;
}
function update(playerPos) {
if (!_camera) return;
// Update frustum for culling
_projScreenMatrix.multiplyMatrices(
_camera.projectionMatrix,
_camera.matrixWorldInverse
);
_frustum.setFromProjectionMatrix(_projScreenMatrix);
_registered.forEach((entry, userId) => {
if (!entry.mesh) return;
const meshPos = entry.mesh.position;
const distance = playerPos.distanceTo(meshPos);
entry.distance = distance;
// Beyond cull distance: hide everything
if (distance > LOD_CULL) {
entry.mesh.visible = false;
entry.sprite.visible = false;
return;
}
// Check if in camera frustum
const inFrustum = _frustum.containsPoint(meshPos);
if (!inFrustum) {
entry.mesh.visible = false;
entry.sprite.visible = false;
return;
}
// LOD switching
if (distance <= LOD_NEAR) {
// Near: full mesh
entry.mesh.visible = true;
entry.sprite.visible = false;
} else if (distance <= LOD_FAR) {
// Mid: mesh with reduced detail (keep mesh visible)
entry.mesh.visible = true;
entry.sprite.visible = false;
} else {
// Far: billboard sprite
entry.mesh.visible = false;
entry.sprite.visible = true;
entry.sprite.position.copy(meshPos);
entry.sprite.position.y += 1.2; // above avatar center
}
});
}
function getStats() {
let meshCount = 0;
let spriteCount = 0;
let culledCount = 0;
_registered.forEach(entry => {
if (entry.mesh.visible) meshCount++;
else if (entry.sprite.visible) spriteCount++;
else culledCount++;
});
return { total: _registered.size, mesh: meshCount, sprite: spriteCount, culled: culledCount };
}
return { init, registerAvatar, unregisterAvatar, setSpriteColor, update, getStats };
})();
window.LODSystem = LODSystem;

View File

@@ -14,6 +14,16 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.mission_bus import (
MissionBus,
MissionRole,
MissionParticipant,
MissionMessage,
MissionCheckpoint,
MissionHandoff,
IsolationProfile,
load_profiles,
)
try:
from nexus.nexus_think import NexusMind
@@ -28,5 +38,13 @@ __all__ = [
"Action",
"ExperienceStore",
"TrajectoryLogger",
"MissionBus",
"MissionRole",
"MissionParticipant",
"MissionMessage",
"MissionCheckpoint",
"MissionHandoff",
"IsolationProfile",
"load_profiles",
"NexusMind",
]

358
nexus/mission_bus.py Normal file
View File

@@ -0,0 +1,358 @@
"""Mission bus, role permissions, cross-agent handoff, and isolation profiles.
Grounded implementation slice for #883.
The bus gives a single mission cell a unified event stream, permission-checked
roles, checkpoint + resume handoff, and declared isolation profiles for Level
1/2/3 execution boundaries.
"""
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Union
DEFAULT_CONFIG = {
"roles": {
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
"write": ["publish", "checkpoint", "handoff", "read"],
"read": ["read"],
"audit": ["read", "audit"],
},
"isolation_profiles": [
{
"name": "level1-directory",
"label": "Level 1 — directory workspace",
"level": 1,
"mechanism": "directory_workspace",
"description": "Single mission cell in an isolated workspace directory.",
"supports_resume": True,
},
{
"name": "level2-mount-namespace",
"label": "Level 2 — mount namespace",
"level": 2,
"mechanism": "mount_namespace",
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
"supports_resume": True,
},
{
"name": "level3-rootless-podman",
"label": "Level 3 — rootless Podman",
"level": 3,
"mechanism": "rootless_podman",
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
"supports_resume": True,
},
],
}
def utcnow_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def load_profiles(path: Path) -> Dict[str, Any]:
if not path.exists():
return json.loads(json.dumps(DEFAULT_CONFIG))
with open(path, "r", encoding="utf-8") as handle:
data = json.load(handle)
data.setdefault("roles", DEFAULT_CONFIG["roles"])
data.setdefault("isolation_profiles", DEFAULT_CONFIG["isolation_profiles"])
return data
class MissionRole(str, Enum):
LEAD = "lead"
WRITE = "write"
READ = "read"
AUDIT = "audit"
@dataclass
class IsolationProfile:
name: str
label: str
level: int
mechanism: str
description: str = ""
supports_resume: bool = True
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"label": self.label,
"level": self.level,
"mechanism": self.mechanism,
"description": self.description,
"supports_resume": self.supports_resume,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "IsolationProfile":
return cls(
name=data["name"],
label=data["label"],
level=int(data["level"]),
mechanism=data["mechanism"],
description=data.get("description", ""),
supports_resume=bool(data.get("supports_resume", True)),
)
@dataclass
class MissionParticipant:
name: str
role: MissionRole
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"role": self.role.value,
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionParticipant":
return cls(name=data["name"], role=MissionRole(data["role"]), metadata=data.get("metadata", {}))
@dataclass
class MissionMessage:
sender: str
topic: str
payload: Dict[str, Any]
sequence: int
timestamp: str = field(default_factory=utcnow_iso)
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="message", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"topic": self.topic,
"payload": self.payload,
"sequence": self.sequence,
"timestamp": self.timestamp,
"message_id": self.message_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionMessage":
return cls(
sender=data["sender"],
topic=data["topic"],
payload=data["payload"],
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
message_id=data.get("message_id") or data.get("messageId") or str(uuid.uuid4()),
)
@dataclass
class MissionCheckpoint:
sender: str
summary: str
state: Dict[str, Any]
sequence: int
artifacts: List[str] = field(default_factory=list)
timestamp: str = field(default_factory=utcnow_iso)
checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="checkpoint", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"summary": self.summary,
"state": self.state,
"artifacts": self.artifacts,
"sequence": self.sequence,
"timestamp": self.timestamp,
"checkpoint_id": self.checkpoint_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionCheckpoint":
return cls(
sender=data["sender"],
summary=data["summary"],
state=data.get("state", {}),
artifacts=list(data.get("artifacts", [])),
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
checkpoint_id=data.get("checkpoint_id") or data.get("checkpointId") or str(uuid.uuid4()),
)
@dataclass
class MissionHandoff:
sender: str
recipient: str
checkpoint_id: str
sequence: int
note: str = ""
timestamp: str = field(default_factory=utcnow_iso)
handoff_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="handoff", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"recipient": self.recipient,
"checkpoint_id": self.checkpoint_id,
"note": self.note,
"sequence": self.sequence,
"timestamp": self.timestamp,
"handoff_id": self.handoff_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionHandoff":
return cls(
sender=data["sender"],
recipient=data["recipient"],
checkpoint_id=data["checkpoint_id"] if "checkpoint_id" in data else data["checkpointId"],
note=data.get("note", ""),
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
handoff_id=data.get("handoff_id") or data.get("handoffId") or str(uuid.uuid4()),
)
MissionEvent = Union[MissionMessage, MissionCheckpoint, MissionHandoff]
def event_from_dict(data: Dict[str, Any]) -> MissionEvent:
kind = data["event_type"]
if kind == "message":
return MissionMessage.from_dict(data)
if kind == "checkpoint":
return MissionCheckpoint.from_dict(data)
if kind == "handoff":
return MissionHandoff.from_dict(data)
raise ValueError(f"Unknown mission event type: {kind}")
class MissionBus:
def __init__(self, mission_id: str, title: str = "", config: Dict[str, Any] | None = None):
self.mission_id = mission_id
self.title = title
self.config = config or json.loads(json.dumps(DEFAULT_CONFIG))
self.role_permissions = {
role: set(perms) for role, perms in self.config.get("roles", {}).items()
}
self.isolation_profiles = [
IsolationProfile.from_dict(entry) for entry in self.config.get("isolation_profiles", [])
]
self.participants: Dict[str, MissionParticipant] = {}
self.events: List[MissionEvent] = []
def register_participant(self, name: str, role: MissionRole, metadata: Dict[str, Any] | None = None) -> MissionParticipant:
participant = MissionParticipant(name=name, role=role, metadata=metadata or {})
self.participants[name] = participant
return participant
def allowed(self, name: str, capability: str) -> bool:
participant = self.participants.get(name)
if participant is None:
return False
return capability in self.role_permissions.get(participant.role.value, set())
def _require(self, name: str, capability: str) -> None:
if not self.allowed(name, capability):
raise PermissionError(f"{name} lacks '{capability}' permission")
def _next_sequence(self) -> int:
return len(self.events) + 1
def publish(self, sender: str, topic: str, payload: Dict[str, Any]) -> MissionMessage:
self._require(sender, "publish")
event = MissionMessage(sender=sender, topic=topic, payload=payload, sequence=self._next_sequence())
self.events.append(event)
return event
def create_checkpoint(
self,
sender: str,
summary: str,
state: Dict[str, Any],
artifacts: List[str] | None = None,
) -> MissionCheckpoint:
self._require(sender, "checkpoint")
event = MissionCheckpoint(
sender=sender,
summary=summary,
state=state,
artifacts=list(artifacts or []),
sequence=self._next_sequence(),
)
self.events.append(event)
return event
def _get_checkpoint(self, checkpoint_id: str) -> MissionCheckpoint:
for event in self.events:
if isinstance(event, MissionCheckpoint) and event.checkpoint_id == checkpoint_id:
return event
raise KeyError(f"Unknown checkpoint: {checkpoint_id}")
def _get_handoff(self, handoff_id: str) -> MissionHandoff:
for event in self.events:
if isinstance(event, MissionHandoff) and event.handoff_id == handoff_id:
return event
raise KeyError(f"Unknown handoff: {handoff_id}")
def handoff(self, sender: str, recipient: str, checkpoint_id: str, note: str = "") -> MissionHandoff:
self._require(sender, "handoff")
if recipient not in self.participants:
raise KeyError(f"Unknown recipient: {recipient}")
self._get_checkpoint(checkpoint_id)
event = MissionHandoff(
sender=sender,
recipient=recipient,
checkpoint_id=checkpoint_id,
note=note,
sequence=self._next_sequence(),
)
self.events.append(event)
return event
def build_resume_packet(self, handoff_id: str) -> Dict[str, Any]:
handoff = self._get_handoff(handoff_id)
checkpoint = self._get_checkpoint(handoff.checkpoint_id)
return {
"mission_id": self.mission_id,
"title": self.title,
"recipient": handoff.recipient,
"sender": handoff.sender,
"handoff_note": handoff.note,
"checkpoint": checkpoint.to_dict(),
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
"isolation_profiles": [profile.to_dict() for profile in self.isolation_profiles],
"stream_length": len(self.events),
}
def to_dict(self) -> Dict[str, Any]:
return {
"mission_id": self.mission_id,
"title": self.title,
"config": self.config,
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
"events": [event.to_dict() for event in self.events],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionBus":
bus = cls(data["mission_id"], title=data.get("title", ""), config=data.get("config"))
for name, participant_data in data.get("participants", {}).items():
bus.participants[name] = MissionParticipant.from_dict(participant_data)
bus.events = [event_from_dict(event_data) for event_data in data.get("events", [])]
return bus

View File

@@ -1,10 +1,7 @@
#!/usr/bin/env python3
"""
Review Gate — Poka-yoke for unreviewed merges and zombie PRs.
Checks:
1. PR has at least 1 approving review (no rubber-stamping without approval)
2. PR has actual changes (no zombie PRs with 0 additions/deletions)
Review Gate — Poka-yoke for unreviewed merges.
Fails if the current PR has fewer than 1 approving review.
Usage in Gitea workflow:
- name: Review Approval Gate
@@ -16,6 +13,7 @@ Usage in Gitea workflow:
import os
import sys
import json
import subprocess
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
@@ -35,68 +33,7 @@ def api_call(method, path):
return {"error": e.read().decode(), "status": e.code}
def check_empty_pr(pr_data):
"""Check if PR has no actual changes (zombie PR)."""
additions = pr_data.get("additions", 0)
deletions = pr_data.get("deletions", 0)
changed_files = pr_data.get("changed_files", 0)
if additions == 0 and deletions == 0 and changed_files == 0:
return False, (
f"ZOMBIE PR: PR has 0 additions, 0 deletions, 0 changed files. "
f"This appears to be an empty PR with no actual changes."
)
return True, None
def check_approvals(reviews):
"""Check if PR has at least one approving review."""
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if len(approvals) >= 1:
return True, len(approvals)
return False, 0
def check_rubber_stamp(pr_data, reviews):
"""
Check for rubber-stamping: approving reviews on PRs with trivial changes.
Rubber-stamping indicators:
- Approving reviews exist
- PR has very few changes (< 5 lines total)
- Review comments are empty or generic
"""
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if not approvals:
return True, None # No approvals to check
additions = pr_data.get("additions", 0)
deletions = pr_data.get("deletions", 0)
total_changes = additions + deletions
# Flag if approving a PR with fewer than 5 total changes
if total_changes < 5 and len(approvals) > 0:
# Check if review bodies are substantive
empty_reviews = [
r for r in approvals
if not r.get("body") or len(r.get("body", "").strip()) < 10
]
if empty_reviews:
return False, (
f"SUSPICIOUS: PR has only {total_changes} total changes "
f"but {len(approvals)} approving review(s), "
f"{len(empty_reviews)} with empty/minimal comments. "
f"This may indicate rubber-stamping."
)
return True, None
def main():
errors = []
warnings = []
# Validate environment
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
sys.exit(1)
@@ -107,57 +44,27 @@ def main():
pr_number = PR_NUMBER
if not pr_number:
# Try to infer from Gitea Actions environment
pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
if not pr_number:
print("ERROR: Could not determine PR number")
sys.exit(1)
# Fetch PR data
pr_data = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}")
if isinstance(pr_data, dict) and "error" in pr_data:
print(f"ERROR fetching PR: {pr_data}")
sys.exit(1)
# Fetch reviews
reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
if isinstance(reviews, dict) and "error" in reviews:
print(f"ERROR fetching reviews: {reviews}")
sys.exit(1)
# ── Check 1: Empty PR (zombie PR) ───────────────────────
has_changes, empty_msg = check_empty_pr(pr_data)
if not has_changes:
errors.append(empty_msg)
# ── Check 2: Has approvals ──────────────────────────────
has_approval, approval_count = check_approvals(reviews)
if not has_approval:
errors.append(
f"PR #{pr_number} has no approving reviews. "
f"Merges require at least one approval."
)
# ── Check 3: Rubber-stamping detection ──────────────────
clean, rubber_msg = check_rubber_stamp(pr_data, reviews)
if not clean:
warnings.append(rubber_msg)
# ── Report ──────────────────────────────────────────────
if warnings:
for w in warnings:
print(f"⚠️ WARNING: {w}")
if errors:
for e in errors:
print(f"❌ BLOCKED: {e}")
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if len(approvals) >= 1:
print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
sys.exit(0)
else:
print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
print("Merges are not permitted without at least one approval.")
sys.exit(1)
print(f"✅ OK: PR #{pr_number} has {approval_count} approval(s) "
f"and {pr_data.get('additions', 0)} additions / "
f"{pr_data.get('deletions', 0)} deletions.")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -4,61 +4,48 @@ Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
Correctly uses the Gitea 1.25+ API (not GitHub-style).
"""
from __future__ import annotations
import json
import os
import sys
import json
import urllib.request
from pathlib import Path
import yaml
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
PROJECT_ROOT = Path(__file__).resolve().parent.parent
CONFIG_DIR = PROJECT_ROOT / ".gitea" / "branch-protection"
CONFIG_DIR = ".gitea/branch-protection"
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
url = f"{GITEA_URL}/api/v1{path}"
data = json.dumps(payload).encode() if payload else None
req = urllib.request.Request(
url,
data=data,
method=method,
headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
req = urllib.request.Request(url, data=data, method=method, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def build_branch_protection_payload(branch: str, rules: dict) -> dict:
return {
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.pop("branch", "main")
# Check if protection already exists
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(r.get("branch_name") == branch for r in existing)
payload = {
"branch_name": branch,
"rule_name": branch,
"required_approvals": rules.get("required_approvals", 1),
"block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
"dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
"block_deletions": rules.get("block_deletions", True),
"block_force_push": rules.get("block_force_push", rules.get("block_force_pushes", True)),
"block_force_push": rules.get("block_force_push", True),
"block_admin_merge_override": rules.get("block_admin_merge_override", True),
"enable_status_check": rules.get("require_ci_to_merge", False),
"status_check_contexts": rules.get("status_check_contexts", []),
"block_on_outdated_branch": rules.get("block_on_outdated_branch", False),
}
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.get("branch", "main")
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(rule.get("branch_name") == branch for rule in existing)
payload = build_branch_protection_payload(branch, rules)
try:
if exists:
api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
@@ -66,8 +53,8 @@ def apply_protection(repo: str, rules: dict) -> bool:
api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
print(f"{repo}:{branch} synced")
return True
except Exception as exc:
print(f"{repo}:{branch} failed: {exc}")
except Exception as e:
print(f"{repo}:{branch} failed: {e}")
return False
@@ -75,18 +62,15 @@ def main() -> int:
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
return 1
if not CONFIG_DIR.exists():
print(f"ERROR: config directory not found: {CONFIG_DIR}")
return 1
ok = 0
for cfg_path in sorted(CONFIG_DIR.glob("*.yml")):
repo = cfg_path.stem
with cfg_path.open() as fh:
cfg = yaml.safe_load(fh) or {}
rules = cfg.get("rules", {})
rules.setdefault("branch", cfg.get("branch", "main"))
if apply_protection(repo, rules):
for fname in os.listdir(CONFIG_DIR):
if not fname.endswith(".yml"):
continue
repo = fname[:-4]
with open(os.path.join(CONFIG_DIR, fname)) as f:
cfg = yaml.safe_load(f)
if apply_protection(repo, cfg.get("rules", {})):
ok += 1
print(f"\nSynced {ok} repo(s)")

107
tests/test_mission_bus.py Normal file
View File

@@ -0,0 +1,107 @@
from importlib import util
from pathlib import Path
import sys
import pytest
ROOT = Path(__file__).resolve().parent.parent
MODULE_PATH = ROOT / "nexus" / "mission_bus.py"
CONFIG_PATH = ROOT / "config" / "mission_bus_profiles.json"
def load_module():
spec = util.spec_from_file_location("mission_bus", MODULE_PATH)
module = util.module_from_spec(spec)
assert spec.loader is not None
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def build_bus(module):
profiles = module.load_profiles(CONFIG_PATH)
bus = module.MissionBus("mission-883", title="multi-agent teaming", config=profiles)
bus.register_participant("timmy", module.MissionRole.LEAD)
bus.register_participant("ezra", module.MissionRole.WRITE)
bus.register_participant("bezalel", module.MissionRole.READ)
bus.register_participant("allegro", module.MissionRole.AUDIT)
return bus
def test_role_permissions_gate_publish_checkpoint_and_handoff():
module = load_module()
bus = build_bus(module)
assert bus.allowed("timmy", "publish") is True
assert bus.allowed("ezra", "handoff") is True
assert bus.allowed("allegro", "audit") is True
assert bus.allowed("bezalel", "publish") is False
with pytest.raises(PermissionError):
bus.publish("bezalel", "mission.notes", {"text": "should fail"})
with pytest.raises(PermissionError):
bus.create_checkpoint("allegro", summary="audit cannot checkpoint", state={})
def test_mission_bus_unified_stream_records_messages_checkpoints_and_handoffs():
module = load_module()
bus = build_bus(module)
msg = bus.publish("timmy", "mission.start", {"goal": "build the slice"})
checkpoint = bus.create_checkpoint(
"ezra",
summary="checkpoint before lead review",
state={"branch": "fix/883", "files": ["nexus/mission_bus.py"]},
artifacts=["docs/mission-bus.md"],
)
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="ready for lead review")
assert [event.event_type for event in bus.events] == ["message", "checkpoint", "handoff"]
assert [event.sequence for event in bus.events] == [1, 2, 3]
assert msg.topic == "mission.start"
assert handoff.recipient == "timmy"
def test_handoff_resume_packet_contains_checkpoint_state_and_participants():
module = load_module()
bus = build_bus(module)
checkpoint = bus.create_checkpoint(
"ezra",
summary="handoff package",
state={"branch": "fix/883", "tests": ["tests/test_mission_bus.py"]},
artifacts=["config/mission_bus_profiles.json"],
)
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="pick up from here")
packet = bus.build_resume_packet(handoff.handoff_id)
assert packet["recipient"] == "timmy"
assert packet["checkpoint"]["state"]["branch"] == "fix/883"
assert packet["checkpoint"]["artifacts"] == ["config/mission_bus_profiles.json"]
assert packet["participants"]["ezra"]["role"] == "write"
assert packet["handoff_note"] == "pick up from here"
def test_profiles_define_level2_mount_namespace_and_level3_rootless_podman():
module = load_module()
profiles = module.load_profiles(CONFIG_PATH)
levels = {entry["level"]: entry["mechanism"] for entry in profiles["isolation_profiles"]}
assert levels[2] == "mount_namespace"
assert levels[3] == "rootless_podman"
assert profiles["roles"]["audit"] == ["read", "audit"]
def test_mission_bus_roundtrip_preserves_events_and_isolation_profile():
module = load_module()
bus = build_bus(module)
bus.publish("timmy", "mission.start", {"goal": "roundtrip"})
checkpoint = bus.create_checkpoint("ezra", summary="save state", state={"count": 1})
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume")
restored = module.MissionBus.from_dict(bus.to_dict())
assert restored.mission_id == "mission-883"
assert restored.events[-1].event_type == "handoff"
assert restored.events[-1].note == "resume"
assert restored.isolation_profiles[1].mechanism == "mount_namespace"

View File

@@ -1,109 +0,0 @@
"""
Tests for scripts/review_gate.py — Anti-rubber-stamping PR validation.
"""
import unittest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from review_gate import check_empty_pr, check_approvals, check_rubber_stamp
class TestCheckEmptyPr(unittest.TestCase):
def test_valid_pr(self):
pr = {"additions": 10, "deletions": 5, "changed_files": 2}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
self.assertIsNone(msg)
def test_empty_pr(self):
pr = {"additions": 0, "deletions": 0, "changed_files": 0}
ok, msg = check_empty_pr(pr)
self.assertFalse(ok)
self.assertIn("ZOMBIE", msg)
def test_additions_only(self):
pr = {"additions": 50, "deletions": 0, "changed_files": 1}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
def test_deletions_only(self):
pr = {"additions": 0, "deletions": 30, "changed_files": 1}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
def test_missing_fields_treated_as_zero(self):
pr = {}
ok, msg = check_empty_pr(pr)
self.assertFalse(ok)
class TestCheckApprovals(unittest.TestCase):
def test_has_approval(self):
reviews = [{"state": "APPROVED"}, {"state": "COMMENT"}]
ok, count = check_approvals(reviews)
self.assertTrue(ok)
self.assertEqual(count, 1)
def test_multiple_approvals(self):
reviews = [{"state": "APPROVED"}, {"state": "APPROVED"}]
ok, count = check_approvals(reviews)
self.assertTrue(ok)
self.assertEqual(count, 2)
def test_no_approvals(self):
reviews = [{"state": "COMMENT"}, {"state": "REQUEST_CHANGES"}]
ok, count = check_approvals(reviews)
self.assertFalse(ok)
self.assertEqual(count, 0)
def test_empty_reviews(self):
ok, count = check_approvals([])
self.assertFalse(ok)
self.assertEqual(count, 0)
class TestCheckRubberStamp(unittest.TestCase):
def test_substantive_pr_no_warning(self):
pr = {"additions": 100, "deletions": 50}
reviews = [{"state": "APPROVED", "body": "Looks good, nice changes"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
self.assertIsNone(msg)
def test_trivial_pr_empty_review_detected(self):
pr = {"additions": 2, "deletions": 0}
reviews = [{"state": "APPROVED", "body": ""}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertFalse(ok)
self.assertIn("SUSPICIOUS", msg)
def test_trivial_pr_short_review_detected(self):
pr = {"additions": 1, "deletions": 1}
reviews = [{"state": "APPROVED", "body": "ok"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertFalse(ok)
self.assertIn("SUSPICIOUS", msg)
def test_trivial_pr_substantive_review_ok(self):
pr = {"additions": 2, "deletions": 0}
reviews = [{"state": "APPROVED", "body": "This small fix is correct. Tested locally."}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
def test_no_approvals_skips_check(self):
pr = {"additions": 0, "deletions": 0}
reviews = [{"state": "COMMENT"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
def test_large_pr_with_empty_review_ok(self):
pr = {"additions": 500, "deletions": 200}
reviews = [{"state": "APPROVED", "body": ""}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok) # Large PR, empty review is less suspicious
if __name__ == "__main__":
unittest.main()

View File

@@ -1,45 +0,0 @@
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import yaml
PROJECT_ROOT = Path(__file__).parent.parent
_spec = importlib.util.spec_from_file_location(
"sync_branch_protection_test",
PROJECT_ROOT / "scripts" / "sync_branch_protection.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules["sync_branch_protection_test"] = _mod
_spec.loader.exec_module(_mod)
build_branch_protection_payload = _mod.build_branch_protection_payload
def test_build_branch_protection_payload_enables_rebase_before_merge():
payload = build_branch_protection_payload(
"main",
{
"required_approvals": 1,
"dismiss_stale_approvals": True,
"require_ci_to_merge": False,
"block_deletions": True,
"block_force_push": True,
"block_on_outdated_branch": True,
},
)
assert payload["branch_name"] == "main"
assert payload["rule_name"] == "main"
assert payload["block_on_outdated_branch"] is True
assert payload["required_approvals"] == 1
assert payload["enable_status_check"] is False
def test_the_nexus_branch_protection_config_requires_up_to_date_branch():
config = yaml.safe_load((PROJECT_ROOT / ".gitea" / "branch-protection" / "the-nexus.yml").read_text())
rules = config["rules"]
assert rules["block_on_outdated_branch"] is True