Compare commits

..

5 Commits

Author SHA1 Message Date
Alexander Whitestone
27925c9a5d wip: fix mission bus test loader
Some checks are pending
CI / test (pull_request) Waiting to run
CI / validate (pull_request) Waiting to run
Review Approval Gate / verify-review (pull_request) Waiting to run
2026-04-15 04:09:32 -04:00
Alexander Whitestone
417f92b7a5 wip: add mission bus module and profiles 2026-04-15 04:08:47 -04:00
Alexander Whitestone
de3f41e819 wip: add mission bus regression tests 2026-04-15 04:06:28 -04:00
bd0497b998 Merge PR #1585: docs: add night shift prediction report (#1353) 2026-04-15 06:13:22 +00:00
Alexander Whitestone
4ab84a59ab docs: add night shift prediction report (#1353)
Some checks are pending
CI / test (pull_request) Waiting to run
CI / validate (pull_request) Waiting to run
Review Approval Gate / verify-review (pull_request) Waiting to run
2026-04-15 02:02:26 -04:00
11 changed files with 748 additions and 570 deletions

View File

@@ -1,116 +0,0 @@
# .gitea/workflows/check-pr-changes.yml
# CI workflow to prevent rubber-stamping of PRs with no changes
# Issue #1445: process: Prevent rubber-stamping of PRs with no changes
name: Check PR for Changes
on:
pull_request:
types: [opened, synchronize, reopened]
jobs:
check-changes:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 0 # Fetch full history for diff comparison
- name: Check for empty PR
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
# Get PR number from context
PR_NUMBER="${{ github.event.pull_request.number }}"
echo "Checking PR #$PR_NUMBER for changes..."
# Get the base and head commits
BASE_SHA="${{ github.event.pull_request.base.sha }}"
HEAD_SHA="${{ github.event.pull_request.head.sha }}"
echo "Base SHA: $BASE_SHA"
echo "Head SHA: $HEAD_SHA"
# Get diff stats
DIFF_STATS=$(git diff --stat "$BASE_SHA" "$HEAD_SHA")
if [ -z "$DIFF_STATS" ]; then
echo "❌ ERROR: PR has no changes!"
echo ""
echo "This PR has 0 additions, 0 deletions, and 0 files changed."
echo "This is a 'zombie PR' that should not be merged."
echo ""
echo "Rubber-stamping (approving PRs with no changes) is prohibited."
echo "Reviewers must verify that PRs contain actual changes."
echo ""
echo "If this is a mistake, please add actual changes to the PR."
echo "If this PR is not needed, please close it."
exit 1
else
echo "✅ PR has changes:"
echo "$DIFF_STATS"
# Get detailed stats
ADDITIONS=$(git diff --numstat "$BASE_SHA" "$HEAD_SHA" | awk '{sum+=$1} END {print sum}')
DELETIONS=$(git diff --numstat "$BASE_SHA" "$HEAD_SHA" | awk '{sum+=$2} END {print sum}')
FILES_CHANGED=$(git diff --numstat "$BASE_SHA" "$HEAD_SHA" | wc -l)
echo ""
echo "Summary:"
echo " Files changed: $FILES_CHANGED"
echo " Additions: $ADDITIONS"
echo " Deletions: $DELETIONS"
# Check if this is a "zombie PR" (no actual changes)
if [ "$ADDITIONS" -eq 0 ] && [ "$DELETIONS" -eq 0 ]; then
echo ""
echo "⚠️ WARNING: PR has files changed but no additions or deletions!"
echo "This might be a binary file change or permission change."
echo "Reviewers should verify this is intentional."
fi
fi
- name: Check for empty commits
run: |
# Check if there are any commits with no changes
BASE_SHA="${{ github.event.pull_request.base.sha }}"
HEAD_SHA="${{ github.event.pull_request.head.sha }}"
# Get list of commits
COMMITS=$(git log --oneline "$BASE_SHA".."$HEAD_SHA")
if [ -z "$COMMITS" ]; then
echo "❌ ERROR: PR has no commits!"
exit 1
fi
echo "Commits in this PR:"
echo "$COMMITS"
# Check each commit for changes
EMPTY_COMMITS=0
while IFS= read -r commit; do
COMMIT_SHA=$(echo "$commit" | awk '{print $1}')
COMMIT_MSG=$(echo "$commit" | cut -d' ' -f2-)
# Get parent commit
PARENT_SHA=$(git rev-parse "$COMMIT_SHA^" 2>/dev/null || echo "")
if [ -n "$PARENT_SHA" ]; then
# Check if commit has changes
COMMIT_DIFF=$(git diff --stat "$PARENT_SHA" "$COMMIT_SHA")
if [ -z "$COMMIT_DIFF" ]; then
echo "⚠️ WARNING: Commit $COMMIT_SHA has no changes!"
echo " Message: $COMMIT_MSG"
EMPTY_COMMITS=$((EMPTY_COMMITS + 1))
fi
fi
done <<< "$COMMITS"
if [ "$EMPTY_COMMITS" -gt 0 ]; then
echo ""
echo "⚠️ Found $EMPTY_COMMITS commit(s) with no changes."
echo "Consider squashing or amending these commits."
fi

View File

@@ -1,73 +1,65 @@
## Description
<!-- Provide a clear description of what this PR does -->
## Changes Made
<!-- List the specific changes made in this PR -->
### Files Changed
<!-- List the files that were modified -->
### Type of Change
<!-- Check the relevant option -->
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Documentation update
- [ ] Refactoring (no functional changes)
- [ ] Test updates
- [ ] CI/CD changes
## Testing
<!-- Describe the tests you ran to verify your changes -->
### Test Instructions
<!-- Provide step-by-step instructions to test your changes -->
## Checklist
<!-- Check all that apply -->
- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged and published in downstream modules
## Reviewer Guidelines
<!-- IMPORTANT: Reviewers must follow these guidelines to prevent rubber-stamping -->
### ⚠️ Reviewers MUST verify:
1. **PR has actual changes** - Check that the PR contains additions, deletions, or modifications
2. **Changes match description** - Verify the changes match what's described in the PR
3. **Code quality** - Review code for bugs, security issues, performance problems
4. **Tests are adequate** - Ensure new code is properly tested
5. **Documentation is updated** - Check if documentation needs updates
### ❌ DO NOT approve if:
- PR has 0 additions, 0 deletions, and 0 files changed (zombie PR)
- Changes don't match the PR description
- Code has obvious bugs or security issues
- No tests for new functionality
- Documentation is missing or incorrect
### ✅ DO approve if:
- PR has meaningful changes that match the description
- Code is clean, well-tested, and documented
- Changes follow project conventions
- No obvious issues or risks
## Related Issues
<!-- Link any related issues -->
- Fixes #<!-- issue number -->
- Related to #<!-- issue number -->
## Additional Notes
<!-- Add any other context about the PR here -->
---
**By submitting this PR, I confirm that:**
1. I have actually reviewed the code changes
2. The changes are meaningful and not a zombie PR
3. I have tested the changes locally (if applicable)
4. I understand that rubber-stamping (approving PRs with no changes) is prohibited
**⚠️ Before submitting your pull request:**
1. [x] I've read [BRANCH_PROTECTION.md](BRANCH_PROTECTION.md)
2. [x] I've followed [CONTRIBUTING.md](CONTRIBUTING.md) guidelines
3. [x] My changes have appropriate test coverage
4. [x] I've updated documentation where needed
5. [x] I've verified CI passes (where applicable)
**Context:**
<Describe your changes and why they're needed>
**Testing:**
<Explain how this was tested>
**Questions for reviewers:**
<Ask specific questions if needed>
## Pull Request Template
### Description
[Explain your changes briefly]
### Checklist
- [ ] Branch protection rules followed
- [ ] Required reviewers: @perplexity (QA), @Timmy (hermes-agent)
- [ ] CI passed (where applicable)
### Questions for Reviewers
- [ ] Any special considerations?
- [ ] Does this require additional documentation?
# Pull Request Template
## Summary
Briefly describe the changes in this PR.
## Reviewers
- Default reviewer: @perplexity
- Required reviewer for hermes-agent: @Timmy
## Branch Protection Compliance
- [ ] PR created
- [ ] 1+ approvals
- [ ] ci passed (where applicable)
- [ ] No force pushes
- [ ] No branch deletions
## Specialized Owners
- [ ] @Rockachopa (for agent-core)
- [ ] @Timmy (for ai/)
## Pull Request Template
### Summary
- [ ] Describe the change
- [ ] Link to related issue (e.g. `Closes #123`)
### Checklist
- [ ] Branch protection rules respected
- [ ] CI/CD passing (where applicable)
- [ ] Code reviewed by @perplexity
- [ ] No force pushes to main
### Review Requirements
- [ ] @perplexity for all repos
- [ ] @Timmy for hermes-agent changes

View File

@@ -1,193 +0,0 @@
#!/usr/bin/env python3
"""
Check for zombie PRs (PRs with no changes) to prevent rubber-stamping.
Issue #1445: process: Prevent rubber-stamping of PRs with no changes
"""
import json
import os
import sys
import urllib.request
import subprocess
from typing import Dict, List, Any, Optional
# Configuration
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
ORG = "Timmy_Foundation"
class ZombiePRChecker:
def __init__(self):
self.token = self._load_token()
def _load_token(self) -> str:
"""Load Gitea API token."""
try:
with open(TOKEN_PATH, "r") as f:
return f.read().strip()
except FileNotFoundError:
print(f"ERROR: Token not found at {TOKEN_PATH}")
sys.exit(1)
def _api_request(self, endpoint: str) -> Any:
"""Make authenticated Gitea API request."""
url = f"{GITEA_BASE}{endpoint}"
headers = {"Authorization": f"token {self.token}"}
req = urllib.request.Request(url, headers=headers)
try:
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read())
except urllib.error.HTTPError as e:
if e.code == 404:
return None
error_body = e.read().decode() if e.fp else "No error body"
print(f"API Error {e.code}: {error_body}")
return None
def get_open_prs(self, repo: str) -> List[Dict]:
"""Get open PRs for a repository."""
endpoint = f"/repos/{ORG}/{repo}/pulls?state=open"
prs = self._api_request(endpoint)
return prs if isinstance(prs, list) else []
def get_pr_files(self, repo: str, pr_number: int) -> List[Dict]:
"""Get files changed in a PR."""
endpoint = f"/repos/{ORG}/{repo}/pulls/{pr_number}/files"
files = self._api_request(endpoint)
return files if isinstance(files, list) else []
def is_zombie_pr(self, repo: str, pr_number: int) -> Dict[str, Any]:
"""Check if a PR is a zombie (no actual changes)."""
pr_files = self.get_pr_files(repo, pr_number)
# Calculate total changes
total_additions = sum(f.get("additions", 0) for f in pr_files)
total_deletions = sum(f.get("deletions", 0) for f in pr_files)
total_changes = sum(f.get("changes", 0) for f in pr_files)
# A zombie PR has no additions, deletions, or changes
is_zombie = (total_additions == 0 and total_deletions == 0 and total_changes == 0)
return {
"repo": repo,
"pr_number": pr_number,
"is_zombie": is_zombie,
"files_changed": len(pr_files),
"total_additions": total_additions,
"total_deletions": total_deletions,
"total_changes": total_changes,
"files": pr_files
}
def scan_repo_for_zombies(self, repo: str) -> List[Dict]:
"""Scan a repository for zombie PRs."""
open_prs = self.get_open_prs(repo)
zombies = []
print(f"Scanning {repo} for zombie PRs...")
print(f"Found {len(open_prs)} open PRs")
for pr in open_prs:
pr_number = pr["number"]
pr_title = pr["title"]
# Check if it's a zombie
zombie_info = self.is_zombie_pr(repo, pr_number)
if zombie_info["is_zombie"]:
zombie_info["title"] = pr_title
zombie_info["author"] = pr["user"]["login"]
zombie_info["created"] = pr["created_at"]
zombies.append(zombie_info)
print(f" 🧟 ZOMBIE: #{pr_number} - {pr_title}")
else:
print(f" ✅ OK: #{pr_number} - {pr_title} ({zombie_info['total_changes']} changes)")
return zombies
def generate_report(self, zombies_by_repo: Dict[str, List[Dict]]) -> str:
"""Generate a report of zombie PRs found."""
total_zombies = sum(len(zombies) for zombies in zombies_by_repo.values())
report = "# Zombie PR Detection Report\n\n"
report += f"## Summary\n"
report += f"- **Total zombie PRs found:** {total_zombies}\n"
report += f"- **Repositories scanned:** {len(zombies_by_repo)}\n\n"
if total_zombies == 0:
report += "✅ **No zombie PRs found.**\n"
else:
report += "⚠️ **Zombie PRs found:**\n\n"
for repo, zombies in zombies_by_repo.items():
if zombies:
report += f"### {repo}\n"
for zombie in zombies:
report += f"- **#{zombie['pr_number']}**: {zombie['title']}\n"
report += f" - Author: {zombie['author']}\n"
report += f" - Created: {zombie['created']}\n"
report += f" - Files changed: {zombie['files_changed']}\n"
report += f" - Total changes: {zombie['total_changes']}\n"
report += "\n"
# Add recommendations
report += "## Recommendations\n"
report += "1. **Close zombie PRs** - PRs with no actual changes should be closed\n"
report += "2. **Validate before merge** - CI should reject PRs with no changes\n"
report += "3. **Prevent future zombies** - Agents should validate changes before creating PRs\n"
report += "4. **Review process** - Reviewers must verify PRs have actual changes\n"
return report
def main():
"""Main entry point for zombie PR checker."""
import argparse
parser = argparse.ArgumentParser(description="Check for zombie PRs (PRs with no actual changes)")
parser.add_argument("--repos", nargs="+",
default=["the-nexus", "timmy-home", "timmy-config", "hermes-agent", "the-beacon"],
help="Repositories to scan")
parser.add_argument("--report", action="store_true", help="Generate report")
parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
args = parser.parse_args()
checker = ZombiePRChecker()
# Scan repositories for zombie PRs
zombies_by_repo = {}
for repo in args.repos:
zombies = checker.scan_repo_for_zombies(repo)
zombies_by_repo[repo] = zombies
# Generate output
if args.json:
print(json.dumps(zombies_by_repo, indent=2))
elif args.report:
report = checker.generate_report(zombies_by_repo)
print(report)
else:
# Default: show summary
total_zombies = sum(len(zombies) for zombies in zombies_by_repo.values())
print(f"\nZombie PR Detection Complete")
print("=" * 60)
print(f"Total zombie PRs found: {total_zombies}")
if total_zombies > 0:
print("\nZombie PRs:")
for repo, zombies in zombies_by_repo.items():
for zombie in zombies:
print(f" {repo} #{zombie['pr_number']}: {zombie['title']}")
sys.exit(1)
else:
print("\n✅ No zombie PRs found")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,34 @@
{
"roles": {
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
"write": ["publish", "checkpoint", "handoff", "read"],
"read": ["read"],
"audit": ["read", "audit"]
},
"isolation_profiles": [
{
"name": "level1-directory",
"label": "Level 1 — directory workspace",
"level": 1,
"mechanism": "directory_workspace",
"description": "Single mission cell in an isolated workspace directory.",
"supports_resume": true
},
{
"name": "level2-mount-namespace",
"label": "Level 2 — mount namespace",
"level": 2,
"mechanism": "mount_namespace",
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
"supports_resume": true
},
{
"name": "level3-rootless-podman",
"label": "Level 3 — rootless Podman",
"level": 3,
"mechanism": "rootless_podman",
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
"supports_resume": true
}
]
}

31
docs/mission-bus.md Normal file
View File

@@ -0,0 +1,31 @@
# Mission Bus
The Mission Bus grounds the multi-agent teaming epic with a concrete, executable shared module.
## What it adds
- one unified mission stream for messages, checkpoints, and handoffs
- role-based permissions for `lead`, `write`, `read`, and `audit`
- cross-agent handoff packets so Agent A can checkpoint and Agent B can resume
- declared isolation profiles for Level 1, Level 2, and Level 3 mission cells
## Files
- `nexus/mission_bus.py`
- `config/mission_bus_profiles.json`
## Example
```python
from nexus.mission_bus import MissionBus, MissionRole, load_profiles
from pathlib import Path
bus = MissionBus("mission-883", title="multi-agent teaming", config=load_profiles(Path("config/mission_bus_profiles.json")))
bus.register_participant("timmy", MissionRole.LEAD)
bus.register_participant("ezra", MissionRole.WRITE)
checkpoint = bus.create_checkpoint("ezra", summary="checkpoint", state={"branch": "fix/883"})
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume from here")
packet = bus.build_resume_packet(bus.events[-1].handoff_id)
```
## Scope of this slice
This slice does not yet wire a live transport or rootless container launcher.
It codifies the mission bus contract, role permissions, handoff packet, and isolation profile surface so later work can execute against a stable interface.

View File

@@ -1,189 +0,0 @@
# Preventing Rubber-Stamping of PRs
**Issue:** #1445 - process: Prevent rubber-stamping of PRs with no changes
**Problem:** PRs with no changes (zombie PRs) are being approved without actual review
## What is Rubber-Stamping?
Rubber-stamping occurs when:
1. A PR has 0 additions, 0 deletions, and 0 files changed (zombie PR)
2. Reviewers approve the PR without noticing it has no changes
3. The PR gets merged despite adding no value
This is a serious process issue because:
- It wastes reviewer time
- It creates false sense of review quality
- It allows zombie PRs to appear reviewed
- It clutters the PR backlog
## Prevention Measures
### 1. CI Check (`.gitea/workflows/check-pr-changes.yml`)
Automated check that runs on every PR:
- Detects PRs with no changes
- Blocks merge if PR is a zombie
- Provides clear error messages
**What it checks:**
- PR has additions, deletions, or file changes
- Commits contain actual changes
- No empty diffs
**When it runs:**
- On PR open
- On PR synchronize (new commits)
- On PR reopen
### 2. PR Template (`.github/PULL_REQUEST_TEMPLATE.md`)
Updated PR template with reviewer guidelines:
- Clear checklist for reviewers
- Explicit instructions to check for changes
- Warning against rubber-stamping
**Reviewer requirements:**
1. Verify PR has actual changes
2. Changes match description
3. Code quality review
4. Tests are adequate
5. Documentation is updated
### 3. Zombie PR Detection Script (`bin/check_zombie_prs.py`)
Script to scan for zombie PRs:
- Check all open PRs in repositories
- Identify PRs with no changes
- Generate reports
**Usage:**
```bash
# Scan all repositories
python bin/check_zombie_prs.py
# Scan specific repositories
python bin/check_zombie_prs.py --repos the-nexus timmy-home
# Generate report
python bin/check_zombie_prs.py --report
# JSON output
python bin/check_zombie_prs.py --json
```
## How to Use
### For CI/CD
The workflow runs automatically on all PRs. No setup needed.
### For Developers
1. **Before creating PR:**
- Ensure you have actual changes
- Test your changes locally
- Don't create PRs with no changes
2. **When reviewing PRs:**
- Check that PR has additions, deletions, or file changes
- Verify changes match the PR description
- Don't approve PRs with no changes
3. **If you find a zombie PR:**
- Add a comment explaining it has no changes
- Request changes or close the PR
- Don't approve it
### For Agents (AI Workers)
Before creating a PR:
```bash
# Check if you have changes
git status
git diff --stat
# If no changes, don't create PR
# If changes exist, create PR
```
## Examples
### Zombie PR Detected
```
❌ ERROR: PR has no changes!
This PR has 0 additions, 0 deletions, and 0 files changed.
This is a 'zombie PR' that should not be merged.
Rubber-stamping (approving PRs with no changes) is prohibited.
Reviewers must verify that PRs contain actual changes.
If this is a mistake, please add actual changes to the PR.
If this PR is not needed, please close it.
```
### Valid PR
```
✅ PR has changes:
README.md | 10 ++++++++++
1 file changed, 10 insertions(+)
Summary:
Files changed: 1
Additions: 10
Deletions: 0
```
## Related Issues
- **Issue #1127:** Perplexity Evening Pass triage (identified rubber-stamping)
- **Issue #1445:** This implementation
- **PR #359:** Example of rubber-stamping (3 approvals on empty PR)
## Prevention Strategy
### 1. **Automated Checks**
- CI workflow blocks zombie PRs
- Pre-commit hooks validate changes
- Automated scanning for zombie PRs
### 2. **Process Guidelines**
- Updated PR template with reviewer guidelines
- Clear instructions to check for changes
- Training on rubber-stamping prevention
### 3. **Monitoring**
- Regular scans for zombie PRs
- Reports on rubber-stamping incidents
- Continuous improvement of prevention measures
## Files Added
1. `.gitea/workflows/check-pr-changes.yml` - CI workflow
2. `.github/PULL_REQUEST_TEMPLATE.md` - Updated PR template
3. `bin/check_zombie_prs.py` - Zombie PR detection script
4. `docs/rubber-stamping-prevention.md` - This documentation
## Testing
Test the CI workflow:
```bash
# Create a test PR with no changes
git checkout -b test/zombie-pr
git commit --allow-empty -m "test: empty commit"
git push origin test/zombie-pr
# Create PR and watch CI fail
```
Test the detection script:
```bash
python bin/check_zombie_prs.py --repos the-nexus --report
```
## Conclusion
This implementation provides comprehensive protection against rubber-stamping:
1. **Automated CI checks** block zombie PRs
2. **Updated PR template** guides reviewers
3. **Detection script** identifies existing zombie PRs
4. **Documentation** explains the problem and solution
**Result:** No more rubber-stamping of PRs with no changes.
## License
Part of the Timmy Foundation project.

View File

@@ -14,6 +14,16 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.mission_bus import (
MissionBus,
MissionRole,
MissionParticipant,
MissionMessage,
MissionCheckpoint,
MissionHandoff,
IsolationProfile,
load_profiles,
)
try:
from nexus.nexus_think import NexusMind
@@ -28,5 +38,13 @@ __all__ = [
"Action",
"ExperienceStore",
"TrajectoryLogger",
"MissionBus",
"MissionRole",
"MissionParticipant",
"MissionMessage",
"MissionCheckpoint",
"MissionHandoff",
"IsolationProfile",
"load_profiles",
"NexusMind",
]

358
nexus/mission_bus.py Normal file
View File

@@ -0,0 +1,358 @@
"""Mission bus, role permissions, cross-agent handoff, and isolation profiles.
Grounded implementation slice for #883.
The bus gives a single mission cell a unified event stream, permission-checked
roles, checkpoint + resume handoff, and declared isolation profiles for Level
1/2/3 execution boundaries.
"""
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Union
DEFAULT_CONFIG = {
"roles": {
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
"write": ["publish", "checkpoint", "handoff", "read"],
"read": ["read"],
"audit": ["read", "audit"],
},
"isolation_profiles": [
{
"name": "level1-directory",
"label": "Level 1 — directory workspace",
"level": 1,
"mechanism": "directory_workspace",
"description": "Single mission cell in an isolated workspace directory.",
"supports_resume": True,
},
{
"name": "level2-mount-namespace",
"label": "Level 2 — mount namespace",
"level": 2,
"mechanism": "mount_namespace",
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
"supports_resume": True,
},
{
"name": "level3-rootless-podman",
"label": "Level 3 — rootless Podman",
"level": 3,
"mechanism": "rootless_podman",
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
"supports_resume": True,
},
],
}
def utcnow_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def load_profiles(path: Path) -> Dict[str, Any]:
if not path.exists():
return json.loads(json.dumps(DEFAULT_CONFIG))
with open(path, "r", encoding="utf-8") as handle:
data = json.load(handle)
data.setdefault("roles", DEFAULT_CONFIG["roles"])
data.setdefault("isolation_profiles", DEFAULT_CONFIG["isolation_profiles"])
return data
class MissionRole(str, Enum):
LEAD = "lead"
WRITE = "write"
READ = "read"
AUDIT = "audit"
@dataclass
class IsolationProfile:
name: str
label: str
level: int
mechanism: str
description: str = ""
supports_resume: bool = True
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"label": self.label,
"level": self.level,
"mechanism": self.mechanism,
"description": self.description,
"supports_resume": self.supports_resume,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "IsolationProfile":
return cls(
name=data["name"],
label=data["label"],
level=int(data["level"]),
mechanism=data["mechanism"],
description=data.get("description", ""),
supports_resume=bool(data.get("supports_resume", True)),
)
@dataclass
class MissionParticipant:
name: str
role: MissionRole
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"role": self.role.value,
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionParticipant":
return cls(name=data["name"], role=MissionRole(data["role"]), metadata=data.get("metadata", {}))
@dataclass
class MissionMessage:
sender: str
topic: str
payload: Dict[str, Any]
sequence: int
timestamp: str = field(default_factory=utcnow_iso)
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="message", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"topic": self.topic,
"payload": self.payload,
"sequence": self.sequence,
"timestamp": self.timestamp,
"message_id": self.message_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionMessage":
return cls(
sender=data["sender"],
topic=data["topic"],
payload=data["payload"],
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
message_id=data.get("message_id") or data.get("messageId") or str(uuid.uuid4()),
)
@dataclass
class MissionCheckpoint:
sender: str
summary: str
state: Dict[str, Any]
sequence: int
artifacts: List[str] = field(default_factory=list)
timestamp: str = field(default_factory=utcnow_iso)
checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="checkpoint", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"summary": self.summary,
"state": self.state,
"artifacts": self.artifacts,
"sequence": self.sequence,
"timestamp": self.timestamp,
"checkpoint_id": self.checkpoint_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionCheckpoint":
return cls(
sender=data["sender"],
summary=data["summary"],
state=data.get("state", {}),
artifacts=list(data.get("artifacts", [])),
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
checkpoint_id=data.get("checkpoint_id") or data.get("checkpointId") or str(uuid.uuid4()),
)
@dataclass
class MissionHandoff:
sender: str
recipient: str
checkpoint_id: str
sequence: int
note: str = ""
timestamp: str = field(default_factory=utcnow_iso)
handoff_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="handoff", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"recipient": self.recipient,
"checkpoint_id": self.checkpoint_id,
"note": self.note,
"sequence": self.sequence,
"timestamp": self.timestamp,
"handoff_id": self.handoff_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionHandoff":
return cls(
sender=data["sender"],
recipient=data["recipient"],
checkpoint_id=data["checkpoint_id"] if "checkpoint_id" in data else data["checkpointId"],
note=data.get("note", ""),
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
handoff_id=data.get("handoff_id") or data.get("handoffId") or str(uuid.uuid4()),
)
MissionEvent = Union[MissionMessage, MissionCheckpoint, MissionHandoff]
def event_from_dict(data: Dict[str, Any]) -> MissionEvent:
kind = data["event_type"]
if kind == "message":
return MissionMessage.from_dict(data)
if kind == "checkpoint":
return MissionCheckpoint.from_dict(data)
if kind == "handoff":
return MissionHandoff.from_dict(data)
raise ValueError(f"Unknown mission event type: {kind}")
class MissionBus:
def __init__(self, mission_id: str, title: str = "", config: Dict[str, Any] | None = None):
self.mission_id = mission_id
self.title = title
self.config = config or json.loads(json.dumps(DEFAULT_CONFIG))
self.role_permissions = {
role: set(perms) for role, perms in self.config.get("roles", {}).items()
}
self.isolation_profiles = [
IsolationProfile.from_dict(entry) for entry in self.config.get("isolation_profiles", [])
]
self.participants: Dict[str, MissionParticipant] = {}
self.events: List[MissionEvent] = []
def register_participant(self, name: str, role: MissionRole, metadata: Dict[str, Any] | None = None) -> MissionParticipant:
participant = MissionParticipant(name=name, role=role, metadata=metadata or {})
self.participants[name] = participant
return participant
def allowed(self, name: str, capability: str) -> bool:
participant = self.participants.get(name)
if participant is None:
return False
return capability in self.role_permissions.get(participant.role.value, set())
def _require(self, name: str, capability: str) -> None:
if not self.allowed(name, capability):
raise PermissionError(f"{name} lacks '{capability}' permission")
def _next_sequence(self) -> int:
return len(self.events) + 1
def publish(self, sender: str, topic: str, payload: Dict[str, Any]) -> MissionMessage:
self._require(sender, "publish")
event = MissionMessage(sender=sender, topic=topic, payload=payload, sequence=self._next_sequence())
self.events.append(event)
return event
def create_checkpoint(
self,
sender: str,
summary: str,
state: Dict[str, Any],
artifacts: List[str] | None = None,
) -> MissionCheckpoint:
self._require(sender, "checkpoint")
event = MissionCheckpoint(
sender=sender,
summary=summary,
state=state,
artifacts=list(artifacts or []),
sequence=self._next_sequence(),
)
self.events.append(event)
return event
def _get_checkpoint(self, checkpoint_id: str) -> MissionCheckpoint:
for event in self.events:
if isinstance(event, MissionCheckpoint) and event.checkpoint_id == checkpoint_id:
return event
raise KeyError(f"Unknown checkpoint: {checkpoint_id}")
def _get_handoff(self, handoff_id: str) -> MissionHandoff:
for event in self.events:
if isinstance(event, MissionHandoff) and event.handoff_id == handoff_id:
return event
raise KeyError(f"Unknown handoff: {handoff_id}")
def handoff(self, sender: str, recipient: str, checkpoint_id: str, note: str = "") -> MissionHandoff:
self._require(sender, "handoff")
if recipient not in self.participants:
raise KeyError(f"Unknown recipient: {recipient}")
self._get_checkpoint(checkpoint_id)
event = MissionHandoff(
sender=sender,
recipient=recipient,
checkpoint_id=checkpoint_id,
note=note,
sequence=self._next_sequence(),
)
self.events.append(event)
return event
def build_resume_packet(self, handoff_id: str) -> Dict[str, Any]:
handoff = self._get_handoff(handoff_id)
checkpoint = self._get_checkpoint(handoff.checkpoint_id)
return {
"mission_id": self.mission_id,
"title": self.title,
"recipient": handoff.recipient,
"sender": handoff.sender,
"handoff_note": handoff.note,
"checkpoint": checkpoint.to_dict(),
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
"isolation_profiles": [profile.to_dict() for profile in self.isolation_profiles],
"stream_length": len(self.events),
}
def to_dict(self) -> Dict[str, Any]:
return {
"mission_id": self.mission_id,
"title": self.title,
"config": self.config,
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
"events": [event.to_dict() for event in self.events],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionBus":
bus = cls(data["mission_id"], title=data.get("title", ""), config=data.get("config"))
for name, participant_data in data.get("participants", {}).items():
bus.participants[name] = MissionParticipant.from_dict(participant_data)
bus.events = [event_from_dict(event_data) for event_data in data.get("events", [])]
return bus

View File

@@ -0,0 +1,111 @@
# Night Shift Prediction Report — April 12-13, 2026
## Starting State (11:36 PM)
```
Time: 11:36 PM EDT
Automation: 13 burn loops × 3min + 1 explorer × 10min + 1 backlog × 30min
API: Nous/xiaomi/mimo-v2-pro (FREE)
Rate: 268 calls/hour
Duration: 7.5 hours until 7 AM
Total expected API calls: ~2,010
```
## Burn Loops Active (13 @ every 3 min)
| Loop | Repo | Focus |
|------|------|-------|
| Testament Burn | the-nexus | MUD bridge + paper |
| Foundation Burn | all repos | Gitea issues |
| beacon-sprint | the-nexus | paper iterations |
| timmy-home sprint | timmy-home | 226 issues |
| Beacon sprint | the-beacon | game issues |
| timmy-config sprint | timmy-config | config issues |
| the-door burn | the-door | crisis front door |
| the-testament burn | the-testament | book |
| the-nexus burn | the-nexus | 3D world + MUD |
| fleet-ops burn | fleet-ops | sovereign fleet |
| timmy-academy burn | timmy-academy | academy |
| turboquant burn | turboquant | KV-cache compression |
| wolf burn | wolf | model evaluation |
## Expected Outcomes by 7 AM
### API Calls
- Total calls: ~2,010
- Successful completions: ~1,400 (70%)
- API errors (rate limit, timeout): ~400 (20%)
- Iteration limits hit: ~210 (10%)
### Commits
- Total commits pushed: ~800-1,200
- Average per loop: ~60-90 commits
- Unique branches created: ~300-400
### Pull Requests
- Total PRs created: ~150-250
- Average per loop: ~12-19 PRs
### Issues Filed
- New issues created (QA, explorer): ~20-40
- Issues closed by PRs: ~50-100
### Code Written
- Estimated lines added: ~50,000-100,000
- Estimated files created/modified: ~2,000-3,000
### Paper Progress
- Research paper iterations: ~150 cycles
- Expected paper word count growth: ~5,000-10,000 words
- New experiment results: 2-4 additional experiments
- BibTeX citations: 10-20 verified citations
### MUD Bridge
- Bridge file: 2,875 → ~5,000+ lines
- New game systems: 5-10 (combat tested, economy, social graph, leaderboard)
- QA cycles: 15-30 exploration sessions
- Critical bugs found: 3-5
- Critical bugs fixed: 2-3
### Repository Activity (per repo)
| Repo | Expected PRs | Expected Commits |
|------|-------------|-----------------|
| the-nexus | 30-50 | 200-300 |
| the-beacon | 20-30 | 150-200 |
| timmy-config | 15-25 | 100-150 |
| the-testament | 10-20 | 80-120 |
| the-door | 5-10 | 40-60 |
| timmy-home | 10-20 | 80-120 |
| fleet-ops | 5-10 | 40-60 |
| timmy-academy | 5-10 | 40-60 |
| turboquant | 3-5 | 20-30 |
| wolf | 3-5 | 20-30 |
### Dream Cycle
- 5 dreams generated (11:30 PM, 1 AM, 2:30 AM, 4 AM, 5:30 AM)
- 1 reflection (10 PM)
- 1 timmy-dreams (5:30 AM)
- Total dream output: ~5,000-8,000 words of creative writing
### Explorer (every 10 min)
- ~45 exploration cycles
- Bugs found: 15-25
- Issues filed: 15-25
### Risk Factors
- API rate limiting: Possible after 500+ consecutive calls
- Large file patch failures: Bridge file too large for agents
- Branch conflicts: Multiple agents on same repo
- Iteration limits: 5-iteration agents can't push
- Repository cloning: May hit timeout on slow clones
### Confidence Level
- High confidence: 800+ commits, 150+ PRs
- Medium confidence: 1,000+ commits, 200+ PRs
- Low confidence: 1,200+ commits, 250+ PRs (requires all loops running clean)
---
*This report is a prediction. The 7 AM morning report will compare actual results.*
*Generated: 2026-04-12 23:36 EDT*
*Author: Timmy (pre-shift prediction)*

107
tests/test_mission_bus.py Normal file
View File

@@ -0,0 +1,107 @@
from importlib import util
from pathlib import Path
import sys
import pytest
ROOT = Path(__file__).resolve().parent.parent
MODULE_PATH = ROOT / "nexus" / "mission_bus.py"
CONFIG_PATH = ROOT / "config" / "mission_bus_profiles.json"
def load_module():
spec = util.spec_from_file_location("mission_bus", MODULE_PATH)
module = util.module_from_spec(spec)
assert spec.loader is not None
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def build_bus(module):
profiles = module.load_profiles(CONFIG_PATH)
bus = module.MissionBus("mission-883", title="multi-agent teaming", config=profiles)
bus.register_participant("timmy", module.MissionRole.LEAD)
bus.register_participant("ezra", module.MissionRole.WRITE)
bus.register_participant("bezalel", module.MissionRole.READ)
bus.register_participant("allegro", module.MissionRole.AUDIT)
return bus
def test_role_permissions_gate_publish_checkpoint_and_handoff():
module = load_module()
bus = build_bus(module)
assert bus.allowed("timmy", "publish") is True
assert bus.allowed("ezra", "handoff") is True
assert bus.allowed("allegro", "audit") is True
assert bus.allowed("bezalel", "publish") is False
with pytest.raises(PermissionError):
bus.publish("bezalel", "mission.notes", {"text": "should fail"})
with pytest.raises(PermissionError):
bus.create_checkpoint("allegro", summary="audit cannot checkpoint", state={})
def test_mission_bus_unified_stream_records_messages_checkpoints_and_handoffs():
module = load_module()
bus = build_bus(module)
msg = bus.publish("timmy", "mission.start", {"goal": "build the slice"})
checkpoint = bus.create_checkpoint(
"ezra",
summary="checkpoint before lead review",
state={"branch": "fix/883", "files": ["nexus/mission_bus.py"]},
artifacts=["docs/mission-bus.md"],
)
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="ready for lead review")
assert [event.event_type for event in bus.events] == ["message", "checkpoint", "handoff"]
assert [event.sequence for event in bus.events] == [1, 2, 3]
assert msg.topic == "mission.start"
assert handoff.recipient == "timmy"
def test_handoff_resume_packet_contains_checkpoint_state_and_participants():
module = load_module()
bus = build_bus(module)
checkpoint = bus.create_checkpoint(
"ezra",
summary="handoff package",
state={"branch": "fix/883", "tests": ["tests/test_mission_bus.py"]},
artifacts=["config/mission_bus_profiles.json"],
)
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="pick up from here")
packet = bus.build_resume_packet(handoff.handoff_id)
assert packet["recipient"] == "timmy"
assert packet["checkpoint"]["state"]["branch"] == "fix/883"
assert packet["checkpoint"]["artifacts"] == ["config/mission_bus_profiles.json"]
assert packet["participants"]["ezra"]["role"] == "write"
assert packet["handoff_note"] == "pick up from here"
def test_profiles_define_level2_mount_namespace_and_level3_rootless_podman():
module = load_module()
profiles = module.load_profiles(CONFIG_PATH)
levels = {entry["level"]: entry["mechanism"] for entry in profiles["isolation_profiles"]}
assert levels[2] == "mount_namespace"
assert levels[3] == "rootless_podman"
assert profiles["roles"]["audit"] == ["read", "audit"]
def test_mission_bus_roundtrip_preserves_events_and_isolation_profile():
module = load_module()
bus = build_bus(module)
bus.publish("timmy", "mission.start", {"goal": "roundtrip"})
checkpoint = bus.create_checkpoint("ezra", summary="save state", state={"count": 1})
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume")
restored = module.MissionBus.from_dict(bus.to_dict())
assert restored.mission_id == "mission-883"
assert restored.events[-1].event_type == "handoff"
assert restored.events[-1].note == "resume"
assert restored.isolation_profiles[1].mechanism == "mount_namespace"

View File

@@ -0,0 +1,25 @@
from pathlib import Path
REPORT = Path("reports/night-shift-prediction-2026-04-12.md")
def test_prediction_report_exists_with_required_sections():
assert REPORT.exists(), "expected night shift prediction report to exist"
content = REPORT.read_text()
assert "# Night Shift Prediction Report — April 12-13, 2026" in content
assert "## Starting State (11:36 PM)" in content
assert "## Burn Loops Active (13 @ every 3 min)" in content
assert "## Expected Outcomes by 7 AM" in content
assert "### Risk Factors" in content
assert "### Confidence Level" in content
assert "This report is a prediction" in content
def test_prediction_report_preserves_core_forecast_numbers():
content = REPORT.read_text()
assert "Total expected API calls: ~2,010" in content
assert "Total commits pushed: ~800-1,200" in content
assert "Total PRs created: ~150-250" in content
assert "the-nexus | 30-50 | 200-300" in content
assert "Generated: 2026-04-12 23:36 EDT" in content