Compare commits

..

1 Commits

Author SHA1 Message Date
72195ea957 fix: extract clean_lines/parse_room_output/normalize_event to module level (#1509)
Some checks failed
CI / test (pull_request) Failing after 1m21s
CI / validate (pull_request) Failing after 1m17s
Review Approval Gate / verify-review (pull_request) Successful in 12s
Test file imports clean_lines, parse_room_output, normalize_event from
nexus.evennia_ws_bridge, but they were nested inside playback() function.
Moved all three to module level so they're importable.

Also moved actor_located/command_issued/command_result/room_snapshot/session_bound
imports from nested playback() to module-level import block.

playback() now calls module-level functions directly.
2026-04-15 03:01:37 +00:00
15 changed files with 99 additions and 1158 deletions

View File

@@ -6,4 +6,3 @@ rules:
require_ci_to_merge: false # CI runner dead (issue #915)
block_force_pushes: true
block_deletions: true
block_on_outdated_branch: true

View File

@@ -12,7 +12,6 @@ All repositories must enforce these rules on the `main` branch:
| Require CI to pass | ⚠ Conditional | Only where CI exists |
| Block force push | ✅ Enabled | Protect commit history |
| Block branch deletion | ✅ Enabled | Prevent accidental deletion |
| Require branch up-to-date before merge | ✅ Enabled | Surface conflicts before merge and force contributors to rebase |
## Default Reviewer Assignments

8
app.js
View File

@@ -714,10 +714,6 @@ async function init() {
camera = new THREE.PerspectiveCamera(65, window.innerWidth / window.innerHeight, 0.1, 1000);
camera.position.copy(playerPos);
// Initialize avatar and LOD systems
if (window.AvatarCustomization) window.AvatarCustomization.init(scene, camera);
if (window.LODSystem) window.LODSystem.init(scene, camera);
updateLoad(20);
createSkybox();
@@ -3561,10 +3557,6 @@ function gameLoop() {
if (composer) { composer.render(); } else { renderer.render(scene, camera); }
// Update avatar and LOD systems
if (window.AvatarCustomization && playerPos) window.AvatarCustomization.update(playerPos);
if (window.LODSystem && playerPos) window.LODSystem.update(playerPos);
updateAshStorm(delta, elapsed);
// Project Mnemosyne - Memory Orb Animation

View File

@@ -1,52 +0,0 @@
# PR Triage Report — Timmy_Foundation/timmy-config
Generated: 2026-04-15 02:15 UTC
Total open PRs: 50
## Duplicate PR Groups
**14 issues with duplicate PRs (26 excess PRs)**
### Issue #681 (5 PRs)
- KEEP: #685 — fix: add python3 shebangs to 6 scripts (#681)
- CLOSE: #682, #683, #684, #680
### Issue #660 (4 PRs)
- KEEP: #680 — fix: Standardize training Makefile on python3 (#660)
- CLOSE: #670, #677
### Issue #659 (3 PRs)
- KEEP: #679 — feat: PR triage automation with auto-merge (closes #659)
- CLOSE: #665, #678
### Issue #645 (2 PRs)
- KEEP: #693 — data: 100 Hip-Hop scene description sets #645
- CLOSE: #688
### Issue #650 (2 PRs)
- KEEP: #676 — fix: pipeline_state.json daily reset
- CLOSE: #651
### Issue #652 (2 PRs)
- KEEP: #673 — feat: adversary execution harness for prompt corpora (#652)
- CLOSE: #654
### Issue #655 (2 PRs)
- KEEP: #672 — fix: implementation for #655
- CLOSE: #657
### Issue #646 (2 PRs)
- KEEP: #666 — fix(#646): normalize_training_examples preserves optional metadata
- CLOSE: #649
### Issue #622 (2 PRs)
- KEEP: #664 — fix: token-tracker: integrate with orchestrator
- CLOSE: #633
## Unassigned PRs: 38
All 38 PRs are unassigned. Recommend batch assignment to available reviewers.
## Recommendations
1. Close 26 duplicate PRs (keep newest for each issue)
2. Assign reviewers to all PRs
3. Add duplicate-PR prevention check to CI
4. Run this tool weekly to maintain backlog health

View File

@@ -395,8 +395,6 @@
<div id="memory-connections-panel" class="memory-connections-panel" style="display:none;" aria-label="Memory Connections Panel"></div>
<script src="./boot.js"></script>
<script src="./avatar-customization.js"></script>
<script src="./lod-system.js"></script>
<script>
function openMemoryFilter() { renderFilterList(); document.getElementById('memory-filter').style.display = 'flex'; }
function closeMemoryFilter() { document.getElementById('memory-filter').style.display = 'none'; }

View File

@@ -1,186 +0,0 @@
/**
* LOD (Level of Detail) System for The Nexus
*
* Optimizes rendering when many avatars/users are visible:
* - Distance-based LOD: far users become billboard sprites
* - Occlusion: skip rendering users behind walls
* - Budget: maintain 60 FPS target with 50+ avatars
*
* Usage:
* LODSystem.init(scene, camera);
* LODSystem.registerAvatar(avatarMesh, userId);
* LODSystem.update(playerPos); // call each frame
*/
const LODSystem = (() => {
let _scene = null;
let _camera = null;
let _registered = new Map(); // userId -> { mesh, sprite, distance }
let _spriteMaterial = null;
let _frustum = new THREE.Frustum();
let _projScreenMatrix = new THREE.Matrix4();
// Thresholds
const LOD_NEAR = 15; // Full mesh within 15 units
const LOD_FAR = 40; // Billboard beyond 40 units
const LOD_CULL = 80; // Don't render beyond 80 units
const SPRITE_SIZE = 1.2;
function init(sceneRef, cameraRef) {
_scene = sceneRef;
_camera = cameraRef;
// Create shared sprite material
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
// Simple avatar indicator: colored circle
ctx.fillStyle = '#00ffcc';
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2); // head
ctx.fill();
const texture = new THREE.CanvasTexture(canvas);
_spriteMaterial = new THREE.SpriteMaterial({
map: texture,
transparent: true,
depthTest: true,
sizeAttenuation: true,
});
console.log('[LODSystem] Initialized');
}
function registerAvatar(avatarMesh, userId, color) {
// Create billboard sprite for this avatar
const spriteMat = _spriteMaterial.clone();
if (color) {
// Tint sprite to match avatar color
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2);
ctx.fill();
spriteMat.map = new THREE.CanvasTexture(canvas);
spriteMat.map.needsUpdate = true;
}
const sprite = new THREE.Sprite(spriteMat);
sprite.scale.set(SPRITE_SIZE, SPRITE_SIZE, 1);
sprite.visible = false;
_scene.add(sprite);
_registered.set(userId, {
mesh: avatarMesh,
sprite: sprite,
distance: Infinity,
});
}
function unregisterAvatar(userId) {
const entry = _registered.get(userId);
if (entry) {
_scene.remove(entry.sprite);
entry.sprite.material.dispose();
_registered.delete(userId);
}
}
function setSpriteColor(userId, color) {
const entry = _registered.get(userId);
if (!entry) return;
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2);
ctx.fill();
entry.sprite.material.map = new THREE.CanvasTexture(canvas);
entry.sprite.material.map.needsUpdate = true;
}
function update(playerPos) {
if (!_camera) return;
// Update frustum for culling
_projScreenMatrix.multiplyMatrices(
_camera.projectionMatrix,
_camera.matrixWorldInverse
);
_frustum.setFromProjectionMatrix(_projScreenMatrix);
_registered.forEach((entry, userId) => {
if (!entry.mesh) return;
const meshPos = entry.mesh.position;
const distance = playerPos.distanceTo(meshPos);
entry.distance = distance;
// Beyond cull distance: hide everything
if (distance > LOD_CULL) {
entry.mesh.visible = false;
entry.sprite.visible = false;
return;
}
// Check if in camera frustum
const inFrustum = _frustum.containsPoint(meshPos);
if (!inFrustum) {
entry.mesh.visible = false;
entry.sprite.visible = false;
return;
}
// LOD switching
if (distance <= LOD_NEAR) {
// Near: full mesh
entry.mesh.visible = true;
entry.sprite.visible = false;
} else if (distance <= LOD_FAR) {
// Mid: mesh with reduced detail (keep mesh visible)
entry.mesh.visible = true;
entry.sprite.visible = false;
} else {
// Far: billboard sprite
entry.mesh.visible = false;
entry.sprite.visible = true;
entry.sprite.position.copy(meshPos);
entry.sprite.position.y += 1.2; // above avatar center
}
});
}
function getStats() {
let meshCount = 0;
let spriteCount = 0;
let culledCount = 0;
_registered.forEach(entry => {
if (entry.mesh.visible) meshCount++;
else if (entry.sprite.visible) spriteCount++;
else culledCount++;
});
return { total: _registered.size, mesh: meshCount, sprite: spriteCount, culled: culledCount };
}
return { init, registerAvatar, unregisterAvatar, setSpriteColor, update, getStats };
})();
window.LODSystem = LODSystem;

View File

@@ -28,11 +28,16 @@ except ImportError:
websockets = None
from nexus.evennia_event_adapter import (
actor_located,
audit_heartbeat,
command_executed,
command_issued,
command_result,
player_join,
player_leave,
player_move,
room_snapshot,
session_bound,
)
ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
@@ -49,31 +54,82 @@ def strip_ansi(text: str) -> str:
return ANSI_RE.sub("", text or "")
def clean_lines(text: str) -> list[str]:
"""Strip ANSI codes and split into non-empty lines."""
text = strip_ansi(text).replace("\r", "")
return [line.strip() for line in text.split("\n") if line.strip()]
def parse_room_output(text: str) -> dict | None:
"""Parse Evennia room output into structured data with title, desc, exits, objects."""
lines = clean_lines(text)
if len(lines) < 2:
return None
title = lines[0]
desc = lines[1]
exits = []
objects = []
for line in lines[2:]:
if line.startswith("Exits:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
exits = [{"key": t.strip(), "destination_id": t.strip().title(), "destination_key": t.strip().title()} for t in raw.split(",") if t.strip()]
elif line.startswith("You see:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
parts = [t.strip() for t in raw.split(",") if t.strip()]
objects = [{"id": p.removeprefix("a ").removeprefix("an "), "key": p.removeprefix("a ").removeprefix("an "), "short_desc": p} for p in parts]
return {"title": title, "desc": desc, "exits": exits, "objects": objects}
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
"""Normalize a raw Evennia event dict into a list of Nexus event dicts."""
out = []
event = raw.get("event")
actor = raw.get("actor", "Timmy")
timestamp = raw.get("timestamp")
if event == "connect":
out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp))
parsed = parse_room_output(raw.get("output", ""))
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
elif event == "command":
cmd = raw.get("command", "")
output = raw.get("output", "")
out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
success = not output.startswith("Command '") and not output.startswith("Could not find")
out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp))
parsed = parse_room_output(output)
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
return out
class LogTailer:
"""Async file tailer that yields new lines as they appear."""
def __init__(self, path: str, poll_interval: float = 0.5):
self.path = path
self.poll_interval = poll_interval
self._offset = 0
async def tail(self):
"""Yield new lines from the file, starting from end."""
# Start at end of file
if os.path.exists(self.path):
self._offset = os.path.getsize(self.path)
while True:
try:
if not os.path.exists(self.path):
await asyncio.sleep(self.poll_interval)
continue
size = os.path.getsize(self.path)
if size < self._offset:
# File was truncated/rotated
self._offset = 0
if size > self._offset:
with open(self.path, "r") as f:
f.seek(self._offset)
@@ -82,7 +138,7 @@ class LogTailer:
if line:
yield line
self._offset = f.tell()
await asyncio.sleep(self.poll_interval)
except Exception as e:
print(f"[tailer] Error reading {self.path}: {e}", flush=True)
@@ -91,44 +147,44 @@ class LogTailer:
def parse_log_line(line: str) -> Optional[dict]:
"""Parse a log line into a Nexus event, or None if not parseable."""
# Movement events
m = MOVE_RE.search(line)
if m:
return player_move(m.group(1), m.group(3), m.group(2))
# Command events
m = CMD_RE.search(line)
if m:
return command_executed(m.group(1), m.group(2), m.group(3) or "")
# Session start
m = SESSION_START_RE.search(line)
if m:
return player_join(m.group(2), m.group(1))
# Session end
m = SESSION_END_RE.search(line)
if m:
return player_leave("", m.group(1), session_duration=float(m.group(2)))
# Server login
m = LOGIN_RE.search(line)
if m:
return player_join(m.group(1), ip_address=m.group(2))
# Server logout
m = LOGOUT_RE.search(line)
if m:
return player_leave(m.group(1))
return None
async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
"""
Main live bridge loop.
Tails all Evennia log files and streams parsed events to Nexus WebSocket.
Auto-reconnects on failure.
"""
@@ -138,9 +194,9 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
os.path.join(log_dir, "player_activity.log"),
os.path.join(log_dir, "server.log"),
]
event_queue: asyncio.Queue = asyncio.Queue(maxsize=10000)
async def tail_file(path: str):
"""Tail a single file and put events on queue."""
tailer = LogTailer(path)
@@ -151,7 +207,7 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
event_queue.put_nowait(event)
except asyncio.QueueFull:
pass # Drop oldest if queue full
async def ws_sender():
"""Send events from queue to WebSocket, with auto-reconnect."""
while True:
@@ -162,7 +218,7 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
event = await event_queue.get()
ts = event.get("timestamp", "")[:19]
print(f"[{ts}] {event['type']}: {json.dumps({k: v for k, v in event.items() if k not in ('type', 'timestamp')})}", flush=True)
print(f"[bridge] Connecting to {ws_url}...", flush=True)
async with websockets.connect(ws_url) as ws:
print(f"[bridge] Connected to Nexus at {ws_url}", flush=True)
@@ -172,67 +228,17 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
except Exception as e:
print(f"[bridge] WebSocket error: {e}. Reconnecting in {reconnect_delay}s...", flush=True)
await asyncio.sleep(reconnect_delay)
# Start all tailers + sender
tasks = [asyncio.create_task(tail_file(f)) for f in log_files]
tasks.append(asyncio.create_task(ws_sender()))
print(f"[bridge] Live bridge started. Watching {len(log_files)} log files.", flush=True)
await asyncio.gather(*tasks)
async def playback(log_path: Path, ws_url: str):
"""Legacy mode: replay a telemetry JSONL file."""
from nexus.evennia_event_adapter import (
actor_located, command_issued, command_result,
room_snapshot, session_bound,
)
def clean_lines(text: str) -> list[str]:
text = strip_ansi(text).replace("\r", "")
return [line.strip() for line in text.split("\n") if line.strip()]
def parse_room_output(text: str):
lines = clean_lines(text)
if len(lines) < 2:
return None
title = lines[0]
desc = lines[1]
exits = []
objects = []
for line in lines[2:]:
if line.startswith("Exits:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
exits = [{"key": t.strip(), "destination_id": t.strip().title(), "destination_key": t.strip().title()} for t in raw.split(",") if t.strip()]
elif line.startswith("You see:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
parts = [t.strip() for t in raw.split(",") if t.strip()]
objects = [{"id": p.removeprefix("a ").removeprefix("an "), "key": p.removeprefix("a ").removeprefix("an "), "short_desc": p} for p in parts]
return {"title": title, "desc": desc, "exits": exits, "objects": objects}
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
out = []
event = raw.get("event")
actor = raw.get("actor", "Timmy")
timestamp = raw.get("timestamp")
if event == "connect":
out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp))
parsed = parse_room_output(raw.get("output", ""))
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
elif event == "command":
cmd = raw.get("command", "")
output = raw.get("output", "")
out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
success = not output.startswith("Command '") and not output.startswith("Could not find")
out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp))
parsed = parse_room_output(output)
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
return out
hermes_session_id = log_path.stem
async with websockets.connect(ws_url) as ws:
for line in log_path.read_text(encoding="utf-8").splitlines():
@@ -245,11 +251,6 @@ async def playback(log_path: Path, ws_url: str):
async def inject_event(event_type: str, ws_url: str, **kwargs):
"""Inject a single Evennia event into the Nexus WS gateway. Dev/test use."""
from nexus.evennia_event_adapter import (
actor_located, command_issued, command_result,
room_snapshot, session_bound,
)
builders = {
"room_snapshot": lambda: room_snapshot(
kwargs.get("room_key", "Gate"),

View File

@@ -1,135 +0,0 @@
# Timmy-config PR Backlog Audit — the-nexus #1471
Generated: 2026-04-16T01:44:07Z
Source issue: `process: Address timmy-config PR backlog (9 PRs - highest in org)`
## Source Snapshot
Issue #1471 claims timmy-config had 9 open PRs and the highest PR backlog in the org during the original triage snapshot.
This audit re-queries the live PR backlog and classifies it against current forge state instead of trusting that stale count.
## Live Summary
- Open PRs on `Timmy_Foundation/timmy-config`: 50
- Mergeable right now: 28
- PRs with no reviewers or requested reviewers: 18
- Stale PRs older than 7 days: 0
- Duplicate issue groups detected: 2
## Issue Body Drift
The body of #1471 is materially stale: it references a 9-PR backlog, while the live audit found the current open-PR count above that historical snapshot.
This means the issue should be treated as a process/report problem, not as a direct live-merge instruction.
## Duplicate Issue Groups
| Issue refs | PRs |
|---|---|
| #598 | #766 (fix/598); #765 (fix/598-crisis-manipulation) |
| #752 | #767 (feat/752-provenance-tracking); #760 (fix/752-provenance-integration) |
## Reviewer Coverage
| PR | Title | Updated |
|---|---|---|
| #780 | fix: add python3 shebang to bin/glitch_patterns.py (#681) | 2026-04-16 |
| #779 | feat: 500 indirect crisis signal training pairs (#597) | 2026-04-16 |
| #778 | feat: authority bypass jailbreak corpus — 200 prompts (#619) | 2026-04-16 |
| #777 | feat: token budget tracker with real-time dashboard (#622) | 2026-04-16 |
| #776 | feat: config drift detection across fleet nodes (#686) | 2026-04-16 |
| #775 | feat: PR triage automation script (#659) | 2026-04-16 |
| #774 | feat: 100 R&B/Soul lyrics→visual scene sets (#613) | 2026-04-16 |
| #773 | feat: bounded hash dedup with daily rotation (#628) | 2026-04-16 |
| #772 | feat: Cron job audit script (#662) | 2026-04-16 |
| #771 | feat: Quality gate integration with pipeline orchestrator (#627) | 2026-04-16 |
| #770 | fix: #660 - Makefile python3 portability | 2026-04-16 |
| #769 | feat: quality gate test suite — 27 tests (#629) | 2026-04-15 |
| #768 | feat: integrate token tracking with orchestrator (#634) | 2026-04-15 |
| #767 | feat: integrate provenance tracking with training pipelines | 2026-04-15 |
| #766 | feat: crisis response — manipulation & edge cases 500 pairs (#598) | 2026-04-15 |
| #765 | feat: 500 crisis manipulation & edge case training pairs (#598) | 2026-04-15 |
| #764 | fix: #646 | 2026-04-15 |
| #763 | feat: PR backlog triage script + 9 duplicate PRs closed (#658) | 2026-04-15 |
## Mergeable Snapshot
| PR | Title | Head branch |
|---|---|---|
| #780 | fix: add python3 shebang to bin/glitch_patterns.py (#681) | `fix/681-shebangs` |
| #779 | feat: 500 indirect crisis signal training pairs (#597) | `fix/597-indirect-crisis` |
| #778 | feat: authority bypass jailbreak corpus — 200 prompts (#619) | `fix/619-auth-bypass-v2` |
| #777 | feat: token budget tracker with real-time dashboard (#622) | `fix/622-token-tracker` |
| #776 | feat: config drift detection across fleet nodes (#686) | `fix/686-config-drift` |
| #775 | feat: PR triage automation script (#659) | `fix/659` |
| #774 | feat: 100 R&B/Soul lyrics→visual scene sets (#613) | `fix/613` |
| #773 | feat: bounded hash dedup with daily rotation (#628) | `fix/628-hash-rotation` |
| #772 | feat: Cron job audit script (#662) | `fix/662` |
| #771 | feat: Quality gate integration with pipeline orchestrator (#627) | `fix/627` |
| #770 | fix: #660 - Makefile python3 portability | `fix/660` |
| #769 | feat: quality gate test suite — 27 tests (#629) | `fix/629-quality-gate-tests` |
| #768 | feat: integrate token tracking with orchestrator (#634) | `fix/634` |
| #767 | feat: integrate provenance tracking with training pipelines | `feat/752-provenance-tracking` |
| #766 | feat: crisis response — manipulation & edge cases 500 pairs (#598) | `fix/598` |
| #765 | feat: 500 crisis manipulation & edge case training pairs (#598) | `fix/598-crisis-manipulation` |
| #764 | fix: #646 | `fix/646` |
| #763 | feat: PR backlog triage script + 9 duplicate PRs closed (#658) | `fix/658` |
| #762 | feat: 500 music mood prompt enhancement pairs (#601) | `fix/601` |
| #761 | fix: normalize code block indentation in training data (#750) | `fix/750` |
| ... | ... | +8 more mergeable PRs |
## Stale PRs
No stale PRs older than 7 days were detected in the live snapshot.
## Recommended Next Actions
1. Use the duplicate-issue groups to collapse obviously redundant PRs before attempting any merge sweep.
2. Assign reviewers (or request them) on the PRs with zero reviewer coverage so the backlog becomes reviewable instead of merely mergeable.
3. Prioritize mergeable PRs with unique issue refs and recent updates for the next burndown pass.
4. Treat this report as the live reference for #1471; the original issue body is now a stale ops snapshot.
## Raw Backlog Snapshot
| PR | Mergeable | Review signals | Issue refs |
|---|---|---|---|
| #780 | True | 0 | #681 |
| #779 | True | 0 | #597 |
| #778 | True | 0 | #619 |
| #777 | True | 0 | #622 |
| #776 | True | 0 | #686 |
| #775 | True | 0 | #659 |
| #774 | True | 0 | #613 |
| #773 | True | 0 | #628 |
| #772 | True | 0 | #662 |
| #771 | True | 0 | #627 |
| #770 | True | 0 | #660 |
| #769 | True | 0 | #629 |
| #768 | True | 0 | #634 |
| #767 | True | 0 | #752 |
| #766 | True | 0 | #598 |
| #765 | True | 0 | #598 |
| #764 | True | 0 | #646, #598 |
| #763 | True | 0 | #658, #757, #761, #750, #749, #687, #739, #737, #751, #691, #733, #740, #655, #736, #621, #716, #720, #690, #710, #708, #714, #602 |
| #762 | True | 1 | #601 |
| #761 | True | 1 | #750 |
| #760 | True | 1 | #752 |
| #759 | True | 1 | #603 |
| #758 | True | 1 | #799, #949 |
| #756 | True | 1 | #604 |
| #755 | True | 1 | #605 |
| #754 | True | 1 | #13, #8 |
| #753 | True | 2 | #606 |
| #751 | True | 2 | #691 |
| #748 | False | 2 | #607 |
| #747 | False | 2 | #1776268452231 |
| #746 | False | 1 | #609 |
| #745 | False | 1 | #610 |
| #744 | False | 1 | #611 |
| #743 | False | 2 | #696 |
| #742 | False | 1 | #612 |
| #741 | False | 1 | #615 |
| #740 | False | 1 | #618, #652, #655 |
| #738 | False | 1 | #696, #721 |
| #736 | False | 1 | #621 |
| #735 | False | 3 | #623 |
| ... | ... | ... | +10 more PRs |

View File

@@ -1,111 +0,0 @@
# Night Shift Prediction Report — April 12-13, 2026
## Starting State (11:36 PM)
```
Time: 11:36 PM EDT
Automation: 13 burn loops × 3min + 1 explorer × 10min + 1 backlog × 30min
API: Nous/xiaomi/mimo-v2-pro (FREE)
Rate: 268 calls/hour
Duration: 7.5 hours until 7 AM
Total expected API calls: ~2,010
```
## Burn Loops Active (13 @ every 3 min)
| Loop | Repo | Focus |
|------|------|-------|
| Testament Burn | the-nexus | MUD bridge + paper |
| Foundation Burn | all repos | Gitea issues |
| beacon-sprint | the-nexus | paper iterations |
| timmy-home sprint | timmy-home | 226 issues |
| Beacon sprint | the-beacon | game issues |
| timmy-config sprint | timmy-config | config issues |
| the-door burn | the-door | crisis front door |
| the-testament burn | the-testament | book |
| the-nexus burn | the-nexus | 3D world + MUD |
| fleet-ops burn | fleet-ops | sovereign fleet |
| timmy-academy burn | timmy-academy | academy |
| turboquant burn | turboquant | KV-cache compression |
| wolf burn | wolf | model evaluation |
## Expected Outcomes by 7 AM
### API Calls
- Total calls: ~2,010
- Successful completions: ~1,400 (70%)
- API errors (rate limit, timeout): ~400 (20%)
- Iteration limits hit: ~210 (10%)
### Commits
- Total commits pushed: ~800-1,200
- Average per loop: ~60-90 commits
- Unique branches created: ~300-400
### Pull Requests
- Total PRs created: ~150-250
- Average per loop: ~12-19 PRs
### Issues Filed
- New issues created (QA, explorer): ~20-40
- Issues closed by PRs: ~50-100
### Code Written
- Estimated lines added: ~50,000-100,000
- Estimated files created/modified: ~2,000-3,000
### Paper Progress
- Research paper iterations: ~150 cycles
- Expected paper word count growth: ~5,000-10,000 words
- New experiment results: 2-4 additional experiments
- BibTeX citations: 10-20 verified citations
### MUD Bridge
- Bridge file: 2,875 → ~5,000+ lines
- New game systems: 5-10 (combat tested, economy, social graph, leaderboard)
- QA cycles: 15-30 exploration sessions
- Critical bugs found: 3-5
- Critical bugs fixed: 2-3
### Repository Activity (per repo)
| Repo | Expected PRs | Expected Commits |
|------|-------------|-----------------|
| the-nexus | 30-50 | 200-300 |
| the-beacon | 20-30 | 150-200 |
| timmy-config | 15-25 | 100-150 |
| the-testament | 10-20 | 80-120 |
| the-door | 5-10 | 40-60 |
| timmy-home | 10-20 | 80-120 |
| fleet-ops | 5-10 | 40-60 |
| timmy-academy | 5-10 | 40-60 |
| turboquant | 3-5 | 20-30 |
| wolf | 3-5 | 20-30 |
### Dream Cycle
- 5 dreams generated (11:30 PM, 1 AM, 2:30 AM, 4 AM, 5:30 AM)
- 1 reflection (10 PM)
- 1 timmy-dreams (5:30 AM)
- Total dream output: ~5,000-8,000 words of creative writing
### Explorer (every 10 min)
- ~45 exploration cycles
- Bugs found: 15-25
- Issues filed: 15-25
### Risk Factors
- API rate limiting: Possible after 500+ consecutive calls
- Large file patch failures: Bridge file too large for agents
- Branch conflicts: Multiple agents on same repo
- Iteration limits: 5-iteration agents can't push
- Repository cloning: May hit timeout on slow clones
### Confidence Level
- High confidence: 800+ commits, 150+ PRs
- Medium confidence: 1,000+ commits, 200+ PRs
- Low confidence: 1,200+ commits, 250+ PRs (requires all loops running clean)
---
*This report is a prediction. The 7 AM morning report will compare actual results.*
*Generated: 2026-04-12 23:36 EDT*
*Author: Timmy (pre-shift prediction)*

View File

@@ -1,144 +0,0 @@
#!/usr/bin/env python3
"""
pr_triage.py — Triage PR backlog for timmy-config.
Identifies duplicate PRs for the same issue, unassigned PRs,
and recommends which to close/merge.
Usage:
python3 scripts/pr_triage.py --repo Timmy_Foundation/timmy-config
python3 scripts/pr_triage.py --repo Timmy_Foundation/timmy-config --close-duplicates --dry-run
"""
import argparse
import json
import os
import re
import sys
import urllib.request
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
GITEA_URL = "https://forge.alexanderwhitestone.com"
def get_token():
return (Path.home() / ".config" / "gitea" / "token").read_text().strip()
def fetch_open_prs(repo, headers):
all_prs = []
page = 1
while True:
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls?state=open&limit=100&page={page}"
req = urllib.request.Request(url, headers=headers)
resp = urllib.request.urlopen(req, timeout=15)
data = json.loads(resp.read())
if not data:
break
all_prs.extend(data)
if len(data) < 100:
break
page += 1
return all_prs
def find_duplicate_groups(prs):
issue_prs = defaultdict(list)
for pr in prs:
text = (pr.get("body") or "") + " " + (pr.get("title") or "")
issues = set(re.findall(r"#(\d+)", text))
for iss in issues:
issue_prs[iss].append(pr)
return {k: v for k, v in issue_prs.items() if len(v) > 1}
def generate_report(repo, prs):
now = datetime.now(timezone.utc)
lines = [f"# PR Triage Report — {repo}",
f"\nGenerated: {now.strftime('%Y-%m-%d %H:%M UTC')}",
f"Total open PRs: {len(prs)}", ""]
duplicates = find_duplicate_groups(prs)
unassigned = [p for p in prs if not p.get("assignee")]
lines.append("## Duplicate PR Groups")
if duplicates:
total_dupes = sum(len(v) - 1 for v in duplicates.values())
lines.append(f"**{len(duplicates)} issues with duplicate PRs ({total_dupes} excess PRs)**")
for issue, pr_group in sorted(duplicates.items(), key=lambda x: -len(x[1])):
keep = max(pr_group, key=lambda p: p["number"])
close = [p for p in pr_group if p["number"] != keep["number"]]
lines.append(f"\n### Issue #{issue} ({len(pr_group)} PRs)")
lines.append(f"- **KEEP:** #{keep['number']}{keep['title'][:60]}")
for p in close:
lines.append(f"- CLOSE: #{p['number']}{p['title'][:60]}")
else:
lines.append("No duplicate PR groups found.")
lines.append("")
lines.append(f"## Unassigned PRs: {len(unassigned)}")
for p in unassigned[:10]:
lines.append(f"- #{p['number']}: {p['title'][:70]}")
if len(unassigned) > 10:
lines.append(f"- ... and {len(unassigned) - 10} more")
lines.append("")
lines.append("## Recommendations")
excess = sum(len(v) - 1 for v in duplicates.values())
lines.append(f"1. Close {excess} duplicate PRs (keep newest for each issue)")
lines.append(f"2. Assign reviewers to {len(unassigned)} unassigned PRs")
lines.append(f"3. Consider adding duplicate-PR prevention to CI")
return "\n".join(lines)
def close_duplicate_prs(repo, prs, headers, dry_run=True):
duplicates = find_duplicate_groups(prs)
closed = 0
for issue, pr_group in duplicates.items():
keep = max(pr_group, key=lambda p: p["number"])
for pr in pr_group:
if pr["number"] == keep["number"]:
continue
if dry_run:
print(f"Would close PR #{pr['number']}: {pr['title'][:60]}")
else:
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls/{pr['number']}"
data = json.dumps({"state": "closed"}).encode()
req = urllib.request.Request(url, data=data, headers={**headers, "Content-Type": "application/json"}, method="PATCH")
try:
urllib.request.urlopen(req)
print(f"Closed PR #{pr['number']}")
closed += 1
except Exception as e:
print(f"Failed to close #{pr['number']}: {e}")
return closed
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--repo", default="Timmy_Foundation/timmy-config")
parser.add_argument("--close-duplicates", action="store_true")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
token = get_token()
headers = {"Authorization": f"token {token}"}
prs = fetch_open_prs(args.repo, headers)
if args.close_duplicates:
closed = close_duplicate_prs(args.repo, prs, headers, args.dry_run)
print(f"\n{'Would close' if args.dry_run else 'Closed'} {closed} duplicate PRs")
else:
report = generate_report(args.repo, prs)
print(report)
docs_dir = Path(__file__).resolve().parent.parent / "docs"
docs_dir.mkdir(exist_ok=True)
(docs_dir / "pr-triage-report.md").write_text(report)
if __name__ == "__main__":
main()

View File

@@ -4,61 +4,48 @@ Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
Correctly uses the Gitea 1.25+ API (not GitHub-style).
"""
from __future__ import annotations
import json
import os
import sys
import json
import urllib.request
from pathlib import Path
import yaml
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
PROJECT_ROOT = Path(__file__).resolve().parent.parent
CONFIG_DIR = PROJECT_ROOT / ".gitea" / "branch-protection"
CONFIG_DIR = ".gitea/branch-protection"
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
url = f"{GITEA_URL}/api/v1{path}"
data = json.dumps(payload).encode() if payload else None
req = urllib.request.Request(
url,
data=data,
method=method,
headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
req = urllib.request.Request(url, data=data, method=method, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def build_branch_protection_payload(branch: str, rules: dict) -> dict:
return {
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.pop("branch", "main")
# Check if protection already exists
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(r.get("branch_name") == branch for r in existing)
payload = {
"branch_name": branch,
"rule_name": branch,
"required_approvals": rules.get("required_approvals", 1),
"block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
"dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
"block_deletions": rules.get("block_deletions", True),
"block_force_push": rules.get("block_force_push", rules.get("block_force_pushes", True)),
"block_force_push": rules.get("block_force_push", True),
"block_admin_merge_override": rules.get("block_admin_merge_override", True),
"enable_status_check": rules.get("require_ci_to_merge", False),
"status_check_contexts": rules.get("status_check_contexts", []),
"block_on_outdated_branch": rules.get("block_on_outdated_branch", False),
}
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.get("branch", "main")
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(rule.get("branch_name") == branch for rule in existing)
payload = build_branch_protection_payload(branch, rules)
try:
if exists:
api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
@@ -66,8 +53,8 @@ def apply_protection(repo: str, rules: dict) -> bool:
api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
print(f"{repo}:{branch} synced")
return True
except Exception as exc:
print(f"{repo}:{branch} failed: {exc}")
except Exception as e:
print(f"{repo}:{branch} failed: {e}")
return False
@@ -75,18 +62,15 @@ def main() -> int:
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
return 1
if not CONFIG_DIR.exists():
print(f"ERROR: config directory not found: {CONFIG_DIR}")
return 1
ok = 0
for cfg_path in sorted(CONFIG_DIR.glob("*.yml")):
repo = cfg_path.stem
with cfg_path.open() as fh:
cfg = yaml.safe_load(fh) or {}
rules = cfg.get("rules", {})
rules.setdefault("branch", cfg.get("branch", "main"))
if apply_protection(repo, rules):
for fname in os.listdir(CONFIG_DIR):
if not fname.endswith(".yml"):
continue
repo = fname[:-4]
with open(os.path.join(CONFIG_DIR, fname)) as f:
cfg = yaml.safe_load(f)
if apply_protection(repo, cfg.get("rules", {})):
ok += 1
print(f"\nSynced {ok} repo(s)")

View File

@@ -1,257 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError
from urllib.request import Request, urlopen
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
SOURCE_REPO = "the-nexus"
TARGET_REPO = "timmy-config"
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
DEFAULT_OUTPUT = "reports/2026-04-16-timmy-config-pr-backlog-audit.md"
def api_get(path: str, token: str) -> Any:
req = Request(API_BASE + path, headers={"Authorization": f"token {token}"})
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def extract_issue_refs(title: str = "", body: str = "", head: str = "") -> list[int]:
text = " ".join(filter(None, [title, body, head]))
refs: list[int] = []
seen: set[int] = set()
for match in re.finditer(r"#(\d+)", text):
value = int(match.group(1))
if value not in seen:
seen.add(value)
refs.append(value)
if not refs and head:
for match in re.finditer(r"(?:^|[/-])(\d+)(?:$|[/-])", head):
value = int(match.group(1))
if value not in seen:
seen.add(value)
refs.append(value)
return refs
def summarize_backlog(backlog: list[dict[str, Any]], now_iso: str | None = None, stale_days: int = 7) -> dict[str, Any]:
now = _parse_iso(now_iso) if now_iso else datetime.now(timezone.utc)
duplicate_groups: dict[tuple[int, ...], list[dict[str, Any]]] = {}
missing_reviewer = []
stale = []
mergeable = []
for pr in backlog:
refs_list = pr.get("issue_refs") or extract_issue_refs(
pr.get("title") or "",
pr.get("body") or "",
pr.get("head") or "",
)
if not pr.get("issue_refs"):
pr["issue_refs"] = refs_list
refs = tuple(refs_list)
if refs:
duplicate_groups.setdefault(refs, []).append(pr)
if pr.get("review_count", 0) + pr.get("requested_reviewers", 0) == 0:
missing_reviewer.append(pr)
updated_at = _parse_iso(pr["updated_at"])
if now - updated_at > timedelta(days=stale_days):
stale.append(pr)
if pr.get("mergeable"):
mergeable.append(pr)
dupes = [
{"issue_refs": list(refs), "prs": prs}
for refs, prs in duplicate_groups.items()
if len(prs) > 1
]
dupes.sort(key=lambda item: (item["issue_refs"][0] if item["issue_refs"] else 10**9))
return {
"total_open_prs": len(backlog),
"mergeable_count": len(mergeable),
"missing_reviewer_count": len(missing_reviewer),
"stale_count": len(stale),
"duplicate_issue_groups": dupes,
"mergeable_prs": mergeable,
"missing_reviewer_prs": missing_reviewer,
"stale_prs": stale,
}
def render_report(*, source_issue: int, source_title: str, summary: dict[str, Any], backlog: list[dict[str, Any]], generated_at: str) -> str:
lines = [
f"# Timmy-config PR Backlog Audit — the-nexus #{source_issue}",
"",
f"Generated: {generated_at}",
f"Source issue: `{source_title}`",
"",
"## Source Snapshot",
"",
"Issue #1471 claims timmy-config had 9 open PRs and the highest PR backlog in the org during the original triage snapshot.",
"This audit re-queries the live PR backlog and classifies it against current forge state instead of trusting that stale count.",
"",
"## Live Summary",
"",
f"- Open PRs on `{ORG}/{TARGET_REPO}`: {summary['total_open_prs']}",
f"- Mergeable right now: {summary['mergeable_count']}",
f"- PRs with no reviewers or requested reviewers: {summary['missing_reviewer_count']}",
f"- Stale PRs older than 7 days: {summary['stale_count']}",
f"- Duplicate issue groups detected: {len(summary['duplicate_issue_groups'])}",
"",
"## Issue Body Drift",
"",
"The body of #1471 is materially stale: it references a 9-PR backlog, while the live audit found the current open-PR count above that historical snapshot.",
"This means the issue should be treated as a process/report problem, not as a direct live-merge instruction.",
"",
"## Duplicate Issue Groups",
"",
]
if summary["duplicate_issue_groups"]:
lines.extend(["| Issue refs | PRs |", "|---|---|"])
for group in summary["duplicate_issue_groups"]:
refs = ", ".join(f"#{n}" for n in group["issue_refs"]) or "(none)"
prs = "; ".join(f"#{pr['number']} ({pr['head']})" for pr in group["prs"])
lines.append(f"| {refs} | {prs} |")
else:
lines.append("No duplicate issue groups detected in the live backlog.")
lines.extend([
"",
"## Reviewer Coverage",
"",
])
if summary["missing_reviewer_prs"]:
lines.extend(["| PR | Title | Updated |", "|---|---|---|"])
for pr in summary["missing_reviewer_prs"][:20]:
lines.append(f"| #{pr['number']} | {pr['title']} | {pr['updated_at'][:10]} |")
if len(summary["missing_reviewer_prs"]) > 20:
lines.append(f"| ... | ... | +{len(summary['missing_reviewer_prs']) - 20} more |")
else:
lines.append("All open PRs currently show reviewer coverage signals.")
lines.extend([
"",
"## Mergeable Snapshot",
"",
])
if summary["mergeable_prs"]:
lines.extend(["| PR | Title | Head branch |", "|---|---|---|"])
for pr in summary["mergeable_prs"][:20]:
lines.append(f"| #{pr['number']} | {pr['title']} | `{pr['head']}` |")
if len(summary["mergeable_prs"]) > 20:
lines.append(f"| ... | ... | +{len(summary['mergeable_prs']) - 20} more mergeable PRs |")
else:
lines.append("No mergeable PRs reported in the live backlog snapshot.")
lines.extend([
"",
"## Stale PRs",
"",
])
if summary["stale_prs"]:
lines.extend(["| PR | Title | Updated |", "|---|---|---|"])
for pr in summary["stale_prs"]:
lines.append(f"| #{pr['number']} | {pr['title']} | {pr['updated_at'][:10]} |")
else:
lines.append("No stale PRs older than 7 days were detected in the live snapshot.")
lines.extend([
"",
"## Recommended Next Actions",
"",
"1. Use the duplicate-issue groups to collapse obviously redundant PRs before attempting any merge sweep.",
"2. Assign reviewers (or request them) on the PRs with zero reviewer coverage so the backlog becomes reviewable instead of merely mergeable.",
"3. Prioritize mergeable PRs with unique issue refs and recent updates for the next burndown pass.",
"4. Treat this report as the live reference for #1471; the original issue body is now a stale ops snapshot.",
"",
"## Raw Backlog Snapshot",
"",
"| PR | Mergeable | Review signals | Issue refs |",
"|---|---|---|---|",
])
for pr in backlog[:40]:
refs = ", ".join(f"#{n}" for n in pr.get("issue_refs", [])) or "(none)"
review_signals = pr.get("review_count", 0) + pr.get("requested_reviewers", 0)
lines.append(f"| #{pr['number']} | {pr['mergeable']} | {review_signals} | {refs} |")
if len(backlog) > 40:
lines.append(f"| ... | ... | ... | +{len(backlog) - 40} more PRs |")
return "\n".join(lines) + "\n"
def collect_backlog(repo: str, token: str) -> list[dict[str, Any]]:
prs: list[dict[str, Any]] = []
for page in range(1, 6):
batch = api_get(f"/repos/{ORG}/{repo}/pulls?state=open&limit=100&page={page}", token)
if not batch:
break
for pr in batch:
number = pr["number"]
reviews = _safe_api_get(f"/repos/{ORG}/{repo}/pulls/{number}/reviews", token) or []
requested = _safe_api_get(f"/repos/{ORG}/{repo}/pulls/{number}/requested_reviewers", token) or {}
prs.append({
"number": number,
"title": pr.get("title") or "",
"body": pr.get("body") or "",
"head": (pr.get("head") or {}).get("ref") or "",
"mergeable": bool(pr.get("mergeable")),
"updated_at": pr.get("updated_at") or pr.get("created_at") or "1970-01-01T00:00:00Z",
"review_count": len([r for r in reviews if r.get("state")]),
"requested_reviewers": len(requested.get("users", []) or []),
"issue_refs": extract_issue_refs(pr.get("title") or "", pr.get("body") or "", (pr.get("head") or {}).get("ref") or ""),
})
if len(batch) < 100:
break
return prs
def _safe_api_get(path: str, token: str):
try:
return api_get(path, token)
except HTTPError:
return None
def _parse_iso(value: str) -> datetime:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
def main() -> int:
parser = argparse.ArgumentParser(description="Audit the live timmy-config PR backlog for the-nexus issue #1471.")
parser.add_argument("--issue", type=int, default=1471)
parser.add_argument("--source-repo", default=SOURCE_REPO)
parser.add_argument("--target-repo", default=TARGET_REPO)
parser.add_argument("--output", default=DEFAULT_OUTPUT)
parser.add_argument("--token-file", default=DEFAULT_TOKEN_PATH)
args = parser.parse_args()
token = Path(args.token_file).read_text(encoding="utf-8").strip()
issue = api_get(f"/repos/{ORG}/{args.source_repo}/issues/{args.issue}", token)
backlog = collect_backlog(args.target_repo, token)
summary = summarize_backlog(backlog)
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
report = render_report(
source_issue=args.issue,
source_title=issue.get("title") or "",
summary=summary,
backlog=backlog,
generated_at=generated_at,
)
out = Path(args.output)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(report, encoding="utf-8")
print(out)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,25 +0,0 @@
from pathlib import Path
REPORT = Path("reports/night-shift-prediction-2026-04-12.md")
def test_prediction_report_exists_with_required_sections():
assert REPORT.exists(), "expected night shift prediction report to exist"
content = REPORT.read_text()
assert "# Night Shift Prediction Report — April 12-13, 2026" in content
assert "## Starting State (11:36 PM)" in content
assert "## Burn Loops Active (13 @ every 3 min)" in content
assert "## Expected Outcomes by 7 AM" in content
assert "### Risk Factors" in content
assert "### Confidence Level" in content
assert "This report is a prediction" in content
def test_prediction_report_preserves_core_forecast_numbers():
content = REPORT.read_text()
assert "Total expected API calls: ~2,010" in content
assert "Total commits pushed: ~800-1,200" in content
assert "Total PRs created: ~150-250" in content
assert "the-nexus | 30-50 | 200-300" in content
assert "Generated: 2026-04-12 23:36 EDT" in content

View File

@@ -1,45 +0,0 @@
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import yaml
PROJECT_ROOT = Path(__file__).parent.parent
_spec = importlib.util.spec_from_file_location(
"sync_branch_protection_test",
PROJECT_ROOT / "scripts" / "sync_branch_protection.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules["sync_branch_protection_test"] = _mod
_spec.loader.exec_module(_mod)
build_branch_protection_payload = _mod.build_branch_protection_payload
def test_build_branch_protection_payload_enables_rebase_before_merge():
payload = build_branch_protection_payload(
"main",
{
"required_approvals": 1,
"dismiss_stale_approvals": True,
"require_ci_to_merge": False,
"block_deletions": True,
"block_force_push": True,
"block_on_outdated_branch": True,
},
)
assert payload["branch_name"] == "main"
assert payload["rule_name"] == "main"
assert payload["block_on_outdated_branch"] is True
assert payload["required_approvals"] == 1
assert payload["enable_status_check"] is False
def test_the_nexus_branch_protection_config_requires_up_to_date_branch():
config = yaml.safe_load((PROJECT_ROOT / ".gitea" / "branch-protection" / "the-nexus.yml").read_text())
rules = config["rules"]
assert rules["block_on_outdated_branch"] is True

View File

@@ -1,77 +0,0 @@
from pathlib import Path
from scripts.timmy_config_pr_backlog_audit import extract_issue_refs, summarize_backlog
def test_extract_issue_refs_from_title_body_and_branch() -> None:
text = "feat: crisis response — manipulation & edge cases 500 pairs (#598)"
body = "Refs #1471 and closes #598"
head = "fix/598-crisis-manipulation"
refs = extract_issue_refs(text, body, head)
assert 598 in refs
assert 1471 in refs
def test_summarize_backlog_finds_duplicates_missing_reviewers_and_stale_prs() -> None:
backlog = [
{
"number": 765,
"title": "feat: crisis response (#598)",
"body": "Closes #598",
"head": "fix/598-crisis-manipulation",
"mergeable": True,
"review_count": 0,
"requested_reviewers": 0,
"updated_at": "2026-04-01T00:00:00Z",
},
{
"number": 766,
"title": "feat: edge cases (#598)",
"body": "Closes #598",
"head": "fix/598",
"mergeable": True,
"review_count": 1,
"requested_reviewers": 0,
"updated_at": "2026-04-15T00:00:00Z",
},
{
"number": 777,
"title": "feat: token budget tracker (#622)",
"body": "Closes #622",
"head": "fix/622-token-tracker",
"mergeable": False,
"review_count": 0,
"requested_reviewers": 0,
"updated_at": "2026-04-15T00:00:00Z",
},
]
summary = summarize_backlog(backlog, now_iso="2026-04-16T00:00:00Z")
assert summary["total_open_prs"] == 3
assert summary["mergeable_count"] == 2
assert summary["missing_reviewer_count"] == 2
assert summary["stale_count"] == 1
assert summary["duplicate_issue_groups"][0]["issue_refs"] == [598]
assert {pr["number"] for pr in summary["duplicate_issue_groups"][0]["prs"]} == {765, 766}
def test_timmy_config_pr_backlog_report_exists_with_required_sections() -> None:
report = Path("reports/2026-04-16-timmy-config-pr-backlog-audit.md")
text = report.read_text(encoding="utf-8")
required = [
"# Timmy-config PR Backlog Audit — the-nexus #1471",
"## Source Snapshot",
"## Live Summary",
"## Issue Body Drift",
"## Duplicate Issue Groups",
"## Reviewer Coverage",
"## Mergeable Snapshot",
"## Stale PRs",
"## Recommended Next Actions",
]
missing = [item for item in required if item not in text]
assert not missing, missing