Compare commits

..

5 Commits

Author SHA1 Message Date
Alexander Whitestone
27925c9a5d wip: fix mission bus test loader
Some checks are pending
CI / test (pull_request) Waiting to run
CI / validate (pull_request) Waiting to run
Review Approval Gate / verify-review (pull_request) Waiting to run
2026-04-15 04:09:32 -04:00
Alexander Whitestone
417f92b7a5 wip: add mission bus module and profiles 2026-04-15 04:08:47 -04:00
Alexander Whitestone
de3f41e819 wip: add mission bus regression tests 2026-04-15 04:06:28 -04:00
bd0497b998 Merge PR #1585: docs: add night shift prediction report (#1353) 2026-04-15 06:13:22 +00:00
Alexander Whitestone
4ab84a59ab docs: add night shift prediction report (#1353)
Some checks are pending
CI / test (pull_request) Waiting to run
CI / validate (pull_request) Waiting to run
Review Approval Gate / verify-review (pull_request) Waiting to run
2026-04-15 02:02:26 -04:00
11 changed files with 700 additions and 631 deletions

View File

@@ -1,160 +0,0 @@
# .gitea/workflows/auto-assign-reviewers.yml
# Automated reviewer assignment for PRs
# Issue #1444: policy: Implement automated reviewer assignment
name: Auto-Assign Reviewers
on:
pull_request:
types: [opened, reopened, ready_for_review]
jobs:
auto-assign:
runs-on: ubuntu-latest
if: github.event.pull_request.draft == false
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Auto-assign reviewers
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number }}
REPO: ${{ github.repository }}
PR_AUTHOR: ${{ github.event.pull_request.user.login }}
run: |
echo "Auto-assigning reviewers for PR #$PR_NUMBER"
echo "Repository: $REPO"
echo "PR Author: $PR_AUTHOR"
# Get repository name
REPO_NAME=$(basename "$REPO")
# Define default reviewers based on repository
case "$REPO_NAME" in
"hermes-agent")
DEFAULT_REVIEWERS=("Timmy" "perplexity")
REQUIRED_REVIEWERS=("Timmy")
;;
"the-nexus")
DEFAULT_REVIEWERS=("perplexity")
REQUIRED_REVIEWERS=()
;;
"timmy-home")
DEFAULT_REVIEWERS=("perplexity")
REQUIRED_REVIEWERS=()
;;
"timmy-config")
DEFAULT_REVIEWERS=("perplexity")
REQUIRED_REVIEWERS=()
;;
*)
DEFAULT_REVIEWERS=("perplexity")
REQUIRED_REVIEWERS=()
;;
esac
# Combine default and required reviewers
ALL_REVIEWERS=("${DEFAULT_REVIEWERS[@]}" "${REQUIRED_REVIEWERS[@]}")
# Remove duplicates
UNIQUE_REVIEWERS=($(echo "${ALL_REVIEWERS[@]}" | tr ' ' '\n' | sort -u | tr '\n' ' '))
# Remove PR author from reviewers (can't review own PR)
FINAL_REVIEWERS=()
for reviewer in "${UNIQUE_REVIEWERS[@]}"; do
if [ "$reviewer" != "$PR_AUTHOR" ]; then
FINAL_REVIEWERS+=("$reviewer")
fi
done
# Check if we have any reviewers
if [ ${#FINAL_REVIEWERS[@]} -eq 0 ]; then
echo "⚠️ WARNING: No reviewers available (author is only reviewer)"
echo "Adding fallback reviewer: perplexity"
FINAL_REVIEWERS=("perplexity")
fi
echo "Assigning reviewers: ${FINAL_REVIEWERS[*]}"
# Assign reviewers via Gitea API
for reviewer in "${FINAL_REVIEWERS[@]}"; do
echo "Assigning $reviewer as reviewer..."
# Use Gitea API to request reviewer
RESPONSE=$(curl -s -w "%{http_code}" -X POST \
"https://forge.alexanderwhitestone.com/api/v1/repos/$REPO/pulls/$PR_NUMBER/requested_reviewers" \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"reviewers\": [\"$reviewer\"]}")
HTTP_CODE="${RESPONSE: -3}"
RESPONSE_BODY="${RESPONSE:0:${#RESPONSE}-3}"
if [ "$HTTP_CODE" -eq 201 ]; then
echo "✅ Successfully assigned $reviewer as reviewer"
elif [ "$HTTP_CODE" -eq 422 ]; then
echo "⚠️ $reviewer is already a reviewer or cannot be assigned"
else
echo "❌ Failed to assign $reviewer (HTTP $HTTP_CODE): $RESPONSE_BODY"
fi
done
# Verify at least one reviewer was assigned
echo ""
echo "Checking assigned reviewers..."
REVIEWERS_RESPONSE=$(curl -s \
"https://forge.alexanderwhitestone.com/api/v1/repos/$REPO/pulls/$PR_NUMBER/requested_reviewers" \
-H "Authorization: token $GITEA_TOKEN")
REVIEWER_COUNT=$(echo "$REVIEWERS_RESPONSE" | jq '.users | length' 2>/dev/null || echo "0")
if [ "$REVIEWER_COUNT" -gt 0 ]; then
echo "✅ PR #$PR_NUMBER has $REVIEWER_COUNT reviewer(s) assigned"
echo "$REVIEWERS_RESPONSE" | jq '.users[].login' 2>/dev/null || echo "$REVIEWERS_RESPONSE"
else
echo "❌ ERROR: No reviewers assigned to PR #$PR_NUMBER"
exit 1
fi
- name: Add comment about reviewer assignment
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number }}
REPO: ${{ github.repository }}
run: |
# Get assigned reviewers
REVIEWERS_RESPONSE=$(curl -s \
"https://forge.alexanderwhitestone.com/api/v1/repos/$REPO/pulls/$PR_NUMBER/requested_reviewers" \
-H "Authorization: token $GITEA_TOKEN")
REVIEWER_LIST=$(echo "$REVIEWERS_RESPONSE" | jq -r '.users[].login' 2>/dev/null | tr '\n' ', ' | sed 's/,$//')
if [ -n "$REVIEWER_LIST" ]; then
COMMENT="## Automated Reviewer Assignment
Reviewers have been automatically assigned to this PR:
**Assigned Reviewers:** $REVIEWER_LIST
**Policy:** All PRs must have at least one reviewer assigned before merging.
**Next Steps:**
1. Reviewers will be notified automatically
2. Please review the changes within 48 hours
3. Request changes or approve as appropriate
This is an automated assignment based on CODEOWNERS and repository policy.
See issue #1444 for details."
# Add comment to PR
curl -s -X POST \
"https://forge.alexanderwhitestone.com/api/v1/repos/$REPO/issues/$PR_NUMBER/comments" \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"body\": \"$COMMENT\"}" > /dev/null
echo "✅ Added comment about reviewer assignment"
fi

19
.github/CODEOWNERS vendored
View File

@@ -12,8 +12,21 @@ the-nexus/ai/ @Timmy
timmy-home/ @perplexity
timmy-config/ @perplexity
# Owner gates for critical systems
# Owner gates
hermes-agent/ @Timmy
# CODEOWNERS - Mandatory Review Policy
# QA reviewer for all PRs
* @perplexity
# Default reviewer for all repositories
* @perplexity
# Specialized component owners
hermes-agent/ @Timmy
hermes-agent/agent-core/ @Rockachopa
hermes-agent/protocol/ @Timmy
the-nexus/ @perplexity
the-nexus/ai/ @Timmy
timmy-home/ @perplexity
timmy-config/ @perplexity
# Owner gates
hermes-agent/ @Timmy

View File

@@ -1,241 +0,0 @@
#!/usr/bin/env python3
"""
Check for PRs without assigned reviewers.
Issue #1444: policy: Implement automated reviewer assignment
"""
import json
import os
import sys
import urllib.request
from typing import Dict, List, Any, Optional
# Configuration
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
ORG = "Timmy_Foundation"
class ReviewerChecker:
def __init__(self):
self.token = self._load_token()
def _load_token(self) -> str:
"""Load Gitea API token."""
try:
with open(TOKEN_PATH, "r") as f:
return f.read().strip()
except FileNotFoundError:
print(f"ERROR: Token not found at {TOKEN_PATH}")
sys.exit(1)
def _api_request(self, endpoint: str) -> Any:
"""Make authenticated Gitea API request."""
url = f"{GITEA_BASE}{endpoint}"
headers = {"Authorization": f"token {self.token}"}
req = urllib.request.Request(url, headers=headers)
try:
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read())
except urllib.error.HTTPError as e:
if e.code == 404:
return None
error_body = e.read().decode() if e.fp else "No error body"
print(f"API Error {e.code}: {error_body}")
return None
def get_open_prs(self, repo: str) -> List[Dict]:
"""Get open PRs for a repository."""
endpoint = f"/repos/{ORG}/{repo}/pulls?state=open"
prs = self._api_request(endpoint)
return prs if isinstance(prs, list) else []
def get_pr_reviewers(self, repo: str, pr_number: int) -> Dict:
"""Get requested reviewers for a PR."""
endpoint = f"/repos/{ORG}/{repo}/pulls/{pr_number}/requested_reviewers"
return self._api_request(endpoint) or {}
def get_pr_reviews(self, repo: str, pr_number: int) -> List[Dict]:
"""Get reviews for a PR."""
endpoint = f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews"
reviews = self._api_request(endpoint)
return reviews if isinstance(reviews, list) else []
def check_prs_without_reviewers(self, repos: List[str]) -> Dict[str, Any]:
"""Check for PRs without assigned reviewers."""
results = {
"repos": {},
"summary": {
"total_prs": 0,
"prs_without_reviewers": 0,
"repos_checked": len(repos)
}
}
for repo in repos:
prs = self.get_open_prs(repo)
results["repos"][repo] = {
"total_prs": len(prs),
"prs_without_reviewers": [],
"prs_with_reviewers": []
}
results["summary"]["total_prs"] += len(prs)
for pr in prs:
pr_number = pr["number"]
pr_title = pr["title"]
pr_author = pr["user"]["login"]
# Check for requested reviewers
requested = self.get_pr_reviewers(repo, pr_number)
has_requested = len(requested.get("users", [])) > 0
# Check for existing reviews
reviews = self.get_pr_reviews(repo, pr_number)
has_reviews = len(reviews) > 0
# Check if author is the only potential reviewer
is_self_review = pr_author in [r.get("user", {}).get("login") for r in reviews]
if not has_requested and not has_reviews:
results["repos"][repo]["prs_without_reviewers"].append({
"number": pr_number,
"title": pr_title,
"author": pr_author,
"created": pr["created_at"],
"url": pr["html_url"]
})
results["summary"]["prs_without_reviewers"] += 1
else:
results["repos"][repo]["prs_with_reviewers"].append({
"number": pr_number,
"title": pr_title,
"author": pr_author,
"has_requested": has_requested,
"has_reviews": has_reviews,
"is_self_review": is_self_review
})
return results
def generate_report(self, results: Dict[str, Any]) -> str:
"""Generate a report of reviewer assignment status."""
report = "# PR Reviewer Assignment Report\n\n"
report += "## Summary\n"
report += f"- **Repositories checked:** {results['summary']['repos_checked']}\n"
report += f"- **Total open PRs:** {results['summary']['total_prs']}\n"
report += f"- **PRs without reviewers:** {results['summary']['prs_without_reviewers']}\n\n"
if results['summary']['prs_without_reviewers'] == 0:
report += "✅ **All PRs have assigned reviewers.**\n"
else:
report += "⚠️ **PRs without assigned reviewers:**\n\n"
for repo, data in results["repos"].items():
if data["prs_without_reviewers"]:
report += f"### {repo}\n"
for pr in data["prs_without_reviewers"]:
report += f"- **#{pr['number']}**: {pr['title']}\n"
report += f" - Author: {pr['author']}\n"
report += f" - Created: {pr['created']}\n"
report += f" - URL: {pr['url']}\n"
report += "\n"
report += "## Repository Details\n\n"
for repo, data in results["repos"].items():
report += f"### {repo}\n"
report += f"- **Total PRs:** {data['total_prs']}\n"
report += f"- **PRs without reviewers:** {len(data['prs_without_reviewers'])}\n"
report += f"- **PRs with reviewers:** {len(data['prs_with_reviewers'])}\n\n"
if data['prs_with_reviewers']:
report += "**PRs with reviewers:**\n"
for pr in data['prs_with_reviewers']:
status = "" if pr['has_requested'] else "⚠️"
if pr['is_self_review']:
status = "⚠️ (self-review)"
report += f"- {status} #{pr['number']}: {pr['title']}\n"
report += "\n"
return report
def assign_reviewer(self, repo: str, pr_number: int, reviewer: str) -> bool:
"""Assign a reviewer to a PR."""
endpoint = f"/repos/{ORG}/{repo}/pulls/{pr_number}/requested_reviewers"
data = {"reviewers": [reviewer]}
url = f"{GITEA_BASE}{endpoint}"
headers = {
"Authorization": f"token {self.token}",
"Content-Type": "application/json"
}
req = urllib.request.Request(url, headers=headers, method="POST")
req.data = json.dumps(data).encode()
try:
with urllib.request.urlopen(req) as resp:
return resp.status == 201
except urllib.error.HTTPError as e:
print(f"Failed to assign reviewer: {e.code}")
return False
def main():
"""Main entry point for reviewer checker."""
import argparse
parser = argparse.ArgumentParser(description="Check for PRs without assigned reviewers")
parser.add_argument("--repos", nargs="+",
default=["the-nexus", "timmy-home", "timmy-config", "hermes-agent", "the-beacon"],
help="Repositories to check")
parser.add_argument("--report", action="store_true", help="Generate report")
parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
parser.add_argument("--assign", nargs=2, metavar=("REPO", "PR"),
help="Assign a reviewer to a specific PR")
parser.add_argument("--reviewer", help="Reviewer to assign (e.g., @perplexity)")
args = parser.parse_args()
checker = ReviewerChecker()
if args.assign:
# Assign reviewer to specific PR
repo, pr_number = args.assign
reviewer = args.reviewer or "@perplexity"
if checker.assign_reviewer(repo, int(pr_number), reviewer):
print(f"✅ Assigned {reviewer} as reviewer to {repo} #{pr_number}")
else:
print(f"❌ Failed to assign reviewer to {repo} #{pr_number}")
sys.exit(1)
else:
# Check for PRs without reviewers
results = checker.check_prs_without_reviewers(args.repos)
if args.json:
print(json.dumps(results, indent=2))
elif args.report:
report = checker.generate_report(results)
print(report)
else:
# Default: show summary
print(f"Checked {results['summary']['repos_checked']} repositories")
print(f"Total open PRs: {results['summary']['total_prs']}")
print(f"PRs without reviewers: {results['summary']['prs_without_reviewers']}")
if results['summary']['prs_without_reviewers'] > 0:
print("\nPRs without reviewers:")
for repo, data in results["repos"].items():
if data["prs_without_reviewers"]:
for pr in data["prs_without_reviewers"]:
print(f" {repo} #{pr['number']}: {pr['title']}")
sys.exit(1)
else:
print("\n✅ All PRs have assigned reviewers")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,34 @@
{
"roles": {
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
"write": ["publish", "checkpoint", "handoff", "read"],
"read": ["read"],
"audit": ["read", "audit"]
},
"isolation_profiles": [
{
"name": "level1-directory",
"label": "Level 1 — directory workspace",
"level": 1,
"mechanism": "directory_workspace",
"description": "Single mission cell in an isolated workspace directory.",
"supports_resume": true
},
{
"name": "level2-mount-namespace",
"label": "Level 2 — mount namespace",
"level": 2,
"mechanism": "mount_namespace",
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
"supports_resume": true
},
{
"name": "level3-rootless-podman",
"label": "Level 3 — rootless Podman",
"level": 3,
"mechanism": "rootless_podman",
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
"supports_resume": true
}
]
}

View File

@@ -1,227 +0,0 @@
# Automated Reviewer Assignment
**Issue:** #1444 - policy: Implement automated reviewer assignment (from Issue #1127 triage)
**Purpose:** Ensure all PRs have at least one reviewer assigned
## Problem
From issue #1127 triage:
> "0 of 14 PRs had a reviewer assigned before this pass."
This means:
- PRs can be created without any reviewer
- No automated enforcement of reviewer assignment
- PRs may sit without review for extended periods
## Solution
### 1. GitHub Actions Workflow (`.gitea/workflows/auto-assign-reviewers.yml`)
Automatically assigns reviewers when PRs are created:
**When it runs:**
- On PR open
- On PR reopen
- On PR ready for review (not draft)
**What it does:**
1. Determines appropriate reviewers based on repository
2. Assigns reviewers via Gitea API
3. Adds comment about reviewer assignment
4. Verifies at least one reviewer is assigned
### 2. Reviewer Check Script (`bin/check_reviewers.py`)
Script to check for PRs without reviewers:
**Usage:**
```bash
# Check all repositories
python bin/check_reviewers.py
# Check specific repositories
python bin/check_reviewers.py --repos the-nexus timmy-home
# Generate report
python bin/check_reviewers.py --report
# Assign reviewer to specific PR
python bin/check_reviewers.py --assign the-nexus 123 --reviewer @perplexity
```
### 3. CODEOWNERS File (`.github/CODEOWNERS`)
Defines default reviewers for different paths:
```
# Default reviewer for all repositories
* @perplexity
# Specialized component owners
hermes-agent/ @Timmy
hermes-agent/agent-core/ @Rockachopa
hermes-agent/protocol/ @Timmy
the-nexus/ @perplexity
the-nexus/ai/ @Timmy
timmy-home/ @perplexity
timmy-config/ @perplexity
# Owner gates for critical systems
hermes-agent/ @Timmy
```
## Reviewer Assignment Rules
### Repository-Specific Rules
| Repository | Default Reviewers | Required Reviewers | Notes |
|------------|-------------------|-------------------|-------|
| hermes-agent | @Timmy, @perplexity | @Timmy | Owner gate for critical system |
| the-nexus | @perplexity | None | QA gate |
| timmy-home | @perplexity | None | QA gate |
| timmy-config | @perplexity | None | QA gate |
| the-beacon | @perplexity | None | QA gate |
### Special Rules
1. **No self-review:** PR author cannot be assigned as reviewer
2. **Fallback:** If no reviewers available, assign @perplexity
3. **Critical systems:** hermes-agent requires @Timmy as reviewer
## How It Works
### Automated Assignment Flow
1. **PR Created** → GitHub Actions workflow triggers
2. **Determine Reviewers** → Based on repository and CODEOWNERS
3. **Assign Reviewers** → Via Gitea API
4. **Add Comment** → Notify about assignment
5. **Verify** → Ensure at least one reviewer assigned
### Manual Assignment
```bash
# Assign specific reviewer
python bin/check_reviewers.py --assign the-nexus 123 --reviewer @perplexity
# Check for PRs without reviewers
python bin/check_reviewers.py --report
```
## Configuration
### Environment Variables
- `GITEA_TOKEN`: Gitea API token for authentication
- `REPO`: Repository name (auto-set in GitHub Actions)
- `PR_NUMBER`: PR number (auto-set in GitHub Actions)
### Repository Configuration
Edit the workflow to customize reviewer assignment:
```yaml
# Define default reviewers based on repository
case "$REPO_NAME" in
"hermes-agent")
DEFAULT_REVIEWERS=("Timmy" "perplexity")
REQUIRED_REVIEWERS=("Timmy")
;;
"the-nexus")
DEFAULT_REVIEWERS=("perplexity")
REQUIRED_REVIEWERS=()
;;
# Add more repositories as needed
esac
```
## Testing
### Test the workflow:
1. Create a test PR
2. Check if reviewers are automatically assigned
3. Verify comment is added
### Test the script:
```bash
# Check for PRs without reviewers
python bin/check_reviewers.py --report
# Assign reviewer to test PR
python bin/check_reviewers.py --assign the-nexus 123 --reviewer @perplexity
```
## Monitoring
### Check for PRs without reviewers:
```bash
# Daily check
python bin/check_reviewers.py --report
# JSON output for automation
python bin/check_reviewers.py --json
```
### Review assignment logs:
1. Check GitHub Actions logs for assignment details
2. Review PR comments for assignment notifications
3. Monitor for PRs with 0 reviewers
## Enforcement
### CI Check (Future Enhancement)
Add CI check to reject PRs with 0 reviewers:
```yaml
# In CI workflow
- name: Check for reviewers
run: |
REVIEWERS=$(curl -s "https://forge.alexanderwhitestone.com/api/v1/repos/$REPO/pulls/$PR_NUMBER/requested_reviewers" \
-H "Authorization: token $GITEA_TOKEN" | jq '.users | length')
if [ "$REVIEWERS" -eq 0 ]; then
echo "❌ ERROR: PR has no reviewers assigned"
exit 1
fi
```
### Policy Enforcement
1. **All PRs must have reviewers** - No exceptions
2. **No self-review** - PR author cannot review own PR
3. **Critical systems require specific reviewers** - hermes-agent requires @Timmy
## Related Issues
- **Issue #1127:** Perplexity Evening Pass triage (identified missing reviewers)
- **Issue #1444:** This implementation
- **Issue #1336:** Merge conflicts in CODEOWNERS (fixed)
## Files Added/Modified
1. `.gitea/workflows/auto-assign-reviewers.yml` - GitHub Actions workflow
2. `bin/check_reviewers.py` - Reviewer check script
3. `.github/CODEOWNERS` - Cleaned up CODEOWNERS file
4. `docs/auto-reviewer-assignment.md` - This documentation
## Future Enhancements
1. **CI check for 0 reviewers** - Reject PRs without reviewers
2. **Slack/Telegram notifications** - Notify when PRs lack reviewers
3. **Load balancing** - Distribute reviews evenly among team members
4. **Auto-assign based on file changes** - Assign specialists for specific areas
## Conclusion
This implementation ensures all PRs have at least one reviewer assigned:
- **Automated assignment** on PR creation
- **Manual checking** for existing PRs
- **Clear documentation** of policies and procedures
**Result:** No more PRs sitting without reviewers.
## License
Part of the Timmy Foundation project.

31
docs/mission-bus.md Normal file
View File

@@ -0,0 +1,31 @@
# Mission Bus
The Mission Bus grounds the multi-agent teaming epic with a concrete, executable shared module.
## What it adds
- one unified mission stream for messages, checkpoints, and handoffs
- role-based permissions for `lead`, `write`, `read`, and `audit`
- cross-agent handoff packets so Agent A can checkpoint and Agent B can resume
- declared isolation profiles for Level 1, Level 2, and Level 3 mission cells
## Files
- `nexus/mission_bus.py`
- `config/mission_bus_profiles.json`
## Example
```python
from nexus.mission_bus import MissionBus, MissionRole, load_profiles
from pathlib import Path
bus = MissionBus("mission-883", title="multi-agent teaming", config=load_profiles(Path("config/mission_bus_profiles.json")))
bus.register_participant("timmy", MissionRole.LEAD)
bus.register_participant("ezra", MissionRole.WRITE)
checkpoint = bus.create_checkpoint("ezra", summary="checkpoint", state={"branch": "fix/883"})
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume from here")
packet = bus.build_resume_packet(bus.events[-1].handoff_id)
```
## Scope of this slice
This slice does not yet wire a live transport or rootless container launcher.
It codifies the mission bus contract, role permissions, handoff packet, and isolation profile surface so later work can execute against a stable interface.

View File

@@ -14,6 +14,16 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.mission_bus import (
MissionBus,
MissionRole,
MissionParticipant,
MissionMessage,
MissionCheckpoint,
MissionHandoff,
IsolationProfile,
load_profiles,
)
try:
from nexus.nexus_think import NexusMind
@@ -28,5 +38,13 @@ __all__ = [
"Action",
"ExperienceStore",
"TrajectoryLogger",
"MissionBus",
"MissionRole",
"MissionParticipant",
"MissionMessage",
"MissionCheckpoint",
"MissionHandoff",
"IsolationProfile",
"load_profiles",
"NexusMind",
]

358
nexus/mission_bus.py Normal file
View File

@@ -0,0 +1,358 @@
"""Mission bus, role permissions, cross-agent handoff, and isolation profiles.
Grounded implementation slice for #883.
The bus gives a single mission cell a unified event stream, permission-checked
roles, checkpoint + resume handoff, and declared isolation profiles for Level
1/2/3 execution boundaries.
"""
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Union
DEFAULT_CONFIG = {
"roles": {
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
"write": ["publish", "checkpoint", "handoff", "read"],
"read": ["read"],
"audit": ["read", "audit"],
},
"isolation_profiles": [
{
"name": "level1-directory",
"label": "Level 1 — directory workspace",
"level": 1,
"mechanism": "directory_workspace",
"description": "Single mission cell in an isolated workspace directory.",
"supports_resume": True,
},
{
"name": "level2-mount-namespace",
"label": "Level 2 — mount namespace",
"level": 2,
"mechanism": "mount_namespace",
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
"supports_resume": True,
},
{
"name": "level3-rootless-podman",
"label": "Level 3 — rootless Podman",
"level": 3,
"mechanism": "rootless_podman",
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
"supports_resume": True,
},
],
}
def utcnow_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def load_profiles(path: Path) -> Dict[str, Any]:
if not path.exists():
return json.loads(json.dumps(DEFAULT_CONFIG))
with open(path, "r", encoding="utf-8") as handle:
data = json.load(handle)
data.setdefault("roles", DEFAULT_CONFIG["roles"])
data.setdefault("isolation_profiles", DEFAULT_CONFIG["isolation_profiles"])
return data
class MissionRole(str, Enum):
LEAD = "lead"
WRITE = "write"
READ = "read"
AUDIT = "audit"
@dataclass
class IsolationProfile:
name: str
label: str
level: int
mechanism: str
description: str = ""
supports_resume: bool = True
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"label": self.label,
"level": self.level,
"mechanism": self.mechanism,
"description": self.description,
"supports_resume": self.supports_resume,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "IsolationProfile":
return cls(
name=data["name"],
label=data["label"],
level=int(data["level"]),
mechanism=data["mechanism"],
description=data.get("description", ""),
supports_resume=bool(data.get("supports_resume", True)),
)
@dataclass
class MissionParticipant:
name: str
role: MissionRole
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"role": self.role.value,
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionParticipant":
return cls(name=data["name"], role=MissionRole(data["role"]), metadata=data.get("metadata", {}))
@dataclass
class MissionMessage:
sender: str
topic: str
payload: Dict[str, Any]
sequence: int
timestamp: str = field(default_factory=utcnow_iso)
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="message", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"topic": self.topic,
"payload": self.payload,
"sequence": self.sequence,
"timestamp": self.timestamp,
"message_id": self.message_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionMessage":
return cls(
sender=data["sender"],
topic=data["topic"],
payload=data["payload"],
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
message_id=data.get("message_id") or data.get("messageId") or str(uuid.uuid4()),
)
@dataclass
class MissionCheckpoint:
sender: str
summary: str
state: Dict[str, Any]
sequence: int
artifacts: List[str] = field(default_factory=list)
timestamp: str = field(default_factory=utcnow_iso)
checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="checkpoint", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"summary": self.summary,
"state": self.state,
"artifacts": self.artifacts,
"sequence": self.sequence,
"timestamp": self.timestamp,
"checkpoint_id": self.checkpoint_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionCheckpoint":
return cls(
sender=data["sender"],
summary=data["summary"],
state=data.get("state", {}),
artifacts=list(data.get("artifacts", [])),
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
checkpoint_id=data.get("checkpoint_id") or data.get("checkpointId") or str(uuid.uuid4()),
)
@dataclass
class MissionHandoff:
sender: str
recipient: str
checkpoint_id: str
sequence: int
note: str = ""
timestamp: str = field(default_factory=utcnow_iso)
handoff_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = field(default="handoff", init=False)
def to_dict(self) -> Dict[str, Any]:
return {
"event_type": self.event_type,
"sender": self.sender,
"recipient": self.recipient,
"checkpoint_id": self.checkpoint_id,
"note": self.note,
"sequence": self.sequence,
"timestamp": self.timestamp,
"handoff_id": self.handoff_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionHandoff":
return cls(
sender=data["sender"],
recipient=data["recipient"],
checkpoint_id=data["checkpoint_id"] if "checkpoint_id" in data else data["checkpointId"],
note=data.get("note", ""),
sequence=int(data["sequence"]),
timestamp=data.get("timestamp", utcnow_iso()),
handoff_id=data.get("handoff_id") or data.get("handoffId") or str(uuid.uuid4()),
)
MissionEvent = Union[MissionMessage, MissionCheckpoint, MissionHandoff]
def event_from_dict(data: Dict[str, Any]) -> MissionEvent:
kind = data["event_type"]
if kind == "message":
return MissionMessage.from_dict(data)
if kind == "checkpoint":
return MissionCheckpoint.from_dict(data)
if kind == "handoff":
return MissionHandoff.from_dict(data)
raise ValueError(f"Unknown mission event type: {kind}")
class MissionBus:
def __init__(self, mission_id: str, title: str = "", config: Dict[str, Any] | None = None):
self.mission_id = mission_id
self.title = title
self.config = config or json.loads(json.dumps(DEFAULT_CONFIG))
self.role_permissions = {
role: set(perms) for role, perms in self.config.get("roles", {}).items()
}
self.isolation_profiles = [
IsolationProfile.from_dict(entry) for entry in self.config.get("isolation_profiles", [])
]
self.participants: Dict[str, MissionParticipant] = {}
self.events: List[MissionEvent] = []
def register_participant(self, name: str, role: MissionRole, metadata: Dict[str, Any] | None = None) -> MissionParticipant:
participant = MissionParticipant(name=name, role=role, metadata=metadata or {})
self.participants[name] = participant
return participant
def allowed(self, name: str, capability: str) -> bool:
participant = self.participants.get(name)
if participant is None:
return False
return capability in self.role_permissions.get(participant.role.value, set())
def _require(self, name: str, capability: str) -> None:
if not self.allowed(name, capability):
raise PermissionError(f"{name} lacks '{capability}' permission")
def _next_sequence(self) -> int:
return len(self.events) + 1
def publish(self, sender: str, topic: str, payload: Dict[str, Any]) -> MissionMessage:
self._require(sender, "publish")
event = MissionMessage(sender=sender, topic=topic, payload=payload, sequence=self._next_sequence())
self.events.append(event)
return event
def create_checkpoint(
self,
sender: str,
summary: str,
state: Dict[str, Any],
artifacts: List[str] | None = None,
) -> MissionCheckpoint:
self._require(sender, "checkpoint")
event = MissionCheckpoint(
sender=sender,
summary=summary,
state=state,
artifacts=list(artifacts or []),
sequence=self._next_sequence(),
)
self.events.append(event)
return event
def _get_checkpoint(self, checkpoint_id: str) -> MissionCheckpoint:
for event in self.events:
if isinstance(event, MissionCheckpoint) and event.checkpoint_id == checkpoint_id:
return event
raise KeyError(f"Unknown checkpoint: {checkpoint_id}")
def _get_handoff(self, handoff_id: str) -> MissionHandoff:
for event in self.events:
if isinstance(event, MissionHandoff) and event.handoff_id == handoff_id:
return event
raise KeyError(f"Unknown handoff: {handoff_id}")
def handoff(self, sender: str, recipient: str, checkpoint_id: str, note: str = "") -> MissionHandoff:
self._require(sender, "handoff")
if recipient not in self.participants:
raise KeyError(f"Unknown recipient: {recipient}")
self._get_checkpoint(checkpoint_id)
event = MissionHandoff(
sender=sender,
recipient=recipient,
checkpoint_id=checkpoint_id,
note=note,
sequence=self._next_sequence(),
)
self.events.append(event)
return event
def build_resume_packet(self, handoff_id: str) -> Dict[str, Any]:
handoff = self._get_handoff(handoff_id)
checkpoint = self._get_checkpoint(handoff.checkpoint_id)
return {
"mission_id": self.mission_id,
"title": self.title,
"recipient": handoff.recipient,
"sender": handoff.sender,
"handoff_note": handoff.note,
"checkpoint": checkpoint.to_dict(),
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
"isolation_profiles": [profile.to_dict() for profile in self.isolation_profiles],
"stream_length": len(self.events),
}
def to_dict(self) -> Dict[str, Any]:
return {
"mission_id": self.mission_id,
"title": self.title,
"config": self.config,
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
"events": [event.to_dict() for event in self.events],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MissionBus":
bus = cls(data["mission_id"], title=data.get("title", ""), config=data.get("config"))
for name, participant_data in data.get("participants", {}).items():
bus.participants[name] = MissionParticipant.from_dict(participant_data)
bus.events = [event_from_dict(event_data) for event_data in data.get("events", [])]
return bus

View File

@@ -0,0 +1,111 @@
# Night Shift Prediction Report — April 12-13, 2026
## Starting State (11:36 PM)
```
Time: 11:36 PM EDT
Automation: 13 burn loops × 3min + 1 explorer × 10min + 1 backlog × 30min
API: Nous/xiaomi/mimo-v2-pro (FREE)
Rate: 268 calls/hour
Duration: 7.5 hours until 7 AM
Total expected API calls: ~2,010
```
## Burn Loops Active (13 @ every 3 min)
| Loop | Repo | Focus |
|------|------|-------|
| Testament Burn | the-nexus | MUD bridge + paper |
| Foundation Burn | all repos | Gitea issues |
| beacon-sprint | the-nexus | paper iterations |
| timmy-home sprint | timmy-home | 226 issues |
| Beacon sprint | the-beacon | game issues |
| timmy-config sprint | timmy-config | config issues |
| the-door burn | the-door | crisis front door |
| the-testament burn | the-testament | book |
| the-nexus burn | the-nexus | 3D world + MUD |
| fleet-ops burn | fleet-ops | sovereign fleet |
| timmy-academy burn | timmy-academy | academy |
| turboquant burn | turboquant | KV-cache compression |
| wolf burn | wolf | model evaluation |
## Expected Outcomes by 7 AM
### API Calls
- Total calls: ~2,010
- Successful completions: ~1,400 (70%)
- API errors (rate limit, timeout): ~400 (20%)
- Iteration limits hit: ~210 (10%)
### Commits
- Total commits pushed: ~800-1,200
- Average per loop: ~60-90 commits
- Unique branches created: ~300-400
### Pull Requests
- Total PRs created: ~150-250
- Average per loop: ~12-19 PRs
### Issues Filed
- New issues created (QA, explorer): ~20-40
- Issues closed by PRs: ~50-100
### Code Written
- Estimated lines added: ~50,000-100,000
- Estimated files created/modified: ~2,000-3,000
### Paper Progress
- Research paper iterations: ~150 cycles
- Expected paper word count growth: ~5,000-10,000 words
- New experiment results: 2-4 additional experiments
- BibTeX citations: 10-20 verified citations
### MUD Bridge
- Bridge file: 2,875 → ~5,000+ lines
- New game systems: 5-10 (combat tested, economy, social graph, leaderboard)
- QA cycles: 15-30 exploration sessions
- Critical bugs found: 3-5
- Critical bugs fixed: 2-3
### Repository Activity (per repo)
| Repo | Expected PRs | Expected Commits |
|------|-------------|-----------------|
| the-nexus | 30-50 | 200-300 |
| the-beacon | 20-30 | 150-200 |
| timmy-config | 15-25 | 100-150 |
| the-testament | 10-20 | 80-120 |
| the-door | 5-10 | 40-60 |
| timmy-home | 10-20 | 80-120 |
| fleet-ops | 5-10 | 40-60 |
| timmy-academy | 5-10 | 40-60 |
| turboquant | 3-5 | 20-30 |
| wolf | 3-5 | 20-30 |
### Dream Cycle
- 5 dreams generated (11:30 PM, 1 AM, 2:30 AM, 4 AM, 5:30 AM)
- 1 reflection (10 PM)
- 1 timmy-dreams (5:30 AM)
- Total dream output: ~5,000-8,000 words of creative writing
### Explorer (every 10 min)
- ~45 exploration cycles
- Bugs found: 15-25
- Issues filed: 15-25
### Risk Factors
- API rate limiting: Possible after 500+ consecutive calls
- Large file patch failures: Bridge file too large for agents
- Branch conflicts: Multiple agents on same repo
- Iteration limits: 5-iteration agents can't push
- Repository cloning: May hit timeout on slow clones
### Confidence Level
- High confidence: 800+ commits, 150+ PRs
- Medium confidence: 1,000+ commits, 200+ PRs
- Low confidence: 1,200+ commits, 250+ PRs (requires all loops running clean)
---
*This report is a prediction. The 7 AM morning report will compare actual results.*
*Generated: 2026-04-12 23:36 EDT*
*Author: Timmy (pre-shift prediction)*

107
tests/test_mission_bus.py Normal file
View File

@@ -0,0 +1,107 @@
from importlib import util
from pathlib import Path
import sys
import pytest
ROOT = Path(__file__).resolve().parent.parent
MODULE_PATH = ROOT / "nexus" / "mission_bus.py"
CONFIG_PATH = ROOT / "config" / "mission_bus_profiles.json"
def load_module():
spec = util.spec_from_file_location("mission_bus", MODULE_PATH)
module = util.module_from_spec(spec)
assert spec.loader is not None
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def build_bus(module):
profiles = module.load_profiles(CONFIG_PATH)
bus = module.MissionBus("mission-883", title="multi-agent teaming", config=profiles)
bus.register_participant("timmy", module.MissionRole.LEAD)
bus.register_participant("ezra", module.MissionRole.WRITE)
bus.register_participant("bezalel", module.MissionRole.READ)
bus.register_participant("allegro", module.MissionRole.AUDIT)
return bus
def test_role_permissions_gate_publish_checkpoint_and_handoff():
module = load_module()
bus = build_bus(module)
assert bus.allowed("timmy", "publish") is True
assert bus.allowed("ezra", "handoff") is True
assert bus.allowed("allegro", "audit") is True
assert bus.allowed("bezalel", "publish") is False
with pytest.raises(PermissionError):
bus.publish("bezalel", "mission.notes", {"text": "should fail"})
with pytest.raises(PermissionError):
bus.create_checkpoint("allegro", summary="audit cannot checkpoint", state={})
def test_mission_bus_unified_stream_records_messages_checkpoints_and_handoffs():
module = load_module()
bus = build_bus(module)
msg = bus.publish("timmy", "mission.start", {"goal": "build the slice"})
checkpoint = bus.create_checkpoint(
"ezra",
summary="checkpoint before lead review",
state={"branch": "fix/883", "files": ["nexus/mission_bus.py"]},
artifacts=["docs/mission-bus.md"],
)
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="ready for lead review")
assert [event.event_type for event in bus.events] == ["message", "checkpoint", "handoff"]
assert [event.sequence for event in bus.events] == [1, 2, 3]
assert msg.topic == "mission.start"
assert handoff.recipient == "timmy"
def test_handoff_resume_packet_contains_checkpoint_state_and_participants():
module = load_module()
bus = build_bus(module)
checkpoint = bus.create_checkpoint(
"ezra",
summary="handoff package",
state={"branch": "fix/883", "tests": ["tests/test_mission_bus.py"]},
artifacts=["config/mission_bus_profiles.json"],
)
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="pick up from here")
packet = bus.build_resume_packet(handoff.handoff_id)
assert packet["recipient"] == "timmy"
assert packet["checkpoint"]["state"]["branch"] == "fix/883"
assert packet["checkpoint"]["artifacts"] == ["config/mission_bus_profiles.json"]
assert packet["participants"]["ezra"]["role"] == "write"
assert packet["handoff_note"] == "pick up from here"
def test_profiles_define_level2_mount_namespace_and_level3_rootless_podman():
module = load_module()
profiles = module.load_profiles(CONFIG_PATH)
levels = {entry["level"]: entry["mechanism"] for entry in profiles["isolation_profiles"]}
assert levels[2] == "mount_namespace"
assert levels[3] == "rootless_podman"
assert profiles["roles"]["audit"] == ["read", "audit"]
def test_mission_bus_roundtrip_preserves_events_and_isolation_profile():
module = load_module()
bus = build_bus(module)
bus.publish("timmy", "mission.start", {"goal": "roundtrip"})
checkpoint = bus.create_checkpoint("ezra", summary="save state", state={"count": 1})
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume")
restored = module.MissionBus.from_dict(bus.to_dict())
assert restored.mission_id == "mission-883"
assert restored.events[-1].event_type == "handoff"
assert restored.events[-1].note == "resume"
assert restored.isolation_profiles[1].mechanism == "mount_namespace"

View File

@@ -0,0 +1,25 @@
from pathlib import Path
REPORT = Path("reports/night-shift-prediction-2026-04-12.md")
def test_prediction_report_exists_with_required_sections():
assert REPORT.exists(), "expected night shift prediction report to exist"
content = REPORT.read_text()
assert "# Night Shift Prediction Report — April 12-13, 2026" in content
assert "## Starting State (11:36 PM)" in content
assert "## Burn Loops Active (13 @ every 3 min)" in content
assert "## Expected Outcomes by 7 AM" in content
assert "### Risk Factors" in content
assert "### Confidence Level" in content
assert "This report is a prediction" in content
def test_prediction_report_preserves_core_forecast_numbers():
content = REPORT.read_text()
assert "Total expected API calls: ~2,010" in content
assert "Total commits pushed: ~800-1,200" in content
assert "Total PRs created: ~150-250" in content
assert "the-nexus | 30-50 | 200-300" in content
assert "Generated: 2026-04-12 23:36 EDT" in content